update etcd registry to use github.com/coreos/etcd/client

This commit is contained in:
Asim 2015-09-26 17:09:03 +01:00
parent 8b2ffc40d2
commit ac995ba983
2 changed files with 35 additions and 15 deletions

View File

@ -7,8 +7,9 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/coreos/go-etcd/etcd" etcd "github.com/coreos/etcd/client"
"github.com/myodc/go-micro/registry" "github.com/myodc/go-micro/registry"
"golang.org/x/net/context"
) )
var ( var (
@ -16,7 +17,7 @@ var (
) )
type etcdRegistry struct { type etcdRegistry struct {
client *etcd.Client client etcd.KeysAPI
sync.RWMutex sync.RWMutex
services map[string]*registry.Service services map[string]*registry.Service
@ -49,13 +50,13 @@ func (e *etcdRegistry) Deregister(s *registry.Service) error {
} }
for _, node := range s.Nodes { for _, node := range s.Nodes {
_, err := e.client.Delete(nodePath(s.Name, node.Id), false) _, err := e.client.Delete(context.Background(), nodePath(s.Name, node.Id), &etcd.DeleteOptions{Recursive: false})
if err != nil { if err != nil {
return err return err
} }
} }
e.client.DeleteDir(servicePath(s.Name)) e.client.Delete(context.Background(), servicePath(s.Name), &etcd.DeleteOptions{Dir: true})
return nil return nil
} }
@ -69,11 +70,11 @@ func (e *etcdRegistry) Register(s *registry.Service) error {
Metadata: s.Metadata, Metadata: s.Metadata,
} }
e.client.CreateDir(servicePath(s.Name), 0) e.client.Set(context.Background(), servicePath(s.Name), "", &etcd.SetOptions{Dir: true})
for _, node := range s.Nodes { for _, node := range s.Nodes {
service.Nodes = []*registry.Node{node} service.Nodes = []*registry.Node{node}
_, err := e.client.Create(nodePath(service.Name, node.Id), encode(service), 0) _, err := e.client.Set(context.Background(), nodePath(service.Name, node.Id), encode(service), &etcd.SetOptions{})
if err != nil { if err != nil {
return err return err
} }
@ -91,7 +92,7 @@ func (e *etcdRegistry) GetService(name string) (*registry.Service, error) {
return service, nil return service, nil
} }
rsp, err := e.client.Get(servicePath(name), false, false) rsp, err := e.client.Get(context.Background(), servicePath(name), &etcd.GetOptions{})
if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") { if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") {
return nil, err return nil, err
} }
@ -103,6 +104,8 @@ func (e *etcdRegistry) GetService(name string) (*registry.Service, error) {
continue continue
} }
sn := decode(n.Value) sn := decode(n.Value)
s.Name = sn.Name
s.Metadata = sn.Metadata
for _, node := range sn.Nodes { for _, node := range sn.Nodes {
s.Nodes = append(s.Nodes, node) s.Nodes = append(s.Nodes, node)
} }
@ -125,7 +128,7 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) {
return services, nil return services, nil
} }
rsp, err := e.client.Get(prefix, true, true) rsp, err := e.client.Get(context.Background(), prefix, &etcd.GetOptions{Recursive: true, Sort: true})
if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") { if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") {
return nil, err return nil, err
} }
@ -166,8 +169,12 @@ func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry {
cAddrs = []string{"http://127.0.0.1:2379"} cAddrs = []string{"http://127.0.0.1:2379"}
} }
c, _ := etcd.New(etcd.Config{
Endpoints: cAddrs,
})
e := &etcdRegistry{ e := &etcdRegistry{
client: etcd.NewClient(cAddrs), client: etcd.NewKeysAPI(c),
services: make(map[string]*registry.Service), services: make(map[string]*registry.Service),
} }

View File

@ -1,8 +1,9 @@
package etcd package etcd
import ( import (
"github.com/coreos/go-etcd/etcd" etcd "github.com/coreos/etcd/client"
"github.com/myodc/go-micro/registry" "github.com/myodc/go-micro/registry"
"golang.org/x/net/context"
) )
type etcdWatcher struct { type etcdWatcher struct {
@ -16,16 +17,28 @@ func newEtcdWatcher(r *etcdRegistry) (registry.Watcher, error) {
stop: make(chan bool), stop: make(chan bool),
} }
ch := make(chan *etcd.Response) w := r.client.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
go r.client.Watch(prefix, 0, true, ch, ew.stop) c := context.Background()
go ew.watch(ch) ctx, cancel := context.WithCancel(c)
go func() {
<-ew.stop
cancel()
}()
go ew.watch(ctx, w)
return ew, nil return ew, nil
} }
func (e *etcdWatcher) watch(ch chan *etcd.Response) { func (e *etcdWatcher) watch(ctx context.Context, w etcd.Watcher) {
for rsp := range ch { for {
rsp, err := w.Next(ctx)
if err != nil && ctx.Err() != nil {
return
}
if rsp.Node.Dir { if rsp.Node.Dir {
continue continue
} }