Adds route watcher to mucp.Proxy

This commit is contained in:
Milos Gajdos 2019-07-24 19:03:13 +01:00
parent 74cbce72df
commit 23f0231a09
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F

View File

@ -3,6 +3,7 @@ package mucp
import ( import (
"context" "context"
"fmt"
"io" "io"
"strings" "strings"
"sync" "sync"
@ -36,6 +37,9 @@ type Proxy struct {
// A fib of routes service:address // A fib of routes service:address
sync.RWMutex sync.RWMutex
Routes map[string]map[uint64]table.Route Routes map[string]map[uint64]table.Route
// The channel to monitor watcher errors
errChan chan error
} }
// read client request and write to server // read client request and write to server
@ -119,6 +123,55 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
return toNodes(routes), nil return toNodes(routes), nil
} }
// manageRouteCache applies action on a given route to Proxy route cache
func (p *Proxy) manageRouteCache(route table.Route, action string) error {
switch action {
case "create", "update":
if _, ok := p.Routes[route.Service]; !ok {
p.Routes[route.Service] = make(map[uint64]table.Route)
}
p.Routes[route.Service][route.Hash()] = route
case "delete":
if _, ok := p.Routes[route.Service]; !ok {
return fmt.Errorf("route not found")
}
delete(p.Routes[route.Service], route.Hash())
default:
return fmt.Errorf("unknown action: %s", action)
}
return nil
}
// watchRoutes watches service routes and updates proxy cache
func (p *Proxy) watchRoutes() {
// this is safe to do as the only way watchRoutes returns is
// when some error is written into error channel - we want to bail then
defer close(p.errChan)
// route watcher
w, err := p.Router.Watch()
if err != nil {
p.errChan <- err
return
}
for {
event, err := w.Next()
if err != nil {
p.errChan <- err
return
}
p.Lock()
if err := p.manageRouteCache(event.Route, fmt.Sprintf("%s", event.Type)); err != nil {
// TODO: should we bail here?
continue
}
p.Unlock()
}
}
// ServeRequest honours the server.Router interface // ServeRequest honours the server.Router interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
// service name // service name
@ -181,8 +234,18 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
// get raw response // get raw response
resp := stream.Response() resp := stream.Response()
// route watcher error
var watchErr error
// create server response write loop // create server response write loop
for { for {
select {
case err := <-p.errChan:
if err != nil {
watchErr = err
}
return watchErr
default:
// read backend response body // read backend response body
body, err := resp.Read() body, err := resp.Read()
if err == io.EOF { if err == io.EOF {
@ -205,6 +268,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
return err return err
} }
} }
}
return nil return nil
} }
@ -254,5 +318,9 @@ func NewProxy(opts ...options.Option) proxy.Proxy {
// routes cache // routes cache
p.Routes = make(map[string]map[uint64]table.Route) p.Routes = make(map[string]map[uint64]table.Route)
// watch router service routes
p.errChan = make(chan error, 1)
go p.watchRoutes()
return p return p
} }