2 Commits

Author SHA1 Message Date
1f98cccac3 fix build
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2022-01-27 11:33:07 +03:00
8a4ed24b38 export default domain
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2022-01-27 09:41:55 +03:00
2 changed files with 21 additions and 48 deletions

57
etcd.go
View File

@@ -23,13 +23,11 @@ import (
) )
const ( const (
prefix = "/micro/register/" DefaultPrefix = "/micro/register/"
defaultDomain = "micro" DefaultDomain = "micro"
) )
var ( var _ register.Register = &etcdRegister{}
_ register.Register = &etcdRegister{}
)
type etcdRegister struct { type etcdRegister struct {
client *clientv3.Client client *clientv3.Client
@@ -41,8 +39,10 @@ type etcdRegister struct {
leases map[string]leases leases map[string]leases
} }
type reg map[string]uint64 type (
type leases map[string]clientv3.LeaseID reg map[string]uint64
leases map[string]clientv3.LeaseID
)
// NewRegister returns an initialized etcd register // NewRegister returns an initialized etcd register
func NewRegister(opts ...register.Option) *etcdRegister { func NewRegister(opts ...register.Option) *etcdRegister {
@@ -200,7 +200,7 @@ func serializeServiceName(s string) string {
} }
func prefixWithDomain(domain string) string { func prefixWithDomain(domain string) string {
return path.Join(prefix, domain) return path.Join(DefaultPrefix, domain)
} }
func (e *etcdRegister) Init(opts ...register.Option) error { func (e *etcdRegister) Init(opts ...register.Option) error {
@@ -227,13 +227,7 @@ func (e *etcdRegister) registerNode(s *register.Service, node *register.Node, op
} }
// parse the options // parse the options
var options register.RegisterOptions options := register.NewRegisterOptions(opts...)
for _, o := range opts {
o(&options)
}
if len(options.Domain) == 0 {
options.Domain = defaultDomain
}
if s.Metadata == nil { if s.Metadata == nil {
s.Metadata = map[string]string{} s.Metadata = map[string]string{}
@@ -402,13 +396,7 @@ func (e *etcdRegister) Deregister(ctx context.Context, s *register.Service, opts
} }
// parse the options // parse the options
var options register.DeregisterOptions options := register.NewDeregisterOptions(opts...)
for _, o := range opts {
o(&options)
}
if len(options.Domain) == 0 {
options.Domain = defaultDomain
}
for _, node := range s.Nodes { for _, node := range s.Nodes {
e.Lock() e.Lock()
@@ -465,19 +453,13 @@ func (e *etcdRegister) LookupService(ctx context.Context, name string, opts ...r
defer cancel() defer cancel()
// parse the options and fallback to the default domain // parse the options and fallback to the default domain
var options register.LookupOptions options := register.NewLookupOptions(opts...)
for _, o := range opts {
o(&options)
}
if len(options.Domain) == 0 {
options.Domain = defaultDomain
}
var results []*mvccpb.KeyValue var results []*mvccpb.KeyValue
// TODO: refactorout wildcard, this is an incredibly expensive operation // TODO: refactorout wildcard, this is an incredibly expensive operation
if options.Domain == register.WildcardDomain { if options.Domain == register.WildcardDomain {
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) rsp, err := e.client.Get(ctx, DefaultPrefix, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -485,7 +467,7 @@ func (e *etcdRegister) LookupService(ctx context.Context, name string, opts ...r
// filter the results for the key we care about // filter the results for the key we care about
for _, kv := range rsp.Kvs { for _, kv := range rsp.Kvs {
// if the key does not contain the name then pass // if the key does not contain the name then pass
_, service, ok := getName(string(kv.Key), prefix) _, service, ok := getName(string(kv.Key), DefaultPrefix)
if !ok || service != name { if !ok || service != name {
continue continue
} }
@@ -510,7 +492,7 @@ func (e *etcdRegister) LookupService(ctx context.Context, name string, opts ...r
for _, n := range results { for _, n := range results {
// only process the things we care about // only process the things we care about
domain, service, ok := getName(string(n.Key), prefix) domain, service, ok := getName(string(n.Key), DefaultPrefix)
if !ok || service != name { if !ok || service != name {
continue continue
} }
@@ -545,22 +527,19 @@ func (e *etcdRegister) LookupService(ctx context.Context, name string, opts ...r
func (e *etcdRegister) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) { func (e *etcdRegister) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) {
// parse the options // parse the options
options := register.NewListOptions(opts...) options := register.NewListOptions(opts...)
if len(options.Domain) == 0 {
options.Domain = defaultDomain
}
// determine the prefix // determine the prefix
var p string var p string
if options.Domain == register.WildcardDomain { if options.Domain == register.WildcardDomain {
p = prefix p = DefaultPrefix
} else { } else {
p = prefixWithDomain(options.Domain) p = prefixWithDomain(options.Domain)
} }
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) nctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel() defer cancel()
rsp, err := e.client.Get(ctx, p, clientv3.WithPrefix(), clientv3.WithSerializable()) rsp, err := e.client.Get(nctx, p, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -570,7 +549,7 @@ func (e *etcdRegister) ListServices(ctx context.Context, opts ...register.ListOp
versions := make(map[string]*register.Service) versions := make(map[string]*register.Service)
for _, n := range rsp.Kvs { for _, n := range rsp.Kvs {
domain, service, ok := getName(string(n.Key), prefix) domain, service, ok := getName(string(n.Key), DefaultPrefix)
if !ok { if !ok {
continue continue
} }

View File

@@ -21,20 +21,14 @@ type etcdWatcher struct {
} }
func newEtcdWatcher(c *clientv3.Client, timeout time.Duration, opts ...register.WatchOption) (register.Watcher, error) { func newEtcdWatcher(c *clientv3.Client, timeout time.Duration, opts ...register.WatchOption) (register.Watcher, error) {
var wo register.WatchOptions wo := register.NewWatchOptions(opts...)
for _, o := range opts {
o(&wo)
}
if len(wo.Domain) == 0 {
wo.Domain = defaultDomain
}
watchPath := prefix watchPath := DefaultPrefix
if wo.Domain == register.WildcardDomain { if wo.Domain == register.WildcardDomain {
if len(wo.Service) > 0 { if len(wo.Service) > 0 {
return nil, errors.New("Cannot watch a service across domains") return nil, errors.New("Cannot watch a service across domains")
} }
watchPath = prefix watchPath = DefaultPrefix
} else if len(wo.Service) > 0 { } else if len(wo.Service) > 0 {
watchPath = servicePath(wo.Domain, wo.Service) + "/" watchPath = servicePath(wo.Domain, wo.Service) + "/"
} }