server: add missing hook definitions
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
92a3a547b8
commit
4e99680c30
@ -512,29 +512,33 @@ func (n *noopServer) Start() error {
|
|||||||
func (n *noopServer) subscribe() error {
|
func (n *noopServer) subscribe() error {
|
||||||
config := n.Options()
|
config := n.Options()
|
||||||
|
|
||||||
cx := config.Context
|
subCtx := config.Context
|
||||||
var err error
|
|
||||||
var sub broker.Subscriber
|
|
||||||
|
|
||||||
for sb := range n.subscribers {
|
for sb := range n.subscribers {
|
||||||
if sb.Options().Context != nil {
|
|
||||||
cx = sb.Options().Context
|
if cx := sb.Options().Context; cx != nil {
|
||||||
|
subCtx = cx
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := []broker.SubscribeOption{
|
||||||
|
broker.SubscribeContext(subCtx),
|
||||||
|
broker.SubscribeAutoAck(sb.Options().AutoAck),
|
||||||
|
broker.SubscribeBodyOnly(sb.Options().BodyOnly),
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
|
||||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||||
opts = append(opts, broker.SubscribeGroup(queue))
|
opts = append(opts, broker.SubscribeGroup(queue))
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic())
|
config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,6 +65,8 @@ type Server interface {
|
|||||||
type (
|
type (
|
||||||
FuncSubHandler func(ctx context.Context, ms Message) error
|
FuncSubHandler func(ctx context.Context, ms Message) error
|
||||||
HookSubHandler func(next FuncSubHandler) FuncSubHandler
|
HookSubHandler func(next FuncSubHandler) FuncSubHandler
|
||||||
|
FuncHandler func(ctx context.Context, req Request, rsp interface{}) error
|
||||||
|
HookHandler func(next FuncHandler) FuncHandler
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
Loading…
x
Reference in New Issue
Block a user