Merge pull request #610 from milosgajdos83/proxy-watch

Adds route watcher to mucp.Proxy
This commit is contained in:
Asim Aslam 2019-07-24 11:19:52 -07:00 committed by GitHub
commit 220a8fafb1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -3,6 +3,7 @@ package mucp
import ( import (
"context" "context"
"fmt"
"io" "io"
"strings" "strings"
"sync" "sync"
@ -36,6 +37,9 @@ type Proxy struct {
// A fib of routes service:address // A fib of routes service:address
sync.RWMutex sync.RWMutex
Routes map[string]map[uint64]table.Route Routes map[string]map[uint64]table.Route
// The channel to monitor watcher errors
errChan chan error
} }
// read client request and write to server // read client request and write to server
@ -119,6 +123,56 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
return toNodes(routes), nil return toNodes(routes), nil
} }
// manageRouteCache applies action on a given route to Proxy route cache
func (p *Proxy) manageRouteCache(route table.Route, action string) error {
switch action {
case "create", "update":
if _, ok := p.Routes[route.Service]; !ok {
p.Routes[route.Service] = make(map[uint64]table.Route)
}
p.Routes[route.Service][route.Hash()] = route
case "delete":
if _, ok := p.Routes[route.Service]; !ok {
return fmt.Errorf("route not found")
}
delete(p.Routes[route.Service], route.Hash())
default:
return fmt.Errorf("unknown action: %s", action)
}
return nil
}
// watchRoutes watches service routes and updates proxy cache
func (p *Proxy) watchRoutes() {
// this is safe to do as the only way watchRoutes returns is
// when some error is written into error channel - we want to bail then
defer close(p.errChan)
// route watcher
w, err := p.Router.Watch()
if err != nil {
p.errChan <- err
return
}
for {
event, err := w.Next()
if err != nil {
p.errChan <- err
return
}
p.Lock()
if err := p.manageRouteCache(event.Route, fmt.Sprintf("%s", event.Type)); err != nil {
// TODO: should we bail here?
p.Unlock()
continue
}
p.Unlock()
}
}
// ServeRequest honours the server.Router interface // ServeRequest honours the server.Router interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
// service name // service name
@ -181,28 +235,39 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
// get raw response // get raw response
resp := stream.Response() resp := stream.Response()
// route watcher error
var watchErr error
// create server response write loop // create server response write loop
for { for {
// read backend response body select {
body, err := resp.Read() case err := <-p.errChan:
if err == io.EOF { if err != nil {
return nil watchErr = err
} else if err != nil { }
return err return watchErr
} default:
// read backend response body
body, err := resp.Read()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
// read backend response header // read backend response header
hdr := resp.Header() hdr := resp.Header()
// write raw response header to client // write raw response header to client
rsp.WriteHeader(hdr) rsp.WriteHeader(hdr)
// write raw response body to client // write raw response body to client
err = rsp.Write(body) err = rsp.Write(body)
if err == io.EOF { if err == io.EOF {
return nil return nil
} else if err != nil { } else if err != nil {
return err return err
}
} }
} }
@ -254,5 +319,9 @@ func NewProxy(opts ...options.Option) proxy.Proxy {
// routes cache // routes cache
p.Routes = make(map[string]map[uint64]table.Route) p.Routes = make(map[string]map[uint64]table.Route)
// watch router service routes
p.errChan = make(chan error, 1)
go p.watchRoutes()
return p return p
} }