replace memory registry

This commit is contained in:
Asim Aslam 2019-01-18 17:29:17 +00:00
parent 943219f203
commit 48b80dd051
6 changed files with 237 additions and 74 deletions

51
registry/memory/data.go Normal file
View File

@ -0,0 +1,51 @@
package memory
import (
"github.com/micro/go-micro/registry"
)
var (
// mock data
Data = map[string][]*registry.Service{
"foo": []*registry.Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost",
Port: 9999,
},
{
Id: "foo-1.0.0-321",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost",
Port: 6666,
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost",
Port: 8888,
},
},
},
},
}
)

View File

@ -2,60 +2,24 @@
package memory package memory
import ( import (
"context"
"sync" "sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
) )
type Registry struct { type Registry struct {
options registry.Options
sync.RWMutex sync.RWMutex
Services map[string][]*registry.Service Services map[string][]*registry.Service
Watchers map[string]*Watcher
} }
var ( var (
// mock data timeout = time.Millisecond * 10
Data = map[string][]*registry.Service{
"foo": []*registry.Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost",
Port: 9999,
},
{
Id: "foo-1.0.0-321",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost",
Port: 6666,
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost",
Port: 8888,
},
},
},
},
}
) )
// Setup sets mock data // Setup sets mock data
@ -67,69 +31,130 @@ func (m *Registry) Setup() {
m.Services = Data m.Services = Data
} }
func (m *Registry) GetService(service string) ([]*registry.Service, error) { func (m *Registry) watch(r *registry.Result) {
m.Lock() var watchers []*Watcher
defer m.Unlock()
m.RLock()
for _, w := range m.Watchers {
watchers = append(watchers, w)
}
m.RUnlock()
for _, w := range watchers {
select {
case <-w.exit:
m.Lock()
delete(m.Watchers, w.id)
m.Unlock()
default:
select {
case w.res <- r:
case <-time.After(timeout):
}
}
}
}
func (m *Registry) Init(opts ...registry.Option) error {
for _, o := range opts {
o(&m.options)
}
// add services
m.Lock()
for k, v := range getServices(m.options.Context) {
s := m.Services[k]
m.Services[k] = addServices(s, v)
}
m.Unlock()
return nil
}
func (m *Registry) Options() registry.Options {
return m.options
}
func (m *Registry) GetService(service string) ([]*registry.Service, error) {
m.RLock()
s, ok := m.Services[service] s, ok := m.Services[service]
if !ok || len(s) == 0 { if !ok || len(s) == 0 {
m.RUnlock()
return nil, registry.ErrNotFound return nil, registry.ErrNotFound
} }
m.RUnlock()
return s, nil return s, nil
} }
func (m *Registry) ListServices() ([]*registry.Service, error) { func (m *Registry) ListServices() ([]*registry.Service, error) {
m.Lock() m.RLock()
defer m.Unlock()
var services []*registry.Service var services []*registry.Service
for _, service := range m.Services { for _, service := range m.Services {
services = append(services, service...) services = append(services, service...)
} }
m.RUnlock()
return services, nil return services, nil
} }
func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption) error { func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
m.Lock() go m.watch(&registry.Result{Action: "update", Service: s})
defer m.Unlock()
m.Lock()
services := addServices(m.Services[s.Name], []*registry.Service{s}) services := addServices(m.Services[s.Name], []*registry.Service{s})
m.Services[s.Name] = services m.Services[s.Name] = services
m.Unlock()
return nil return nil
} }
func (m *Registry) Deregister(s *registry.Service) error { func (m *Registry) Deregister(s *registry.Service) error {
m.Lock() go m.watch(&registry.Result{Action: "delete", Service: s})
defer m.Unlock()
m.Lock()
services := delServices(m.Services[s.Name], []*registry.Service{s}) services := delServices(m.Services[s.Name], []*registry.Service{s})
m.Services[s.Name] = services m.Services[s.Name] = services
m.Unlock()
return nil return nil
} }
func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
var wopts registry.WatchOptions var wo registry.WatchOptions
for _, o := range opts { for _, o := range opts {
o(&wopts) o(&wo)
} }
return &memoryWatcher{exit: make(chan bool), opts: wopts}, nil
w := &Watcher{
exit: make(chan bool),
res: make(chan *registry.Result),
id: uuid.New().String(),
wo: wo,
}
m.Lock()
m.Watchers[w.id] = w
m.Unlock()
return w, nil
} }
func (m *Registry) String() string { func (m *Registry) String() string {
return "memory" return "memory"
} }
func (m *Registry) Init(opts ...registry.Option) error {
return nil
}
func (m *Registry) Options() registry.Options {
return registry.Options{}
}
func NewRegistry(opts ...registry.Option) registry.Registry { func NewRegistry(opts ...registry.Option) registry.Registry {
options := registry.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
services := getServices(options.Context)
if services == nil {
services = make(map[string][]*registry.Service)
}
return &Registry{ return &Registry{
Services: make(map[string][]*registry.Service), options: options,
Services: services,
Watchers: make(map[string]*Watcher),
} }
} }

View File

@ -80,9 +80,8 @@ var (
} }
) )
func TestMockRegistry(t *testing.T) { func TestMemoryRegistry(t *testing.T) {
m := NewRegistry() m := NewRegistry()
m.(*Registry).Setup()
fn := func(k string, v []*registry.Service) { fn := func(k string, v []*registry.Service) {
services, err := m.GetService(k) services, err := m.GetService(k)
@ -108,11 +107,6 @@ func TestMockRegistry(t *testing.T) {
} }
} }
// test existing memory data
for k, v := range Data {
fn(k, v)
}
// register data // register data
for _, v := range testData { for _, v := range testData {
for _, service := range v { for _, service := range v {
@ -124,7 +118,6 @@ func TestMockRegistry(t *testing.T) {
// using test data // using test data
for k, v := range testData { for k, v := range testData {
fn(k, v) fn(k, v)
} }

View File

@ -0,0 +1,27 @@
package memory
import (
"context"
"github.com/micro/go-micro/registry"
)
type servicesKey struct{}
func getServices(ctx context.Context) map[string][]*registry.Service {
s, ok := ctx.Value(servicesKey{}).(map[string][]*registry.Service)
if !ok {
return nil
}
return s
}
// Services is an option that preloads service data
func Services(s map[string][]*registry.Service) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, servicesKey{}, s)
}
}

View File

@ -0,0 +1,37 @@
package memory
import (
"errors"
"github.com/micro/go-micro/registry"
)
type Watcher struct {
id string
wo registry.WatchOptions
res chan *registry.Result
exit chan bool
}
func (m *Watcher) Next() (*registry.Result, error) {
for {
select {
case r := <-m.res:
if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name {
continue
}
return r, nil
case <-m.exit:
return nil, errors.New("watcher stopped")
}
}
}
func (m *Watcher) Stop() {
select {
case <-m.exit:
return
default:
close(m.exit)
}
}

View File

@ -0,0 +1,30 @@
package memory
import (
"testing"
"github.com/micro/go-micro/registry"
)
func TestWatcher(t *testing.T) {
w := &Watcher{
id: "test",
res: make(chan *registry.Result),
exit: make(chan bool),
}
go func() {
w.res <- &registry.Result{}
}()
_, err := w.Next()
if err != nil {
t.Fatal("unexpected err", err)
}
w.Stop()
if _, err := w.Next(); err == nil {
t.Fatal("expected error on Next()")
}
}