register: improvements
Some checks failed
coverage / build (pull_request) Failing after 1m33s
lint / lint (pull_request) Successful in 1m52s
test / test (pull_request) Successful in 4m11s

* change domain to namespace

* lower go.mod deps

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-12-27 01:08:00 +03:00
parent 37ffbb18d8
commit d291102877
12 changed files with 107 additions and 341 deletions

View File

@@ -23,11 +23,10 @@ type node struct {
}
type record struct {
Name string
Version string
Metadata map[string]string
Nodes map[string]*node
Endpoints []*register.Endpoint
Name string
Version string
Metadata map[string]string
Nodes map[string]*node
}
type memory struct {
@@ -59,7 +58,7 @@ func (m *memory) ttlPrune() {
for range prune.C {
m.Lock()
for domain, services := range m.records {
for namespace, services := range m.records {
for service, versions := range services {
for version, record := range versions {
for id, n := range record.Nodes {
@@ -67,7 +66,7 @@ func (m *memory) ttlPrune() {
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register TTL expired for node %s of service %s", n.ID, service))
}
delete(m.records[domain][service][version].Nodes, id)
delete(m.records[namespace][service][version].Nodes, id)
}
}
}
@@ -131,17 +130,12 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
options := register.NewRegisterOptions(opts...)
// get the services for this domain from the register
srvs, ok := m.records[options.Domain]
srvs, ok := m.records[options.Namespace]
if !ok {
srvs = make(services)
}
// domain is set in metadata so it can be passed to watchers
if s.Metadata == nil {
s.Metadata = map[string]string{"domain": options.Domain}
} else {
s.Metadata["domain"] = options.Domain
}
s.Namespace = options.Namespace
// ensure the service name exists
r := serviceToRecord(s, options.TTL)
@@ -154,7 +148,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new service: %s, version: %s", s.Name, s.Version))
}
m.records[options.Domain] = srvs
m.records[options.Namespace] = srvs
go m.sendEvent(&register.Result{Action: "create", Service: s})
}
@@ -173,9 +167,6 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
metadata[k] = v
}
// set the domain
metadata["domain"] = options.Domain
// add the node
srvs[s.Name][s.Version].Nodes[n.ID] = &node{
Node: &register.Node{
@@ -206,7 +197,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
}
}
m.records[options.Domain] = srvs
m.records[options.Namespace] = srvs
return nil
}
@@ -216,15 +207,8 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
options := register.NewDeregisterOptions(opts...)
// domain is set in metadata so it can be passed to watchers
if s.Metadata == nil {
s.Metadata = map[string]string{"domain": options.Domain}
} else {
s.Metadata["domain"] = options.Domain
}
// if the domain doesn't exist, there is nothing to deregister
services, ok := m.records[options.Domain]
services, ok := m.records[options.Namespace]
if !ok {
return nil
}
@@ -253,7 +237,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
// if the nodes not empty, we replace the version in the store and exist, the rest of the logic
// is cleanup
if len(version.Nodes) > 0 {
m.records[options.Domain][s.Name][s.Version] = version
m.records[options.Namespace][s.Name][s.Version] = version
go m.sendEvent(&register.Result{Action: "update", Service: s})
return nil
}
@@ -261,7 +245,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
// if this version was the only version of the service, we can remove the whole service from the
// register and exit
if len(versions) == 1 {
delete(m.records[options.Domain], s.Name)
delete(m.records[options.Namespace], s.Name)
go m.sendEvent(&register.Result{Action: "delete", Service: s})
if m.opts.Logger.V(logger.DebugLevel) {
@@ -271,7 +255,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
}
// there are other versions of the service running, so only remove this version of it
delete(m.records[options.Domain][s.Name], s.Version)
delete(m.records[options.Namespace][s.Name], s.Version)
go m.sendEvent(&register.Result{Action: "delete", Service: s})
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s, version: %s", s.Name, s.Version))
@@ -284,15 +268,15 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe
options := register.NewLookupOptions(opts...)
// if it's a wildcard domain, return from all domains
if options.Domain == register.WildcardDomain {
if options.Namespace == register.WildcardNamespace {
m.RLock()
recs := m.records
m.RUnlock()
var services []*register.Service
for domain := range recs {
srvs, err := m.LookupService(ctx, name, append(opts, register.LookupDomain(domain))...)
for namespace := range recs {
srvs, err := m.LookupService(ctx, name, append(opts, register.LookupNamespace(namespace))...)
if err == register.ErrNotFound {
continue
} else if err != nil {
@@ -311,7 +295,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe
defer m.RUnlock()
// check the domain exists
services, ok := m.records[options.Domain]
services, ok := m.records[options.Namespace]
if !ok {
return nil, register.ErrNotFound
}
@@ -328,7 +312,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe
var i int
for _, r := range versions {
result[i] = recordToService(r, options.Domain)
result[i] = recordToService(r, options.Namespace)
i++
}
@@ -339,15 +323,15 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption)
options := register.NewListOptions(opts...)
// if it's a wildcard domain, list from all domains
if options.Domain == register.WildcardDomain {
if options.Namespace == register.WildcardNamespace {
m.RLock()
recs := m.records
m.RUnlock()
var services []*register.Service
for domain := range recs {
srvs, err := m.ListServices(ctx, append(opts, register.ListDomain(domain))...)
for namespace := range recs {
srvs, err := m.ListServices(ctx, append(opts, register.ListNamespace(namespace))...)
if err != nil {
return nil, err
}
@@ -361,7 +345,7 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption)
defer m.RUnlock()
// ensure the domain exists
services, ok := m.records[options.Domain]
services, ok := m.records[options.Namespace]
if !ok {
return make([]*register.Service, 0), nil
}
@@ -371,7 +355,7 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption)
for _, service := range services {
for _, version := range service {
result = append(result, recordToService(version, options.Domain))
result = append(result, recordToService(version, options.Namespace))
}
}
@@ -426,16 +410,13 @@ func (m *watcher) Next() (*register.Result, error) {
continue
}
// extract domain from service metadata
var domain string
if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 {
domain = r.Service.Metadata["domain"]
} else {
domain = register.DefaultDomain
namespace := register.DefaultNamespace
if r.Service.Namespace != "" {
namespace = r.Service.Namespace
}
// only send the event if watching the wildcard or this specific domain
if m.wo.Domain == register.WildcardDomain || m.wo.Domain == domain {
if m.wo.Namespace == register.WildcardNamespace || m.wo.Namespace == namespace {
return r, nil
}
case <-m.exit:
@@ -454,11 +435,6 @@ func (m *watcher) Stop() {
}
func serviceToRecord(s *register.Service, ttl time.Duration) *record {
metadata := make(map[string]string, len(s.Metadata))
for k, v := range s.Metadata {
metadata[k] = v
}
nodes := make(map[string]*node, len(s.Nodes))
for _, n := range s.Nodes {
nodes[n.ID] = &node{
@@ -468,42 +444,19 @@ func serviceToRecord(s *register.Service, ttl time.Duration) *record {
}
}
endpoints := make([]*register.Endpoint, len(s.Endpoints))
copy(endpoints, s.Endpoints)
return &record{
Name: s.Name,
Version: s.Version,
Metadata: metadata,
Nodes: nodes,
Endpoints: endpoints,
Name: s.Name,
Version: s.Version,
Nodes: nodes,
}
}
func recordToService(r *record, domain string) *register.Service {
func recordToService(r *record, namespace string) *register.Service {
metadata := make(map[string]string, len(r.Metadata))
for k, v := range r.Metadata {
metadata[k] = v
}
// set the domain in metadata so it can be determined when a wildcard query is performed
metadata["domain"] = domain
endpoints := make([]*register.Endpoint, len(r.Endpoints))
for i, e := range r.Endpoints {
md := make(map[string]string, len(e.Metadata))
for k, v := range e.Metadata {
md[k] = v
}
endpoints[i] = &register.Endpoint{
Name: e.Name,
Request: e.Request,
Response: e.Response,
Metadata: md,
}
}
nodes := make([]*register.Node, len(r.Nodes))
i := 0
for _, n := range r.Nodes {
@@ -523,8 +476,7 @@ func recordToService(r *record, domain string) *register.Service {
return &register.Service{
Name: r.Name,
Version: r.Version,
Metadata: metadata,
Endpoints: endpoints,
Nodes: nodes,
Namespace: namespace,
}
}