use a totally different client for the watcher in etcd

This commit is contained in:
Asim Aslam 2020-08-08 01:40:41 +01:00
parent fc67593ee4
commit 4469a41ae7
2 changed files with 36 additions and 20 deletions

View File

@ -51,13 +51,10 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
return e
}
func configure(e *etcdRegistry, opts ...registry.Option) error {
func newClient(e *etcdRegistry) (*clientv3.Client, error) {
config := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
}
for _, o := range opts {
o(&e.options)
}
if e.options.Timeout == 0 {
e.options.Timeout = 5 * time.Second
@ -117,17 +114,32 @@ func configure(e *etcdRegistry, opts ...registry.Option) error {
}
cli, err := clientv3.New(config)
if err != nil {
return nil, err
}
return cli, nil
}
// configure will setup the registry with new options
func configure(e *etcdRegistry, opts ...registry.Option) error {
for _, o := range opts {
o(&e.options)
}
// setup the client
cli, err := newClient(e)
if err != nil {
return err
}
// close the existing client
if e.client != nil {
e.client.Close()
}
// set the new client
// setup new client
e.client = cli
return nil
}
@ -566,7 +578,11 @@ func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Se
}
func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
return newEtcdWatcher(e, e.options.Timeout, opts...)
cli, err := newClient(e)
if err != nil {
return nil, err
}
return newEtcdWatcher(cli, e.options.Timeout, opts...)
}
func (e *etcdRegistry) String() string {

View File

@ -15,11 +15,12 @@ type etcdWatcher struct {
client *clientv3.Client
timeout time.Duration
mtx sync.Mutex
stop chan bool
mtx sync.Mutex
stop chan bool
cancel func()
}
func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.WatchOption) (registry.Watcher, error) {
func newEtcdWatcher(c *clientv3.Client, timeout time.Duration, opts ...registry.WatchOption) (registry.Watcher, error) {
var wo registry.WatchOptions
for _, o := range opts {
o(&wo)
@ -28,14 +29,6 @@ func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.Wat
wo.Domain = defaultDomain
}
ctx, cancel := context.WithCancel(context.Background())
stop := make(chan bool, 1)
go func() {
<-stop
cancel()
}()
watchPath := prefix
if wo.Domain == registry.WildcardDomain {
if len(wo.Service) > 0 {
@ -46,10 +39,15 @@ func newEtcdWatcher(r *etcdRegistry, timeout time.Duration, opts ...registry.Wat
watchPath = servicePath(wo.Domain, wo.Service) + "/"
}
ctx, cancel := context.WithCancel(context.Background())
w := c.Watch(ctx, watchPath, clientv3.WithPrefix(), clientv3.WithPrevKV())
stop := make(chan bool, 1)
return &etcdWatcher{
cancel: cancel,
stop: stop,
w: r.client.Watch(ctx, watchPath, clientv3.WithPrefix(), clientv3.WithPrevKV()),
client: r.client,
w: w,
client: c,
timeout: timeout,
}, nil
}
@ -101,5 +99,7 @@ func (ew *etcdWatcher) Stop() {
return
default:
close(ew.stop)
ew.cancel()
ew.client.Close()
}
}