diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go index f2780058..d328a2d7 100644 --- a/registry/etcd/etcd.go +++ b/registry/etcd/etcd.go @@ -13,9 +13,9 @@ import ( "time" "github.com/coreos/etcd/clientv3" - "github.com/micro/go-micro/registry" - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/util/log" hash "github.com/mitchellh/hashstructure" ) @@ -26,7 +26,8 @@ var ( type etcdRegistry struct { client *clientv3.Client options registry.Options - sync.Mutex + + sync.RWMutex register map[string]uint64 leases map[string]clientv3.LeaseID } @@ -131,62 +132,77 @@ func (e *etcdRegistry) Options() registry.Options { return e.options } -func (e *etcdRegistry) Deregister(s *registry.Service) error { +func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } - e.Lock() - // delete our hash of the service - delete(e.register, s.Name) - // delete our lease of the service - delete(e.leases, s.Name) - e.Unlock() + // check existing lease cache + e.RLock() + leaseID, ok := e.leases[s.Name+node.Id] + e.RUnlock() - ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) - defer cancel() + if !ok { + // missing lease, check if the key exists + ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) + defer cancel() - for _, node := range s.Nodes { - _, err := e.client.Delete(ctx, nodePath(s.Name, node.Id)) + // look for the existing key + rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id)) if err != nil { return err } - } - return nil -} -func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { - if len(s.Nodes) == 0 { - return errors.New("Require at least one node") + // get the existing lease + for _, kv := range rsp.Kvs { + if kv.Lease > 0 { + leaseID = clientv3.LeaseID(kv.Lease) + // create hash of service; uint64 + h, err := hash.Hash(node, nil) + if err != nil { + return err + } + + // save the info + e.Lock() + e.leases[s.Name+node.Id] = leaseID + e.register[s.Name+node.Id] = h + e.Unlock() + break + } + } } var leaseNotFound bool - //refreshing lease if existing - leaseID, ok := e.leases[s.Name] - if ok { + + // renew the lease if it exists + if leaseID > 0 { + log.Tracef("Renewing existing lease for %s %d", s.Name, leaseID) if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil { if err != rpctypes.ErrLeaseNotFound { return err } + log.Tracef("Lease not found for %s %d", s.Name, leaseID) // lease not found do register leaseNotFound = true } } // create hash of service; uint64 - h, err := hash.Hash(s, nil) + h, err := hash.Hash(node, nil) if err != nil { return err } - // get existing hash + // get existing hash for the service node e.Lock() - v, ok := e.register[s.Name] + v, ok := e.register[s.Name+node.Id] e.Unlock() // the service is unchanged, skip registering if ok && v == h && !leaseNotFound { + log.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id) return nil } @@ -195,6 +211,7 @@ func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOp Version: s.Version, Metadata: s.Metadata, Endpoints: s.Endpoints, + Nodes: []*registry.Node{node}, } var options registry.RegisterOptions @@ -207,36 +224,80 @@ func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOp var lgr *clientv3.LeaseGrantResponse if options.TTL.Seconds() > 0 { + // get a lease used to expire keys since we have a ttl lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds())) if err != nil { return err } } + log.Tracef("Registering %s id %s with lease %v and ttl %v", service.Name, node.Id, lgr, options.TTL) + // create an entry for the node + if lgr != nil { + _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) + } else { + _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service)) + } + if err != nil { + return err + } + + e.Lock() + // save our hash of the service + e.register[s.Name+node.Id] = h + // save our leaseID of the service + if lgr != nil { + e.leases[s.Name+node.Id] = lgr.ID + } + e.Unlock() + + return nil +} + +func (e *etcdRegistry) Deregister(s *registry.Service) error { + if len(s.Nodes) == 0 { + return errors.New("Require at least one node") + } + for _, node := range s.Nodes { - service.Nodes = []*registry.Node{node} - if lgr != nil { - _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) - } else { - _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service)) - } + e.Lock() + // delete our hash of the service + delete(e.register, s.Name+node.Id) + // delete our lease of the service + delete(e.leases, s.Name+node.Id) + e.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) + defer cancel() + + log.Tracef("Deregistering %s id %s", s.Name, node.Id) + _, err := e.client.Delete(ctx, nodePath(s.Name, node.Id)) if err != nil { return err } } - e.Lock() - // save our hash of the service - e.register[s.Name] = h - // save our leaseID of the service - if lgr != nil { - e.leases[s.Name] = lgr.ID - } - e.Unlock() - return nil } +func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { + if len(s.Nodes) == 0 { + return errors.New("Require at least one node") + } + + var gerr error + + // register each node individually + for _, node := range s.Nodes { + err := e.registerNode(s, node, opts...) + if err != nil { + gerr = err + } + } + + return gerr +} + func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) { ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel()