Refresh route metrics in the proxy

This commit is contained in:
Asim Aslam 2019-10-25 22:46:43 +01:00
parent e85863d6cc
commit 51922c1763

View File

@ -43,9 +43,6 @@ type Proxy struct {
// A fib of routes service:address // A fib of routes service:address
sync.RWMutex sync.RWMutex
Routes map[string]map[uint64]router.Route Routes map[string]map[uint64]router.Route
// The channel to monitor watcher errors
errChan chan error
} }
// read client request and write to server // read client request and write to server
@ -96,6 +93,18 @@ func toNodes(routes []router.Route) []string {
return nodes return nodes
} }
func toSlice(r map[uint64]router.Route) []router.Route {
var routes []router.Route
for _, v := range r {
routes = append(routes, v)
}
// sort the routes in order of metric
sort.Slice(routes, func(i, j int) bool { return routes[i].Metric < routes[j].Metric })
return routes
}
func (p *Proxy) getLink(r router.Route) (client.Client, error) { func (p *Proxy) getLink(r router.Route) (client.Client, error) {
if r.Link == "local" || len(p.Links) == 0 { if r.Link == "local" || len(p.Links) == 0 {
return p.Client, nil return p.Client, nil
@ -108,27 +117,25 @@ func (p *Proxy) getLink(r router.Route) (client.Client, error) {
} }
func (p *Proxy) getRoute(service string) ([]router.Route, error) { func (p *Proxy) getRoute(service string) ([]router.Route, error) {
toSlice := func(r map[uint64]router.Route) []router.Route {
var routes []router.Route
for _, v := range r {
routes = append(routes, v)
}
// sort the routes in order of metric
sort.Slice(routes, func(i, j int) bool { return routes[i].Metric < routes[j].Metric })
return routes
}
// lookup the route cache first // lookup the route cache first
p.Lock() p.Lock()
routes, ok := p.Routes[service] cached, ok := p.Routes[service]
if ok { if ok {
p.Unlock() p.Unlock()
return toSlice(routes), nil return toSlice(cached), nil
} }
p.Unlock() p.Unlock()
// cache routes for the service
routes, err := p.cacheRoutes(service)
if err != nil {
return nil, err
}
return routes, nil
}
func (p *Proxy) cacheRoutes(service string) ([]router.Route, error) {
// lookup the routes in the router // lookup the routes in the router
results, err := p.Router.Lookup(router.QueryService(service)) results, err := p.Router.Lookup(router.QueryService(service))
if err != nil { if err != nil {
@ -149,18 +156,40 @@ func (p *Proxy) getRoute(service string) ([]router.Route, error) {
} }
p.Routes[service][route.Hash()] = route p.Routes[service][route.Hash()] = route
} }
routes = p.Routes[service] routes := p.Routes[service]
p.Unlock() p.Unlock()
return toSlice(routes), nil return toSlice(routes), nil
} }
// manageRouteCache applies action on a given route to Proxy route cache // refreshMetrics will refresh any metrics for our local cached routes.
func (p *Proxy) manageRouteCache(route router.Route, action string) error { // we may not receive new watch events for these as they change.
func (p *Proxy) refreshMetrics() {
var services []string
// get a list of services to update
p.RLock()
for service, _ := range p.Routes {
services = append(services, service)
}
p.RUnlock()
// get and cache the routes for the service
for _, service := range services {
p.cacheRoutes(service)
}
}
// manageRoutes applies action on a given route to Proxy route cache
func (p *Proxy) manageRoutes(route router.Route, action string) error {
// we only cache what we are actually concerned with
p.Lock()
defer p.Unlock()
switch action { switch action {
case "create", "update": case "create", "update":
if _, ok := p.Routes[route.Service]; !ok { if _, ok := p.Routes[route.Service]; !ok {
p.Routes[route.Service] = make(map[uint64]router.Route) return fmt.Errorf("not called %s", route.Service)
} }
p.Routes[route.Service][route.Hash()] = route p.Routes[route.Service][route.Hash()] = route
case "delete": case "delete":
@ -174,31 +203,22 @@ func (p *Proxy) manageRouteCache(route router.Route, action string) error {
// watchRoutes watches service routes and updates proxy cache // watchRoutes watches service routes and updates proxy cache
func (p *Proxy) watchRoutes() { func (p *Proxy) watchRoutes() {
// this is safe to do as the only way watchRoutes returns is
// when some error is written into error channel - we want to bail then
defer close(p.errChan)
// route watcher // route watcher
w, err := p.Router.Watch() w, err := p.Router.Watch()
if err != nil { if err != nil {
p.errChan <- err
return return
} }
for { for {
event, err := w.Next() event, err := w.Next()
if err != nil { if err != nil {
p.errChan <- err
return return
} }
p.Lock() if err := p.manageRoutes(event.Route, fmt.Sprintf("%s", event.Type)); err != nil {
if err := p.manageRouteCache(event.Route, fmt.Sprintf("%s", event.Type)); err != nil {
// TODO: should we bail here? // TODO: should we bail here?
p.Unlock()
continue continue
} }
p.Unlock()
} }
} }
@ -360,39 +380,28 @@ func (p *Proxy) serveRequest(ctx context.Context, link client.Client, service, e
// get raw response // get raw response
resp := stream.Response() resp := stream.Response()
// route watcher error
var watchErr error
// create server response write loop // create server response write loop
for { for {
select { // read backend response body
case err := <-p.errChan: body, err := resp.Read()
if err != nil { if err == io.EOF {
watchErr = err return nil
} } else if err != nil {
return watchErr return err
default: }
// read backend response body
body, err := resp.Read()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
// read backend response header // read backend response header
hdr := resp.Header() hdr := resp.Header()
// write raw response header to client // write raw response header to client
rsp.WriteHeader(hdr) rsp.WriteHeader(hdr)
// write raw response body to client // write raw response body to client
err = rsp.Write(body) err = rsp.Write(body)
if err == io.EOF { if err == io.EOF {
return nil return nil
} else if err != nil { } else if err != nil {
return err return err
}
} }
} }
@ -451,9 +460,6 @@ func NewProxy(opts ...options.Option) proxy.Proxy {
// routes cache // routes cache
p.Routes = make(map[string]map[uint64]router.Route) p.Routes = make(map[string]map[uint64]router.Route)
// watch router service routes
p.errChan = make(chan error, 1)
go func() { go func() {
// continuously attempt to watch routes // continuously attempt to watch routes
for { for {
@ -464,5 +470,19 @@ func NewProxy(opts ...options.Option) proxy.Proxy {
} }
}() }()
go func() {
t := time.NewTicker(time.Minute)
defer t.Stop()
// we must refresh route metrics since they do not trigger new events
for {
select {
case <-t.C:
// refresh route metrics
p.refreshMetrics()
}
}
}()
return p return p
} }