registry/etcd: add support for domain options (#1714)

This commit is contained in:
ben-toogood 2020-06-19 14:58:16 +01:00 committed by GitHub
parent 5f9c3a6efd
commit 87543b2c8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 163 additions and 53 deletions

View File

@ -6,6 +6,7 @@ import (
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"net" "net"
"path" "path"
"sort" "sort"
@ -15,30 +16,37 @@ import (
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/registry"
hash "github.com/mitchellh/hashstructure" hash "github.com/mitchellh/hashstructure"
"go.uber.org/zap" "go.uber.org/zap"
) )
var ( const (
prefix = "/micro/registry/" prefix = "/micro/registry/"
defaultDomain = "micro"
) )
type etcdRegistry struct { type etcdRegistry struct {
client *clientv3.Client client *clientv3.Client
options registry.Options options registry.Options
// register and leases are grouped by domain
sync.RWMutex sync.RWMutex
register map[string]uint64 register map[string]register
leases map[string]clientv3.LeaseID leases map[string]leases
} }
type register map[string]uint64
type leases map[string]clientv3.LeaseID
// NewRegistry returns an initialized etcd registry
func NewRegistry(opts ...registry.Option) registry.Registry { func NewRegistry(opts ...registry.Option) registry.Registry {
e := &etcdRegistry{ e := &etcdRegistry{
options: registry.Options{}, options: registry.Options{},
register: make(map[string]uint64), register: make(map[string]register),
leases: make(map[string]clientv3.LeaseID), leases: make(map[string]leases),
} }
configure(e, opts...) configure(e, opts...)
return e return e
@ -48,7 +56,6 @@ func configure(e *etcdRegistry, opts ...registry.Option) error {
config := clientv3.Config{ config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"}, Endpoints: []string{"127.0.0.1:2379"},
} }
for _, o := range opts { for _, o := range opts {
o(&e.options) o(&e.options)
} }
@ -120,14 +127,22 @@ func decode(ds []byte) *registry.Service {
return s return s
} }
func nodePath(s, id string) string { func nodePath(domain, s, id string) string {
service := strings.Replace(s, "/", "-", -1) service := strings.Replace(s, "/", "-", -1)
node := strings.Replace(id, "/", "-", -1) node := strings.Replace(id, "/", "-", -1)
return path.Join(prefix, service, node) return path.Join(prefixWithDomain(domain), service, node)
} }
func servicePath(s string) string { func servicePath(domain, s string) string {
return path.Join(prefix, strings.Replace(s, "/", "-", -1)) return path.Join(prefixWithDomain(domain), serializeServiceName(s))
}
func serializeServiceName(s string) string {
return strings.ReplaceAll(s, "/", "-")
}
func prefixWithDomain(domain string) string {
return path.Join(prefix, domain)
} }
func (e *etcdRegistry) Init(opts ...registry.Option) error { func (e *etcdRegistry) Init(opts ...registry.Option) error {
@ -143,10 +158,27 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
return errors.New("Require at least one node") return errors.New("Require at least one node")
} }
// check existing lease cache // parse the options
e.RLock() var options registry.RegisterOptions
leaseID, ok := e.leases[s.Name+node.Id] for _, o := range opts {
e.RUnlock() o(&options)
}
if len(options.Domain) == 0 {
options.Domain = defaultDomain
}
e.Lock()
// ensure the leases and registers are setup for this domain
if _, ok := e.leases[options.Domain]; !ok {
e.leases[options.Domain] = make(leases)
}
if _, ok := e.register[options.Domain]; !ok {
e.register[options.Domain] = make(register)
}
// check to see if we already have a lease cached
leaseID, ok := e.leases[options.Domain][s.Name+node.Id]
e.Unlock()
if !ok { if !ok {
// missing lease, check if the key exists // missing lease, check if the key exists
@ -154,7 +186,8 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
defer cancel() defer cancel()
// look for the existing key // look for the existing key
rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id), clientv3.WithSerializable()) key := nodePath(options.Domain, s.Name, node.Id)
rsp, err := e.client.Get(ctx, key, clientv3.WithSerializable())
if err != nil { if err != nil {
return err return err
} }
@ -178,8 +211,8 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
// save the info // save the info
e.Lock() e.Lock()
e.leases[s.Name+node.Id] = leaseID e.leases[options.Domain][s.Name+node.Id] = leaseID
e.register[s.Name+node.Id] = h e.register[options.Domain][s.Name+node.Id] = h
e.Unlock() e.Unlock()
break break
@ -194,6 +227,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
if logger.V(logger.TraceLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("Renewing existing lease for %s %d", s.Name, leaseID) logger.Tracef("Renewing existing lease for %s %d", s.Name, leaseID)
} }
if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil { if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil {
if err != rpctypes.ErrLeaseNotFound { if err != rpctypes.ErrLeaseNotFound {
return err return err
@ -202,6 +236,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
if logger.V(logger.TraceLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("Lease not found for %s %d", s.Name, leaseID) logger.Tracef("Lease not found for %s %d", s.Name, leaseID)
} }
// lease not found do register // lease not found do register
leaseNotFound = true leaseNotFound = true
} }
@ -214,9 +249,9 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
} }
// get existing hash for the service node // get existing hash for the service node
e.Lock() e.RLock()
v, ok := e.register[s.Name+node.Id] v, ok := e.register[options.Domain][s.Name+node.Id]
e.Unlock() e.RUnlock()
// the service is unchanged, skip registering // the service is unchanged, skip registering
if ok && v == h && !leaseNotFound { if ok && v == h && !leaseNotFound {
@ -226,6 +261,13 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
return nil return nil
} }
// add domain to the service metadata so it can be determined when doing wildcard queries
if s.Metadata == nil {
s.Metadata = map[string]string{"domain": options.Domain}
} else {
s.Metadata["domain"] = options.Domain
}
service := &registry.Service{ service := &registry.Service{
Name: s.Name, Name: s.Name,
Version: s.Version, Version: s.Version,
@ -234,11 +276,6 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
} }
var options registry.RegisterOptions
for _, o := range opts {
o(&options)
}
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel() defer cancel()
@ -254,22 +291,24 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
if logger.V(logger.TraceLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.Id, lgr, lgr.ID, options.TTL) logger.Tracef("Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.Id, lgr, lgr.ID, options.TTL)
} }
// create an entry for the node // create an entry for the node
var putOpts []clientv3.OpOption
if lgr != nil { if lgr != nil {
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) putOpts = append(putOpts, clientv3.WithLease(lgr.ID))
} else {
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service))
} }
if err != nil {
key := nodePath(options.Domain, s.Name, node.Id)
if _, err = e.client.Put(ctx, key, encode(service), putOpts...); err != nil {
return err return err
} }
e.Lock() e.Lock()
// save our hash of the service // save our hash of the service
e.register[s.Name+node.Id] = h e.register[options.Domain][s.Name+node.Id] = h
// save our leaseID of the service // save our leaseID of the service
if lgr != nil { if lgr != nil {
e.leases[s.Name+node.Id] = lgr.ID e.leases[options.Domain][s.Name+node.Id] = lgr.ID
} }
e.Unlock() e.Unlock()
@ -281,6 +320,15 @@ func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.Deregist
return errors.New("Require at least one node") return errors.New("Require at least one node")
} }
// parse the options
var options registry.DeregisterOptions
for _, o := range opts {
o(&options)
}
if len(options.Domain) == 0 {
options.Domain = defaultDomain
}
for _, node := range s.Nodes { for _, node := range s.Nodes {
e.Lock() e.Lock()
// delete our hash of the service // delete our hash of the service
@ -295,8 +343,8 @@ func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.Deregist
if logger.V(logger.TraceLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("Deregistering %s id %s", s.Name, node.Id) logger.Tracef("Deregistering %s id %s", s.Name, node.Id)
} }
_, err := e.client.Delete(ctx, nodePath(s.Name, node.Id))
if err != nil { if _, err := e.client.Delete(ctx, nodePath(options.Domain, s.Name, node.Id)); err != nil {
return err return err
} }
} }
@ -313,8 +361,7 @@ func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOp
// register each node individually // register each node individually
for _, node := range s.Nodes { for _, node := range s.Nodes {
err := e.registerNode(s, node, opts...) if err := e.registerNode(s, node, opts...); err != nil {
if err != nil {
gerr = err gerr = err
} }
} }
@ -326,20 +373,52 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel() defer cancel()
rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSerializable()) // parse the options and fallback to the default domain
var options registry.GetOptions
for _, o := range opts {
o(&options)
}
if len(options.Domain) == 0 {
options.Domain = defaultDomain
}
var results []*mvccpb.KeyValue
if options.Domain == registry.WildcardDomain {
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(rsp.Kvs) == 0 { // filter using a check for the service name
keyPath := fmt.Sprintf("/%v/", serializeServiceName(name))
for _, kv := range rsp.Kvs {
if strings.Contains(string(kv.Key), keyPath) {
results = append(results, kv)
}
}
} else {
prefix := servicePath(options.Domain, name) + "/"
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
results = rsp.Kvs
}
if len(results) == 0 {
return nil, registry.ErrNotFound return nil, registry.ErrNotFound
} }
serviceMap := map[string]*registry.Service{} versions := make(map[string]*registry.Service)
for _, n := range results {
// key contains the domain, service name and version. hence, if a service name exists in two
// seperate domains, it'll be returned twice (for wildcard queries), this is because although
// the name is the same, the endpoints / metadata could differ
key, _ := path.Split(string(n.Key))
for _, n := range rsp.Kvs {
if sn := decode(n.Value); sn != nil { if sn := decode(n.Value); sn != nil {
s, ok := serviceMap[sn.Version] s, ok := versions[key]
if !ok { if !ok {
s = &registry.Service{ s = &registry.Service{
Name: sn.Name, Name: sn.Name,
@ -347,15 +426,15 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r
Metadata: sn.Metadata, Metadata: sn.Metadata,
Endpoints: sn.Endpoints, Endpoints: sn.Endpoints,
} }
serviceMap[s.Version] = s versions[s.Version] = s
} }
s.Nodes = append(s.Nodes, sn.Nodes...) s.Nodes = append(s.Nodes, sn.Nodes...)
} }
} }
services := make([]*registry.Service, 0, len(serviceMap)) services := make([]*registry.Service, 0, len(versions))
for _, service := range serviceMap { for _, service := range versions {
services = append(services, service) services = append(services, service)
} }
@ -363,30 +442,53 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r
} }
func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) { func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) {
versions := make(map[string]*registry.Service) // parse the options
var options registry.ListOptions
for _, o := range opts {
o(&options)
}
if len(options.Domain) == 0 {
options.Domain = defaultDomain
}
// determine the prefix
var p string
if options.Domain == registry.WildcardDomain {
p = prefix
} else {
p = prefixWithDomain(options.Domain)
}
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel() defer cancel()
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) rsp, err := e.client.Get(ctx, p, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(rsp.Kvs) == 0 { if len(rsp.Kvs) == 0 {
return []*registry.Service{}, nil return []*registry.Service{}, nil
} }
versions := make(map[string]*registry.Service)
for _, n := range rsp.Kvs { for _, n := range rsp.Kvs {
sn := decode(n.Value) sn := decode(n.Value)
if sn == nil { if sn == nil {
continue continue
} }
v, ok := versions[sn.Name+sn.Version]
// key contains the domain, service name and version. hence, if a service name exists in two
// seperate domains, it'll be returned twice (for wildcard queries), this is because although
// the name is the same, the endpoints / metadata could differ
key, _ := path.Split(string(n.Key))
v, ok := versions[key]
if !ok { if !ok {
versions[sn.Name+sn.Version] = sn versions[key] = sn
continue continue
} }
// append to service:version nodes // append to service:version nodes
v.Nodes = append(v.Nodes, sn.Nodes...) v.Nodes = append(v.Nodes, sn.Nodes...)
} }

View File

@ -21,6 +21,9 @@ func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.Wat
for _, o := range opts { for _, o := range opts {
o(&wo) o(&wo)
} }
if len(wo.Domain) == 0 {
wo.Domain = defaultDomain
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
stop := make(chan bool, 1) stop := make(chan bool, 1)
@ -31,8 +34,13 @@ func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.Wat
}() }()
watchPath := prefix watchPath := prefix
if wo.Domain == registry.WildcardDomain {
if len(wo.Service) > 0 { if len(wo.Service) > 0 {
watchPath = servicePath(wo.Service) + "/" return nil, errors.New("Cannot watch a service accross domains")
}
watchPath = prefix
} else if len(wo.Service) > 0 {
watchPath = servicePath(wo.Domain, wo.Service) + "/"
} }
return &etcdWatcher{ return &etcdWatcher{