add context to server.SubscriberOptions and broker.SubscribeOption
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -111,3 +111,10 @@ func TLSConfig(t *tls.Config) Option { | ||||
| 		o.TLSConfig = t | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // SubscribeContext set context | ||||
| func SubscribeContext(ctx context.Context) SubscribeOption { | ||||
| 	return func(o *SubscribeOptions) { | ||||
| 		o.Context = ctx | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -1,13 +1,20 @@ | ||||
| package server | ||||
|  | ||||
| import "context" | ||||
|  | ||||
| type HandlerOption func(*HandlerOptions) | ||||
|  | ||||
| type HandlerOptions struct { | ||||
| 	Internal bool | ||||
| 	Metadata map[string]map[string]string | ||||
| } | ||||
|  | ||||
| type SubscriberOption func(*SubscriberOptions) | ||||
|  | ||||
| type SubscriberOptions struct { | ||||
| 	Queue    string | ||||
| 	Internal bool | ||||
| 	Context  context.Context | ||||
| } | ||||
|  | ||||
| // EndpointMetadata is a Handler option that allows metadata to be added to | ||||
| @@ -34,6 +41,17 @@ func InternalSubscriber(b bool) SubscriberOption { | ||||
| 		o.Internal = b | ||||
| 	} | ||||
| } | ||||
| func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions { | ||||
| 	opt := SubscriberOptions{ | ||||
| 		Context: context.Background(), | ||||
| 	} | ||||
|  | ||||
| 	for _, o := range opts { | ||||
| 		o(&opt) | ||||
| 	} | ||||
|  | ||||
| 	return opt | ||||
| } | ||||
|  | ||||
| // Shared queue name distributed messages across subscribers | ||||
| func SubscriberQueue(n string) SubscriberOption { | ||||
| @@ -41,3 +59,10 @@ func SubscriberQueue(n string) SubscriberOption { | ||||
| 		o.Queue = n | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // SubscriberContext set context options to allow broker SubscriberOption passed | ||||
| func SubscriberContext(ctx context.Context) SubscriberOption { | ||||
| 	return func(o *SubscriberOptions) { | ||||
| 		o.Context = ctx | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -10,7 +10,7 @@ import ( | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/micro/go-log" | ||||
| 	log "github.com/micro/go-log" | ||||
| 	"github.com/micro/go-micro/broker" | ||||
| 	"github.com/micro/go-micro/codec" | ||||
| 	"github.com/micro/go-micro/metadata" | ||||
| @@ -357,6 +357,9 @@ func (s *rpcServer) Register() error { | ||||
| 		if queue := sb.Options().Queue; len(queue) > 0 { | ||||
| 			opts = append(opts, broker.Queue(queue)) | ||||
| 		} | ||||
| 		if cx := sb.Options().Context; cx != nil { | ||||
| 			opts = append(opts, broker.SubscribeContext(cx)) | ||||
| 		} | ||||
| 		sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
|   | ||||
| @@ -114,10 +114,6 @@ type Subscriber interface { | ||||
|  | ||||
| type Option func(*Options) | ||||
|  | ||||
| type HandlerOption func(*HandlerOptions) | ||||
|  | ||||
| type SubscriberOption func(*SubscriberOptions) | ||||
|  | ||||
| var ( | ||||
| 	DefaultAddress        = ":0" | ||||
| 	DefaultName           = "go-server" | ||||
|   | ||||
		Reference in New Issue
	
	Block a user