diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go index cf38eb44..bcf26d4d 100644 --- a/registry/etcd/etcd.go +++ b/registry/etcd/etcd.go @@ -7,8 +7,9 @@ import ( "strings" "sync" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/myodc/go-micro/registry" + "golang.org/x/net/context" ) var ( @@ -16,7 +17,7 @@ var ( ) type etcdRegistry struct { - client *etcd.Client + client etcd.KeysAPI sync.RWMutex services map[string]*registry.Service @@ -49,13 +50,13 @@ func (e *etcdRegistry) Deregister(s *registry.Service) error { } for _, node := range s.Nodes { - _, err := e.client.Delete(nodePath(s.Name, node.Id), false) + _, err := e.client.Delete(context.Background(), nodePath(s.Name, node.Id), &etcd.DeleteOptions{Recursive: false}) if err != nil { return err } } - e.client.DeleteDir(servicePath(s.Name)) + e.client.Delete(context.Background(), servicePath(s.Name), &etcd.DeleteOptions{Dir: true}) return nil } @@ -69,11 +70,11 @@ func (e *etcdRegistry) Register(s *registry.Service) error { Metadata: s.Metadata, } - e.client.CreateDir(servicePath(s.Name), 0) + e.client.Set(context.Background(), servicePath(s.Name), "", &etcd.SetOptions{Dir: true}) for _, node := range s.Nodes { service.Nodes = []*registry.Node{node} - _, err := e.client.Create(nodePath(service.Name, node.Id), encode(service), 0) + _, err := e.client.Set(context.Background(), nodePath(service.Name, node.Id), encode(service), &etcd.SetOptions{}) if err != nil { return err } @@ -91,7 +92,7 @@ func (e *etcdRegistry) GetService(name string) (*registry.Service, error) { return service, nil } - rsp, err := e.client.Get(servicePath(name), false, false) + rsp, err := e.client.Get(context.Background(), servicePath(name), &etcd.GetOptions{}) if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") { return nil, err } @@ -103,6 +104,8 @@ func (e *etcdRegistry) GetService(name string) (*registry.Service, error) { continue } sn := decode(n.Value) + s.Name = sn.Name + s.Metadata = sn.Metadata for _, node := range sn.Nodes { s.Nodes = append(s.Nodes, node) } @@ -125,7 +128,7 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { return services, nil } - rsp, err := e.client.Get(prefix, true, true) + rsp, err := e.client.Get(context.Background(), prefix, &etcd.GetOptions{Recursive: true, Sort: true}) if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") { return nil, err } @@ -166,8 +169,12 @@ func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { cAddrs = []string{"http://127.0.0.1:2379"} } + c, _ := etcd.New(etcd.Config{ + Endpoints: cAddrs, + }) + e := &etcdRegistry{ - client: etcd.NewClient(cAddrs), + client: etcd.NewKeysAPI(c), services: make(map[string]*registry.Service), } diff --git a/registry/etcd/watcher.go b/registry/etcd/watcher.go index 4e8fa55b..0dee8a56 100644 --- a/registry/etcd/watcher.go +++ b/registry/etcd/watcher.go @@ -1,8 +1,9 @@ package etcd import ( - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/myodc/go-micro/registry" + "golang.org/x/net/context" ) type etcdWatcher struct { @@ -16,16 +17,28 @@ func newEtcdWatcher(r *etcdRegistry) (registry.Watcher, error) { stop: make(chan bool), } - ch := make(chan *etcd.Response) + w := r.client.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true}) - go r.client.Watch(prefix, 0, true, ch, ew.stop) - go ew.watch(ch) + c := context.Background() + ctx, cancel := context.WithCancel(c) + + go func() { + <-ew.stop + cancel() + }() + + go ew.watch(ctx, w) return ew, nil } -func (e *etcdWatcher) watch(ch chan *etcd.Response) { - for rsp := range ch { +func (e *etcdWatcher) watch(ctx context.Context, w etcd.Watcher) { + for { + rsp, err := w.Next(ctx) + if err != nil && ctx.Err() != nil { + return + } + if rsp.Node.Dir { continue }