Fix etcd registry lease processing and suppression
This commit is contained in:
		| @@ -13,9 +13,9 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/coreos/etcd/clientv3" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
|  | ||||
| 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| 	"github.com/micro/go-micro/util/log" | ||||
| 	hash "github.com/mitchellh/hashstructure" | ||||
| ) | ||||
|  | ||||
| @@ -26,7 +26,8 @@ var ( | ||||
| type etcdRegistry struct { | ||||
| 	client  *clientv3.Client | ||||
| 	options registry.Options | ||||
| 	sync.Mutex | ||||
|  | ||||
| 	sync.RWMutex | ||||
| 	register map[string]uint64 | ||||
| 	leases   map[string]clientv3.LeaseID | ||||
| } | ||||
| @@ -131,62 +132,77 @@ func (e *etcdRegistry) Options() registry.Options { | ||||
| 	return e.options | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) Deregister(s *registry.Service) error { | ||||
| func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error { | ||||
| 	if len(s.Nodes) == 0 { | ||||
| 		return errors.New("Require at least one node") | ||||
| 	} | ||||
|  | ||||
| 	e.Lock() | ||||
| 	// delete our hash of the service | ||||
| 	delete(e.register, s.Name) | ||||
| 	// delete our lease of the service | ||||
| 	delete(e.leases, s.Name) | ||||
| 	e.Unlock() | ||||
| 	// check existing lease cache | ||||
| 	e.RLock() | ||||
| 	leaseID, ok := e.leases[s.Name+node.Id] | ||||
| 	e.RUnlock() | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) | ||||
| 	defer cancel() | ||||
| 	if !ok { | ||||
| 		// missing lease, check if the key exists | ||||
| 		ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) | ||||
| 		defer cancel() | ||||
|  | ||||
| 	for _, node := range s.Nodes { | ||||
| 		_, err := e.client.Delete(ctx, nodePath(s.Name, node.Id)) | ||||
| 		// look for the existing key | ||||
| 		rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id)) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { | ||||
| 	if len(s.Nodes) == 0 { | ||||
| 		return errors.New("Require at least one node") | ||||
| 		// get the existing lease | ||||
| 		for _, kv := range rsp.Kvs { | ||||
| 			if kv.Lease > 0 { | ||||
| 				leaseID = clientv3.LeaseID(kv.Lease) | ||||
| 				// create hash of service; uint64 | ||||
| 				h, err := hash.Hash(node, nil) | ||||
| 				if err != nil { | ||||
| 					return err | ||||
| 				} | ||||
|  | ||||
| 				// save the info | ||||
| 				e.Lock() | ||||
| 				e.leases[s.Name+node.Id] = leaseID | ||||
| 				e.register[s.Name+node.Id] = h | ||||
| 				e.Unlock() | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	var leaseNotFound bool | ||||
| 	//refreshing lease if existing | ||||
| 	leaseID, ok := e.leases[s.Name] | ||||
| 	if ok { | ||||
|  | ||||
| 	// renew the lease if it exists | ||||
| 	if leaseID > 0 { | ||||
| 		log.Tracef("Renewing existing lease for %s %d", s.Name, leaseID) | ||||
| 		if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil { | ||||
| 			if err != rpctypes.ErrLeaseNotFound { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			log.Tracef("Lease not found for %s %d", s.Name, leaseID) | ||||
| 			// lease not found do register | ||||
| 			leaseNotFound = true | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// create hash of service; uint64 | ||||
| 	h, err := hash.Hash(s, nil) | ||||
| 	h, err := hash.Hash(node, nil) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// get existing hash | ||||
| 	// get existing hash for the service node | ||||
| 	e.Lock() | ||||
| 	v, ok := e.register[s.Name] | ||||
| 	v, ok := e.register[s.Name+node.Id] | ||||
| 	e.Unlock() | ||||
|  | ||||
| 	// the service is unchanged, skip registering | ||||
| 	if ok && v == h && !leaseNotFound { | ||||
| 		log.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| @@ -195,6 +211,7 @@ func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOp | ||||
| 		Version:   s.Version, | ||||
| 		Metadata:  s.Metadata, | ||||
| 		Endpoints: s.Endpoints, | ||||
| 		Nodes:     []*registry.Node{node}, | ||||
| 	} | ||||
|  | ||||
| 	var options registry.RegisterOptions | ||||
| @@ -207,36 +224,80 @@ func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOp | ||||
|  | ||||
| 	var lgr *clientv3.LeaseGrantResponse | ||||
| 	if options.TTL.Seconds() > 0 { | ||||
| 		// get a lease used to expire keys since we have a ttl | ||||
| 		lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds())) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	log.Tracef("Registering %s id %s with lease %v and ttl %v", service.Name, node.Id, lgr, options.TTL) | ||||
| 	// create an entry for the node | ||||
| 	if lgr != nil { | ||||
| 		_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) | ||||
| 	} else { | ||||
| 		_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service)) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	e.Lock() | ||||
| 	// save our hash of the service | ||||
| 	e.register[s.Name+node.Id] = h | ||||
| 	// save our leaseID of the service | ||||
| 	if lgr != nil { | ||||
| 		e.leases[s.Name+node.Id] = lgr.ID | ||||
| 	} | ||||
| 	e.Unlock() | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) Deregister(s *registry.Service) error { | ||||
| 	if len(s.Nodes) == 0 { | ||||
| 		return errors.New("Require at least one node") | ||||
| 	} | ||||
|  | ||||
| 	for _, node := range s.Nodes { | ||||
| 		service.Nodes = []*registry.Node{node} | ||||
| 		if lgr != nil { | ||||
| 			_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) | ||||
| 		} else { | ||||
| 			_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service)) | ||||
| 		} | ||||
| 		e.Lock() | ||||
| 		// delete our hash of the service | ||||
| 		delete(e.register, s.Name+node.Id) | ||||
| 		// delete our lease of the service | ||||
| 		delete(e.leases, s.Name+node.Id) | ||||
| 		e.Unlock() | ||||
|  | ||||
| 		ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) | ||||
| 		defer cancel() | ||||
|  | ||||
| 		log.Tracef("Deregistering %s id %s", s.Name, node.Id) | ||||
| 		_, err := e.client.Delete(ctx, nodePath(s.Name, node.Id)) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	e.Lock() | ||||
| 	// save our hash of the service | ||||
| 	e.register[s.Name] = h | ||||
| 	// save our leaseID of the service | ||||
| 	if lgr != nil { | ||||
| 		e.leases[s.Name] = lgr.ID | ||||
| 	} | ||||
| 	e.Unlock() | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { | ||||
| 	if len(s.Nodes) == 0 { | ||||
| 		return errors.New("Require at least one node") | ||||
| 	} | ||||
|  | ||||
| 	var gerr error | ||||
|  | ||||
| 	// register each node individually | ||||
| 	for _, node := range s.Nodes { | ||||
| 		err := e.registerNode(s, node, opts...) | ||||
| 		if err != nil { | ||||
| 			gerr = err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return gerr | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) { | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) | ||||
| 	defer cancel() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user