diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index 06ab676f..02f28d89 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -43,9 +43,6 @@ type Proxy struct { // A fib of routes service:address sync.RWMutex Routes map[string]map[uint64]router.Route - - // The channel to monitor watcher errors - errChan chan error } // read client request and write to server @@ -96,6 +93,18 @@ func toNodes(routes []router.Route) []string { return nodes } +func toSlice(r map[uint64]router.Route) []router.Route { + var routes []router.Route + for _, v := range r { + routes = append(routes, v) + } + + // sort the routes in order of metric + sort.Slice(routes, func(i, j int) bool { return routes[i].Metric < routes[j].Metric }) + + return routes +} + func (p *Proxy) getLink(r router.Route) (client.Client, error) { if r.Link == "local" || len(p.Links) == 0 { return p.Client, nil @@ -108,27 +117,25 @@ func (p *Proxy) getLink(r router.Route) (client.Client, error) { } func (p *Proxy) getRoute(service string) ([]router.Route, error) { - toSlice := func(r map[uint64]router.Route) []router.Route { - var routes []router.Route - for _, v := range r { - routes = append(routes, v) - } - - // sort the routes in order of metric - sort.Slice(routes, func(i, j int) bool { return routes[i].Metric < routes[j].Metric }) - - return routes - } - // lookup the route cache first p.Lock() - routes, ok := p.Routes[service] + cached, ok := p.Routes[service] if ok { p.Unlock() - return toSlice(routes), nil + return toSlice(cached), nil } p.Unlock() + // cache routes for the service + routes, err := p.cacheRoutes(service) + if err != nil { + return nil, err + } + + return routes, nil +} + +func (p *Proxy) cacheRoutes(service string) ([]router.Route, error) { // lookup the routes in the router results, err := p.Router.Lookup(router.QueryService(service)) if err != nil { @@ -149,18 +156,40 @@ func (p *Proxy) getRoute(service string) ([]router.Route, error) { } p.Routes[service][route.Hash()] = route } - routes = p.Routes[service] + routes := p.Routes[service] p.Unlock() return toSlice(routes), nil } -// manageRouteCache applies action on a given route to Proxy route cache -func (p *Proxy) manageRouteCache(route router.Route, action string) error { +// refreshMetrics will refresh any metrics for our local cached routes. +// we may not receive new watch events for these as they change. +func (p *Proxy) refreshMetrics() { + var services []string + + // get a list of services to update + p.RLock() + for service, _ := range p.Routes { + services = append(services, service) + } + p.RUnlock() + + // get and cache the routes for the service + for _, service := range services { + p.cacheRoutes(service) + } +} + +// manageRoutes applies action on a given route to Proxy route cache +func (p *Proxy) manageRoutes(route router.Route, action string) error { + // we only cache what we are actually concerned with + p.Lock() + defer p.Unlock() + switch action { case "create", "update": if _, ok := p.Routes[route.Service]; !ok { - p.Routes[route.Service] = make(map[uint64]router.Route) + return fmt.Errorf("not called %s", route.Service) } p.Routes[route.Service][route.Hash()] = route case "delete": @@ -174,31 +203,22 @@ func (p *Proxy) manageRouteCache(route router.Route, action string) error { // watchRoutes watches service routes and updates proxy cache func (p *Proxy) watchRoutes() { - // this is safe to do as the only way watchRoutes returns is - // when some error is written into error channel - we want to bail then - defer close(p.errChan) - // route watcher w, err := p.Router.Watch() if err != nil { - p.errChan <- err return } for { event, err := w.Next() if err != nil { - p.errChan <- err return } - p.Lock() - if err := p.manageRouteCache(event.Route, fmt.Sprintf("%s", event.Type)); err != nil { + if err := p.manageRoutes(event.Route, fmt.Sprintf("%s", event.Type)); err != nil { // TODO: should we bail here? - p.Unlock() continue } - p.Unlock() } } @@ -360,39 +380,28 @@ func (p *Proxy) serveRequest(ctx context.Context, link client.Client, service, e // get raw response resp := stream.Response() - // route watcher error - var watchErr error - // create server response write loop for { - select { - case err := <-p.errChan: - if err != nil { - watchErr = err - } - return watchErr - default: - // read backend response body - body, err := resp.Read() - if err == io.EOF { - return nil - } else if err != nil { - return err - } + // read backend response body + body, err := resp.Read() + if err == io.EOF { + return nil + } else if err != nil { + return err + } - // read backend response header - hdr := resp.Header() + // read backend response header + hdr := resp.Header() - // write raw response header to client - rsp.WriteHeader(hdr) + // write raw response header to client + rsp.WriteHeader(hdr) - // write raw response body to client - err = rsp.Write(body) - if err == io.EOF { - return nil - } else if err != nil { - return err - } + // write raw response body to client + err = rsp.Write(body) + if err == io.EOF { + return nil + } else if err != nil { + return err } } @@ -451,9 +460,6 @@ func NewProxy(opts ...options.Option) proxy.Proxy { // routes cache p.Routes = make(map[string]map[uint64]router.Route) - // watch router service routes - p.errChan = make(chan error, 1) - go func() { // continuously attempt to watch routes for { @@ -464,5 +470,19 @@ func NewProxy(opts ...options.Option) proxy.Proxy { } }() + go func() { + t := time.NewTicker(time.Minute) + defer t.Stop() + + // we must refresh route metrics since they do not trigger new events + for { + select { + case <-t.C: + // refresh route metrics + p.refreshMetrics() + } + } + }() + return p }