diff --git a/broker/http_broker.go b/broker/http_broker.go index cb8739de..c1a249b7 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -22,6 +22,8 @@ import ( "github.com/micro/go-micro/registry" mls "github.com/micro/misc/lib/tls" "github.com/pborman/uuid" + + "golang.org/x/net/context" ) // HTTP Broker is a placeholder for actual message brokers. @@ -35,6 +37,7 @@ type httpBroker struct { opts Options c *http.Client + r registry.Registry sync.RWMutex subscribers map[string][]*httpSubscriber @@ -88,7 +91,10 @@ func newTransport(config *tls.Config) *http.Transport { } func newHttpBroker(addrs []string, opts ...Option) Broker { - var options Options + options := Options{ + Context: context.TODO(), + } + for _, o := range opts { o(&options) } @@ -98,10 +104,16 @@ func newHttpBroker(addrs []string, opts ...Option) Broker { addr = addrs[0] } + reg, ok := options.Context.Value(registryKey).(registry.Registry) + if !ok { + reg = registry.DefaultRegistry + } + return &httpBroker{ id: "broker-" + uuid.NewUUID().String(), address: addr, opts: options, + r: reg, c: &http.Client{Transport: newTransport(options.TLSConfig)}, subscribers: make(map[string][]*httpSubscriber), unsubscribe: make(chan *httpSubscriber), @@ -184,7 +196,7 @@ func (h *httpBroker) start() error { var subscribers []*httpSubscriber for _, sub := range h.subscribers[subscriber.topic] { if sub.id == subscriber.id { - registry.Deregister(sub.svc) + h.r.Deregister(sub.svc) } subscribers = append(subscribers, sub) } @@ -273,6 +285,13 @@ func (h *httpBroker) Init(opts ...Option) error { h.id = "broker-" + uuid.NewUUID().String() } + reg, ok := h.opts.Context.Value(registryKey).(registry.Registry) + if !ok { + reg = registry.DefaultRegistry + } + + h.r = reg + http.Handle(DefaultSubPath, h) return nil } @@ -282,7 +301,7 @@ func (h *httpBroker) Options() Options { } func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { - s, err := registry.GetService("topic:" + topic) + s, err := h.r.GetService("topic:" + topic) if err != nil { return err } @@ -382,7 +401,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO svc: service, } - if err := registry.Register(service); err != nil { + if err := h.r.Register(service); err != nil { return nil, err } diff --git a/broker/options.go b/broker/options.go index 77f5ce69..a59548f8 100644 --- a/broker/options.go +++ b/broker/options.go @@ -3,6 +3,7 @@ package broker import ( "crypto/tls" + "github.com/micro/go-micro/registry" "golang.org/x/net/context" ) @@ -41,6 +42,12 @@ type PublishOption func(*PublishOptions) type SubscribeOption func(*SubscribeOptions) +type contextKeyT string + +var ( + registryKey = contextKeyT("github.com/micro/go-micro/registry") +) + func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { opt := SubscribeOptions{ AutoAck: true, @@ -68,6 +75,12 @@ func QueueName(name string) SubscribeOption { } } +func Registry(r registry.Registry) Option { + return func(o *Options) { + o.Context = context.WithValue(o.Context, registryKey, r) + } +} + // Secure communication with the broker func Secure(b bool) Option { return func(o *Options) { diff --git a/cmd/cmd.go b/cmd/cmd.go index 8d899bd6..685b7866 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -255,6 +255,8 @@ func (c *cmd) Before(ctx *cli.Context) error { (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)) clientOpts = append(clientOpts, client.Selector(*c.opts.Selector)) + + (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)) } // Set the selector