Add watch options
This commit is contained in:
parent
eb7788ce25
commit
02260dcaa3
@ -279,8 +279,8 @@ func (c *consulRegistry) ListServices() ([]*Service, error) {
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Watch() (Watcher, error) {
|
||||
return newConsulWatcher(c)
|
||||
func (c *consulRegistry) Watch(opts ...WatchOption) (Watcher, error) {
|
||||
return newConsulWatcher(c, opts...)
|
||||
}
|
||||
|
||||
func (c *consulRegistry) String() string {
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
|
||||
type consulWatcher struct {
|
||||
r *consulRegistry
|
||||
wo WatchOptions
|
||||
wp *watch.Plan
|
||||
watchers map[string]*watch.Plan
|
||||
|
||||
@ -20,9 +21,15 @@ type consulWatcher struct {
|
||||
services map[string][]*Service
|
||||
}
|
||||
|
||||
func newConsulWatcher(cr *consulRegistry) (Watcher, error) {
|
||||
func newConsulWatcher(cr *consulRegistry, opts ...WatchOption) (Watcher, error) {
|
||||
var wo WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
|
||||
cw := &consulWatcher{
|
||||
r: cr,
|
||||
wo: wo,
|
||||
exit: make(chan bool),
|
||||
next: make(chan *Result, 10),
|
||||
watchers: make(map[string]*watch.Plan),
|
||||
@ -185,6 +192,12 @@ func (cw *consulWatcher) handle(idx uint64, data interface{}) {
|
||||
|
||||
// add new watchers
|
||||
for service, _ := range services {
|
||||
// Filter on watch options
|
||||
// wo.Service: Only watch services we care about
|
||||
if len(cw.wo.Service) > 0 && service != cw.wo.Service {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := cw.watchers[service]; ok {
|
||||
continue
|
||||
}
|
||||
|
@ -297,8 +297,14 @@ func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) {
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Watch() (registry.Watcher, error) {
|
||||
func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wo registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
|
||||
md := &mdnsWatcher{
|
||||
wo: wo,
|
||||
ch: make(chan *mdns.ServiceEntry, 32),
|
||||
exit: make(chan struct{}),
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
)
|
||||
|
||||
type mdnsWatcher struct {
|
||||
wo registry.WatchOptions
|
||||
ch chan *mdns.ServiceEntry
|
||||
exit chan struct{}
|
||||
}
|
||||
@ -26,6 +27,12 @@ func (m *mdnsWatcher) Next() (*registry.Result, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter watch options
|
||||
// wo.Service: Only keep services we care about
|
||||
if len(m.wo.Service) > 0 && txt.Service != m.wo.Service {
|
||||
continue
|
||||
}
|
||||
|
||||
var action string
|
||||
|
||||
if e.TTL == 0 {
|
||||
|
@ -87,8 +87,12 @@ func (m *mockRegistry) Deregister(s *registry.Service) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockRegistry) Watch() (registry.Watcher, error) {
|
||||
return &mockWatcher{exit: make(chan bool)}, nil
|
||||
func (m *mockRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wopts registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wopts)
|
||||
}
|
||||
return &mockWatcher{exit: make(chan bool), opts: wopts}, nil
|
||||
}
|
||||
|
||||
func (m *mockRegistry) String() string {
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
|
||||
type mockWatcher struct {
|
||||
exit chan bool
|
||||
opts registry.WatchOptions
|
||||
}
|
||||
|
||||
func (m *mockWatcher) Next() (*registry.Result, error) {
|
||||
|
@ -25,6 +25,15 @@ type RegisterOptions struct {
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
type WatchOptions struct {
|
||||
// Specify a service to watch
|
||||
// If blank, the watch is for all services
|
||||
Service string
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
// Addrs is the registry addresses to use
|
||||
func Addrs(addrs ...string) Option {
|
||||
return func(o *Options) {
|
||||
@ -57,3 +66,10 @@ func RegisterTTL(t time.Duration) RegisterOption {
|
||||
o.TTL = t
|
||||
}
|
||||
}
|
||||
|
||||
// Watch a service
|
||||
func WatchService(name string) WatchOption {
|
||||
return func(o *WatchOptions) {
|
||||
o.Service = name
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ type Registry interface {
|
||||
Deregister(*Service) error
|
||||
GetService(string) ([]*Service, error)
|
||||
ListServices() ([]*Service, error)
|
||||
Watch() (Watcher, error)
|
||||
Watch(...WatchOption) (Watcher, error)
|
||||
String() string
|
||||
}
|
||||
|
||||
@ -21,6 +21,8 @@ type Option func(*Options)
|
||||
|
||||
type RegisterOption func(*RegisterOptions)
|
||||
|
||||
type WatchOption func(*WatchOptions)
|
||||
|
||||
var (
|
||||
DefaultRegistry = newConsulRegistry()
|
||||
|
||||
@ -52,8 +54,8 @@ func ListServices() ([]*Service, error) {
|
||||
}
|
||||
|
||||
// Watch returns a watcher which allows you to track updates to the registry.
|
||||
func Watch() (Watcher, error) {
|
||||
return DefaultRegistry.Watch()
|
||||
func Watch(opts ...WatchOption) (Watcher, error) {
|
||||
return DefaultRegistry.Watch(opts...)
|
||||
}
|
||||
|
||||
func String() string {
|
||||
|
26
selector/cache/cache.go
vendored
26
selector/cache/cache.go
vendored
@ -1,3 +1,4 @@
|
||||
// Package cache is a caching selector. It uses the registry watcher.
|
||||
package cache
|
||||
|
||||
import (
|
||||
@ -9,11 +10,6 @@ import (
|
||||
"github.com/micro/go-micro/selector"
|
||||
)
|
||||
|
||||
/*
|
||||
Cache selector is a selector which uses the registry.Watcher to Cache service entries.
|
||||
It defaults to a TTL for 1 minute and causes a cache miss on the next request.
|
||||
*/
|
||||
|
||||
type cacheSelector struct {
|
||||
so selector.Options
|
||||
ttl time.Duration
|
||||
@ -23,7 +19,7 @@ type cacheSelector struct {
|
||||
cache map[string][]*registry.Service
|
||||
ttls map[string]time.Time
|
||||
|
||||
once sync.Once
|
||||
watched map[string]bool
|
||||
|
||||
// used to close or reload watcher
|
||||
reload chan bool
|
||||
@ -88,6 +84,12 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
// watch service if not watched
|
||||
if _, ok := c.watched[service]; !ok {
|
||||
go c.run(service)
|
||||
c.watched[service] = true
|
||||
}
|
||||
|
||||
// get does the actual request for a service
|
||||
// it also caches it
|
||||
get := func(service string) ([]*registry.Service, error) {
|
||||
@ -254,7 +256,7 @@ func (c *cacheSelector) update(res *registry.Result) {
|
||||
// it creates a new watcher if there's a problem
|
||||
// reloads the watcher if Init is called
|
||||
// and returns when Close is called
|
||||
func (c *cacheSelector) run() {
|
||||
func (c *cacheSelector) run(name string) {
|
||||
for {
|
||||
// exit early if already dead
|
||||
if c.quit() {
|
||||
@ -262,7 +264,9 @@ func (c *cacheSelector) run() {
|
||||
}
|
||||
|
||||
// create new watcher
|
||||
w, err := c.so.Registry.Watch()
|
||||
w, err := c.so.Registry.Watch(
|
||||
registry.WatchService(name),
|
||||
)
|
||||
if err != nil {
|
||||
if c.quit() {
|
||||
return
|
||||
@ -332,10 +336,6 @@ func (c *cacheSelector) Options() selector.Options {
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
|
||||
c.once.Do(func() {
|
||||
go c.run()
|
||||
})
|
||||
|
||||
sopts := selector.SelectOptions{
|
||||
Strategy: c.so.Strategy,
|
||||
}
|
||||
@ -377,6 +377,7 @@ func (c *cacheSelector) Reset(service string) {
|
||||
func (c *cacheSelector) Close() error {
|
||||
c.Lock()
|
||||
c.cache = make(map[string][]*registry.Service)
|
||||
c.watched = make(map[string]bool)
|
||||
c.Unlock()
|
||||
|
||||
select {
|
||||
@ -416,6 +417,7 @@ func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
return &cacheSelector{
|
||||
so: sopts,
|
||||
ttl: ttl,
|
||||
watched: make(map[string]bool),
|
||||
cache: make(map[string][]*registry.Service),
|
||||
ttls: make(map[string]time.Time),
|
||||
reload: make(chan bool, 1),
|
||||
|
Loading…
Reference in New Issue
Block a user