From 87543b2c8afbd301bfe7d1eeebcabbdaaea471f4 Mon Sep 17 00:00:00 2001 From: ben-toogood Date: Fri, 19 Jun 2020 14:58:16 +0100 Subject: [PATCH] registry/etcd: add support for domain options (#1714) --- registry/etcd/etcd.go | 204 +++++++++++++++++++++++++++++---------- registry/etcd/watcher.go | 12 ++- 2 files changed, 163 insertions(+), 53 deletions(-) diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go index 8cbda8cf..4e52b650 100644 --- a/registry/etcd/etcd.go +++ b/registry/etcd/etcd.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "encoding/json" "errors" + "fmt" "net" "path" "sort" @@ -15,30 +16,37 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/coreos/etcd/mvcc/mvccpb" "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" hash "github.com/mitchellh/hashstructure" "go.uber.org/zap" ) -var ( - prefix = "/micro/registry/" +const ( + prefix = "/micro/registry/" + defaultDomain = "micro" ) type etcdRegistry struct { client *clientv3.Client options registry.Options + // register and leases are grouped by domain sync.RWMutex - register map[string]uint64 - leases map[string]clientv3.LeaseID + register map[string]register + leases map[string]leases } +type register map[string]uint64 +type leases map[string]clientv3.LeaseID + +// NewRegistry returns an initialized etcd registry func NewRegistry(opts ...registry.Option) registry.Registry { e := &etcdRegistry{ options: registry.Options{}, - register: make(map[string]uint64), - leases: make(map[string]clientv3.LeaseID), + register: make(map[string]register), + leases: make(map[string]leases), } configure(e, opts...) return e @@ -48,7 +56,6 @@ func configure(e *etcdRegistry, opts ...registry.Option) error { config := clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, } - for _, o := range opts { o(&e.options) } @@ -120,14 +127,22 @@ func decode(ds []byte) *registry.Service { return s } -func nodePath(s, id string) string { +func nodePath(domain, s, id string) string { service := strings.Replace(s, "/", "-", -1) node := strings.Replace(id, "/", "-", -1) - return path.Join(prefix, service, node) + return path.Join(prefixWithDomain(domain), service, node) } -func servicePath(s string) string { - return path.Join(prefix, strings.Replace(s, "/", "-", -1)) +func servicePath(domain, s string) string { + return path.Join(prefixWithDomain(domain), serializeServiceName(s)) +} + +func serializeServiceName(s string) string { + return strings.ReplaceAll(s, "/", "-") +} + +func prefixWithDomain(domain string) string { + return path.Join(prefix, domain) } func (e *etcdRegistry) Init(opts ...registry.Option) error { @@ -143,10 +158,27 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op return errors.New("Require at least one node") } - // check existing lease cache - e.RLock() - leaseID, ok := e.leases[s.Name+node.Id] - e.RUnlock() + // parse the options + var options registry.RegisterOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = defaultDomain + } + + e.Lock() + // ensure the leases and registers are setup for this domain + if _, ok := e.leases[options.Domain]; !ok { + e.leases[options.Domain] = make(leases) + } + if _, ok := e.register[options.Domain]; !ok { + e.register[options.Domain] = make(register) + } + + // check to see if we already have a lease cached + leaseID, ok := e.leases[options.Domain][s.Name+node.Id] + e.Unlock() if !ok { // missing lease, check if the key exists @@ -154,7 +186,8 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op defer cancel() // look for the existing key - rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id), clientv3.WithSerializable()) + key := nodePath(options.Domain, s.Name, node.Id) + rsp, err := e.client.Get(ctx, key, clientv3.WithSerializable()) if err != nil { return err } @@ -178,8 +211,8 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op // save the info e.Lock() - e.leases[s.Name+node.Id] = leaseID - e.register[s.Name+node.Id] = h + e.leases[options.Domain][s.Name+node.Id] = leaseID + e.register[options.Domain][s.Name+node.Id] = h e.Unlock() break @@ -194,6 +227,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("Renewing existing lease for %s %d", s.Name, leaseID) } + if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil { if err != rpctypes.ErrLeaseNotFound { return err @@ -202,6 +236,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("Lease not found for %s %d", s.Name, leaseID) } + // lease not found do register leaseNotFound = true } @@ -214,9 +249,9 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op } // get existing hash for the service node - e.Lock() - v, ok := e.register[s.Name+node.Id] - e.Unlock() + e.RLock() + v, ok := e.register[options.Domain][s.Name+node.Id] + e.RUnlock() // the service is unchanged, skip registering if ok && v == h && !leaseNotFound { @@ -226,6 +261,13 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op return nil } + // add domain to the service metadata so it can be determined when doing wildcard queries + if s.Metadata == nil { + s.Metadata = map[string]string{"domain": options.Domain} + } else { + s.Metadata["domain"] = options.Domain + } + service := ®istry.Service{ Name: s.Name, Version: s.Version, @@ -234,11 +276,6 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op Nodes: []*registry.Node{node}, } - var options registry.RegisterOptions - for _, o := range opts { - o(&options) - } - ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() @@ -254,22 +291,24 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.Id, lgr, lgr.ID, options.TTL) } + // create an entry for the node + var putOpts []clientv3.OpOption if lgr != nil { - _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) - } else { - _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service)) + putOpts = append(putOpts, clientv3.WithLease(lgr.ID)) } - if err != nil { + + key := nodePath(options.Domain, s.Name, node.Id) + if _, err = e.client.Put(ctx, key, encode(service), putOpts...); err != nil { return err } e.Lock() // save our hash of the service - e.register[s.Name+node.Id] = h + e.register[options.Domain][s.Name+node.Id] = h // save our leaseID of the service if lgr != nil { - e.leases[s.Name+node.Id] = lgr.ID + e.leases[options.Domain][s.Name+node.Id] = lgr.ID } e.Unlock() @@ -281,6 +320,15 @@ func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.Deregist return errors.New("Require at least one node") } + // parse the options + var options registry.DeregisterOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = defaultDomain + } + for _, node := range s.Nodes { e.Lock() // delete our hash of the service @@ -295,8 +343,8 @@ func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.Deregist if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("Deregistering %s id %s", s.Name, node.Id) } - _, err := e.client.Delete(ctx, nodePath(s.Name, node.Id)) - if err != nil { + + if _, err := e.client.Delete(ctx, nodePath(options.Domain, s.Name, node.Id)); err != nil { return err } } @@ -313,8 +361,7 @@ func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOp // register each node individually for _, node := range s.Nodes { - err := e.registerNode(s, node, opts...) - if err != nil { + if err := e.registerNode(s, node, opts...); err != nil { gerr = err } } @@ -326,20 +373,52 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() - rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSerializable()) - if err != nil { - return nil, err + // parse the options and fallback to the default domain + var options registry.GetOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = defaultDomain } - if len(rsp.Kvs) == 0 { + var results []*mvccpb.KeyValue + if options.Domain == registry.WildcardDomain { + rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) + if err != nil { + return nil, err + } + + // filter using a check for the service name + keyPath := fmt.Sprintf("/%v/", serializeServiceName(name)) + for _, kv := range rsp.Kvs { + if strings.Contains(string(kv.Key), keyPath) { + results = append(results, kv) + } + } + } else { + prefix := servicePath(options.Domain, name) + "/" + rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) + if err != nil { + return nil, err + } + results = rsp.Kvs + } + + if len(results) == 0 { return nil, registry.ErrNotFound } - serviceMap := map[string]*registry.Service{} + versions := make(map[string]*registry.Service) + + for _, n := range results { + // key contains the domain, service name and version. hence, if a service name exists in two + // seperate domains, it'll be returned twice (for wildcard queries), this is because although + // the name is the same, the endpoints / metadata could differ + key, _ := path.Split(string(n.Key)) - for _, n := range rsp.Kvs { if sn := decode(n.Value); sn != nil { - s, ok := serviceMap[sn.Version] + s, ok := versions[key] if !ok { s = ®istry.Service{ Name: sn.Name, @@ -347,15 +426,15 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r Metadata: sn.Metadata, Endpoints: sn.Endpoints, } - serviceMap[s.Version] = s + versions[s.Version] = s } s.Nodes = append(s.Nodes, sn.Nodes...) } } - services := make([]*registry.Service, 0, len(serviceMap)) - for _, service := range serviceMap { + services := make([]*registry.Service, 0, len(versions)) + for _, service := range versions { services = append(services, service) } @@ -363,30 +442,53 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r } func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) { - versions := make(map[string]*registry.Service) + // parse the options + var options registry.ListOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = defaultDomain + } + + // determine the prefix + var p string + if options.Domain == registry.WildcardDomain { + p = prefix + } else { + p = prefixWithDomain(options.Domain) + } ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() - rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) + rsp, err := e.client.Get(ctx, p, clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return nil, err } - if len(rsp.Kvs) == 0 { return []*registry.Service{}, nil } + versions := make(map[string]*registry.Service) for _, n := range rsp.Kvs { sn := decode(n.Value) + if sn == nil { continue } - v, ok := versions[sn.Name+sn.Version] + + // key contains the domain, service name and version. hence, if a service name exists in two + // seperate domains, it'll be returned twice (for wildcard queries), this is because although + // the name is the same, the endpoints / metadata could differ + key, _ := path.Split(string(n.Key)) + + v, ok := versions[key] if !ok { - versions[sn.Name+sn.Version] = sn + versions[key] = sn continue } + // append to service:version nodes v.Nodes = append(v.Nodes, sn.Nodes...) } diff --git a/registry/etcd/watcher.go b/registry/etcd/watcher.go index b8ce443d..e1fab511 100644 --- a/registry/etcd/watcher.go +++ b/registry/etcd/watcher.go @@ -21,6 +21,9 @@ func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.Wat for _, o := range opts { o(&wo) } + if len(wo.Domain) == 0 { + wo.Domain = defaultDomain + } ctx, cancel := context.WithCancel(context.Background()) stop := make(chan bool, 1) @@ -31,8 +34,13 @@ func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.Wat }() watchPath := prefix - if len(wo.Service) > 0 { - watchPath = servicePath(wo.Service) + "/" + if wo.Domain == registry.WildcardDomain { + if len(wo.Service) > 0 { + return nil, errors.New("Cannot watch a service accross domains") + } + watchPath = prefix + } else if len(wo.Service) > 0 { + watchPath = servicePath(wo.Domain, wo.Service) + "/" } return &etcdWatcher{