Compare commits

..

4 Commits

Author SHA1 Message Date
d646deb468 Merge pull request 'check subscribe errors' (#164) from subscribeerr into v3
Reviewed-on: #164
2023-05-13 16:06:53 +03:00
468819f0a0 check subscribe errors
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-13 16:06:37 +03:00
832f1034a8 Merge pull request 'combo prepare' (#162) from init-fix into v3
Reviewed-on: #162
2023-03-04 16:28:24 +03:00
f0b6370ee1 move codec registration to init phase
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-04 16:25:50 +03:00

68
grpc.go
View File

@@ -128,6 +128,10 @@ func (g *Server) configure(opts ...server.Option) error {
}
}
for _, k := range g.opts.Codecs {
encoding.RegisterCodec(&wrapMicroCodec{k})
}
maxMsgSize := g.getMaxMsgSize()
gopts := []grpc.ServerOption{
@@ -714,31 +718,6 @@ func (g *Server) Register() error {
g.Lock()
defer g.Unlock()
for sb := range g.subscribers {
handler := g.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
g.subscribers[sb] = []broker.Subscriber{sub}
}
g.registered = true
g.rsvc = service
@@ -809,10 +788,6 @@ func (g *Server) Start() error {
config := g.Options()
for _, k := range config.Codecs {
encoding.RegisterCodec(&wrapMicroCodec{k})
}
// micro: config.Transport.Listen(config.Address)
var ts net.Listener
var err error
@@ -876,6 +851,10 @@ func (g *Server) Start() error {
}
}
if err = g.subscribe(); err != nil {
return err
}
// micro: go ts.Accept(s.accept)
go func() {
if err = g.srv.Serve(ts); err != nil {
@@ -987,6 +966,37 @@ func (g *Server) Start() error {
return nil
}
func (g *Server) subscribe() error {
config := g.opts
for sb := range g.subscribers {
handler := g.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
g.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (g *Server) Stop() error {
g.RLock()
if !g.started {