317 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			317 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package service
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/micro/go-micro/client"
 | 
						|
	"github.com/micro/go-micro/router"
 | 
						|
	pb "github.com/micro/go-micro/router/proto"
 | 
						|
)
 | 
						|
 | 
						|
type svc struct {
 | 
						|
	sync.RWMutex
 | 
						|
	opts       router.Options
 | 
						|
	callOpts   []client.CallOption
 | 
						|
	router     pb.RouterService
 | 
						|
	table      *table
 | 
						|
	status     *router.Status
 | 
						|
	exit       chan bool
 | 
						|
	errChan    chan error
 | 
						|
	advertChan chan *router.Advert
 | 
						|
}
 | 
						|
 | 
						|
// NewRouter creates new service router and returns it
 | 
						|
func NewRouter(opts ...router.Option) router.Router {
 | 
						|
	// get default options
 | 
						|
	options := router.DefaultOptions()
 | 
						|
 | 
						|
	// apply requested options
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	// NOTE: might need some client opts here
 | 
						|
	cli := client.DefaultClient
 | 
						|
 | 
						|
	// set options client
 | 
						|
	if options.Client != nil {
 | 
						|
		cli = options.Client
 | 
						|
	}
 | 
						|
 | 
						|
	// NOTE: should we have Client/Service option in router.Options?
 | 
						|
	s := &svc{
 | 
						|
		opts:   options,
 | 
						|
		router: pb.NewRouterService(router.DefaultName, cli),
 | 
						|
	}
 | 
						|
 | 
						|
	// set the router address to call
 | 
						|
	if len(options.Address) > 0 {
 | 
						|
		s.callOpts = []client.CallOption{
 | 
						|
			client.WithAddress(options.Address),
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// set the table
 | 
						|
	s.table = &table{pb.NewTableService(router.DefaultName, cli), s.callOpts}
 | 
						|
 | 
						|
	return s
 | 
						|
}
 | 
						|
 | 
						|
// Init initializes router with given options
 | 
						|
func (s *svc) Init(opts ...router.Option) error {
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&s.opts)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Options returns router options
 | 
						|
func (s *svc) Options() router.Options {
 | 
						|
	return s.opts
 | 
						|
}
 | 
						|
 | 
						|
func (s *svc) Table() router.Table {
 | 
						|
	return s.table
 | 
						|
}
 | 
						|
 | 
						|
func (s *svc) advertiseEvents(advertChan chan *router.Advert, stream pb.Router_AdvertiseService) error {
 | 
						|
	go func() {
 | 
						|
		<-s.exit
 | 
						|
		stream.Close()
 | 
						|
	}()
 | 
						|
 | 
						|
	var advErr error
 | 
						|
 | 
						|
	for {
 | 
						|
		resp, err := stream.Recv()
 | 
						|
		if err != nil {
 | 
						|
			if err != io.EOF {
 | 
						|
				advErr = err
 | 
						|
			}
 | 
						|
			break
 | 
						|
		}
 | 
						|
 | 
						|
		events := make([]*router.Event, len(resp.Events))
 | 
						|
		for i, event := range resp.Events {
 | 
						|
			route := router.Route{
 | 
						|
				Service: event.Route.Service,
 | 
						|
				Address: event.Route.Address,
 | 
						|
				Gateway: event.Route.Gateway,
 | 
						|
				Network: event.Route.Network,
 | 
						|
				Link:    event.Route.Link,
 | 
						|
				Metric:  int(event.Route.Metric),
 | 
						|
			}
 | 
						|
 | 
						|
			events[i] = &router.Event{
 | 
						|
				Type:      router.EventType(event.Type),
 | 
						|
				Timestamp: time.Unix(0, event.Timestamp),
 | 
						|
				Route:     route,
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		advert := &router.Advert{
 | 
						|
			Id:        resp.Id,
 | 
						|
			Type:      router.AdvertType(resp.Type),
 | 
						|
			Timestamp: time.Unix(0, resp.Timestamp),
 | 
						|
			TTL:       time.Duration(resp.Ttl),
 | 
						|
			Events:    events,
 | 
						|
		}
 | 
						|
 | 
						|
		select {
 | 
						|
		case advertChan <- advert:
 | 
						|
		case <-s.exit:
 | 
						|
			close(advertChan)
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// close the channel on exit
 | 
						|
	close(advertChan)
 | 
						|
 | 
						|
	return advErr
 | 
						|
}
 | 
						|
 | 
						|
// Advertise advertises routes to the network
 | 
						|
func (s *svc) Advertise() (<-chan *router.Advert, error) {
 | 
						|
	s.Lock()
 | 
						|
	defer s.Unlock()
 | 
						|
 | 
						|
	// get the status
 | 
						|
	status := s.Status()
 | 
						|
 | 
						|
	switch status.Code {
 | 
						|
	case router.Running, router.Advertising:
 | 
						|
		stream, err := s.router.Advertise(context.Background(), &pb.Request{}, s.callOpts...)
 | 
						|
		if err != nil {
 | 
						|
			return nil, fmt.Errorf("failed getting advert stream: %s", err)
 | 
						|
		}
 | 
						|
		// create advertise and event channels
 | 
						|
		advertChan := make(chan *router.Advert)
 | 
						|
		go s.advertiseEvents(advertChan, stream)
 | 
						|
		return advertChan, nil
 | 
						|
	case router.Stopped:
 | 
						|
		// check if our router is stopped
 | 
						|
		select {
 | 
						|
		case <-s.exit:
 | 
						|
			s.exit = make(chan bool)
 | 
						|
			// call advertise again
 | 
						|
			return s.Advertise()
 | 
						|
		default:
 | 
						|
			return nil, fmt.Errorf("not running")
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil, fmt.Errorf("error: %s", s.status.Error)
 | 
						|
}
 | 
						|
 | 
						|
// Process processes incoming adverts
 | 
						|
func (s *svc) Process(advert *router.Advert) error {
 | 
						|
	var events []*pb.Event
 | 
						|
	for _, event := range advert.Events {
 | 
						|
		route := &pb.Route{
 | 
						|
			Service: event.Route.Service,
 | 
						|
			Address: event.Route.Address,
 | 
						|
			Gateway: event.Route.Gateway,
 | 
						|
			Network: event.Route.Network,
 | 
						|
			Link:    event.Route.Link,
 | 
						|
			Metric:  int64(event.Route.Metric),
 | 
						|
		}
 | 
						|
		e := &pb.Event{
 | 
						|
			Type:      pb.EventType(event.Type),
 | 
						|
			Timestamp: event.Timestamp.UnixNano(),
 | 
						|
			Route:     route,
 | 
						|
		}
 | 
						|
		events = append(events, e)
 | 
						|
	}
 | 
						|
 | 
						|
	advertReq := &pb.Advert{
 | 
						|
		Id:        s.Options().Id,
 | 
						|
		Type:      pb.AdvertType(advert.Type),
 | 
						|
		Timestamp: advert.Timestamp.UnixNano(),
 | 
						|
		Events:    events,
 | 
						|
	}
 | 
						|
 | 
						|
	if _, err := s.router.Process(context.Background(), advertReq, s.callOpts...); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Status returns router status
 | 
						|
func (s *svc) Status() router.Status {
 | 
						|
	s.Lock()
 | 
						|
	defer s.Unlock()
 | 
						|
 | 
						|
	// check if its stopped
 | 
						|
	select {
 | 
						|
	case <-s.exit:
 | 
						|
		return router.Status{
 | 
						|
			Code:  router.Stopped,
 | 
						|
			Error: nil,
 | 
						|
		}
 | 
						|
	default:
 | 
						|
		// don't block
 | 
						|
	}
 | 
						|
 | 
						|
	// check the remote router
 | 
						|
	rsp, err := s.router.Status(context.Background(), &pb.Request{}, s.callOpts...)
 | 
						|
	if err != nil {
 | 
						|
		return router.Status{
 | 
						|
			Code:  router.Error,
 | 
						|
			Error: err,
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	code := router.Running
 | 
						|
	var serr error
 | 
						|
 | 
						|
	switch rsp.Status.Code {
 | 
						|
	case "running":
 | 
						|
		code = router.Running
 | 
						|
	case "advertising":
 | 
						|
		code = router.Advertising
 | 
						|
	case "stopped":
 | 
						|
		code = router.Stopped
 | 
						|
	case "error":
 | 
						|
		code = router.Error
 | 
						|
	}
 | 
						|
 | 
						|
	if len(rsp.Status.Error) > 0 {
 | 
						|
		serr = errors.New(rsp.Status.Error)
 | 
						|
	}
 | 
						|
 | 
						|
	return router.Status{
 | 
						|
		Code:  code,
 | 
						|
		Error: serr,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Remote router cannot be stopped
 | 
						|
func (s *svc) Stop() error {
 | 
						|
	s.Lock()
 | 
						|
	defer s.Unlock()
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-s.exit:
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
		close(s.exit)
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Lookup looks up routes in the routing table and returns them
 | 
						|
func (s *svc) Lookup(q router.Query) ([]router.Route, error) {
 | 
						|
	// call the router
 | 
						|
	resp, err := s.router.Lookup(context.Background(), &pb.LookupRequest{
 | 
						|
		Query: &pb.Query{
 | 
						|
			Service: q.Options().Service,
 | 
						|
			Gateway: q.Options().Gateway,
 | 
						|
			Network: q.Options().Network,
 | 
						|
		},
 | 
						|
	}, s.callOpts...)
 | 
						|
 | 
						|
	// errored out
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	routes := make([]router.Route, len(resp.Routes))
 | 
						|
	for i, route := range resp.Routes {
 | 
						|
		routes[i] = router.Route{
 | 
						|
			Service: route.Service,
 | 
						|
			Address: route.Address,
 | 
						|
			Gateway: route.Gateway,
 | 
						|
			Network: route.Network,
 | 
						|
			Link:    route.Link,
 | 
						|
			Metric:  int(route.Metric),
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return routes, nil
 | 
						|
}
 | 
						|
 | 
						|
// Watch returns a watcher which allows to track updates to the routing table
 | 
						|
func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) {
 | 
						|
	rsp, err := s.router.Watch(context.Background(), &pb.WatchRequest{}, s.callOpts...)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	var options router.WatchOptions
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
	return newWatcher(rsp, options)
 | 
						|
}
 | 
						|
 | 
						|
// Returns the router implementation
 | 
						|
func (s *svc) String() string {
 | 
						|
	return "service"
 | 
						|
}
 |