diff --git a/broker/options.go b/broker/options.go index 0a909f79..15084e49 100644 --- a/broker/options.go +++ b/broker/options.go @@ -111,3 +111,10 @@ func TLSConfig(t *tls.Config) Option { o.TLSConfig = t } } + +// SubscribeContext set context +func SubscribeContext(ctx context.Context) SubscribeOption { + return func(o *SubscribeOptions) { + o.Context = ctx + } +} diff --git a/server/handler.go b/server/handler.go index 336b6107..ffa0865d 100644 --- a/server/handler.go +++ b/server/handler.go @@ -1,13 +1,20 @@ package server +import "context" + +type HandlerOption func(*HandlerOptions) + type HandlerOptions struct { Internal bool Metadata map[string]map[string]string } +type SubscriberOption func(*SubscriberOptions) + type SubscriberOptions struct { Queue string Internal bool + Context context.Context } // EndpointMetadata is a Handler option that allows metadata to be added to @@ -34,6 +41,17 @@ func InternalSubscriber(b bool) SubscriberOption { o.Internal = b } } +func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions { + opt := SubscriberOptions{ + Context: context.Background(), + } + + for _, o := range opts { + o(&opt) + } + + return opt +} // Shared queue name distributed messages across subscribers func SubscriberQueue(n string) SubscriberOption { @@ -41,3 +59,10 @@ func SubscriberQueue(n string) SubscriberOption { o.Queue = n } } + +// SubscriberContext set context options to allow broker SubscriberOption passed +func SubscriberContext(ctx context.Context) SubscriberOption { + return func(o *SubscriberOptions) { + o.Context = ctx + } +} diff --git a/server/rpc_server.go b/server/rpc_server.go index 0ba11836..1e9aaf7c 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -10,7 +10,7 @@ import ( "sync" "time" - "github.com/micro/go-log" + log "github.com/micro/go-log" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/metadata" @@ -357,6 +357,9 @@ func (s *rpcServer) Register() error { if queue := sb.Options().Queue; len(queue) > 0 { opts = append(opts, broker.Queue(queue)) } + if cx := sb.Options().Context; cx != nil { + opts = append(opts, broker.SubscribeContext(cx)) + } sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) if err != nil { return err diff --git a/server/server.go b/server/server.go index 82113843..4240628b 100644 --- a/server/server.go +++ b/server/server.go @@ -114,10 +114,6 @@ type Subscriber interface { type Option func(*Options) -type HandlerOption func(*HandlerOptions) - -type SubscriberOption func(*SubscriberOptions) - var ( DefaultAddress = ":0" DefaultName = "go-server"