From c5740ae03149943891870024a12a5dffe5664cb9 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 26 Jul 2019 17:11:59 +0100 Subject: [PATCH] Outline of Advertise, Watch and start of the router. --- network/router/service/service.go | 197 ++++++++++++++++++++++++++++-- network/router/table.go | 6 +- 2 files changed, 192 insertions(+), 11 deletions(-) diff --git a/network/router/service/service.go b/network/router/service/service.go index d1929b55..c5a088f5 100644 --- a/network/router/service/service.go +++ b/network/router/service/service.go @@ -3,7 +3,10 @@ package service import ( "context" "errors" + "fmt" + "io" "sync" + "time" "github.com/google/uuid" "github.com/micro/go-micro/client" @@ -17,10 +20,14 @@ var ( ) type svc struct { - opts router.Options - router pb.RouterService - status router.Status - watchers map[string]*svcWatcher + opts router.Options + router pb.RouterService + status router.Status + watchers map[string]*svcWatcher + exit chan struct{} + errChan chan error + advertChan chan *router.Advert + wg *sync.WaitGroup sync.RWMutex } @@ -43,8 +50,11 @@ func NewRouter(opts ...router.Option) router.Router { router: pb.NewRouterService(router.DefaultName, client), status: router.Status{Code: router.Stopped, Error: nil}, watchers: make(map[string]*svcWatcher), + wg: &sync.WaitGroup{}, } + go s.run() + return s } @@ -61,6 +71,70 @@ func (s *svc) Options() router.Options { return s.opts } +// watchErrors watches router errors and takes appropriate actions +func (s *svc) watchErrors() { + var err error + + select { + case <-s.exit: + case err = <-s.errChan: + } + + s.Lock() + defer s.Unlock() + if s.status.Code != router.Stopped { + // notify all goroutines to finish + close(s.exit) + // TODO" might need to drain some channels here + } + + if err != nil { + s.status = router.Status{Code: router.Error, Error: err} + } +} + +// watchRouter watches router and send events to all registered watchers +func (s *svc) watchRouter(stream pb.Router_WatchService) error { + defer stream.Close() + var watchErr error + + for { + resp, err := stream.Recv() + if err != nil { + if err != io.EOF { + watchErr = err + } + break + } + + route := router.Route{ + Service: resp.Route.Service, + Address: resp.Route.Address, + Gateway: resp.Route.Gateway, + Network: resp.Route.Network, + Link: resp.Route.Link, + Metric: int(resp.Route.Metric), + } + + event := &router.Event{ + Type: router.EventType(resp.Type), + Timestamp: time.Unix(0, resp.Timestamp), + Route: route, + } + + s.RLock() + for _, w := range s.watchers { + select { + case w.resChan <- event: + case <-w.done: + } + } + s.RUnlock() + } + + return watchErr +} + // Run runs the router. // It returns error if the router is already running. func (s *svc) run() { @@ -69,15 +143,107 @@ func (s *svc) run() { switch s.status.Code { case router.Stopped, router.Error: - // TODO: start event stream watcher - // TODO: start watchError monitor + stream, err := s.router.Watch(context.Background(), &pb.WatchRequest{}) + if err != nil { + s.status = router.Status{Code: router.Error, Error: fmt.Errorf("failed getting router stream: %s", err)} + return + } + + // create error and exit channels + s.errChan = make(chan error, 1) + s.exit = make(chan struct{}) + + s.wg.Add(1) + go func() { + defer s.wg.Done() + select { + case s.errChan <- s.watchRouter(stream): + case <-s.exit: + } + }() + + // watch for errors and cleanup + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.watchErrors() + }() + + // mark router as Running and set its Error to nil + s.status = router.Status{Code: router.Running, Error: nil} + + return } + + return +} + +func (s *svc) advertiseEvents(stream pb.Router_AdvertiseService) error { + defer stream.Close() + var advErr error + + for { + resp, err := stream.Recv() + if err != nil { + if err != io.EOF { + advErr = err + } + break + } + + // TODO: sort out events and TTL + advert := &router.Advert{ + Id: resp.Id, + Type: router.AdvertType(resp.Type), + Timestamp: time.Unix(0, resp.Timestamp), + //Events: events, + } + + select { + case s.advertChan <- advert: + case <-s.exit: + return nil + } + } + + return advErr } // Advertise advertises routes to the network func (s *svc) Advertise() (<-chan *router.Advert, error) { - // TODO: start advert stream watcher - return nil, nil + s.Lock() + defer s.Unlock() + + switch s.status.Code { + case router.Advertising: + return s.advertChan, nil + case router.Running: + stream, err := s.router.Advertise(context.Background(), &pb.AdvertiseRequest{}) + if err != nil { + return nil, fmt.Errorf("failed getting advert stream: %s", err) + } + + // create advertise and event channels + s.advertChan = make(chan *router.Advert) + + s.wg.Add(1) + go func() { + defer s.wg.Done() + select { + case s.errChan <- s.advertiseEvents(stream): + case <-s.exit: + } + }() + + // mark router as Running and set its Error to nil + s.status = router.Status{Code: router.Advertising, Error: nil} + + return s.advertChan, nil + case router.Stopped: + return nil, fmt.Errorf("not running") + } + + return nil, fmt.Errorf("error: %s", s.status.Error) } // Process processes incoming adverts @@ -228,6 +394,21 @@ func (s *svc) Status() router.Status { // Stop stops the router func (s *svc) Stop() error { + s.Lock() + // only close the channel if the router is running and/or advertising + if s.status.Code == router.Running || s.status.Code == router.Advertising { + // notify all goroutines to finish + close(s.exit) + // TODO: might need to drain some channels here + + // mark the router as Stopped and set its Error to nil + s.status = router.Status{Code: router.Stopped, Error: nil} + } + s.Unlock() + + // wait for all goroutines to finish + s.wg.Wait() + return nil } diff --git a/network/router/table.go b/network/router/table.go index cb746cbb..75b7a10d 100644 --- a/network/router/table.go +++ b/network/router/table.go @@ -179,14 +179,14 @@ func (t *Table) Watch(opts ...WatchOption) (Watcher, error) { return w, nil } -// sendEvent sends rules to all subscribe watchers -func (t *Table) sendEvent(r *Event) { +// sendEvent sends events to all subscribed watchers +func (t *Table) sendEvent(e *Event) { t.RLock() defer t.RUnlock() for _, w := range t.watchers { select { - case w.resChan <- r: + case w.resChan <- e: case <-w.done: } }