From ae10eb2ab89da088c6a8d4430d57a175c8950cd4 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 26 Jan 2019 13:37:58 +0300 Subject: [PATCH] propagate context and SuccessAutoAck option other brokers Signed-off-by: Vasiliy Tolstov --- context.go | 37 +++++++++++++++++++++++++++++++++++++ nats.go | 7 ++++++- options.go | 11 ++--------- 3 files changed, 45 insertions(+), 10 deletions(-) create mode 100644 context.go diff --git a/context.go b/context.go new file mode 100644 index 0000000..0f009e8 --- /dev/null +++ b/context.go @@ -0,0 +1,37 @@ +package nats + +import ( + "context" + + "github.com/micro/go-micro/broker" +) + +// setSubscribeOption returns a function to setup a context with given value +func setSubscribeOption(k, v interface{}) broker.SubscribeOption { + return func(o *broker.SubscribeOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} + +// setBrokerOption returns a function to setup a context with given value +func setBrokerOption(k, v interface{}) broker.Option { + return func(o *broker.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} + +// setPublishOption returns a function to setup a context with given value +func setPublishOption(k, v interface{}) broker.PublishOption { + return func(o *broker.PublishOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} diff --git a/nats.go b/nats.go index 911cae7..e7c49d2 100644 --- a/nats.go +++ b/nats.go @@ -3,13 +3,14 @@ package nats import ( "context" + "errors" "strings" "sync" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/cmd" "github.com/micro/go-micro/codec/json" - "github.com/nats-io/go-nats" + nats "github.com/nats-io/go-nats" ) type nbroker struct { @@ -144,6 +145,10 @@ func (n *nbroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ } func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + if n.conn == nil { + return nil, errors.New("not connected") + } + opt := broker.SubscribeOptions{ AutoAck: true, } diff --git a/options.go b/options.go index a09ab7f..96e2fb7 100644 --- a/options.go +++ b/options.go @@ -1,20 +1,13 @@ package nats import ( - "context" - "github.com/micro/go-micro/broker" - "github.com/nats-io/go-nats" + nats "github.com/nats-io/go-nats" ) type optionsKey struct{} // Options accepts nats.Options func Options(opts nats.Options) broker.Option { - return func(o *broker.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, optionsKey{}, opts) - } + return setBrokerOption(optionsKey{}, opts) }