From 4469a41ae7a9fea65b8a881ed186094194080009 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 8 Aug 2020 01:40:41 +0100 Subject: [PATCH] use a totally different client for the watcher in etcd --- registry/etcd/etcd.go | 30 +++++++++++++++++++++++------- registry/etcd/watcher.go | 26 +++++++++++++------------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go index 36f67dca..55b4ada9 100644 --- a/registry/etcd/etcd.go +++ b/registry/etcd/etcd.go @@ -51,13 +51,10 @@ func NewRegistry(opts ...registry.Option) registry.Registry { return e } -func configure(e *etcdRegistry, opts ...registry.Option) error { +func newClient(e *etcdRegistry) (*clientv3.Client, error) { config := clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, } - for _, o := range opts { - o(&e.options) - } if e.options.Timeout == 0 { e.options.Timeout = 5 * time.Second @@ -117,17 +114,32 @@ func configure(e *etcdRegistry, opts ...registry.Option) error { } cli, err := clientv3.New(config) + if err != nil { + return nil, err + } + + return cli, nil +} + +// configure will setup the registry with new options +func configure(e *etcdRegistry, opts ...registry.Option) error { + for _, o := range opts { + o(&e.options) + } + + // setup the client + cli, err := newClient(e) if err != nil { return err } - // close the existing client if e.client != nil { e.client.Close() } - // set the new client + // setup new client e.client = cli + return nil } @@ -566,7 +578,11 @@ func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Se } func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { - return newEtcdWatcher(e, e.options.Timeout, opts...) + cli, err := newClient(e) + if err != nil { + return nil, err + } + return newEtcdWatcher(cli, e.options.Timeout, opts...) } func (e *etcdRegistry) String() string { diff --git a/registry/etcd/watcher.go b/registry/etcd/watcher.go index ba7df9bc..ff1c6065 100644 --- a/registry/etcd/watcher.go +++ b/registry/etcd/watcher.go @@ -15,11 +15,12 @@ type etcdWatcher struct { client *clientv3.Client timeout time.Duration - mtx sync.Mutex - stop chan bool + mtx sync.Mutex + stop chan bool + cancel func() } -func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.WatchOption) (registry.Watcher, error) { +func newEtcdWatcher(c *clientv3.Client, timeout time.Duration, opts ...registry.WatchOption) (registry.Watcher, error) { var wo registry.WatchOptions for _, o := range opts { o(&wo) @@ -28,14 +29,6 @@ func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.Wat wo.Domain = defaultDomain } - ctx, cancel := context.WithCancel(context.Background()) - stop := make(chan bool, 1) - - go func() { - <-stop - cancel() - }() - watchPath := prefix if wo.Domain == registry.WildcardDomain { if len(wo.Service) > 0 { @@ -46,10 +39,15 @@ func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.Wat watchPath = servicePath(wo.Domain, wo.Service) + "/" } + ctx, cancel := context.WithCancel(context.Background()) + w := c.Watch(ctx, watchPath, clientv3.WithPrefix(), clientv3.WithPrevKV()) + stop := make(chan bool, 1) + return &etcdWatcher{ + cancel: cancel, stop: stop, - w: r.client.Watch(ctx, watchPath, clientv3.WithPrefix(), clientv3.WithPrevKV()), - client: r.client, + w: w, + client: c, timeout: timeout, }, nil } @@ -101,5 +99,7 @@ func (ew *etcdWatcher) Stop() { return default: close(ew.stop) + ew.cancel() + ew.client.Close() } }