From d4832e8f34492e6729bad1018452e11cd1790ce2 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 23 Oct 2019 15:53:28 +0100 Subject: [PATCH] Remove consul registry (#818) --- config/cmd/cmd.go | 4 +- registry/consul/consul.go | 438 ------------------------------- registry/consul/encoding.go | 170 ------------ registry/consul/encoding_test.go | 147 ----------- registry/consul/options.go | 81 ------ registry/consul/registry_test.go | 208 --------------- registry/consul/watcher.go | 290 -------------------- registry/consul/watcher_test.go | 86 ------ 8 files changed, 1 insertion(+), 1423 deletions(-) delete mode 100644 registry/consul/consul.go delete mode 100644 registry/consul/encoding.go delete mode 100644 registry/consul/encoding_test.go delete mode 100644 registry/consul/options.go delete mode 100644 registry/consul/registry_test.go delete mode 100644 registry/consul/watcher.go delete mode 100644 registry/consul/watcher_test.go diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index 6cb6835b..a99557d2 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -27,7 +27,6 @@ import ( // registries "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/registry/consul" "github.com/micro/go-micro/registry/etcd" "github.com/micro/go-micro/registry/mdns" rmem "github.com/micro/go-micro/registry/memory" @@ -155,7 +154,7 @@ var ( cli.StringFlag{ Name: "registry", EnvVar: "MICRO_REGISTRY", - Usage: "Registry for discovery. consul, etcd, mdns", + Usage: "Registry for discovery. etcd, mdns", }, cli.StringFlag{ Name: "registry_address", @@ -196,7 +195,6 @@ var ( DefaultRegistries = map[string]func(...registry.Option) registry.Registry{ "go.micro.registry": regSrv.NewRegistry, "service": regSrv.NewRegistry, - "consul": consul.NewRegistry, "etcd": etcd.NewRegistry, "mdns": mdns.NewRegistry, "memory": rmem.NewRegistry, diff --git a/registry/consul/consul.go b/registry/consul/consul.go deleted file mode 100644 index 152aae69..00000000 --- a/registry/consul/consul.go +++ /dev/null @@ -1,438 +0,0 @@ -package consul - -import ( - "crypto/tls" - "errors" - "fmt" - "net" - "net/http" - "runtime" - "strconv" - "sync" - "time" - - consul "github.com/hashicorp/consul/api" - "github.com/micro/go-micro/registry" - mnet "github.com/micro/go-micro/util/net" - hash "github.com/mitchellh/hashstructure" -) - -type consulRegistry struct { - Address []string - opts registry.Options - - client *consul.Client - config *consul.Config - - // connect enabled - connect bool - - queryOptions *consul.QueryOptions - - sync.Mutex - register map[string]uint64 - // lastChecked tracks when a node was last checked as existing in Consul - lastChecked map[string]time.Time -} - -func getDeregisterTTL(t time.Duration) time.Duration { - // splay slightly for the watcher? - splay := time.Second * 5 - deregTTL := t + splay - - // consul has a minimum timeout on deregistration of 1 minute. - if t < time.Minute { - deregTTL = time.Minute + splay - } - - return deregTTL -} - -func newTransport(config *tls.Config) *http.Transport { - if config == nil { - config = &tls.Config{ - InsecureSkipVerify: true, - } - } - - t := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - TLSClientConfig: config, - } - runtime.SetFinalizer(&t, func(tr **http.Transport) { - (*tr).CloseIdleConnections() - }) - return t -} - -func configure(c *consulRegistry, opts ...registry.Option) { - // set opts - for _, o := range opts { - o(&c.opts) - } - - // use default config - config := consul.DefaultConfig() - - if c.opts.Context != nil { - // Use the consul config passed in the options, if available - if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok { - config = co - } - if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok { - c.connect = cn - } - - // Use the consul query options passed in the options, if available - if qo, ok := c.opts.Context.Value("consul_query_options").(*consul.QueryOptions); ok && qo != nil { - c.queryOptions = qo - } - if as, ok := c.opts.Context.Value("consul_allow_stale").(bool); ok { - c.queryOptions.AllowStale = as - } - } - - // check if there are any addrs - var addrs []string - - // iterate the options addresses - for _, address := range c.opts.Addrs { - // check we have a port - addr, port, err := net.SplitHostPort(address) - if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { - port = "8500" - addr = address - addrs = append(addrs, net.JoinHostPort(addr, port)) - } else if err == nil { - addrs = append(addrs, net.JoinHostPort(addr, port)) - } - } - - // set the addrs - if len(addrs) > 0 { - c.Address = addrs - config.Address = c.Address[0] - } - - if config.HttpClient == nil { - config.HttpClient = new(http.Client) - } - - // requires secure connection? - if c.opts.Secure || c.opts.TLSConfig != nil { - config.Scheme = "https" - // We're going to support InsecureSkipVerify - config.HttpClient.Transport = newTransport(c.opts.TLSConfig) - } - - // set timeout - if c.opts.Timeout > 0 { - config.HttpClient.Timeout = c.opts.Timeout - } - - // set the config - c.config = config - - // remove client - c.client = nil - - // setup the client - c.Client() -} - -func (c *consulRegistry) Init(opts ...registry.Option) error { - configure(c, opts...) - return nil -} - -func (c *consulRegistry) Deregister(s *registry.Service) error { - if len(s.Nodes) == 0 { - return errors.New("Require at least one node") - } - - // delete our hash and time check of the service - c.Lock() - delete(c.register, s.Name) - delete(c.lastChecked, s.Name) - c.Unlock() - - node := s.Nodes[0] - return c.Client().Agent().ServiceDeregister(node.Id) -} - -func (c *consulRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { - if len(s.Nodes) == 0 { - return errors.New("Require at least one node") - } - - var regTCPCheck bool - var regInterval time.Duration - - var options registry.RegisterOptions - for _, o := range opts { - o(&options) - } - - if c.opts.Context != nil { - if tcpCheckInterval, ok := c.opts.Context.Value("consul_tcp_check").(time.Duration); ok { - regTCPCheck = true - regInterval = tcpCheckInterval - } - } - - // create hash of service; uint64 - h, err := hash.Hash(s, nil) - if err != nil { - return err - } - - // use first node - node := s.Nodes[0] - - // get existing hash and last checked time - c.Lock() - v, ok := c.register[s.Name] - lastChecked := c.lastChecked[s.Name] - c.Unlock() - - // if it's already registered and matches then just pass the check - if ok && v == h { - if options.TTL == time.Duration(0) { - // ensure that our service hasn't been deregistered by Consul - if time.Since(lastChecked) <= getDeregisterTTL(regInterval) { - return nil - } - services, _, err := c.Client().Health().Checks(s.Name, c.queryOptions) - if err == nil { - for _, v := range services { - if v.ServiceID == node.Id { - return nil - } - } - } - } else { - // if the err is nil we're all good, bail out - // if not, we don't know what the state is, so full re-register - if err := c.Client().Agent().PassTTL("service:"+node.Id, ""); err == nil { - return nil - } - } - } - - // encode the tags - tags := encodeMetadata(node.Metadata) - tags = append(tags, encodeEndpoints(s.Endpoints)...) - tags = append(tags, encodeVersion(s.Version)...) - - var check *consul.AgentServiceCheck - - if regTCPCheck { - deregTTL := getDeregisterTTL(regInterval) - - check = &consul.AgentServiceCheck{ - TCP: node.Address, - Interval: fmt.Sprintf("%v", regInterval), - DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL), - } - - // if the TTL is greater than 0 create an associated check - } else if options.TTL > time.Duration(0) { - deregTTL := getDeregisterTTL(options.TTL) - - check = &consul.AgentServiceCheck{ - TTL: fmt.Sprintf("%v", options.TTL), - DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL), - } - } - - host, pt, _ := net.SplitHostPort(node.Address) - if host == "" { - host = node.Address - } - port, _ := strconv.Atoi(pt) - - // register the service - asr := &consul.AgentServiceRegistration{ - ID: node.Id, - Name: s.Name, - Tags: tags, - Port: port, - Address: host, - Check: check, - } - - // Specify consul connect - if c.connect { - asr.Connect = &consul.AgentServiceConnect{ - Native: true, - } - } - - if err := c.Client().Agent().ServiceRegister(asr); err != nil { - return err - } - - // save our hash and time check of the service - c.Lock() - c.register[s.Name] = h - c.lastChecked[s.Name] = time.Now() - c.Unlock() - - // if the TTL is 0 we don't mess with the checks - if options.TTL == time.Duration(0) { - return nil - } - - // pass the healthcheck - return c.Client().Agent().PassTTL("service:"+node.Id, "") -} - -func (c *consulRegistry) GetService(name string) ([]*registry.Service, error) { - var rsp []*consul.ServiceEntry - var err error - - // if we're connect enabled only get connect services - if c.connect { - rsp, _, err = c.Client().Health().Connect(name, "", false, c.queryOptions) - } else { - rsp, _, err = c.Client().Health().Service(name, "", false, c.queryOptions) - } - if err != nil { - return nil, err - } - - serviceMap := map[string]*registry.Service{} - - for _, s := range rsp { - if s.Service.Service != name { - continue - } - - // version is now a tag - version, _ := decodeVersion(s.Service.Tags) - // service ID is now the node id - id := s.Service.ID - // key is always the version - key := version - - // address is service address - address := s.Service.Address - - // use node address - if len(address) == 0 { - address = s.Node.Address - } - - svc, ok := serviceMap[key] - if !ok { - svc = ®istry.Service{ - Endpoints: decodeEndpoints(s.Service.Tags), - Name: s.Service.Service, - Version: version, - } - serviceMap[key] = svc - } - - var del bool - - for _, check := range s.Checks { - // delete the node if the status is critical - if check.Status == "critical" { - del = true - break - } - } - - // if delete then skip the node - if del { - continue - } - - svc.Nodes = append(svc.Nodes, ®istry.Node{ - Id: id, - Address: mnet.HostPort(address, s.Service.Port), - Metadata: decodeMetadata(s.Service.Tags), - }) - } - - var services []*registry.Service - for _, service := range serviceMap { - services = append(services, service) - } - return services, nil -} - -func (c *consulRegistry) ListServices() ([]*registry.Service, error) { - rsp, _, err := c.Client().Catalog().Services(c.queryOptions) - if err != nil { - return nil, err - } - - var services []*registry.Service - - for service := range rsp { - services = append(services, ®istry.Service{Name: service}) - } - - return services, nil -} - -func (c *consulRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { - return newConsulWatcher(c, opts...) -} - -func (c *consulRegistry) String() string { - return "consul" -} - -func (c *consulRegistry) Options() registry.Options { - return c.opts -} - -func (c *consulRegistry) Client() *consul.Client { - if c.client != nil { - return c.client - } - - for _, addr := range c.Address { - // set the address - c.config.Address = addr - - // create a new client - tmpClient, _ := consul.NewClient(c.config) - - // test the client - _, err := tmpClient.Agent().Host() - if err != nil { - continue - } - - // set the client - c.client = tmpClient - return c.client - } - - // set the default - c.client, _ = consul.NewClient(c.config) - - // return the client - return c.client -} - -func NewRegistry(opts ...registry.Option) registry.Registry { - cr := &consulRegistry{ - opts: registry.Options{}, - register: make(map[string]uint64), - lastChecked: make(map[string]time.Time), - queryOptions: &consul.QueryOptions{ - AllowStale: true, - }, - } - configure(cr, opts...) - return cr -} diff --git a/registry/consul/encoding.go b/registry/consul/encoding.go deleted file mode 100644 index 8a152683..00000000 --- a/registry/consul/encoding.go +++ /dev/null @@ -1,170 +0,0 @@ -package consul - -import ( - "bytes" - "compress/zlib" - "encoding/hex" - "encoding/json" - "io/ioutil" - - "github.com/micro/go-micro/registry" -) - -func encode(buf []byte) string { - var b bytes.Buffer - defer b.Reset() - - w := zlib.NewWriter(&b) - if _, err := w.Write(buf); err != nil { - return "" - } - w.Close() - - return hex.EncodeToString(b.Bytes()) -} - -func decode(d string) []byte { - hr, err := hex.DecodeString(d) - if err != nil { - return nil - } - - br := bytes.NewReader(hr) - zr, err := zlib.NewReader(br) - if err != nil { - return nil - } - - rbuf, err := ioutil.ReadAll(zr) - if err != nil { - return nil - } - - return rbuf -} - -func encodeEndpoints(en []*registry.Endpoint) []string { - var tags []string - for _, e := range en { - if b, err := json.Marshal(e); err == nil { - tags = append(tags, "e-"+encode(b)) - } - } - return tags -} - -func decodeEndpoints(tags []string) []*registry.Endpoint { - var en []*registry.Endpoint - - // use the first format you find - var ver byte - - for _, tag := range tags { - if len(tag) == 0 || tag[0] != 'e' { - continue - } - - // check version - if ver > 0 && tag[1] != ver { - continue - } - - var e *registry.Endpoint - var buf []byte - - // Old encoding was plain - if tag[1] == '=' { - buf = []byte(tag[2:]) - } - - // New encoding is hex - if tag[1] == '-' { - buf = decode(tag[2:]) - } - - if err := json.Unmarshal(buf, &e); err == nil { - en = append(en, e) - } - - // set version - ver = tag[1] - } - return en -} - -func encodeMetadata(md map[string]string) []string { - var tags []string - for k, v := range md { - if b, err := json.Marshal(map[string]string{ - k: v, - }); err == nil { - // new encoding - tags = append(tags, "t-"+encode(b)) - } - } - return tags -} - -func decodeMetadata(tags []string) map[string]string { - md := make(map[string]string) - - var ver byte - - for _, tag := range tags { - if len(tag) == 0 || tag[0] != 't' { - continue - } - - // check version - if ver > 0 && tag[1] != ver { - continue - } - - var kv map[string]string - var buf []byte - - // Old encoding was plain - if tag[1] == '=' { - buf = []byte(tag[2:]) - } - - // New encoding is hex - if tag[1] == '-' { - buf = decode(tag[2:]) - } - - // Now unmarshal - if err := json.Unmarshal(buf, &kv); err == nil { - for k, v := range kv { - md[k] = v - } - } - - // set version - ver = tag[1] - } - return md -} - -func encodeVersion(v string) []string { - return []string{"v-" + encode([]byte(v))} -} - -func decodeVersion(tags []string) (string, bool) { - for _, tag := range tags { - if len(tag) < 2 || tag[0] != 'v' { - continue - } - - // Old encoding was plain - if tag[1] == '=' { - return tag[2:], true - } - - // New encoding is hex - if tag[1] == '-' { - return string(decode(tag[2:])), true - } - } - return "", false -} diff --git a/registry/consul/encoding_test.go b/registry/consul/encoding_test.go deleted file mode 100644 index 7f511174..00000000 --- a/registry/consul/encoding_test.go +++ /dev/null @@ -1,147 +0,0 @@ -package consul - -import ( - "encoding/json" - "testing" - - "github.com/micro/go-micro/registry" -) - -func TestEncodingEndpoints(t *testing.T) { - eps := []*registry.Endpoint{ - ®istry.Endpoint{ - Name: "endpoint1", - Request: ®istry.Value{ - Name: "request", - Type: "request", - }, - Response: ®istry.Value{ - Name: "response", - Type: "response", - }, - Metadata: map[string]string{ - "foo1": "bar1", - }, - }, - ®istry.Endpoint{ - Name: "endpoint2", - Request: ®istry.Value{ - Name: "request", - Type: "request", - }, - Response: ®istry.Value{ - Name: "response", - Type: "response", - }, - Metadata: map[string]string{ - "foo2": "bar2", - }, - }, - ®istry.Endpoint{ - Name: "endpoint3", - Request: ®istry.Value{ - Name: "request", - Type: "request", - }, - Response: ®istry.Value{ - Name: "response", - Type: "response", - }, - Metadata: map[string]string{ - "foo3": "bar3", - }, - }, - } - - testEp := func(ep *registry.Endpoint, enc string) { - // encode endpoint - e := encodeEndpoints([]*registry.Endpoint{ep}) - - // check there are two tags; old and new - if len(e) != 1 { - t.Fatalf("Expected 1 encoded tags, got %v", e) - } - - // check old encoding - var seen bool - - for _, en := range e { - if en == enc { - seen = true - break - } - } - - if !seen { - t.Fatalf("Expected %s but not found", enc) - } - - // decode - d := decodeEndpoints([]string{enc}) - if len(d) == 0 { - t.Fatalf("Expected %v got %v", ep, d) - } - - // check name - if d[0].Name != ep.Name { - t.Fatalf("Expected ep %s got %s", ep.Name, d[0].Name) - } - - // check all the metadata exists - for k, v := range ep.Metadata { - if gv := d[0].Metadata[k]; gv != v { - t.Fatalf("Expected key %s val %s got val %s", k, v, gv) - } - } - } - - for _, ep := range eps { - // JSON encoded - jencoded, err := json.Marshal(ep) - if err != nil { - t.Fatal(err) - } - - // HEX encoded - hencoded := encode(jencoded) - // endpoint tag - hepTag := "e-" + hencoded - testEp(ep, hepTag) - } -} - -func TestEncodingVersion(t *testing.T) { - testData := []struct { - decoded string - encoded string - }{ - {"1.0.0", "v-789c32d433d03300040000ffff02ce00ee"}, - {"latest", "v-789cca492c492d2e01040000ffff08cc028e"}, - } - - for _, data := range testData { - e := encodeVersion(data.decoded) - - if e[0] != data.encoded { - t.Fatalf("Expected %s got %s", data.encoded, e) - } - - d, ok := decodeVersion(e) - if !ok { - t.Fatalf("Unexpected %t for %s", ok, data.encoded) - } - - if d != data.decoded { - t.Fatalf("Expected %s got %s", data.decoded, d) - } - - d, ok = decodeVersion([]string{data.encoded}) - if !ok { - t.Fatalf("Unexpected %t for %s", ok, data.encoded) - } - - if d != data.decoded { - t.Fatalf("Expected %s got %s", data.decoded, d) - } - } -} diff --git a/registry/consul/options.go b/registry/consul/options.go deleted file mode 100644 index 29bc3ee5..00000000 --- a/registry/consul/options.go +++ /dev/null @@ -1,81 +0,0 @@ -package consul - -import ( - "context" - "time" - - consul "github.com/hashicorp/consul/api" - "github.com/micro/go-micro/registry" -) - -// Connect specifies services should be registered as Consul Connect services -func Connect() registry.Option { - return func(o *registry.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, "consul_connect", true) - } -} - -func Config(c *consul.Config) registry.Option { - return func(o *registry.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, "consul_config", c) - } -} - -// AllowStale sets whether any Consul server (non-leader) can service -// a read. This allows for lower latency and higher throughput -// at the cost of potentially stale data. -// Works similar to Consul DNS Config option [1]. -// Defaults to true. -// -// [1] https://www.consul.io/docs/agent/options.html#allow_stale -// -func AllowStale(v bool) registry.Option { - return func(o *registry.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, "consul_allow_stale", v) - } -} - -// QueryOptions specifies the QueryOptions to be used when calling -// Consul. See `Consul API` for more information [1]. -// -// [1] https://godoc.org/github.com/hashicorp/consul/api#QueryOptions -// -func QueryOptions(q *consul.QueryOptions) registry.Option { - return func(o *registry.Options) { - if q == nil { - return - } - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, "consul_query_options", q) - } -} - -// -// TCPCheck will tell the service provider to check the service address -// and port every `t` interval. It will enabled only if `t` is greater than 0. -// See `TCP + Interval` for more information [1]. -// -// [1] https://www.consul.io/docs/agent/checks.html -// -func TCPCheck(t time.Duration) registry.Option { - return func(o *registry.Options) { - if t <= time.Duration(0) { - return - } - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, "consul_tcp_check", t) - } -} diff --git a/registry/consul/registry_test.go b/registry/consul/registry_test.go deleted file mode 100644 index 3a275030..00000000 --- a/registry/consul/registry_test.go +++ /dev/null @@ -1,208 +0,0 @@ -package consul - -import ( - "bytes" - "encoding/json" - "errors" - "net" - "net/http" - "testing" - "time" - - consul "github.com/hashicorp/consul/api" - "github.com/micro/go-micro/registry" -) - -type mockRegistry struct { - body []byte - status int - err error - url string -} - -func encodeData(obj interface{}) ([]byte, error) { - buf := bytes.NewBuffer(nil) - enc := json.NewEncoder(buf) - if err := enc.Encode(obj); err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -func newMockServer(rg *mockRegistry, l net.Listener) error { - mux := http.NewServeMux() - mux.HandleFunc(rg.url, func(w http.ResponseWriter, r *http.Request) { - if rg.err != nil { - http.Error(w, rg.err.Error(), 500) - return - } - w.WriteHeader(rg.status) - w.Write(rg.body) - }) - return http.Serve(l, mux) -} - -func newConsulTestRegistry(r *mockRegistry) (*consulRegistry, func()) { - l, err := net.Listen("tcp", "localhost:0") - if err != nil { - // blurgh?!! - panic(err.Error()) - } - cfg := consul.DefaultConfig() - cfg.Address = l.Addr().String() - - go newMockServer(r, l) - - var cr = &consulRegistry{ - config: cfg, - Address: []string{cfg.Address}, - opts: registry.Options{}, - register: make(map[string]uint64), - lastChecked: make(map[string]time.Time), - queryOptions: &consul.QueryOptions{ - AllowStale: true, - }, - } - cr.Client() - - return cr, func() { - l.Close() - } -} - -func newServiceList(svc []*consul.ServiceEntry) []byte { - bts, _ := encodeData(svc) - return bts -} - -func TestConsul_GetService_WithError(t *testing.T) { - cr, cl := newConsulTestRegistry(&mockRegistry{ - err: errors.New("client-error"), - url: "/v1/health/service/service-name", - }) - defer cl() - - if _, err := cr.GetService("test-service"); err == nil { - t.Fatalf("Expected error not to be `nil`") - } -} - -func TestConsul_GetService_WithHealthyServiceNodes(t *testing.T) { - // warning is still seen as healthy, critical is not - svcs := []*consul.ServiceEntry{ - newServiceEntry( - "node-name-1", "node-address-1", "service-name", "v1.0.0", - []*consul.HealthCheck{ - newHealthCheck("node-name-1", "service-name", "passing"), - newHealthCheck("node-name-1", "service-name", "warning"), - }, - ), - newServiceEntry( - "node-name-2", "node-address-2", "service-name", "v1.0.0", - []*consul.HealthCheck{ - newHealthCheck("node-name-2", "service-name", "passing"), - newHealthCheck("node-name-2", "service-name", "warning"), - }, - ), - } - - cr, cl := newConsulTestRegistry(&mockRegistry{ - status: 200, - body: newServiceList(svcs), - url: "/v1/health/service/service-name", - }) - defer cl() - - svc, err := cr.GetService("service-name") - if err != nil { - t.Fatal("Unexpected error", err) - } - - if exp, act := 1, len(svc); exp != act { - t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act) - } - - if exp, act := 2, len(svc[0].Nodes); exp != act { - t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act) - } -} - -func TestConsul_GetService_WithUnhealthyServiceNode(t *testing.T) { - // warning is still seen as healthy, critical is not - svcs := []*consul.ServiceEntry{ - newServiceEntry( - "node-name-1", "node-address-1", "service-name", "v1.0.0", - []*consul.HealthCheck{ - newHealthCheck("node-name-1", "service-name", "passing"), - newHealthCheck("node-name-1", "service-name", "warning"), - }, - ), - newServiceEntry( - "node-name-2", "node-address-2", "service-name", "v1.0.0", - []*consul.HealthCheck{ - newHealthCheck("node-name-2", "service-name", "passing"), - newHealthCheck("node-name-2", "service-name", "critical"), - }, - ), - } - - cr, cl := newConsulTestRegistry(&mockRegistry{ - status: 200, - body: newServiceList(svcs), - url: "/v1/health/service/service-name", - }) - defer cl() - - svc, err := cr.GetService("service-name") - if err != nil { - t.Fatal("Unexpected error", err) - } - - if exp, act := 1, len(svc); exp != act { - t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act) - } - - if exp, act := 1, len(svc[0].Nodes); exp != act { - t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act) - } -} - -func TestConsul_GetService_WithUnhealthyServiceNodes(t *testing.T) { - // warning is still seen as healthy, critical is not - svcs := []*consul.ServiceEntry{ - newServiceEntry( - "node-name-1", "node-address-1", "service-name", "v1.0.0", - []*consul.HealthCheck{ - newHealthCheck("node-name-1", "service-name", "passing"), - newHealthCheck("node-name-1", "service-name", "critical"), - }, - ), - newServiceEntry( - "node-name-2", "node-address-2", "service-name", "v1.0.0", - []*consul.HealthCheck{ - newHealthCheck("node-name-2", "service-name", "passing"), - newHealthCheck("node-name-2", "service-name", "critical"), - }, - ), - } - - cr, cl := newConsulTestRegistry(&mockRegistry{ - status: 200, - body: newServiceList(svcs), - url: "/v1/health/service/service-name", - }) - defer cl() - - svc, err := cr.GetService("service-name") - if err != nil { - t.Fatal("Unexpected error", err) - } - - if exp, act := 1, len(svc); exp != act { - t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act) - } - - if exp, act := 0, len(svc[0].Nodes); exp != act { - t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act) - } -} diff --git a/registry/consul/watcher.go b/registry/consul/watcher.go deleted file mode 100644 index 62829136..00000000 --- a/registry/consul/watcher.go +++ /dev/null @@ -1,290 +0,0 @@ -package consul - -import ( - "fmt" - "log" - "os" - "sync" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/api/watch" - "github.com/micro/go-micro/registry" -) - -type consulWatcher struct { - r *consulRegistry - wo registry.WatchOptions - wp *watch.Plan - watchers map[string]*watch.Plan - - next chan *registry.Result - exit chan bool - - sync.RWMutex - services map[string][]*registry.Service -} - -func newConsulWatcher(cr *consulRegistry, opts ...registry.WatchOption) (registry.Watcher, error) { - var wo registry.WatchOptions - for _, o := range opts { - o(&wo) - } - - cw := &consulWatcher{ - r: cr, - wo: wo, - exit: make(chan bool), - next: make(chan *registry.Result, 10), - watchers: make(map[string]*watch.Plan), - services: make(map[string][]*registry.Service), - } - - wp, err := watch.Parse(map[string]interface{}{"type": "services"}) - if err != nil { - return nil, err - } - - wp.Handler = cw.handle - go wp.RunWithClientAndLogger(cr.Client(), log.New(os.Stderr, "", log.LstdFlags)) - cw.wp = wp - - return cw, nil -} - -func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) { - entries, ok := data.([]*api.ServiceEntry) - if !ok { - return - } - - serviceMap := map[string]*registry.Service{} - serviceName := "" - - for _, e := range entries { - serviceName = e.Service.Service - // version is now a tag - version, _ := decodeVersion(e.Service.Tags) - // service ID is now the node id - id := e.Service.ID - // key is always the version - key := version - // address is service address - address := e.Service.Address - - // use node address - if len(address) == 0 { - address = e.Node.Address - } - - svc, ok := serviceMap[key] - if !ok { - svc = ®istry.Service{ - Endpoints: decodeEndpoints(e.Service.Tags), - Name: e.Service.Service, - Version: version, - } - serviceMap[key] = svc - } - - var del bool - - for _, check := range e.Checks { - // delete the node if the status is critical - if check.Status == "critical" { - del = true - break - } - } - - // if delete then skip the node - if del { - continue - } - - svc.Nodes = append(svc.Nodes, ®istry.Node{ - Id: id, - Address: fmt.Sprintf("%s:%d", address, e.Service.Port), - Metadata: decodeMetadata(e.Service.Tags), - }) - } - - cw.RLock() - // make a copy - rservices := make(map[string][]*registry.Service) - for k, v := range cw.services { - rservices[k] = v - } - cw.RUnlock() - - var newServices []*registry.Service - - // serviceMap is the new set of services keyed by name+version - for _, newService := range serviceMap { - // append to the new set of cached services - newServices = append(newServices, newService) - - // check if the service exists in the existing cache - oldServices, ok := rservices[serviceName] - if !ok { - // does not exist? then we're creating brand new entries - cw.next <- ®istry.Result{Action: "create", Service: newService} - continue - } - - // service exists. ok let's figure out what to update and delete version wise - action := "create" - - for _, oldService := range oldServices { - // does this version exist? - // no? then default to create - if oldService.Version != newService.Version { - continue - } - - // yes? then it's an update - action = "update" - - var nodes []*registry.Node - // check the old nodes to see if they've been deleted - for _, oldNode := range oldService.Nodes { - var seen bool - for _, newNode := range newService.Nodes { - if newNode.Id == oldNode.Id { - seen = true - break - } - } - // does the old node exist in the new set of nodes - // no? then delete that shit - if !seen { - nodes = append(nodes, oldNode) - } - } - - // it's an update rather than creation - if len(nodes) > 0 { - delService := registry.CopyService(oldService) - delService.Nodes = nodes - cw.next <- ®istry.Result{Action: "delete", Service: delService} - } - } - - cw.next <- ®istry.Result{Action: action, Service: newService} - } - - // Now check old versions that may not be in new services map - for _, old := range rservices[serviceName] { - // old version does not exist in new version map - // kill it with fire! - if _, ok := serviceMap[old.Version]; !ok { - cw.next <- ®istry.Result{Action: "delete", Service: old} - } - } - - cw.Lock() - cw.services[serviceName] = newServices - cw.Unlock() -} - -func (cw *consulWatcher) handle(idx uint64, data interface{}) { - services, ok := data.(map[string][]string) - if !ok { - return - } - - // add new watchers - for service, _ := range services { - // Filter on watch options - // wo.Service: Only watch services we care about - if len(cw.wo.Service) > 0 && service != cw.wo.Service { - continue - } - - if _, ok := cw.watchers[service]; ok { - continue - } - wp, err := watch.Parse(map[string]interface{}{ - "type": "service", - "service": service, - }) - if err == nil { - wp.Handler = cw.serviceHandler - go wp.RunWithClientAndLogger(cw.r.Client(), log.New(os.Stderr, "", log.LstdFlags)) - cw.watchers[service] = wp - cw.next <- ®istry.Result{Action: "create", Service: ®istry.Service{Name: service}} - } - } - - cw.RLock() - // make a copy - rservices := make(map[string][]*registry.Service) - for k, v := range cw.services { - rservices[k] = v - } - cw.RUnlock() - - // remove unknown services from registry - // save the things we want to delete - deleted := make(map[string][]*registry.Service) - - for service, _ := range rservices { - if _, ok := services[service]; !ok { - cw.Lock() - // save this before deleting - deleted[service] = cw.services[service] - delete(cw.services, service) - cw.Unlock() - } - } - - // remove unknown services from watchers - for service, w := range cw.watchers { - if _, ok := services[service]; !ok { - w.Stop() - delete(cw.watchers, service) - for _, oldService := range deleted[service] { - // send a delete for the service nodes that we're removing - cw.next <- ®istry.Result{Action: "delete", Service: oldService} - } - // sent the empty list as the last resort to indicate to delete the entire service - cw.next <- ®istry.Result{Action: "delete", Service: ®istry.Service{Name: service}} - } - } -} - -func (cw *consulWatcher) Next() (*registry.Result, error) { - select { - case <-cw.exit: - return nil, registry.ErrWatcherStopped - case r, ok := <-cw.next: - if !ok { - return nil, registry.ErrWatcherStopped - } - return r, nil - } - // NOTE: This is a dead code path: e.g. it will never be reached - // as we return in all previous code paths never leading to this return - return nil, registry.ErrWatcherStopped -} - -func (cw *consulWatcher) Stop() { - select { - case <-cw.exit: - return - default: - close(cw.exit) - if cw.wp == nil { - return - } - cw.wp.Stop() - - // drain results - for { - select { - case <-cw.next: - default: - return - } - } - } -} diff --git a/registry/consul/watcher_test.go b/registry/consul/watcher_test.go deleted file mode 100644 index 19e0cb80..00000000 --- a/registry/consul/watcher_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package consul - -import ( - "testing" - - "github.com/hashicorp/consul/api" - "github.com/micro/go-micro/registry" -) - -func TestHealthyServiceHandler(t *testing.T) { - watcher := newWatcher() - serviceEntry := newServiceEntry( - "node-name", "node-address", "service-name", "v1.0.0", - []*api.HealthCheck{ - newHealthCheck("node-name", "service-name", "passing"), - }, - ) - - watcher.serviceHandler(1234, []*api.ServiceEntry{serviceEntry}) - - if len(watcher.services["service-name"][0].Nodes) != 1 { - t.Errorf("Expected length of the service nodes to be 1") - } -} - -func TestUnhealthyServiceHandler(t *testing.T) { - watcher := newWatcher() - serviceEntry := newServiceEntry( - "node-name", "node-address", "service-name", "v1.0.0", - []*api.HealthCheck{ - newHealthCheck("node-name", "service-name", "critical"), - }, - ) - - watcher.serviceHandler(1234, []*api.ServiceEntry{serviceEntry}) - - if len(watcher.services["service-name"][0].Nodes) != 0 { - t.Errorf("Expected length of the service nodes to be 0") - } -} - -func TestUnhealthyNodeServiceHandler(t *testing.T) { - watcher := newWatcher() - serviceEntry := newServiceEntry( - "node-name", "node-address", "service-name", "v1.0.0", - []*api.HealthCheck{ - newHealthCheck("node-name", "service-name", "passing"), - newHealthCheck("node-name", "serfHealth", "critical"), - }, - ) - - watcher.serviceHandler(1234, []*api.ServiceEntry{serviceEntry}) - - if len(watcher.services["service-name"][0].Nodes) != 0 { - t.Errorf("Expected length of the service nodes to be 0") - } -} - -func newWatcher() *consulWatcher { - return &consulWatcher{ - exit: make(chan bool), - next: make(chan *registry.Result, 10), - services: make(map[string][]*registry.Service), - } -} - -func newHealthCheck(node, name, status string) *api.HealthCheck { - return &api.HealthCheck{ - Node: node, - Name: name, - Status: status, - ServiceName: name, - } -} - -func newServiceEntry(node, address, name, version string, checks []*api.HealthCheck) *api.ServiceEntry { - return &api.ServiceEntry{ - Node: &api.Node{Node: node, Address: name}, - Service: &api.AgentService{ - Service: name, - Address: address, - Tags: encodeVersion(version), - }, - Checks: checks, - } -}