diff --git a/registry/memory/data.go b/registry/memory/data.go new file mode 100644 index 00000000..7999fea8 --- /dev/null +++ b/registry/memory/data.go @@ -0,0 +1,51 @@ +package memory + +import ( + "github.com/micro/go-micro/registry" +) + +var ( + // mock data + Data = map[string][]*registry.Service{ + "foo": []*registry.Service{ + { + Name: "foo", + Version: "1.0.0", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.0-123", + Address: "localhost", + Port: 9999, + }, + { + Id: "foo-1.0.0-321", + Address: "localhost", + Port: 9999, + }, + }, + }, + { + Name: "foo", + Version: "1.0.1", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.1-321", + Address: "localhost", + Port: 6666, + }, + }, + }, + { + Name: "foo", + Version: "1.0.3", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.3-345", + Address: "localhost", + Port: 8888, + }, + }, + }, + }, + } +) diff --git a/registry/memory/memory.go b/registry/memory/memory.go index f516720a..64b7d4ca 100644 --- a/registry/memory/memory.go +++ b/registry/memory/memory.go @@ -2,60 +2,24 @@ package memory import ( + "context" "sync" + "time" + "github.com/google/uuid" "github.com/micro/go-micro/registry" ) type Registry struct { + options registry.Options + sync.RWMutex Services map[string][]*registry.Service + Watchers map[string]*Watcher } var ( - // mock data - Data = map[string][]*registry.Service{ - "foo": []*registry.Service{ - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-1.0.0-123", - Address: "localhost", - Port: 9999, - }, - { - Id: "foo-1.0.0-321", - Address: "localhost", - Port: 9999, - }, - }, - }, - { - Name: "foo", - Version: "1.0.1", - Nodes: []*registry.Node{ - { - Id: "foo-1.0.1-321", - Address: "localhost", - Port: 6666, - }, - }, - }, - { - Name: "foo", - Version: "1.0.3", - Nodes: []*registry.Node{ - { - Id: "foo-1.0.3-345", - Address: "localhost", - Port: 8888, - }, - }, - }, - }, - } + timeout = time.Millisecond * 10 ) // Setup sets mock data @@ -67,69 +31,130 @@ func (m *Registry) Setup() { m.Services = Data } -func (m *Registry) GetService(service string) ([]*registry.Service, error) { - m.Lock() - defer m.Unlock() +func (m *Registry) watch(r *registry.Result) { + var watchers []*Watcher + m.RLock() + for _, w := range m.Watchers { + watchers = append(watchers, w) + } + m.RUnlock() + + for _, w := range watchers { + select { + case <-w.exit: + m.Lock() + delete(m.Watchers, w.id) + m.Unlock() + default: + select { + case w.res <- r: + case <-time.After(timeout): + } + } + } +} + +func (m *Registry) Init(opts ...registry.Option) error { + for _, o := range opts { + o(&m.options) + } + + // add services + m.Lock() + for k, v := range getServices(m.options.Context) { + s := m.Services[k] + m.Services[k] = addServices(s, v) + } + m.Unlock() + return nil +} + +func (m *Registry) Options() registry.Options { + return m.options +} + +func (m *Registry) GetService(service string) ([]*registry.Service, error) { + m.RLock() s, ok := m.Services[service] if !ok || len(s) == 0 { + m.RUnlock() return nil, registry.ErrNotFound } + m.RUnlock() return s, nil - } func (m *Registry) ListServices() ([]*registry.Service, error) { - m.Lock() - defer m.Unlock() - + m.RLock() var services []*registry.Service for _, service := range m.Services { services = append(services, service...) } + m.RUnlock() return services, nil } func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption) error { - m.Lock() - defer m.Unlock() + go m.watch(®istry.Result{Action: "update", Service: s}) + m.Lock() services := addServices(m.Services[s.Name], []*registry.Service{s}) m.Services[s.Name] = services + m.Unlock() return nil } func (m *Registry) Deregister(s *registry.Service) error { - m.Lock() - defer m.Unlock() + go m.watch(®istry.Result{Action: "delete", Service: s}) + m.Lock() services := delServices(m.Services[s.Name], []*registry.Service{s}) m.Services[s.Name] = services + m.Unlock() return nil } func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { - var wopts registry.WatchOptions + var wo registry.WatchOptions for _, o := range opts { - o(&wopts) + o(&wo) } - return &memoryWatcher{exit: make(chan bool), opts: wopts}, nil + + w := &Watcher{ + exit: make(chan bool), + res: make(chan *registry.Result), + id: uuid.New().String(), + wo: wo, + } + + m.Lock() + m.Watchers[w.id] = w + m.Unlock() + return w, nil } func (m *Registry) String() string { return "memory" } -func (m *Registry) Init(opts ...registry.Option) error { - return nil -} - -func (m *Registry) Options() registry.Options { - return registry.Options{} -} - func NewRegistry(opts ...registry.Option) registry.Registry { + options := registry.Options{ + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + services := getServices(options.Context) + if services == nil { + services = make(map[string][]*registry.Service) + } + return &Registry{ - Services: make(map[string][]*registry.Service), + options: options, + Services: services, + Watchers: make(map[string]*Watcher), } } diff --git a/registry/memory/memory_test.go b/registry/memory/memory_test.go index 2eb21009..3b5cd8be 100644 --- a/registry/memory/memory_test.go +++ b/registry/memory/memory_test.go @@ -80,9 +80,8 @@ var ( } ) -func TestMockRegistry(t *testing.T) { +func TestMemoryRegistry(t *testing.T) { m := NewRegistry() - m.(*Registry).Setup() fn := func(k string, v []*registry.Service) { services, err := m.GetService(k) @@ -108,11 +107,6 @@ func TestMockRegistry(t *testing.T) { } } - // test existing memory data - for k, v := range Data { - fn(k, v) - } - // register data for _, v := range testData { for _, service := range v { @@ -124,7 +118,6 @@ func TestMockRegistry(t *testing.T) { // using test data for k, v := range testData { - fn(k, v) } diff --git a/registry/memory/options.go b/registry/memory/options.go new file mode 100644 index 00000000..64680fdc --- /dev/null +++ b/registry/memory/options.go @@ -0,0 +1,27 @@ +package memory + +import ( + "context" + + "github.com/micro/go-micro/registry" +) + +type servicesKey struct{} + +func getServices(ctx context.Context) map[string][]*registry.Service { + s, ok := ctx.Value(servicesKey{}).(map[string][]*registry.Service) + if !ok { + return nil + } + return s +} + +// Services is an option that preloads service data +func Services(s map[string][]*registry.Service) registry.Option { + return func(o *registry.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, servicesKey{}, s) + } +} diff --git a/registry/memory/watcher.go b/registry/memory/watcher.go new file mode 100644 index 00000000..c1a7067c --- /dev/null +++ b/registry/memory/watcher.go @@ -0,0 +1,37 @@ +package memory + +import ( + "errors" + + "github.com/micro/go-micro/registry" +) + +type Watcher struct { + id string + wo registry.WatchOptions + res chan *registry.Result + exit chan bool +} + +func (m *Watcher) Next() (*registry.Result, error) { + for { + select { + case r := <-m.res: + if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name { + continue + } + return r, nil + case <-m.exit: + return nil, errors.New("watcher stopped") + } + } +} + +func (m *Watcher) Stop() { + select { + case <-m.exit: + return + default: + close(m.exit) + } +} diff --git a/registry/memory/watcher_test.go b/registry/memory/watcher_test.go new file mode 100644 index 00000000..1df468fd --- /dev/null +++ b/registry/memory/watcher_test.go @@ -0,0 +1,30 @@ +package memory + +import ( + "testing" + + "github.com/micro/go-micro/registry" +) + +func TestWatcher(t *testing.T) { + w := &Watcher{ + id: "test", + res: make(chan *registry.Result), + exit: make(chan bool), + } + + go func() { + w.res <- ®istry.Result{} + }() + + _, err := w.Next() + if err != nil { + t.Fatal("unexpected err", err) + } + + w.Stop() + + if _, err := w.Next(); err == nil { + t.Fatal("expected error on Next()") + } +}