Add remote lookup via router selector
This commit is contained in:
		| @@ -4,12 +4,15 @@ package router | ||||
| import ( | ||||
| 	"context" | ||||
| 	"net" | ||||
| 	"os" | ||||
| 	"sort" | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/micro/go-micro/client" | ||||
| 	"github.com/micro/go-micro/client/selector" | ||||
| 	"github.com/micro/go-micro/network/router" | ||||
| 	pb "github.com/micro/go-micro/network/router/proto" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| ) | ||||
|  | ||||
| @@ -18,10 +21,64 @@ type routerSelector struct { | ||||
|  | ||||
| 	// the router | ||||
| 	r router.Router | ||||
|  | ||||
| 	// the client for the remote router | ||||
| 	c pb.RouterService | ||||
|  | ||||
| 	// address of the remote router | ||||
| 	addr string | ||||
|  | ||||
| 	// whether to use the remote router | ||||
| 	remote bool | ||||
| } | ||||
|  | ||||
| type clientKey struct{} | ||||
| type routerKey struct{} | ||||
|  | ||||
| // getRoutes returns the routes whether they are remote or local | ||||
| func (r *routerSelector) getRoutes(service string) ([]router.Route, error) { | ||||
| 	if !r.remote { | ||||
| 		// lookup router for routes for the service | ||||
| 		return r.r.Table().Lookup(router.NewQuery( | ||||
| 			router.QueryDestination(service), | ||||
| 		)) | ||||
| 	} | ||||
|  | ||||
| 	// lookup the remote router | ||||
|  | ||||
| 	var clientOpts []client.CallOption | ||||
|  | ||||
| 	// set the remote address if specified | ||||
| 	if len(r.addr) > 0 { | ||||
| 		clientOpts = append(clientOpts, client.WithAddress(r.addr)) | ||||
| 	} | ||||
|  | ||||
| 	// call the router | ||||
| 	pbRoutes, err := r.c.Lookup(context.Background(), &pb.LookupRequest{ | ||||
| 		Query: &pb.Query{ | ||||
| 			Destination: service, | ||||
| 		}, | ||||
| 	}, clientOpts...) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	var routes []router.Route | ||||
|  | ||||
| 	// convert from pb to []*router.Route | ||||
| 	for _, r := range pbRoutes.Routes { | ||||
| 		routes = append(routes, router.Route{ | ||||
| 			Destination: r.Destination, | ||||
| 			Gateway:     r.Gateway, | ||||
| 			Router:      r.Router, | ||||
| 			Network:     r.Network, | ||||
| 			Metric:      int(r.Metric), | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	return routes, nil | ||||
| } | ||||
|  | ||||
| func (r *routerSelector) Init(opts ...selector.Option) error { | ||||
| 	// no op | ||||
| 	return nil | ||||
| @@ -32,11 +89,8 @@ func (r *routerSelector) Options() selector.Options { | ||||
| } | ||||
|  | ||||
| func (r *routerSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { | ||||
| 	// lookup router for routes for the service | ||||
| 	routes, err := r.r.Table().Lookup(router.NewQuery( | ||||
| 		router.QueryDestination(service), | ||||
| 	)) | ||||
|  | ||||
| 	// TODO: pull routes asynchronously and cache | ||||
| 	routes, err := r.getRoutes(service) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -124,7 +178,7 @@ func NewSelector(opts ...selector.Option) selector.Selector { | ||||
| 		options.Registry = registry.DefaultRegistry | ||||
| 	} | ||||
|  | ||||
| 	// try get from the context | ||||
| 	// try get router from the context | ||||
| 	r, ok := options.Context.Value(routerKey{}).(router.Router) | ||||
| 	if !ok { | ||||
| 		// TODO: Use router.DefaultRouter? | ||||
| @@ -133,12 +187,43 @@ func NewSelector(opts ...selector.Option) selector.Selector { | ||||
| 		) | ||||
| 	} | ||||
|  | ||||
| 	// start the router advertisements | ||||
| 	r.Advertise() | ||||
| 	// try get client from the context | ||||
| 	c, ok := options.Context.Value(clientKey{}).(client.Client) | ||||
| 	if !ok { | ||||
| 		c = client.DefaultClient | ||||
| 	} | ||||
|  | ||||
| 	// get the router from env vars if its a remote service | ||||
| 	remote := true | ||||
| 	routerName := os.Getenv("MICRO_ROUTER") | ||||
| 	routerAddress := os.Getenv("MICRO_ROUTER_ADDRESS") | ||||
|  | ||||
| 	// start the router advertisements if we're running it locally | ||||
| 	if len(routerName) == 0 && len(routerAddress) == 0 { | ||||
| 		go r.Advertise() | ||||
| 		remote = false | ||||
| 	} | ||||
|  | ||||
| 	return &routerSelector{ | ||||
| 		opts: options, | ||||
| 		r:    r, | ||||
| 		// set the internal router | ||||
| 		r: r, | ||||
| 		// set the client | ||||
| 		c: pb.NewRouterService(routerName, c), | ||||
| 		// address of router | ||||
| 		addr: routerAddress, | ||||
| 		// let ourselves know to use the remote router | ||||
| 		remote: remote, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithClient sets the client for the request | ||||
| func WithClient(c client.Client) selector.Option { | ||||
| 	return func(o *selector.Options) { | ||||
| 		if o.Context == nil { | ||||
| 			o.Context = context.Background() | ||||
| 		} | ||||
| 		o.Context = context.WithValue(o.Context, clientKey{}, c) | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user