Merge pull request #292 from micro/init
Add Init to all things, use init in cmd package to initialise
This commit is contained in:
commit
a0d3917832
67
cmd/cmd.go
67
cmd/cmd.go
@ -262,48 +262,40 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
|||||||
|
|
||||||
// Set the client
|
// Set the client
|
||||||
if name := ctx.String("client"); len(name) > 0 {
|
if name := ctx.String("client"); len(name) > 0 {
|
||||||
if cl, ok := c.opts.Clients[name]; ok {
|
// only change if we have the client and type differs
|
||||||
|
if cl, ok := c.opts.Clients[name]; ok && (*c.opts.Client).String() != name {
|
||||||
*c.opts.Client = cl()
|
*c.opts.Client = cl()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the server
|
// Set the server
|
||||||
if name := ctx.String("server"); len(name) > 0 {
|
if name := ctx.String("server"); len(name) > 0 {
|
||||||
if s, ok := c.opts.Servers[name]; ok {
|
// only change if we have the server and type differs
|
||||||
|
if s, ok := c.opts.Servers[name]; ok && (*c.opts.Server).String() != name {
|
||||||
*c.opts.Server = s()
|
*c.opts.Server = s()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the broker
|
// Set the broker
|
||||||
if name := ctx.String("broker"); len(name) > 0 || len(ctx.String("broker_address")) > 0 {
|
if name := ctx.String("broker"); len(name) > 0 && (*c.opts.Broker).String() != name {
|
||||||
if len(name) == 0 {
|
b, ok := c.opts.Brokers[name]
|
||||||
name = defaultBroker
|
if !ok {
|
||||||
}
|
|
||||||
|
|
||||||
if b, ok := c.opts.Brokers[name]; ok {
|
|
||||||
n := b(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
|
|
||||||
*c.opts.Broker = n
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("Broker %s not found", name)
|
return fmt.Errorf("Broker %s not found", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*c.opts.Broker = b()
|
||||||
serverOpts = append(serverOpts, server.Broker(*c.opts.Broker))
|
serverOpts = append(serverOpts, server.Broker(*c.opts.Broker))
|
||||||
clientOpts = append(clientOpts, client.Broker(*c.opts.Broker))
|
clientOpts = append(clientOpts, client.Broker(*c.opts.Broker))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the registry
|
// Set the registry
|
||||||
if name := ctx.String("registry"); len(name) > 0 || len(ctx.String("registry_address")) > 0 {
|
if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {
|
||||||
if len(name) == 0 {
|
r, ok := c.opts.Registries[name]
|
||||||
name = defaultRegistry
|
if !ok {
|
||||||
}
|
|
||||||
|
|
||||||
if r, ok := c.opts.Registries[name]; ok {
|
|
||||||
n := r(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
|
|
||||||
*c.opts.Registry = n
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("Registry %s not found", name)
|
return fmt.Errorf("Registry %s not found", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*c.opts.Registry = r()
|
||||||
serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
|
serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
|
||||||
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
|
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
|
||||||
|
|
||||||
@ -314,31 +306,26 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Set the selector
|
// Set the selector
|
||||||
if name := ctx.String("selector"); len(name) > 0 {
|
if name := ctx.String("selector"); len(name) > 0 && (*c.opts.Selector).String() != name {
|
||||||
if s, ok := c.opts.Selectors[name]; ok {
|
s, ok := c.opts.Selectors[name]
|
||||||
n := s(selector.Registry(*c.opts.Registry))
|
if !ok {
|
||||||
*c.opts.Selector = n
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("Selector %s not found", name)
|
return fmt.Errorf("Selector %s not found", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*c.opts.Selector = s(selector.Registry(*c.opts.Registry))
|
||||||
|
|
||||||
// No server option here. Should there be?
|
// No server option here. Should there be?
|
||||||
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
|
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the transport
|
// Set the transport
|
||||||
if name := ctx.String("transport"); len(name) > 0 || len(ctx.String("transport_address")) > 0 {
|
if name := ctx.String("transport"); len(name) > 0 && (*c.opts.Transport).String() != name {
|
||||||
if len(name) == 0 {
|
t, ok := c.opts.Transports[name]
|
||||||
name = defaultTransport
|
if !ok {
|
||||||
}
|
|
||||||
|
|
||||||
if t, ok := c.opts.Transports[name]; ok {
|
|
||||||
n := t(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
|
|
||||||
*c.opts.Transport = n
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("Transport %s not found", name)
|
return fmt.Errorf("Transport %s not found", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*c.opts.Transport = t()
|
||||||
serverOpts = append(serverOpts, server.Transport(*c.opts.Transport))
|
serverOpts = append(serverOpts, server.Transport(*c.opts.Transport))
|
||||||
clientOpts = append(clientOpts, client.Transport(*c.opts.Transport))
|
clientOpts = append(clientOpts, client.Transport(*c.opts.Transport))
|
||||||
}
|
}
|
||||||
@ -359,6 +346,18 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
|||||||
serverOpts = append(serverOpts, server.Metadata(metadata))
|
serverOpts = append(serverOpts, server.Metadata(metadata))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(ctx.String("broker_address")) > 0 {
|
||||||
|
(*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ctx.String("registry_address")) > 0 {
|
||||||
|
(*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ctx.String("transport_address")) > 0 {
|
||||||
|
(*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
|
||||||
|
}
|
||||||
|
|
||||||
if len(ctx.String("server_name")) > 0 {
|
if len(ctx.String("server_name")) > 0 {
|
||||||
serverOpts = append(serverOpts, server.Name(ctx.String("server_name")))
|
serverOpts = append(serverOpts, server.Name(ctx.String("server_name")))
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,19 @@ type consulRegistry struct {
|
|||||||
register map[string]uint64
|
register map[string]uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getDeregisterTTL(t time.Duration) time.Duration {
|
||||||
|
// splay slightly for the watcher?
|
||||||
|
splay := time.Second * 5
|
||||||
|
deregTTL := t + splay
|
||||||
|
|
||||||
|
// consul has a minimum timeout on deregistration of 1 minute.
|
||||||
|
if t < time.Minute {
|
||||||
|
deregTTL = time.Minute + splay
|
||||||
|
}
|
||||||
|
|
||||||
|
return deregTTL
|
||||||
|
}
|
||||||
|
|
||||||
func newTransport(config *tls.Config) *http.Transport {
|
func newTransport(config *tls.Config) *http.Transport {
|
||||||
if config == nil {
|
if config == nil {
|
||||||
config = &tls.Config{
|
config = &tls.Config{
|
||||||
@ -48,32 +61,31 @@ func newTransport(config *tls.Config) *http.Transport {
|
|||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConsulRegistry(opts ...Option) Registry {
|
func configure(c *consulRegistry, opts ...Option) {
|
||||||
var options Options
|
// set opts
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&c.opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// use default config
|
// use default config
|
||||||
config := consul.DefaultConfig()
|
config := consul.DefaultConfig()
|
||||||
connect := false
|
|
||||||
|
|
||||||
if options.Context != nil {
|
if c.opts.Context != nil {
|
||||||
// Use the consul config passed in the options, if available
|
// Use the consul config passed in the options, if available
|
||||||
if c, ok := options.Context.Value("consul_config").(*consul.Config); ok {
|
if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok {
|
||||||
config = c
|
config = co
|
||||||
}
|
}
|
||||||
if cn, ok := options.Context.Value("consul_connect").(bool); ok {
|
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
|
||||||
connect = cn
|
c.connect = cn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if there are any addrs
|
// check if there are any addrs
|
||||||
if len(options.Addrs) > 0 {
|
if len(c.opts.Addrs) > 0 {
|
||||||
addr, port, err := net.SplitHostPort(options.Addrs[0])
|
addr, port, err := net.SplitHostPort(c.opts.Addrs[0])
|
||||||
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
||||||
port = "8500"
|
port = "8500"
|
||||||
addr = options.Addrs[0]
|
addr = c.opts.Addrs[0]
|
||||||
config.Address = fmt.Sprintf("%s:%s", addr, port)
|
config.Address = fmt.Sprintf("%s:%s", addr, port)
|
||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
config.Address = fmt.Sprintf("%s:%s", addr, port)
|
config.Address = fmt.Sprintf("%s:%s", addr, port)
|
||||||
@ -81,33 +93,41 @@ func newConsulRegistry(opts ...Option) Registry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// requires secure connection?
|
// requires secure connection?
|
||||||
if options.Secure || options.TLSConfig != nil {
|
if c.opts.Secure || c.opts.TLSConfig != nil {
|
||||||
if config.HttpClient == nil {
|
if config.HttpClient == nil {
|
||||||
config.HttpClient = new(http.Client)
|
config.HttpClient = new(http.Client)
|
||||||
}
|
}
|
||||||
|
|
||||||
config.Scheme = "https"
|
config.Scheme = "https"
|
||||||
// We're going to support InsecureSkipVerify
|
// We're going to support InsecureSkipVerify
|
||||||
config.HttpClient.Transport = newTransport(options.TLSConfig)
|
config.HttpClient.Transport = newTransport(c.opts.TLSConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
// set timeout
|
||||||
|
if c.opts.Timeout > 0 {
|
||||||
|
config.HttpClient.Timeout = c.opts.Timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
// create the client
|
// create the client
|
||||||
client, _ := consul.NewClient(config)
|
client, _ := consul.NewClient(config)
|
||||||
|
|
||||||
// set timeout
|
// set address/client
|
||||||
if options.Timeout > 0 {
|
c.Address = config.Address
|
||||||
config.HttpClient.Timeout = options.Timeout
|
c.Client = client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newConsulRegistry(opts ...Option) Registry {
|
||||||
cr := &consulRegistry{
|
cr := &consulRegistry{
|
||||||
Address: config.Address,
|
opts: Options{},
|
||||||
Client: client,
|
|
||||||
opts: options,
|
|
||||||
register: make(map[string]uint64),
|
register: make(map[string]uint64),
|
||||||
connect: connect,
|
}
|
||||||
|
configure(cr, opts...)
|
||||||
|
return cr
|
||||||
}
|
}
|
||||||
|
|
||||||
return cr
|
func (c *consulRegistry) Init(opts ...Option) error {
|
||||||
|
configure(c, opts...)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *consulRegistry) Deregister(s *Service) error {
|
func (c *consulRegistry) Deregister(s *Service) error {
|
||||||
@ -124,19 +144,6 @@ func (c *consulRegistry) Deregister(s *Service) error {
|
|||||||
return c.Client.Agent().ServiceDeregister(node.Id)
|
return c.Client.Agent().ServiceDeregister(node.Id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getDeregisterTTL(t time.Duration) time.Duration {
|
|
||||||
// splay slightly for the watcher?
|
|
||||||
splay := time.Second * 5
|
|
||||||
deregTTL := t + splay
|
|
||||||
|
|
||||||
// consul has a minimum timeout on deregistration of 1 minute.
|
|
||||||
if t < time.Minute {
|
|
||||||
deregTTL = time.Minute + splay
|
|
||||||
}
|
|
||||||
|
|
||||||
return deregTTL
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||||
if len(s.Nodes) == 0 {
|
if len(s.Nodes) == 0 {
|
||||||
return errors.New("Require at least one node")
|
return errors.New("Require at least one node")
|
||||||
|
@ -49,6 +49,17 @@ func newRegistry(opts ...registry.Option) registry.Registry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mdnsRegistry) Init(opts ...registry.Option) error {
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&m.opts)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mdnsRegistry) Options() registry.Options {
|
||||||
|
return m.opts
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error {
|
func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error {
|
||||||
m.Lock()
|
m.Lock()
|
||||||
defer m.Unlock()
|
defer m.Unlock()
|
||||||
@ -322,10 +333,6 @@ func (m *mdnsRegistry) String() string {
|
|||||||
return "mdns"
|
return "mdns"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mdnsRegistry) Options() registry.Options {
|
|
||||||
return m.opts
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
func NewRegistry(opts ...registry.Option) registry.Registry {
|
||||||
return newRegistry(opts...)
|
return newRegistry(opts...)
|
||||||
}
|
}
|
||||||
|
@ -99,11 +99,15 @@ func (m *mockRegistry) String() string {
|
|||||||
return "mock"
|
return "mock"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockRegistry) Init(opts ...registry.Option) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockRegistry) Options() registry.Options {
|
func (m *mockRegistry) Options() registry.Options {
|
||||||
return registry.Options{}
|
return registry.Options{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRegistry() registry.Registry {
|
func NewRegistry(opts ...registry.Options) registry.Registry {
|
||||||
m := &mockRegistry{Services: make(map[string][]*registry.Service)}
|
m := &mockRegistry{Services: make(map[string][]*registry.Service)}
|
||||||
m.init()
|
m.init()
|
||||||
return m
|
return m
|
||||||
|
@ -9,13 +9,14 @@ import (
|
|||||||
// and an abstraction over varying implementations
|
// and an abstraction over varying implementations
|
||||||
// {consul, etcd, zookeeper, ...}
|
// {consul, etcd, zookeeper, ...}
|
||||||
type Registry interface {
|
type Registry interface {
|
||||||
|
Init(...Option) error
|
||||||
|
Options() Options
|
||||||
Register(*Service, ...RegisterOption) error
|
Register(*Service, ...RegisterOption) error
|
||||||
Deregister(*Service) error
|
Deregister(*Service) error
|
||||||
GetService(string) ([]*Service, error)
|
GetService(string) ([]*Service, error)
|
||||||
ListServices() ([]*Service, error)
|
ListServices() ([]*Service, error)
|
||||||
Watch(...WatchOption) (Watcher, error)
|
Watch(...WatchOption) (Watcher, error)
|
||||||
String() string
|
String() string
|
||||||
Options() Options
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option func(*Options)
|
type Option func(*Options)
|
||||||
|
@ -423,6 +423,17 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *httpTransport) Init(opts ...Option) error {
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&h.opts)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *httpTransport) Options() Options {
|
||||||
|
return h.opts
|
||||||
|
}
|
||||||
|
|
||||||
func (h *httpTransport) String() string {
|
func (h *httpTransport) String() string {
|
||||||
return "http"
|
return "http"
|
||||||
}
|
}
|
||||||
|
@ -171,6 +171,17 @@ func (m *mockTransport) Listen(addr string, opts ...transport.ListenOption) (tra
|
|||||||
return listener, nil
|
return listener, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockTransport) Init(opts ...transport.Option) error {
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&m.opts)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockTransport) Options() transport.Options {
|
||||||
|
return m.opts
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockTransport) String() string {
|
func (m *mockTransport) String() string {
|
||||||
return "mock"
|
return "mock"
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,8 @@ type Listener interface {
|
|||||||
// services. It uses socket send/recv semantics and had various
|
// services. It uses socket send/recv semantics and had various
|
||||||
// implementations {HTTP, RabbitMQ, NATS, ...}
|
// implementations {HTTP, RabbitMQ, NATS, ...}
|
||||||
type Transport interface {
|
type Transport interface {
|
||||||
|
Init(...Option) error
|
||||||
|
Options() Options
|
||||||
Dial(addr string, opts ...DialOption) (Client, error)
|
Dial(addr string, opts ...DialOption) (Client, error)
|
||||||
Listen(addr string, opts ...ListenOption) (Listener, error)
|
Listen(addr string, opts ...ListenOption) (Listener, error)
|
||||||
String() string
|
String() string
|
||||||
|
Loading…
Reference in New Issue
Block a user