Update registry so version is surfaced
This commit is contained in:
		| @@ -190,13 +190,14 @@ func (h *httpBroker) Publish(topic string, msg *Message) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	for _, node := range s.Nodes { | ||||
| 		r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", bytes.NewBuffer(b)) | ||||
| 		if err == nil { | ||||
| 			r.Body.Close() | ||||
| 	for _, service := range s { | ||||
| 		for _, node := range service.Nodes { | ||||
| 			r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", bytes.NewBuffer(b)) | ||||
| 			if err == nil { | ||||
| 				r.Body.Close() | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
|   | ||||
							
								
								
									
										29
									
								
								client/node_selector.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										29
									
								
								client/node_selector.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,29 @@ | ||||
| package client | ||||
|  | ||||
| import ( | ||||
| 	"math/rand" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/myodc/go-micro/errors" | ||||
| 	"github.com/myodc/go-micro/registry" | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| 	rand.Seed(time.Now().UnixNano()) | ||||
| } | ||||
|  | ||||
| func nodeSelector(service []*registry.Service) (*registry.Node, error) { | ||||
| 	if len(service) == 0 { | ||||
| 		return nil, errors.NotFound("go.micro.client", "Service not found") | ||||
| 	} | ||||
|  | ||||
| 	i := rand.Int() | ||||
| 	j := i % len(service) | ||||
|  | ||||
| 	if len(service[j].Nodes) == 0 { | ||||
| 		return nil, errors.NotFound("go.micro.client", "Service not found") | ||||
| 	} | ||||
|  | ||||
| 	n := i % len(service[j].Nodes) | ||||
| 	return service[j].Nodes[n], nil | ||||
| } | ||||
							
								
								
									
										46
									
								
								client/node_selector_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										46
									
								
								client/node_selector_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,46 @@ | ||||
| package client | ||||
|  | ||||
| import ( | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/myodc/go-micro/registry" | ||||
| ) | ||||
|  | ||||
| func TestNodeSelector(t *testing.T) { | ||||
| 	services := []*registry.Service{ | ||||
| 		{ | ||||
| 			Name:    "foo", | ||||
| 			Version: "1.0.0", | ||||
| 			Nodes: []*registry.Node{ | ||||
| 				{ | ||||
| 					Id:      "foo-123", | ||||
| 					Address: "localhost", | ||||
| 					Port:    9999, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:    "foo", | ||||
| 			Version: "1.0.1", | ||||
| 			Nodes: []*registry.Node{ | ||||
| 				{ | ||||
| 					Id:      "foo-321", | ||||
| 					Address: "localhost", | ||||
| 					Port:    6666, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	counts := map[string]int{} | ||||
|  | ||||
| 	for i := 0; i < 100; i++ { | ||||
| 		n, err := nodeSelector(services) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("Expected node, got err: %v", err) | ||||
| 		} | ||||
| 		counts[n.Id]++ | ||||
| 	} | ||||
|  | ||||
| 	t.Logf("Counts %v", counts) | ||||
| } | ||||
| @@ -3,10 +3,8 @@ package client | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"math/rand" | ||||
| 	"net/http" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/myodc/go-micro/broker" | ||||
| 	c "github.com/myodc/go-micro/context" | ||||
| @@ -29,10 +27,6 @@ type rpcClient struct { | ||||
| 	opts options | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	rand.Seed(time.Now().UnixNano()) | ||||
| } | ||||
|  | ||||
| func newRpcClient(opt ...Option) Client { | ||||
| 	var opts options | ||||
|  | ||||
| @@ -129,13 +123,11 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac | ||||
| 		return errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	if len(service.Nodes) == 0 { | ||||
| 		return errors.NotFound("go.micro.client", "Service not found") | ||||
| 	node, err := nodeSelector(service) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	n := rand.Int() % len(service.Nodes) | ||||
| 	node := service.Nodes[n] | ||||
|  | ||||
| 	address := node.Address | ||||
| 	if node.Port > 0 { | ||||
| 		address = fmt.Sprintf("%s:%d", address, node.Port) | ||||
| @@ -154,13 +146,11 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in | ||||
| 		return nil, errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	if len(service.Nodes) == 0 { | ||||
| 		return nil, errors.NotFound("go.micro.client", "Service not found") | ||||
| 	node, err := nodeSelector(service) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	n := rand.Int() % len(service.Nodes) | ||||
| 	node := service.Nodes[n] | ||||
|  | ||||
| 	address := node.Address | ||||
| 	if node.Port > 0 { | ||||
| 		address = fmt.Sprintf("%s:%d", address, node.Port) | ||||
|   | ||||
| @@ -39,6 +39,11 @@ var ( | ||||
| 			EnvVar: "MICRO_SERVER_NAME", | ||||
| 			Usage:  "Name of the server. go.micro.srv.example", | ||||
| 		}, | ||||
| 		cli.StringFlag{ | ||||
| 			Name:   "server_version", | ||||
| 			EnvVar: "MICRO_SERVER_VERSION", | ||||
| 			Usage:  "Version of the server. 1.1.0", | ||||
| 		}, | ||||
| 		cli.StringFlag{ | ||||
| 			Name:   "server_id", | ||||
| 			EnvVar: "MICRO_SERVER_ID", | ||||
| @@ -178,6 +183,7 @@ func Setup(c *cli.Context) error { | ||||
|  | ||||
| 	server.DefaultServer = server.NewServer( | ||||
| 		server.Name(c.String("server_name")), | ||||
| 		server.Version(c.String("server_version")), | ||||
| 		server.Id(c.String("server_id")), | ||||
| 		server.Address(c.String("server_address")), | ||||
| 		server.Metadata(metadata), | ||||
|   | ||||
| @@ -13,7 +13,7 @@ type consulRegistry struct { | ||||
| 	Client  *consul.Client | ||||
|  | ||||
| 	mtx      sync.RWMutex | ||||
| 	services map[string]*Service | ||||
| 	services map[string][]*Service | ||||
| } | ||||
|  | ||||
| func encodeEndpoints(en []*Endpoint) []string { | ||||
| @@ -80,7 +80,7 @@ func newConsulRegistry(addrs []string, opts ...Option) Registry { | ||||
| 	cr := &consulRegistry{ | ||||
| 		Address:  config.Address, | ||||
| 		Client:   client, | ||||
| 		services: make(map[string]*Service), | ||||
| 		services: make(map[string][]*Service), | ||||
| 	} | ||||
|  | ||||
| 	return cr | ||||
| @@ -115,7 +115,7 @@ func (c *consulRegistry) Register(s *Service) error { | ||||
| 		Node:    node.Id, | ||||
| 		Address: node.Address, | ||||
| 		Service: &consul.AgentService{ | ||||
| 			ID:      node.Id, | ||||
| 			ID:      s.Version, | ||||
| 			Service: s.Name, | ||||
| 			Port:    node.Port, | ||||
| 			Tags:    tags, | ||||
| @@ -125,7 +125,7 @@ func (c *consulRegistry) Register(s *Service) error { | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (c *consulRegistry) GetService(name string) (*Service, error) { | ||||
| func (c *consulRegistry) GetService(name string) ([]*Service, error) { | ||||
| 	c.mtx.RLock() | ||||
| 	service, ok := c.services[name] | ||||
| 	c.mtx.RUnlock() | ||||
| @@ -139,24 +139,47 @@ func (c *consulRegistry) GetService(name string) (*Service, error) { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	cs := &Service{} | ||||
| 	serviceMap := map[string]*Service{} | ||||
|  | ||||
| 	for _, s := range rsp { | ||||
| 		if s.ServiceName != name { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		cs.Endpoints = decodeEndpoints(s.ServiceTags) | ||||
| 		cs.Name = s.ServiceName | ||||
| 		cs.Nodes = append(cs.Nodes, &Node{ | ||||
| 			Id:       s.ServiceID, | ||||
| 		id := s.Node | ||||
| 		key := s.ServiceID | ||||
| 		version := s.ServiceID | ||||
|  | ||||
| 		// We're adding service version but | ||||
| 		// don't want to break backwards compatibility | ||||
| 		if id == version { | ||||
| 			key = "default" | ||||
| 			version = "" | ||||
| 		} | ||||
|  | ||||
| 		svc, ok := serviceMap[key] | ||||
| 		if !ok { | ||||
| 			svc = &Service{ | ||||
| 				Endpoints: decodeEndpoints(s.ServiceTags), | ||||
| 				Name:      s.ServiceName, | ||||
| 				Version:   version, | ||||
| 			} | ||||
| 			serviceMap[key] = svc | ||||
| 		} | ||||
|  | ||||
| 		svc.Nodes = append(svc.Nodes, &Node{ | ||||
| 			Id:       id, | ||||
| 			Address:  s.Address, | ||||
| 			Port:     s.ServicePort, | ||||
| 			Metadata: decodeMetadata(s.ServiceTags), | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	return cs, nil | ||||
| 	var services []*Service | ||||
| 	for _, service := range serviceMap { | ||||
| 		services = append(services, service) | ||||
| 	} | ||||
| 	return services, nil | ||||
| } | ||||
|  | ||||
| func (c *consulRegistry) ListServices() ([]*Service, error) { | ||||
| @@ -167,8 +190,8 @@ func (c *consulRegistry) ListServices() ([]*Service, error) { | ||||
| 	var services []*Service | ||||
|  | ||||
| 	if len(serviceMap) > 0 { | ||||
| 		for _, service := range services { | ||||
| 			services = append(services, service) | ||||
| 		for _, service := range serviceMap { | ||||
| 			services = append(services, service...) | ||||
| 		} | ||||
| 		return services, nil | ||||
| 	} | ||||
|   | ||||
| @@ -39,13 +39,34 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	cs := &Service{} | ||||
| 	serviceMap := map[string]*Service{} | ||||
| 	serviceName := "" | ||||
|  | ||||
| 	for _, e := range entries { | ||||
| 		cs.Endpoints = decodeEndpoints(e.Service.Tags) | ||||
| 		cs.Name = e.Service.Service | ||||
| 		cs.Nodes = append(cs.Nodes, &Node{ | ||||
| 			Id:       e.Service.ID, | ||||
| 		serviceName = e.Service.Service | ||||
| 		id := e.Node.Node | ||||
| 		key := e.Service.Service + e.Service.ID | ||||
| 		version := e.Service.ID | ||||
|  | ||||
| 		// We're adding service version but | ||||
| 		// don't want to break backwards compatibility | ||||
| 		if id == version { | ||||
| 			key = e.Service.Service + "default" | ||||
| 			version = "" | ||||
| 		} | ||||
|  | ||||
| 		svc, ok := serviceMap[key] | ||||
| 		if !ok { | ||||
| 			svc = &Service{ | ||||
| 				Endpoints: decodeEndpoints(e.Service.Tags), | ||||
| 				Name:      e.Service.Service, | ||||
| 				Version:   version, | ||||
| 			} | ||||
| 			serviceMap[key] = svc | ||||
| 		} | ||||
|  | ||||
| 		svc.Nodes = append(svc.Nodes, &Node{ | ||||
| 			Id:       id, | ||||
| 			Address:  e.Node.Address, | ||||
| 			Port:     e.Service.Port, | ||||
| 			Metadata: decodeMetadata(e.Service.Tags), | ||||
| @@ -53,7 +74,11 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) { | ||||
| 	} | ||||
|  | ||||
| 	cw.Registry.mtx.Lock() | ||||
| 	cw.Registry.services[cs.Name] = cs | ||||
| 	var services []*Service | ||||
| 	for _, service := range serviceMap { | ||||
| 		services = append(services, service) | ||||
| 	} | ||||
| 	cw.Registry.services[serviceName] = services | ||||
| 	cw.Registry.mtx.Unlock() | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -20,7 +20,7 @@ type etcdRegistry struct { | ||||
| 	client etcd.KeysAPI | ||||
|  | ||||
| 	sync.RWMutex | ||||
| 	services map[string]*registry.Service | ||||
| 	services map[string][]*registry.Service | ||||
| } | ||||
|  | ||||
| func encode(s *registry.Service) string { | ||||
| @@ -85,7 +85,7 @@ func (e *etcdRegistry) Register(s *registry.Service) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) GetService(name string) (*registry.Service, error) { | ||||
| func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) { | ||||
| 	e.RLock() | ||||
| 	service, ok := e.services[name] | ||||
| 	e.RUnlock() | ||||
| @@ -99,23 +99,35 @@ func (e *etcdRegistry) GetService(name string) (*registry.Service, error) { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	s := ®istry.Service{} | ||||
| 	serviceMap := map[string]*registry.Service{} | ||||
|  | ||||
| 	for _, n := range rsp.Node.Nodes { | ||||
| 		if n.Dir { | ||||
| 			continue | ||||
| 		} | ||||
| 		sn := decode(n.Value) | ||||
| 		s.Name = sn.Name | ||||
| 		s.Version = sn.Version | ||||
| 		s.Metadata = sn.Metadata | ||||
| 		s.Endpoints = sn.Endpoints | ||||
|  | ||||
| 		s, ok := serviceMap[sn.Version] | ||||
| 		if !ok { | ||||
| 			s = ®istry.Service{ | ||||
| 				Name:      sn.Name, | ||||
| 				Version:   sn.Version, | ||||
| 				Metadata:  sn.Metadata, | ||||
| 				Endpoints: sn.Endpoints, | ||||
| 			} | ||||
| 			serviceMap[s.Version] = s | ||||
| 		} | ||||
|  | ||||
| 		for _, node := range sn.Nodes { | ||||
| 			s.Nodes = append(s.Nodes, node) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return s, nil | ||||
| 	var services []*registry.Service | ||||
| 	for _, service := range serviceMap { | ||||
| 		services = append(services, service) | ||||
| 	} | ||||
| 	return services, nil | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { | ||||
| @@ -126,8 +138,8 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { | ||||
| 	var services []*registry.Service | ||||
|  | ||||
| 	if len(serviceMap) > 0 { | ||||
| 		for _, service := range services { | ||||
| 			services = append(services, service) | ||||
| 		for _, service := range serviceMap { | ||||
| 			services = append(services, service...) | ||||
| 		} | ||||
| 		return services, nil | ||||
| 	} | ||||
| @@ -139,15 +151,10 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { | ||||
|  | ||||
| 	for _, node := range rsp.Node.Nodes { | ||||
| 		service := ®istry.Service{} | ||||
|  | ||||
| 		for _, n := range node.Nodes { | ||||
| 			i := decode(n.Value) | ||||
| 			service.Name = i.Name | ||||
| 			for _, in := range i.Nodes { | ||||
| 				service.Nodes = append(service.Nodes, in) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		services = append(services, service) | ||||
| 	} | ||||
|  | ||||
| @@ -179,7 +186,7 @@ func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { | ||||
|  | ||||
| 	e := &etcdRegistry{ | ||||
| 		client:   etcd.NewKeysAPI(c), | ||||
| 		services: make(map[string]*registry.Service), | ||||
| 		services: make(map[string][]*registry.Service), | ||||
| 	} | ||||
|  | ||||
| 	return e | ||||
|   | ||||
| @@ -11,6 +11,77 @@ type etcdWatcher struct { | ||||
| 	stop     chan bool | ||||
| } | ||||
|  | ||||
| func addNodes(old, neu []*registry.Node) []*registry.Node { | ||||
| 	for _, n := range neu { | ||||
| 		var seen bool | ||||
| 		for i, o := range old { | ||||
| 			if o.Id == n.Id { | ||||
| 				seen = true | ||||
| 				old[i] = n | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 		if !seen { | ||||
| 			old = append(old, n) | ||||
| 		} | ||||
| 	} | ||||
| 	return old | ||||
| } | ||||
|  | ||||
| func addServices(old, neu []*registry.Service) []*registry.Service { | ||||
| 	for _, s := range neu { | ||||
| 		var seen bool | ||||
| 		for i, o := range old { | ||||
| 			if o.Version == s.Version { | ||||
| 				s.Nodes = addNodes(o.Nodes, s.Nodes) | ||||
| 				seen = true | ||||
| 				old[i] = s | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 		if !seen { | ||||
| 			old = append(old, s) | ||||
| 		} | ||||
| 	} | ||||
| 	return old | ||||
| } | ||||
|  | ||||
| func delNodes(old, del []*registry.Node) []*registry.Node { | ||||
| 	var nodes []*registry.Node | ||||
| 	for _, o := range old { | ||||
| 		var rem bool | ||||
| 		for _, n := range del { | ||||
| 			if o.Id == n.Id { | ||||
| 				rem = true | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 		if !rem { | ||||
| 			nodes = append(nodes, o) | ||||
| 		} | ||||
| 	} | ||||
| 	return nodes | ||||
| } | ||||
|  | ||||
| func delServices(old, del []*registry.Service) []*registry.Service { | ||||
| 	var services []*registry.Service | ||||
| 	for i, o := range old { | ||||
| 		var rem bool | ||||
| 		for _, s := range del { | ||||
| 			if o.Version == s.Version { | ||||
| 				old[i].Nodes = delNodes(o.Nodes, s.Nodes) | ||||
| 				if len(old[i].Nodes) == 0 { | ||||
| 					rem = true | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		if !rem { | ||||
| 			services = append(services, o) | ||||
| 		} | ||||
| 	} | ||||
| 	return services | ||||
| } | ||||
|  | ||||
| func newEtcdWatcher(r *etcdRegistry) (registry.Watcher, error) { | ||||
| 	ew := &etcdWatcher{ | ||||
| 		registry: r, | ||||
| @@ -53,7 +124,7 @@ func (e *etcdWatcher) watch(ctx context.Context, w etcd.Watcher) { | ||||
| 		service, ok := e.registry.services[s.Name] | ||||
| 		if !ok { | ||||
| 			if rsp.Action == "create" { | ||||
| 				e.registry.services[s.Name] = s | ||||
| 				e.registry.services[s.Name] = []*registry.Service{s} | ||||
| 			} | ||||
| 			e.registry.Unlock() | ||||
| 			continue | ||||
| @@ -61,22 +132,14 @@ func (e *etcdWatcher) watch(ctx context.Context, w etcd.Watcher) { | ||||
|  | ||||
| 		switch rsp.Action { | ||||
| 		case "delete": | ||||
| 			var nodes []*registry.Node | ||||
| 			for _, node := range service.Nodes { | ||||
| 				var seen bool | ||||
| 				for _, n := range s.Nodes { | ||||
| 					if node.Id == n.Id { | ||||
| 						seen = true | ||||
| 						break | ||||
| 					} | ||||
| 				} | ||||
| 				if !seen { | ||||
| 					nodes = append(nodes, node) | ||||
| 				} | ||||
| 			services := delServices(service, []*registry.Service{s}) | ||||
| 			if len(services) > 0 { | ||||
| 				e.registry.services[s.Name] = services | ||||
| 			} else { | ||||
| 				delete(e.registry.services, s.Name) | ||||
| 			} | ||||
| 			service.Nodes = nodes | ||||
| 		case "create": | ||||
| 			service.Nodes = append(service.Nodes, s.Nodes...) | ||||
| 			e.registry.services[s.Name] = addServices(service, []*registry.Service{s}) | ||||
| 		} | ||||
|  | ||||
| 		e.registry.Unlock() | ||||
|   | ||||
| @@ -35,7 +35,7 @@ type memoryRegistry struct { | ||||
| 	updates    chan *update | ||||
|  | ||||
| 	sync.RWMutex | ||||
| 	services map[string]*registry.Service | ||||
| 	services map[string][]*registry.Service | ||||
| } | ||||
|  | ||||
| type update struct { | ||||
| @@ -96,7 +96,7 @@ func (d *delegate) LocalState(join bool) []byte { | ||||
| 	} | ||||
|  | ||||
| 	syncCh := make(chan *registry.Service, 1) | ||||
| 	m := map[string]*registry.Service{} | ||||
| 	m := map[string][]*registry.Service{} | ||||
|  | ||||
| 	d.updates <- &update{ | ||||
| 		Action: syncAction, | ||||
| @@ -104,7 +104,7 @@ func (d *delegate) LocalState(join bool) []byte { | ||||
| 	} | ||||
|  | ||||
| 	for s := range syncCh { | ||||
| 		m[s.Name] = s | ||||
| 		m[s.Name] = append(m[s.Name], s) | ||||
| 	} | ||||
|  | ||||
| 	b, _ := json.Marshal(m) | ||||
| @@ -119,16 +119,18 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	var m map[string]*registry.Service | ||||
| 	var m map[string][]*registry.Service | ||||
| 	if err := json.Unmarshal(buf, &m); err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	for _, service := range m { | ||||
| 		d.updates <- &update{ | ||||
| 			Action:  addAction, | ||||
| 			Service: service, | ||||
| 			sync:    nil, | ||||
| 	for _, services := range m { | ||||
| 		for _, service := range services { | ||||
| 			d.updates <- &update{ | ||||
| 				Action:  addAction, | ||||
| 				Service: service, | ||||
| 				sync:    nil, | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @@ -138,19 +140,31 @@ func (m *memoryRegistry) run() { | ||||
| 		switch u.Action { | ||||
| 		case addAction: | ||||
| 			m.Lock() | ||||
| 			m.services[u.Service.Name] = u.Service | ||||
| 			if service, ok := m.services[u.Service.Name]; !ok { | ||||
| 				m.services[u.Service.Name] = []*registry.Service{u.Service} | ||||
| 			} else { | ||||
| 				m.services[u.Service.Name] = addServices(service, []*registry.Service{u.Service}) | ||||
| 			} | ||||
| 			m.Unlock() | ||||
| 		case delAction: | ||||
| 			m.Lock() | ||||
| 			delete(m.services, u.Service.Name) | ||||
| 			if service, ok := m.services[u.Service.Name]; ok { | ||||
| 				if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 { | ||||
| 					delete(m.services, u.Service.Name) | ||||
| 				} else { | ||||
| 					m.services[u.Service.Name] = services | ||||
| 				} | ||||
| 			} | ||||
| 			m.Unlock() | ||||
| 		case syncAction: | ||||
| 			if u.sync == nil { | ||||
| 				continue | ||||
| 			} | ||||
| 			m.RLock() | ||||
| 			for _, service := range m.services { | ||||
| 				u.sync <- service | ||||
| 			for _, services := range m.services { | ||||
| 				for _, service := range services { | ||||
| 					u.sync <- service | ||||
| 				} | ||||
| 			} | ||||
| 			m.RUnlock() | ||||
| 			close(u.sync) | ||||
| @@ -160,7 +174,11 @@ func (m *memoryRegistry) run() { | ||||
|  | ||||
| func (m *memoryRegistry) Register(s *registry.Service) error { | ||||
| 	m.Lock() | ||||
| 	m.services[s.Name] = s | ||||
| 	if service, ok := m.services[s.Name]; !ok { | ||||
| 		m.services[s.Name] = []*registry.Service{s} | ||||
| 	} else { | ||||
| 		m.services[s.Name] = addServices(service, []*registry.Service{s}) | ||||
| 	} | ||||
| 	m.Unlock() | ||||
|  | ||||
| 	b, _ := json.Marshal([]*update{ | ||||
| @@ -180,7 +198,13 @@ func (m *memoryRegistry) Register(s *registry.Service) error { | ||||
|  | ||||
| func (m *memoryRegistry) Deregister(s *registry.Service) error { | ||||
| 	m.Lock() | ||||
| 	delete(m.services, s.Name) | ||||
| 	if service, ok := m.services[s.Name]; ok { | ||||
| 		if services := delServices(service, []*registry.Service{s}); len(services) == 0 { | ||||
| 			delete(m.services, s.Name) | ||||
| 		} else { | ||||
| 			m.services[s.Name] = services | ||||
| 		} | ||||
| 	} | ||||
| 	m.Unlock() | ||||
|  | ||||
| 	b, _ := json.Marshal([]*update{ | ||||
| @@ -198,7 +222,7 @@ func (m *memoryRegistry) Deregister(s *registry.Service) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (m *memoryRegistry) GetService(name string) (*registry.Service, error) { | ||||
| func (m *memoryRegistry) GetService(name string) ([]*registry.Service, error) { | ||||
| 	m.RLock() | ||||
| 	service, ok := m.services[name] | ||||
| 	m.RUnlock() | ||||
| @@ -212,7 +236,7 @@ func (m *memoryRegistry) ListServices() ([]*registry.Service, error) { | ||||
| 	var services []*registry.Service | ||||
| 	m.RLock() | ||||
| 	for _, service := range m.services { | ||||
| 		services = append(services, service) | ||||
| 		services = append(services, service...) | ||||
| 	} | ||||
| 	m.RUnlock() | ||||
| 	return services, nil | ||||
| @@ -226,6 +250,77 @@ func (w *watcher) Stop() { | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func addNodes(old, neu []*registry.Node) []*registry.Node { | ||||
| 	for _, n := range neu { | ||||
| 		var seen bool | ||||
| 		for i, o := range old { | ||||
| 			if o.Id == n.Id { | ||||
| 				seen = true | ||||
| 				old[i] = n | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 		if !seen { | ||||
| 			old = append(old, n) | ||||
| 		} | ||||
| 	} | ||||
| 	return old | ||||
| } | ||||
|  | ||||
| func addServices(old, neu []*registry.Service) []*registry.Service { | ||||
| 	for _, s := range neu { | ||||
| 		var seen bool | ||||
| 		for i, o := range old { | ||||
| 			if o.Version == s.Version { | ||||
| 				s.Nodes = addNodes(o.Nodes, s.Nodes) | ||||
| 				seen = true | ||||
| 				old[i] = s | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 		if !seen { | ||||
| 			old = append(old, s) | ||||
| 		} | ||||
| 	} | ||||
| 	return old | ||||
| } | ||||
|  | ||||
| func delNodes(old, del []*registry.Node) []*registry.Node { | ||||
| 	var nodes []*registry.Node | ||||
| 	for _, o := range old { | ||||
| 		var rem bool | ||||
| 		for _, n := range del { | ||||
| 			if o.Id == n.Id { | ||||
| 				rem = true | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 		if !rem { | ||||
| 			nodes = append(nodes, o) | ||||
| 		} | ||||
| 	} | ||||
| 	return nodes | ||||
| } | ||||
|  | ||||
| func delServices(old, del []*registry.Service) []*registry.Service { | ||||
| 	var services []*registry.Service | ||||
| 	for i, o := range old { | ||||
| 		var rem bool | ||||
| 		for _, s := range del { | ||||
| 			if o.Version == s.Version { | ||||
| 				old[i].Nodes = delNodes(o.Nodes, s.Nodes) | ||||
| 				if len(old[i].Nodes) == 0 { | ||||
| 					rem = true | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		if !rem { | ||||
| 			services = append(services, o) | ||||
| 		} | ||||
| 	} | ||||
| 	return services | ||||
| } | ||||
|  | ||||
| func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { | ||||
| 	cAddrs := []string{} | ||||
| 	hostname, _ := os.Hostname() | ||||
| @@ -246,7 +341,7 @@ func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { | ||||
|  | ||||
| 	mr := &memoryRegistry{ | ||||
| 		broadcasts: broadcasts, | ||||
| 		services:   make(map[string]*registry.Service), | ||||
| 		services:   make(map[string][]*registry.Service), | ||||
| 		updates:    updates, | ||||
| 	} | ||||
|  | ||||
|   | ||||
							
								
								
									
										78
									
								
								registry/memory/memory_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										78
									
								
								registry/memory/memory_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,78 @@ | ||||
| package memory | ||||
|  | ||||
| import ( | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/myodc/go-micro/registry" | ||||
| ) | ||||
|  | ||||
| func TestDelServices(t *testing.T) { | ||||
| 	services := []*registry.Service{ | ||||
| 		{ | ||||
| 			Name:    "foo", | ||||
| 			Version: "1.0.0", | ||||
| 			Nodes: []*registry.Node{ | ||||
| 				{ | ||||
| 					Id:      "foo-123", | ||||
| 					Address: "localhost", | ||||
| 					Port:    9999, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:    "foo", | ||||
| 			Version: "1.0.0", | ||||
| 			Nodes: []*registry.Node{ | ||||
| 				{ | ||||
| 					Id:      "foo-123", | ||||
| 					Address: "localhost", | ||||
| 					Port:    6666, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	servs := delServices([]*registry.Service{services[0]}, []*registry.Service{services[1]}) | ||||
| 	if i := len(servs); i > 0 { | ||||
| 		t.Errorf("Expected 0 nodes, got %d: %+v", i, servs) | ||||
| 	} | ||||
| 	t.Logf("Services %+v", servs) | ||||
| } | ||||
|  | ||||
| func TestDelNodes(t *testing.T) { | ||||
| 	services := []*registry.Service{ | ||||
| 		{ | ||||
| 			Name:    "foo", | ||||
| 			Version: "1.0.0", | ||||
| 			Nodes: []*registry.Node{ | ||||
| 				{ | ||||
| 					Id:      "foo-123", | ||||
| 					Address: "localhost", | ||||
| 					Port:    9999, | ||||
| 				}, | ||||
| 				{ | ||||
| 					Id:      "foo-321", | ||||
| 					Address: "localhost", | ||||
| 					Port:    6666, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:    "foo", | ||||
| 			Version: "1.0.0", | ||||
| 			Nodes: []*registry.Node{ | ||||
| 				{ | ||||
| 					Id:      "foo-123", | ||||
| 					Address: "localhost", | ||||
| 					Port:    6666, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	nodes := delNodes(services[0].Nodes, services[1].Nodes) | ||||
| 	if i := len(nodes); i != 1 { | ||||
| 		t.Errorf("Expected only 1 node, got %d: %+v", i, nodes) | ||||
| 	} | ||||
| 	t.Logf("Nodes %+v", nodes) | ||||
| } | ||||
| @@ -3,7 +3,7 @@ package registry | ||||
| type Registry interface { | ||||
| 	Register(*Service) error | ||||
| 	Deregister(*Service) error | ||||
| 	GetService(string) (*Service, error) | ||||
| 	GetService(string) ([]*Service, error) | ||||
| 	ListServices() ([]*Service, error) | ||||
| 	Watch() (Watcher, error) | ||||
| } | ||||
| @@ -32,7 +32,7 @@ func Deregister(s *Service) error { | ||||
| 	return DefaultRegistry.Deregister(s) | ||||
| } | ||||
|  | ||||
| func GetService(name string) (*Service, error) { | ||||
| func GetService(name string) ([]*Service, error) { | ||||
| 	return DefaultRegistry.GetService(name) | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user