diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index d6ee3eb2..82d4714c 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -27,6 +27,7 @@ import ( // registries "github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry/consul" + "github.com/micro/go-micro/registry/etcd" "github.com/micro/go-micro/registry/gossip" "github.com/micro/go-micro/registry/mdns" rmem "github.com/micro/go-micro/registry/memory" @@ -154,7 +155,7 @@ var ( cli.StringFlag{ Name: "registry", EnvVar: "MICRO_REGISTRY", - Usage: "Registry for discovery. consul, mdns", + Usage: "Registry for discovery. consul, etcd, mdns", }, cli.StringFlag{ Name: "registry_address", @@ -194,6 +195,7 @@ var ( "go.micro.registry": regSrv.NewRegistry, "service": regSrv.NewRegistry, "consul": consul.NewRegistry, + "etcd": etcd.NewRegistry, "gossip": gossip.NewRegistry, "mdns": mdns.NewRegistry, "memory": rmem.NewRegistry, diff --git a/go.mod b/go.mod index 0dfd6091..f5889f4e 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,9 @@ require ( github.com/beevik/ntp v0.2.0 github.com/bitly/go-simplejson v0.5.0 github.com/bwmarrin/discordgo v0.19.0 + github.com/coreos/etcd v3.3.15+incompatible + github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect + github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/forestgiant/sliceutil v0.0.0-20160425183142-94783f95db6c github.com/fsnotify/fsnotify v1.4.7 github.com/fsouza/go-dockerclient v1.4.4 @@ -33,6 +36,9 @@ require ( github.com/nlopes/slack v0.6.0 github.com/pkg/errors v0.8.1 github.com/technoweenie/multipartstreamer v1.0.1 // indirect + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.2.0 // indirect + go.uber.org/zap v1.10.0 // indirect golang.org/x/crypto v0.0.0-20191001170739-f9e2070545dc golang.org/x/net v0.0.0-20190930134127-c5a3c61f89f3 google.golang.org/grpc v1.24.0 diff --git a/go.sum b/go.sum index ae387d3e..648484b8 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,12 @@ github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wX github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 h1:4BX8f882bXEDKfWIf0wa8HRvpnBoPszJJXL+TVbBw4M= github.com/containerd/continuity v0.0.0-20181203112020-004b46473808/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= +github.com/coreos/etcd v3.3.15+incompatible h1:+9RjdC18gMxNQVvSiXvObLu29mOFmkgdsB4cRTlV+EE= +github.com/coreos/etcd v3.3.15+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -209,6 +215,12 @@ github.com/technoweenie/multipartstreamer v1.0.1 h1:XRztA5MXiR1TIRHxH2uNxXxaIkKQ github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog= github.com/xanzy/ssh-agent v0.2.1 h1:TCbipTQL2JiiCprBWx9frJ2eJlCYT00NmctrHxVAr70= github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4= +go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go new file mode 100644 index 00000000..495eed91 --- /dev/null +++ b/registry/etcd/etcd.go @@ -0,0 +1,309 @@ +// Package etcd provides an etcd service registry +package etcd + +import ( + "context" + "crypto/tls" + "encoding/json" + "errors" + "path" + "strings" + "sync" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/micro/go-micro/registry" + + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + hash "github.com/mitchellh/hashstructure" +) + +var ( + prefix = "/micro-registry" +) + +type etcdRegistry struct { + client *clientv3.Client + options registry.Options + sync.Mutex + register map[string]uint64 + leases map[string]clientv3.LeaseID +} + +func NewRegistry(opts ...registry.Option) registry.Registry { + e := &etcdRegistry{ + options: registry.Options{}, + register: make(map[string]uint64), + leases: make(map[string]clientv3.LeaseID), + } + configure(e, opts...) + return e +} + +func configure(e *etcdRegistry, opts ...registry.Option) error { + config := clientv3.Config{ + Endpoints: []string{"127.0.0.1:2379"}, + } + + for _, o := range opts { + o(&e.options) + } + + if e.options.Timeout == 0 { + e.options.Timeout = 5 * time.Second + } + + if e.options.Secure || e.options.TLSConfig != nil { + tlsConfig := e.options.TLSConfig + if tlsConfig == nil { + tlsConfig = &tls.Config{ + InsecureSkipVerify: true, + } + } + + config.TLS = tlsConfig + } + + if e.options.Context != nil { + u, ok := e.options.Context.Value(authKey{}).(*authCreds) + if ok { + config.Username = u.Username + config.Password = u.Password + } + } + + var cAddrs []string + + for _, addr := range e.options.Addrs { + if len(addr) == 0 { + continue + } + cAddrs = append(cAddrs, addr) + } + + // if we got addrs then we'll update + if len(cAddrs) > 0 { + config.Endpoints = cAddrs + } + + cli, err := clientv3.New(config) + if err != nil { + return err + } + e.client = cli + return nil +} + +func encode(s *registry.Service) string { + b, _ := json.Marshal(s) + return string(b) +} + +func decode(ds []byte) *registry.Service { + var s *registry.Service + json.Unmarshal(ds, &s) + return s +} + +func nodePath(s, id string) string { + service := strings.Replace(s, "/", "-", -1) + node := strings.Replace(id, "/", "-", -1) + return path.Join(prefix, service, node) +} + +func servicePath(s string) string { + return path.Join(prefix, strings.Replace(s, "/", "-", -1)) +} + +func (e *etcdRegistry) Init(opts ...registry.Option) error { + return configure(e, opts...) +} + +func (e *etcdRegistry) Options() registry.Options { + return e.options +} + +func (e *etcdRegistry) Deregister(s *registry.Service) error { + if len(s.Nodes) == 0 { + return errors.New("Require at least one node") + } + + e.Lock() + // delete our hash of the service + delete(e.register, s.Name) + // delete our lease of the service + delete(e.leases, s.Name) + e.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) + defer cancel() + + for _, node := range s.Nodes { + _, err := e.client.Delete(ctx, nodePath(s.Name, node.Id)) + if err != nil { + return err + } + } + return nil +} + +func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { + if len(s.Nodes) == 0 { + return errors.New("Require at least one node") + } + + var leaseNotFound bool + //refreshing lease if existing + leaseID, ok := e.leases[s.Name] + if ok { + if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil { + if err != rpctypes.ErrLeaseNotFound { + return err + } + + // lease not found do register + leaseNotFound = true + } + } + + // create hash of service; uint64 + h, err := hash.Hash(s, nil) + if err != nil { + return err + } + + // get existing hash + e.Lock() + v, ok := e.register[s.Name] + e.Unlock() + + // the service is unchanged, skip registering + if ok && v == h && !leaseNotFound { + return nil + } + + service := ®istry.Service{ + Name: s.Name, + Version: s.Version, + Metadata: s.Metadata, + Endpoints: s.Endpoints, + } + + var options registry.RegisterOptions + for _, o := range opts { + o(&options) + } + + ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) + defer cancel() + + var lgr *clientv3.LeaseGrantResponse + if options.TTL.Seconds() > 0 { + lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds())) + if err != nil { + return err + } + } + + for _, node := range s.Nodes { + service.Nodes = []*registry.Node{node} + if lgr != nil { + _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) + } else { + _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service)) + } + if err != nil { + return err + } + } + + e.Lock() + // save our hash of the service + e.register[s.Name] = h + // save our leaseID of the service + if lgr != nil { + e.leases[s.Name] = lgr.ID + } + e.Unlock() + + return nil +} + +func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) { + ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) + defer cancel() + + rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) + if err != nil { + return nil, err + } + + if len(rsp.Kvs) == 0 { + return nil, registry.ErrNotFound + } + + serviceMap := map[string]*registry.Service{} + + for _, n := range rsp.Kvs { + if sn := decode(n.Value); sn != nil { + s, ok := serviceMap[sn.Version] + if !ok { + s = ®istry.Service{ + Name: sn.Name, + Version: sn.Version, + Metadata: sn.Metadata, + Endpoints: sn.Endpoints, + } + serviceMap[s.Version] = s + } + + for _, node := range sn.Nodes { + s.Nodes = append(s.Nodes, node) + } + } + } + + var services []*registry.Service + for _, service := range serviceMap { + services = append(services, service) + } + return services, nil +} + +func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { + var services []*registry.Service + nameSet := make(map[string]struct{}) + + ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) + defer cancel() + + rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) + if err != nil { + return nil, err + } + + if len(rsp.Kvs) == 0 { + return []*registry.Service{}, nil + } + + for _, n := range rsp.Kvs { + if sn := decode(n.Value); sn != nil { + nameSet[sn.Name] = struct{}{} + } + } + for k := range nameSet { + service := ®istry.Service{} + service.Name = k + services = append(services, service) + } + + return services, nil +} + +func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { + return newEtcdWatcher(e, e.options.Timeout, opts...) +} + +func (e *etcdRegistry) String() string { + return "etcd" +} diff --git a/registry/etcd/options.go b/registry/etcd/options.go new file mode 100644 index 00000000..46bacc09 --- /dev/null +++ b/registry/etcd/options.go @@ -0,0 +1,24 @@ +package etcd + +import ( + "context" + + "github.com/micro/go-micro/registry" +) + +type authKey struct{} + +type authCreds struct { + Username string + Password string +} + +// Auth allows you to specify username/password +func Auth(username, password string) registry.Option { + return func(o *registry.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, authKey{}, &authCreds{Username: username, Password: password}) + } +} diff --git a/registry/etcd/watcher.go b/registry/etcd/watcher.go new file mode 100644 index 00000000..4eef76ba --- /dev/null +++ b/registry/etcd/watcher.go @@ -0,0 +1,88 @@ +package etcd + +import ( + "context" + "errors" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/micro/go-micro/registry" +) + +type etcdWatcher struct { + stop chan bool + w clientv3.WatchChan + client *clientv3.Client + timeout time.Duration +} + +func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.WatchOption) (registry.Watcher, error) { + var wo registry.WatchOptions + for _, o := range opts { + o(&wo) + } + + ctx, cancel := context.WithCancel(context.Background()) + stop := make(chan bool, 1) + + go func() { + <-stop + cancel() + }() + + watchPath := prefix + if len(wo.Service) > 0 { + watchPath = servicePath(wo.Service) + "/" + } + + return &etcdWatcher{ + stop: stop, + w: r.client.Watch(ctx, watchPath, clientv3.WithPrefix(), clientv3.WithPrevKV()), + client: r.client, + timeout: timeout, + }, nil +} + +func (ew *etcdWatcher) Next() (*registry.Result, error) { + for wresp := range ew.w { + if wresp.Err() != nil { + return nil, wresp.Err() + } + for _, ev := range wresp.Events { + service := decode(ev.Kv.Value) + var action string + + switch ev.Type { + case clientv3.EventTypePut: + if ev.IsCreate() { + action = "create" + } else if ev.IsModify() { + action = "update" + } + case clientv3.EventTypeDelete: + action = "delete" + + // get service from prevKv + service = decode(ev.PrevKv.Value) + } + + if service == nil { + continue + } + return ®istry.Result{ + Action: action, + Service: service, + }, nil + } + } + return nil, errors.New("could not get next") +} + +func (ew *etcdWatcher) Stop() { + select { + case <-ew.stop: + return + default: + close(ew.stop) + } +}