614 lines
14 KiB
Go
Raw Normal View History

// Package etcd provides an etcd service registry
package etcd
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"net"
"path"
"sort"
"strings"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/mvcc/mvccpb"
hash "github.com/mitchellh/hashstructure"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/registry"
"go.uber.org/zap"
)
// Key layout and defaults used by the etcd registry.
const (
// prefix is the root under which every registry key is stored
prefix = "/micro/registry/"
// defaultDomain is the domain used whenever an option set leaves Domain empty
defaultDomain = "micro"
)
// etcdRegistry is the registry implementation backed by an etcd v3 client.
type etcdRegistry struct {
// client is the etcd client; configure replaces it each time it succeeds
client *clientv3.Client
// options holds the registry options applied so far
options registry.Options
// register and leases are grouped by domain
// the embedded RWMutex is held around accesses to both maps
sync.RWMutex
// register: domain -> (service name + node id) -> hash of the last registered node
register map[string]register
// leases: domain -> (service name + node id) -> etcd lease id
leases map[string]leases
}
// register maps a service name concatenated with a node id to the hash of that node
type register map[string]uint64
// leases maps a service name concatenated with a node id to the etcd lease of that node
type leases map[string]clientv3.LeaseID
// NewRegistry builds an etcd backed registry from the supplied options and
// tries to create the etcd client straight away. The outcome of that attempt
// is not reported to the caller; Init runs the same configuration step and
// does return its error.
func NewRegistry(opts ...registry.Option) registry.Registry {
	reg := new(etcdRegistry)
	reg.options = registry.NewOptions(opts...)
	reg.register = map[string]register{}
	reg.leases = map[string]leases{}

	// the configuration error is dropped here because the constructor has
	// no error return
	_ = configure(reg, opts...)

	return reg
}
// newClient builds an etcd v3 client from the options held by e.
//
// Defaults applied here:
//   - a Timeout of zero is replaced by 5s and stored back into e.options
//   - without usable addresses the endpoint is 127.0.0.1:2379
//   - an address without a port gets port 2379
//
// TLS is switched on when either Secure is set or a TLSConfig is supplied;
// with Secure alone a config that skips certificate verification is used.
// When TLS is on every endpoint is given an https:// scheme.
func newClient(e *etcdRegistry) (*clientv3.Client, error) {
	if e.options.Timeout == 0 {
		e.options.Timeout = 5 * time.Second
	}

	var etcdCfg clientv3.Config
	etcdCfg.Endpoints = []string{"127.0.0.1:2379"}

	// an explicit TLS config wins, Secure alone falls back to an unverified one
	switch {
	case e.options.TLSConfig != nil:
		etcdCfg.TLS = e.options.TLSConfig
	case e.options.Secure:
		etcdCfg.TLS = &tls.Config{
			InsecureSkipVerify: true,
		}
	}

	// credentials and logger configuration travel in the options context
	if octx := e.options.Context; octx != nil {
		if creds, found := octx.Value(authKey{}).(*authCreds); found {
			etcdCfg.Username = creds.Username
			etcdCfg.Password = creds.Password
		}
		if logCfg, found := octx.Value(logConfigKey{}).(*zap.Config); found && logCfg != nil {
			etcdCfg.LogConfig = logCfg
		}
	}

	var endpoints []string
	for _, raw := range e.options.Addrs {
		if raw == "" {
			continue
		}

		host, port, err := net.SplitHostPort(raw)
		if err == nil {
			endpoints = append(endpoints, net.JoinHostPort(host, port))
			continue
		}

		// a missing port is the only recoverable problem, any other
		// malformed address is silently left out
		if addrErr, isAddrErr := err.(*net.AddrError); isAddrErr && addrErr.Err == "missing port in address" {
			endpoints = append(endpoints, net.JoinHostPort(raw, "2379"))
		}
	}

	// only override the default endpoint when something usable was given
	if len(endpoints) > 0 {
		etcdCfg.Endpoints = endpoints
	}

	// make sure the endpoints carry https:// when TLS is in use
	if etcdCfg.TLS != nil {
		for i := range etcdCfg.Endpoints {
			if strings.HasPrefix(etcdCfg.Endpoints[i], "https://") {
				continue
			}
			etcdCfg.Endpoints[i] = "https://" + etcdCfg.Endpoints[i]
		}
	}

	client, err := clientv3.New(etcdCfg)
	if err != nil {
		return nil, err
	}

	return client, nil
}
// configure applies opts to the registry options and installs a freshly
// created etcd client, closing the previous client if there was one.
// When the new client cannot be created the error is returned and the
// existing client stays in place.
func configure(e *etcdRegistry, opts ...registry.Option) error {
	for i := range opts {
		opts[i](&e.options)
	}

	next, err := newClient(e)
	if err != nil {
		return err
	}

	// release the old client before installing its replacement
	if prev := e.client; prev != nil {
		prev.Close()
	}
	e.client = next

	return nil
}
// getName extracts the domain and the service name from a registry key.
// The key is a path of /prefix/domain/name/id e.g /micro/registry/domain/service/uuid.
// The results are (domain, name, ok); ok is false when fewer than two path
// elements remain once the prefix has been stripped.
func getName(key, prefix string) (string, string, bool) {
	// drop the prefix and at most one slash left in front of the domain
	rest := strings.TrimPrefix(strings.TrimPrefix(key, prefix), "/")

	// only the first two elements matter: domain/service
	elems := strings.SplitN(rest, "/", 3)
	if len(elems) < 2 {
		return "", "", false
	}

	// domain first, then the service name
	return elems[0], elems[1], true
}
// encode renders the service as a JSON string.
// A marshalling error is not reported to the caller.
func encode(s *registry.Service) string {
	raw, _ := json.Marshal(s) // error deliberately discarded, the signature has no way to return it
	return string(raw)
}
// decode parses a JSON document into a service.
// The unmarshalling error is discarded; callers only see whatever ended up
// in the returned pointer and are expected to check it for nil.
// NOTE(review): an error raised part-way through decoding may leave a
// partially filled service here — confirm callers tolerate that.
func decode(ds []byte) *registry.Service {
	var svc *registry.Service
	_ = json.Unmarshal(ds, &svc)
	return svc
}
// nodePath returns the etcd key of a single node:
// <prefix>/<domain>/<service>/<node id>.
// Slashes inside the service name and inside the node id are replaced by
// dashes before the elements are joined.
func nodePath(domain, s, id string) string {
	dashed := strings.NewReplacer("/", "-")
	return path.Join(prefixWithDomain(domain), dashed.Replace(s), dashed.Replace(id))
}
// servicePath returns the key under which the nodes of a service are stored
// within a domain: <prefix>/<domain>/<serialized service name>.
func servicePath(domain, s string) string {
	root := prefixWithDomain(domain)
	name := serializeServiceName(s)
	return path.Join(root, name)
}
// serializeServiceName turns every "/" of a service name into "-" so the
// name can be used as one element of a key path.
func serializeServiceName(s string) string {
	return strings.Replace(s, "/", "-", -1)
}
// prefixWithDomain returns the registry prefix joined with the given domain.
// The result comes from path.Join and therefore has no trailing slash.
func prefixWithDomain(domain string) string {
	elems := []string{prefix, domain}
	return path.Join(elems...)
}
// Init applies additional options and rebuilds the etcd client with them.
// It returns the error reported by the configuration step, if any.
func (e *etcdRegistry) Init(opts ...registry.Option) error {
	if err := configure(e, opts...); err != nil {
		return err
	}
	return nil
}
// Options returns the options currently held by the registry, by value.
func (e *etcdRegistry) Options() registry.Options {
	current := e.options
	return current
}
// Connect is a placeholder: it performs no work and always returns nil.
func (e *etcdRegistry) Connect(ctx context.Context) error {
	// TODO: establish the etcd connection here
	return nil
}
// Disconnect is a placeholder: it performs no work and always returns nil.
func (e *etcdRegistry) Disconnect(ctx context.Context) error {
	// TODO: tear down the etcd connection here
	return nil
}
// registerNode writes a single node of service s to etcd, or just refreshes
// its lease when an unchanged registration already exists.
//
// Steps:
//  1. resolve the domain (defaultDomain when the options carry none) and
//     stamp it into the service and node metadata
//  2. look up the lease of this node, first in the local cache, then in etcd
//  3. renew that lease; when etcd no longer knows it the node is written again
//  4. skip the write when the node hash equals the cached one and the lease
//     is still alive
//  5. otherwise grant a new lease (only when a TTL is set) and put the key
//
// s.Metadata and node.Metadata are modified in place. Every etcd call is
// bounded by e.options.Timeout.
func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error {
	if len(s.Nodes) == 0 {
		return errors.New("Require at least one node")
	}

	// parse the options
	var options registry.RegisterOptions
	for _, o := range opts {
		o(&options)
	}
	if len(options.Domain) == 0 {
		options.Domain = defaultDomain
	}

	if s.Metadata == nil {
		s.Metadata = map[string]string{}
	}
	if node.Metadata == nil {
		node.Metadata = map[string]string{}
	}

	// set the domain in metadata so it can be retrieved by wildcard queries
	s.Metadata["domain"] = options.Domain
	node.Metadata["domain"] = options.Domain

	// cache key of this node inside its domain
	id := s.Name + node.Id

	e.Lock()
	// ensure the leases and registers are setup for this domain
	if _, ok := e.leases[options.Domain]; !ok {
		e.leases[options.Domain] = make(leases)
	}
	if _, ok := e.register[options.Domain]; !ok {
		e.register[options.Domain] = make(register)
	}
	// check to see if we already have a lease cached
	leaseID, ok := e.leases[options.Domain][id]
	e.Unlock()

	if !ok {
		// missing lease, check if the key exists
		ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
		defer cancel()

		// look for the existing key
		key := nodePath(options.Domain, s.Name, node.Id)
		rsp, err := e.client.Get(ctx, key, clientv3.WithSerializable())
		if err != nil {
			return err
		}

		// get the existing lease
		for _, kv := range rsp.Kvs {
			if kv.Lease <= 0 {
				continue
			}
			leaseID = clientv3.LeaseID(kv.Lease)

			// decode the existing node
			srv := decode(kv.Value)
			if srv == nil || len(srv.Nodes) == 0 {
				continue
			}

			// create hash of service; uint64
			h, err := hash.Hash(srv.Nodes[0], nil)
			if err != nil {
				continue
			}

			// save the info
			e.Lock()
			e.leases[options.Domain][id] = leaseID
			e.register[options.Domain][id] = h
			e.Unlock()

			break
		}
	}

	var leaseNotFound bool

	// renew the lease if it exists
	if leaseID > 0 {
		if logger.V(logger.TraceLevel) {
			logger.Tracef("Renewing existing lease for %s %d", s.Name, leaseID)
		}

		// bound the renewal like every other etcd call in this function so an
		// unreachable cluster cannot block the registration forever
		kaCtx, kaCancel := context.WithTimeout(context.Background(), e.options.Timeout)
		_, err := e.client.KeepAliveOnce(kaCtx, leaseID)
		kaCancel()
		if err != nil {
			if err != rpctypes.ErrLeaseNotFound {
				return err
			}

			if logger.V(logger.TraceLevel) {
				logger.Tracef("Lease not found for %s %d", s.Name, leaseID)
			}

			// lease not found do register
			leaseNotFound = true
		}
	}

	// create hash of service; uint64
	h, err := hash.Hash(node, nil)
	if err != nil {
		return err
	}

	// get existing hash for the service node
	e.RLock()
	v, ok := e.register[options.Domain][id]
	e.RUnlock()

	// the service is unchanged, skip registering
	if ok && v == h && !leaseNotFound {
		if logger.V(logger.TraceLevel) {
			logger.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id)
		}
		return nil
	}

	// s.Metadata already carries the domain (set above), so the stored copy
	// can be determined when doing wildcard queries
	service := &registry.Service{
		Name:      s.Name,
		Version:   s.Version,
		Metadata:  s.Metadata,
		Endpoints: s.Endpoints,
		Nodes:     []*registry.Node{node},
	}

	ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
	defer cancel()

	var lgr *clientv3.LeaseGrantResponse
	if options.TTL.Seconds() > 0 {
		// get a lease used to expire keys since we have a ttl
		lgr, err = e.client.Grant(ctx, int64(options.TTL.Seconds()))
		if err != nil {
			return err
		}
	}

	// create an entry for the node
	var putOpts []clientv3.OpOption
	if lgr != nil {
		putOpts = append(putOpts, clientv3.WithLease(lgr.ID))

		if logger.V(logger.TraceLevel) {
			logger.Tracef("Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.Id, lgr, lgr.ID, options.TTL)
		}
	} else if logger.V(logger.TraceLevel) {
		logger.Tracef("Registering %s id %s without lease", service.Name, node.Id)
	}

	key := nodePath(options.Domain, s.Name, node.Id)
	if _, err = e.client.Put(ctx, key, encode(service), putOpts...); err != nil {
		return err
	}

	e.Lock()
	// save our hash of the service
	e.register[options.Domain][id] = h
	// save our leaseID of the service
	if lgr != nil {
		e.leases[options.Domain][id] = lgr.ID
	}
	e.Unlock()

	return nil
}
// Deregister removes every node of s from the local caches and deletes the
// corresponding keys from etcd. The domain comes from the options and falls
// back to defaultDomain. The first failing delete aborts the loop and its
// error is returned; nodes after it are left untouched.
//
// NOTE(review): the deletes run on a context derived from
// context.Background(), not from the ctx argument — confirm this is intended.
func (e *etcdRegistry) Deregister(ctx context.Context, s *registry.Service, opts ...registry.DeregisterOption) error {
	if len(s.Nodes) == 0 {
		return errors.New("Require at least one node")
	}

	// parse the options
	var options registry.DeregisterOptions
	for _, o := range opts {
		o(&options)
	}
	if len(options.Domain) == 0 {
		options.Domain = defaultDomain
	}

	for _, node := range s.Nodes {
		id := s.Name + node.Id

		e.Lock()
		// drop the cached hash and lease of the node; delete on a missing
		// (nil) per-domain map is a no-op, so unknown domains need no check
		delete(e.register[options.Domain], id)
		delete(e.leases[options.Domain], id)
		e.Unlock()

		if logger.V(logger.TraceLevel) {
			logger.Tracef("Deregistering %s id %s", s.Name, node.Id)
		}

		// cancel right after the call instead of deferring: a defer inside the
		// loop would keep every timeout context alive until the function returns
		delCtx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
		_, err := e.client.Delete(delCtx, nodePath(options.Domain, s.Name, node.Id))
		cancel()
		if err != nil {
			return err
		}
	}

	return nil
}
// Register registers every node of s, one at a time. A failing node does not
// stop the remaining ones from being processed; when several nodes fail only
// the error of the last failing node is returned.
func (e *etcdRegistry) Register(ctx context.Context, s *registry.Service, opts ...registry.RegisterOption) error {
	nodes := s.Nodes
	if len(nodes) == 0 {
		return errors.New("Require at least one node")
	}

	var lastErr error
	for i := range nodes {
		err := e.registerNode(s, nodes[i], opts...)
		if err == nil {
			continue
		}
		// remember the failure but keep going with the other nodes
		lastErr = err
	}

	return lastErr
}
// GetService returns the registrations of the named service, one entry per
// name/version/domain combination with the nodes of that combination merged.
//
// The domain comes from the options and falls back to defaultDomain; the
// wildcard domain scans every key below the registry prefix. The lookup is
// bounded by e.options.Timeout on top of the caller's ctx.
// registry.ErrNotFound is returned when etcd holds no matching key.
// The order of the returned services is not defined.
func (e *etcdRegistry) GetService(ctx context.Context, name string, opts ...registry.GetOption) ([]*registry.Service, error) {
	var cancel context.CancelFunc
	ctx, cancel = context.WithTimeout(ctx, e.options.Timeout)
	defer cancel()

	// parse the options and fallback to the default domain
	var options registry.GetOptions
	for _, o := range opts {
		o(&options)
	}
	if len(options.Domain) == 0 {
		options.Domain = defaultDomain
	}

	// keys hold the service name with "/" replaced by "-" (see nodePath and
	// servicePath), so key elements must be compared against that form;
	// comparing against the raw name would never match a name containing "/"
	keyName := strings.ReplaceAll(name, "/", "-")

	var results []*mvccpb.KeyValue

	// TODO: refactorout wildcard, this is an incredibly expensive operation
	if options.Domain == registry.WildcardDomain {
		rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
		if err != nil {
			return nil, err
		}

		// filter the results for the key we care about
		for _, kv := range rsp.Kvs {
			// if the key does not contain the name then pass
			_, service, ok := getName(string(kv.Key), prefix)
			if !ok || service != keyName {
				continue
			}

			// save the result if its what we expect
			results = append(results, kv)
		}
	} else {
		svcPrefix := servicePath(options.Domain, name) + "/"
		rsp, err := e.client.Get(ctx, svcPrefix, clientv3.WithPrefix(), clientv3.WithSerializable())
		if err != nil {
			return nil, err
		}
		results = rsp.Kvs
	}

	if len(results) == 0 {
		return nil, registry.ErrNotFound
	}

	versions := make(map[string]*registry.Service)

	for _, n := range results {
		// only process the things we care about
		domain, service, ok := getName(string(n.Key), prefix)
		if !ok || service != keyName {
			continue
		}

		sn := decode(n.Value)
		if sn == nil {
			continue
		}

		// compose a key of name/version/domain
		key := sn.Name + sn.Version + domain

		s, ok := versions[key]
		if !ok {
			s = &registry.Service{
				Name:      sn.Name,
				Version:   sn.Version,
				Metadata:  sn.Metadata,
				Endpoints: sn.Endpoints,
			}
			versions[key] = s
		}
		s.Nodes = append(s.Nodes, sn.Nodes...)
	}

	services := make([]*registry.Service, 0, len(versions))
	for _, service := range versions {
		services = append(services, service)
	}

	return services, nil
}
// ListServices returns every service registered in a domain, one entry per
// name/version/domain combination with the nodes of that combination merged,
// sorted by service name.
//
// The domain comes from the options and falls back to defaultDomain; the
// wildcard domain lists all domains. An empty, non-nil slice is returned when
// etcd holds no key for the domain.
//
// NOTE(review): the query runs on a context derived from
// context.Background(), not from the ctx argument — confirm this is intended.
func (e *etcdRegistry) ListServices(ctx context.Context, opts ...registry.ListOption) ([]*registry.Service, error) {
	// parse the options
	options := registry.NewListOptions(opts...)
	if len(options.Domain) == 0 {
		options.Domain = defaultDomain
	}

	// determine the prefix
	p := prefix
	if options.Domain != registry.WildcardDomain {
		// the trailing slash ends the domain element, otherwise a domain such
		// as "foo" would also match the keys of "foobar"
		p = prefixWithDomain(options.Domain) + "/"
	}

	ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
	defer cancel()

	rsp, err := e.client.Get(ctx, p, clientv3.WithPrefix(), clientv3.WithSerializable())
	if err != nil {
		return nil, err
	}
	if len(rsp.Kvs) == 0 {
		return []*registry.Service{}, nil
	}

	versions := make(map[string]*registry.Service)
	for _, n := range rsp.Kvs {
		domain, service, ok := getName(string(n.Key), prefix)
		if !ok {
			continue
		}

		sn := decode(n.Value)
		// the key element is the service name with "/" replaced by "-", so
		// the decoded name has to be brought into that form before comparing
		if sn == nil || strings.ReplaceAll(sn.Name, "/", "-") != service {
			continue
		}

		// key based on name/version/domain
		key := sn.Name + sn.Version + domain

		v, ok := versions[key]
		if !ok {
			versions[key] = sn
			continue
		}

		// append to service:version nodes
		v.Nodes = append(v.Nodes, sn.Nodes...)
	}

	services := make([]*registry.Service, 0, len(versions))
	for _, service := range versions {
		services = append(services, service)
	}

	// sort the services
	sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name })

	return services, nil
}
// Watch creates a watcher that runs on an etcd client of its own, built from
// the current registry options, rather than on the client of the registry.
func (e *etcdRegistry) Watch(ctx context.Context, opts ...registry.WatchOption) (registry.Watcher, error) {
	watchClient, err := newClient(e)
	if err != nil {
		return nil, err
	}

	// read the timeout only now: newClient fills in the default when unset
	timeout := e.options.Timeout
	return newEtcdWatcher(watchClient, timeout, opts...)
}
// String returns the name of this registry implementation.
func (e *etcdRegistry) String() string {
	const name = "etcd"
	return name
}