// Package etcd provides an etcd service registry package etcd import ( "context" "crypto/tls" "encoding/json" "errors" "path" "strings" "sync" "time" "github.com/coreos/etcd/clientv3" "github.com/micro/go-micro/registry" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" hash "github.com/mitchellh/hashstructure" ) var ( prefix = "/micro/registry" ) type etcdRegistry struct { client *clientv3.Client options registry.Options sync.Mutex register map[string]uint64 leases map[string]clientv3.LeaseID } func NewRegistry(opts ...registry.Option) registry.Registry { e := &etcdRegistry{ options: registry.Options{}, register: make(map[string]uint64), leases: make(map[string]clientv3.LeaseID), } configure(e, opts...) return e } func configure(e *etcdRegistry, opts ...registry.Option) error { config := clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, } for _, o := range opts { o(&e.options) } if e.options.Timeout == 0 { e.options.Timeout = 5 * time.Second } if e.options.Secure || e.options.TLSConfig != nil { tlsConfig := e.options.TLSConfig if tlsConfig == nil { tlsConfig = &tls.Config{ InsecureSkipVerify: true, } } config.TLS = tlsConfig } if e.options.Context != nil { u, ok := e.options.Context.Value(authKey{}).(*authCreds) if ok { config.Username = u.Username config.Password = u.Password } } var cAddrs []string for _, addr := range e.options.Addrs { if len(addr) == 0 { continue } cAddrs = append(cAddrs, addr) } // if we got addrs then we'll update if len(cAddrs) > 0 { config.Endpoints = cAddrs } cli, err := clientv3.New(config) if err != nil { return err } e.client = cli return nil } func encode(s *registry.Service) string { b, _ := json.Marshal(s) return string(b) } func decode(ds []byte) *registry.Service { var s *registry.Service json.Unmarshal(ds, &s) return s } func nodePath(s, id string) string { service := strings.Replace(s, "/", "-", -1) node := strings.Replace(id, "/", "-", -1) return path.Join(prefix, service, node) } func servicePath(s string) string { return path.Join(prefix, strings.Replace(s, "/", "-", -1)) } func (e *etcdRegistry) Init(opts ...registry.Option) error { return configure(e, opts...) } func (e *etcdRegistry) Options() registry.Options { return e.options } func (e *etcdRegistry) Deregister(s *registry.Service) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } e.Lock() // delete our hash of the service delete(e.register, s.Name) // delete our lease of the service delete(e.leases, s.Name) e.Unlock() ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() for _, node := range s.Nodes { _, err := e.client.Delete(ctx, nodePath(s.Name, node.Id)) if err != nil { return err } } return nil } func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } var leaseNotFound bool //refreshing lease if existing leaseID, ok := e.leases[s.Name] if ok { if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil { if err != rpctypes.ErrLeaseNotFound { return err } // lease not found do register leaseNotFound = true } } // create hash of service; uint64 h, err := hash.Hash(s, nil) if err != nil { return err } // get existing hash e.Lock() v, ok := e.register[s.Name] e.Unlock() // the service is unchanged, skip registering if ok && v == h && !leaseNotFound { return nil } service := ®istry.Service{ Name: s.Name, Version: s.Version, Metadata: s.Metadata, Endpoints: s.Endpoints, } var options registry.RegisterOptions for _, o := range opts { o(&options) } ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() var lgr *clientv3.LeaseGrantResponse if options.TTL.Seconds() > 0 { lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds())) if err != nil { return err } } for _, node := range s.Nodes { service.Nodes = []*registry.Node{node} if lgr != nil { _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) } else { _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service)) } if err != nil { return err } } e.Lock() // save our hash of the service e.register[s.Name] = h // save our leaseID of the service if lgr != nil { e.leases[s.Name] = lgr.ID } e.Unlock() return nil } func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) { ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) if err != nil { return nil, err } if len(rsp.Kvs) == 0 { return nil, registry.ErrNotFound } serviceMap := map[string]*registry.Service{} for _, n := range rsp.Kvs { if sn := decode(n.Value); sn != nil { s, ok := serviceMap[sn.Version] if !ok { s = ®istry.Service{ Name: sn.Name, Version: sn.Version, Metadata: sn.Metadata, Endpoints: sn.Endpoints, } serviceMap[s.Version] = s } for _, node := range sn.Nodes { s.Nodes = append(s.Nodes, node) } } } var services []*registry.Service for _, service := range serviceMap { services = append(services, service) } return services, nil } func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { var services []*registry.Service nameSet := make(map[string]struct{}) ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) if err != nil { return nil, err } if len(rsp.Kvs) == 0 { return []*registry.Service{}, nil } for _, n := range rsp.Kvs { if sn := decode(n.Value); sn != nil { nameSet[sn.Name] = struct{}{} } } for k := range nameSet { service := ®istry.Service{} service.Name = k services = append(services, service) } return services, nil } func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { return newEtcdWatcher(e, e.options.Timeout, opts...) } func (e *etcdRegistry) String() string { return "etcd" }