diff --git a/broker/http_broker.go b/broker/http_broker.go index 0bf4cd17..ae9eed40 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -190,13 +190,14 @@ func (h *httpBroker) Publish(topic string, msg *Message) error { return err } - for _, node := range s.Nodes { - r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", bytes.NewBuffer(b)) - if err == nil { - r.Body.Close() + for _, service := range s { + for _, node := range service.Nodes { + r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", bytes.NewBuffer(b)) + if err == nil { + r.Body.Close() + } } } - return nil } diff --git a/client/node_selector.go b/client/node_selector.go new file mode 100644 index 00000000..c15f7d65 --- /dev/null +++ b/client/node_selector.go @@ -0,0 +1,29 @@ +package client + +import ( + "math/rand" + "time" + + "github.com/myodc/go-micro/errors" + "github.com/myodc/go-micro/registry" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +func nodeSelector(service []*registry.Service) (*registry.Node, error) { + if len(service) == 0 { + return nil, errors.NotFound("go.micro.client", "Service not found") + } + + i := rand.Int() + j := i % len(service) + + if len(service[j].Nodes) == 0 { + return nil, errors.NotFound("go.micro.client", "Service not found") + } + + n := i % len(service[j].Nodes) + return service[j].Nodes[n], nil +} diff --git a/client/node_selector_test.go b/client/node_selector_test.go new file mode 100644 index 00000000..2e4a9321 --- /dev/null +++ b/client/node_selector_test.go @@ -0,0 +1,46 @@ +package client + +import ( + "testing" + + "github.com/myodc/go-micro/registry" +) + +func TestNodeSelector(t *testing.T) { + services := []*registry.Service{ + { + Name: "foo", + Version: "1.0.0", + Nodes: []*registry.Node{ + { + Id: "foo-123", + Address: "localhost", + Port: 9999, + }, + }, + }, + { + Name: "foo", + Version: "1.0.1", + Nodes: []*registry.Node{ + { + Id: "foo-321", + Address: "localhost", + Port: 6666, + }, + }, + }, + } + + counts := map[string]int{} + + for i := 0; i < 100; i++ { + n, err := nodeSelector(services) + if err != nil { + t.Errorf("Expected node, got err: %v", err) + } + counts[n.Id]++ + } + + t.Logf("Counts %v", counts) +} diff --git a/client/rpc_client.go b/client/rpc_client.go index 737bc89e..57a5df2a 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -3,10 +3,8 @@ package client import ( "encoding/json" "fmt" - "math/rand" "net/http" "sync" - "time" "github.com/myodc/go-micro/broker" c "github.com/myodc/go-micro/context" @@ -29,10 +27,6 @@ type rpcClient struct { opts options } -func init() { - rand.Seed(time.Now().UnixNano()) -} - func newRpcClient(opt ...Option) Client { var opts options @@ -129,13 +123,11 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return errors.InternalServerError("go.micro.client", err.Error()) } - if len(service.Nodes) == 0 { - return errors.NotFound("go.micro.client", "Service not found") + node, err := nodeSelector(service) + if err != nil { + return err } - n := rand.Int() % len(service.Nodes) - node := service.Nodes[n] - address := node.Address if node.Port > 0 { address = fmt.Sprintf("%s:%d", address, node.Port) @@ -154,13 +146,11 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in return nil, errors.InternalServerError("go.micro.client", err.Error()) } - if len(service.Nodes) == 0 { - return nil, errors.NotFound("go.micro.client", "Service not found") + node, err := nodeSelector(service) + if err != nil { + return nil, err } - n := rand.Int() % len(service.Nodes) - node := service.Nodes[n] - address := node.Address if node.Port > 0 { address = fmt.Sprintf("%s:%d", address, node.Port) diff --git a/cmd/cmd.go b/cmd/cmd.go index 694e1230..811d16f0 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -39,6 +39,11 @@ var ( EnvVar: "MICRO_SERVER_NAME", Usage: "Name of the server. go.micro.srv.example", }, + cli.StringFlag{ + Name: "server_version", + EnvVar: "MICRO_SERVER_VERSION", + Usage: "Version of the server. 1.1.0", + }, cli.StringFlag{ Name: "server_id", EnvVar: "MICRO_SERVER_ID", @@ -178,6 +183,7 @@ func Setup(c *cli.Context) error { server.DefaultServer = server.NewServer( server.Name(c.String("server_name")), + server.Version(c.String("server_version")), server.Id(c.String("server_id")), server.Address(c.String("server_address")), server.Metadata(metadata), diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 31281137..53754080 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -13,7 +13,7 @@ type consulRegistry struct { Client *consul.Client mtx sync.RWMutex - services map[string]*Service + services map[string][]*Service } func encodeEndpoints(en []*Endpoint) []string { @@ -80,7 +80,7 @@ func newConsulRegistry(addrs []string, opts ...Option) Registry { cr := &consulRegistry{ Address: config.Address, Client: client, - services: make(map[string]*Service), + services: make(map[string][]*Service), } return cr @@ -115,7 +115,7 @@ func (c *consulRegistry) Register(s *Service) error { Node: node.Id, Address: node.Address, Service: &consul.AgentService{ - ID: node.Id, + ID: s.Version, Service: s.Name, Port: node.Port, Tags: tags, @@ -125,7 +125,7 @@ func (c *consulRegistry) Register(s *Service) error { return err } -func (c *consulRegistry) GetService(name string) (*Service, error) { +func (c *consulRegistry) GetService(name string) ([]*Service, error) { c.mtx.RLock() service, ok := c.services[name] c.mtx.RUnlock() @@ -139,24 +139,47 @@ func (c *consulRegistry) GetService(name string) (*Service, error) { return nil, err } - cs := &Service{} + serviceMap := map[string]*Service{} for _, s := range rsp { if s.ServiceName != name { continue } - cs.Endpoints = decodeEndpoints(s.ServiceTags) - cs.Name = s.ServiceName - cs.Nodes = append(cs.Nodes, &Node{ - Id: s.ServiceID, + id := s.Node + key := s.ServiceID + version := s.ServiceID + + // We're adding service version but + // don't want to break backwards compatibility + if id == version { + key = "default" + version = "" + } + + svc, ok := serviceMap[key] + if !ok { + svc = &Service{ + Endpoints: decodeEndpoints(s.ServiceTags), + Name: s.ServiceName, + Version: version, + } + serviceMap[key] = svc + } + + svc.Nodes = append(svc.Nodes, &Node{ + Id: id, Address: s.Address, Port: s.ServicePort, Metadata: decodeMetadata(s.ServiceTags), }) } - return cs, nil + var services []*Service + for _, service := range serviceMap { + services = append(services, service) + } + return services, nil } func (c *consulRegistry) ListServices() ([]*Service, error) { @@ -167,8 +190,8 @@ func (c *consulRegistry) ListServices() ([]*Service, error) { var services []*Service if len(serviceMap) > 0 { - for _, service := range services { - services = append(services, service) + for _, service := range serviceMap { + services = append(services, service...) } return services, nil } diff --git a/registry/consul_watcher.go b/registry/consul_watcher.go index dbe7c2fe..4027bf2c 100644 --- a/registry/consul_watcher.go +++ b/registry/consul_watcher.go @@ -39,13 +39,34 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) { return } - cs := &Service{} + serviceMap := map[string]*Service{} + serviceName := "" for _, e := range entries { - cs.Endpoints = decodeEndpoints(e.Service.Tags) - cs.Name = e.Service.Service - cs.Nodes = append(cs.Nodes, &Node{ - Id: e.Service.ID, + serviceName = e.Service.Service + id := e.Node.Node + key := e.Service.Service + e.Service.ID + version := e.Service.ID + + // We're adding service version but + // don't want to break backwards compatibility + if id == version { + key = e.Service.Service + "default" + version = "" + } + + svc, ok := serviceMap[key] + if !ok { + svc = &Service{ + Endpoints: decodeEndpoints(e.Service.Tags), + Name: e.Service.Service, + Version: version, + } + serviceMap[key] = svc + } + + svc.Nodes = append(svc.Nodes, &Node{ + Id: id, Address: e.Node.Address, Port: e.Service.Port, Metadata: decodeMetadata(e.Service.Tags), @@ -53,7 +74,11 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) { } cw.Registry.mtx.Lock() - cw.Registry.services[cs.Name] = cs + var services []*Service + for _, service := range serviceMap { + services = append(services, service) + } + cw.Registry.services[serviceName] = services cw.Registry.mtx.Unlock() } diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go index eb38d3cc..3bca8004 100644 --- a/registry/etcd/etcd.go +++ b/registry/etcd/etcd.go @@ -20,7 +20,7 @@ type etcdRegistry struct { client etcd.KeysAPI sync.RWMutex - services map[string]*registry.Service + services map[string][]*registry.Service } func encode(s *registry.Service) string { @@ -85,7 +85,7 @@ func (e *etcdRegistry) Register(s *registry.Service) error { return nil } -func (e *etcdRegistry) GetService(name string) (*registry.Service, error) { +func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) { e.RLock() service, ok := e.services[name] e.RUnlock() @@ -99,23 +99,35 @@ func (e *etcdRegistry) GetService(name string) (*registry.Service, error) { return nil, err } - s := ®istry.Service{} + serviceMap := map[string]*registry.Service{} for _, n := range rsp.Node.Nodes { if n.Dir { continue } sn := decode(n.Value) - s.Name = sn.Name - s.Version = sn.Version - s.Metadata = sn.Metadata - s.Endpoints = sn.Endpoints + + s, ok := serviceMap[sn.Version] + if !ok { + s = ®istry.Service{ + Name: sn.Name, + Version: sn.Version, + Metadata: sn.Metadata, + Endpoints: sn.Endpoints, + } + serviceMap[s.Version] = s + } + for _, node := range sn.Nodes { s.Nodes = append(s.Nodes, node) } } - return s, nil + var services []*registry.Service + for _, service := range serviceMap { + services = append(services, service) + } + return services, nil } func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { @@ -126,8 +138,8 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { var services []*registry.Service if len(serviceMap) > 0 { - for _, service := range services { - services = append(services, service) + for _, service := range serviceMap { + services = append(services, service...) } return services, nil } @@ -139,15 +151,10 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { for _, node := range rsp.Node.Nodes { service := ®istry.Service{} - for _, n := range node.Nodes { i := decode(n.Value) service.Name = i.Name - for _, in := range i.Nodes { - service.Nodes = append(service.Nodes, in) - } } - services = append(services, service) } @@ -179,7 +186,7 @@ func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { e := &etcdRegistry{ client: etcd.NewKeysAPI(c), - services: make(map[string]*registry.Service), + services: make(map[string][]*registry.Service), } return e diff --git a/registry/etcd/watcher.go b/registry/etcd/watcher.go index 0dee8a56..0162ccf6 100644 --- a/registry/etcd/watcher.go +++ b/registry/etcd/watcher.go @@ -11,6 +11,77 @@ type etcdWatcher struct { stop chan bool } +func addNodes(old, neu []*registry.Node) []*registry.Node { + for _, n := range neu { + var seen bool + for i, o := range old { + if o.Id == n.Id { + seen = true + old[i] = n + break + } + } + if !seen { + old = append(old, n) + } + } + return old +} + +func addServices(old, neu []*registry.Service) []*registry.Service { + for _, s := range neu { + var seen bool + for i, o := range old { + if o.Version == s.Version { + s.Nodes = addNodes(o.Nodes, s.Nodes) + seen = true + old[i] = s + break + } + } + if !seen { + old = append(old, s) + } + } + return old +} + +func delNodes(old, del []*registry.Node) []*registry.Node { + var nodes []*registry.Node + for _, o := range old { + var rem bool + for _, n := range del { + if o.Id == n.Id { + rem = true + break + } + } + if !rem { + nodes = append(nodes, o) + } + } + return nodes +} + +func delServices(old, del []*registry.Service) []*registry.Service { + var services []*registry.Service + for i, o := range old { + var rem bool + for _, s := range del { + if o.Version == s.Version { + old[i].Nodes = delNodes(o.Nodes, s.Nodes) + if len(old[i].Nodes) == 0 { + rem = true + } + } + } + if !rem { + services = append(services, o) + } + } + return services +} + func newEtcdWatcher(r *etcdRegistry) (registry.Watcher, error) { ew := &etcdWatcher{ registry: r, @@ -53,7 +124,7 @@ func (e *etcdWatcher) watch(ctx context.Context, w etcd.Watcher) { service, ok := e.registry.services[s.Name] if !ok { if rsp.Action == "create" { - e.registry.services[s.Name] = s + e.registry.services[s.Name] = []*registry.Service{s} } e.registry.Unlock() continue @@ -61,22 +132,14 @@ func (e *etcdWatcher) watch(ctx context.Context, w etcd.Watcher) { switch rsp.Action { case "delete": - var nodes []*registry.Node - for _, node := range service.Nodes { - var seen bool - for _, n := range s.Nodes { - if node.Id == n.Id { - seen = true - break - } - } - if !seen { - nodes = append(nodes, node) - } + services := delServices(service, []*registry.Service{s}) + if len(services) > 0 { + e.registry.services[s.Name] = services + } else { + delete(e.registry.services, s.Name) } - service.Nodes = nodes case "create": - service.Nodes = append(service.Nodes, s.Nodes...) + e.registry.services[s.Name] = addServices(service, []*registry.Service{s}) } e.registry.Unlock() diff --git a/registry/memory/memory.go b/registry/memory/memory.go index 9867dbbf..d1089247 100644 --- a/registry/memory/memory.go +++ b/registry/memory/memory.go @@ -35,7 +35,7 @@ type memoryRegistry struct { updates chan *update sync.RWMutex - services map[string]*registry.Service + services map[string][]*registry.Service } type update struct { @@ -96,7 +96,7 @@ func (d *delegate) LocalState(join bool) []byte { } syncCh := make(chan *registry.Service, 1) - m := map[string]*registry.Service{} + m := map[string][]*registry.Service{} d.updates <- &update{ Action: syncAction, @@ -104,7 +104,7 @@ func (d *delegate) LocalState(join bool) []byte { } for s := range syncCh { - m[s.Name] = s + m[s.Name] = append(m[s.Name], s) } b, _ := json.Marshal(m) @@ -119,16 +119,18 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) { return } - var m map[string]*registry.Service + var m map[string][]*registry.Service if err := json.Unmarshal(buf, &m); err != nil { return } - for _, service := range m { - d.updates <- &update{ - Action: addAction, - Service: service, - sync: nil, + for _, services := range m { + for _, service := range services { + d.updates <- &update{ + Action: addAction, + Service: service, + sync: nil, + } } } } @@ -138,19 +140,31 @@ func (m *memoryRegistry) run() { switch u.Action { case addAction: m.Lock() - m.services[u.Service.Name] = u.Service + if service, ok := m.services[u.Service.Name]; !ok { + m.services[u.Service.Name] = []*registry.Service{u.Service} + } else { + m.services[u.Service.Name] = addServices(service, []*registry.Service{u.Service}) + } m.Unlock() case delAction: m.Lock() - delete(m.services, u.Service.Name) + if service, ok := m.services[u.Service.Name]; ok { + if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 { + delete(m.services, u.Service.Name) + } else { + m.services[u.Service.Name] = services + } + } m.Unlock() case syncAction: if u.sync == nil { continue } m.RLock() - for _, service := range m.services { - u.sync <- service + for _, services := range m.services { + for _, service := range services { + u.sync <- service + } } m.RUnlock() close(u.sync) @@ -160,7 +174,11 @@ func (m *memoryRegistry) run() { func (m *memoryRegistry) Register(s *registry.Service) error { m.Lock() - m.services[s.Name] = s + if service, ok := m.services[s.Name]; !ok { + m.services[s.Name] = []*registry.Service{s} + } else { + m.services[s.Name] = addServices(service, []*registry.Service{s}) + } m.Unlock() b, _ := json.Marshal([]*update{ @@ -180,7 +198,13 @@ func (m *memoryRegistry) Register(s *registry.Service) error { func (m *memoryRegistry) Deregister(s *registry.Service) error { m.Lock() - delete(m.services, s.Name) + if service, ok := m.services[s.Name]; ok { + if services := delServices(service, []*registry.Service{s}); len(services) == 0 { + delete(m.services, s.Name) + } else { + m.services[s.Name] = services + } + } m.Unlock() b, _ := json.Marshal([]*update{ @@ -198,7 +222,7 @@ func (m *memoryRegistry) Deregister(s *registry.Service) error { return nil } -func (m *memoryRegistry) GetService(name string) (*registry.Service, error) { +func (m *memoryRegistry) GetService(name string) ([]*registry.Service, error) { m.RLock() service, ok := m.services[name] m.RUnlock() @@ -212,7 +236,7 @@ func (m *memoryRegistry) ListServices() ([]*registry.Service, error) { var services []*registry.Service m.RLock() for _, service := range m.services { - services = append(services, service) + services = append(services, service...) } m.RUnlock() return services, nil @@ -226,6 +250,77 @@ func (w *watcher) Stop() { return } +func addNodes(old, neu []*registry.Node) []*registry.Node { + for _, n := range neu { + var seen bool + for i, o := range old { + if o.Id == n.Id { + seen = true + old[i] = n + break + } + } + if !seen { + old = append(old, n) + } + } + return old +} + +func addServices(old, neu []*registry.Service) []*registry.Service { + for _, s := range neu { + var seen bool + for i, o := range old { + if o.Version == s.Version { + s.Nodes = addNodes(o.Nodes, s.Nodes) + seen = true + old[i] = s + break + } + } + if !seen { + old = append(old, s) + } + } + return old +} + +func delNodes(old, del []*registry.Node) []*registry.Node { + var nodes []*registry.Node + for _, o := range old { + var rem bool + for _, n := range del { + if o.Id == n.Id { + rem = true + break + } + } + if !rem { + nodes = append(nodes, o) + } + } + return nodes +} + +func delServices(old, del []*registry.Service) []*registry.Service { + var services []*registry.Service + for i, o := range old { + var rem bool + for _, s := range del { + if o.Version == s.Version { + old[i].Nodes = delNodes(o.Nodes, s.Nodes) + if len(old[i].Nodes) == 0 { + rem = true + } + } + } + if !rem { + services = append(services, o) + } + } + return services +} + func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { cAddrs := []string{} hostname, _ := os.Hostname() @@ -246,7 +341,7 @@ func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { mr := &memoryRegistry{ broadcasts: broadcasts, - services: make(map[string]*registry.Service), + services: make(map[string][]*registry.Service), updates: updates, } diff --git a/registry/memory/memory_test.go b/registry/memory/memory_test.go new file mode 100644 index 00000000..d92f6155 --- /dev/null +++ b/registry/memory/memory_test.go @@ -0,0 +1,78 @@ +package memory + +import ( + "testing" + + "github.com/myodc/go-micro/registry" +) + +func TestDelServices(t *testing.T) { + services := []*registry.Service{ + { + Name: "foo", + Version: "1.0.0", + Nodes: []*registry.Node{ + { + Id: "foo-123", + Address: "localhost", + Port: 9999, + }, + }, + }, + { + Name: "foo", + Version: "1.0.0", + Nodes: []*registry.Node{ + { + Id: "foo-123", + Address: "localhost", + Port: 6666, + }, + }, + }, + } + + servs := delServices([]*registry.Service{services[0]}, []*registry.Service{services[1]}) + if i := len(servs); i > 0 { + t.Errorf("Expected 0 nodes, got %d: %+v", i, servs) + } + t.Logf("Services %+v", servs) +} + +func TestDelNodes(t *testing.T) { + services := []*registry.Service{ + { + Name: "foo", + Version: "1.0.0", + Nodes: []*registry.Node{ + { + Id: "foo-123", + Address: "localhost", + Port: 9999, + }, + { + Id: "foo-321", + Address: "localhost", + Port: 6666, + }, + }, + }, + { + Name: "foo", + Version: "1.0.0", + Nodes: []*registry.Node{ + { + Id: "foo-123", + Address: "localhost", + Port: 6666, + }, + }, + }, + } + + nodes := delNodes(services[0].Nodes, services[1].Nodes) + if i := len(nodes); i != 1 { + t.Errorf("Expected only 1 node, got %d: %+v", i, nodes) + } + t.Logf("Nodes %+v", nodes) +} diff --git a/registry/registry.go b/registry/registry.go index e9fbc7d5..93a5a45a 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -3,7 +3,7 @@ package registry type Registry interface { Register(*Service) error Deregister(*Service) error - GetService(string) (*Service, error) + GetService(string) ([]*Service, error) ListServices() ([]*Service, error) Watch() (Watcher, error) } @@ -32,7 +32,7 @@ func Deregister(s *Service) error { return DefaultRegistry.Deregister(s) } -func GetService(name string) (*Service, error) { +func GetService(name string) ([]*Service, error) { return DefaultRegistry.GetService(name) }