update to latest micro
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
150
etcd.go
150
etcd.go
@@ -1,4 +1,4 @@
|
||||
// Package etcd provides an etcd service registry
|
||||
// Package etcd provides an etcd service register
|
||||
package etcd
|
||||
|
||||
import (
|
||||
@@ -13,45 +13,49 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
hash "github.com/mitchellh/hashstructure"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/registry"
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/zap"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
)
|
||||
|
||||
const (
|
||||
prefix = "/micro/registry/"
|
||||
prefix = "/micro/register/"
|
||||
defaultDomain = "micro"
|
||||
)
|
||||
|
||||
type etcdRegistry struct {
|
||||
var (
|
||||
_ register.Register = &etcdRegister{}
|
||||
)
|
||||
|
||||
type etcdRegister struct {
|
||||
client *clientv3.Client
|
||||
options registry.Options
|
||||
options register.Options
|
||||
|
||||
// register and leases are grouped by domain
|
||||
sync.RWMutex
|
||||
register map[string]register
|
||||
leases map[string]leases
|
||||
reg map[string]reg
|
||||
leases map[string]leases
|
||||
}
|
||||
|
||||
type register map[string]uint64
|
||||
type reg map[string]uint64
|
||||
type leases map[string]clientv3.LeaseID
|
||||
|
||||
// NewRegistry returns an initialized etcd registry
|
||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
||||
e := &etcdRegistry{
|
||||
options: registry.NewOptions(opts...),
|
||||
register: make(map[string]register),
|
||||
leases: make(map[string]leases),
|
||||
// NewRegister returns an initialized etcd register
|
||||
func NewRegister(opts ...register.Option) *etcdRegister {
|
||||
e := &etcdRegister{
|
||||
options: register.NewOptions(opts...),
|
||||
reg: make(map[string]reg),
|
||||
leases: make(map[string]leases),
|
||||
}
|
||||
configure(e, opts...)
|
||||
return e
|
||||
}
|
||||
|
||||
func newClient(e *etcdRegistry) (*clientv3.Client, error) {
|
||||
func newClient(e *etcdRegister) (*clientv3.Client, error) {
|
||||
config := clientv3.Config{
|
||||
Endpoints: []string{"127.0.0.1:2379"},
|
||||
}
|
||||
@@ -60,7 +64,7 @@ func newClient(e *etcdRegistry) (*clientv3.Client, error) {
|
||||
e.options.Timeout = 5 * time.Second
|
||||
}
|
||||
|
||||
if e.options.Secure || e.options.TLSConfig != nil {
|
||||
if e.options.TLSConfig != nil {
|
||||
tlsConfig := e.options.TLSConfig
|
||||
if tlsConfig == nil {
|
||||
tlsConfig = &tls.Config{
|
||||
@@ -122,7 +126,7 @@ func newClient(e *etcdRegistry) (*clientv3.Client, error) {
|
||||
}
|
||||
|
||||
// configure will setup the registry with new options
|
||||
func configure(e *etcdRegistry, opts ...registry.Option) error {
|
||||
func configure(e *etcdRegister, opts ...register.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&e.options)
|
||||
}
|
||||
@@ -170,13 +174,13 @@ func getName(key, prefix string) (string, string, bool) {
|
||||
return parts[0], parts[1], true
|
||||
}
|
||||
|
||||
func encode(s *registry.Service) string {
|
||||
func encode(s *register.Service) string {
|
||||
b, _ := json.Marshal(s)
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func decode(ds []byte) *registry.Service {
|
||||
var s *registry.Service
|
||||
func decode(ds []byte) *register.Service {
|
||||
var s *register.Service
|
||||
json.Unmarshal(ds, &s)
|
||||
return s
|
||||
}
|
||||
@@ -199,31 +203,31 @@ func prefixWithDomain(domain string) string {
|
||||
return path.Join(prefix, domain)
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Init(opts ...registry.Option) error {
|
||||
func (e *etcdRegister) Init(opts ...register.Option) error {
|
||||
return configure(e, opts...)
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Options() registry.Options {
|
||||
func (e *etcdRegister) Options() register.Options {
|
||||
return e.options
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Connect(ctx context.Context) error {
|
||||
func (e *etcdRegister) Connect(ctx context.Context) error {
|
||||
// TODO: real connect to etcd
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Disconnect(ctx context.Context) error {
|
||||
func (e *etcdRegister) Disconnect(ctx context.Context) error {
|
||||
// TODO: real diconnect from etcd
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error {
|
||||
func (e *etcdRegister) registerNode(s *register.Service, node *register.Node, opts ...register.RegisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
}
|
||||
|
||||
// parse the options
|
||||
var options registry.RegisterOptions
|
||||
var options register.RegisterOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
@@ -247,12 +251,12 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
|
||||
if _, ok := e.leases[options.Domain]; !ok {
|
||||
e.leases[options.Domain] = make(leases)
|
||||
}
|
||||
if _, ok := e.register[options.Domain]; !ok {
|
||||
e.register[options.Domain] = make(register)
|
||||
if _, ok := e.reg[options.Domain]; !ok {
|
||||
e.reg[options.Domain] = make(reg)
|
||||
}
|
||||
|
||||
// check to see if we already have a lease cached
|
||||
leaseID, ok := e.leases[options.Domain][s.Name+node.Id]
|
||||
leaseID, ok := e.leases[options.Domain][s.Name+node.ID]
|
||||
e.Unlock()
|
||||
|
||||
if !ok {
|
||||
@@ -261,7 +265,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
|
||||
defer cancel()
|
||||
|
||||
// look for the existing key
|
||||
key := nodePath(options.Domain, s.Name, node.Id)
|
||||
key := nodePath(options.Domain, s.Name, node.ID)
|
||||
rsp, err := e.client.Get(ctx, key, clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -286,8 +290,8 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
|
||||
|
||||
// save the info
|
||||
e.Lock()
|
||||
e.leases[options.Domain][s.Name+node.Id] = leaseID
|
||||
e.register[options.Domain][s.Name+node.Id] = h
|
||||
e.leases[options.Domain][s.Name+node.ID] = leaseID
|
||||
e.reg[options.Domain][s.Name+node.ID] = h
|
||||
e.Unlock()
|
||||
|
||||
break
|
||||
@@ -300,7 +304,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
|
||||
// renew the lease if it exists
|
||||
if leaseID > 0 {
|
||||
if logger.V(logger.TraceLevel) {
|
||||
logger.Tracef("Renewing existing lease for %s %d", s.Name, leaseID)
|
||||
logger.Tracef(e.options.Context, "Renewing existing lease for %s %d", s.Name, leaseID)
|
||||
}
|
||||
|
||||
if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil {
|
||||
@@ -309,7 +313,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
|
||||
}
|
||||
|
||||
if logger.V(logger.TraceLevel) {
|
||||
logger.Tracef("Lease not found for %s %d", s.Name, leaseID)
|
||||
logger.Tracef(e.options.Context, "Lease not found for %s %d", s.Name, leaseID)
|
||||
}
|
||||
|
||||
// lease not found do register
|
||||
@@ -325,13 +329,13 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
|
||||
|
||||
// get existing hash for the service node
|
||||
e.RLock()
|
||||
v, ok := e.register[options.Domain][s.Name+node.Id]
|
||||
v, ok := e.reg[options.Domain][s.Name+node.ID]
|
||||
e.RUnlock()
|
||||
|
||||
// the service is unchanged, skip registering
|
||||
if ok && v == h && !leaseNotFound {
|
||||
if logger.V(logger.TraceLevel) {
|
||||
logger.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id)
|
||||
logger.Tracef(e.options.Context, "Service %s node %s unchanged skipping registration", s.Name, node.ID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -343,12 +347,12 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
|
||||
s.Metadata["domain"] = options.Domain
|
||||
}
|
||||
|
||||
service := ®istry.Service{
|
||||
service := ®ister.Service{
|
||||
Name: s.Name,
|
||||
Version: s.Version,
|
||||
Metadata: s.Metadata,
|
||||
Endpoints: s.Endpoints,
|
||||
Nodes: []*registry.Node{node},
|
||||
Nodes: []*register.Node{node},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
|
||||
@@ -369,36 +373,36 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
|
||||
putOpts = append(putOpts, clientv3.WithLease(lgr.ID))
|
||||
|
||||
if logger.V(logger.TraceLevel) {
|
||||
logger.Tracef("Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.Id, lgr, lgr.ID, options.TTL)
|
||||
logger.Tracef(e.options.Context, "Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.ID, lgr, lgr.ID, options.TTL)
|
||||
}
|
||||
} else if logger.V(logger.TraceLevel) {
|
||||
logger.Tracef("Registering %s id %s without lease", service.Name, node.Id)
|
||||
logger.Tracef(e.options.Context, "Registering %s id %s without lease", service.Name, node.ID)
|
||||
}
|
||||
|
||||
key := nodePath(options.Domain, s.Name, node.Id)
|
||||
key := nodePath(options.Domain, s.Name, node.ID)
|
||||
if _, err = e.client.Put(ctx, key, encode(service), putOpts...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.Lock()
|
||||
// save our hash of the service
|
||||
e.register[options.Domain][s.Name+node.Id] = h
|
||||
e.reg[options.Domain][s.Name+node.ID] = h
|
||||
// save our leaseID of the service
|
||||
if lgr != nil {
|
||||
e.leases[options.Domain][s.Name+node.Id] = lgr.ID
|
||||
e.leases[options.Domain][s.Name+node.ID] = lgr.ID
|
||||
}
|
||||
e.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Deregister(ctx context.Context, s *registry.Service, opts ...registry.DeregisterOption) error {
|
||||
func (e *etcdRegister) Deregister(ctx context.Context, s *register.Service, opts ...register.DeregisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
}
|
||||
|
||||
// parse the options
|
||||
var options registry.DeregisterOptions
|
||||
var options register.DeregisterOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
@@ -409,16 +413,16 @@ func (e *etcdRegistry) Deregister(ctx context.Context, s *registry.Service, opts
|
||||
for _, node := range s.Nodes {
|
||||
e.Lock()
|
||||
// delete our hash of the service
|
||||
nodes, ok := e.register[options.Domain]
|
||||
nodes, ok := e.reg[options.Domain]
|
||||
if ok {
|
||||
delete(nodes, s.Name+node.Id)
|
||||
e.register[options.Domain] = nodes
|
||||
delete(nodes, s.Name+node.ID)
|
||||
e.reg[options.Domain] = nodes
|
||||
}
|
||||
|
||||
// delete our lease of the service
|
||||
leases, ok := e.leases[options.Domain]
|
||||
if ok {
|
||||
delete(leases, s.Name+node.Id)
|
||||
delete(leases, s.Name+node.ID)
|
||||
e.leases[options.Domain] = leases
|
||||
}
|
||||
e.Unlock()
|
||||
@@ -427,10 +431,10 @@ func (e *etcdRegistry) Deregister(ctx context.Context, s *registry.Service, opts
|
||||
defer cancel()
|
||||
|
||||
if logger.V(logger.TraceLevel) {
|
||||
logger.Tracef("Deregistering %s id %s", s.Name, node.Id)
|
||||
logger.Tracef(e.options.Context, "Deregistering %s id %s", s.Name, node.ID)
|
||||
}
|
||||
|
||||
if _, err := e.client.Delete(ctx, nodePath(options.Domain, s.Name, node.Id)); err != nil {
|
||||
if _, err := e.client.Delete(ctx, nodePath(options.Domain, s.Name, node.ID)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -438,7 +442,7 @@ func (e *etcdRegistry) Deregister(ctx context.Context, s *registry.Service, opts
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Register(ctx context.Context, s *registry.Service, opts ...registry.RegisterOption) error {
|
||||
func (e *etcdRegister) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
}
|
||||
@@ -455,13 +459,13 @@ func (e *etcdRegistry) Register(ctx context.Context, s *registry.Service, opts .
|
||||
return gerr
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) GetService(ctx context.Context, name string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
||||
func (e *etcdRegister) LookupService(ctx context.Context, name string, opts ...register.LookupOption) ([]*register.Service, error) {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, e.options.Timeout)
|
||||
defer cancel()
|
||||
|
||||
// parse the options and fallback to the default domain
|
||||
var options registry.GetOptions
|
||||
var options register.LookupOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
@@ -472,7 +476,7 @@ func (e *etcdRegistry) GetService(ctx context.Context, name string, opts ...regi
|
||||
var results []*mvccpb.KeyValue
|
||||
|
||||
// TODO: refactorout wildcard, this is an incredibly expensive operation
|
||||
if options.Domain == registry.WildcardDomain {
|
||||
if options.Domain == register.WildcardDomain {
|
||||
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -499,10 +503,10 @@ func (e *etcdRegistry) GetService(ctx context.Context, name string, opts ...regi
|
||||
}
|
||||
|
||||
if len(results) == 0 {
|
||||
return nil, registry.ErrNotFound
|
||||
return nil, register.ErrNotFound
|
||||
}
|
||||
|
||||
versions := make(map[string]*registry.Service)
|
||||
versions := make(map[string]*register.Service)
|
||||
|
||||
for _, n := range results {
|
||||
// only process the things we care about
|
||||
@@ -517,7 +521,7 @@ func (e *etcdRegistry) GetService(ctx context.Context, name string, opts ...regi
|
||||
|
||||
s, ok := versions[key]
|
||||
if !ok {
|
||||
s = ®istry.Service{
|
||||
s = ®ister.Service{
|
||||
Name: sn.Name,
|
||||
Version: sn.Version,
|
||||
Metadata: sn.Metadata,
|
||||
@@ -529,7 +533,7 @@ func (e *etcdRegistry) GetService(ctx context.Context, name string, opts ...regi
|
||||
}
|
||||
}
|
||||
|
||||
services := make([]*registry.Service, 0, len(versions))
|
||||
services := make([]*register.Service, 0, len(versions))
|
||||
|
||||
for _, service := range versions {
|
||||
services = append(services, service)
|
||||
@@ -538,16 +542,16 @@ func (e *etcdRegistry) GetService(ctx context.Context, name string, opts ...regi
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) ListServices(ctx context.Context, opts ...registry.ListOption) ([]*registry.Service, error) {
|
||||
func (e *etcdRegister) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) {
|
||||
// parse the options
|
||||
options := registry.NewListOptions(opts...)
|
||||
options := register.NewListOptions(opts...)
|
||||
if len(options.Domain) == 0 {
|
||||
options.Domain = defaultDomain
|
||||
}
|
||||
|
||||
// determine the prefix
|
||||
var p string
|
||||
if options.Domain == registry.WildcardDomain {
|
||||
if options.Domain == register.WildcardDomain {
|
||||
p = prefix
|
||||
} else {
|
||||
p = prefixWithDomain(options.Domain)
|
||||
@@ -561,10 +565,10 @@ func (e *etcdRegistry) ListServices(ctx context.Context, opts ...registry.ListOp
|
||||
return nil, err
|
||||
}
|
||||
if len(rsp.Kvs) == 0 {
|
||||
return []*registry.Service{}, nil
|
||||
return []*register.Service{}, nil
|
||||
}
|
||||
|
||||
versions := make(map[string]*registry.Service)
|
||||
versions := make(map[string]*register.Service)
|
||||
for _, n := range rsp.Kvs {
|
||||
domain, service, ok := getName(string(n.Key), prefix)
|
||||
if !ok {
|
||||
@@ -589,7 +593,7 @@ func (e *etcdRegistry) ListServices(ctx context.Context, opts ...registry.ListOp
|
||||
v.Nodes = append(v.Nodes, sn.Nodes...)
|
||||
}
|
||||
|
||||
services := make([]*registry.Service, 0, len(versions))
|
||||
services := make([]*register.Service, 0, len(versions))
|
||||
for _, service := range versions {
|
||||
services = append(services, service)
|
||||
}
|
||||
@@ -600,7 +604,7 @@ func (e *etcdRegistry) ListServices(ctx context.Context, opts ...registry.ListOp
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Watch(ctx context.Context, opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
func (e *etcdRegister) Watch(ctx context.Context, opts ...register.WatchOption) (register.Watcher, error) {
|
||||
cli, err := newClient(e)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -608,6 +612,10 @@ func (e *etcdRegistry) Watch(ctx context.Context, opts ...registry.WatchOption)
|
||||
return newEtcdWatcher(cli, e.options.Timeout, opts...)
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) String() string {
|
||||
func (e *etcdRegister) String() string {
|
||||
return "etcd"
|
||||
}
|
||||
|
||||
func (e *etcdRegister) Name() string {
|
||||
return e.options.Name
|
||||
}
|
||||
|
Reference in New Issue
Block a user