Initialise registry for broker
This commit is contained in:
parent
a1c22ce78a
commit
8724e68ae4
@ -22,6 +22,8 @@ import (
|
|||||||
"github.com/micro/go-micro/registry"
|
"github.com/micro/go-micro/registry"
|
||||||
mls "github.com/micro/misc/lib/tls"
|
mls "github.com/micro/misc/lib/tls"
|
||||||
"github.com/pborman/uuid"
|
"github.com/pborman/uuid"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HTTP Broker is a placeholder for actual message brokers.
|
// HTTP Broker is a placeholder for actual message brokers.
|
||||||
@ -35,6 +37,7 @@ type httpBroker struct {
|
|||||||
opts Options
|
opts Options
|
||||||
|
|
||||||
c *http.Client
|
c *http.Client
|
||||||
|
r registry.Registry
|
||||||
|
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
subscribers map[string][]*httpSubscriber
|
subscribers map[string][]*httpSubscriber
|
||||||
@ -88,7 +91,10 @@ func newTransport(config *tls.Config) *http.Transport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newHttpBroker(addrs []string, opts ...Option) Broker {
|
func newHttpBroker(addrs []string, opts ...Option) Broker {
|
||||||
var options Options
|
options := Options{
|
||||||
|
Context: context.TODO(),
|
||||||
|
}
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
@ -98,10 +104,16 @@ func newHttpBroker(addrs []string, opts ...Option) Broker {
|
|||||||
addr = addrs[0]
|
addr = addrs[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reg, ok := options.Context.Value(registryKey).(registry.Registry)
|
||||||
|
if !ok {
|
||||||
|
reg = registry.DefaultRegistry
|
||||||
|
}
|
||||||
|
|
||||||
return &httpBroker{
|
return &httpBroker{
|
||||||
id: "broker-" + uuid.NewUUID().String(),
|
id: "broker-" + uuid.NewUUID().String(),
|
||||||
address: addr,
|
address: addr,
|
||||||
opts: options,
|
opts: options,
|
||||||
|
r: reg,
|
||||||
c: &http.Client{Transport: newTransport(options.TLSConfig)},
|
c: &http.Client{Transport: newTransport(options.TLSConfig)},
|
||||||
subscribers: make(map[string][]*httpSubscriber),
|
subscribers: make(map[string][]*httpSubscriber),
|
||||||
unsubscribe: make(chan *httpSubscriber),
|
unsubscribe: make(chan *httpSubscriber),
|
||||||
@ -184,7 +196,7 @@ func (h *httpBroker) start() error {
|
|||||||
var subscribers []*httpSubscriber
|
var subscribers []*httpSubscriber
|
||||||
for _, sub := range h.subscribers[subscriber.topic] {
|
for _, sub := range h.subscribers[subscriber.topic] {
|
||||||
if sub.id == subscriber.id {
|
if sub.id == subscriber.id {
|
||||||
registry.Deregister(sub.svc)
|
h.r.Deregister(sub.svc)
|
||||||
}
|
}
|
||||||
subscribers = append(subscribers, sub)
|
subscribers = append(subscribers, sub)
|
||||||
}
|
}
|
||||||
@ -273,6 +285,13 @@ func (h *httpBroker) Init(opts ...Option) error {
|
|||||||
h.id = "broker-" + uuid.NewUUID().String()
|
h.id = "broker-" + uuid.NewUUID().String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
|
||||||
|
if !ok {
|
||||||
|
reg = registry.DefaultRegistry
|
||||||
|
}
|
||||||
|
|
||||||
|
h.r = reg
|
||||||
|
|
||||||
http.Handle(DefaultSubPath, h)
|
http.Handle(DefaultSubPath, h)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -282,7 +301,7 @@ func (h *httpBroker) Options() Options {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
|
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
|
||||||
s, err := registry.GetService("topic:" + topic)
|
s, err := h.r.GetService("topic:" + topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -382,7 +401,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
|||||||
svc: service,
|
svc: service,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := registry.Register(service); err != nil {
|
if err := h.r.Register(service); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ package broker
|
|||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/registry"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -41,6 +42,12 @@ type PublishOption func(*PublishOptions)
|
|||||||
|
|
||||||
type SubscribeOption func(*SubscribeOptions)
|
type SubscribeOption func(*SubscribeOptions)
|
||||||
|
|
||||||
|
type contextKeyT string
|
||||||
|
|
||||||
|
var (
|
||||||
|
registryKey = contextKeyT("github.com/micro/go-micro/registry")
|
||||||
|
)
|
||||||
|
|
||||||
func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
||||||
opt := SubscribeOptions{
|
opt := SubscribeOptions{
|
||||||
AutoAck: true,
|
AutoAck: true,
|
||||||
@ -68,6 +75,12 @@ func QueueName(name string) SubscribeOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Registry(r registry.Registry) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Context = context.WithValue(o.Context, registryKey, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Secure communication with the broker
|
// Secure communication with the broker
|
||||||
func Secure(b bool) Option {
|
func Secure(b bool) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
@ -255,6 +255,8 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
|||||||
|
|
||||||
(*c.opts.Selector).Init(selector.Registry(*c.opts.Registry))
|
(*c.opts.Selector).Init(selector.Registry(*c.opts.Registry))
|
||||||
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
|
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
|
||||||
|
|
||||||
|
(*c.opts.Broker).Init(broker.Registry(*c.opts.Registry))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the selector
|
// Set the selector
|
||||||
|
Loading…
Reference in New Issue
Block a user