From 27bd9581bfc4809784dabe401369a73f85c17be8 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 25 Sep 2019 18:19:18 +0100 Subject: [PATCH] Refresh TTL; prune expired nodes. --- registry/memory/memory.go | 104 ++++++++++++++++++++++++++++++++------ 1 file changed, 88 insertions(+), 16 deletions(-) diff --git a/registry/memory/memory.go b/registry/memory/memory.go index c9e8b353..f7134bf2 100644 --- a/registry/memory/memory.go +++ b/registry/memory/memory.go @@ -3,6 +3,7 @@ package memory import ( "context" + "strings" "sync" "time" @@ -12,13 +13,15 @@ import ( ) var ( - timeout = time.Millisecond * 10 + sendEventTime = 10 * time.Millisecond + ttlPruneTime = 1 * time.Minute + DefaultTTL = 1 * time.Minute ) // node tracks node registration timestamp and TTL type node struct { - ts time.Time - ttl time.Duration + lastSeen time.Time + ttl time.Duration } type Registry struct { @@ -44,12 +47,67 @@ func NewRegistry(opts ...registry.Option) registry.Registry { services = make(map[string][]*registry.Service) } - return &Registry{ + reg := &Registry{ options: options, Services: services, nodes: make(map[string]*node), Watchers: make(map[string]*Watcher), } + + go reg.ttlPrune() + + return reg +} + +// nodeTrackId returns a string we use to track a node of a given service +func nodeTrackId(svcName, svcVersion, nodeId string) string { + return svcName + "+" + svcVersion + "+" + nodeId +} + +func (m *Registry) ttlPrune() { + prune := time.NewTicker(ttlPruneTime) + defer prune.Stop() + + for { + select { + case <-prune.C: + m.Lock() + for nodeTrackId, node := range m.nodes { + // if we exceed the TTL threshold we need to stop tracking the node + if time.Since(node.lastSeen) > node.ttl { + // split nodeTrackID into service Name, Version and Node Id + trackIdSplit := strings.Split(nodeTrackId, "+") + svcName, svcVersion, nodeId := trackIdSplit[0], trackIdSplit[1], trackIdSplit[2] + log.Logf("TTL threshold reached for node %s for service %s", nodeId, svcName) + // we need to 
find a node that expired and delete it from service nodes + if _, ok := m.Services[svcName]; ok { + for _, service := range m.Services[svcName] { + if service.Version != svcVersion { + continue + } + // find expired service node and delete it + var nodes []*registry.Node + for _, n := range service.Nodes { + var del bool + if n.Id == nodeId { + del = true + } + if !del { + nodes = append(nodes, n) + } + } + service.Nodes = nodes + } + } + // stop tracking the node + delete(m.nodes, nodeTrackId) + } + } + m.Unlock() + } + } + + return } func (m *Registry) sendEvent(r *registry.Result) { @@ -70,7 +128,7 @@ func (m *Registry) sendEvent(r *registry.Result) { default: select { case w.res <- r: - case <-time.After(timeout): + case <-time.After(sendEventTime): } } } @@ -125,14 +183,19 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption o(&options) } + // if no TTL has been set, set it to DefaultTTL + if options.TTL.Seconds() == 0.0 { + options.TTL = DefaultTTL + } + if service, ok := m.Services[s.Name]; !ok { m.Services[s.Name] = []*registry.Service{s} // add all nodes into nodes map to track their TTL for _, n := range s.Nodes { log.Logf("Tracking node %s for service %s", n.Id, s.Name) - m.nodes[s.Name+n.Id] = &node{ - ts: time.Now(), - ttl: options.TTL, + m.nodes[nodeTrackId(s.Name, s.Version, n.Id)] = &node{ + lastSeen: time.Now(), + ttl: options.TTL, } } go m.sendEvent(&registry.Result{Action: "update", Service: s}) @@ -159,9 +222,9 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption // we know s is the new [version of] service; we need to strart tracking its nodes for _, n := range s.Nodes { log.Logf("Tracking node %s for service %s", n.Id, s.Name) - m.nodes[s.Name+n.Id] = &node{ - ts: time.Now(), - ttl: options.TTL, + m.nodes[nodeTrackId(s.Name, s.Version, n.Id)] = &node{ + lastSeen: time.Now(), + ttl: options.TTL, } } go m.sendEvent(&registry.Result{Action: "update", Service: s}) @@ -181,9 +244,9 @@ func (m 
*Registry) Register(s *registry.Service, opts ...registry.RegisterOption } if !found { log.Logf("Tracking node %s for service %s", n.Id, s.Name) - m.nodes[s.Name+n.Id] = &node{ - ts: time.Now(), - ttl: options.TTL, + m.nodes[nodeTrackId(s.Name, s.Version, n.Id)] = &node{ + lastSeen: time.Now(), + ttl: options.TTL, } } } @@ -191,6 +254,15 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption go m.sendEvent(&registry.Result{Action: "update", Service: s}) return nil } + // refresh the timestamp and TTL of the service node + for _, n := range s.Nodes { + trackId := nodeTrackId(s.Name, s.Version, n.Id) + log.Logf("Refreshing TTL for node %s for service %s", n.Id, s.Name) + if trackedNode, ok := m.nodes[trackId]; ok { + trackedNode.lastSeen = time.Now() + trackedNode.ttl = options.TTL + } + } } } @@ -218,7 +290,7 @@ func (m *Registry) Deregister(s *registry.Service) error { if service := registry.Remove(service, []*registry.Service{s}); len(service) == 0 { id := svcNodes[s.Name][s.Version][0] log.Logf("Stopped tracking node %s for service %s", id, s.Name) - delete(m.nodes, s.Name+id) + delete(m.nodes, nodeTrackId(s.Name, s.Version, id)) delete(m.Services, s.Name) } else { // find out which nodes have been removed @@ -233,7 +305,7 @@ func (m *Registry) Deregister(s *registry.Service) error { } if !found { log.Logf("Stopped tracking node %s for service %s", id, s.Name) - delete(m.nodes, s.Name+id) + delete(m.nodes, nodeTrackId(s.Name, s.Version, id)) } } m.Services[s.Name] = service