Allow configurable addresses for everything

This commit is contained in:
Asim 2015-05-16 00:34:02 +01:00
parent c77be7c571
commit 0e7bd77f4c
13 changed files with 143 additions and 41 deletions

View File

@ -25,6 +25,10 @@ type Subscriber interface {
Unsubscribe() error Unsubscribe() error
} }
type options struct{}
type Options func(*options)
var ( var (
Address string Address string
Id string Id string
@ -37,7 +41,7 @@ func Init() error {
} }
if DefaultBroker == nil { if DefaultBroker == nil {
DefaultBroker = NewHttpBroker(Address) DefaultBroker = NewHttpBroker([]string{Address})
} }
return DefaultBroker.Init() return DefaultBroker.Init()

View File

@ -225,10 +225,15 @@ func (h *HttpBroker) Subscribe(topic string, function func(*Message)) (Subscribe
return subscriber, nil return subscriber, nil
} }
func NewHttpBroker(address string) Broker { func NewHttpBroker(addrs []string, opts ...Options) Broker {
addr := ":0"
if len(addrs) > 0 {
addr = addrs[0]
}
return &HttpBroker{ return &HttpBroker{
id: Id, id: Id,
address: address, address: addr,
subscribers: make(map[string][]*HttpSubscriber), subscribers: make(map[string][]*HttpSubscriber),
unsubscribe: make(chan *HttpSubscriber), unsubscribe: make(chan *HttpSubscriber),
exit: make(chan chan error), exit: make(chan chan error),

View File

@ -2,6 +2,7 @@ package broker
import ( import (
"encoding/json" "encoding/json"
"strings"
"time" "time"
"code.google.com/p/go-uuid/uuid" "code.google.com/p/go-uuid/uuid"
@ -9,8 +10,8 @@ import (
) )
type NatsBroker struct { type NatsBroker struct {
address string addrs []string
conn *nats.Conn conn *nats.Conn
} }
type NatsSubscriber struct { type NatsSubscriber struct {
@ -26,7 +27,10 @@ func (n *NatsSubscriber) Unsubscribe() error {
} }
func (n *NatsBroker) Address() string { func (n *NatsBroker) Address() string {
return n.address if len(n.addrs) > 0 {
return n.addrs[0]
}
return ""
} }
func (n *NatsBroker) Connect() error { func (n *NatsBroker) Connect() error {
@ -34,7 +38,9 @@ func (n *NatsBroker) Connect() error {
return nil return nil
} }
c, err := nats.Connect(n.address) opts := nats.DefaultOptions
opts.Servers = n.addrs
c, err := opts.Connect()
if err != nil { if err != nil {
return err return err
} }
@ -78,8 +84,16 @@ func (n *NatsBroker) Subscribe(topic string, function func(*Message)) (Subscribe
return &NatsSubscriber{s: subscriber}, nil return &NatsSubscriber{s: subscriber}, nil
} }
func NewNatsBroker(address string) Broker { func NewNatsBroker(addrs []string, opts ...Options) Broker {
if len(addrs) == 0 {
addrs = []string{nats.DefaultURL}
}
for i, addr := range addrs {
if !strings.HasPrefix(addr, "nats://") {
addrs[i] = "nats://" + addr
}
}
return &NatsBroker{ return &NatsBroker{
address: address, addrs: addrs,
} }
} }

View File

@ -2,10 +2,12 @@ package cmd
import ( import (
"os" "os"
"strings"
"text/tabwriter" "text/tabwriter"
"text/template" "text/template"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
"github.com/myodc/go-micro/broker"
"github.com/myodc/go-micro/registry" "github.com/myodc/go-micro/registry"
"github.com/myodc/go-micro/server" "github.com/myodc/go-micro/server"
"github.com/myodc/go-micro/store" "github.com/myodc/go-micro/store"
@ -13,27 +15,81 @@ import (
var ( var (
Flags = []cli.Flag{ Flags = []cli.Flag{
cli.StringFlag{Name: "bind_address", Value: ":0", Usage: "Bind address for the server. 127.0.0.1:8080"}, cli.StringFlag{
cli.StringFlag{Name: "registry", Value: "consul", Usage: "Registry for discovery. kubernetes, consul, etc"}, Name: "server_address",
cli.StringFlag{Name: "store", Value: "consul", Usage: "Store used as a basic key/value store using consul, memcached, etc"}, EnvVar: "MICRO_SERVER_ADDRESS",
Value: ":0",
Usage: "Bind address for the server. 127.0.0.1:8080",
},
cli.StringFlag{
Name: "broker",
EnvVar: "MICRO_BROKER",
Value: "http",
Usage: "Broker for pub/sub. http, nats, etc",
},
cli.StringFlag{
Name: "broker_address",
EnvVar: "MICRO_BROKER_ADDRESS",
Value: ":0",
Usage: "Comma-separated list of broker addresses",
},
cli.StringFlag{
Name: "registry",
EnvVar: "MICRO_REGISTRY",
Value: "consul",
Usage: "Registry for discovery. kubernetes, consul, etc",
},
cli.StringFlag{
Name: "registry_address",
EnvVar: "MICRO_REGISTRY_ADDRESS",
Value: "127.0.0.1:8500",
Usage: "Comma-separated list of registry addresses",
},
cli.StringFlag{
Name: "store",
EnvVar: "MICRO_STORE",
Value: "consul",
Usage: "Store used as a basic key/value store using consul, memcached, etc",
},
cli.StringFlag{
Name: "store_address",
EnvVar: "MICRO_STORE_ADDRESS",
Value: "127.0.0.1:8500",
Usage: "Comma-separated list of store addresses",
},
} }
) )
func Setup(c *cli.Context) error { func Setup(c *cli.Context) error {
server.Address = c.String("bind_address") server.Address = c.String("server_address")
broker_addrs := strings.Split(c.String("broker_address"), ",")
switch c.String("broker") {
case "http":
broker.DefaultBroker = broker.NewHttpBroker(broker_addrs)
case "nats":
broker.DefaultBroker = broker.NewNatsBroker(broker_addrs)
}
registry_addrs := strings.Split(c.String("registry_address"), ",")
switch c.String("registry") { switch c.String("registry") {
case "kubernetes": case "kubernetes":
registry.DefaultRegistry = registry.NewKubernetesRegistry() registry.DefaultRegistry = registry.NewKubernetesRegistry(registry_addrs)
case "consul":
registry.DefaultRegistry = registry.NewConsulRegistry(registry_addrs)
} }
store_addrs := strings.Split(c.String("store_address"), ",")
switch c.String("store") { switch c.String("store") {
case "memcached": case "memcached":
store.DefaultStore = store.NewMemcacheStore() store.DefaultStore = store.NewMemcacheStore(store_addrs)
case "memory": case "memory":
store.DefaultStore = store.NewMemoryStore() store.DefaultStore = store.NewMemoryStore(store_addrs)
case "etcd": case "etcd":
store.DefaultStore = store.NewEtcdStore() store.DefaultStore = store.NewEtcdStore(store_addrs)
} }
return nil return nil

View File

@ -138,9 +138,12 @@ func (c *ConsulRegistry) Watch() {
NewConsulWatcher(c) NewConsulWatcher(c)
} }
func NewConsulRegistry() Registry { func NewConsulRegistry(addrs []string, opts ...Options) Registry {
config := consul.DefaultConfig() config := consul.DefaultConfig()
client, _ := consul.NewClient(config) client, _ := consul.NewClient(config)
if len(addrs) > 0 {
config.Address = addrs[0]
}
cr := &ConsulRegistry{ cr := &ConsulRegistry{
Address: config.Address, Address: config.Address,

View File

@ -115,9 +115,14 @@ func (c *KubernetesRegistry) NewNode(id, address string, port int) Node {
} }
} }
func NewKubernetesRegistry() Registry { func NewKubernetesRegistry(addrs []string, opts ...Options) Registry {
host := "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT")
if len(addrs) > 0 {
host = addrs[0]
}
client, _ := k8s.New(&k8s.Config{ client, _ := k8s.New(&k8s.Config{
Host: "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"), Host: host,
}) })
kr := &KubernetesRegistry{ kr := &KubernetesRegistry{

View File

@ -9,8 +9,12 @@ type Registry interface {
NewNode(string, string, int) Node NewNode(string, string, int) Node
} }
type options struct{}
type Options func(*options)
var ( var (
DefaultRegistry = NewConsulRegistry() DefaultRegistry = NewConsulRegistry([]string{})
) )
func Register(s Service) error { func Register(s Service) error {

View File

@ -10,6 +10,7 @@ import (
"strconv" "strconv"
"sync" "sync"
"github.com/bradfitz/http2"
log "github.com/golang/glog" log "github.com/golang/glog"
"github.com/myodc/go-micro/errors" "github.com/myodc/go-micro/errors"
rpc "github.com/youtube/vitess/go/rpcplus" rpc "github.com/youtube/vitess/go/rpcplus"
@ -190,7 +191,13 @@ func (s *RpcServer) Start() error {
s.address = l.Addr().String() s.address = l.Addr().String()
s.mtx.Unlock() s.mtx.Unlock()
go http.Serve(l, http.HandlerFunc(s.handler)) srv := &http.Server{
Handler: http.HandlerFunc(s.handler),
}
http2.ConfigureServer(srv, nil)
go srv.Serve(l)
go func() { go func() {
ch := <-s.exit ch := <-s.exit

View File

@ -46,8 +46,13 @@ func (c *ConsulStore) NewItem(key string, value []byte) Item {
} }
} }
func NewConsulStore() Store { func NewConsulStore(addrs []string, opts ...Options) Store {
client, _ := consul.NewClient(consul.DefaultConfig()) config := consul.DefaultConfig()
if len(addrs) > 0 {
config.Address = addrs[0]
}
client, _ := consul.NewClient(config)
return &ConsulStore{ return &ConsulStore{
Client: client, Client: client,

View File

@ -43,8 +43,12 @@ func (e *EtcdStore) NewItem(key string, value []byte) Item {
} }
} }
func NewEtcdStore() Store { func NewEtcdStore(addrs []string, opts ...Options) Store {
client := etcd.NewClient([]string{}) if len(addrs) == 0 {
addrs = []string{"127.0.0.1:2379"}
}
client := etcd.NewClient(addrs)
return &EtcdStore{ return &EtcdStore{
Client: client, Client: client,

View File

@ -2,7 +2,6 @@ package store
import ( import (
"errors" "errors"
"os"
mc "github.com/bradfitz/gomemcache/memcache" mc "github.com/bradfitz/gomemcache/memcache"
) )
@ -47,19 +46,11 @@ func (m *MemcacheStore) NewItem(key string, value []byte) Item {
} }
} }
func NewMemcacheStore() Store { func NewMemcacheStore(addrs []string, opts ...Options) Store {
server := os.Getenv("MEMCACHED_SERVICE_HOST") if len(addrs) == 0 {
port := os.Getenv("MEMCACHED_SERVICE_PORT") addrs = []string{"127.0.0.1:11211"}
if len(server) == 0 {
server = "127.0.0.1"
} }
if len(port) == 0 {
port = "11211"
}
return &MemcacheStore{ return &MemcacheStore{
Client: mc.New(server + ":" + port), Client: mc.New(addrs...),
} }
} }

View File

@ -41,7 +41,7 @@ func (m *MemoryStore) NewItem(key string, value []byte) Item {
} }
} }
func NewMemoryStore() Store { func NewMemoryStore(addrs []string, opts ...Options) Store {
return &MemoryStore{ return &MemoryStore{
store: make(map[string]Item), store: make(map[string]Item),
} }

View File

@ -7,8 +7,12 @@ type Store interface {
NewItem(string, []byte) Item NewItem(string, []byte) Item
} }
type options struct{}
type Options func(*options)
var ( var (
DefaultStore = NewConsulStore() DefaultStore = NewConsulStore([]string{})
) )
func Get(key string) (Item, error) { func Get(key string) (Item, error) {