Merge pull request #399 from unistack-org/master

add context to SubscriberOptions
This commit is contained in:
Asim Aslam 2019-01-24 13:11:33 +00:00 committed by GitHub
commit 67a738b504
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 36 additions and 5 deletions

View File

@ -111,3 +111,10 @@ func TLSConfig(t *tls.Config) Option {
o.TLSConfig = t o.TLSConfig = t
} }
} }
// SubscribeContext set context
func SubscribeContext(ctx context.Context) SubscribeOption {
return func(o *SubscribeOptions) {
o.Context = ctx
}
}

View File

@ -1,13 +1,20 @@
package server package server
import "context"
type HandlerOption func(*HandlerOptions)
type HandlerOptions struct { type HandlerOptions struct {
Internal bool Internal bool
Metadata map[string]map[string]string Metadata map[string]map[string]string
} }
type SubscriberOption func(*SubscriberOptions)
type SubscriberOptions struct { type SubscriberOptions struct {
Queue string Queue string
Internal bool Internal bool
Context context.Context
} }
// EndpointMetadata is a Handler option that allows metadata to be added to // EndpointMetadata is a Handler option that allows metadata to be added to
@ -34,6 +41,17 @@ func InternalSubscriber(b bool) SubscriberOption {
o.Internal = b o.Internal = b
} }
} }
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
opt := SubscriberOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
return opt
}
// Shared queue name distributed messages across subscribers // Shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption { func SubscriberQueue(n string) SubscriberOption {
@ -41,3 +59,10 @@ func SubscriberQueue(n string) SubscriberOption {
o.Queue = n o.Queue = n
} }
} }
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {
o.Context = ctx
}
}

View File

@ -10,7 +10,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/micro/go-log" log "github.com/micro/go-log"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/metadata" "github.com/micro/go-micro/metadata"
@ -357,6 +357,9 @@ func (s *rpcServer) Register() error {
if queue := sb.Options().Queue; len(queue) > 0 { if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.Queue(queue)) opts = append(opts, broker.Queue(queue))
} }
if cx := sb.Options().Context; cx != nil {
opts = append(opts, broker.SubscribeContext(cx))
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil { if err != nil {
return err return err

View File

@ -114,10 +114,6 @@ type Subscriber interface {
type Option func(*Options) type Option func(*Options)
type HandlerOption func(*HandlerOptions)
type SubscriberOption func(*SubscriberOptions)
var ( var (
DefaultAddress = ":0" DefaultAddress = ":0"
DefaultName = "go-server" DefaultName = "go-server"