Watcher now emits events instead of results.

This commit is contained in:
Milos Gajdos 2019-06-18 10:57:43 +01:00
parent 5088c9d916
commit b20dd16f92
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
3 changed files with 52 additions and 26 deletions

View File

@ -253,7 +253,7 @@ func (r *router) watchTable(w Watcher) error {
// watch for changes to services // watch for changes to services
for { for {
res, err := w.Next() event, err := w.Next()
if err == ErrWatcherStopped { if err == ErrWatcherStopped {
break break
} }
@ -269,21 +269,21 @@ func (r *router) watchTable(w Watcher) error {
} }
service := &registry.Service{ service := &registry.Service{
Name: res.Route.Options().DestAddr, Name: event.Route.Options().DestAddr,
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
} }
switch res.Action { switch event.Type {
case "add": case CreateEvent:
// only register remotely if the service is "local" // only register remotely if the service is "local"
if res.Route.Options().Network == "local" { if event.Route.Options().Network == "local" {
if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(120*time.Second)); err != nil { if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(120*time.Second)); err != nil {
return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err) return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err)
} }
} }
case "delete": case UpdateEvent:
// only deregister remotely if the service is "local" // only deregister remotely if the service is "local"
if res.Route.Options().Network == "local" { if event.Route.Options().Network == "local" {
if err := r.opts.NetworkRegistry.Deregister(service); err != nil { if err := r.opts.NetworkRegistry.Deregister(service); err != nil {
return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err) return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err)
} }

View File

@ -11,8 +11,8 @@ import (
"github.com/olekukonko/tablewriter" "github.com/olekukonko/tablewriter"
) )
// TODO: table options TBD in the future
// TableOptions are routing table options // TableOptions are routing table options
// TODO: This will allow for arbitrary routing table options in the future
type TableOptions struct{} type TableOptions struct{}
// table is in memory routing table // table is in memory routing table
@ -74,14 +74,14 @@ func (t *table) Add(r Route) error {
if _, ok := t.m[destAddr]; !ok { if _, ok := t.m[destAddr]; !ok {
t.m[destAddr] = make(map[uint64]Route) t.m[destAddr] = make(map[uint64]Route)
t.m[destAddr][sum] = r t.m[destAddr][sum] = r
go t.sendResult(&Result{Action: "add", Route: r}) go t.sendEvent(&Event{Type: CreateEvent, Route: r})
return nil return nil
} }
// only add the route if it exists and if override is requested // only add the route if it exists and if override is requested
if _, ok := t.m[destAddr][sum]; ok && r.Options().Policy == OverrideIfExists { if _, ok := t.m[destAddr][sum]; ok && r.Options().Policy == OverrideIfExists {
t.m[destAddr][sum] = r t.m[destAddr][sum] = r
go t.sendResult(&Result{Action: "update", Route: r}) go t.sendEvent(&Event{Type: UpdateEvent, Route: r})
return nil return nil
} }
@ -107,7 +107,7 @@ func (t *table) Delete(r Route) error {
} }
delete(t.m[destAddr], sum) delete(t.m[destAddr], sum)
go t.sendResult(&Result{Action: "delete", Route: r}) go t.sendEvent(&Event{Type: DeleteEvent, Route: r})
return nil return nil
} }
@ -128,7 +128,7 @@ func (t *table) Update(r Route) error {
// if the route has been found update it // if the route has been found update it
if _, ok := t.m[destAddr][sum]; ok { if _, ok := t.m[destAddr][sum]; ok {
t.m[destAddr][sum] = r t.m[destAddr][sum] = r
go t.sendResult(&Result{Action: "update", Route: r}) go t.sendEvent(&Event{Type: UpdateEvent, Route: r})
return nil return nil
} }
@ -188,7 +188,7 @@ func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
watcher := &tableWatcher{ watcher := &tableWatcher{
opts: wopts, opts: wopts,
resChan: make(chan *Result, 10), resChan: make(chan *Event, 10),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -199,8 +199,8 @@ func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
return watcher, nil return watcher, nil
} }
// sendResult sends rules to all subscribe watchers // sendEvent sends rules to all subscribe watchers
func (t *table) sendResult(r *Result) { func (t *table) sendEvent(r *Event) {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()

View File

@ -9,6 +9,40 @@ var (
ErrWatcherStopped = errors.New("routing table watcher stopped") ErrWatcherStopped = errors.New("routing table watcher stopped")
) )
// EventType defines routing table event
type EventType int
const (
// CreateEvent is emitted when new route has been created
CreateEvent EventType = iota
// DeleteEvent is emitted when an existing route has been deleted
DeleteEvent
// UpdateEvent is emitted when a routing table has been updated
UpdateEvent
)
// String returns string representation of the event
func (et EventType) String() string {
switch et {
case CreateEvent:
return "CREATE"
case DeleteEvent:
return "DELETE"
case UpdateEvent:
return "UPDATE"
default:
return "UNKNOWN"
}
}
// Event is returned by a call to Next on the watcher.
type Event struct {
// Type defines type of event
Type EventType
// Route is table rout
Route Route
}
// WatchOption is used to define what routes to watch in the table // WatchOption is used to define what routes to watch in the table
type WatchOption func(*WatchOptions) type WatchOption func(*WatchOptions)
@ -16,19 +50,11 @@ type WatchOption func(*WatchOptions)
// Watcher returns updates to the routing table // Watcher returns updates to the routing table
type Watcher interface { type Watcher interface {
// Next is a blocking call that returns watch result // Next is a blocking call that returns watch result
Next() (*Result, error) Next() (*Event, error)
// Stop stops watcher // Stop stops watcher
Stop() Stop()
} }
// Result is returned by a call to Next on the watcher.
type Result struct {
// Action is routing table action which is either of add, delete or update
Action string
// Route is table rout
Route Route
}
// WatchOptions are table watcher options // WatchOptions are table watcher options
type WatchOptions struct { type WatchOptions struct {
// Specify destination address to watch // Specify destination address to watch
@ -54,14 +80,14 @@ func WatchNetwork(n string) WatchOption {
type tableWatcher struct { type tableWatcher struct {
opts WatchOptions opts WatchOptions
resChan chan *Result resChan chan *Event
done chan struct{} done chan struct{}
} }
// Next returns the next noticed action taken on table // Next returns the next noticed action taken on table
// TODO: this needs to be thought through properly // TODO: this needs to be thought through properly
// we are aiming to provide the same watch options Query() provides // we are aiming to provide the same watch options Query() provides
func (w *tableWatcher) Next() (*Result, error) { func (w *tableWatcher) Next() (*Event, error) {
for { for {
select { select {
case res := <-w.resChan: case res := <-w.resChan: