Merge pull request #60 from micro/init

Strip string slice from New[Thing] for addrs
This commit is contained in:
Asim 2016-03-16 10:05:57 +00:00
commit 7126dc1238
16 changed files with 69 additions and 45 deletions

View File

@ -38,11 +38,11 @@ type Subscriber interface {
} }
var ( var (
DefaultBroker Broker = newHttpBroker([]string{}) DefaultBroker Broker = newHttpBroker()
) )
func NewBroker(addrs []string, opt ...Option) Broker { func NewBroker(opts ...Option) Broker {
return newHttpBroker(addrs, opt...) return newHttpBroker(opts...)
} }
func Init(opts ...Option) error { func Init(opts ...Option) error {

View File

@ -9,6 +9,6 @@ func init() {
cmd.DefaultBrokers["http"] = NewBroker cmd.DefaultBrokers["http"] = NewBroker
} }
func NewBroker(addrs []string, opts ...broker.Option) broker.Broker { func NewBroker(opts ...broker.Option) broker.Broker {
return broker.NewBroker(addrs, opts...) return broker.NewBroker(opts...)
} }

View File

@ -92,7 +92,7 @@ func newTransport(config *tls.Config) *http.Transport {
return t return t
} }
func newHttpBroker(addrs []string, opts ...Option) Broker { func newHttpBroker(opts ...Option) Broker {
options := Options{ options := Options{
Context: context.TODO(), Context: context.TODO(),
} }
@ -102,8 +102,8 @@ func newHttpBroker(addrs []string, opts ...Option) Broker {
} }
addr := ":0" addr := ":0"
if len(addrs) > 0 && len(addrs[0]) > 0 { if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
addr = addrs[0] addr = options.Addrs[0]
} }
reg, ok := options.Context.Value(registryKey).(registry.Registry) reg, ok := options.Context.Value(registryKey).(registry.Registry)

View File

@ -8,7 +8,7 @@ import (
func TestBroker(t *testing.T) { func TestBroker(t *testing.T) {
m := mock.NewRegistry() m := mock.NewRegistry()
b := NewBroker([]string{}, Registry(m)) b := NewBroker(Registry(m))
if err := b.Init(); err != nil { if err := b.Init(); err != nil {
t.Errorf("Unexpected init error: %v", err) t.Errorf("Unexpected init error: %v", err)

View File

@ -8,6 +8,7 @@ import (
) )
type Options struct { type Options struct {
Addrs []string
Secure bool Secure bool
TLSConfig *tls.Config TLSConfig *tls.Config
@ -60,6 +61,13 @@ func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
return opt return opt
} }
// Addrs sets the host addresses to be used by the broker
func Addrs(addrs ...string) Option {
return func(o *Options) {
o.Addrs = addrs
}
}
// DisableAutoAck will disable auto acking of messages // DisableAutoAck will disable auto acking of messages
// after they have been handled. // after they have been handled.
func DisableAutoAck() SubscribeOption { func DisableAutoAck() SubscribeOption {

View File

@ -106,11 +106,11 @@ var (
}, },
} }
DefaultBrokers = map[string]func([]string, ...broker.Option) broker.Broker{ DefaultBrokers = map[string]func(...broker.Option) broker.Broker{
"http": broker.NewBroker, "http": broker.NewBroker,
} }
DefaultRegistries = map[string]func([]string, ...registry.Option) registry.Registry{ DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
"consul": registry.NewRegistry, "consul": registry.NewRegistry,
} }
@ -118,7 +118,7 @@ var (
"random": selector.NewSelector, "random": selector.NewSelector,
} }
DefaultTransports = map[string]func([]string, ...transport.Option) transport.Transport{ DefaultTransports = map[string]func(...transport.Option) transport.Transport{
"http": transport.NewTransport, "http": transport.NewTransport,
} }
@ -198,7 +198,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
} }
if b, ok := c.opts.Brokers[name]; ok { if b, ok := c.opts.Brokers[name]; ok {
n := b(strings.Split(ctx.String("broker_address"), ",")) n := b(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
*c.opts.Broker = n *c.opts.Broker = n
} else { } else {
return fmt.Errorf("Broker %s not found", name) return fmt.Errorf("Broker %s not found", name)
@ -216,7 +216,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
} }
if r, ok := c.opts.Registries[name]; ok { if r, ok := c.opts.Registries[name]; ok {
n := r(strings.Split(ctx.String("registry_address"), ",")) n := r(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
*c.opts.Registry = n *c.opts.Registry = n
} else { } else {
return fmt.Errorf("Registry %s not found", name) return fmt.Errorf("Registry %s not found", name)
@ -251,7 +251,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
} }
if t, ok := c.opts.Transports[name]; ok { if t, ok := c.opts.Transports[name]; ok {
n := t(strings.Split(ctx.String("transport_address"), ",")) n := t(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
*c.opts.Transport = n *c.opts.Transport = n
} else { } else {
return fmt.Errorf("Transport %s not found", name) return fmt.Errorf("Transport %s not found", name)

View File

@ -25,10 +25,10 @@ type Options struct {
Client *client.Client Client *client.Client
Server *server.Server Server *server.Server
Brokers map[string]func([]string, ...broker.Option) broker.Broker Brokers map[string]func(...broker.Option) broker.Broker
Registries map[string]func([]string, ...registry.Option) registry.Registry Registries map[string]func(...registry.Option) registry.Registry
Selectors map[string]func(...selector.Option) selector.Selector Selectors map[string]func(...selector.Option) selector.Selector
Transports map[string]func([]string, ...transport.Option) transport.Transport Transports map[string]func(...transport.Option) transport.Transport
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
@ -93,14 +93,14 @@ func Server(s *server.Server) Option {
} }
// New broker func // New broker func
func NewBroker(name string, b func([]string, ...broker.Option) broker.Broker) Option { func NewBroker(name string, b func(...broker.Option) broker.Broker) Option {
return func(o *Options) { return func(o *Options) {
o.Brokers[name] = b o.Brokers[name] = b
} }
} }
// New registry func // New registry func
func NewRegistry(name string, r func([]string, ...registry.Option) registry.Registry) Option { func NewRegistry(name string, r func(...registry.Option) registry.Registry) Option {
return func(o *Options) { return func(o *Options) {
o.Registries[name] = r o.Registries[name] = r
} }
@ -114,7 +114,7 @@ func NewSelector(name string, s func(...selector.Option) selector.Selector) Opti
} }
// New transport func // New transport func
func NewTransport(name string, t func([]string, ...transport.Option) transport.Transport) Option { func NewTransport(name string, t func(...transport.Option) transport.Transport) Option {
return func(o *Options) { return func(o *Options) {
o.Transports[name] = t o.Transports[name] = t
} }

View File

@ -9,6 +9,6 @@ func init() {
cmd.DefaultRegistries["consul"] = NewRegistry cmd.DefaultRegistries["consul"] = NewRegistry
} }
func NewRegistry(addrs []string, opts ...registry.Option) registry.Registry { func NewRegistry(opts ...registry.Option) registry.Registry {
return registry.NewRegistry(addrs, opts...) return registry.NewRegistry(opts...)
} }

View File

@ -109,26 +109,26 @@ func decodeVersion(tags []string) (string, bool) {
return "", false return "", false
} }
func newConsulRegistry(addrs []string, opts ...Option) Registry { func newConsulRegistry(opts ...Option) Registry {
var opt Options var options Options
for _, o := range opts { for _, o := range opts {
o(&opt) o(&options)
} }
// use default config // use default config
config := consul.DefaultConfig() config := consul.DefaultConfig()
// set timeout // set timeout
if opt.Timeout > 0 { if options.Timeout > 0 {
config.HttpClient.Timeout = opt.Timeout config.HttpClient.Timeout = options.Timeout
} }
// check if there are any addrs // check if there are any addrs
if len(addrs) > 0 { if len(options.Addrs) > 0 {
addr, port, err := net.SplitHostPort(addrs[0]) addr, port, err := net.SplitHostPort(options.Addrs[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500" port = "8500"
addr = addrs[0] addr = options.Addrs[0]
config.Address = fmt.Sprintf("%s:%s", addr, port) config.Address = fmt.Sprintf("%s:%s", addr, port)
} else if err == nil { } else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port) config.Address = fmt.Sprintf("%s:%s", addr, port)
@ -136,10 +136,10 @@ func newConsulRegistry(addrs []string, opts ...Option) Registry {
} }
// requires secure connection? // requires secure connection?
if opt.Secure || opt.TLSConfig != nil { if options.Secure || options.TLSConfig != nil {
config.Scheme = "https" config.Scheme = "https"
// We're going to support InsecureSkipVerify // We're going to support InsecureSkipVerify
config.HttpClient.Transport = newTransport(opt.TLSConfig) config.HttpClient.Transport = newTransport(options.TLSConfig)
} }
// create the client // create the client
@ -148,7 +148,7 @@ func newConsulRegistry(addrs []string, opts ...Option) Registry {
cr := &consulRegistry{ cr := &consulRegistry{
Address: config.Address, Address: config.Address,
Client: client, Client: client,
Options: opt, Options: options,
} }
return cr return cr

View File

@ -8,6 +8,7 @@ import (
) )
type Options struct { type Options struct {
Addrs []string
Timeout time.Duration Timeout time.Duration
Secure bool Secure bool
TLSConfig *tls.Config TLSConfig *tls.Config
@ -24,6 +25,13 @@ type RegisterOptions struct {
Context context.Context Context context.Context
} }
// Addrs is the registry addresses to use
func Addrs(addrs ...string) Option {
return func(o *Options) {
o.Addrs = addrs
}
}
func Timeout(t time.Duration) Option { func Timeout(t time.Duration) Option {
return func(o *Options) { return func(o *Options) {
o.Timeout = t o.Timeout = t

View File

@ -21,13 +21,13 @@ type Option func(*Options)
type RegisterOption func(*RegisterOptions) type RegisterOption func(*RegisterOptions)
var ( var (
DefaultRegistry = newConsulRegistry([]string{}) DefaultRegistry = newConsulRegistry()
ErrNotFound = errors.New("not found") ErrNotFound = errors.New("not found")
) )
func NewRegistry(addrs []string, opt ...Option) Registry { func NewRegistry(opts ...Option) Registry {
return newConsulRegistry(addrs, opt...) return newConsulRegistry(opts...)
} }
// Register a service node. Additionally supply options such as TTL. // Register a service node. Additionally supply options such as TTL.

View File

@ -9,6 +9,6 @@ func init() {
cmd.DefaultTransports["http"] = NewTransport cmd.DefaultTransports["http"] = NewTransport
} }
func NewTransport(addrs []string, opts ...transport.Option) transport.Transport { func NewTransport(opts ...transport.Option) transport.Transport {
return transport.NewTransport(addrs, opts...) return transport.NewTransport(opts...)
} }

View File

@ -420,7 +420,7 @@ func (h *httpTransport) String() string {
return "http" return "http"
} }
func newHttpTransport(addrs []string, opts ...Option) *httpTransport { func newHttpTransport(opts ...Option) *httpTransport {
var options Options var options Options
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)

View File

@ -18,7 +18,7 @@ func expectedPort(t *testing.T, expected string, lsn transport.Listener) {
} }
func TestHTTPTransportPortRange(t *testing.T) { func TestHTTPTransportPortRange(t *testing.T) {
tp := transport.NewTransport([]string{}) tp := transport.NewTransport()
lsn1, err := tp.Listen(":44444-44448") lsn1, err := tp.Listen(":44444-44448")
if err != nil { if err != nil {
@ -43,7 +43,7 @@ func TestHTTPTransportPortRange(t *testing.T) {
} }
func TestHTTPTransportCommunication(t *testing.T) { func TestHTTPTransportCommunication(t *testing.T) {
tr := transport.NewTransport([]string{}) tr := transport.NewTransport()
l, err := tr.Listen(":0") l, err := tr.Listen(":0")
if err != nil { if err != nil {

View File

@ -8,6 +8,7 @@ import (
) )
type Options struct { type Options struct {
Addrs []string
Secure bool Secure bool
TLSConfig *tls.Config TLSConfig *tls.Config
@ -37,6 +38,13 @@ type ListenOptions struct {
Context context.Context Context context.Context
} }
// Addrs to use for transport
func Addrs(addrs ...string) Option {
return func(o *Options) {
o.Addrs = addrs
}
}
// Use secure communication. If TLSConfig is not specified we // Use secure communication. If TLSConfig is not specified we
// use InsecureSkipVerify and generate a self signed cert // use InsecureSkipVerify and generate a self signed cert
func Secure(b bool) Option { func Secure(b bool) Option {

View File

@ -43,13 +43,13 @@ type DialOption func(*DialOptions)
type ListenOption func(*ListenOptions) type ListenOption func(*ListenOptions)
var ( var (
DefaultTransport Transport = newHttpTransport([]string{}) DefaultTransport Transport = newHttpTransport()
DefaultDialTimeout = time.Second * 5 DefaultDialTimeout = time.Second * 5
) )
func NewTransport(addrs []string, opt ...Option) Transport { func NewTransport(opts ...Option) Transport {
return newHttpTransport(addrs, opt...) return newHttpTransport(opts...)
} }
func Dial(addr string, opts ...DialOption) (Client, error) { func Dial(addr string, opts ...DialOption) (Client, error) {