diff --git a/broker/broker.go b/broker/broker.go index 9fae86f8..1844a828 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -25,6 +25,10 @@ type Subscriber interface { Unsubscribe() error } +type options struct{} + +type Options func(*options) + var ( Address string Id string @@ -37,7 +41,7 @@ func Init() error { } if DefaultBroker == nil { - DefaultBroker = NewHttpBroker(Address) + DefaultBroker = NewHttpBroker([]string{Address}) } return DefaultBroker.Init() diff --git a/broker/http_broker.go b/broker/http_broker.go index ece12d52..4d2ae687 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -225,10 +225,15 @@ func (h *HttpBroker) Subscribe(topic string, function func(*Message)) (Subscribe return subscriber, nil } -func NewHttpBroker(address string) Broker { +func NewHttpBroker(addrs []string, opts ...Options) Broker { + addr := ":0" + if len(addrs) > 0 { + addr = addrs[0] + } + return &HttpBroker{ id: Id, - address: address, + address: addr, subscribers: make(map[string][]*HttpSubscriber), unsubscribe: make(chan *HttpSubscriber), exit: make(chan chan error), diff --git a/broker/nats_broker.go b/broker/nats_broker.go index 64671690..9cf75cf0 100644 --- a/broker/nats_broker.go +++ b/broker/nats_broker.go @@ -2,6 +2,7 @@ package broker import ( "encoding/json" + "strings" "time" "code.google.com/p/go-uuid/uuid" @@ -9,8 +10,8 @@ import ( ) type NatsBroker struct { - address string - conn *nats.Conn + addrs []string + conn *nats.Conn } type NatsSubscriber struct { @@ -26,7 +27,10 @@ func (n *NatsSubscriber) Unsubscribe() error { } func (n *NatsBroker) Address() string { - return n.address + if len(n.addrs) > 0 { + return n.addrs[0] + } + return "" } func (n *NatsBroker) Connect() error { @@ -34,7 +38,9 @@ func (n *NatsBroker) Connect() error { return nil } - c, err := nats.Connect(n.address) + opts := nats.DefaultOptions + opts.Servers = n.addrs + c, err := opts.Connect() if err != nil { return err } @@ -78,8 +84,16 @@ func (n *NatsBroker) Subscribe(topic string, function func(*Message)) (Subscribe return &NatsSubscriber{s: subscriber}, nil } -func NewNatsBroker(address string) Broker { +func NewNatsBroker(addrs []string, opts ...Options) Broker { + if len(addrs) == 0 { + addrs = []string{nats.DefaultURL} + } + for i, addr := range addrs { + if !strings.HasPrefix(addr, "nats://") { + addrs[i] = "nats://" + addr + } + } return &NatsBroker{ - address: address, + addrs: addrs, } } diff --git a/cmd/cmd.go b/cmd/cmd.go index 41928af6..ff3c12d6 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -2,10 +2,12 @@ package cmd import ( "os" + "strings" "text/tabwriter" "text/template" "github.com/codegangsta/cli" + "github.com/myodc/go-micro/broker" "github.com/myodc/go-micro/registry" "github.com/myodc/go-micro/server" "github.com/myodc/go-micro/store" @@ -13,27 +15,81 @@ import ( var ( Flags = []cli.Flag{ - cli.StringFlag{Name: "bind_address", Value: ":0", Usage: "Bind address for the server. 127.0.0.1:8080"}, - cli.StringFlag{Name: "registry", Value: "consul", Usage: "Registry for discovery. kubernetes, consul, etc"}, - cli.StringFlag{Name: "store", Value: "consul", Usage: "Store used as a basic key/value store using consul, memcached, etc"}, + cli.StringFlag{ + Name: "server_address", + EnvVar: "MICRO_SERVER_ADDRESS", + Value: ":0", + Usage: "Bind address for the server. 127.0.0.1:8080", + }, + cli.StringFlag{ + Name: "broker", + EnvVar: "MICRO_BROKER", + Value: "http", + Usage: "Broker for pub/sub. http, nats, etc", + }, + cli.StringFlag{ + Name: "broker_address", + EnvVar: "MICRO_BROKER_ADDRESS", + Value: ":0", + Usage: "Comma-separated list of broker addresses", + }, + cli.StringFlag{ + Name: "registry", + EnvVar: "MICRO_REGISTRY", + Value: "consul", + Usage: "Registry for discovery. kubernetes, consul, etc", + }, + cli.StringFlag{ + Name: "registry_address", + EnvVar: "MICRO_REGISTRY_ADDRESS", + Value: "127.0.0.1:8500", + Usage: "Comma-separated list of registry addresses", + }, + cli.StringFlag{ + Name: "store", + EnvVar: "MICRO_STORE", + Value: "consul", + Usage: "Store used as a basic key/value store using consul, memcached, etc", + }, + cli.StringFlag{ + Name: "store_address", + EnvVar: "MICRO_STORE_ADDRESS", + Value: "127.0.0.1:8500", + Usage: "Comma-separated list of store addresses", + }, } ) func Setup(c *cli.Context) error { - server.Address = c.String("bind_address") + server.Address = c.String("server_address") + + broker_addrs := strings.Split(c.String("broker_address"), ",") + + switch c.String("broker") { + case "http": + broker.DefaultBroker = broker.NewHttpBroker(broker_addrs) + case "nats": + broker.DefaultBroker = broker.NewNatsBroker(broker_addrs) + } + + registry_addrs := strings.Split(c.String("registry_address"), ",") switch c.String("registry") { case "kubernetes": - registry.DefaultRegistry = registry.NewKubernetesRegistry() + registry.DefaultRegistry = registry.NewKubernetesRegistry(registry_addrs) + case "consul": + registry.DefaultRegistry = registry.NewConsulRegistry(registry_addrs) } + store_addrs := strings.Split(c.String("store_address"), ",") + switch c.String("store") { case "memcached": - store.DefaultStore = store.NewMemcacheStore() + store.DefaultStore = store.NewMemcacheStore(store_addrs) case "memory": - store.DefaultStore = store.NewMemoryStore() + store.DefaultStore = store.NewMemoryStore(store_addrs) case "etcd": - store.DefaultStore = store.NewEtcdStore() + store.DefaultStore = store.NewEtcdStore(store_addrs) } return nil diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 9d7eadce..9abfb278 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -138,9 +138,12 @@ func (c *ConsulRegistry) Watch() { NewConsulWatcher(c) } -func NewConsulRegistry() Registry { +func NewConsulRegistry(addrs []string, opts ...Options) Registry { config := consul.DefaultConfig() client, _ := consul.NewClient(config) + if len(addrs) > 0 { + config.Address = addrs[0] + } cr := &ConsulRegistry{ Address: config.Address, diff --git a/registry/kubernetes_registry.go b/registry/kubernetes_registry.go index ee12091b..6cdf016e 100644 --- a/registry/kubernetes_registry.go +++ b/registry/kubernetes_registry.go @@ -115,9 +115,14 @@ func (c *KubernetesRegistry) NewNode(id, address string, port int) Node { } } -func NewKubernetesRegistry() Registry { +func NewKubernetesRegistry(addrs []string, opts ...Options) Registry { + host := "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT") + if len(addrs) > 0 { + host = addrs[0] + } + client, _ := k8s.New(&k8s.Config{ - Host: "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"), + Host: host, }) kr := &KubernetesRegistry{ diff --git a/registry/registry.go b/registry/registry.go index 846c5f5a..60295b73 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -9,8 +9,12 @@ type Registry interface { NewNode(string, string, int) Node } +type options struct{} + +type Options func(*options) + var ( - DefaultRegistry = NewConsulRegistry() + DefaultRegistry = NewConsulRegistry([]string{}) ) func Register(s Service) error { diff --git a/server/rpc_server.go b/server/rpc_server.go index 6c37e835..be7d9712 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -10,6 +10,7 @@ import ( "strconv" "sync" + "github.com/bradfitz/http2" log "github.com/golang/glog" "github.com/myodc/go-micro/errors" rpc "github.com/youtube/vitess/go/rpcplus" @@ -190,7 +191,13 @@ func (s *RpcServer) Start() error { s.address = l.Addr().String() s.mtx.Unlock() - go http.Serve(l, http.HandlerFunc(s.handler)) + srv := &http.Server{ + Handler: http.HandlerFunc(s.handler), + } + + http2.ConfigureServer(srv, nil) + + go srv.Serve(l) go func() { ch := <-s.exit diff --git a/store/consul_store.go b/store/consul_store.go index 55fdec24..194fa06a 100644 --- a/store/consul_store.go +++ b/store/consul_store.go @@ -46,8 +46,13 @@ func (c *ConsulStore) NewItem(key string, value []byte) Item { } } -func NewConsulStore() Store { - client, _ := consul.NewClient(consul.DefaultConfig()) +func NewConsulStore(addrs []string, opts ...Options) Store { + config := consul.DefaultConfig() + if len(addrs) > 0 { + config.Address = addrs[0] + } + + client, _ := consul.NewClient(config) return &ConsulStore{ Client: client, diff --git a/store/etcd_store.go b/store/etcd_store.go index ccc87a37..a221f086 100644 --- a/store/etcd_store.go +++ b/store/etcd_store.go @@ -43,8 +43,12 @@ func (e *EtcdStore) NewItem(key string, value []byte) Item { } } -func NewEtcdStore() Store { - client := etcd.NewClient([]string{}) +func NewEtcdStore(addrs []string, opts ...Options) Store { + if len(addrs) == 0 { + addrs = []string{"127.0.0.1:2379"} + } + + client := etcd.NewClient(addrs) return &EtcdStore{ Client: client, diff --git a/store/memcached_store.go b/store/memcached_store.go index 54eb2306..139e5218 100644 --- a/store/memcached_store.go +++ b/store/memcached_store.go @@ -2,7 +2,6 @@ package store import ( "errors" - "os" mc "github.com/bradfitz/gomemcache/memcache" ) @@ -47,19 +46,11 @@ func (m *MemcacheStore) NewItem(key string, value []byte) Item { } } -func NewMemcacheStore() Store { - server := os.Getenv("MEMCACHED_SERVICE_HOST") - port := os.Getenv("MEMCACHED_SERVICE_PORT") - - if len(server) == 0 { - server = "127.0.0.1" +func NewMemcacheStore(addrs []string, opts ...Options) Store { + if len(addrs) == 0 { + addrs = []string{"127.0.0.1:11211"} } - - if len(port) == 0 { - port = "11211" - } - return &MemcacheStore{ - Client: mc.New(server + ":" + port), + Client: mc.New(addrs...), } } diff --git a/store/memory_store.go b/store/memory_store.go index 2b96ab0c..d85a0d68 100644 --- a/store/memory_store.go +++ b/store/memory_store.go @@ -41,7 +41,7 @@ func (m *MemoryStore) NewItem(key string, value []byte) Item { } } -func NewMemoryStore() Store { +func NewMemoryStore(addrs []string, opts ...Options) Store { return &MemoryStore{ store: make(map[string]Item), } diff --git a/store/store.go b/store/store.go index e07ce125..6cbaadb3 100644 --- a/store/store.go +++ b/store/store.go @@ -7,8 +7,12 @@ type Store interface { NewItem(string, []byte) Item } +type options struct{} + +type Options func(*options) + var ( - DefaultStore = NewConsulStore() + DefaultStore = NewConsulStore([]string{}) ) func Get(key string) (Item, error) {