server: add missing hook definitions
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -512,29 +512,33 @@ func (n *noopServer) Start() error { | |||||||
| func (n *noopServer) subscribe() error { | func (n *noopServer) subscribe() error { | ||||||
| 	config := n.Options() | 	config := n.Options() | ||||||
|  |  | ||||||
| 	cx := config.Context | 	subCtx := config.Context | ||||||
| 	var err error |  | ||||||
| 	var sub broker.Subscriber |  | ||||||
|  |  | ||||||
| 	for sb := range n.subscribers { | 	for sb := range n.subscribers { | ||||||
| 		if sb.Options().Context != nil { |  | ||||||
| 			cx = sb.Options().Context | 		if cx := sb.Options().Context; cx != nil { | ||||||
|  | 			subCtx = cx | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		opts := []broker.SubscribeOption{ | ||||||
|  | 			broker.SubscribeContext(subCtx), | ||||||
|  | 			broker.SubscribeAutoAck(sb.Options().AutoAck), | ||||||
|  | 			broker.SubscribeBodyOnly(sb.Options().BodyOnly), | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)} |  | ||||||
| 		if queue := sb.Options().Queue; len(queue) > 0 { | 		if queue := sb.Options().Queue; len(queue) > 0 { | ||||||
| 			opts = append(opts, broker.SubscribeGroup(queue)) | 			opts = append(opts, broker.SubscribeGroup(queue)) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		if config.Logger.V(logger.InfoLevel) { | 		if config.Logger.V(logger.InfoLevel) { | ||||||
| 			config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic()) | 			config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic()) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), n.createSubHandler(sb, config), opts...) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		n.subscribers[sb] = []broker.Subscriber{sub} | 		n.subscribers[sb] = []broker.Subscriber{sub} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -65,6 +65,8 @@ type Server interface { | |||||||
| type ( | type ( | ||||||
| 	FuncSubHandler func(ctx context.Context, ms Message) error | 	FuncSubHandler func(ctx context.Context, ms Message) error | ||||||
| 	HookSubHandler func(next FuncSubHandler) FuncSubHandler | 	HookSubHandler func(next FuncSubHandler) FuncSubHandler | ||||||
|  | 	FuncHandler    func(ctx context.Context, req Request, rsp interface{}) error | ||||||
|  | 	HookHandler    func(next FuncHandler) FuncHandler | ||||||
| ) | ) | ||||||
|  |  | ||||||
| /* | /* | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user