Add basic etcd registry implementation
This commit is contained in:
parent
bf3f55c5c0
commit
d23e46df3a
@ -20,6 +20,7 @@ import (
|
|||||||
|
|
||||||
// registries
|
// registries
|
||||||
"github.com/myodc/go-micro/registry/consul"
|
"github.com/myodc/go-micro/registry/consul"
|
||||||
|
"github.com/myodc/go-micro/registry/etcd"
|
||||||
"github.com/myodc/go-micro/registry/kubernetes"
|
"github.com/myodc/go-micro/registry/kubernetes"
|
||||||
|
|
||||||
// transport
|
// transport
|
||||||
@ -105,6 +106,8 @@ func Setup(c *cli.Context) error {
|
|||||||
registry.DefaultRegistry = kubernetes.NewRegistry(rAddrs)
|
registry.DefaultRegistry = kubernetes.NewRegistry(rAddrs)
|
||||||
case "consul":
|
case "consul":
|
||||||
registry.DefaultRegistry = consul.NewRegistry(rAddrs)
|
registry.DefaultRegistry = consul.NewRegistry(rAddrs)
|
||||||
|
case "etcd":
|
||||||
|
registry.DefaultRegistry = etcd.NewRegistry(rAddrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
tAddrs := strings.Split(c.String("transport_address"), ",")
|
tAddrs := strings.Split(c.String("transport_address"), ",")
|
||||||
|
177
registry/etcd/etcd.go
Normal file
177
registry/etcd/etcd.go
Normal file
@ -0,0 +1,177 @@
|
|||||||
|
package etcd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/coreos/go-etcd/etcd"
|
||||||
|
"github.com/myodc/go-micro/registry"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
prefix = "/micro-registry"
|
||||||
|
)
|
||||||
|
|
||||||
|
type etcdRegistry struct {
|
||||||
|
client *etcd.Client
|
||||||
|
|
||||||
|
sync.RWMutex
|
||||||
|
services map[string]*registry.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
func encode(s *registry.Service) string {
|
||||||
|
b, _ := json.Marshal(s)
|
||||||
|
return string(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func decode(ds string) *registry.Service {
|
||||||
|
var s *registry.Service
|
||||||
|
json.Unmarshal([]byte(ds), &s)
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func nodePath(s, id string) string {
|
||||||
|
service := strings.Replace(s, "/", "-", -1)
|
||||||
|
node := strings.Replace(id, "/", "-", -1)
|
||||||
|
return filepath.Join(prefix, service, node)
|
||||||
|
}
|
||||||
|
|
||||||
|
func servicePath(s string) string {
|
||||||
|
return filepath.Join(prefix, strings.Replace(s, "/", "-", -1))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *etcdRegistry) Deregister(s *registry.Service) error {
|
||||||
|
if len(s.Nodes) == 0 {
|
||||||
|
return errors.New("Require at least one node")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, node := range s.Nodes {
|
||||||
|
_, err := e.client.Delete(nodePath(s.Name, node.Id), false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
e.client.DeleteDir(servicePath(s.Name))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *etcdRegistry) Register(s *registry.Service) error {
|
||||||
|
if len(s.Nodes) == 0 {
|
||||||
|
return errors.New("Require at least one node")
|
||||||
|
}
|
||||||
|
|
||||||
|
service := ®istry.Service{
|
||||||
|
Name: s.Name,
|
||||||
|
Metadata: s.Metadata,
|
||||||
|
}
|
||||||
|
|
||||||
|
e.client.CreateDir(servicePath(s.Name), 0)
|
||||||
|
|
||||||
|
for _, node := range s.Nodes {
|
||||||
|
service.Nodes = []*registry.Node{node}
|
||||||
|
_, err := e.client.Create(nodePath(service.Name, node.Id), encode(service), 0)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *etcdRegistry) GetService(name string) (*registry.Service, error) {
|
||||||
|
e.RLock()
|
||||||
|
service, ok := e.services[name]
|
||||||
|
e.RUnlock()
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
return service, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
rsp, err := e.client.Get(servicePath(name), false, false)
|
||||||
|
if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s := ®istry.Service{}
|
||||||
|
|
||||||
|
for _, n := range rsp.Node.Nodes {
|
||||||
|
if n.Dir {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sn := decode(n.Value)
|
||||||
|
for _, node := range sn.Nodes {
|
||||||
|
s.Nodes = append(s.Nodes, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *etcdRegistry) ListServices() ([]*registry.Service, error) {
|
||||||
|
e.RLock()
|
||||||
|
serviceMap := e.services
|
||||||
|
e.RUnlock()
|
||||||
|
|
||||||
|
var services []*registry.Service
|
||||||
|
|
||||||
|
if len(serviceMap) > 0 {
|
||||||
|
for _, service := range services {
|
||||||
|
services = append(services, service)
|
||||||
|
}
|
||||||
|
return services, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
rsp, err := e.client.Get(prefix, true, true)
|
||||||
|
if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, node := range rsp.Node.Nodes {
|
||||||
|
service := ®istry.Service{}
|
||||||
|
|
||||||
|
for _, n := range node.Nodes {
|
||||||
|
i := decode(n.Value)
|
||||||
|
service.Name = i.Name
|
||||||
|
for _, in := range i.Nodes {
|
||||||
|
service.Nodes = append(service.Nodes, in)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
services = append(services, service)
|
||||||
|
}
|
||||||
|
|
||||||
|
return services, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *etcdRegistry) Watch() {
|
||||||
|
newEtcdWatcher(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry {
|
||||||
|
var cAddrs []string
|
||||||
|
|
||||||
|
for _, addr := range addrs {
|
||||||
|
if len(addr) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cAddrs = append(cAddrs, addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cAddrs) == 0 {
|
||||||
|
cAddrs = []string{"http://127.0.0.1:2379"}
|
||||||
|
}
|
||||||
|
|
||||||
|
e := &etcdRegistry{
|
||||||
|
client: etcd.NewClient(cAddrs),
|
||||||
|
services: make(map[string]*registry.Service),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Need to fix watcher
|
||||||
|
// e.Watch()
|
||||||
|
|
||||||
|
return e
|
||||||
|
}
|
73
registry/etcd/watcher.go
Normal file
73
registry/etcd/watcher.go
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
package etcd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/coreos/go-etcd/etcd"
|
||||||
|
"github.com/myodc/go-micro/registry"
|
||||||
|
)
|
||||||
|
|
||||||
|
type etcdWatcher struct {
|
||||||
|
registry *etcdRegistry
|
||||||
|
stop chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEtcdWatcher(r *etcdRegistry) *etcdWatcher {
|
||||||
|
ew := &etcdWatcher{
|
||||||
|
registry: r,
|
||||||
|
stop: make(chan bool),
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan *etcd.Response)
|
||||||
|
|
||||||
|
go r.client.Watch(prefix, 0, true, ch, ew.stop)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for rsp := range ch {
|
||||||
|
if rsp.Node.Dir {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
s := decode(rsp.Node.Value)
|
||||||
|
if s == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Lock()
|
||||||
|
|
||||||
|
service, ok := r.services[s.Name]
|
||||||
|
if !ok {
|
||||||
|
if rsp.Action == "create" {
|
||||||
|
r.services[s.Name] = s
|
||||||
|
}
|
||||||
|
r.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
switch rsp.Action {
|
||||||
|
case "delete":
|
||||||
|
var nodes []*registry.Node
|
||||||
|
for _, node := range service.Nodes {
|
||||||
|
var seen bool
|
||||||
|
for _, n := range s.Nodes {
|
||||||
|
if node.Id == n.Id {
|
||||||
|
seen = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !seen {
|
||||||
|
nodes = append(nodes, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
service.Nodes = nodes
|
||||||
|
case "create":
|
||||||
|
service.Nodes = append(service.Nodes, s.Nodes...)
|
||||||
|
}
|
||||||
|
r.Unlock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ew
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ew *etcdWatcher) Stop() {
|
||||||
|
ew.stop <- true
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user