273 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			273 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package router is a network/router selector
 | |
| package router
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"os"
 | |
| 	"sort"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/micro/go-micro/client"
 | |
| 	"github.com/micro/go-micro/client/selector"
 | |
| 	"github.com/micro/go-micro/registry"
 | |
| 	"github.com/micro/go-micro/router"
 | |
| 	pb "github.com/micro/go-micro/router/proto"
 | |
| )
 | |
| 
 | |
| type routerSelector struct {
 | |
| 	opts selector.Options
 | |
| 
 | |
| 	// the router
 | |
| 	r router.Router
 | |
| 
 | |
| 	// the client we have
 | |
| 	c client.Client
 | |
| 
 | |
| 	// the client for the remote router
 | |
| 	rs pb.RouterService
 | |
| 
 | |
| 	// name of the router
 | |
| 	name string
 | |
| 
 | |
| 	// address of the remote router
 | |
| 	addr string
 | |
| 
 | |
| 	// whether to use the remote router
 | |
| 	remote bool
 | |
| }
 | |
| 
 | |
// Context keys under which the selector options carry a client and a router.
// Each is its own empty struct type so the keys cannot collide with each
// other or with keys defined by other packages.
type (
	// clientKey is the context key for a client.Client value
	clientKey struct{}
	// routerKey is the context key for a router.Router value
	routerKey struct{}
)
 | |
| 
 | |
| // getRoutes returns the routes whether they are remote or local
 | |
| func (r *routerSelector) getRoutes(service string) ([]router.Route, error) {
 | |
| 	if !r.remote {
 | |
| 		// lookup router for routes for the service
 | |
| 		return r.r.Lookup(router.NewQuery(
 | |
| 			router.QueryService(service),
 | |
| 		))
 | |
| 	}
 | |
| 
 | |
| 	// lookup the remote router
 | |
| 
 | |
| 	var addrs []string
 | |
| 
 | |
| 	// set the remote address if specified
 | |
| 	if len(r.addr) > 0 {
 | |
| 		addrs = append(addrs, r.addr)
 | |
| 	} else {
 | |
| 		// we have a name so we need to check the registry
 | |
| 		services, err := r.c.Options().Registry.GetService(r.name)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		for _, service := range services {
 | |
| 			for _, node := range service.Nodes {
 | |
| 				addrs = append(addrs, node.Address)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// no router addresses available
 | |
| 	if len(addrs) == 0 {
 | |
| 		return nil, selector.ErrNoneAvailable
 | |
| 	}
 | |
| 
 | |
| 	var pbRoutes *pb.LookupResponse
 | |
| 	var err error
 | |
| 
 | |
| 	// TODO: implement backoff and retries
 | |
| 	for _, addr := range addrs {
 | |
| 		// call the router
 | |
| 		pbRoutes, err = r.rs.Lookup(context.Background(), &pb.LookupRequest{
 | |
| 			Query: &pb.Query{
 | |
| 				Service: service,
 | |
| 			},
 | |
| 		}, client.WithAddress(addr))
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		break
 | |
| 	}
 | |
| 
 | |
| 	// errored out
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// no routes
 | |
| 	if pbRoutes == nil {
 | |
| 		return nil, selector.ErrNoneAvailable
 | |
| 	}
 | |
| 
 | |
| 	var routes []router.Route
 | |
| 
 | |
| 	// convert from pb to []*router.Route
 | |
| 	for _, r := range pbRoutes.Routes {
 | |
| 		routes = append(routes, router.Route{
 | |
| 			Service: r.Service,
 | |
| 			Address: r.Address,
 | |
| 			Gateway: r.Gateway,
 | |
| 			Network: r.Network,
 | |
| 			Link:    r.Link,
 | |
| 			Metric:  int(r.Metric),
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return routes, nil
 | |
| }
 | |
| 
 | |
| func (r *routerSelector) Init(opts ...selector.Option) error {
 | |
| 	// no op
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (r *routerSelector) Options() selector.Options {
 | |
| 	return r.opts
 | |
| }
 | |
| 
 | |
| func (r *routerSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
 | |
| 	// TODO: pull routes asynchronously and cache
 | |
| 	routes, err := r.getRoutes(service)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// no routes return not found error
 | |
| 	if len(routes) == 0 {
 | |
| 		return nil, selector.ErrNotFound
 | |
| 	}
 | |
| 
 | |
| 	// TODO: apply filters by pseudo constructing service
 | |
| 
 | |
| 	// sort the routes based on metric
 | |
| 	sort.Slice(routes, func(i, j int) bool {
 | |
| 		return routes[i].Metric < routes[j].Metric
 | |
| 	})
 | |
| 
 | |
| 	// roundrobin assuming routes are in metric preference order
 | |
| 	var i int
 | |
| 	var mtx sync.Mutex
 | |
| 
 | |
| 	return func() (*registry.Node, error) {
 | |
| 		// get index and increment counter with every call to next
 | |
| 		mtx.Lock()
 | |
| 		idx := i
 | |
| 		i++
 | |
| 		mtx.Unlock()
 | |
| 
 | |
| 		// get route based on idx
 | |
| 		route := routes[idx%len(routes)]
 | |
| 
 | |
| 		// defaults to gateway and no port
 | |
| 		address := route.Address
 | |
| 		if len(route.Gateway) > 0 {
 | |
| 			address = route.Gateway
 | |
| 		}
 | |
| 
 | |
| 		// return as a node
 | |
| 		return ®istry.Node{
 | |
| 			// TODO: add id and metadata if we can
 | |
| 			Address: address,
 | |
| 		}, nil
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (r *routerSelector) Mark(service string, node *registry.Node, err error) {
 | |
| 	// TODO: pass back metrics or information to the router
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (r *routerSelector) Reset(service string) {
 | |
| 	// TODO: reset the metrics or information at the router
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (r *routerSelector) Close() error {
 | |
| 	// stop the router advertisements
 | |
| 	return r.r.Stop()
 | |
| }
 | |
| 
 | |
| func (r *routerSelector) String() string {
 | |
| 	return "router"
 | |
| }
 | |
| 
 | |
| // NewSelector returns a new router based selector
 | |
| func NewSelector(opts ...selector.Option) selector.Selector {
 | |
| 	options := selector.Options{
 | |
| 		Context: context.Background(),
 | |
| 	}
 | |
| 
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 
 | |
| 	// set default registry if not set
 | |
| 	if options.Registry == nil {
 | |
| 		options.Registry = registry.DefaultRegistry
 | |
| 	}
 | |
| 
 | |
| 	// try get router from the context
 | |
| 	r, ok := options.Context.Value(routerKey{}).(router.Router)
 | |
| 	if !ok {
 | |
| 		// TODO: Use router.DefaultRouter?
 | |
| 		r = router.NewRouter(
 | |
| 			router.Registry(options.Registry),
 | |
| 		)
 | |
| 	}
 | |
| 
 | |
| 	// try get client from the context
 | |
| 	c, ok := options.Context.Value(clientKey{}).(client.Client)
 | |
| 	if !ok {
 | |
| 		c = client.DefaultClient
 | |
| 	}
 | |
| 
 | |
| 	// get the router from env vars if its a remote service
 | |
| 	remote := true
 | |
| 	routerName := os.Getenv("MICRO_ROUTER")
 | |
| 	routerAddress := os.Getenv("MICRO_ROUTER_ADDRESS")
 | |
| 
 | |
| 	// start the router advertisements if we're running it locally
 | |
| 	if len(routerName) == 0 && len(routerAddress) == 0 {
 | |
| 		go r.Advertise()
 | |
| 		remote = false
 | |
| 	}
 | |
| 
 | |
| 	return &routerSelector{
 | |
| 		opts: options,
 | |
| 		// set the internal router
 | |
| 		r: r,
 | |
| 		// set the client
 | |
| 		c: c,
 | |
| 		// set the router client
 | |
| 		rs: pb.NewRouterService(routerName, c),
 | |
| 		// name of the router
 | |
| 		name: routerName,
 | |
| 		// address of router
 | |
| 		addr: routerAddress,
 | |
| 		// let ourselves know to use the remote router
 | |
| 		remote: remote,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // WithClient sets the client for the request
 | |
| func WithClient(c client.Client) selector.Option {
 | |
| 	return func(o *selector.Options) {
 | |
| 		if o.Context == nil {
 | |
| 			o.Context = context.Background()
 | |
| 		}
 | |
| 		o.Context = context.WithValue(o.Context, clientKey{}, c)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // WithRouter sets the router as an option
 | |
| func WithRouter(r router.Router) selector.Option {
 | |
| 	return func(o *selector.Options) {
 | |
| 		if o.Context == nil {
 | |
| 			o.Context = context.Background()
 | |
| 		}
 | |
| 		o.Context = context.WithValue(o.Context, routerKey{}, r)
 | |
| 	}
 | |
| }
 |