diff --git a/register/memory.go b/register/memory/memory.go similarity index 77% rename from register/memory.go rename to register/memory/memory.go index 8cd75935..8be85a0f 100644 --- a/register/memory.go +++ b/register/memory/memory.go @@ -1,7 +1,8 @@ -package register +package memory import ( "context" + "go.unistack.org/micro/v4/register" "sync" "time" @@ -16,7 +17,7 @@ var ( type node struct { LastSeen time.Time - *Node + *register.Node TTL time.Duration } @@ -25,23 +26,23 @@ type record struct { Version string Metadata map[string]string Nodes map[string]*node - Endpoints []*Endpoint + Endpoints []*register.Endpoint } type memory struct { sync.RWMutex records map[string]services watchers map[string]*watcher - opts Options + opts register.Options } // services is a KV map with service name as the key and a map of records as the value type services map[string]map[string]*record // NewRegister returns an initialized in-memory register -func NewRegister(opts ...Option) Register { +func NewRegister(opts ...register.Option) register.Register { r := &memory{ - opts: NewOptions(opts...), + opts: register.NewOptions(opts...), records: make(map[string]services), watchers: make(map[string]*watcher), } @@ -75,7 +76,7 @@ func (m *memory) ttlPrune() { } } -func (m *memory) sendEvent(r *Result) { +func (m *memory) sendEvent(r *register.Result) { m.RLock() watchers := make([]*watcher, 0, len(m.watchers)) for _, w := range m.watchers { @@ -106,7 +107,7 @@ func (m *memory) Disconnect(ctx context.Context) error { return nil } -func (m *memory) Init(opts ...Option) error { +func (m *memory) Init(opts ...register.Option) error { for _, o := range opts { o(&m.opts) } @@ -118,15 +119,15 @@ func (m *memory) Init(opts ...Option) error { return nil } -func (m *memory) Options() Options { +func (m *memory) Options() register.Options { return m.opts } -func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOption) error { +func (m *memory) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error { m.Lock() defer m.Unlock() - options := NewRegisterOptions(opts...) + options := register.NewRegisterOptions(opts...) // get the services for this domain from the register srvs, ok := m.records[options.Domain] @@ -153,7 +154,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio m.opts.Logger.Debug(m.opts.Context, "register added new service: "+s.Name+", version "+s.Version) } m.records[options.Domain] = srvs - go m.sendEvent(&Result{Action: "create", Service: s}) + go m.sendEvent(®ister.Result{Action: "create", Service: s}) } var addedNodes bool @@ -176,7 +177,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio // add the node srvs[s.Name][s.Version].Nodes[n.ID] = &node{ - Node: &Node{ + Node: ®ister.Node{ ID: n.ID, Address: n.Address, Metadata: metadata, @@ -192,7 +193,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, "register added new node to service: "+s.Name+", version "+s.Version) } - go m.sendEvent(&Result{Action: "update", Service: s}) + go m.sendEvent(®ister.Result{Action: "update", Service: s}) } else { // refresh TTL and timestamp for _, n := range s.Nodes { @@ -208,11 +209,11 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio return nil } -func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterOption) error { +func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...register.DeregisterOption) error { m.Lock() defer m.Unlock() - options := NewDeregisterOptions(opts...) + options := register.NewDeregisterOptions(opts...) // domain is set in metadata so it can be passed to watchers if s.Metadata == nil { @@ -252,7 +253,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO // is cleanup if len(version.Nodes) > 0 { m.records[options.Domain][s.Name][s.Version] = version - go m.sendEvent(&Result{Action: "update", Service: s}) + go m.sendEvent(®ister.Result{Action: "update", Service: s}) return nil } @@ -260,7 +261,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO // register and exit if len(versions) == 1 { delete(m.records[options.Domain], s.Name) - go m.sendEvent(&Result{Action: "delete", Service: s}) + go m.sendEvent(®ister.Result{Action: "delete", Service: s}) if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, "register removed service: "+s.Name) @@ -270,7 +271,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO // there are other versions of the service running, so only remove this version of it delete(m.records[options.Domain][s.Name], s.Version) - go m.sendEvent(&Result{Action: "delete", Service: s}) + go m.sendEvent(®ister.Result{Action: "delete", Service: s}) if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, "register removed service: "+s.Name+", version "+s.Version) } @@ -278,20 +279,20 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO return nil } -func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupOption) ([]*Service, error) { - options := NewLookupOptions(opts...) +func (m *memory) LookupService(ctx context.Context, name string, opts ...register.LookupOption) ([]*register.Service, error) { + options := register.NewLookupOptions(opts...) // if it's a wildcard domain, return from all domains - if options.Domain == WildcardDomain { + if options.Domain == register.WildcardDomain { m.RLock() recs := m.records m.RUnlock() - var services []*Service + var services []*register.Service for domain := range recs { - srvs, err := m.LookupService(ctx, name, append(opts, LookupDomain(domain))...) - if err == ErrNotFound { + srvs, err := m.LookupService(ctx, name, append(opts, register.LookupDomain(domain))...) + if err == register.ErrNotFound { continue } else if err != nil { return nil, err @@ -300,7 +301,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO } if len(services) == 0 { - return nil, ErrNotFound + return nil, register.ErrNotFound } return services, nil } @@ -311,17 +312,17 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO // check the domain exists services, ok := m.records[options.Domain] if !ok { - return nil, ErrNotFound + return nil, register.ErrNotFound } // check the service exists versions, ok := services[name] if !ok || len(versions) == 0 { - return nil, ErrNotFound + return nil, register.ErrNotFound } // serialize the response - result := make([]*Service, len(versions)) + result := make([]*register.Service, len(versions)) var i int @@ -333,19 +334,19 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO return result, nil } -func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) { - options := NewListOptions(opts...) +func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) { + options := register.NewListOptions(opts...) // if it's a wildcard domain, list from all domains - if options.Domain == WildcardDomain { + if options.Domain == register.WildcardDomain { m.RLock() recs := m.records m.RUnlock() - var services []*Service + var services []*register.Service for domain := range recs { - srvs, err := m.ListServices(ctx, append(opts, ListDomain(domain))...) + srvs, err := m.ListServices(ctx, append(opts, register.ListDomain(domain))...) if err != nil { return nil, err } @@ -361,11 +362,11 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi // ensure the domain exists services, ok := m.records[options.Domain] if !ok { - return make([]*Service, 0), nil + return make([]*register.Service, 0), nil } // serialize the result, each version counts as an individual service - var result []*Service + var result []*register.Service for _, service := range services { for _, version := range service { @@ -376,16 +377,16 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi return result, nil } -func (m *memory) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { +func (m *memory) Watch(ctx context.Context, opts ...register.WatchOption) (register.Watcher, error) { id, err := id.New() if err != nil { return nil, err } - wo := NewWatchOptions(opts...) + wo := register.NewWatchOptions(opts...) // construct the watcher w := &watcher{ exit: make(chan bool), - res: make(chan *Result), + res: make(chan *register.Result), id: id, wo: wo, } @@ -406,13 +407,13 @@ func (m *memory) String() string { } type watcher struct { - res chan *Result + res chan *register.Result exit chan bool - wo WatchOptions + wo register.WatchOptions id string } -func (m *watcher) Next() (*Result, error) { +func (m *watcher) Next() (*register.Result, error) { for { select { case r := <-m.res: @@ -429,15 +430,15 @@ func (m *watcher) Next() (*Result, error) { if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 { domain = r.Service.Metadata["domain"] } else { - domain = DefaultDomain + domain = register.DefaultDomain } // only send the event if watching the wildcard or this specific domain - if m.wo.Domain == WildcardDomain || m.wo.Domain == domain { + if m.wo.Domain == register.WildcardDomain || m.wo.Domain == domain { return r, nil } case <-m.exit: - return nil, ErrWatcherStopped + return nil, register.ErrWatcherStopped } } } @@ -451,7 +452,7 @@ func (m *watcher) Stop() { } } -func serviceToRecord(s *Service, ttl time.Duration) *record { +func serviceToRecord(s *register.Service, ttl time.Duration) *record { metadata := make(map[string]string, len(s.Metadata)) for k, v := range s.Metadata { metadata[k] = v @@ -466,7 +467,7 @@ func serviceToRecord(s *Service, ttl time.Duration) *record { } } - endpoints := make([]*Endpoint, len(s.Endpoints)) + endpoints := make([]*register.Endpoint, len(s.Endpoints)) for i, e := range s.Endpoints { // TODO: vtolstov use copy endpoints[i] = e } @@ -480,7 +481,7 @@ func serviceToRecord(s *Service, ttl time.Duration) *record { } } -func recordToService(r *record, domain string) *Service { +func recordToService(r *record, domain string) *register.Service { metadata := make(map[string]string, len(r.Metadata)) for k, v := range r.Metadata { metadata[k] = v @@ -489,14 +490,14 @@ func recordToService(r *record, domain string) *Service { // set the domain in metadata so it can be determined when a wildcard query is performed metadata["domain"] = domain - endpoints := make([]*Endpoint, len(r.Endpoints)) + endpoints := make([]*register.Endpoint, len(r.Endpoints)) for i, e := range r.Endpoints { md := make(map[string]string, len(e.Metadata)) for k, v := range e.Metadata { md[k] = v } - endpoints[i] = &Endpoint{ + endpoints[i] = ®ister.Endpoint{ Name: e.Name, Request: e.Request, Response: e.Response, @@ -504,7 +505,7 @@ func recordToService(r *record, domain string) *Service { } } - nodes := make([]*Node, len(r.Nodes)) + nodes := make([]*register.Node, len(r.Nodes)) i := 0 for _, n := range r.Nodes { md := make(map[string]string, len(n.Metadata)) @@ -512,7 +513,7 @@ func recordToService(r *record, domain string) *Service { md[k] = v } - nodes[i] = &Node{ + nodes[i] = ®ister.Node{ ID: n.ID, Address: n.Address, Metadata: md, @@ -520,7 +521,7 @@ func recordToService(r *record, domain string) *Service { i++ } - return &Service{ + return ®ister.Service{ Name: r.Name, Version: r.Version, Metadata: metadata, diff --git a/register/memory_test.go b/register/memory/memory_test.go similarity index 75% rename from register/memory_test.go rename to register/memory/memory_test.go index 75b2c798..c4dd7151 100644 --- a/register/memory_test.go +++ b/register/memory/memory_test.go @@ -1,19 +1,22 @@ -package register +package memory import ( "context" "fmt" + "go.unistack.org/micro/v4" + "go.unistack.org/micro/v4/register" + "reflect" "sync" "testing" "time" ) -var testData = map[string][]*Service{ +var testData = map[string][]*register.Service{ "foo": { { Name: "foo", Version: "1.0.0", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "foo-1.0.0-123", Address: "localhost:9999", @@ -27,7 +30,7 @@ var testData = map[string][]*Service{ { Name: "foo", Version: "1.0.1", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "foo-1.0.1-321", Address: "localhost:6666", @@ -37,7 +40,7 @@ var testData = map[string][]*Service{ { Name: "foo", Version: "1.0.3", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "foo-1.0.3-345", Address: "localhost:8888", @@ -49,7 +52,7 @@ var testData = map[string][]*Service{ { Name: "bar", Version: "default", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "bar-1.0.0-123", Address: "localhost:9999", @@ -63,7 +66,7 @@ var testData = map[string][]*Service{ { Name: "bar", Version: "latest", - Nodes: []*Node{ + Nodes: []*register.Node{ { ID: "bar-1.0.1-321", Address: "localhost:6666", @@ -78,7 +81,7 @@ func TestMemoryRegistry(t *testing.T) { ctx := context.TODO() m := NewRegister() - fn := func(k string, v []*Service) { + fn := func(k string, v []*register.Service) { services, err := m.LookupService(ctx, k) if err != nil { t.Errorf("Unexpected error getting service %s: %v", k, err) @@ -155,8 +158,8 @@ func TestMemoryRegistry(t *testing.T) { for _, v := range testData { for _, service := range v { services, err := m.LookupService(ctx, service.Name) - if err != ErrNotFound { - t.Errorf("Expected error: %v, got: %v", ErrNotFound, err) + if err != register.ErrNotFound { + t.Errorf("Expected error: %v, got: %v", register.ErrNotFound, err) } if len(services) != 0 { t.Errorf("Expected %d services for %s, got %d", 0, service.Name, len(services)) @@ -171,7 +174,7 @@ func TestMemoryRegistryTTL(t *testing.T) { for _, v := range testData { for _, service := range v { - if err := m.Register(ctx, service, RegisterTTL(time.Millisecond)); err != nil { + if err := m.Register(ctx, service, register.RegisterTTL(time.Millisecond)); err != nil { t.Fatal(err) } } @@ -200,7 +203,7 @@ func TestMemoryRegistryTTLConcurrent(t *testing.T) { ctx := context.TODO() for _, v := range testData { for _, service := range v { - if err := m.Register(ctx, service, RegisterTTL(waitTime/2)); err != nil { + if err := m.Register(ctx, service, register.RegisterTTL(waitTime/2)); err != nil { t.Fatal(err) } } @@ -249,34 +252,34 @@ func TestMemoryWildcard(t *testing.T) { m := NewRegister() ctx := context.TODO() - testSrv := &Service{Name: "foo", Version: "1.0.0"} + testSrv := ®ister.Service{Name: "foo", Version: "1.0.0"} - if err := m.Register(ctx, testSrv, RegisterDomain("one")); err != nil { + if err := m.Register(ctx, testSrv, register.RegisterDomain("one")); err != nil { t.Fatalf("Register err: %v", err) } - if err := m.Register(ctx, testSrv, RegisterDomain("two")); err != nil { + if err := m.Register(ctx, testSrv, register.RegisterDomain("two")); err != nil { t.Fatalf("Register err: %v", err) } - if recs, err := m.ListServices(ctx, ListDomain("one")); err != nil { + if recs, err := m.ListServices(ctx, register.ListDomain("one")); err != nil { t.Errorf("List err: %v", err) } else if len(recs) != 1 { t.Errorf("Expected 1 record, got %v", len(recs)) } - if recs, err := m.ListServices(ctx, ListDomain("*")); err != nil { + if recs, err := m.ListServices(ctx, register.ListDomain("*")); err != nil { t.Errorf("List err: %v", err) } else if len(recs) != 2 { t.Errorf("Expected 2 records, got %v", len(recs)) } - if recs, err := m.LookupService(ctx, testSrv.Name, LookupDomain("one")); err != nil { + if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("one")); err != nil { t.Errorf("Lookup err: %v", err) } else if len(recs) != 1 { t.Errorf("Expected 1 record, got %v", len(recs)) } - if recs, err := m.LookupService(ctx, testSrv.Name, LookupDomain("*")); err != nil { + if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("*")); err != nil { t.Errorf("Lookup err: %v", err) } else if len(recs) != 2 { t.Errorf("Expected 2 records, got %v", len(recs)) @@ -284,7 +287,7 @@ func TestMemoryWildcard(t *testing.T) { } func TestWatcher(t *testing.T) { - testSrv := &Service{Name: "foo", Version: "1.0.0"} + testSrv := ®ister.Service{Name: "foo", Version: "1.0.0"} ctx := context.TODO() m := NewRegister() @@ -320,3 +323,37 @@ func TestWatcher(t *testing.T) { t.Fatal("expected error on Next()") } } + +func Test_service_Register(t *testing.T) { + t.Skip() + r := NewRegister() + + type args struct { + names []string + } + tests := []struct { + name string + opts []micro.Option + args args + want register.Register + }{ + { + name: "service.Register", + opts: []micro.Option{micro.Register(r)}, + args: args{ + names: []string{"memory"}, + }, + want: r, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := micro.NewService(tt.opts...) + + if got := s.Register(tt.args.names...); !reflect.DeepEqual(got, tt.want) { + t.Errorf("service.Register() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/register/noop.go b/register/noop.go new file mode 100644 index 00000000..4fb692d3 --- /dev/null +++ b/register/noop.go @@ -0,0 +1,72 @@ +package register + +import "context" + +type noop struct { + opts Options +} + +func NewRegister(opts ...Option) Register { + return &noop{ + opts: NewOptions(opts...), + } +} + +func (n *noop) Name() string { + return n.opts.Name +} + +func (n *noop) Init(opts ...Option) error { + for _, o := range opts { + o(&n.opts) + } + return nil +} + +func (n *noop) Options() Options { + return n.opts +} + +func (n *noop) Connect(ctx context.Context) error { + return nil +} + +func (n *noop) Disconnect(ctx context.Context) error { + return nil +} + +func (n *noop) Register(ctx context.Context, service *Service, option ...RegisterOption) error { + return nil +} + +func (n *noop) Deregister(ctx context.Context, service *Service, option ...DeregisterOption) error { + return nil +} + +func (n *noop) LookupService(ctx context.Context, s string, option ...LookupOption) ([]*Service, error) { + return nil, nil +} + +func (n *noop) ListServices(ctx context.Context, option ...ListOption) ([]*Service, error) { + return nil, nil +} + +func (n *noop) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { + wOpts := NewWatchOptions(opts...) + + return &watcher{wo: wOpts}, nil +} + +func (n *noop) String() string { + return "noop" +} + +type watcher struct { + wo WatchOptions +} + +func (m *watcher) Next() (*Result, error) { + return nil, nil +} + +func (m *watcher) Stop() {} diff --git a/register/register.go b/register/register.go index 74526544..c06ccc9f 100644 --- a/register/register.go +++ b/register/register.go @@ -4,7 +4,6 @@ package register // import "go.unistack.org/micro/v4/register" import ( "context" "errors" - "go.unistack.org/micro/v4/metadata" ) diff --git a/service_test.go b/service_test.go index 582318da..08a676fe 100644 --- a/service_test.go +++ b/service_test.go @@ -1,6 +1,7 @@ package micro import ( + "go.unistack.org/micro/v4/register/memory" "reflect" "testing" @@ -425,7 +426,7 @@ func Test_service_Store(t *testing.T) { } func Test_service_Register(t *testing.T) { - r := register.NewRegister() + r := memory.NewRegister() type fields struct { opts Options } @@ -444,7 +445,7 @@ func Test_service_Register(t *testing.T) { opts: Options{Registers: []register.Register{r}}, }, args: args{ - names: []string{"noop"}, + names: []string{"memory"}, }, want: r, },