Merge pull request 'check subscribe errors' (#164) from subscribeerr into v3
Reviewed-on: #164
This commit is contained in:
commit
d646deb468
60
grpc.go
60
grpc.go
@ -718,31 +718,6 @@ func (g *Server) Register() error {
|
|||||||
g.Lock()
|
g.Lock()
|
||||||
defer g.Unlock()
|
defer g.Unlock()
|
||||||
|
|
||||||
for sb := range g.subscribers {
|
|
||||||
handler := g.createSubHandler(sb, config)
|
|
||||||
var opts []broker.SubscribeOption
|
|
||||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
|
||||||
opts = append(opts, broker.SubscribeGroup(queue))
|
|
||||||
}
|
|
||||||
|
|
||||||
subCtx := config.Context
|
|
||||||
if cx := sb.Options().Context; cx != nil {
|
|
||||||
subCtx = cx
|
|
||||||
}
|
|
||||||
opts = append(opts, broker.SubscribeContext(subCtx))
|
|
||||||
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
|
||||||
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
|
||||||
}
|
|
||||||
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
g.subscribers[sb] = []broker.Subscriber{sub}
|
|
||||||
}
|
|
||||||
|
|
||||||
g.registered = true
|
g.registered = true
|
||||||
g.rsvc = service
|
g.rsvc = service
|
||||||
|
|
||||||
@ -876,6 +851,10 @@ func (g *Server) Start() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = g.subscribe(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// micro: go ts.Accept(s.accept)
|
// micro: go ts.Accept(s.accept)
|
||||||
go func() {
|
go func() {
|
||||||
if err = g.srv.Serve(ts); err != nil {
|
if err = g.srv.Serve(ts); err != nil {
|
||||||
@ -987,6 +966,37 @@ func (g *Server) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *Server) subscribe() error {
|
||||||
|
config := g.opts
|
||||||
|
|
||||||
|
for sb := range g.subscribers {
|
||||||
|
handler := g.createSubHandler(sb, config)
|
||||||
|
var opts []broker.SubscribeOption
|
||||||
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||||
|
opts = append(opts, broker.SubscribeGroup(queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
subCtx := config.Context
|
||||||
|
if cx := sb.Options().Context; cx != nil {
|
||||||
|
subCtx = cx
|
||||||
|
}
|
||||||
|
opts = append(opts, broker.SubscribeContext(subCtx))
|
||||||
|
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||||
|
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
||||||
|
}
|
||||||
|
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
g.subscribers[sb] = []broker.Subscriber{sub}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (g *Server) Stop() error {
|
func (g *Server) Stop() error {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
if !g.started {
|
if !g.started {
|
||||||
|
Loading…
Reference in New Issue
Block a user