register/noop: add noop register
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
531
register/memory/memory.go
Normal file
531
register/memory/memory.go
Normal file
@@ -0,0 +1,531 @@
|
||||
package register
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
"go.unistack.org/micro/v3/util/id"
|
||||
)
|
||||
|
||||
var (
|
||||
sendEventTime = 10 * time.Millisecond
|
||||
ttlPruneTime = time.Second
|
||||
)
|
||||
|
||||
type node struct {
|
||||
LastSeen time.Time
|
||||
*register.Node
|
||||
TTL time.Duration
|
||||
}
|
||||
|
||||
type record struct {
|
||||
Name string
|
||||
Version string
|
||||
Metadata map[string]string
|
||||
Nodes map[string]*node
|
||||
Endpoints []*register.Endpoint
|
||||
}
|
||||
|
||||
type memory struct {
|
||||
sync.RWMutex
|
||||
records map[string]services
|
||||
watchers map[string]*watcher
|
||||
opts register.Options
|
||||
}
|
||||
|
||||
// services is a KV map with service name as the key and a map of records as the value
|
||||
type services map[string]map[string]*record
|
||||
|
||||
// NewRegister returns an initialized in-memory register
|
||||
func NewRegister(opts ...register.Option) register.Register {
|
||||
r := &memory{
|
||||
opts: register.NewOptions(opts...),
|
||||
records: make(map[string]services),
|
||||
watchers: make(map[string]*watcher),
|
||||
}
|
||||
|
||||
go r.ttlPrune()
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func (m *memory) ttlPrune() {
|
||||
prune := time.NewTicker(ttlPruneTime)
|
||||
defer prune.Stop()
|
||||
|
||||
for range prune.C {
|
||||
m.Lock()
|
||||
for domain, services := range m.records {
|
||||
for service, versions := range services {
|
||||
for version, record := range versions {
|
||||
for id, n := range record.Nodes {
|
||||
if n.TTL != 0 && time.Since(n.LastSeen) > n.TTL {
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Register TTL expired for node %s of service %s", n.ID, service)
|
||||
}
|
||||
delete(m.records[domain][service][version].Nodes, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
m.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *memory) sendEvent(r *register.Result) {
|
||||
m.RLock()
|
||||
watchers := make([]*watcher, 0, len(m.watchers))
|
||||
for _, w := range m.watchers {
|
||||
watchers = append(watchers, w)
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
for _, w := range watchers {
|
||||
select {
|
||||
case <-w.exit:
|
||||
m.Lock()
|
||||
delete(m.watchers, w.id)
|
||||
m.Unlock()
|
||||
default:
|
||||
select {
|
||||
case w.res <- r:
|
||||
case <-time.After(sendEventTime):
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *memory) Connect(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memory) Disconnect(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memory) Init(opts ...register.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&m.opts)
|
||||
}
|
||||
|
||||
// add services
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memory) Options() register.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *memory) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
options := register.NewRegisterOptions(opts...)
|
||||
|
||||
// get the services for this domain from the register
|
||||
srvs, ok := m.records[options.Domain]
|
||||
if !ok {
|
||||
srvs = make(services)
|
||||
}
|
||||
|
||||
// domain is set in metadata so it can be passed to watchers
|
||||
if s.Metadata == nil {
|
||||
s.Metadata = map[string]string{"domain": options.Domain}
|
||||
} else {
|
||||
s.Metadata["domain"] = options.Domain
|
||||
}
|
||||
|
||||
// ensure the service name exists
|
||||
r := serviceToRecord(s, options.TTL)
|
||||
if _, ok := srvs[s.Name]; !ok {
|
||||
srvs[s.Name] = make(map[string]*record)
|
||||
}
|
||||
|
||||
if _, ok := srvs[s.Name][s.Version]; !ok {
|
||||
srvs[s.Name][s.Version] = r
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Register added new service: %s, version: %s", s.Name, s.Version)
|
||||
}
|
||||
m.records[options.Domain] = srvs
|
||||
go m.sendEvent(®ister.Result{Action: "create", Service: s})
|
||||
}
|
||||
|
||||
var addedNodes bool
|
||||
|
||||
for _, n := range s.Nodes {
|
||||
// check if already exists
|
||||
if _, ok := srvs[s.Name][s.Version].Nodes[n.ID]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
metadata := make(map[string]string, len(n.Metadata))
|
||||
|
||||
// make copy of metadata
|
||||
for k, v := range n.Metadata {
|
||||
metadata[k] = v
|
||||
}
|
||||
|
||||
// set the domain
|
||||
metadata["domain"] = options.Domain
|
||||
|
||||
// add the node
|
||||
srvs[s.Name][s.Version].Nodes[n.ID] = &node{
|
||||
Node: ®ister.Node{
|
||||
ID: n.ID,
|
||||
Address: n.Address,
|
||||
Metadata: metadata,
|
||||
},
|
||||
TTL: options.TTL,
|
||||
LastSeen: time.Now(),
|
||||
}
|
||||
|
||||
addedNodes = true
|
||||
}
|
||||
|
||||
if addedNodes {
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Register added new node to service: %s, version: %s", s.Name, s.Version)
|
||||
}
|
||||
go m.sendEvent(®ister.Result{Action: "update", Service: s})
|
||||
} else {
|
||||
// refresh TTL and timestamp
|
||||
for _, n := range s.Nodes {
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Updated registration for service: %s, version: %s", s.Name, s.Version)
|
||||
}
|
||||
srvs[s.Name][s.Version].Nodes[n.ID].TTL = options.TTL
|
||||
srvs[s.Name][s.Version].Nodes[n.ID].LastSeen = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
m.records[options.Domain] = srvs
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...register.DeregisterOption) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
options := register.NewDeregisterOptions(opts...)
|
||||
|
||||
// domain is set in metadata so it can be passed to watchers
|
||||
if s.Metadata == nil {
|
||||
s.Metadata = map[string]string{"domain": options.Domain}
|
||||
} else {
|
||||
s.Metadata["domain"] = options.Domain
|
||||
}
|
||||
|
||||
// if the domain doesn't exist, there is nothing to deregister
|
||||
services, ok := m.records[options.Domain]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// if no services with this name and version exist, there is nothing to deregister
|
||||
versions, ok := services[s.Name]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
version, ok := versions[s.Version]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// deregister all of the service nodes from this version
|
||||
for _, n := range s.Nodes {
|
||||
if _, ok := version.Nodes[n.ID]; ok {
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Register removed node from service: %s, version: %s", s.Name, s.Version)
|
||||
}
|
||||
delete(version.Nodes, n.ID)
|
||||
}
|
||||
}
|
||||
|
||||
// if the nodes not empty, we replace the version in the store and exist, the rest of the logic
|
||||
// is cleanup
|
||||
if len(version.Nodes) > 0 {
|
||||
m.records[options.Domain][s.Name][s.Version] = version
|
||||
go m.sendEvent(®ister.Result{Action: "update", Service: s})
|
||||
return nil
|
||||
}
|
||||
|
||||
// if this version was the only version of the service, we can remove the whole service from the
|
||||
// register and exit
|
||||
if len(versions) == 1 {
|
||||
delete(m.records[options.Domain], s.Name)
|
||||
go m.sendEvent(®ister.Result{Action: "delete", Service: s})
|
||||
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Register removed service: %s", s.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// there are other versions of the service running, so only remove this version of it
|
||||
delete(m.records[options.Domain][s.Name], s.Version)
|
||||
go m.sendEvent(®ister.Result{Action: "delete", Service: s})
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debugf(m.opts.Context, "Register removed service: %s, version: %s", s.Name, s.Version)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memory) LookupService(ctx context.Context, name string, opts ...register.LookupOption) ([]*register.Service, error) {
|
||||
options := register.NewLookupOptions(opts...)
|
||||
|
||||
// if it's a wildcard domain, return from all domains
|
||||
if options.Domain == register.WildcardDomain {
|
||||
m.RLock()
|
||||
recs := m.records
|
||||
m.RUnlock()
|
||||
|
||||
var services []*register.Service
|
||||
|
||||
for domain := range recs {
|
||||
srvs, err := m.LookupService(ctx, name, append(opts, register.LookupDomain(domain))...)
|
||||
if err == register.ErrNotFound {
|
||||
continue
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
services = append(services, srvs...)
|
||||
}
|
||||
|
||||
if len(services) == 0 {
|
||||
return nil, register.ErrNotFound
|
||||
}
|
||||
return services, nil
|
||||
}
|
||||
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
// check the domain exists
|
||||
services, ok := m.records[options.Domain]
|
||||
if !ok {
|
||||
return nil, register.ErrNotFound
|
||||
}
|
||||
|
||||
// check the service exists
|
||||
versions, ok := services[name]
|
||||
if !ok || len(versions) == 0 {
|
||||
return nil, register.ErrNotFound
|
||||
}
|
||||
|
||||
// serialize the response
|
||||
result := make([]*register.Service, len(versions))
|
||||
|
||||
var i int
|
||||
|
||||
for _, r := range versions {
|
||||
result[i] = recordToService(r, options.Domain)
|
||||
i++
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) {
|
||||
options := register.NewListOptions(opts...)
|
||||
|
||||
// if it's a wildcard domain, list from all domains
|
||||
if options.Domain == register.WildcardDomain {
|
||||
m.RLock()
|
||||
recs := m.records
|
||||
m.RUnlock()
|
||||
|
||||
var services []*register.Service
|
||||
|
||||
for domain := range recs {
|
||||
srvs, err := m.ListServices(ctx, append(opts, register.ListDomain(domain))...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
services = append(services, srvs...)
|
||||
}
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
// ensure the domain exists
|
||||
services, ok := m.records[options.Domain]
|
||||
if !ok {
|
||||
return make([]*register.Service, 0), nil
|
||||
}
|
||||
|
||||
// serialize the result, each version counts as an individual service
|
||||
var result []*register.Service
|
||||
|
||||
for _, service := range services {
|
||||
for _, version := range service {
|
||||
result = append(result, recordToService(version, options.Domain))
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *memory) Watch(ctx context.Context, opts ...register.WatchOption) (register.Watcher, error) {
|
||||
id, err := id.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wo := register.NewWatchOptions(opts...)
|
||||
// construct the watcher
|
||||
w := &watcher{
|
||||
exit: make(chan bool),
|
||||
res: make(chan *register.Result),
|
||||
id: id,
|
||||
wo: wo,
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.watchers[w.id] = w
|
||||
m.Unlock()
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func (m *memory) Name() string {
|
||||
return m.opts.Name
|
||||
}
|
||||
|
||||
func (m *memory) String() string {
|
||||
return "memory"
|
||||
}
|
||||
|
||||
type watcher struct {
|
||||
res chan *register.Result
|
||||
exit chan bool
|
||||
wo register.WatchOptions
|
||||
id string
|
||||
}
|
||||
|
||||
func (m *watcher) Next() (*register.Result, error) {
|
||||
for {
|
||||
select {
|
||||
case r := <-m.res:
|
||||
if r.Service == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name {
|
||||
continue
|
||||
}
|
||||
|
||||
// extract domain from service metadata
|
||||
var domain string
|
||||
if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 {
|
||||
domain = r.Service.Metadata["domain"]
|
||||
} else {
|
||||
domain = register.DefaultDomain
|
||||
}
|
||||
|
||||
// only send the event if watching the wildcard or this specific domain
|
||||
if m.wo.Domain == register.WildcardDomain || m.wo.Domain == domain {
|
||||
return r, nil
|
||||
}
|
||||
case <-m.exit:
|
||||
return nil, register.ErrWatcherStopped
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *watcher) Stop() {
|
||||
select {
|
||||
case <-m.exit:
|
||||
return
|
||||
default:
|
||||
close(m.exit)
|
||||
}
|
||||
}
|
||||
|
||||
func serviceToRecord(s *register.Service, ttl time.Duration) *record {
|
||||
metadata := make(map[string]string, len(s.Metadata))
|
||||
for k, v := range s.Metadata {
|
||||
metadata[k] = v
|
||||
}
|
||||
|
||||
nodes := make(map[string]*node, len(s.Nodes))
|
||||
for _, n := range s.Nodes {
|
||||
nodes[n.ID] = &node{
|
||||
Node: n,
|
||||
TTL: ttl,
|
||||
LastSeen: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
endpoints := make([]*register.Endpoint, len(s.Endpoints))
|
||||
for i, e := range s.Endpoints {
|
||||
endpoints[i] = e
|
||||
}
|
||||
|
||||
return &record{
|
||||
Name: s.Name,
|
||||
Version: s.Version,
|
||||
Metadata: metadata,
|
||||
Nodes: nodes,
|
||||
Endpoints: endpoints,
|
||||
}
|
||||
}
|
||||
|
||||
func recordToService(r *record, domain string) *register.Service {
|
||||
metadata := make(map[string]string, len(r.Metadata))
|
||||
for k, v := range r.Metadata {
|
||||
metadata[k] = v
|
||||
}
|
||||
|
||||
// set the domain in metadata so it can be determined when a wildcard query is performed
|
||||
metadata["domain"] = domain
|
||||
|
||||
endpoints := make([]*register.Endpoint, len(r.Endpoints))
|
||||
for i, e := range r.Endpoints {
|
||||
md := make(map[string]string, len(e.Metadata))
|
||||
for k, v := range e.Metadata {
|
||||
md[k] = v
|
||||
}
|
||||
|
||||
endpoints[i] = ®ister.Endpoint{
|
||||
Name: e.Name,
|
||||
Request: e.Request,
|
||||
Response: e.Response,
|
||||
Metadata: md,
|
||||
}
|
||||
}
|
||||
|
||||
nodes := make([]*register.Node, len(r.Nodes))
|
||||
i := 0
|
||||
for _, n := range r.Nodes {
|
||||
md := make(map[string]string, len(n.Metadata))
|
||||
for k, v := range n.Metadata {
|
||||
md[k] = v
|
||||
}
|
||||
|
||||
nodes[i] = ®ister.Node{
|
||||
ID: n.ID,
|
||||
Address: n.Address,
|
||||
Metadata: md,
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
return ®ister.Service{
|
||||
Name: r.Name,
|
||||
Version: r.Version,
|
||||
Metadata: metadata,
|
||||
Endpoints: endpoints,
|
||||
Nodes: nodes,
|
||||
}
|
||||
}
|
324
register/memory/memory_test.go
Normal file
324
register/memory/memory_test.go
Normal file
@@ -0,0 +1,324 @@
|
||||
package register
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/register"
|
||||
)
|
||||
|
||||
var testData = map[string][]*register.Service{
|
||||
"foo": {
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*register.Node{
|
||||
{
|
||||
ID: "foo-1.0.0-123",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
{
|
||||
ID: "foo-1.0.0-321",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.1",
|
||||
Nodes: []*register.Node{
|
||||
{
|
||||
ID: "foo-1.0.1-321",
|
||||
Address: "localhost:6666",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.3",
|
||||
Nodes: []*register.Node{
|
||||
{
|
||||
ID: "foo-1.0.3-345",
|
||||
Address: "localhost:8888",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"bar": {
|
||||
{
|
||||
Name: "bar",
|
||||
Version: "default",
|
||||
Nodes: []*register.Node{
|
||||
{
|
||||
ID: "bar-1.0.0-123",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
{
|
||||
ID: "bar-1.0.0-321",
|
||||
Address: "localhost:9999",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "bar",
|
||||
Version: "latest",
|
||||
Nodes: []*register.Node{
|
||||
{
|
||||
ID: "bar-1.0.1-321",
|
||||
Address: "localhost:6666",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func TestMemoryRegistry(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
m := NewRegister()
|
||||
|
||||
fn := func(k string, v []*register.Service) {
|
||||
services, err := m.LookupService(ctx, k)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error getting service %s: %v", k, err)
|
||||
}
|
||||
|
||||
if len(services) != len(v) {
|
||||
t.Errorf("Expected %d services for %s, got %d", len(v), k, len(services))
|
||||
}
|
||||
|
||||
for _, service := range v {
|
||||
var seen bool
|
||||
for _, s := range services {
|
||||
if s.Version == service.Version {
|
||||
seen = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
t.Errorf("expected to find version %s", service.Version)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// register data
|
||||
for _, v := range testData {
|
||||
serviceCount := 0
|
||||
for _, service := range v {
|
||||
if err := m.Register(ctx, service); err != nil {
|
||||
t.Errorf("Unexpected register error: %v", err)
|
||||
}
|
||||
serviceCount++
|
||||
// after the service has been registered we should be able to query it
|
||||
services, err := m.LookupService(ctx, service.Name)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error getting service %s: %v", service.Name, err)
|
||||
}
|
||||
if len(services) != serviceCount {
|
||||
t.Errorf("Expected %d services for %s, got %d", serviceCount, service.Name, len(services))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// using test data
|
||||
for k, v := range testData {
|
||||
fn(k, v)
|
||||
}
|
||||
|
||||
services, err := m.ListServices(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error when listing services: %v", err)
|
||||
}
|
||||
|
||||
totalServiceCount := 0
|
||||
for _, testSvc := range testData {
|
||||
for range testSvc {
|
||||
totalServiceCount++
|
||||
}
|
||||
}
|
||||
|
||||
if len(services) != totalServiceCount {
|
||||
t.Errorf("Expected total service count: %d, got: %d", totalServiceCount, len(services))
|
||||
}
|
||||
|
||||
// deregister
|
||||
for _, v := range testData {
|
||||
for _, service := range v {
|
||||
if err := m.Deregister(ctx, service); err != nil {
|
||||
t.Errorf("Unexpected deregister error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// after all the service nodes have been deregistered we should not get any results
|
||||
for _, v := range testData {
|
||||
for _, service := range v {
|
||||
services, err := m.LookupService(ctx, service.Name)
|
||||
if err != register.ErrNotFound {
|
||||
t.Errorf("Expected error: %v, got: %v", register.ErrNotFound, err)
|
||||
}
|
||||
if len(services) != 0 {
|
||||
t.Errorf("Expected %d services for %s, got %d", 0, service.Name, len(services))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryRegistryTTL(t *testing.T) {
|
||||
m := NewRegister()
|
||||
ctx := context.TODO()
|
||||
|
||||
for _, v := range testData {
|
||||
for _, service := range v {
|
||||
if err := m.Register(ctx, service, register.RegisterTTL(time.Millisecond)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(ttlPruneTime * 2)
|
||||
|
||||
for name := range testData {
|
||||
svcs, err := m.LookupService(ctx, name)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, svc := range svcs {
|
||||
if len(svc.Nodes) > 0 {
|
||||
t.Fatalf("Service %q still has nodes registered", name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryRegistryTTLConcurrent(t *testing.T) {
|
||||
concurrency := 1000
|
||||
waitTime := ttlPruneTime * 2
|
||||
m := NewRegister()
|
||||
ctx := context.TODO()
|
||||
for _, v := range testData {
|
||||
for _, service := range v {
|
||||
if err := m.Register(ctx, service, register.RegisterTTL(waitTime/2)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
|
||||
// t.Logf("test will wait %v, then check TTL timeouts", waitTime)
|
||||
//}
|
||||
|
||||
errChan := make(chan error, concurrency)
|
||||
syncChan := make(chan struct{})
|
||||
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func() {
|
||||
<-syncChan
|
||||
for name := range testData {
|
||||
svcs, err := m.LookupService(ctx, name)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
for _, svc := range svcs {
|
||||
if len(svc.Nodes) > 0 {
|
||||
errChan <- fmt.Errorf("Service %q still has nodes registered", name)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
errChan <- nil
|
||||
}()
|
||||
}
|
||||
|
||||
time.Sleep(waitTime)
|
||||
close(syncChan)
|
||||
|
||||
for i := 0; i < concurrency; i++ {
|
||||
if err := <-errChan; err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryWildcard(t *testing.T) {
|
||||
m := NewRegister()
|
||||
ctx := context.TODO()
|
||||
|
||||
testSrv := ®ister.Service{Name: "foo", Version: "1.0.0"}
|
||||
|
||||
if err := m.Register(ctx, testSrv, register.RegisterDomain("one")); err != nil {
|
||||
t.Fatalf("Register err: %v", err)
|
||||
}
|
||||
if err := m.Register(ctx, testSrv, register.RegisterDomain("two")); err != nil {
|
||||
t.Fatalf("Register err: %v", err)
|
||||
}
|
||||
|
||||
if recs, err := m.ListServices(ctx, register.ListDomain("one")); err != nil {
|
||||
t.Errorf("List err: %v", err)
|
||||
} else if len(recs) != 1 {
|
||||
t.Errorf("Expected 1 record, got %v", len(recs))
|
||||
}
|
||||
|
||||
if recs, err := m.ListServices(ctx, register.ListDomain("*")); err != nil {
|
||||
t.Errorf("List err: %v", err)
|
||||
} else if len(recs) != 2 {
|
||||
t.Errorf("Expected 2 records, got %v", len(recs))
|
||||
}
|
||||
|
||||
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("one")); err != nil {
|
||||
t.Errorf("Lookup err: %v", err)
|
||||
} else if len(recs) != 1 {
|
||||
t.Errorf("Expected 1 record, got %v", len(recs))
|
||||
}
|
||||
|
||||
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("*")); err != nil {
|
||||
t.Errorf("Lookup err: %v", err)
|
||||
} else if len(recs) != 2 {
|
||||
t.Errorf("Expected 2 records, got %v", len(recs))
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatcher(t *testing.T) {
|
||||
testSrv := ®ister.Service{Name: "foo", Version: "1.0.0"}
|
||||
|
||||
ctx := context.TODO()
|
||||
m := NewRegister()
|
||||
m.Init()
|
||||
m.Connect(ctx)
|
||||
wc, err := m.Watch(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("cant watch: %v", err)
|
||||
}
|
||||
defer wc.Stop()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for {
|
||||
_, err := wc.Next()
|
||||
if err != nil {
|
||||
t.Fatal("unexpected err", err)
|
||||
}
|
||||
// t.Logf("changes %#+v", ch.Service)
|
||||
wc.Stop()
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
if err := m.Register(ctx, testSrv); err != nil {
|
||||
t.Fatalf("Register err: %v", err)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
if _, err := wc.Next(); err == nil {
|
||||
t.Fatal("expected error on Next()")
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user