initial import from go-micro
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
105
watcher.go
Normal file
105
watcher.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/unistack-org/micro/v3/registry"
|
||||
)
|
||||
|
||||
type etcdWatcher struct {
|
||||
w clientv3.WatchChan
|
||||
client *clientv3.Client
|
||||
timeout time.Duration
|
||||
|
||||
mtx sync.Mutex
|
||||
stop chan bool
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func newEtcdWatcher(c *clientv3.Client, timeout time.Duration, opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wo registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
if len(wo.Domain) == 0 {
|
||||
wo.Domain = defaultDomain
|
||||
}
|
||||
|
||||
watchPath := prefix
|
||||
if wo.Domain == registry.WildcardDomain {
|
||||
if len(wo.Service) > 0 {
|
||||
return nil, errors.New("Cannot watch a service across domains")
|
||||
}
|
||||
watchPath = prefix
|
||||
} else if len(wo.Service) > 0 {
|
||||
watchPath = servicePath(wo.Domain, wo.Service) + "/"
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
w := c.Watch(ctx, watchPath, clientv3.WithPrefix(), clientv3.WithPrevKV())
|
||||
stop := make(chan bool, 1)
|
||||
|
||||
return &etcdWatcher{
|
||||
cancel: cancel,
|
||||
stop: stop,
|
||||
w: w,
|
||||
client: c,
|
||||
timeout: timeout,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ew *etcdWatcher) Next() (*registry.Result, error) {
|
||||
for wresp := range ew.w {
|
||||
if wresp.Err() != nil {
|
||||
return nil, wresp.Err()
|
||||
}
|
||||
if wresp.Canceled {
|
||||
return nil, errors.New("could not get next")
|
||||
}
|
||||
for _, ev := range wresp.Events {
|
||||
service := decode(ev.Kv.Value)
|
||||
var action string
|
||||
|
||||
switch ev.Type {
|
||||
case clientv3.EventTypePut:
|
||||
if ev.IsCreate() {
|
||||
action = "create"
|
||||
} else if ev.IsModify() {
|
||||
action = "update"
|
||||
}
|
||||
case clientv3.EventTypeDelete:
|
||||
action = "delete"
|
||||
|
||||
// get service from prevKv
|
||||
service = decode(ev.PrevKv.Value)
|
||||
}
|
||||
|
||||
if service == nil {
|
||||
continue
|
||||
}
|
||||
return ®istry.Result{
|
||||
Action: action,
|
||||
Service: service,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
return nil, errors.New("could not get next")
|
||||
}
|
||||
|
||||
func (ew *etcdWatcher) Stop() {
|
||||
ew.mtx.Lock()
|
||||
defer ew.mtx.Unlock()
|
||||
|
||||
select {
|
||||
case <-ew.stop:
|
||||
return
|
||||
default:
|
||||
close(ew.stop)
|
||||
ew.cancel()
|
||||
ew.client.Close()
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user