Move router and proxy into network package

This commit is contained in:
Asim Aslam
2019-06-21 17:20:31 +01:00
parent 7936d74602
commit 3f97743e34
14 changed files with 3 additions and 3 deletions

25
network/proxy/README.md Normal file
View File

@@ -0,0 +1,25 @@
# Go Proxy [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-proxy?status.svg)](https://godoc.org/github.com/micro/go-proxy)
Go Proxy is a proxy library for Go Micro.
## Overview
Go Micro is a distributed systems framework for client/server communication. It handles the details
around discovery, fault tolerance, rpc communication, etc. We may want to leverage this in broader ecosystems
which make use of standard http or we may also want to offload a number of requirements to a single proxy.
## Features
- **Transparent Proxy** - Proxy requests to any micro services through a single location. Go Proxy enables
you to write Go Micro proxies which handle and forward requests. This is good for incorporating wrappers.
- **Single Backend Router** - Enable the single backend router to proxy directly to your local app. The proxy
allows you to set a router which serves your backend service whether its http, grpc, etc.
- **Protocol Aware Handler** - Set a request handler which speaks your app protocol to make outbound requests.
Your app may not speak the MUCP protocol so it may be easier to translate internally.
- **Control Planes** - Additionally we support use of control planes to offload many distributed systems concerns.
* [x] [Consul](https://www.consul.io/docs/connect/native.html) - Using Connect-Native to provide secure mTLS.
* [x] [NATS](https://nats.io/) - Fully leveraging NATS as the control plane and data plane.

158
network/proxy/grpc/grpc.go Normal file
View File

@@ -0,0 +1,158 @@
// Package grpc transparently forwards the grpc protocol using a go-micro client.
package grpc
import (
"context"
"io"
"strings"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/client/grpc"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/network/proxy"
"github.com/micro/go-micro/server"
)
// Proxy will transparently proxy requests to the backend.
// If no backend is specified it will call a service using the client.
// If the service matches the Name it will use the server.DefaultRouter.
type Proxy struct {
// The proxy options
options.Options
// Endpoint specified the fixed endpoint to call.
Endpoint string
// The client to use for outbound requests
Client client.Client
}
// read client request and write to server
func readLoop(r server.Request, s client.Stream) error {
// request to backend server
req := s.Request()
for {
// get data from client
// no need to decode it
body, err := r.Read()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// get the header from client
hdr := r.Header()
msg := &codec.Message{
Type: codec.Request,
Header: hdr,
Body: body,
}
// write the raw request
err = req.Codec().Write(msg, nil)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
}
}
// ServeRequest honours the server.Proxy interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
// set default client
if p.Client == nil {
p.Client = grpc.NewClient()
}
opts := []client.CallOption{}
// service name
service := req.Service()
endpoint := req.Endpoint()
// call a specific backend
if len(p.Endpoint) > 0 {
// address:port
if parts := strings.Split(p.Endpoint, ":"); len(parts) > 1 {
opts = append(opts, client.WithAddress(p.Endpoint))
// use as service name
} else {
service = p.Endpoint
}
}
// create new request with raw bytes body
creq := p.Client.NewRequest(service, endpoint, nil, client.WithContentType(req.ContentType()))
// create new stream
stream, err := p.Client.Stream(ctx, creq, opts...)
if err != nil {
return err
}
defer stream.Close()
// create client request read loop
go readLoop(req, stream)
// get raw response
resp := stream.Response()
// create server response write loop
for {
// read backend response body
body, err := resp.Read()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
// read backend response header
hdr := resp.Header()
// write raw response header to client
rsp.WriteHeader(hdr)
// write raw response body to client
err = rsp.Write(body)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
}
return nil
}
// NewProxy returns a new grpc proxy server
func NewProxy(opts ...options.Option) proxy.Proxy {
p := new(Proxy)
p.Options = options.NewOptions(opts...)
p.Options.Init(options.WithString("grpc"))
// get endpoint
ep, ok := p.Options.Values().Get("proxy.endpoint")
if ok {
p.Endpoint = ep.(string)
}
// get client
c, ok := p.Options.Values().Get("proxy.client")
if ok {
p.Client = c.(client.Client)
}
return p
}
// NewSingleHostProxy returns a router which sends requests to a single backend
func NewSingleHostProxy(url string) *Proxy {
return &Proxy{
Endpoint: url,
}
}

149
network/proxy/http/http.go Normal file
View File

@@ -0,0 +1,149 @@
// Package http provides a micro rpc to http proxy
package http
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/network/proxy"
"github.com/micro/go-micro/server"
)
// Proxy will proxy rpc requests as http POST requests. It is a server.Proxy
type Proxy struct {
options.Options
// The http backend to call
Endpoint string
// first request
first bool
}
func getMethod(hdr map[string]string) string {
switch hdr["Micro-Method"] {
case "GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS", "TRACE", "PATCH":
return hdr["Micro-Method"]
default:
return "POST"
}
}
func getEndpoint(hdr map[string]string) string {
ep := hdr["Micro-Endpoint"]
if len(ep) > 0 && ep[0] == '/' {
return ep
}
return ""
}
// ServeRequest honours the server.Router interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
if p.Endpoint == "" {
p.Endpoint = proxy.DefaultEndpoint
}
for {
// get data
body, err := req.Read()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// get the header
hdr := req.Header()
// get method
method := getMethod(hdr)
// get endpoint
endpoint := getEndpoint(hdr)
// set the endpoint
if len(endpoint) == 0 {
endpoint = p.Endpoint
} else {
// add endpoint to backend
u, err := url.Parse(p.Endpoint)
if err != nil {
return errors.InternalServerError(req.Service(), err.Error())
}
u.Path = path.Join(u.Path, endpoint)
endpoint = u.String()
}
// send to backend
hreq, err := http.NewRequest(method, endpoint, bytes.NewReader(body))
if err != nil {
return errors.InternalServerError(req.Service(), err.Error())
}
// set the headers
for k, v := range hdr {
hreq.Header.Set(k, v)
}
// make the call
hrsp, err := http.DefaultClient.Do(hreq)
if err != nil {
return errors.InternalServerError(req.Service(), err.Error())
}
// read body
b, err := ioutil.ReadAll(hrsp.Body)
hrsp.Body.Close()
if err != nil {
return errors.InternalServerError(req.Service(), err.Error())
}
// set response headers
hdr = map[string]string{}
for k, _ := range hrsp.Header {
hdr[k] = hrsp.Header.Get(k)
}
// write the header
rsp.WriteHeader(hdr)
// write the body
err = rsp.Write(b)
if err == io.EOF {
return nil
}
if err != nil {
return errors.InternalServerError(req.Service(), err.Error())
}
}
return nil
}
// NewSingleHostProxy returns a router which sends requests to a single http backend
func NewSingleHostProxy(url string) proxy.Proxy {
return &Proxy{
Endpoint: url,
}
}
// NewProxy returns a new proxy which will route using a http client
func NewProxy(opts ...options.Option) proxy.Proxy {
p := new(Proxy)
p.Options = options.NewOptions(opts...)
p.Options.Init(options.WithString("http"))
// get endpoint
ep, ok := p.Options.Values().Get("proxy.endpoint")
if ok {
p.Endpoint = ep.(string)
}
return p
}

View File

@@ -0,0 +1,93 @@
package http
import (
"context"
"fmt"
"net"
"net/http"
"sync"
"testing"
"github.com/micro/go-micro"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry/memory"
"github.com/micro/go-micro/server"
)
type testHandler struct{}
func (t *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"hello": "world"}`))
}
func TestHTTPProxy(t *testing.T) {
c, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal(err)
}
defer c.Close()
addr := c.Addr().String()
url := fmt.Sprintf("http://%s", addr)
testCases := []struct {
// http endpoint to call e.g /foo/bar
httpEp string
// rpc endpoint called e.g Foo.Bar
rpcEp string
// should be an error
err bool
}{
{"/", "Foo.Bar", false},
{"/", "Foo.Baz", false},
{"/helloworld", "Hello.World", true},
}
// handler
http.Handle("/", new(testHandler))
// new proxy
p := NewSingleHostProxy(url)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
// new micro service
service := micro.NewService(
micro.Context(ctx),
micro.Name("foobar"),
micro.Registry(memory.NewRegistry()),
micro.AfterStart(func() error {
wg.Done()
return nil
}),
)
// set router
service.Server().Init(
server.WithRouter(p),
)
// run service
// server
go http.Serve(c, nil)
go service.Run()
// wait till service is started
wg.Wait()
for _, test := range testCases {
req := service.Client().NewRequest("foobar", test.rpcEp, map[string]string{"foo": "bar"}, client.WithContentType("application/json"))
var rsp map[string]string
err := service.Client().Call(ctx, req, &rsp)
if err != nil && test.err == false {
t.Fatal(err)
}
if v := rsp["hello"]; v != "world" {
t.Fatalf("Expected hello world got %s from %s", v, test.rpcEp)
}
}
}

166
network/proxy/mucp/mucp.go Normal file
View File

@@ -0,0 +1,166 @@
// Package mucp transparently forwards the incoming request using a go-micro client.
package mucp
import (
"context"
"io"
"strings"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/network/proxy"
"github.com/micro/go-micro/server"
)
// Proxy will transparently proxy requests to an endpoint.
// If no endpoint is specified it will call a service using the client.
type Proxy struct {
// embed options
options.Options
// Endpoint specified the fixed service endpoint to call.
Endpoint string
// The client to use for outbound requests
Client client.Client
}
// read client request and write to server
func readLoop(r server.Request, s client.Stream) error {
// request to backend server
req := s.Request()
for {
// get data from client
// no need to decode it
body, err := r.Read()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// get the header from client
hdr := r.Header()
msg := &codec.Message{
Type: codec.Request,
Header: hdr,
Body: body,
}
// write the raw request
err = req.Codec().Write(msg, nil)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
}
}
// ServeRequest honours the server.Router interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
// set default client
if p.Client == nil {
p.Client = client.DefaultClient
}
opts := []client.CallOption{}
// service name
service := req.Service()
endpoint := req.Endpoint()
// call a specific backend
if len(p.Endpoint) > 0 {
// address:port
if parts := strings.Split(p.Endpoint, ":"); len(parts) > 1 {
opts = append(opts, client.WithAddress(p.Endpoint))
// use as service name
} else {
service = p.Endpoint
}
}
// read initial request
body, err := req.Read()
if err != nil {
return err
}
// create new request with raw bytes body
creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType()))
// create new stream
stream, err := p.Client.Stream(ctx, creq, opts...)
if err != nil {
return err
}
defer stream.Close()
// create client request read loop
go readLoop(req, stream)
// get raw response
resp := stream.Response()
// create server response write loop
for {
// read backend response body
body, err := resp.Read()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
// read backend response header
hdr := resp.Header()
// write raw response header to client
rsp.WriteHeader(hdr)
// write raw response body to client
err = rsp.Write(body)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
}
return nil
}
// NewSingleHostProxy returns a proxy which sends requests to a single backend
func NewSingleHostProxy(endpoint string) *Proxy {
return &Proxy{
Options: options.NewOptions(),
Endpoint: endpoint,
}
}
// NewProxy returns a new proxy which will route based on mucp headers
func NewProxy(opts ...options.Option) proxy.Proxy {
p := new(Proxy)
p.Options = options.NewOptions(opts...)
p.Options.Init(options.WithString("mucp"))
// get endpoint
ep, ok := p.Options.Values().Get("proxy.endpoint")
if ok {
p.Endpoint = ep.(string)
}
// get client
c, ok := p.Options.Values().Get("proxy.client")
if ok {
p.Client = c.(client.Client)
}
return p
}

31
network/proxy/proxy.go Normal file
View File

@@ -0,0 +1,31 @@
// Package proxy is a transparent proxy built on the go-micro/server
package proxy
import (
"context"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/server"
)
// Proxy can be used as a proxy server for go-micro services
type Proxy interface {
options.Options
// ServeRequest honours the server.Router interface
ServeRequest(context.Context, server.Request, server.Response) error
}
var (
DefaultEndpoint = "localhost:9090"
)
// WithEndpoint sets a proxy endpoint
func WithEndpoint(e string) options.Option {
return options.WithValue("proxy.endpoint", e)
}
// WithClient sets the client
func WithClient(c client.Client) options.Option {
return options.WithValue("proxy.client", c)
}

View File

@@ -0,0 +1,323 @@
package router
import (
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/micro/go-micro/registry"
"github.com/olekukonko/tablewriter"
)
var (
// AdvertiseTick defines how often in seconds do we scal the local registry
// to advertise the local services to the network registry
AdvertiseTick = 5 * time.Second
// AdvertiseTTL defines network registry TTL in seconds
// NOTE: this is a rather arbitrary picked value subject to change
AdvertiseTTL = 120 * time.Second
)
type router struct {
opts Options
exit chan struct{}
wg *sync.WaitGroup
}
// newRouter creates new router and returns it
func newRouter(opts ...Option) Router {
// get default options
options := DefaultOptions()
// apply requested options
for _, o := range opts {
o(&options)
}
return &router{
opts: options,
exit: make(chan struct{}),
wg: &sync.WaitGroup{},
}
}
// Init initializes router with given options
func (r *router) Init(opts ...Option) error {
for _, o := range opts {
o(&r.opts)
}
return nil
}
// Options returns router options
func (r *router) Options() Options {
return r.opts
}
// ID returns router ID
func (r *router) ID() string {
return r.opts.ID
}
// Table returns routing table
func (r *router) Table() Table {
return r.opts.Table
}
// Address returns router's bind address
func (r *router) Address() string {
return r.opts.Address
}
// Network returns the address router advertises to the network
func (r *router) Network() string {
return r.opts.Advertise
}
// Advertise advertises the router routes to the network.
// Advertise is a blocking function. It launches multiple goroutines that watch
// service registries and advertise the router routes to other routers in the network.
// It returns error if any of the launched goroutines fail with error.
func (r *router) Advertise() error {
// add local service routes into the routing table
if err := r.addServiceRoutes(r.opts.Registry, DefaultLocalMetric); err != nil {
return fmt.Errorf("failed adding routes for local services: %v", err)
}
// add network service routes into the routing table
if err := r.addServiceRoutes(r.opts.Network, DefaultNetworkMetric); err != nil {
return fmt.Errorf("failed adding routes for network services: %v", err)
}
node, err := r.parseToNode()
if err != nil {
return fmt.Errorf("failed to parse router into service node: %v", err)
}
localWatcher, err := r.opts.Registry.Watch()
if err != nil {
return fmt.Errorf("failed to create local registry watcher: %v", err)
}
networkWatcher, err := r.opts.Network.Watch()
if err != nil {
return fmt.Errorf("failed to create network registry watcher: %v", err)
}
// error channel collecting goroutine errors
errChan := make(chan error, 3)
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routine table
errChan <- r.manageServiceRoutes(localWatcher, DefaultLocalMetric)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch network registry and register routes in routine table
errChan <- r.manageServiceRoutes(networkWatcher, DefaultNetworkMetric)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and advertise local service to the network
errChan <- r.advertiseToNetwork(node)
}()
return <-errChan
}
// addServiceRoutes adds all services in given registry to the routing table.
// NOTE: this is a one-off operation done when bootstrapping the routing table of the new router.
// It returns error if either the services could not be listed or if the routes could not be added to the routing table.
func (r *router) addServiceRoutes(reg registry.Registry, metric int) error {
services, err := reg.ListServices()
if err != nil {
return fmt.Errorf("failed to list services: %v", err)
}
for _, service := range services {
route := Route{
Destination: service.Name,
Router: r,
Network: r.opts.Advertise,
Metric: metric,
}
if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("error adding route for service: %s", service.Name)
}
}
return nil
}
// parseToNode parses router into registry.Node and returns the result.
// It returns error if the router network address could not be parsed into host and port.
func (r *router) parseToNode() (*registry.Node, error) {
// split router address to host and port part
addr, portStr, err := net.SplitHostPort(r.opts.Advertise)
if err != nil {
return nil, fmt.Errorf("could not parse router address: %v", err)
}
// try to parse network port into integer
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, fmt.Errorf("could not parse router network address: %v", err)
}
node := &registry.Node{
Id: r.opts.ID,
Address: addr,
Port: port,
}
return node, nil
}
// advertiseToNetwork periodically scans local registry and registers (i.e. advertises) all the local services in the network registry.
// It returns error if either the local services failed to be listed or if it fails to register local service in network registry.
func (r *router) advertiseToNetwork(node *registry.Node) error {
// ticker to periodically scan the local registry
ticker := time.NewTicker(AdvertiseTick)
for {
select {
case <-r.exit:
return nil
case <-ticker.C:
// list all local services
services, err := r.opts.Registry.ListServices()
if err != nil {
return fmt.Errorf("failed to list local services: %v", err)
}
// loop through all registered local services and register them in the network registry
for _, service := range services {
svc := &registry.Service{
Name: service.Name,
Nodes: []*registry.Node{node},
}
// register the local service in the network registry
if err := r.opts.Network.Register(svc, registry.RegisterTTL(AdvertiseTTL)); err != nil {
return fmt.Errorf("failed to register service %s in network registry: %v", svc.Name, err)
}
}
}
}
}
// manageServiceRoutes watches services in given registry and updates the routing table accordingly.
// It returns error if the service registry watcher has stopped or if the routing table failed to be updated.
func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error {
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer r.wg.Done()
<-r.exit
w.Stop()
}()
var watchErr error
for {
res, err := w.Next()
if err == registry.ErrWatcherStopped {
break
}
if err != nil {
watchErr = err
break
}
route := Route{
Destination: res.Service.Name,
Router: r,
Network: r.opts.Advertise,
Metric: metric,
}
switch res.Action {
case "create":
if len(res.Service.Nodes) > 0 {
// only return error if the route is not duplicate, but something else has failed
if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed to add route for service: %v", res.Service.Name)
}
}
case "delete":
if len(res.Service.Nodes) < 1 {
// only return error if the route is present in the table, but something else has failed
if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed to delete route for service: %v", res.Service.Name)
}
}
}
}
return watchErr
}
// Stop stops the router
func (r *router) Stop() error {
// notify all goroutines to finish
close(r.exit)
// wait for all goroutines to finish
r.wg.Wait()
// NOTE: we need a more efficient way of doing this e.g. network routes
// should ideally be autodeleted when the router stops gossiping
query := NewQuery(QueryRouter(r), QueryNetwork(r.opts.Advertise))
routes, err := r.opts.Table.Lookup(query)
if err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed to lookup routes for router %s: %v", r.opts.ID, err)
}
// parse router to registry.Node
node, err := r.parseToNode()
if err != nil {
return fmt.Errorf("failed to parse router into service node: %v", err)
}
for _, route := range routes {
service := &registry.Service{
Name: route.Destination,
Nodes: []*registry.Node{node},
}
if err := r.opts.Network.Deregister(service); err != nil {
return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err)
}
}
return nil
}
// String prints debugging information about router
func (r *router) String() string {
sb := &strings.Builder{}
table := tablewriter.NewWriter(sb)
table.SetHeader([]string{"ID", "Address", "Network", "Table"})
data := []string{
r.opts.ID,
r.opts.Address,
r.opts.Advertise,
fmt.Sprintf("%d", r.opts.Table.Size()),
}
table.Append(data)
// render table into sb
table.Render()
return sb.String()
}

View File

@@ -0,0 +1,289 @@
package router
import (
"fmt"
"hash"
"hash/fnv"
"strings"
"sync"
"github.com/google/uuid"
"github.com/olekukonko/tablewriter"
)
// TableOptions are routing table options
// TODO: table options TBD in the future
type TableOptions struct{}
// table is in memory routing table
type table struct {
// opts are table options
opts TableOptions
// m stores routing table map
m map[string]map[uint64]Route
// h hashes route entries
h hash.Hash64
// w is a list of table watchers
w map[string]*tableWatcher
sync.RWMutex
}
// newTable creates in memory routing table and returns it
func newTable(opts ...TableOption) Table {
// default options
var options TableOptions
// apply requested options
for _, o := range opts {
o(&options)
}
h := fnv.New64()
h.Reset()
return &table{
opts: options,
m: make(map[string]map[uint64]Route),
w: make(map[string]*tableWatcher),
h: h,
}
}
// Init initializes routing table with options
func (t *table) Init(opts ...TableOption) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
// Options returns routing table options
func (t *table) Options() TableOptions {
return t.opts
}
// Add adds a route to the routing table
func (t *table) Add(r Route) error {
destAddr := r.Destination
sum := t.hash(r)
t.Lock()
defer t.Unlock()
// check if the destination has any routes in the table
if _, ok := t.m[destAddr]; !ok {
t.m[destAddr] = make(map[uint64]Route)
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r})
return nil
}
// add new route to the table for the given destination
if _, ok := t.m[destAddr][sum]; !ok {
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r})
return nil
}
// only add the route if the route override is explicitly requested
if _, ok := t.m[destAddr][sum]; ok && r.Policy == OverrideIfExists {
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: UpdateEvent, Route: r})
return nil
}
// if we reached this point without already returning the route already exists
// we return nil only if explicitly requested by the client
if r.Policy == IgnoreIfExists {
return nil
}
return ErrDuplicateRoute
}
// Delete deletes the route from the routing table
func (t *table) Delete(r Route) error {
t.Lock()
defer t.Unlock()
destAddr := r.Destination
sum := t.hash(r)
if _, ok := t.m[destAddr]; !ok {
return ErrRouteNotFound
}
delete(t.m[destAddr], sum)
go t.sendEvent(&Event{Type: DeleteEvent, Route: r})
return nil
}
// Update updates routing table with new route
func (t *table) Update(r Route) error {
destAddr := r.Destination
sum := t.hash(r)
t.Lock()
defer t.Unlock()
// check if the destAddr has ANY routes in the table
if _, ok := t.m[destAddr]; !ok {
return ErrRouteNotFound
}
// if the route has been found update it
if _, ok := t.m[destAddr][sum]; ok {
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: UpdateEvent, Route: r})
return nil
}
return ErrRouteNotFound
}
// List returns a list of all routes in the table
func (t *table) List() ([]Route, error) {
t.RLock()
defer t.RUnlock()
var routes []Route
for _, rmap := range t.m {
for _, route := range rmap {
routes = append(routes, route)
}
}
return routes, nil
}
// Lookup queries routing table and returns all routes that match it
func (t *table) Lookup(q Query) ([]Route, error) {
t.RLock()
defer t.RUnlock()
var results []Route
for destAddr, routes := range t.m {
if q.Options().Destination != "*" {
if q.Options().Destination != destAddr {
continue
}
for _, route := range routes {
if q.Options().Network == "*" || q.Options().Network == route.Network {
if q.Options().Router.ID() == "*" || q.Options().Router.ID() == route.Router.ID() {
if route.Metric <= q.Options().Metric {
results = append(results, route)
}
}
}
}
}
if q.Options().Destination == "*" {
for _, route := range routes {
if q.Options().Network == "*" || q.Options().Network == route.Router.Network() {
if q.Options().Router.ID() == "*" || q.Options().Router.ID() == route.Router.ID() {
if route.Metric <= q.Options().Metric {
results = append(results, route)
}
}
}
}
}
}
if len(results) == 0 && q.Options().Policy != DiscardNoRoute {
return nil, ErrRouteNotFound
}
return results, nil
}
// Watch returns routing table entry watcher
func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
// by default watch everything
wopts := WatchOptions{
Destination: "*",
Network: "*",
}
for _, o := range opts {
o(&wopts)
}
watcher := &tableWatcher{
opts: wopts,
resChan: make(chan *Event, 10),
done: make(chan struct{}),
}
t.Lock()
t.w[uuid.New().String()] = watcher
t.Unlock()
return watcher, nil
}
// sendEvent sends rules to all subscribe watchers
func (t *table) sendEvent(r *Event) {
t.RLock()
defer t.RUnlock()
for _, w := range t.w {
select {
case w.resChan <- r:
case <-w.done:
}
}
}
// Size returns the size of the routing table
func (t *table) Size() int {
t.RLock()
defer t.RUnlock()
return len(t.m)
}
// String returns debug information
func (t *table) String() string {
t.RLock()
defer t.RUnlock()
// this will help us build routing table string
sb := &strings.Builder{}
// create nice table printing structure
table := tablewriter.NewWriter(sb)
table.SetHeader([]string{"Destination", "Router", "Network", "Metric"})
for _, destRoute := range t.m {
for _, route := range destRoute {
strRoute := []string{
route.Destination,
route.Router.Address(),
route.Network,
fmt.Sprintf("%d", route.Metric),
}
table.Append(strRoute)
}
}
// render table into sb
table.Render()
return sb.String()
}
// hash hashes the route using router gateway and network address
func (t *table) hash(r Route) uint64 {
destAddr := r.Destination
routerAddr := r.Router.Address()
netAddr := r.Network
t.h.Reset()
t.h.Write([]byte(destAddr + routerAddr + netAddr))
return t.h.Sum64()
}

84
network/router/options.go Normal file
View File

@@ -0,0 +1,84 @@
package router
import (
"github.com/google/uuid"
"github.com/micro/go-micro/registry"
)
var (
// DefaultAddress is default router address
DefaultAddress = ":9093"
// DefaultAdvertise is default address advertised to the network
DefaultAdvertise = ":9094"
)
// Options are router options
type Options struct {
// ID is router id
ID string
// Address is router address
Address string
// Advertise is the address advertised to the network
Advertise string
// Registry is the local registry
Registry registry.Registry
// Networkis the network registry
Network registry.Registry
// Table is routing table
Table Table
}
// ID sets Router ID
func ID(id string) Option {
return func(o *Options) {
o.ID = id
}
}
// Address sets router service address
func Address(a string) Option {
return func(o *Options) {
o.Address = a
}
}
// Advertise sets the address that is advertise to the network
func Advertise(n string) Option {
return func(o *Options) {
o.Advertise = n
}
}
// RoutingTable sets the routing table
func RoutingTable(t Table) Option {
return func(o *Options) {
o.Table = t
}
}
// Registry sets the local registry
func Registry(r registry.Registry) Option {
return func(o *Options) {
o.Registry = r
}
}
// Network sets the network registry
func Network(r registry.Registry) Option {
return func(o *Options) {
o.Network = r
}
}
// DefaultOptions returns router default options
func DefaultOptions() Options {
// NOTE: by default both local and network registies use default registry i.e. mdns
return Options{
ID: uuid.New().String(),
Address: DefaultAddress,
Advertise: DefaultAdvertise,
Registry: registry.DefaultRegistry,
Network: registry.DefaultRegistry,
Table: NewTable(),
}
}

116
network/router/query.go Normal file
View File

@@ -0,0 +1,116 @@
package router
// LookupPolicy defines query policy
type LookupPolicy int
const (
// DiscardNoRoute discards query when no route is found
DiscardNoRoute LookupPolicy = iota
// ClosestMatch returns closest match to supplied query
ClosestMatch
)
// String returns human representation of LookupPolicy
func (lp LookupPolicy) String() string {
switch lp {
case DiscardNoRoute:
return "DISCARD"
case ClosestMatch:
return "CLOSEST"
default:
return "UNKNOWN"
}
}
// QueryOption sets routing table query options
type QueryOption func(*QueryOptions)
// QueryOptions are routing table query options
type QueryOptions struct {
// Destination is destination address
Destination string
// Network is network address
Network string
// Router is gateway address
Router Router
// Metric is route metric
Metric int
// Policy is query lookup policy
Policy LookupPolicy
}
// QueryDestination sets query destination address
func QueryDestination(a string) QueryOption {
return func(o *QueryOptions) {
o.Destination = a
}
}
// QueryNetwork sets query network address
func QueryNetwork(a string) QueryOption {
return func(o *QueryOptions) {
o.Network = a
}
}
// QueryRouter sets query gateway address
func QueryRouter(r Router) QueryOption {
return func(o *QueryOptions) {
o.Router = r
}
}
// QueryMetric sets query metric
func QueryMetric(m int) QueryOption {
return func(o *QueryOptions) {
o.Metric = m
}
}
// QueryPolicy sets query policy
// NOTE: this might be renamed to filter or some such
func QueryPolicy(p LookupPolicy) QueryOption {
return func(o *QueryOptions) {
o.Policy = p
}
}
// Query is routing table query
type Query interface {
// Options returns query options
Options() QueryOptions
}
// query is a basic implementation of Query
type query struct {
opts QueryOptions
}
// NewQuery creates new query and returns it
func NewQuery(opts ...QueryOption) Query {
// default gateway for wildcard router
r := newRouter(ID("*"))
// default options
// NOTE: by default we use DefaultNetworkMetric
qopts := QueryOptions{
Destination: "*",
Network: "*",
Router: r,
Metric: DefaultNetworkMetric,
Policy: DiscardNoRoute,
}
for _, o := range opts {
o(&qopts)
}
return &query{
opts: qopts,
}
}
// Options returns query options
func (q *query) Options() QueryOptions {
return q.opts
}

74
network/router/route.go Normal file
View File

@@ -0,0 +1,74 @@
package router
import (
"fmt"
"strings"
"github.com/olekukonko/tablewriter"
)
var (
// DefaultLocalMetric is default route cost for local network
DefaultLocalMetric = 1
// DefaultNetworkMetric is default route cost for micro network
DefaultNetworkMetric = 10
)
// RoutePolicy defines routing table addition policy
type RoutePolicy int
const (
// OverrideIfExists overrides route if it already exists
OverrideIfExists RoutePolicy = iota
// IgnoreIfExists does not modify existing route
IgnoreIfExists
)
// String returns human reprensentation of policy
func (p RoutePolicy) String() string {
switch p {
case OverrideIfExists:
return "OVERRIDE"
case IgnoreIfExists:
return "IGNORE"
default:
return "UNKNOWN"
}
}
// Route is network route
type Route struct {
// Destination is destination address
Destination string
// Router is the network router
Router Router
// Network is micro network address
Network string
// Metric is the route cost metric
Metric int
// Policy defines route policy
Policy RoutePolicy
}
// String allows to print the route
func (r *Route) String() string {
// this will help us build routing table string
sb := &strings.Builder{}
// create nice table printing structure
table := tablewriter.NewWriter(sb)
table.SetHeader([]string{"Destination", "Router", "Network", "Metric"})
strRoute := []string{
r.Destination,
r.Router.Address(),
r.Network,
fmt.Sprintf("%d", r.Metric),
}
table.Append(strRoute)
// render table into sb
table.Render()
return sb.String()
}

32
network/router/router.go Normal file
View File

@@ -0,0 +1,32 @@
// Package router provides a network routing control plane
package router
// Router is an interface for a routing control plane
type Router interface {
// Init initializes the router with options
Init(...Option) error
// Options returns the router options
Options() Options
// ID returns the id of the router
ID() string
// Table returns the routing table
Table() Table
// Address returns the router adddress
Address() string
// Network returns the network address of the router
Network() string
// Advertise starts advertising the routes to the network
Advertise() error
// Stop stops the router
Stop() error
// String returns debug info
String() string
}
// Option used by the router
type Option func(*Options)
// NewRouter creates new Router and returns it
func NewRouter(opts ...Option) Router {
return newRouter(opts...)
}

44
network/router/table.go Normal file
View File

@@ -0,0 +1,44 @@
package router
import (
"errors"
)
var (
// ErrRouteNotFound is returned when no route was found in the routing table
ErrRouteNotFound = errors.New("route not found")
// ErrDuplicateRoute is returned when the route already exists
ErrDuplicateRoute = errors.New("duplicate route")
)
// Table defines routing table interface
type Table interface {
// Init initializes the router with options
Init(...TableOption) error
// Options returns the router options
Options() TableOptions
// Add adds new route to the routing table
Add(Route) error
// Delete deletes existing route from the routing table
Delete(Route) error
// Update updates route in the routing table
Update(Route) error
// List returns the list of all routes in the table
List() ([]Route, error)
// Lookup looks up routes in the routing table and returns them
Lookup(Query) ([]Route, error)
// Watch returns a watcher which allows to track updates to the routing table
Watch(opts ...WatchOption) (Watcher, error)
// Size returns the size of the routing table
Size() int
// String prints the routing table
String() string
}
// TableOption used by the routing table
type TableOption func(*TableOptions)
// NewTable creates new routing table and returns it
func NewTable(opts ...TableOption) Table {
return newTable(opts...)
}

View File

@@ -0,0 +1,147 @@
package router
import (
"errors"
"strings"
"github.com/olekukonko/tablewriter"
)
var (
// ErrWatcherStopped is returned when routing table watcher has been stopped
ErrWatcherStopped = errors.New("routing table watcher stopped")
)
// EventType defines routing table event
type EventType int
const (
// CreateEvent is emitted when new route has been created
CreateEvent EventType = iota
// DeleteEvent is emitted when an existing route has been deleted
DeleteEvent
// UpdateEvent is emitted when a routing table has been updated
UpdateEvent
)
// String returns string representation of the event
func (et EventType) String() string {
switch et {
case CreateEvent:
return "CREATE"
case DeleteEvent:
return "DELETE"
case UpdateEvent:
return "UPDATE"
default:
return "UNKNOWN"
}
}
// Event is returned by a call to Next on the watcher.
type Event struct {
// Type defines type of event
Type EventType
// Route is table rout
Route Route
}
// WatchOption is used to define what routes to watch in the table
type WatchOption func(*WatchOptions)
// Watcher defines routing table watcher interface
// Watcher returns updates to the routing table
type Watcher interface {
// Next is a blocking call that returns watch result
Next() (*Event, error)
// Chan returns event channel
Chan() (<-chan *Event, error)
// Stop stops watcher
Stop()
}
// WatchOptions are table watcher options
type WatchOptions struct {
// Specify destination address to watch
Destination string
// Specify network to watch
Network string
}
// WatchDestination sets what destination to watch
// Destination is usually microservice name
func WatchDestination(a string) WatchOption {
return func(o *WatchOptions) {
o.Destination = a
}
}
// WatchNetwork sets what network to watch
func WatchNetwork(n string) WatchOption {
return func(o *WatchOptions) {
o.Network = n
}
}
type tableWatcher struct {
opts WatchOptions
resChan chan *Event
done chan struct{}
}
// Next returns the next noticed action taken on table
// TODO: this needs to be thought through properly
// we are aiming to provide the same options Query provides
func (w *tableWatcher) Next() (*Event, error) {
for {
select {
case res := <-w.resChan:
switch w.opts.Destination {
case "*", "":
if w.opts.Network == "*" || w.opts.Network == res.Route.Network {
return res, nil
}
case res.Route.Destination:
if w.opts.Network == "*" || w.opts.Network == res.Route.Network {
return res, nil
}
}
case <-w.done:
return nil, ErrWatcherStopped
}
}
}
// Chan returns watcher events channel
func (w *tableWatcher) Chan() (<-chan *Event, error) {
return w.resChan, nil
}
// Stop stops routing table watcher
func (w *tableWatcher) Stop() {
select {
case <-w.done:
return
default:
close(w.done)
}
}
// String prints debug information
func (w *tableWatcher) String() string {
sb := &strings.Builder{}
table := tablewriter.NewWriter(sb)
table.SetHeader([]string{"Destination", "Network"})
data := []string{
w.opts.Destination,
w.opts.Network,
}
table.Append(data)
// render table into sb
table.Render()
return sb.String()
}