diff --git a/broker/broker.go b/broker/broker.go index 633af44b..2cd0de34 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -38,11 +38,11 @@ type Subscriber interface { } var ( - DefaultBroker Broker = newHttpBroker([]string{}) + DefaultBroker Broker = newHttpBroker() ) -func NewBroker(addrs []string, opt ...Option) Broker { - return newHttpBroker(addrs, opt...) +func NewBroker(opts ...Option) Broker { + return newHttpBroker(opts...) } func Init(opts ...Option) error { diff --git a/broker/http/http.go b/broker/http/http.go index 770821b5..d871d92f 100644 --- a/broker/http/http.go +++ b/broker/http/http.go @@ -9,6 +9,6 @@ func init() { cmd.DefaultBrokers["http"] = NewBroker } -func NewBroker(addrs []string, opts ...broker.Option) broker.Broker { - return broker.NewBroker(addrs, opts...) +func NewBroker(opts ...broker.Option) broker.Broker { + return broker.NewBroker(opts...) } diff --git a/broker/http_broker.go b/broker/http_broker.go index f11991f0..6010eedd 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -92,7 +92,7 @@ func newTransport(config *tls.Config) *http.Transport { return t } -func newHttpBroker(addrs []string, opts ...Option) Broker { +func newHttpBroker(opts ...Option) Broker { options := Options{ Context: context.TODO(), } @@ -102,8 +102,8 @@ func newHttpBroker(addrs []string, opts ...Option) Broker { } addr := ":0" - if len(addrs) > 0 && len(addrs[0]) > 0 { - addr = addrs[0] + if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 { + addr = options.Addrs[0] } reg, ok := options.Context.Value(registryKey).(registry.Registry) diff --git a/broker/http_broker_test.go b/broker/http_broker_test.go index cdccfaf2..af71032b 100644 --- a/broker/http_broker_test.go +++ b/broker/http_broker_test.go @@ -8,7 +8,7 @@ import ( func TestBroker(t *testing.T) { m := mock.NewRegistry() - b := NewBroker([]string{}, Registry(m)) + b := NewBroker(Registry(m)) if err := b.Init(); err != nil { t.Errorf("Unexpected init error: %v", err) diff --git a/broker/options.go b/broker/options.go index a59548f8..8b44f6aa 100644 --- a/broker/options.go +++ b/broker/options.go @@ -8,6 +8,7 @@ import ( ) type Options struct { + Addrs []string Secure bool TLSConfig *tls.Config @@ -60,6 +61,13 @@ func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { return opt } +// Addrs sets the host addresses to be used by the broker +func Addrs(addrs ...string) Option { + return func(o *Options) { + o.Addrs = addrs + } +} + // DisableAutoAck will disable auto acking of messages // after they have been handled. func DisableAutoAck() SubscribeOption { diff --git a/cmd/cmd.go b/cmd/cmd.go index fe0c64fe..e90e5d12 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -106,11 +106,11 @@ var ( }, } - DefaultBrokers = map[string]func([]string, ...broker.Option) broker.Broker{ + DefaultBrokers = map[string]func(...broker.Option) broker.Broker{ "http": broker.NewBroker, } - DefaultRegistries = map[string]func([]string, ...registry.Option) registry.Registry{ + DefaultRegistries = map[string]func(...registry.Option) registry.Registry{ "consul": registry.NewRegistry, } @@ -118,7 +118,7 @@ var ( "random": selector.NewSelector, } - DefaultTransports = map[string]func([]string, ...transport.Option) transport.Transport{ + DefaultTransports = map[string]func(...transport.Option) transport.Transport{ "http": transport.NewTransport, } @@ -198,7 +198,7 @@ func (c *cmd) Before(ctx *cli.Context) error { } if b, ok := c.opts.Brokers[name]; ok { - n := b(strings.Split(ctx.String("broker_address"), ",")) + n := b(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...)) *c.opts.Broker = n } else { return fmt.Errorf("Broker %s not found", name) @@ -216,7 +216,7 @@ func (c *cmd) Before(ctx *cli.Context) error { } if r, ok := c.opts.Registries[name]; ok { - n := r(strings.Split(ctx.String("registry_address"), ",")) + n := r(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...)) *c.opts.Registry = n } else { return fmt.Errorf("Registry %s not found", name) @@ -251,7 +251,7 @@ func (c *cmd) Before(ctx *cli.Context) error { } if t, ok := c.opts.Transports[name]; ok { - n := t(strings.Split(ctx.String("transport_address"), ",")) + n := t(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...)) *c.opts.Transport = n } else { return fmt.Errorf("Transport %s not found", name) diff --git a/cmd/options.go b/cmd/options.go index 4ecfcb7d..fb1f05e9 100644 --- a/cmd/options.go +++ b/cmd/options.go @@ -25,10 +25,10 @@ type Options struct { Client *client.Client Server *server.Server - Brokers map[string]func([]string, ...broker.Option) broker.Broker - Registries map[string]func([]string, ...registry.Option) registry.Registry + Brokers map[string]func(...broker.Option) broker.Broker + Registries map[string]func(...registry.Option) registry.Registry Selectors map[string]func(...selector.Option) selector.Selector - Transports map[string]func([]string, ...transport.Option) transport.Transport + Transports map[string]func(...transport.Option) transport.Transport // Other options for implementations of the interface // can be stored in a context @@ -93,14 +93,14 @@ func Server(s *server.Server) Option { } // New broker func -func NewBroker(name string, b func([]string, ...broker.Option) broker.Broker) Option { +func NewBroker(name string, b func(...broker.Option) broker.Broker) Option { return func(o *Options) { o.Brokers[name] = b } } // New registry func -func NewRegistry(name string, r func([]string, ...registry.Option) registry.Registry) Option { +func NewRegistry(name string, r func(...registry.Option) registry.Registry) Option { return func(o *Options) { o.Registries[name] = r } @@ -114,7 +114,7 @@ func NewSelector(name string, s func(...selector.Option) selector.Selector) Opti } // New transport func -func NewTransport(name string, t func([]string, ...transport.Option) transport.Transport) Option { +func NewTransport(name string, t func(...transport.Option) transport.Transport) Option { return func(o *Options) { o.Transports[name] = t } diff --git a/registry/consul/consul.go b/registry/consul/consul.go index 92749e71..f8ba2701 100644 --- a/registry/consul/consul.go +++ b/registry/consul/consul.go @@ -9,6 +9,6 @@ func init() { cmd.DefaultRegistries["consul"] = NewRegistry } -func NewRegistry(addrs []string, opts ...registry.Option) registry.Registry { - return registry.NewRegistry(addrs, opts...) +func NewRegistry(opts ...registry.Option) registry.Registry { + return registry.NewRegistry(opts...) } diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 607c24f9..7766e0fd 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -109,26 +109,26 @@ func decodeVersion(tags []string) (string, bool) { return "", false } -func newConsulRegistry(addrs []string, opts ...Option) Registry { - var opt Options +func newConsulRegistry(opts ...Option) Registry { + var options Options for _, o := range opts { - o(&opt) + o(&options) } // use default config config := consul.DefaultConfig() // set timeout - if opt.Timeout > 0 { - config.HttpClient.Timeout = opt.Timeout + if options.Timeout > 0 { + config.HttpClient.Timeout = options.Timeout } // check if there are any addrs - if len(addrs) > 0 { - addr, port, err := net.SplitHostPort(addrs[0]) + if len(options.Addrs) > 0 { + addr, port, err := net.SplitHostPort(options.Addrs[0]) if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { port = "8500" - addr = addrs[0] + addr = options.Addrs[0] config.Address = fmt.Sprintf("%s:%s", addr, port) } else if err == nil { config.Address = fmt.Sprintf("%s:%s", addr, port) @@ -136,10 +136,10 @@ func newConsulRegistry(addrs []string, opts ...Option) Registry { } // requires secure connection? - if opt.Secure || opt.TLSConfig != nil { + if options.Secure || options.TLSConfig != nil { config.Scheme = "https" // We're going to support InsecureSkipVerify - config.HttpClient.Transport = newTransport(opt.TLSConfig) + config.HttpClient.Transport = newTransport(options.TLSConfig) } // create the client @@ -148,7 +148,7 @@ func newConsulRegistry(addrs []string, opts ...Option) Registry { cr := &consulRegistry{ Address: config.Address, Client: client, - Options: opt, + Options: options, } return cr diff --git a/registry/options.go b/registry/options.go index 04e3525b..cac28b49 100644 --- a/registry/options.go +++ b/registry/options.go @@ -8,6 +8,7 @@ import ( ) type Options struct { + Addrs []string Timeout time.Duration Secure bool TLSConfig *tls.Config @@ -24,6 +25,13 @@ type RegisterOptions struct { Context context.Context } +// Addrs is the registry addresses to use +func Addrs(addrs ...string) Option { + return func(o *Options) { + o.Addrs = addrs + } +} + func Timeout(t time.Duration) Option { return func(o *Options) { o.Timeout = t diff --git a/registry/registry.go b/registry/registry.go index 909f8ab3..8a979424 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -21,13 +21,13 @@ type Option func(*Options) type RegisterOption func(*RegisterOptions) var ( - DefaultRegistry = newConsulRegistry([]string{}) + DefaultRegistry = newConsulRegistry() ErrNotFound = errors.New("not found") ) -func NewRegistry(addrs []string, opt ...Option) Registry { - return newConsulRegistry(addrs, opt...) +func NewRegistry(opts ...Option) Registry { + return newConsulRegistry(opts...) } // Register a service node. Additionally supply options such as TTL. diff --git a/transport/http/http.go b/transport/http/http.go index 16cdc102..eb5b56ba 100644 --- a/transport/http/http.go +++ b/transport/http/http.go @@ -9,6 +9,6 @@ func init() { cmd.DefaultTransports["http"] = NewTransport } -func NewTransport(addrs []string, opts ...transport.Option) transport.Transport { - return transport.NewTransport(addrs, opts...) +func NewTransport(opts ...transport.Option) transport.Transport { + return transport.NewTransport(opts...) } diff --git a/transport/http_transport.go b/transport/http_transport.go index 4cb9c152..9d4105a5 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -420,7 +420,7 @@ func (h *httpTransport) String() string { return "http" } -func newHttpTransport(addrs []string, opts ...Option) *httpTransport { +func newHttpTransport(opts ...Option) *httpTransport { var options Options for _, o := range opts { o(&options) diff --git a/transport/http_transport_test.go b/transport/http_transport_test.go index 596b3bbb..a0eb1468 100644 --- a/transport/http_transport_test.go +++ b/transport/http_transport_test.go @@ -18,7 +18,7 @@ func expectedPort(t *testing.T, expected string, lsn transport.Listener) { } func TestHTTPTransportPortRange(t *testing.T) { - tp := transport.NewTransport([]string{}) + tp := transport.NewTransport() lsn1, err := tp.Listen(":44444-44448") if err != nil { @@ -43,7 +43,7 @@ func TestHTTPTransportPortRange(t *testing.T) { } func TestHTTPTransportCommunication(t *testing.T) { - tr := transport.NewTransport([]string{}) + tr := transport.NewTransport() l, err := tr.Listen(":0") if err != nil { diff --git a/transport/options.go b/transport/options.go index cb7ba98a..8d0ecc54 100644 --- a/transport/options.go +++ b/transport/options.go @@ -8,6 +8,7 @@ import ( ) type Options struct { + Addrs []string Secure bool TLSConfig *tls.Config @@ -37,6 +38,13 @@ type ListenOptions struct { Context context.Context } +// Addrs to use for transport +func Addrs(addrs ...string) Option { + return func(o *Options) { + o.Addrs = addrs + } +} + // Use secure communication. If TLSConfig is not specified we // use InsecureSkipVerify and generate a self signed cert func Secure(b bool) Option { diff --git a/transport/transport.go b/transport/transport.go index dc130edd..1010d6b8 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -43,13 +43,13 @@ type DialOption func(*DialOptions) type ListenOption func(*ListenOptions) var ( - DefaultTransport Transport = newHttpTransport([]string{}) + DefaultTransport Transport = newHttpTransport() DefaultDialTimeout = time.Second * 5 ) -func NewTransport(addrs []string, opt ...Option) Transport { - return newHttpTransport(addrs, opt...) +func NewTransport(opts ...Option) Transport { + return newHttpTransport(opts...) } func Dial(addr string, opts ...DialOption) (Client, error) {