fixup for never micro

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-11-30 22:03:01 +03:00
parent e77843968d
commit b74e4d4b08
3 changed files with 82 additions and 220 deletions

42
etcd.go
View File

@@ -43,7 +43,7 @@ type leases map[string]clientv3.LeaseID
// NewRegistry returns an initialized etcd registry
func NewRegistry(opts ...registry.Option) registry.Registry {
e := &etcdRegistry{
options: registry.Options{},
options: registry.NewOptions(opts...),
register: make(map[string]register),
leases: make(map[string]leases),
}
@@ -207,6 +207,16 @@ func (e *etcdRegistry) Options() registry.Options {
return e.options
}
func (e *etcdRegistry) Connect(ctx context.Context) error {
// TODO: real connect to etcd
return nil
}
func (e *etcdRegistry) Disconnect(ctx context.Context) error {
// TODO: real diconnect from etcd
return nil
}
func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, opts ...registry.RegisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
@@ -289,7 +299,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
// renew the lease if it exists
if leaseID > 0 {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
if logger.V(logger.TraceLevel) {
logger.Tracef("Renewing existing lease for %s %d", s.Name, leaseID)
}
@@ -298,7 +308,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
return err
}
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
if logger.V(logger.TraceLevel) {
logger.Tracef("Lease not found for %s %d", s.Name, leaseID)
}
@@ -320,7 +330,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
// the service is unchanged, skip registering
if ok && v == h && !leaseNotFound {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
if logger.V(logger.TraceLevel) {
logger.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id)
}
return nil
@@ -358,10 +368,10 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
if lgr != nil {
putOpts = append(putOpts, clientv3.WithLease(lgr.ID))
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
if logger.V(logger.TraceLevel) {
logger.Tracef("Registering %s id %s with lease %v and leaseID %v and ttl %v", service.Name, node.Id, lgr, lgr.ID, options.TTL)
}
} else if logger.V(logger.TraceLevel, logger.DefaultLogger) {
} else if logger.V(logger.TraceLevel) {
logger.Tracef("Registering %s id %s without lease", service.Name, node.Id)
}
@@ -382,7 +392,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
return nil
}
func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.DeregisterOption) error {
func (e *etcdRegistry) Deregister(ctx context.Context, s *registry.Service, opts ...registry.DeregisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
@@ -416,7 +426,7 @@ func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.Deregist
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel()
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
if logger.V(logger.TraceLevel) {
logger.Tracef("Deregistering %s id %s", s.Name, node.Id)
}
@@ -428,7 +438,7 @@ func (e *etcdRegistry) Deregister(s *registry.Service, opts ...registry.Deregist
return nil
}
func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
func (e *etcdRegistry) Register(ctx context.Context, s *registry.Service, opts ...registry.RegisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
@@ -445,8 +455,9 @@ func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOp
return gerr
}
func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
func (e *etcdRegistry) GetService(ctx context.Context, name string, opts ...registry.GetOption) ([]*registry.Service, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, e.options.Timeout)
defer cancel()
// parse the options and fallback to the default domain
@@ -527,12 +538,9 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r
return services, nil
}
func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) {
func (e *etcdRegistry) ListServices(ctx context.Context, opts ...registry.ListOption) ([]*registry.Service, error) {
// parse the options
var options registry.ListOptions
for _, o := range opts {
o(&options)
}
options := registry.NewListOptions(opts...)
if len(options.Domain) == 0 {
options.Domain = defaultDomain
}
@@ -592,7 +600,7 @@ func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Se
return services, nil
}
func (e *etcdRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
func (e *etcdRegistry) Watch(ctx context.Context, opts ...registry.WatchOption) (registry.Watcher, error) {
cli, err := newClient(e)
if err != nil {
return nil, err