diff --git a/broker/options.go b/broker/options.go index b3317d24..18fd5935 100644 --- a/broker/options.go +++ b/broker/options.go @@ -49,6 +49,13 @@ type Option func(*Options) type PublishOption func(*PublishOptions) +// PublishContext set context +func PublishContext(ctx context.Context) PublishOption { + return func(o *PublishOptions) { + o.Context = ctx + } +} + type SubscribeOption func(*SubscribeOptions) func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index d41d977e..9445eabe 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -653,7 +653,7 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie return g.opts.Broker.Publish(topic, &broker.Message{ Header: md, Body: body, - }) + }, broker.PublishContext(options.Context)) } func (g *grpcClient) String() string { diff --git a/client/rpc_client.go b/client/rpc_client.go index 19a18bdb..d66a918a 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -628,7 +628,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt return r.opts.Broker.Publish(topic, &broker.Message{ Header: md, Body: body, - }) + }, broker.PublishContext(options.Context)) } func (r *rpcClient) NewMessage(topic string, message interface{}, opts ...MessageOption) Message {