From 02260dcaa321ce8a823468e1b831680063464f3b Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 19 Feb 2018 17:12:37 +0000 Subject: [PATCH] Add watch options --- registry/consul_registry.go | 4 ++-- registry/consul_watcher.go | 15 +++++++++++++- registry/mdns/mdns.go | 8 +++++++- registry/mdns/watcher.go | 7 +++++++ registry/mock/mock.go | 8 ++++++-- registry/mock/mock_watcher.go | 1 + registry/options.go | 16 +++++++++++++++ registry/registry.go | 8 +++++--- selector/cache/cache.go | 38 ++++++++++++++++++----------------- 9 files changed, 78 insertions(+), 27 deletions(-) diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 62344f35..80beb393 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -279,8 +279,8 @@ func (c *consulRegistry) ListServices() ([]*Service, error) { return services, nil } -func (c *consulRegistry) Watch() (Watcher, error) { - return newConsulWatcher(c) +func (c *consulRegistry) Watch(opts ...WatchOption) (Watcher, error) { + return newConsulWatcher(c, opts...) } func (c *consulRegistry) String() string { diff --git a/registry/consul_watcher.go b/registry/consul_watcher.go index 742b53ee..c34ec9c6 100644 --- a/registry/consul_watcher.go +++ b/registry/consul_watcher.go @@ -10,6 +10,7 @@ import ( type consulWatcher struct { r *consulRegistry + wo WatchOptions wp *watch.Plan watchers map[string]*watch.Plan @@ -20,9 +21,15 @@ type consulWatcher struct { services map[string][]*Service } -func newConsulWatcher(cr *consulRegistry) (Watcher, error) { +func newConsulWatcher(cr *consulRegistry, opts ...WatchOption) (Watcher, error) { + var wo WatchOptions + for _, o := range opts { + o(&wo) + } + cw := &consulWatcher{ r: cr, + wo: wo, exit: make(chan bool), next: make(chan *Result, 10), watchers: make(map[string]*watch.Plan), @@ -185,6 +192,12 @@ func (cw *consulWatcher) handle(idx uint64, data interface{}) { // add new watchers for service, _ := range services { + // Filter on watch options + // wo.Service: Only watch services we care about + if len(cw.wo.Service) > 0 && service != cw.wo.Service { + continue + } + if _, ok := cw.watchers[service]; ok { continue } diff --git a/registry/mdns/mdns.go b/registry/mdns/mdns.go index 47f9f8d5..d8ffef7a 100644 --- a/registry/mdns/mdns.go +++ b/registry/mdns/mdns.go @@ -297,8 +297,14 @@ func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) { return services, nil } -func (m *mdnsRegistry) Watch() (registry.Watcher, error) { +func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { + var wo registry.WatchOptions + for _, o := range opts { + o(&wo) + } + md := &mdnsWatcher{ + wo: wo, ch: make(chan *mdns.ServiceEntry, 32), exit: make(chan struct{}), } diff --git a/registry/mdns/watcher.go b/registry/mdns/watcher.go index 379b40f6..952816d2 100644 --- a/registry/mdns/watcher.go +++ b/registry/mdns/watcher.go @@ -9,6 +9,7 @@ import ( ) type mdnsWatcher struct { + wo registry.WatchOptions ch chan *mdns.ServiceEntry exit chan struct{} } @@ -26,6 +27,12 @@ func (m *mdnsWatcher) Next() (*registry.Result, error) { continue } + // Filter watch options + // wo.Service: Only keep services we care about + if len(m.wo.Service) > 0 && txt.Service != m.wo.Service { + continue + } + var action string if e.TTL == 0 { diff --git a/registry/mock/mock.go b/registry/mock/mock.go index 9dd39b3a..947feb30 100644 --- a/registry/mock/mock.go +++ b/registry/mock/mock.go @@ -87,8 +87,12 @@ func (m *mockRegistry) Deregister(s *registry.Service) error { return nil } -func (m *mockRegistry) Watch() (registry.Watcher, error) { - return &mockWatcher{exit: make(chan bool)}, nil +func (m *mockRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { + var wopts registry.WatchOptions + for _, o := range opts { + o(&wopts) + } + return &mockWatcher{exit: make(chan bool), opts: wopts}, nil } func (m *mockRegistry) String() string { diff --git a/registry/mock/mock_watcher.go b/registry/mock/mock_watcher.go index 26580730..11e3a881 100644 --- a/registry/mock/mock_watcher.go +++ b/registry/mock/mock_watcher.go @@ -8,6 +8,7 @@ import ( type mockWatcher struct { exit chan bool + opts registry.WatchOptions } func (m *mockWatcher) Next() (*registry.Result, error) { diff --git a/registry/options.go b/registry/options.go index cac28b49..b2979a39 100644 --- a/registry/options.go +++ b/registry/options.go @@ -25,6 +25,15 @@ type RegisterOptions struct { Context context.Context } +type WatchOptions struct { + // Specify a service to watch + // If blank, the watch is for all services + Service string + // Other options for implementations of the interface + // can be stored in a context + Context context.Context +} + // Addrs is the registry addresses to use func Addrs(addrs ...string) Option { return func(o *Options) { @@ -57,3 +66,10 @@ func RegisterTTL(t time.Duration) RegisterOption { o.TTL = t } } + +// Watch a service +func WatchService(name string) WatchOption { + return func(o *WatchOptions) { + o.Service = name + } +} diff --git a/registry/registry.go b/registry/registry.go index f3465bf3..25e647ab 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -13,7 +13,7 @@ type Registry interface { Deregister(*Service) error GetService(string) ([]*Service, error) ListServices() ([]*Service, error) - Watch() (Watcher, error) + Watch(...WatchOption) (Watcher, error) String() string } @@ -21,6 +21,8 @@ type Option func(*Options) type RegisterOption func(*RegisterOptions) +type WatchOption func(*WatchOptions) + var ( DefaultRegistry = newConsulRegistry() @@ -52,8 +54,8 @@ func ListServices() ([]*Service, error) { } // Watch returns a watcher which allows you to track updates to the registry. -func Watch() (Watcher, error) { - return DefaultRegistry.Watch() +func Watch(opts ...WatchOption) (Watcher, error) { + return DefaultRegistry.Watch(opts...) } func String() string { diff --git a/selector/cache/cache.go b/selector/cache/cache.go index 5e61dce7..529ae65a 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -1,3 +1,4 @@ +// Package cache is a caching selector. It uses the registry watcher. package cache import ( @@ -9,11 +10,6 @@ import ( "github.com/micro/go-micro/selector" ) -/* - Cache selector is a selector which uses the registry.Watcher to Cache service entries. - It defaults to a TTL for 1 minute and causes a cache miss on the next request. -*/ - type cacheSelector struct { so selector.Options ttl time.Duration @@ -23,7 +19,7 @@ type cacheSelector struct { cache map[string][]*registry.Service ttls map[string]time.Time - once sync.Once + watched map[string]bool // used to close or reload watcher reload chan bool @@ -88,6 +84,12 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) { c.Lock() defer c.Unlock() + // watch service if not watched + if _, ok := c.watched[service]; !ok { + go c.run(service) + c.watched[service] = true + } + // get does the actual request for a service // it also caches it get := func(service string) ([]*registry.Service, error) { @@ -254,7 +256,7 @@ func (c *cacheSelector) update(res *registry.Result) { // it creates a new watcher if there's a problem // reloads the watcher if Init is called // and returns when Close is called -func (c *cacheSelector) run() { +func (c *cacheSelector) run(name string) { for { // exit early if already dead if c.quit() { @@ -262,7 +264,9 @@ func (c *cacheSelector) run() { } // create new watcher - w, err := c.so.Registry.Watch() + w, err := c.so.Registry.Watch( + registry.WatchService(name), + ) if err != nil { if c.quit() { return @@ -332,10 +336,6 @@ func (c *cacheSelector) Options() selector.Options { } func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { - c.once.Do(func() { - go c.run() - }) - sopts := selector.SelectOptions{ Strategy: c.so.Strategy, } @@ -377,6 +377,7 @@ func (c *cacheSelector) Reset(service string) { func (c *cacheSelector) Close() error { c.Lock() c.cache = make(map[string][]*registry.Service) + c.watched = make(map[string]bool) c.Unlock() select { @@ -414,11 +415,12 @@ func NewSelector(opts ...selector.Option) selector.Selector { } return &cacheSelector{ - so: sopts, - ttl: ttl, - cache: make(map[string][]*registry.Service), - ttls: make(map[string]time.Time), - reload: make(chan bool, 1), - exit: make(chan bool), + so: sopts, + ttl: ttl, + watched: make(map[string]bool), + cache: make(map[string][]*registry.Service), + ttls: make(map[string]time.Time), + reload: make(chan bool, 1), + exit: make(chan bool), } }