diff --git a/network/proxy/mucp/mucp.go b/network/proxy/mucp/mucp.go index 2d52058a..dcc19a73 100644 --- a/network/proxy/mucp/mucp.go +++ b/network/proxy/mucp/mucp.go @@ -3,17 +3,22 @@ package mucp import ( "context" + "fmt" "io" + "os" "strings" + "sync" "github.com/micro/go-micro/client" - rselect "github.com/micro/go-micro/client/selector/router" + "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/config/options" "github.com/micro/go-micro/network/proxy" "github.com/micro/go-micro/network/router" "github.com/micro/go-micro/server" + + pb "github.com/micro/go-micro/network/router/proto" ) // Proxy will transparently proxy requests to an endpoint. @@ -27,6 +32,16 @@ type Proxy struct { // The client to use for outbound requests Client client.Client + + // The router for routes + Router router.Router + + // The router service client + RouterService pb.RouterService + + // A fib of routes service:address + sync.RWMutex + Routes map[string][]router.Route } // read client request and write to server @@ -64,6 +79,142 @@ func readLoop(r server.Request, s client.Stream) error { } } +func (p *Proxy) getRoute(service string) ([]string, error) { + // converts routes to just addresses + toNodes := func(routes []router.Route) []string { + var nodes []string + for _, node := range routes { + nodes = append(nodes, node.Gateway) + } + return nodes + } + + // lookup the route cache first + p.RLock() + routes, ok := p.Routes[service] + // got it! + if ok { + p.RUnlock() + return toNodes(routes), nil + } + p.RUnlock() + + // route cache miss, now lookup the router + // if it does not exist, don't error out + // the proxy will just hand off to the client + // and try the registry + // in future we might set a default gateway + if p.Router != nil { + // lookup the router + routes, err := p.Router.Table().Lookup( + router.NewQuery(router.QueryDestination(service)), + ) + if err != nil { + return nil, err + } + + p.Lock() + if p.Routes == nil { + p.Routes = make(map[string][]router.Route) + } + p.Routes[service] = routes + p.Unlock() + + return toNodes(routes), nil + } + + // we've tried getting cached routes + // we've tried using the router + addr := os.Getenv("MICRO_ROUTER_ADDRESS") + name := os.Getenv("MICRO_ROUTER") + + // no router is specified we're going to set the default + if len(name) == 0 && len(addr) == 0 { + p.Router = router.DefaultRouter + // recursively execute getRoute + return p.getRoute(service) + } + + if len(name) == 0 { + name = "go.micro.router" + } + + // lookup the remote router + + var addrs []string + + // set the remote address if specified + if len(addr) > 0 { + addrs = append(addrs, addr) + } else { + // we have a name so we need to check the registry + services, err := p.Client.Options().Registry.GetService(name) + if err != nil { + return nil, err + } + + for _, service := range services { + for _, node := range service.Nodes { + addr := node.Address + if node.Port > 0 { + addr = fmt.Sprintf("%s:%d", node.Address, node.Port) + } + addrs = append(addrs, addr) + } + } + } + + // no router addresses available + if len(addrs) == 0 { + return nil, selector.ErrNoneAvailable + } + + var pbRoutes *pb.LookupResponse + var err error + + // set default client + if p.RouterService == nil { + p.RouterService = pb.NewRouterService(name, p.Client) + } + + // TODO: implement backoff and retries + for _, addr := range addrs { + // call the router + pbRoutes, err = p.RouterService.Lookup(context.Background(), &pb.LookupRequest{ + Query: &pb.Query{ + Destination: service, + }, + }, client.WithAddress(addr)) + if err != nil { + continue + } + break + } + + // errored out + if err != nil { + return nil, err + } + + // no routes + if pbRoutes == nil { + return nil, selector.ErrNoneAvailable + } + + // convert from pb to []*router.Route + for _, r := range pbRoutes.Routes { + routes = append(routes, router.Route{ + Destination: r.Destination, + Gateway: r.Gateway, + Router: r.Router, + Network: r.Network, + Metric: int(r.Metric), + }) + } + + return toNodes(routes), nil +} + // ServeRequest honours the server.Router interface func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { // set default client @@ -71,21 +222,42 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server p.Client = client.DefaultClient } - opts := []client.CallOption{} - // service name service := req.Service() endpoint := req.Endpoint() + var addresses []string - // call a specific backend + // call a specific backend endpoint either by name or address if len(p.Endpoint) > 0 { // address:port if parts := strings.Split(p.Endpoint, ":"); len(parts) > 1 { - opts = append(opts, client.WithAddress(p.Endpoint)) - // use as service name + addresses = []string{p.Endpoint} } else { + // get route for endpoint from router + addr, err := p.getRoute(p.Endpoint) + if err != nil { + return err + } + // set the address + addresses = addr + // set the name service = p.Endpoint } + } else { + // no endpoint was specified just lookup the route + // get route for endpoint from router + addr, err := p.getRoute(service) + if err != nil { + return err + } + addresses = addr + } + + var opts []client.CallOption + + // set address if available + if len(addresses) > 0 { + opts = append(opts, client.WithAddress(addresses...)) } // read initial request @@ -167,14 +339,7 @@ func NewProxy(opts ...options.Option) proxy.Proxy { // get router r, ok := p.Options.Values().Get("proxy.router") if ok { - // set the router in the client - p.Client.Init( - // pass new selector as an option to the client - client.Selector(rselect.NewSelector( - // set the router in the selector - rselect.WithRouter(r.(router.Router)), - )), - ) + p.Router = r.(router.Router) } return p