402 lines
8.8 KiB
Go
402 lines
8.8 KiB
Go
// Package etcd provides an etcd service registry
|
|
package etcd
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"errors"
|
|
"net"
|
|
"path"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/coreos/etcd/clientv3"
|
|
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
"github.com/micro/go-micro/v2/registry"
|
|
"github.com/micro/go-micro/v2/util/log"
|
|
hash "github.com/mitchellh/hashstructure"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var (
|
|
prefix = "/micro/registry/"
|
|
)
|
|
|
|
type etcdRegistry struct {
|
|
client *clientv3.Client
|
|
options registry.Options
|
|
|
|
sync.RWMutex
|
|
register map[string]uint64
|
|
leases map[string]clientv3.LeaseID
|
|
}
|
|
|
|
func NewRegistry(opts ...registry.Option) registry.Registry {
|
|
e := &etcdRegistry{
|
|
options: registry.Options{},
|
|
register: make(map[string]uint64),
|
|
leases: make(map[string]clientv3.LeaseID),
|
|
}
|
|
configure(e, opts...)
|
|
return e
|
|
}
|
|
|
|
func configure(e *etcdRegistry, opts ...registry.Option) error {
|
|
config := clientv3.Config{
|
|
Endpoints: []string{"127.0.0.1:2379"},
|
|
}
|
|
|
|
for _, o := range opts {
|
|
o(&e.options)
|
|
}
|
|
|
|
if e.options.Timeout == 0 {
|
|
e.options.Timeout = 5 * time.Second
|
|
}
|
|
|
|
if e.options.Secure || e.options.TLSConfig != nil {
|
|
tlsConfig := e.options.TLSConfig
|
|
if tlsConfig == nil {
|
|
tlsConfig = &tls.Config{
|
|
InsecureSkipVerify: true,
|
|
}
|
|
}
|
|
|
|
config.TLS = tlsConfig
|
|
}
|
|
|
|
if e.options.Context != nil {
|
|
u, ok := e.options.Context.Value(authKey{}).(*authCreds)
|
|
if ok {
|
|
config.Username = u.Username
|
|
config.Password = u.Password
|
|
}
|
|
cfg, ok := e.options.Context.Value(logConfigKey{}).(*zap.Config)
|
|
if ok && cfg != nil {
|
|
config.LogConfig = cfg
|
|
}
|
|
}
|
|
|
|
var cAddrs []string
|
|
|
|
for _, address := range e.options.Addrs {
|
|
if len(address) == 0 {
|
|
continue
|
|
}
|
|
addr, port, err := net.SplitHostPort(address)
|
|
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
|
port = "2379"
|
|
addr = address
|
|
cAddrs = append(cAddrs, net.JoinHostPort(addr, port))
|
|
} else if err == nil {
|
|
cAddrs = append(cAddrs, net.JoinHostPort(addr, port))
|
|
}
|
|
}
|
|
|
|
// if we got addrs then we'll update
|
|
if len(cAddrs) > 0 {
|
|
config.Endpoints = cAddrs
|
|
}
|
|
|
|
cli, err := clientv3.New(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
e.client = cli
|
|
return nil
|
|
}
|
|
|
|
func encode(s *registry.Service) string {
|
|
b, _ := json.Marshal(s)
|
|
return string(b)
|
|
}
|
|
|
|
func decode(ds []byte) *registry.Service {
|
|
var s *registry.Service
|
|
json.Unmarshal(ds, &s)
|
|
return s
|
|
}
|
|
|
|
func nodePath(s, id string) string {
|
|
service := strings.Replace(s, "/", "-", -1)
|
|
node := strings.Replace(id, "/", "-", -1)
|
|
return path.Join(prefix, service, node)
|
|
}
|
|
|
|
func servicePath(s string) string {
|
|
return path.Join(prefix, strings.Replace(s, "/", "-", -1))
|
|
}
|
|
|
|
func (e *etcdRegistry) Init(opts ...registry.Option) error {
|
|
return configure(e, opts...)
|
|
}
|
|
|
|
func (e *etcdRegistry) Options() registry.Options {
|
|
return e.options
|
|
}
|
|
|
|
func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error {
|
|
if len(s.Nodes) == 0 {
|
|
return errors.New("Require at least one node")
|
|
}
|
|
|
|
// check existing lease cache
|
|
e.RLock()
|
|
leaseID, ok := e.leases[s.Name+node.Id]
|
|
e.RUnlock()
|
|
|
|
if !ok {
|
|
// missing lease, check if the key exists
|
|
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
|
defer cancel()
|
|
|
|
// look for the existing key
|
|
rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id), clientv3.WithSerializable())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// get the existing lease
|
|
for _, kv := range rsp.Kvs {
|
|
if kv.Lease > 0 {
|
|
leaseID = clientv3.LeaseID(kv.Lease)
|
|
|
|
// decode the existing node
|
|
srv := decode(kv.Value)
|
|
if srv == nil || len(srv.Nodes) == 0 {
|
|
continue
|
|
}
|
|
|
|
// create hash of service; uint64
|
|
h, err := hash.Hash(srv.Nodes[0], nil)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// save the info
|
|
e.Lock()
|
|
e.leases[s.Name+node.Id] = leaseID
|
|
e.register[s.Name+node.Id] = h
|
|
e.Unlock()
|
|
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
var leaseNotFound bool
|
|
|
|
// renew the lease if it exists
|
|
if leaseID > 0 {
|
|
log.Tracef("Renewing existing lease for %s %d", s.Name, leaseID)
|
|
if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil {
|
|
if err != rpctypes.ErrLeaseNotFound {
|
|
return err
|
|
}
|
|
|
|
log.Tracef("Lease not found for %s %d", s.Name, leaseID)
|
|
// lease not found do register
|
|
leaseNotFound = true
|
|
}
|
|
}
|
|
|
|
// create hash of service; uint64
|
|
h, err := hash.Hash(node, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// get existing hash for the service node
|
|
e.Lock()
|
|
v, ok := e.register[s.Name+node.Id]
|
|
e.Unlock()
|
|
|
|
// the service is unchanged, skip registering
|
|
if ok && v == h && !leaseNotFound {
|
|
log.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id)
|
|
return nil
|
|
}
|
|
|
|
service := ®istry.Service{
|
|
Name: s.Name,
|
|
Version: s.Version,
|
|
Metadata: s.Metadata,
|
|
Endpoints: s.Endpoints,
|
|
Nodes: []*registry.Node{node},
|
|
}
|
|
|
|
var options registry.RegisterOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
|
defer cancel()
|
|
|
|
var lgr *clientv3.LeaseGrantResponse
|
|
if options.TTL.Seconds() > 0 {
|
|
// get a lease used to expire keys since we have a ttl
|
|
lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
log.Tracef("Registering %s id %s with lease %v and ttl %v", service.Name, node.Id, lgr, options.TTL)
|
|
// create an entry for the node
|
|
if lgr != nil {
|
|
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID))
|
|
} else {
|
|
_, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service))
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
e.Lock()
|
|
// save our hash of the service
|
|
e.register[s.Name+node.Id] = h
|
|
// save our leaseID of the service
|
|
if lgr != nil {
|
|
e.leases[s.Name+node.Id] = lgr.ID
|
|
}
|
|
e.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *etcdRegistry) Deregister(s *registry.Service) error {
|
|
if len(s.Nodes) == 0 {
|
|
return errors.New("Require at least one node")
|
|
}
|
|
|
|
for _, node := range s.Nodes {
|
|
e.Lock()
|
|
// delete our hash of the service
|
|
delete(e.register, s.Name+node.Id)
|
|
// delete our lease of the service
|
|
delete(e.leases, s.Name+node.Id)
|
|
e.Unlock()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
|
defer cancel()
|
|
|
|
log.Tracef("Deregistering %s id %s", s.Name, node.Id)
|
|
_, err := e.client.Delete(ctx, nodePath(s.Name, node.Id))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
|
|
if len(s.Nodes) == 0 {
|
|
return errors.New("Require at least one node")
|
|
}
|
|
|
|
var gerr error
|
|
|
|
// register each node individually
|
|
for _, node := range s.Nodes {
|
|
err := e.registerNode(s, node, opts...)
|
|
if err != nil {
|
|
gerr = err
|
|
}
|
|
}
|
|
|
|
return gerr
|
|
}
|
|
|
|
func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
|
defer cancel()
|
|
|
|
rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSerializable())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(rsp.Kvs) == 0 {
|
|
return nil, registry.ErrNotFound
|
|
}
|
|
|
|
serviceMap := map[string]*registry.Service{}
|
|
|
|
for _, n := range rsp.Kvs {
|
|
if sn := decode(n.Value); sn != nil {
|
|
s, ok := serviceMap[sn.Version]
|
|
if !ok {
|
|
s = ®istry.Service{
|
|
Name: sn.Name,
|
|
Version: sn.Version,
|
|
Metadata: sn.Metadata,
|
|
Endpoints: sn.Endpoints,
|
|
}
|
|
serviceMap[s.Version] = s
|
|
}
|
|
|
|
s.Nodes = append(s.Nodes, sn.Nodes...)
|
|
}
|
|
}
|
|
|
|
services := make([]*registry.Service, 0, len(serviceMap))
|
|
for _, service := range serviceMap {
|
|
services = append(services, service)
|
|
}
|
|
|
|
return services, nil
|
|
}
|
|
|
|
func (e *etcdRegistry) ListServices() ([]*registry.Service, error) {
|
|
versions := make(map[string]*registry.Service)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
|
defer cancel()
|
|
|
|
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(rsp.Kvs) == 0 {
|
|
return []*registry.Service{}, nil
|
|
}
|
|
|
|
for _, n := range rsp.Kvs {
|
|
sn := decode(n.Value)
|
|
if sn == nil {
|
|
continue
|
|
}
|
|
v, ok := versions[sn.Name+sn.Version]
|
|
if !ok {
|
|
versions[sn.Name+sn.Version] = sn
|
|
continue
|
|
}
|
|
// append to service:version nodes
|
|
v.Nodes = append(v.Nodes, sn.Nodes...)
|
|
}
|
|
|
|
services := make([]*registry.Service, 0, len(versions))
|
|
for _, service := range versions {
|
|
services = append(services, service)
|
|
}
|
|
|
|
// sort the services
|
|
sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name })
|
|
|
|
return services, nil
|
|
}
|
|
|
|
func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
|
return newEtcdWatcher(e, e.options.Timeout, opts...)
|
|
}
|
|
|
|
func (e *etcdRegistry) String() string {
|
|
return "etcd"
|
|
}
|