yolo commit functioning router code. all credit to the milos gajdos
This commit is contained in:
@@ -2,12 +2,12 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/network/router"
|
||||
pb "github.com/micro/go-micro/network/router/proto"
|
||||
@@ -16,13 +16,13 @@ import (
|
||||
type svc struct {
|
||||
sync.RWMutex
|
||||
opts router.Options
|
||||
callOpts []client.CallOption
|
||||
router pb.RouterService
|
||||
status router.Status
|
||||
watchers map[string]*svcWatcher
|
||||
exit chan struct{}
|
||||
table *table
|
||||
status *router.Status
|
||||
exit chan bool
|
||||
errChan chan error
|
||||
advertChan chan *router.Advert
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewRouter creates new service router and returns it
|
||||
@@ -36,18 +36,27 @@ func NewRouter(opts ...router.Option) router.Router {
|
||||
}
|
||||
|
||||
// NOTE: might need some client opts here
|
||||
client := client.DefaultClient
|
||||
cli := client.DefaultClient
|
||||
|
||||
// set options client
|
||||
if options.Client != nil {
|
||||
cli = options.Client
|
||||
}
|
||||
|
||||
// NOTE: should we have Client/Service option in router.Options?
|
||||
s := &svc{
|
||||
opts: options,
|
||||
router: pb.NewRouterService(router.DefaultName, client),
|
||||
status: router.Status{Code: router.Stopped, Error: nil},
|
||||
watchers: make(map[string]*svcWatcher),
|
||||
wg: &sync.WaitGroup{},
|
||||
opts: options,
|
||||
router: pb.NewRouterService(router.DefaultName, cli),
|
||||
}
|
||||
|
||||
go s.run()
|
||||
// set the router address to call
|
||||
if len(options.Address) > 0 {
|
||||
s.callOpts = []client.CallOption{
|
||||
client.WithAddress(options.Address),
|
||||
}
|
||||
}
|
||||
// set the table
|
||||
s.table = &table{pb.NewTableService(router.DefaultName, cli), s.callOpts}
|
||||
|
||||
return s
|
||||
}
|
||||
@@ -65,128 +74,12 @@ func (s *svc) Options() router.Options {
|
||||
return s.opts
|
||||
}
|
||||
|
||||
// watchRouter watches router and send events to all registered watchers
|
||||
func (s *svc) watchRouter(stream pb.Router_WatchService) error {
|
||||
s.wg.Add(1)
|
||||
func (s *svc) Table() router.Table {
|
||||
return s.table
|
||||
}
|
||||
|
||||
func (s *svc) advertiseEvents(advertChan chan *router.Advert, stream pb.Router_AdvertiseService) error {
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
<-s.exit
|
||||
stream.Close()
|
||||
}()
|
||||
|
||||
var watchErr error
|
||||
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
watchErr = err
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
route := router.Route{
|
||||
Service: resp.Route.Service,
|
||||
Address: resp.Route.Address,
|
||||
Gateway: resp.Route.Gateway,
|
||||
Network: resp.Route.Network,
|
||||
Link: resp.Route.Link,
|
||||
Metric: int(resp.Route.Metric),
|
||||
}
|
||||
|
||||
event := &router.Event{
|
||||
Type: router.EventType(resp.Type),
|
||||
Timestamp: time.Unix(0, resp.Timestamp),
|
||||
Route: route,
|
||||
}
|
||||
|
||||
// TODO: might make this non-blocking
|
||||
s.RLock()
|
||||
for _, w := range s.watchers {
|
||||
select {
|
||||
case w.resChan <- event:
|
||||
case <-w.done:
|
||||
}
|
||||
}
|
||||
s.RUnlock()
|
||||
}
|
||||
|
||||
return watchErr
|
||||
}
|
||||
|
||||
// watchErrors watches router errors and takes appropriate actions
|
||||
func (s *svc) watchErrors() {
|
||||
var err error
|
||||
|
||||
select {
|
||||
case <-s.exit:
|
||||
case err = <-s.errChan:
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if s.status.Code != router.Stopped {
|
||||
// notify all goroutines to finish
|
||||
close(s.exit)
|
||||
if s.status.Code == router.Advertising {
|
||||
// drain the advertise channel
|
||||
for range s.advertChan {
|
||||
}
|
||||
}
|
||||
s.status = router.Status{Code: router.Stopped, Error: nil}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
s.status = router.Status{Code: router.Error, Error: err}
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the router.
|
||||
func (s *svc) run() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
switch s.status.Code {
|
||||
case router.Stopped, router.Error:
|
||||
stream, err := s.router.Watch(context.Background(), &pb.WatchRequest{})
|
||||
if err != nil {
|
||||
s.status = router.Status{Code: router.Error, Error: fmt.Errorf("failed getting event stream: %s", err)}
|
||||
return
|
||||
}
|
||||
|
||||
// create error and exit channels
|
||||
s.errChan = make(chan error, 1)
|
||||
s.exit = make(chan struct{})
|
||||
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
select {
|
||||
case s.errChan <- s.watchRouter(stream):
|
||||
case <-s.exit:
|
||||
}
|
||||
}()
|
||||
|
||||
// watch for errors and cleanup
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
s.watchErrors()
|
||||
}()
|
||||
|
||||
// mark router as Running and set its Error to nil
|
||||
s.status = router.Status{Code: router.Running, Error: nil}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *svc) advertiseEvents(stream pb.Router_AdvertiseService) error {
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
<-s.exit
|
||||
stream.Close()
|
||||
}()
|
||||
@@ -229,15 +122,15 @@ func (s *svc) advertiseEvents(stream pb.Router_AdvertiseService) error {
|
||||
}
|
||||
|
||||
select {
|
||||
case s.advertChan <- advert:
|
||||
case advertChan <- advert:
|
||||
case <-s.exit:
|
||||
close(s.advertChan)
|
||||
close(advertChan)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// close the channel on exit
|
||||
close(s.advertChan)
|
||||
close(advertChan)
|
||||
|
||||
return advErr
|
||||
}
|
||||
@@ -247,33 +140,29 @@ func (s *svc) Advertise() (<-chan *router.Advert, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
switch s.status.Code {
|
||||
case router.Advertising:
|
||||
return s.advertChan, nil
|
||||
case router.Running:
|
||||
stream, err := s.router.Advertise(context.Background(), &pb.AdvertiseRequest{})
|
||||
// get the status
|
||||
status := s.Status()
|
||||
|
||||
switch status.Code {
|
||||
case router.Running, router.Advertising:
|
||||
stream, err := s.router.Advertise(context.Background(), &pb.AdvertiseRequest{}, s.callOpts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed getting advert stream: %s", err)
|
||||
}
|
||||
|
||||
// create advertise and event channels
|
||||
s.advertChan = make(chan *router.Advert)
|
||||
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
select {
|
||||
case s.errChan <- s.advertiseEvents(stream):
|
||||
case <-s.exit:
|
||||
}
|
||||
}()
|
||||
|
||||
// mark router as Running and set its Error to nil
|
||||
s.status = router.Status{Code: router.Advertising, Error: nil}
|
||||
|
||||
return s.advertChan, nil
|
||||
advertChan := make(chan *router.Advert)
|
||||
go s.advertiseEvents(advertChan, stream)
|
||||
return advertChan, nil
|
||||
case router.Stopped:
|
||||
return nil, fmt.Errorf("not running")
|
||||
// check if our router is stopped
|
||||
select {
|
||||
case <-s.exit:
|
||||
s.exit = make(chan bool)
|
||||
// call advertise again
|
||||
return s.Advertise()
|
||||
default:
|
||||
return nil, fmt.Errorf("not running")
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("error: %s", s.status.Error)
|
||||
@@ -306,87 +195,75 @@ func (s *svc) Process(advert *router.Advert) error {
|
||||
Events: events,
|
||||
}
|
||||
|
||||
if _, err := s.router.Process(context.Background(), advertReq); err != nil {
|
||||
if _, err := s.router.Process(context.Background(), advertReq, s.callOpts...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create new route in the routing table
|
||||
func (s *svc) Create(r router.Route) error {
|
||||
route := &pb.Route{
|
||||
Service: r.Service,
|
||||
Address: r.Address,
|
||||
Gateway: r.Gateway,
|
||||
Network: r.Network,
|
||||
Link: r.Link,
|
||||
Metric: int64(r.Metric),
|
||||
// Status returns router status
|
||||
func (s *svc) Status() router.Status {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
// check if its stopped
|
||||
select {
|
||||
case <-s.exit:
|
||||
return router.Status{
|
||||
Code: router.Stopped,
|
||||
Error: nil,
|
||||
}
|
||||
default:
|
||||
// don't block
|
||||
}
|
||||
|
||||
if _, err := s.router.Create(context.Background(), route); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes existing route from the routing table
|
||||
func (s *svc) Delete(r router.Route) error {
|
||||
route := &pb.Route{
|
||||
Service: r.Service,
|
||||
Address: r.Address,
|
||||
Gateway: r.Gateway,
|
||||
Network: r.Network,
|
||||
Link: r.Link,
|
||||
Metric: int64(r.Metric),
|
||||
}
|
||||
|
||||
if _, err := s.router.Delete(context.Background(), route); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update updates route in the routing table
|
||||
func (s *svc) Update(r router.Route) error {
|
||||
route := &pb.Route{
|
||||
Service: r.Service,
|
||||
Address: r.Address,
|
||||
Gateway: r.Gateway,
|
||||
Network: r.Network,
|
||||
Link: r.Link,
|
||||
Metric: int64(r.Metric),
|
||||
}
|
||||
|
||||
if _, err := s.router.Update(context.Background(), route); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// List returns the list of all routes in the table
|
||||
func (s *svc) List() ([]router.Route, error) {
|
||||
resp, err := s.router.List(context.Background(), &pb.ListRequest{})
|
||||
// check the remote router
|
||||
rsp, err := s.router.Status(context.Background(), &pb.Request{}, s.callOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
routes := make([]router.Route, len(resp.Routes))
|
||||
for i, route := range resp.Routes {
|
||||
routes[i] = router.Route{
|
||||
Service: route.Service,
|
||||
Address: route.Address,
|
||||
Gateway: route.Gateway,
|
||||
Network: route.Network,
|
||||
Link: route.Link,
|
||||
Metric: int(route.Metric),
|
||||
return router.Status{
|
||||
Code: router.Error,
|
||||
Error: err,
|
||||
}
|
||||
}
|
||||
|
||||
return routes, nil
|
||||
code := router.Running
|
||||
var serr error
|
||||
|
||||
switch rsp.Status.Code {
|
||||
case "running":
|
||||
code = router.Running
|
||||
case "advertising":
|
||||
code = router.Advertising
|
||||
case "stopped":
|
||||
code = router.Stopped
|
||||
case "error":
|
||||
code = router.Error
|
||||
}
|
||||
|
||||
if len(rsp.Status.Error) > 0 {
|
||||
serr = errors.New(rsp.Status.Error)
|
||||
}
|
||||
|
||||
return router.Status{
|
||||
Code: code,
|
||||
Error: serr,
|
||||
}
|
||||
}
|
||||
|
||||
// Remote router cannot be stopped
|
||||
func (s *svc) Stop() error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
select {
|
||||
case <-s.exit:
|
||||
return nil
|
||||
default:
|
||||
close(s.exit)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Lookup looks up routes in the routing table and returns them
|
||||
@@ -398,7 +275,7 @@ func (s *svc) Lookup(q router.Query) ([]router.Route, error) {
|
||||
Gateway: q.Options().Gateway,
|
||||
Network: q.Options().Network,
|
||||
},
|
||||
})
|
||||
}, s.callOpts...)
|
||||
|
||||
// errored out
|
||||
if err != nil {
|
||||
@@ -422,69 +299,15 @@ func (s *svc) Lookup(q router.Query) ([]router.Route, error) {
|
||||
|
||||
// Watch returns a watcher which allows to track updates to the routing table
|
||||
func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) {
|
||||
wopts := router.WatchOptions{
|
||||
Service: "*",
|
||||
rsp, err := s.router.Watch(context.Background(), &pb.WatchRequest{}, s.callOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var options router.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wopts)
|
||||
o(&options)
|
||||
}
|
||||
|
||||
w := &svcWatcher{
|
||||
opts: wopts,
|
||||
resChan: make(chan *router.Event, 10),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
s.watchers[uuid.New().String()] = w
|
||||
s.Unlock()
|
||||
|
||||
// when the router stops, stop the watcher and exit
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
<-s.exit
|
||||
w.Stop()
|
||||
}()
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// Status returns router status
|
||||
func (s *svc) Status() router.Status {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
// make a copy of the status
|
||||
status := s.status
|
||||
|
||||
return status
|
||||
}
|
||||
|
||||
// Stop stops the router
|
||||
func (s *svc) Stop() error {
|
||||
s.Lock()
|
||||
// only close the channel if the router is running and/or advertising
|
||||
if s.status.Code == router.Running || s.status.Code == router.Advertising {
|
||||
// notify all goroutines to finish
|
||||
close(s.exit)
|
||||
|
||||
// drain the advertise channel only if advertising
|
||||
if s.status.Code == router.Advertising {
|
||||
for range s.advertChan {
|
||||
}
|
||||
}
|
||||
|
||||
// mark the router as Stopped and set its Error to nil
|
||||
s.status = router.Status{Code: router.Stopped, Error: nil}
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
// wait for all goroutines to finish
|
||||
s.wg.Wait()
|
||||
|
||||
return nil
|
||||
return newWatcher(rsp, options)
|
||||
}
|
||||
|
||||
// Returns the router implementation
|
||||
|
||||
Reference in New Issue
Block a user