mucp Proxy no longer uses RPC interface of router.Router directly
This commit is contained in:
		| @@ -4,12 +4,10 @@ package mucp | ||||
| import ( | ||||
| 	"context" | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/micro/go-micro/client" | ||||
| 	"github.com/micro/go-micro/client/selector" | ||||
| 	"github.com/micro/go-micro/codec" | ||||
| 	"github.com/micro/go-micro/codec/bytes" | ||||
| 	"github.com/micro/go-micro/config/options" | ||||
| @@ -17,7 +15,6 @@ import ( | ||||
| 	"github.com/micro/go-micro/network/router" | ||||
| 	"github.com/micro/go-micro/server" | ||||
|  | ||||
| 	pb "github.com/micro/go-micro/network/router/proto" | ||||
| 	"github.com/micro/go-micro/network/router/table" | ||||
| ) | ||||
|  | ||||
| @@ -27,7 +24,7 @@ type Proxy struct { | ||||
| 	// embed options | ||||
| 	options.Options | ||||
|  | ||||
| 	// Endpoint specified the fixed service endpoint to call. | ||||
| 	// Endpoint specifies the fixed service endpoint to call. | ||||
| 	Endpoint string | ||||
|  | ||||
| 	// The client to use for outbound requests | ||||
| @@ -36,12 +33,9 @@ type Proxy struct { | ||||
| 	// The router for routes | ||||
| 	Router router.Router | ||||
|  | ||||
| 	// The router service client | ||||
| 	RouterService pb.RouterService | ||||
|  | ||||
| 	// A fib of routes service:address | ||||
| 	sync.RWMutex | ||||
| 	Routes map[string][]table.Route | ||||
| 	Routes map[string]map[uint64]table.Route | ||||
| } | ||||
|  | ||||
| // read client request and write to server | ||||
| @@ -79,155 +73,54 @@ func readLoop(r server.Request, s client.Stream) error { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // toNodes returns a list of node addresses from given routes | ||||
| func toNodes(routes map[uint64]table.Route) []string { | ||||
| 	var nodes []string | ||||
| 	for _, node := range routes { | ||||
| 		address := node.Address | ||||
| 		if len(node.Gateway) > 0 { | ||||
| 			address = node.Gateway | ||||
| 		} | ||||
| 		nodes = append(nodes, address) | ||||
| 	} | ||||
| 	return nodes | ||||
| } | ||||
|  | ||||
| func (p *Proxy) getRoute(service string) ([]string, error) { | ||||
| 	// converts routes to just addresses | ||||
| 	toNodes := func(routes []table.Route) []string { | ||||
| 		var nodes []string | ||||
| 		for _, node := range routes { | ||||
| 			address := node.Address | ||||
| 			if len(node.Gateway) > 0 { | ||||
| 				address = node.Gateway | ||||
| 			} | ||||
| 			nodes = append(nodes, address) | ||||
| 		} | ||||
| 		return nodes | ||||
| 	} | ||||
|  | ||||
| 	// lookup the route cache first | ||||
| 	p.RLock() | ||||
| 	p.Lock() | ||||
| 	routes, ok := p.Routes[service] | ||||
| 	// got it! | ||||
| 	if ok { | ||||
| 		p.RUnlock() | ||||
| 		return toNodes(routes), nil | ||||
| 	} | ||||
| 	p.RUnlock() | ||||
|  | ||||
| 	// route cache miss, now lookup the router | ||||
| 	// if it does not exist, don't error out | ||||
| 	// the proxy will just hand off to the client | ||||
| 	// and try the registry | ||||
| 	// in future we might set a default gateway | ||||
| 	if p.Router != nil { | ||||
| 		// lookup the router | ||||
| 		routes, err := p.Router.Lookup( | ||||
| 			table.NewQuery(table.QueryService(service)), | ||||
| 		) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		p.Lock() | ||||
| 		if p.Routes == nil { | ||||
| 			p.Routes = make(map[string][]table.Route) | ||||
| 		} | ||||
| 		p.Routes[service] = routes | ||||
| 		p.Unlock() | ||||
|  | ||||
| 		return toNodes(routes), nil | ||||
| 	} | ||||
| 	p.Routes[service] = make(map[uint64]table.Route) | ||||
| 	p.Unlock() | ||||
|  | ||||
| 	// we've tried getting cached routes | ||||
| 	// we've tried using the router | ||||
| 	addr := os.Getenv("MICRO_ROUTER_ADDRESS") | ||||
| 	name := os.Getenv("MICRO_ROUTER") | ||||
|  | ||||
| 	// no router is specified we're going to set the default | ||||
| 	if len(name) == 0 && len(addr) == 0 { | ||||
| 		p.Router = router.DefaultRouter | ||||
| 		go p.Router.Advertise() | ||||
|  | ||||
| 		// recursively execute getRoute | ||||
| 		return p.getRoute(service) | ||||
| 	// if the router is broken return error | ||||
| 	if status := p.Router.Status(); status.Code == router.Error { | ||||
| 		return nil, status.Error | ||||
| 	} | ||||
|  | ||||
| 	if len(name) == 0 { | ||||
| 		name = "go.micro.router" | ||||
| 	// lookup the routes in the router | ||||
| 	results, err := p.Router.Lookup(table.NewQuery(table.QueryService(service))) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	// lookup the remote router | ||||
|  | ||||
| 	var addrs []string | ||||
|  | ||||
| 	// set the remote address if specified | ||||
| 	if len(addr) > 0 { | ||||
| 		addrs = append(addrs, addr) | ||||
| 	} else { | ||||
| 		// we have a name so we need to check the registry | ||||
| 		services, err := p.Client.Options().Registry.GetService(name) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		for _, service := range services { | ||||
| 			for _, node := range service.Nodes { | ||||
| 				addrs = append(addrs, node.Address) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// no router addresses available | ||||
| 	if len(addrs) == 0 { | ||||
| 		return nil, selector.ErrNoneAvailable | ||||
| 	} | ||||
|  | ||||
| 	var pbRoutes *pb.LookupResponse | ||||
| 	var gerr error | ||||
|  | ||||
| 	// set default client | ||||
| 	if p.RouterService == nil { | ||||
| 		p.RouterService = pb.NewRouterService(name, p.Client) | ||||
| 	} | ||||
|  | ||||
| 	// TODO: implement backoff and retries | ||||
| 	for _, addr := range addrs { | ||||
| 		// call the router | ||||
| 		proutes, err := p.RouterService.Lookup(context.Background(), &pb.LookupRequest{ | ||||
| 			Query: &pb.Query{ | ||||
| 				Service: service, | ||||
| 			}, | ||||
| 		}, client.WithAddress(addr)) | ||||
| 		if err != nil { | ||||
| 			gerr = err | ||||
| 			continue | ||||
| 		} | ||||
| 		// set routes | ||||
| 		pbRoutes = proutes | ||||
| 		break | ||||
| 	} | ||||
|  | ||||
| 	// errored out | ||||
| 	if gerr != nil { | ||||
| 		return nil, gerr | ||||
| 	} | ||||
|  | ||||
| 	// no routes | ||||
| 	if pbRoutes == nil { | ||||
| 		return nil, selector.ErrNoneAvailable | ||||
| 	} | ||||
|  | ||||
| 	// convert from pb to []*router.Route | ||||
| 	for _, r := range pbRoutes.Routes { | ||||
| 		routes = append(routes, table.Route{ | ||||
| 			Service: r.Service, | ||||
| 			Address: r.Address, | ||||
| 			Gateway: r.Gateway, | ||||
| 			Network: r.Network, | ||||
| 			Link:    r.Link, | ||||
| 			Metric:  int(r.Metric), | ||||
| 		}) | ||||
| 	// update the proxy cache | ||||
| 	p.Lock() | ||||
| 	for _, route := range results { | ||||
| 		p.Routes[service][route.Hash()] = route | ||||
| 	} | ||||
| 	routes = p.Routes[service] | ||||
| 	p.Unlock() | ||||
|  | ||||
| 	return toNodes(routes), nil | ||||
| } | ||||
|  | ||||
| // ServeRequest honours the server.Router interface | ||||
| func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { | ||||
| 	// set default client | ||||
| 	if p.Client == nil { | ||||
| 		p.Client = client.DefaultClient | ||||
| 	} | ||||
|  | ||||
| 	// service name | ||||
| 	service := req.Service() | ||||
| 	endpoint := req.Endpoint() | ||||
| @@ -342,13 +235,24 @@ func NewProxy(opts ...options.Option) proxy.Proxy { | ||||
| 		p.Client = c.(client.Client) | ||||
| 	} | ||||
|  | ||||
| 	// set the default client | ||||
| 	if p.Client == nil { | ||||
| 		p.Client = client.DefaultClient | ||||
| 	} | ||||
|  | ||||
| 	// get router | ||||
| 	r, ok := p.Options.Values().Get("proxy.router") | ||||
| 	if ok { | ||||
| 		p.Router = r.(router.Router) | ||||
| 		// TODO: should we advertise? | ||||
| 		go p.Router.Advertise() | ||||
| 	} | ||||
|  | ||||
| 	// create default router and start it | ||||
| 	if p.Router == nil { | ||||
| 		p.Router = router.DefaultRouter | ||||
| 	} | ||||
|  | ||||
| 	// routes cache | ||||
| 	p.Routes = make(map[string]map[uint64]table.Route) | ||||
|  | ||||
| 	return p | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user