First commit to add etcd registry support
This commit is contained in:
		
							
								
								
									
										309
									
								
								registry/etcd/etcd.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										309
									
								
								registry/etcd/etcd.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,309 @@ | ||||
| // Package etcd provides an etcd service registry | ||||
| package etcd | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"crypto/tls" | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"path" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/coreos/etcd/clientv3" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
|  | ||||
| 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" | ||||
| 	hash "github.com/mitchellh/hashstructure" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	prefix = "/micro-registry" | ||||
| ) | ||||
|  | ||||
| type etcdRegistry struct { | ||||
| 	client  *clientv3.Client | ||||
| 	options registry.Options | ||||
| 	sync.Mutex | ||||
| 	register map[string]uint64 | ||||
| 	leases   map[string]clientv3.LeaseID | ||||
| } | ||||
|  | ||||
| func NewRegistry(opts ...registry.Option) registry.Registry { | ||||
| 	e := &etcdRegistry{ | ||||
| 		options:  registry.Options{}, | ||||
| 		register: make(map[string]uint64), | ||||
| 		leases:   make(map[string]clientv3.LeaseID), | ||||
| 	} | ||||
| 	configure(e, opts...) | ||||
| 	return e | ||||
| } | ||||
|  | ||||
| func configure(e *etcdRegistry, opts ...registry.Option) error { | ||||
| 	config := clientv3.Config{ | ||||
| 		Endpoints: []string{"127.0.0.1:2379"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, o := range opts { | ||||
| 		o(&e.options) | ||||
| 	} | ||||
|  | ||||
| 	if e.options.Timeout == 0 { | ||||
| 		e.options.Timeout = 5 * time.Second | ||||
| 	} | ||||
|  | ||||
| 	if e.options.Secure || e.options.TLSConfig != nil { | ||||
| 		tlsConfig := e.options.TLSConfig | ||||
| 		if tlsConfig == nil { | ||||
| 			tlsConfig = &tls.Config{ | ||||
| 				InsecureSkipVerify: true, | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		config.TLS = tlsConfig | ||||
| 	} | ||||
|  | ||||
| 	if e.options.Context != nil { | ||||
| 		u, ok := e.options.Context.Value(authKey{}).(*authCreds) | ||||
| 		if ok { | ||||
| 			config.Username = u.Username | ||||
| 			config.Password = u.Password | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	var cAddrs []string | ||||
|  | ||||
| 	for _, addr := range e.options.Addrs { | ||||
| 		if len(addr) == 0 { | ||||
| 			continue | ||||
| 		} | ||||
| 		cAddrs = append(cAddrs, addr) | ||||
| 	} | ||||
|  | ||||
| 	// if we got addrs then we'll update | ||||
| 	if len(cAddrs) > 0 { | ||||
| 		config.Endpoints = cAddrs | ||||
| 	} | ||||
|  | ||||
| 	cli, err := clientv3.New(config) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	e.client = cli | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func encode(s *registry.Service) string { | ||||
| 	b, _ := json.Marshal(s) | ||||
| 	return string(b) | ||||
| } | ||||
|  | ||||
| func decode(ds []byte) *registry.Service { | ||||
| 	var s *registry.Service | ||||
| 	json.Unmarshal(ds, &s) | ||||
| 	return s | ||||
| } | ||||
|  | ||||
| func nodePath(s, id string) string { | ||||
| 	service := strings.Replace(s, "/", "-", -1) | ||||
| 	node := strings.Replace(id, "/", "-", -1) | ||||
| 	return path.Join(prefix, service, node) | ||||
| } | ||||
|  | ||||
| func servicePath(s string) string { | ||||
| 	return path.Join(prefix, strings.Replace(s, "/", "-", -1)) | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) Init(opts ...registry.Option) error { | ||||
| 	return configure(e, opts...) | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) Options() registry.Options { | ||||
| 	return e.options | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) Deregister(s *registry.Service) error { | ||||
| 	if len(s.Nodes) == 0 { | ||||
| 		return errors.New("Require at least one node") | ||||
| 	} | ||||
|  | ||||
| 	e.Lock() | ||||
| 	// delete our hash of the service | ||||
| 	delete(e.register, s.Name) | ||||
| 	// delete our lease of the service | ||||
| 	delete(e.leases, s.Name) | ||||
| 	e.Unlock() | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	for _, node := range s.Nodes { | ||||
| 		_, err := e.client.Delete(ctx, nodePath(s.Name, node.Id)) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { | ||||
| 	if len(s.Nodes) == 0 { | ||||
| 		return errors.New("Require at least one node") | ||||
| 	} | ||||
|  | ||||
| 	var leaseNotFound bool | ||||
| 	//refreshing lease if existing | ||||
| 	leaseID, ok := e.leases[s.Name] | ||||
| 	if ok { | ||||
| 		if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil { | ||||
| 			if err != rpctypes.ErrLeaseNotFound { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			// lease not found do register | ||||
| 			leaseNotFound = true | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// create hash of service; uint64 | ||||
| 	h, err := hash.Hash(s, nil) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// get existing hash | ||||
| 	e.Lock() | ||||
| 	v, ok := e.register[s.Name] | ||||
| 	e.Unlock() | ||||
|  | ||||
| 	// the service is unchanged, skip registering | ||||
| 	if ok && v == h && !leaseNotFound { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	service := ®istry.Service{ | ||||
| 		Name:      s.Name, | ||||
| 		Version:   s.Version, | ||||
| 		Metadata:  s.Metadata, | ||||
| 		Endpoints: s.Endpoints, | ||||
| 	} | ||||
|  | ||||
| 	var options registry.RegisterOptions | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	var lgr *clientv3.LeaseGrantResponse | ||||
| 	if options.TTL.Seconds() > 0 { | ||||
| 		lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds())) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	for _, node := range s.Nodes { | ||||
| 		service.Nodes = []*registry.Node{node} | ||||
| 		if lgr != nil { | ||||
| 			_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) | ||||
| 		} else { | ||||
| 			_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service)) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	e.Lock() | ||||
| 	// save our hash of the service | ||||
| 	e.register[s.Name] = h | ||||
| 	// save our leaseID of the service | ||||
| 	if lgr != nil { | ||||
| 		e.leases[s.Name] = lgr.ID | ||||
| 	} | ||||
| 	e.Unlock() | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) { | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if len(rsp.Kvs) == 0 { | ||||
| 		return nil, registry.ErrNotFound | ||||
| 	} | ||||
|  | ||||
| 	serviceMap := map[string]*registry.Service{} | ||||
|  | ||||
| 	for _, n := range rsp.Kvs { | ||||
| 		if sn := decode(n.Value); sn != nil { | ||||
| 			s, ok := serviceMap[sn.Version] | ||||
| 			if !ok { | ||||
| 				s = ®istry.Service{ | ||||
| 					Name:      sn.Name, | ||||
| 					Version:   sn.Version, | ||||
| 					Metadata:  sn.Metadata, | ||||
| 					Endpoints: sn.Endpoints, | ||||
| 				} | ||||
| 				serviceMap[s.Version] = s | ||||
| 			} | ||||
|  | ||||
| 			for _, node := range sn.Nodes { | ||||
| 				s.Nodes = append(s.Nodes, node) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	var services []*registry.Service | ||||
| 	for _, service := range serviceMap { | ||||
| 		services = append(services, service) | ||||
| 	} | ||||
| 	return services, nil | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { | ||||
| 	var services []*registry.Service | ||||
| 	nameSet := make(map[string]struct{}) | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if len(rsp.Kvs) == 0 { | ||||
| 		return []*registry.Service{}, nil | ||||
| 	} | ||||
|  | ||||
| 	for _, n := range rsp.Kvs { | ||||
| 		if sn := decode(n.Value); sn != nil { | ||||
| 			nameSet[sn.Name] = struct{}{} | ||||
| 		} | ||||
| 	} | ||||
| 	for k := range nameSet { | ||||
| 		service := ®istry.Service{} | ||||
| 		service.Name = k | ||||
| 		services = append(services, service) | ||||
| 	} | ||||
|  | ||||
| 	return services, nil | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { | ||||
| 	return newEtcdWatcher(e, e.options.Timeout, opts...) | ||||
| } | ||||
|  | ||||
| func (e *etcdRegistry) String() string { | ||||
| 	return "etcd" | ||||
| } | ||||
		Reference in New Issue
	
	Block a user