From b55adc0c30c6a21641be4eba1ca6af1ebc7b2e91 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 24 Jul 2019 18:13:51 +0100 Subject: [PATCH] mucp Proxy no longer uses RPC interface of router.Router directly --- network/proxy/mucp/mucp.go | 184 +++++++++---------------------------- 1 file changed, 44 insertions(+), 140 deletions(-) diff --git a/network/proxy/mucp/mucp.go b/network/proxy/mucp/mucp.go index 101f5f2a..96e3527f 100644 --- a/network/proxy/mucp/mucp.go +++ b/network/proxy/mucp/mucp.go @@ -4,12 +4,10 @@ package mucp import ( "context" "io" - "os" "strings" "sync" "github.com/micro/go-micro/client" - "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/config/options" @@ -17,7 +15,6 @@ import ( "github.com/micro/go-micro/network/router" "github.com/micro/go-micro/server" - pb "github.com/micro/go-micro/network/router/proto" "github.com/micro/go-micro/network/router/table" ) @@ -27,7 +24,7 @@ type Proxy struct { // embed options options.Options - // Endpoint specified the fixed service endpoint to call. + // Endpoint specifies the fixed service endpoint to call. Endpoint string // The client to use for outbound requests @@ -36,12 +33,9 @@ type Proxy struct { // The router for routes Router router.Router - // The router service client - RouterService pb.RouterService - // A fib of routes service:address sync.RWMutex - Routes map[string][]table.Route + Routes map[string]map[uint64]table.Route } // read client request and write to server @@ -79,155 +73,54 @@ func readLoop(r server.Request, s client.Stream) error { } } +// toNodes returns a list of node addresses from given routes +func toNodes(routes map[uint64]table.Route) []string { + var nodes []string + for _, node := range routes { + address := node.Address + if len(node.Gateway) > 0 { + address = node.Gateway + } + nodes = append(nodes, address) + } + return nodes +} + func (p *Proxy) getRoute(service string) ([]string, error) { - // converts routes to just addresses - toNodes := func(routes []table.Route) []string { - var nodes []string - for _, node := range routes { - address := node.Address - if len(node.Gateway) > 0 { - address = node.Gateway - } - nodes = append(nodes, address) - } - return nodes - } - // lookup the route cache first - p.RLock() + p.Lock() routes, ok := p.Routes[service] - // got it! if ok { - p.RUnlock() - return toNodes(routes), nil - } - p.RUnlock() - - // route cache miss, now lookup the router - // if it does not exist, don't error out - // the proxy will just hand off to the client - // and try the registry - // in future we might set a default gateway - if p.Router != nil { - // lookup the router - routes, err := p.Router.Lookup( - table.NewQuery(table.QueryService(service)), - ) - if err != nil { - return nil, err - } - - p.Lock() - if p.Routes == nil { - p.Routes = make(map[string][]table.Route) - } - p.Routes[service] = routes p.Unlock() - return toNodes(routes), nil } + p.Routes[service] = make(map[uint64]table.Route) + p.Unlock() - // we've tried getting cached routes - // we've tried using the router - addr := os.Getenv("MICRO_ROUTER_ADDRESS") - name := os.Getenv("MICRO_ROUTER") - - // no router is specified we're going to set the default - if len(name) == 0 && len(addr) == 0 { - p.Router = router.DefaultRouter - go p.Router.Advertise() - - // recursively execute getRoute - return p.getRoute(service) + // if the router is broken return error + if status := p.Router.Status(); status.Code == router.Error { + return nil, status.Error } - if len(name) == 0 { - name = "go.micro.router" + // lookup the routes in the router + results, err := p.Router.Lookup(table.NewQuery(table.QueryService(service))) + if err != nil { + return nil, err } - // lookup the remote router - - var addrs []string - - // set the remote address if specified - if len(addr) > 0 { - addrs = append(addrs, addr) - } else { - // we have a name so we need to check the registry - services, err := p.Client.Options().Registry.GetService(name) - if err != nil { - return nil, err - } - - for _, service := range services { - for _, node := range service.Nodes { - addrs = append(addrs, node.Address) - } - } - } - - // no router addresses available - if len(addrs) == 0 { - return nil, selector.ErrNoneAvailable - } - - var pbRoutes *pb.LookupResponse - var gerr error - - // set default client - if p.RouterService == nil { - p.RouterService = pb.NewRouterService(name, p.Client) - } - - // TODO: implement backoff and retries - for _, addr := range addrs { - // call the router - proutes, err := p.RouterService.Lookup(context.Background(), &pb.LookupRequest{ - Query: &pb.Query{ - Service: service, - }, - }, client.WithAddress(addr)) - if err != nil { - gerr = err - continue - } - // set routes - pbRoutes = proutes - break - } - - // errored out - if gerr != nil { - return nil, gerr - } - - // no routes - if pbRoutes == nil { - return nil, selector.ErrNoneAvailable - } - - // convert from pb to []*router.Route - for _, r := range pbRoutes.Routes { - routes = append(routes, table.Route{ - Service: r.Service, - Address: r.Address, - Gateway: r.Gateway, - Network: r.Network, - Link: r.Link, - Metric: int(r.Metric), - }) + // update the proxy cache + p.Lock() + for _, route := range results { + p.Routes[service][route.Hash()] = route } + routes = p.Routes[service] + p.Unlock() return toNodes(routes), nil } // ServeRequest honours the server.Router interface func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { - // set default client - if p.Client == nil { - p.Client = client.DefaultClient - } - // service name service := req.Service() endpoint := req.Endpoint() @@ -342,13 +235,24 @@ func NewProxy(opts ...options.Option) proxy.Proxy { p.Client = c.(client.Client) } + // set the default client + if p.Client == nil { + p.Client = client.DefaultClient + } + // get router r, ok := p.Options.Values().Get("proxy.router") if ok { p.Router = r.(router.Router) - // TODO: should we advertise? - go p.Router.Advertise() } + // create default router and start it + if p.Router == nil { + p.Router = router.DefaultRouter + } + + // routes cache + p.Routes = make(map[string]map[uint64]table.Route) + return p }