diff --git a/server/noop.go b/server/noop.go index 85333a50..956ac9c2 100644 --- a/server/noop.go +++ b/server/noop.go @@ -512,29 +512,33 @@ func (n *noopServer) Start() error { func (n *noopServer) subscribe() error { config := n.Options() - cx := config.Context - var err error - var sub broker.Subscriber + subCtx := config.Context for sb := range n.subscribers { - if sb.Options().Context != nil { - cx = sb.Options().Context + + if cx := sb.Options().Context; cx != nil { + subCtx = cx + } + + opts := []broker.SubscribeOption{ + broker.SubscribeContext(subCtx), + broker.SubscribeAutoAck(sb.Options().AutoAck), + broker.SubscribeBodyOnly(sb.Options().BodyOnly), } - opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)} if queue := sb.Options().Queue; len(queue) > 0 { opts = append(opts, broker.SubscribeGroup(queue)) } - sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...) - if err != nil { - return err - } - if config.Logger.V(logger.InfoLevel) { config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic()) } + sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), n.createSubHandler(sb, config), opts...) + if err != nil { + return err + } + n.subscribers[sb] = []broker.Subscriber{sub} } diff --git a/server/server.go b/server/server.go index 3879812e..b6ada16a 100644 --- a/server/server.go +++ b/server/server.go @@ -65,6 +65,8 @@ type Server interface { type ( FuncSubHandler func(ctx context.Context, ms Message) error HookSubHandler func(next FuncSubHandler) FuncSubHandler + FuncHandler func(ctx context.Context, req Request, rsp interface{}) error + HookHandler func(next FuncHandler) FuncHandler ) /*