Broker init

This commit is contained in:
Asim 2016-03-15 22:12:28 +00:00
parent fe4362be5a
commit 64220dc0c5
7 changed files with 21 additions and 13 deletions

View File

@ -38,11 +38,11 @@ type Subscriber interface {
} }
var ( var (
DefaultBroker Broker = newHttpBroker([]string{}) DefaultBroker Broker = newHttpBroker()
) )
func NewBroker(addrs []string, opt ...Option) Broker { func NewBroker(opts ...Option) Broker {
return newHttpBroker(addrs, opt...) return newHttpBroker(opts...)
} }
func Init(opts ...Option) error { func Init(opts ...Option) error {

View File

@ -9,6 +9,6 @@ func init() {
cmd.DefaultBrokers["http"] = NewBroker cmd.DefaultBrokers["http"] = NewBroker
} }
func NewBroker(addrs []string, opts ...broker.Option) broker.Broker { func NewBroker(opts ...broker.Option) broker.Broker {
return broker.NewBroker(addrs, opts...) return broker.NewBroker(opts...)
} }

View File

@ -92,7 +92,7 @@ func newTransport(config *tls.Config) *http.Transport {
return t return t
} }
func newHttpBroker(addrs []string, opts ...Option) Broker { func newHttpBroker(opts ...Option) Broker {
options := Options{ options := Options{
Context: context.TODO(), Context: context.TODO(),
} }
@ -102,8 +102,8 @@ func newHttpBroker(addrs []string, opts ...Option) Broker {
} }
addr := ":0" addr := ":0"
if len(addrs) > 0 && len(addrs[0]) > 0 { if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
addr = addrs[0] addr = options.Addrs[0]
} }
reg, ok := options.Context.Value(registryKey).(registry.Registry) reg, ok := options.Context.Value(registryKey).(registry.Registry)

View File

@ -8,7 +8,7 @@ import (
func TestBroker(t *testing.T) { func TestBroker(t *testing.T) {
m := mock.NewRegistry() m := mock.NewRegistry()
b := NewBroker([]string{}, Registry(m)) b := NewBroker(Registry(m))
if err := b.Init(); err != nil { if err := b.Init(); err != nil {
t.Errorf("Unexpected init error: %v", err) t.Errorf("Unexpected init error: %v", err)

View File

@ -8,6 +8,7 @@ import (
) )
type Options struct { type Options struct {
Addrs []string
Secure bool Secure bool
TLSConfig *tls.Config TLSConfig *tls.Config
@ -60,6 +61,13 @@ func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
return opt return opt
} }
// Addrs sets the host addresses to be used by the broker
func Addrs(addrs ...string) Option {
return func(o *Options) {
o.Addrs = addrs
}
}
// DisableAutoAck will disable auto acking of messages // DisableAutoAck will disable auto acking of messages
// after they have been handled. // after they have been handled.
func DisableAutoAck() SubscribeOption { func DisableAutoAck() SubscribeOption {

View File

@ -106,7 +106,7 @@ var (
}, },
} }
DefaultBrokers = map[string]func([]string, ...broker.Option) broker.Broker{ DefaultBrokers = map[string]func(...broker.Option) broker.Broker{
"http": broker.NewBroker, "http": broker.NewBroker,
} }
@ -198,7 +198,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
} }
if b, ok := c.opts.Brokers[name]; ok { if b, ok := c.opts.Brokers[name]; ok {
n := b(strings.Split(ctx.String("broker_address"), ",")) n := b(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
*c.opts.Broker = n *c.opts.Broker = n
} else { } else {
return fmt.Errorf("Broker %s not found", name) return fmt.Errorf("Broker %s not found", name)

View File

@ -25,7 +25,7 @@ type Options struct {
Client *client.Client Client *client.Client
Server *server.Server Server *server.Server
Brokers map[string]func([]string, ...broker.Option) broker.Broker Brokers map[string]func(...broker.Option) broker.Broker
Registries map[string]func([]string, ...registry.Option) registry.Registry Registries map[string]func([]string, ...registry.Option) registry.Registry
Selectors map[string]func(...selector.Option) selector.Selector Selectors map[string]func(...selector.Option) selector.Selector
Transports map[string]func([]string, ...transport.Option) transport.Transport Transports map[string]func([]string, ...transport.Option) transport.Transport
@ -93,7 +93,7 @@ func Server(s *server.Server) Option {
} }
// New broker func // New broker func
func NewBroker(name string, b func([]string, ...broker.Option) broker.Broker) Option { func NewBroker(name string, b func(...broker.Option) broker.Broker) Option {
return func(o *Options) { return func(o *Options) {
o.Brokers[name] = b o.Brokers[name] = b
} }