Merge pull request #230 from micro/watch

Add watch options
This commit is contained in:
Asim Aslam 2018-02-19 20:29:17 +00:00 committed by GitHub
commit 42bdca63da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 78 additions and 27 deletions

View File

@ -279,8 +279,8 @@ func (c *consulRegistry) ListServices() ([]*Service, error) {
return services, nil return services, nil
} }
func (c *consulRegistry) Watch() (Watcher, error) { func (c *consulRegistry) Watch(opts ...WatchOption) (Watcher, error) {
return newConsulWatcher(c) return newConsulWatcher(c, opts...)
} }
func (c *consulRegistry) String() string { func (c *consulRegistry) String() string {

View File

@ -10,6 +10,7 @@ import (
type consulWatcher struct { type consulWatcher struct {
r *consulRegistry r *consulRegistry
wo WatchOptions
wp *watch.Plan wp *watch.Plan
watchers map[string]*watch.Plan watchers map[string]*watch.Plan
@ -20,9 +21,15 @@ type consulWatcher struct {
services map[string][]*Service services map[string][]*Service
} }
func newConsulWatcher(cr *consulRegistry) (Watcher, error) { func newConsulWatcher(cr *consulRegistry, opts ...WatchOption) (Watcher, error) {
var wo WatchOptions
for _, o := range opts {
o(&wo)
}
cw := &consulWatcher{ cw := &consulWatcher{
r: cr, r: cr,
wo: wo,
exit: make(chan bool), exit: make(chan bool),
next: make(chan *Result, 10), next: make(chan *Result, 10),
watchers: make(map[string]*watch.Plan), watchers: make(map[string]*watch.Plan),
@ -185,6 +192,12 @@ func (cw *consulWatcher) handle(idx uint64, data interface{}) {
// add new watchers // add new watchers
for service, _ := range services { for service, _ := range services {
// Filter on watch options
// wo.Service: Only watch services we care about
if len(cw.wo.Service) > 0 && service != cw.wo.Service {
continue
}
if _, ok := cw.watchers[service]; ok { if _, ok := cw.watchers[service]; ok {
continue continue
} }

View File

@ -297,8 +297,14 @@ func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) {
return services, nil return services, nil
} }
func (m *mdnsRegistry) Watch() (registry.Watcher, error) { func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
var wo registry.WatchOptions
for _, o := range opts {
o(&wo)
}
md := &mdnsWatcher{ md := &mdnsWatcher{
wo: wo,
ch: make(chan *mdns.ServiceEntry, 32), ch: make(chan *mdns.ServiceEntry, 32),
exit: make(chan struct{}), exit: make(chan struct{}),
} }

View File

@ -9,6 +9,7 @@ import (
) )
type mdnsWatcher struct { type mdnsWatcher struct {
wo registry.WatchOptions
ch chan *mdns.ServiceEntry ch chan *mdns.ServiceEntry
exit chan struct{} exit chan struct{}
} }
@ -26,6 +27,12 @@ func (m *mdnsWatcher) Next() (*registry.Result, error) {
continue continue
} }
// Filter watch options
// wo.Service: Only keep services we care about
if len(m.wo.Service) > 0 && txt.Service != m.wo.Service {
continue
}
var action string var action string
if e.TTL == 0 { if e.TTL == 0 {

View File

@ -87,8 +87,12 @@ func (m *mockRegistry) Deregister(s *registry.Service) error {
return nil return nil
} }
func (m *mockRegistry) Watch() (registry.Watcher, error) { func (m *mockRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
return &mockWatcher{exit: make(chan bool)}, nil var wopts registry.WatchOptions
for _, o := range opts {
o(&wopts)
}
return &mockWatcher{exit: make(chan bool), opts: wopts}, nil
} }
func (m *mockRegistry) String() string { func (m *mockRegistry) String() string {

View File

@ -8,6 +8,7 @@ import (
type mockWatcher struct { type mockWatcher struct {
exit chan bool exit chan bool
opts registry.WatchOptions
} }
func (m *mockWatcher) Next() (*registry.Result, error) { func (m *mockWatcher) Next() (*registry.Result, error) {

View File

@ -25,6 +25,15 @@ type RegisterOptions struct {
Context context.Context Context context.Context
} }
type WatchOptions struct {
// Specify a service to watch
// If blank, the watch is for all services
Service string
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
// Addrs is the registry addresses to use // Addrs is the registry addresses to use
func Addrs(addrs ...string) Option { func Addrs(addrs ...string) Option {
return func(o *Options) { return func(o *Options) {
@ -57,3 +66,10 @@ func RegisterTTL(t time.Duration) RegisterOption {
o.TTL = t o.TTL = t
} }
} }
// Watch a service
func WatchService(name string) WatchOption {
return func(o *WatchOptions) {
o.Service = name
}
}

View File

@ -13,7 +13,7 @@ type Registry interface {
Deregister(*Service) error Deregister(*Service) error
GetService(string) ([]*Service, error) GetService(string) ([]*Service, error)
ListServices() ([]*Service, error) ListServices() ([]*Service, error)
Watch() (Watcher, error) Watch(...WatchOption) (Watcher, error)
String() string String() string
} }
@ -21,6 +21,8 @@ type Option func(*Options)
type RegisterOption func(*RegisterOptions) type RegisterOption func(*RegisterOptions)
type WatchOption func(*WatchOptions)
var ( var (
DefaultRegistry = newConsulRegistry() DefaultRegistry = newConsulRegistry()
@ -52,8 +54,8 @@ func ListServices() ([]*Service, error) {
} }
// Watch returns a watcher which allows you to track updates to the registry. // Watch returns a watcher which allows you to track updates to the registry.
func Watch() (Watcher, error) { func Watch(opts ...WatchOption) (Watcher, error) {
return DefaultRegistry.Watch() return DefaultRegistry.Watch(opts...)
} }
func String() string { func String() string {

View File

@ -1,3 +1,4 @@
// Package cache is a caching selector. It uses the registry watcher.
package cache package cache
import ( import (
@ -9,11 +10,6 @@ import (
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
) )
/*
Cache selector is a selector which uses the registry.Watcher to Cache service entries.
It defaults to a TTL for 1 minute and causes a cache miss on the next request.
*/
type cacheSelector struct { type cacheSelector struct {
so selector.Options so selector.Options
ttl time.Duration ttl time.Duration
@ -23,7 +19,7 @@ type cacheSelector struct {
cache map[string][]*registry.Service cache map[string][]*registry.Service
ttls map[string]time.Time ttls map[string]time.Time
once sync.Once watched map[string]bool
// used to close or reload watcher // used to close or reload watcher
reload chan bool reload chan bool
@ -88,6 +84,12 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
c.watched[service] = true
}
// get does the actual request for a service // get does the actual request for a service
// it also caches it // it also caches it
get := func(service string) ([]*registry.Service, error) { get := func(service string) ([]*registry.Service, error) {
@ -254,7 +256,7 @@ func (c *cacheSelector) update(res *registry.Result) {
// it creates a new watcher if there's a problem // it creates a new watcher if there's a problem
// reloads the watcher if Init is called // reloads the watcher if Init is called
// and returns when Close is called // and returns when Close is called
func (c *cacheSelector) run() { func (c *cacheSelector) run(name string) {
for { for {
// exit early if already dead // exit early if already dead
if c.quit() { if c.quit() {
@ -262,7 +264,9 @@ func (c *cacheSelector) run() {
} }
// create new watcher // create new watcher
w, err := c.so.Registry.Watch() w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil { if err != nil {
if c.quit() { if c.quit() {
return return
@ -332,10 +336,6 @@ func (c *cacheSelector) Options() selector.Options {
} }
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
c.once.Do(func() {
go c.run()
})
sopts := selector.SelectOptions{ sopts := selector.SelectOptions{
Strategy: c.so.Strategy, Strategy: c.so.Strategy,
} }
@ -377,6 +377,7 @@ func (c *cacheSelector) Reset(service string) {
func (c *cacheSelector) Close() error { func (c *cacheSelector) Close() error {
c.Lock() c.Lock()
c.cache = make(map[string][]*registry.Service) c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock() c.Unlock()
select { select {
@ -414,11 +415,12 @@ func NewSelector(opts ...selector.Option) selector.Selector {
} }
return &cacheSelector{ return &cacheSelector{
so: sopts, so: sopts,
ttl: ttl, ttl: ttl,
cache: make(map[string][]*registry.Service), watched: make(map[string]bool),
ttls: make(map[string]time.Time), cache: make(map[string][]*registry.Service),
reload: make(chan bool, 1), ttls: make(map[string]time.Time),
exit: make(chan bool), reload: make(chan bool, 1),
exit: make(chan bool),
} }
} }