Memory registry from #801 with additional tweaks (#951)

* PoC: memory registry using maps instead of slice madness

* Updated proto and handlers. Fixed tests across codebase.

* Implemented ttl pruning for memory registry

* Added extensive memory registry tests

* Squased a bunch of bugs

* Proto indent; memory.Registry.String() returns "memory"

* Write a test to prove memory registry TTLs are busted

Signed-off-by: Erik Hollensbe <github@hollensbe.org>

* Additional memory testing and fixups:

* DefaultTTL removed
* When TTL == 0, it is automatically removed from expiry conditions
* Additional improvements to new tests

Signed-off-by: Erik Hollensbe <github@hollensbe.org>
This commit is contained in:
Erik Hollensbe
2019-11-16 02:55:11 -08:00
committed by Asim Aslam
parent 97c1300f53
commit 4107733453
12 changed files with 476 additions and 496 deletions

View File

@@ -3,7 +3,6 @@ package memory
import (
"context"
"strings"
"sync"
"time"
@@ -14,23 +13,29 @@ import (
var (
sendEventTime = 10 * time.Millisecond
ttlPruneTime = 1 * time.Minute
DefaultTTL = 1 * time.Minute
ttlPruneTime = time.Second
)
// node tracks node registration timestamp and TTL
type node struct {
lastSeen time.Time
ttl time.Duration
*registry.Node
TTL time.Duration
LastSeen time.Time
}
type record struct {
Name string
Version string
Metadata map[string]string
Nodes map[string]*node
Endpoints []*registry.Endpoint
}
type Registry struct {
options registry.Options
sync.RWMutex
Services map[string][]*registry.Service
nodes map[string]*node
Watchers map[string]*Watcher
records map[string]map[string]*record
watchers map[string]*Watcher
}
func NewRegistry(opts ...registry.Option) registry.Registry {
@@ -42,16 +47,15 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
o(&options)
}
services := getServices(options.Context)
if services == nil {
services = make(map[string][]*registry.Service)
records := getServiceRecords(options.Context)
if records == nil {
records = make(map[string]map[string]*record)
}
reg := &Registry{
options: options,
Services: services,
nodes: make(map[string]*node),
Watchers: make(map[string]*Watcher),
records: records,
watchers: make(map[string]*Watcher),
}
go reg.ttlPrune()
@@ -59,11 +63,6 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
return reg
}
// nodeTrackId returns a string we use to track a node of a given service
func nodeTrackId(svcName, svcVersion, nodeId string) string {
return svcName + "+" + svcVersion + "+" + nodeId
}
func (m *Registry) ttlPrune() {
prune := time.NewTicker(ttlPruneTime)
defer prune.Stop()
@@ -72,49 +71,26 @@ func (m *Registry) ttlPrune() {
select {
case <-prune.C:
m.Lock()
for nodeTrackId, node := range m.nodes {
// if the TTL has been set and we exceed the hresholdset by it we stop tracking the node
if node.ttl.Seconds() != 0.0 && time.Since(node.lastSeen) > node.ttl {
// split nodeTrackID into service Name, Version and Node Id
trackIdSplit := strings.Split(nodeTrackId, "+")
svcName, svcVersion, nodeId := trackIdSplit[0], trackIdSplit[1], trackIdSplit[2]
log.Debugf("[memory] Registry TTL expired for service %s, node %s", svcName, nodeId)
// we need to find a node that expired and delete it from service nodes
if _, ok := m.Services[svcName]; ok {
for _, service := range m.Services[svcName] {
if service.Version != svcVersion {
continue
}
// find expired service node and delete it
var nodes []*registry.Node
for _, n := range service.Nodes {
var del bool
if n.Id == nodeId {
del = true
}
if !del {
nodes = append(nodes, n)
}
}
service.Nodes = nodes
for name, records := range m.records {
for version, record := range records {
for id, n := range record.Nodes {
if n.TTL != 0 && time.Since(n.LastSeen) > n.TTL {
log.Debugf("Registry TTL expired for node %s of service %s", n.Id, name)
delete(m.records[name][version].Nodes, id)
}
}
// stop tracking the node
delete(m.nodes, nodeTrackId)
}
}
m.Unlock()
}
}
return
}
func (m *Registry) sendEvent(r *registry.Result) {
watchers := make([]*Watcher, 0, len(m.Watchers))
watchers := make([]*Watcher, 0, len(m.watchers))
m.RLock()
for _, w := range m.Watchers {
for _, w := range m.watchers {
watchers = append(watchers, w)
}
m.RUnlock()
@@ -123,7 +99,7 @@ func (m *Registry) sendEvent(r *registry.Result) {
select {
case <-w.exit:
m.Lock()
delete(m.Watchers, w.id)
delete(m.watchers, w.id)
m.Unlock()
default:
select {
@@ -141,11 +117,24 @@ func (m *Registry) Init(opts ...registry.Option) error {
// add services
m.Lock()
for k, v := range getServices(m.options.Context) {
s := m.Services[k]
m.Services[k] = registry.Merge(s, v)
defer m.Unlock()
records := getServiceRecords(m.options.Context)
for name, record := range records {
// add a whole new service including all of its versions
if _, ok := m.records[name]; !ok {
m.records[name] = record
continue
}
// add the versions of the service we dont track yet
for version, r := range record {
if _, ok := m.records[name][version]; !ok {
m.records[name][version] = r
continue
}
}
}
m.Unlock()
return nil
}
@@ -153,104 +142,61 @@ func (m *Registry) Options() registry.Options {
return m.options
}
func (m *Registry) GetService(name string) ([]*registry.Service, error) {
m.RLock()
service, ok := m.Services[name]
m.RUnlock()
if !ok {
return nil, registry.ErrNotFound
}
return service, nil
}
func (m *Registry) ListServices() ([]*registry.Service, error) {
var services []*registry.Service
m.RLock()
for _, service := range m.Services {
services = append(services, service...)
}
m.RUnlock()
return services, nil
}
func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
m.Lock()
defer m.Unlock()
log.Debugf("[memory] Registry registering service: %s", s.Name)
var options registry.RegisterOptions
for _, o := range opts {
o(&options)
}
if service, ok := m.Services[s.Name]; !ok {
m.Services[s.Name] = []*registry.Service{s}
// add all nodes into nodes map to track their TTL
for _, n := range s.Nodes {
log.Debugf("[memory] Registry tracking new service: %s, node %s", s.Name, n.Id)
m.nodes[nodeTrackId(s.Name, s.Version, n.Id)] = &node{
lastSeen: time.Now(),
ttl: options.TTL,
}
}
r := serviceToRecord(s, options.TTL)
if _, ok := m.records[s.Name]; !ok {
m.records[s.Name] = make(map[string]*record)
}
if _, ok := m.records[s.Name][s.Version]; !ok {
m.records[s.Name][s.Version] = r
log.Debugf("Registry added new service: %s, version: %s", s.Name, s.Version)
go m.sendEvent(&registry.Result{Action: "update", Service: s})
return nil
} else {
// svcCount keeps the count of all versions of particular service
//svcCount := len(service)
// svcNodes maintains a list of node Ids per particular service version
svcNodes := make(map[string]map[string][]string)
// collect all service ids for all service versions
for _, s := range service {
if _, ok := svcNodes[s.Name]; !ok {
svcNodes[s.Name] = make(map[string][]string)
}
if _, ok := svcNodes[s.Name][s.Version]; !ok {
for _, n := range s.Nodes {
svcNodes[s.Name][s.Version] = append(svcNodes[s.Name][s.Version], n.Id)
}
}
}
// if merged count and original service counts changed we know we are adding a new version of the service
merged := registry.Merge(service, []*registry.Service{s})
// if the node count of any service [version] changed we know we are adding a new node to the service
for _, s := range merged {
// we know that if the node counts have changed we need to track new nodes
if len(s.Nodes) != len(svcNodes[s.Name][s.Version]) {
for _, n := range s.Nodes {
var found bool
for _, id := range svcNodes[s.Name][s.Version] {
if n.Id == id {
found = true
break
}
}
if !found {
log.Debugf("[memory] Registry tracking new node: %s for service %s", n.Id, s.Name)
m.nodes[nodeTrackId(s.Name, s.Version, n.Id)] = &node{
lastSeen: time.Now(),
ttl: options.TTL,
}
}
}
m.Services[s.Name] = merged
go m.sendEvent(&registry.Result{Action: "update", Service: s})
return nil
}
// refresh the timestamp and TTL of the service node
for _, n := range s.Nodes {
trackId := nodeTrackId(s.Name, s.Version, n.Id)
log.Debugf("[memory] Registry refreshing TTL for node %s for service %s", n.Id, s.Name)
if trackedNode, ok := m.nodes[trackId]; ok {
trackedNode.lastSeen = time.Now()
trackedNode.ttl = options.TTL
}
addedNodes := false
for _, n := range s.Nodes {
if _, ok := m.records[s.Name][s.Version].Nodes[n.Id]; !ok {
addedNodes = true
metadata := make(map[string]string)
for k, v := range n.Metadata {
metadata[k] = v
m.records[s.Name][s.Version].Nodes[n.Id] = &node{
Node: &registry.Node{
Id: n.Id,
Address: n.Address,
Metadata: metadata,
},
TTL: options.TTL,
LastSeen: time.Now(),
}
}
}
}
if addedNodes {
log.Debugf("Registry added new node to service: %s, version: %s", s.Name, s.Version)
go m.sendEvent(&registry.Result{Action: "update", Service: s})
return nil
}
// refresh TTL and timestamp
for _, n := range s.Nodes {
log.Debugf("Updated registration for service: %s, version: %s", s.Name, s.Version)
m.records[s.Name][s.Version].Nodes[n.Id].TTL = options.TTL
m.records[s.Name][s.Version].Nodes[n.Id].LastSeen = time.Now()
}
return nil
}
@@ -258,57 +204,62 @@ func (m *Registry) Deregister(s *registry.Service) error {
m.Lock()
defer m.Unlock()
log.Debugf("[memory] Registry deregistering service: %s", s.Name)
if service, ok := m.Services[s.Name]; ok {
// svcNodes collects the list of all node Ids for each service version
svcNodes := make(map[string]map[string][]string)
// collect all service node ids for all service versions
for _, svc := range service {
if _, ok := svcNodes[svc.Name]; !ok {
svcNodes[svc.Name] = make(map[string][]string)
}
if _, ok := svcNodes[svc.Name][svc.Version]; !ok {
for _, n := range svc.Nodes {
svcNodes[svc.Name][svc.Version] = append(svcNodes[svc.Name][svc.Version], n.Id)
if _, ok := m.records[s.Name]; ok {
if _, ok := m.records[s.Name][s.Version]; ok {
for _, n := range s.Nodes {
if _, ok := m.records[s.Name][s.Version].Nodes[n.Id]; ok {
log.Debugf("Registry removed node from service: %s, version: %s", s.Name, s.Version)
delete(m.records[s.Name][s.Version].Nodes, n.Id)
}
}
}
// if there are no more services we know we have either removed all nodes or there were no nodes
if updatedService := registry.Remove(service, []*registry.Service{s}); len(updatedService) == 0 {
for _, id := range svcNodes[s.Name][s.Version] {
log.Debugf("[memory] Registry stopped tracking node %s for service %s", id, s.Name)
delete(m.nodes, nodeTrackId(s.Name, s.Version, id))
go m.sendEvent(&registry.Result{Action: "delete", Service: s})
}
log.Debugf("[memory] Registry deleting service %s: no service nodes", s.Name)
delete(m.Services, s.Name)
return nil
} else {
// find out which nodes have been removed
for _, id := range svcNodes[s.Name][s.Version] {
for _, svc := range updatedService {
var found bool
for _, n := range svc.Nodes {
if id == n.Id {
found = true
break
}
}
if !found {
log.Debugf("[memory] Registry stopped tracking node %s for service %s", id, s.Name)
delete(m.nodes, nodeTrackId(s.Name, s.Version, id))
go m.sendEvent(&registry.Result{Action: "delete", Service: s})
}
}
m.Services[s.Name] = updatedService
if len(m.records[s.Name][s.Version].Nodes) == 0 {
delete(m.records[s.Name], s.Version)
log.Debugf("Registry removed service: %s, version: %s", s.Name, s.Version)
}
}
if len(m.records[s.Name]) == 0 {
delete(m.records, s.Name)
log.Debugf("Registry removed service: %s", s.Name)
}
go m.sendEvent(&registry.Result{Action: "delete", Service: s})
}
return nil
}
func (m *Registry) GetService(name string) ([]*registry.Service, error) {
m.RLock()
defer m.RUnlock()
records, ok := m.records[name]
if !ok {
return nil, registry.ErrNotFound
}
services := make([]*registry.Service, len(m.records[name]))
i := 0
for _, record := range records {
services[i] = recordToService(record)
i++
}
return services, nil
}
func (m *Registry) ListServices() ([]*registry.Service, error) {
m.RLock()
defer m.RUnlock()
var services []*registry.Service
for _, records := range m.records {
for _, record := range records {
services = append(services, recordToService(record))
}
}
return services, nil
}
func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
var wo registry.WatchOptions
for _, o := range opts {
@@ -323,8 +274,9 @@ func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error)
}
m.Lock()
m.Watchers[w.id] = w
m.watchers[w.id] = w
m.Unlock()
return w, nil
}

View File

@@ -1,7 +1,9 @@
package memory
import (
"fmt"
"testing"
"time"
"github.com/micro/go-micro/registry"
)
@@ -102,10 +104,20 @@ func TestMemoryRegistry(t *testing.T) {
// register data
for _, v := range testData {
serviceCount := 0
for _, service := range v {
if err := m.Register(service); err != nil {
t.Errorf("Unexpected register error: %v", err)
}
serviceCount++
// after the service has been registered we should be able to query it
services, err := m.GetService(service.Name)
if err != nil {
t.Errorf("Unexpected error getting service %s: %v", service.Name, err)
}
if len(services) != serviceCount {
t.Errorf("Expected %d services for %s, got %d", serviceCount, service.Name, len(services))
}
}
}
@@ -114,6 +126,22 @@ func TestMemoryRegistry(t *testing.T) {
fn(k, v)
}
services, err := m.ListServices()
if err != nil {
t.Errorf("Unexpected error when listing services: %v", err)
}
totalServiceCount := 0
for _, testSvc := range testData {
for range testSvc {
totalServiceCount++
}
}
if len(services) != totalServiceCount {
t.Errorf("Expected total service count: %d, got: %d", totalServiceCount, len(services))
}
// deregister
for _, v := range testData {
for _, service := range v {
@@ -122,4 +150,94 @@ func TestMemoryRegistry(t *testing.T) {
}
}
}
// after all the service nodes have been deregistered we should not get any results
for _, v := range testData {
for _, service := range v {
services, err := m.GetService(service.Name)
if err != registry.ErrNotFound {
t.Errorf("Expected error: %v, got: %v", registry.ErrNotFound, err)
}
if len(services) != 0 {
t.Errorf("Expected %d services for %s, got %d", 0, service.Name, len(services))
}
}
}
}
func TestMemoryRegistryTTL(t *testing.T) {
m := NewRegistry()
for _, v := range testData {
for _, service := range v {
if err := m.Register(service, registry.RegisterTTL(time.Millisecond)); err != nil {
t.Fatal(err)
}
}
}
time.Sleep(ttlPruneTime * 2)
for name := range testData {
svcs, err := m.GetService(name)
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if len(svc.Nodes) > 0 {
t.Fatalf("Service %q still has nodes registered", name)
}
}
}
}
func TestMemoryRegistryTTLConcurrent(t *testing.T) {
concurrency := 1000
waitTime := ttlPruneTime * 2
m := NewRegistry()
for _, v := range testData {
for _, service := range v {
if err := m.Register(service, registry.RegisterTTL(waitTime)); err != nil {
t.Fatal(err)
}
}
}
t.Logf("test will wait %v, then check TTL timeouts", waitTime)
errChan := make(chan error, concurrency)
syncChan := make(chan struct{})
for i := 0; i < concurrency; i++ {
go func() {
<-syncChan
for name := range testData {
svcs, err := m.GetService(name)
if err != nil {
errChan <- err
return
}
for _, svc := range svcs {
if len(svc.Nodes) > 0 {
errChan <- fmt.Errorf("Service %q still has nodes registered", name)
return
}
}
}
errChan <- nil
}()
}
time.Sleep(waitTime)
close(syncChan)
for i := 0; i < concurrency; i++ {
if err := <-errChan; err != nil {
t.Fatal(err)
}
}
}

View File

@@ -8,12 +8,25 @@ import (
type servicesKey struct{}
func getServices(ctx context.Context) map[string][]*registry.Service {
s, ok := ctx.Value(servicesKey{}).(map[string][]*registry.Service)
func getServiceRecords(ctx context.Context) map[string]map[string]*record {
memServices, ok := ctx.Value(servicesKey{}).(map[string][]*registry.Service)
if !ok {
return nil
}
return s
services := make(map[string]map[string]*record)
for name, svc := range memServices {
if _, ok := services[name]; !ok {
services[name] = make(map[string]*record)
}
// go through every version of the service
for _, s := range svc {
services[s.Name][s.Version] = serviceToRecord(s, 0)
}
}
return services
}
// Services is an option that preloads service data

87
registry/memory/util.go Normal file
View File

@@ -0,0 +1,87 @@
package memory
import (
"time"
"github.com/micro/go-micro/registry"
)
func serviceToRecord(s *registry.Service, ttl time.Duration) *record {
metadata := make(map[string]string)
for k, v := range s.Metadata {
metadata[k] = v
}
nodes := make(map[string]*node)
for _, n := range s.Nodes {
nodes[n.Id] = &node{
Node: n,
TTL: ttl,
LastSeen: time.Now(),
}
}
endpoints := make([]*registry.Endpoint, len(s.Endpoints))
for i, e := range s.Endpoints {
endpoints[i] = e
}
return &record{
Name: s.Name,
Version: s.Version,
Metadata: metadata,
Nodes: nodes,
Endpoints: endpoints,
}
}
func recordToService(r *record) *registry.Service {
metadata := make(map[string]string)
for k, v := range r.Metadata {
metadata[k] = v
}
endpoints := make([]*registry.Endpoint, len(r.Endpoints))
for i, e := range r.Endpoints {
request := new(registry.Value)
request = e.Request
response := new(registry.Value)
response = e.Response
metadata := make(map[string]string)
for k, v := range e.Metadata {
metadata[k] = v
}
endpoints[i] = &registry.Endpoint{
Name: e.Name,
Request: request,
Response: response,
Metadata: metadata,
}
}
nodes := make([]*registry.Node, len(r.Nodes))
i := 0
for _, n := range r.Nodes {
metadata := make(map[string]string)
for k, v := range n.Metadata {
metadata[k] = v
}
nodes[i] = &registry.Node{
Id: n.Id,
Address: n.Address,
Metadata: metadata,
}
i++
}
return &registry.Service{
Name: r.Name,
Version: r.Version,
Metadata: metadata,
Endpoints: endpoints,
Nodes: nodes,
}
}