Move proxy/router

This commit is contained in:
Asim Aslam
2019-08-05 17:44:33 +01:00
parent 2e67e23a23
commit 4030ccc27b
28 changed files with 138 additions and 856 deletions

316
router/service/service.go Normal file
View File

@@ -0,0 +1,316 @@
package service
import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/router"
pb "github.com/micro/go-micro/router/proto"
)
type svc struct {
sync.RWMutex
opts router.Options
callOpts []client.CallOption
router pb.RouterService
table *table
status *router.Status
exit chan bool
errChan chan error
advertChan chan *router.Advert
}
// NewRouter creates new service router and returns it
func NewRouter(opts ...router.Option) router.Router {
// get default options
options := router.DefaultOptions()
// apply requested options
for _, o := range opts {
o(&options)
}
// NOTE: might need some client opts here
cli := client.DefaultClient
// set options client
if options.Client != nil {
cli = options.Client
}
// NOTE: should we have Client/Service option in router.Options?
s := &svc{
opts: options,
router: pb.NewRouterService(router.DefaultName, cli),
}
// set the router address to call
if len(options.Address) > 0 {
s.callOpts = []client.CallOption{
client.WithAddress(options.Address),
}
}
// set the table
s.table = &table{pb.NewTableService(router.DefaultName, cli), s.callOpts}
return s
}
// Init initializes router with given options
func (s *svc) Init(opts ...router.Option) error {
for _, o := range opts {
o(&s.opts)
}
return nil
}
// Options returns router options
func (s *svc) Options() router.Options {
return s.opts
}
func (s *svc) Table() router.Table {
return s.table
}
func (s *svc) advertiseEvents(advertChan chan *router.Advert, stream pb.Router_AdvertiseService) error {
go func() {
<-s.exit
stream.Close()
}()
var advErr error
for {
resp, err := stream.Recv()
if err != nil {
if err != io.EOF {
advErr = err
}
break
}
events := make([]*router.Event, len(resp.Events))
for i, event := range resp.Events {
route := router.Route{
Service: event.Route.Service,
Address: event.Route.Address,
Gateway: event.Route.Gateway,
Network: event.Route.Network,
Link: event.Route.Link,
Metric: int(event.Route.Metric),
}
events[i] = &router.Event{
Type: router.EventType(event.Type),
Timestamp: time.Unix(0, event.Timestamp),
Route: route,
}
}
advert := &router.Advert{
Id: resp.Id,
Type: router.AdvertType(resp.Type),
Timestamp: time.Unix(0, resp.Timestamp),
TTL: time.Duration(resp.Ttl),
Events: events,
}
select {
case advertChan <- advert:
case <-s.exit:
close(advertChan)
return nil
}
}
// close the channel on exit
close(advertChan)
return advErr
}
// Advertise advertises routes to the network
func (s *svc) Advertise() (<-chan *router.Advert, error) {
s.Lock()
defer s.Unlock()
// get the status
status := s.Status()
switch status.Code {
case router.Running, router.Advertising:
stream, err := s.router.Advertise(context.Background(), &pb.AdvertiseRequest{}, s.callOpts...)
if err != nil {
return nil, fmt.Errorf("failed getting advert stream: %s", err)
}
// create advertise and event channels
advertChan := make(chan *router.Advert)
go s.advertiseEvents(advertChan, stream)
return advertChan, nil
case router.Stopped:
// check if our router is stopped
select {
case <-s.exit:
s.exit = make(chan bool)
// call advertise again
return s.Advertise()
default:
return nil, fmt.Errorf("not running")
}
}
return nil, fmt.Errorf("error: %s", s.status.Error)
}
// Process processes incoming adverts
func (s *svc) Process(advert *router.Advert) error {
var events []*pb.Event
for _, event := range advert.Events {
route := &pb.Route{
Service: event.Route.Service,
Address: event.Route.Address,
Gateway: event.Route.Gateway,
Network: event.Route.Network,
Link: event.Route.Link,
Metric: int64(event.Route.Metric),
}
e := &pb.Event{
Type: pb.EventType(event.Type),
Timestamp: event.Timestamp.UnixNano(),
Route: route,
}
events = append(events, e)
}
advertReq := &pb.Advert{
Id: s.Options().Id,
Type: pb.AdvertType(advert.Type),
Timestamp: advert.Timestamp.UnixNano(),
Events: events,
}
if _, err := s.router.Process(context.Background(), advertReq, s.callOpts...); err != nil {
return err
}
return nil
}
// Status returns router status
func (s *svc) Status() router.Status {
s.Lock()
defer s.Unlock()
// check if its stopped
select {
case <-s.exit:
return router.Status{
Code: router.Stopped,
Error: nil,
}
default:
// don't block
}
// check the remote router
rsp, err := s.router.Status(context.Background(), &pb.Request{}, s.callOpts...)
if err != nil {
return router.Status{
Code: router.Error,
Error: err,
}
}
code := router.Running
var serr error
switch rsp.Status.Code {
case "running":
code = router.Running
case "advertising":
code = router.Advertising
case "stopped":
code = router.Stopped
case "error":
code = router.Error
}
if len(rsp.Status.Error) > 0 {
serr = errors.New(rsp.Status.Error)
}
return router.Status{
Code: code,
Error: serr,
}
}
// Remote router cannot be stopped
func (s *svc) Stop() error {
s.Lock()
defer s.Unlock()
select {
case <-s.exit:
return nil
default:
close(s.exit)
}
return nil
}
// Lookup looks up routes in the routing table and returns them
func (s *svc) Lookup(q router.Query) ([]router.Route, error) {
// call the router
resp, err := s.router.Lookup(context.Background(), &pb.LookupRequest{
Query: &pb.Query{
Service: q.Options().Service,
Gateway: q.Options().Gateway,
Network: q.Options().Network,
},
}, s.callOpts...)
// errored out
if err != nil {
return nil, err
}
routes := make([]router.Route, len(resp.Routes))
for i, route := range resp.Routes {
routes[i] = router.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Link: route.Link,
Metric: int(route.Metric),
}
}
return routes, nil
}
// Watch returns a watcher which allows to track updates to the routing table
func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) {
rsp, err := s.router.Watch(context.Background(), &pb.WatchRequest{}, s.callOpts...)
if err != nil {
return nil, err
}
var options router.WatchOptions
for _, o := range opts {
o(&options)
}
return newWatcher(rsp, options)
}
// Returns the router implementation
func (s *svc) String() string {
return "service"
}

121
router/service/table.go Normal file
View File

@@ -0,0 +1,121 @@
package service
import (
"context"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/router"
pb "github.com/micro/go-micro/router/proto"
)
type table struct {
table pb.TableService
callOpts []client.CallOption
}
// Create new route in the routing table
func (t *table) Create(r router.Route) error {
route := &pb.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,
Network: r.Network,
Link: r.Link,
Metric: int64(r.Metric),
}
if _, err := t.table.Create(context.Background(), route, t.callOpts...); err != nil {
return err
}
return nil
}
// Delete deletes existing route from the routing table
func (t *table) Delete(r router.Route) error {
route := &pb.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,
Network: r.Network,
Link: r.Link,
Metric: int64(r.Metric),
}
if _, err := t.table.Delete(context.Background(), route, t.callOpts...); err != nil {
return err
}
return nil
}
// Update updates route in the routing table
func (t *table) Update(r router.Route) error {
route := &pb.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,
Network: r.Network,
Link: r.Link,
Metric: int64(r.Metric),
}
if _, err := t.table.Update(context.Background(), route, t.callOpts...); err != nil {
return err
}
return nil
}
// List returns the list of all routes in the table
func (t *table) List() ([]router.Route, error) {
resp, err := t.table.List(context.Background(), &pb.Request{}, t.callOpts...)
if err != nil {
return nil, err
}
routes := make([]router.Route, len(resp.Routes))
for i, route := range resp.Routes {
routes[i] = router.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Link: route.Link,
Metric: int(route.Metric),
}
}
return routes, nil
}
// Lookup looks up routes in the routing table and returns them
func (t *table) Query(q router.Query) ([]router.Route, error) {
// call the router
resp, err := t.table.Query(context.Background(), &pb.QueryRequest{
Query: &pb.Query{
Service: q.Options().Service,
Gateway: q.Options().Gateway,
Network: q.Options().Network,
},
}, t.callOpts...)
// errored out
if err != nil {
return nil, err
}
routes := make([]router.Route, len(resp.Routes))
for i, route := range resp.Routes {
routes[i] = router.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Link: route.Link,
Metric: int(route.Metric),
}
}
return routes, nil
}

117
router/service/watcher.go Normal file
View File

@@ -0,0 +1,117 @@
package service
import (
"io"
"sync"
"time"
"github.com/micro/go-micro/router"
pb "github.com/micro/go-micro/router/proto"
)
type watcher struct {
sync.RWMutex
opts router.WatchOptions
resChan chan *router.Event
done chan struct{}
}
func newWatcher(rsp pb.Router_WatchService, opts router.WatchOptions) (*watcher, error) {
w := &watcher{
opts: opts,
resChan: make(chan *router.Event),
done: make(chan struct{}),
}
go func() {
for {
select {
case <-w.done:
return
default:
if err := w.watch(rsp); err != nil {
w.Stop()
return
}
}
}
}()
return w, nil
}
// watchRouter watches router and send events to all registered watchers
func (w *watcher) watch(stream pb.Router_WatchService) error {
defer stream.Close()
var watchErr error
for {
resp, err := stream.Recv()
if err != nil {
if err != io.EOF {
watchErr = err
}
break
}
route := router.Route{
Service: resp.Route.Service,
Address: resp.Route.Address,
Gateway: resp.Route.Gateway,
Network: resp.Route.Network,
Link: resp.Route.Link,
Metric: int(resp.Route.Metric),
}
event := &router.Event{
Type: router.EventType(resp.Type),
Timestamp: time.Unix(0, resp.Timestamp),
Route: route,
}
for {
select {
case w.resChan <- event:
case <-w.done:
}
}
}
return watchErr
}
// Next is a blocking call that returns watch result
func (w *watcher) Next() (*router.Event, error) {
for {
select {
case res := <-w.resChan:
switch w.opts.Service {
case res.Route.Service, "*":
return res, nil
default:
continue
}
case <-w.done:
return nil, router.ErrWatcherStopped
}
}
}
// Chan returns event channel
func (w *watcher) Chan() (<-chan *router.Event, error) {
return w.resChan, nil
}
// Stop stops watcher
func (w *watcher) Stop() {
w.Lock()
defer w.Unlock()
select {
case <-w.done:
return
default:
close(w.done)
}
}