From 23f0231a098a71de58de5e5064b494feaec3947d Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 24 Jul 2019 19:03:13 +0100 Subject: [PATCH] Adds route watcher to mucp.Proxy --- network/proxy/mucp/mucp.go | 102 ++++++++++++++++++++++++++++++------- 1 file changed, 85 insertions(+), 17 deletions(-) diff --git a/network/proxy/mucp/mucp.go b/network/proxy/mucp/mucp.go index 96e3527f..d15118e3 100644 --- a/network/proxy/mucp/mucp.go +++ b/network/proxy/mucp/mucp.go @@ -3,6 +3,7 @@ package mucp import ( "context" + "fmt" "io" "strings" "sync" @@ -36,6 +37,9 @@ type Proxy struct { // A fib of routes service:address sync.RWMutex Routes map[string]map[uint64]table.Route + + // The channel to monitor watcher errors + errChan chan error } // read client request and write to server @@ -119,6 +123,55 @@ func (p *Proxy) getRoute(service string) ([]string, error) { return toNodes(routes), nil } +// manageRouteCache applies action on a given route to Proxy route cache +func (p *Proxy) manageRouteCache(route table.Route, action string) error { + switch action { + case "create", "update": + if _, ok := p.Routes[route.Service]; !ok { + p.Routes[route.Service] = make(map[uint64]table.Route) + } + p.Routes[route.Service][route.Hash()] = route + case "delete": + if _, ok := p.Routes[route.Service]; !ok { + return fmt.Errorf("route not found") + } + delete(p.Routes[route.Service], route.Hash()) + default: + return fmt.Errorf("unknown action: %s", action) + } + + return nil +} + +// watchRoutes watches service routes and updates proxy cache +func (p *Proxy) watchRoutes() { + // this is safe to do as the only way watchRoutes returns is + // when some error is written into error channel - we want to bail then + defer close(p.errChan) + + // route watcher + w, err := p.Router.Watch() + if err != nil { + p.errChan <- err + return + } + + for { + event, err := w.Next() + if err != nil { + p.errChan <- err + return + } + + p.Lock() + if err := p.manageRouteCache(event.Route, fmt.Sprintf("%s", event.Type)); err != nil { + // TODO: should we bail here? + continue + } + p.Unlock() + } +} + // ServeRequest honours the server.Router interface func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { // service name @@ -181,28 +234,39 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server // get raw response resp := stream.Response() + // route watcher error + var watchErr error + // create server response write loop for { - // read backend response body - body, err := resp.Read() - if err == io.EOF { - return nil - } else if err != nil { - return err - } + select { + case err := <-p.errChan: + if err != nil { + watchErr = err + } + return watchErr + default: + // read backend response body + body, err := resp.Read() + if err == io.EOF { + return nil + } else if err != nil { + return err + } - // read backend response header - hdr := resp.Header() + // read backend response header + hdr := resp.Header() - // write raw response header to client - rsp.WriteHeader(hdr) + // write raw response header to client + rsp.WriteHeader(hdr) - // write raw response body to client - err = rsp.Write(body) - if err == io.EOF { - return nil - } else if err != nil { - return err + // write raw response body to client + err = rsp.Write(body) + if err == io.EOF { + return nil + } else if err != nil { + return err + } } } @@ -254,5 +318,9 @@ func NewProxy(opts ...options.Option) proxy.Proxy { // routes cache p.Routes = make(map[string]map[uint64]table.Route) + // watch router service routes + p.errChan = make(chan error, 1) + go p.watchRoutes() + return p }