diff --git a/cmd/cmd.go b/cmd/cmd.go index c1173cc4..c2e4b5ed 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -262,48 +262,40 @@ func (c *cmd) Before(ctx *cli.Context) error { // Set the client if name := ctx.String("client"); len(name) > 0 { - if cl, ok := c.opts.Clients[name]; ok { + // only change if we have the client and type differs + if cl, ok := c.opts.Clients[name]; ok && (*c.opts.Client).String() != name { *c.opts.Client = cl() } } // Set the server if name := ctx.String("server"); len(name) > 0 { - if s, ok := c.opts.Servers[name]; ok { + // only change if we have the server and type differs + if s, ok := c.opts.Servers[name]; ok && (*c.opts.Server).String() != name { *c.opts.Server = s() } } // Set the broker - if name := ctx.String("broker"); len(name) > 0 || len(ctx.String("broker_address")) > 0 { - if len(name) == 0 { - name = defaultBroker - } - - if b, ok := c.opts.Brokers[name]; ok { - n := b(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...)) - *c.opts.Broker = n - } else { + if name := ctx.String("broker"); len(name) > 0 && (*c.opts.Broker).String() != name { + b, ok := c.opts.Brokers[name] + if !ok { return fmt.Errorf("Broker %s not found", name) } + *c.opts.Broker = b() serverOpts = append(serverOpts, server.Broker(*c.opts.Broker)) clientOpts = append(clientOpts, client.Broker(*c.opts.Broker)) } // Set the registry - if name := ctx.String("registry"); len(name) > 0 || len(ctx.String("registry_address")) > 0 { - if len(name) == 0 { - name = defaultRegistry - } - - if r, ok := c.opts.Registries[name]; ok { - n := r(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...)) - *c.opts.Registry = n - } else { + if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name { + r, ok := c.opts.Registries[name] + if !ok { return fmt.Errorf("Registry %s not found", name) } + *c.opts.Registry = r() serverOpts = append(serverOpts, server.Registry(*c.opts.Registry)) clientOpts = append(clientOpts, client.Registry(*c.opts.Registry)) @@ -314,31 +306,26 @@ func (c *cmd) Before(ctx *cli.Context) error { } // Set the selector - if name := ctx.String("selector"); len(name) > 0 { - if s, ok := c.opts.Selectors[name]; ok { - n := s(selector.Registry(*c.opts.Registry)) - *c.opts.Selector = n - } else { + if name := ctx.String("selector"); len(name) > 0 && (*c.opts.Selector).String() != name { + s, ok := c.opts.Selectors[name] + if !ok { return fmt.Errorf("Selector %s not found", name) } + *c.opts.Selector = s(selector.Registry(*c.opts.Registry)) + // No server option here. Should there be? clientOpts = append(clientOpts, client.Selector(*c.opts.Selector)) } // Set the transport - if name := ctx.String("transport"); len(name) > 0 || len(ctx.String("transport_address")) > 0 { - if len(name) == 0 { - name = defaultTransport - } - - if t, ok := c.opts.Transports[name]; ok { - n := t(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...)) - *c.opts.Transport = n - } else { + if name := ctx.String("transport"); len(name) > 0 && (*c.opts.Transport).String() != name { + t, ok := c.opts.Transports[name] + if !ok { return fmt.Errorf("Transport %s not found", name) } + *c.opts.Transport = t() serverOpts = append(serverOpts, server.Transport(*c.opts.Transport)) clientOpts = append(clientOpts, client.Transport(*c.opts.Transport)) } @@ -359,6 +346,18 @@ func (c *cmd) Before(ctx *cli.Context) error { serverOpts = append(serverOpts, server.Metadata(metadata)) } + if len(ctx.String("broker_address")) > 0 { + (*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...)) + } + + if len(ctx.String("registry_address")) > 0 { + (*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...)) + } + + if len(ctx.String("transport_address")) > 0 { + (*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...)) + } + if len(ctx.String("server_name")) > 0 { serverOpts = append(serverOpts, server.Name(ctx.String("server_name"))) } diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 751e6998..e8de9ead 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -26,6 +26,19 @@ type consulRegistry struct { register map[string]uint64 } +func getDeregisterTTL(t time.Duration) time.Duration { + // splay slightly for the watcher? + splay := time.Second * 5 + deregTTL := t + splay + + // consul has a minimum timeout on deregistration of 1 minute. + if t < time.Minute { + deregTTL = time.Minute + splay + } + + return deregTTL +} + func newTransport(config *tls.Config) *http.Transport { if config == nil { config = &tls.Config{ @@ -48,32 +61,31 @@ func newTransport(config *tls.Config) *http.Transport { return t } -func newConsulRegistry(opts ...Option) Registry { - var options Options +func configure(c *consulRegistry, opts ...Option) { + // set opts for _, o := range opts { - o(&options) + o(&c.opts) } // use default config config := consul.DefaultConfig() - connect := false - if options.Context != nil { + if c.opts.Context != nil { // Use the consul config passed in the options, if available - if c, ok := options.Context.Value("consul_config").(*consul.Config); ok { - config = c + if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok { + config = co } - if cn, ok := options.Context.Value("consul_connect").(bool); ok { - connect = cn + if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok { + c.connect = cn } } // check if there are any addrs - if len(options.Addrs) > 0 { - addr, port, err := net.SplitHostPort(options.Addrs[0]) + if len(c.opts.Addrs) > 0 { + addr, port, err := net.SplitHostPort(c.opts.Addrs[0]) if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { port = "8500" - addr = options.Addrs[0] + addr = c.opts.Addrs[0] config.Address = fmt.Sprintf("%s:%s", addr, port) } else if err == nil { config.Address = fmt.Sprintf("%s:%s", addr, port) @@ -81,35 +93,43 @@ func newConsulRegistry(opts ...Option) Registry { } // requires secure connection? - if options.Secure || options.TLSConfig != nil { + if c.opts.Secure || c.opts.TLSConfig != nil { if config.HttpClient == nil { config.HttpClient = new(http.Client) } config.Scheme = "https" // We're going to support InsecureSkipVerify - config.HttpClient.Transport = newTransport(options.TLSConfig) + config.HttpClient.Transport = newTransport(c.opts.TLSConfig) + } + + // set timeout + if c.opts.Timeout > 0 { + config.HttpClient.Timeout = c.opts.Timeout } // create the client client, _ := consul.NewClient(config) - // set timeout - if options.Timeout > 0 { - config.HttpClient.Timeout = options.Timeout - } + // set address/client + c.Address = config.Address + c.Client = client +} +func newConsulRegistry(opts ...Option) Registry { cr := &consulRegistry{ - Address: config.Address, - Client: client, - opts: options, + opts: Options{}, register: make(map[string]uint64), - connect: connect, } - + configure(cr, opts...) return cr } +func (c *consulRegistry) Init(opts ...Option) error { + configure(c, opts...) + return nil +} + func (c *consulRegistry) Deregister(s *Service) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") @@ -124,19 +144,6 @@ func (c *consulRegistry) Deregister(s *Service) error { return c.Client.Agent().ServiceDeregister(node.Id) } -func getDeregisterTTL(t time.Duration) time.Duration { - // splay slightly for the watcher? - splay := time.Second * 5 - deregTTL := t + splay - - // consul has a minimum timeout on deregistration of 1 minute. - if t < time.Minute { - deregTTL = time.Minute + splay - } - - return deregTTL -} - func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") diff --git a/registry/mdns/mdns.go b/registry/mdns/mdns.go index 1b942875..6ff6ad10 100644 --- a/registry/mdns/mdns.go +++ b/registry/mdns/mdns.go @@ -49,6 +49,17 @@ func newRegistry(opts ...registry.Option) registry.Registry { } } +func (m *mdnsRegistry) Init(opts ...registry.Option) error { + for _, o := range opts { + o(&m.opts) + } + return nil +} + +func (m *mdnsRegistry) Options() registry.Options { + return m.opts +} + func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error { m.Lock() defer m.Unlock() @@ -322,10 +333,6 @@ func (m *mdnsRegistry) String() string { return "mdns" } -func (m *mdnsRegistry) Options() registry.Options { - return m.opts -} - func NewRegistry(opts ...registry.Option) registry.Registry { return newRegistry(opts...) } diff --git a/registry/mock/mock.go b/registry/mock/mock.go index 1c248b3e..bd467be6 100644 --- a/registry/mock/mock.go +++ b/registry/mock/mock.go @@ -99,11 +99,15 @@ func (m *mockRegistry) String() string { return "mock" } +func (m *mockRegistry) Init(opts ...registry.Option) error { + return nil +} + func (m *mockRegistry) Options() registry.Options { return registry.Options{} } -func NewRegistry() registry.Registry { +func NewRegistry(opts ...registry.Options) registry.Registry { m := &mockRegistry{Services: make(map[string][]*registry.Service)} m.init() return m diff --git a/registry/registry.go b/registry/registry.go index 489517f5..8cce18db 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -9,13 +9,14 @@ import ( // and an abstraction over varying implementations // {consul, etcd, zookeeper, ...} type Registry interface { + Init(...Option) error + Options() Options Register(*Service, ...RegisterOption) error Deregister(*Service) error GetService(string) ([]*Service, error) ListServices() ([]*Service, error) Watch(...WatchOption) (Watcher, error) String() string - Options() Options } type Option func(*Options) diff --git a/transport/http_transport.go b/transport/http_transport.go index 10fb452e..c18f3561 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -423,6 +423,17 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err }, nil } +func (h *httpTransport) Init(opts ...Option) error { + for _, o := range opts { + o(&h.opts) + } + return nil +} + +func (h *httpTransport) Options() Options { + return h.opts +} + func (h *httpTransport) String() string { return "http" } diff --git a/transport/mock/mock.go b/transport/mock/mock.go index d544d96b..594af71f 100644 --- a/transport/mock/mock.go +++ b/transport/mock/mock.go @@ -171,6 +171,17 @@ func (m *mockTransport) Listen(addr string, opts ...transport.ListenOption) (tra return listener, nil } +func (m *mockTransport) Init(opts ...transport.Option) error { + for _, o := range opts { + o(&m.opts) + } + return nil +} + +func (m *mockTransport) Options() transport.Options { + return m.opts +} + func (m *mockTransport) String() string { return "mock" } diff --git a/transport/transport.go b/transport/transport.go index 383d5b6f..313a4196 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -30,6 +30,8 @@ type Listener interface { // services. It uses socket send/recv semantics and had various // implementations {HTTP, RabbitMQ, NATS, ...} type Transport interface { + Init(...Option) error + Options() Options Dial(addr string, opts ...DialOption) (Client, error) Listen(addr string, opts ...ListenOption) (Listener, error) String() string