Merge pull request #820 from micro/etcd-reg

Fix etcd registry lease processing and suppression
This commit is contained in:
Asim Aslam 2019-10-06 10:03:38 +01:00 committed by GitHub
commit 68a3fc7996
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -13,9 +13,9 @@ import (
"time"
"github.com/coreos/etcd/clientv3"
"github.com/micro/go-micro/registry"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/util/log"
hash "github.com/mitchellh/hashstructure"
)
@ -26,7 +26,8 @@ var (
type etcdRegistry struct {
client *clientv3.Client
options registry.Options
sync.Mutex
sync.RWMutex
register map[string]uint64
leases map[string]clientv3.LeaseID
}
@ -131,62 +132,77 @@ func (e *etcdRegistry) Options() registry.Options {
return e.options
}
func (e *etcdRegistry) Deregister(s *registry.Service) error {
func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
e.Lock()
// delete our hash of the service
delete(e.register, s.Name)
// delete our lease of the service
delete(e.leases, s.Name)
e.Unlock()
// check existing lease cache
e.RLock()
leaseID, ok := e.leases[s.Name+node.Id]
e.RUnlock()
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel()
if !ok {
// missing lease, check if the key exists
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel()
for _, node := range s.Nodes {
_, err := e.client.Delete(ctx, nodePath(s.Name, node.Id))
// look for the existing key
rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id))
if err != nil {
return err
}
}
return nil
}
func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
// get the existing lease
for _, kv := range rsp.Kvs {
if kv.Lease > 0 {
leaseID = clientv3.LeaseID(kv.Lease)
// create hash of service; uint64
h, err := hash.Hash(node, nil)
if err != nil {
return err
}
// save the info
e.Lock()
e.leases[s.Name+node.Id] = leaseID
e.register[s.Name+node.Id] = h
e.Unlock()
break
}
}
}
var leaseNotFound bool
//refreshing lease if existing
leaseID, ok := e.leases[s.Name]
if ok {
// renew the lease if it exists
if leaseID > 0 {
log.Tracef("Renewing existing lease for %s %d", s.Name, leaseID)
if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil {
if err != rpctypes.ErrLeaseNotFound {
return err
}
log.Tracef("Lease not found for %s %d", s.Name, leaseID)
// lease not found do register
leaseNotFound = true
}
}
// create hash of service; uint64
h, err := hash.Hash(s, nil)
h, err := hash.Hash(node, nil)
if err != nil {
return err
}
// get existing hash
// get existing hash for the service node
e.Lock()
v, ok := e.register[s.Name]
v, ok := e.register[s.Name+node.Id]
e.Unlock()
// the service is unchanged, skip registering
if ok && v == h && !leaseNotFound {
log.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id)
return nil
}
@ -195,6 +211,7 @@ func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOp
Version: s.Version,
Metadata: s.Metadata,
Endpoints: s.Endpoints,
Nodes: []*registry.Node{node},
}
var options registry.RegisterOptions
@ -207,36 +224,80 @@ func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOp
var lgr *clientv3.LeaseGrantResponse
if options.TTL.Seconds() > 0 {
// get a lease used to expire keys since we have a ttl
lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds()))
if err != nil {
return err
}
}
log.Tracef("Registering %s id %s with lease %v and ttl %v", service.Name, node.Id, lgr, options.TTL)
// create an entry for the node
if lgr != nil {
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID))
} else {
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service))
}
if err != nil {
return err
}
e.Lock()
// save our hash of the service
e.register[s.Name+node.Id] = h
// save our leaseID of the service
if lgr != nil {
e.leases[s.Name+node.Id] = lgr.ID
}
e.Unlock()
return nil
}
func (e *etcdRegistry) Deregister(s *registry.Service) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
for _, node := range s.Nodes {
service.Nodes = []*registry.Node{node}
if lgr != nil {
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID))
} else {
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service))
}
e.Lock()
// delete our hash of the service
delete(e.register, s.Name+node.Id)
// delete our lease of the service
delete(e.leases, s.Name+node.Id)
e.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel()
log.Tracef("Deregistering %s id %s", s.Name, node.Id)
_, err := e.client.Delete(ctx, nodePath(s.Name, node.Id))
if err != nil {
return err
}
}
e.Lock()
// save our hash of the service
e.register[s.Name] = h
// save our leaseID of the service
if lgr != nil {
e.leases[s.Name] = lgr.ID
}
e.Unlock()
return nil
}
func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
var gerr error
// register each node individually
for _, node := range s.Nodes {
err := e.registerNode(s, node, opts...)
if err != nil {
gerr = err
}
}
return gerr
}
func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel()