diff --git a/network/default.go b/network/default.go index 9aaff91a..951221ba 100644 --- a/network/default.go +++ b/network/default.go @@ -34,8 +34,8 @@ type network struct { proxy.Proxy // tun is network tunnel tunnel.Tunnel - // srv is network server - srv server.Server + // server is network server + server server.Server // client is network client client client.Client @@ -59,13 +59,19 @@ func newNetwork(opts ...Option) Network { tunnel.Address(options.Address), ) + // init router Id to the network id + options.Router.Init( + router.Id(options.Id), + ) + // create tunnel client with tunnel transport tunTransport := trn.NewTransport( trn.WithTunnel(options.Tunnel), ) - // srv is network server - srv := server.NewServer( + // server is network server + server := server.NewServer( + server.Id(options.Id), server.Address(options.Address), server.Name(options.Name), server.Transport(tunTransport), @@ -86,7 +92,7 @@ func newNetwork(opts ...Option) Network { Router: options.Router, Proxy: options.Proxy, Tunnel: options.Tunnel, - srv: srv, + server: server, client: client, } } @@ -333,7 +339,7 @@ func (n *network) Connect() error { go n.process(listener) // start the server - if err := n.srv.Start(); err != nil { + if err := n.server.Start(); err != nil { return err } @@ -345,7 +351,7 @@ func (n *network) Connect() error { func (n *network) close() error { // stop the server - if err := n.srv.Stop(); err != nil { + if err := n.server.Stop(); err != nil { return err } @@ -390,5 +396,5 @@ func (n *network) Client() client.Client { // Server returns network server func (n *network) Server() server.Server { - return n.srv + return n.server } diff --git a/network/options.go b/network/options.go index 8bc36972..5894b857 100644 --- a/network/options.go +++ b/network/options.go @@ -1,6 +1,7 @@ package network import ( + "github.com/google/uuid" "github.com/micro/go-micro/network/resolver" "github.com/micro/go-micro/network/resolver/registry" "github.com/micro/go-micro/proxy" @@ -13,6 +14,8 @@ type Option func(*Options) // Options configure network type Options struct { + // Id of the node + Id string // Name of the network Name string // Address to bind to @@ -27,14 +30,21 @@ type Options struct { Resolver resolver.Resolver } -// Name is the network name +// Id sets the id of the network node +func Id(id string) Option { + return func(o *Options) { + o.Id = id + } +} + +// Name sets the network name func Name(n string) Option { return func(o *Options) { o.Name = n } } -// Address is the network address +// Address sets the network address func Address(a string) Option { return func(o *Options) { o.Address = a @@ -72,6 +82,7 @@ func Resolver(r resolver.Resolver) Option { // DefaultOptions returns network default options func DefaultOptions() Options { return Options{ + Id: uuid.New().String(), Name: DefaultName, Address: DefaultAddress, Tunnel: tunnel.NewTunnel(), diff --git a/router/default.go b/router/default.go index bf3f0e60..05e033ce 100644 --- a/router/default.go +++ b/router/default.go @@ -43,7 +43,7 @@ var ( // router implements default router type router struct { sync.RWMutex - opts Options + options Options status Status table *table exit chan struct{} @@ -70,7 +70,7 @@ func newRouter(opts ...Option) Router { status := Status{Code: Stopped, Error: nil} return &router{ - opts: options, + options: options, status: status, table: newTable(), advertWg: &sync.WaitGroup{}, @@ -85,7 +85,7 @@ func (r *router) Init(opts ...Option) error { defer r.Unlock() for _, o := range opts { - o(&r.opts) + o(&r.options) } return nil @@ -94,10 +94,10 @@ func (r *router) Init(opts ...Option) error { // Options returns router options func (r *router) Options() Options { r.Lock() - opts := r.opts + options := r.options r.Unlock() - return opts + return options } // Table returns routing table @@ -139,7 +139,8 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e Service: service.Name, Address: node.Address, Gateway: "", - Network: r.opts.Network, + Network: r.options.Network, + Router: r.options.Id, Link: DefaultLink, Metric: DefaultLocalMetric, } @@ -278,7 +279,7 @@ func (r *router) publishAdvert(advType AdvertType, events []*Event) { defer r.advertWg.Done() a := &Advert{ - Id: r.opts.Id, + Id: r.options.Id, Type: advType, TTL: DefaultAdvertTTL, Timestamp: time.Now(), @@ -529,20 +530,22 @@ func (r *router) Start() error { } // add all local service routes into the routing table - if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil { + if err := r.manageRegistryRoutes(r.options.Registry, "create"); err != nil { e := fmt.Errorf("failed adding registry routes: %s", err) r.status = Status{Code: Error, Error: e} return e } // add default gateway into routing table - if r.opts.Gateway != "" { + if r.options.Gateway != "" { // note, the only non-default value is the gateway route := Route{ Service: "*", Address: "*", - Gateway: r.opts.Gateway, + Gateway: r.options.Gateway, Network: "*", + Router: r.options.Id, + Link: DefaultLink, Metric: DefaultLocalMetric, } if err := r.table.Create(route); err != nil { @@ -557,7 +560,7 @@ func (r *router) Start() error { r.exit = make(chan struct{}) // registry watcher - regWatcher, err := r.opts.Registry.Watch() + regWatcher, err := r.options.Registry.Watch() if err != nil { e := fmt.Errorf("failed creating registry watcher: %v", err) r.status = Status{Code: Error, Error: e} @@ -669,6 +672,10 @@ func (r *router) Process(a *Advert) error { }) for _, event := range events { + // skip if the router is the origin of this route + if event.Route.Router == r.options.Id { + continue + } // create a copy of the route route := event.Route action := event.Type diff --git a/router/query.go b/router/query.go index 68fb3588..3e2c3d38 100644 --- a/router/query.go +++ b/router/query.go @@ -11,29 +11,38 @@ type QueryOptions struct { Gateway string // Network is network address Network string + // Router is router id + Router string } -// QueryService sets destination address +// QueryService sets service to query func QueryService(s string) QueryOption { return func(o *QueryOptions) { o.Service = s } } -// QueryGateway sets route gateway +// QueryGateway sets gateway address to query func QueryGateway(g string) QueryOption { return func(o *QueryOptions) { o.Gateway = g } } -// QueryNetwork sets route network address +// QueryNetwork sets network name to query func QueryNetwork(n string) QueryOption { return func(o *QueryOptions) { o.Network = n } } +// QueryRouter sets router id to query +func QueryRouter(r string) QueryOption { + return func(o *QueryOptions) { + o.Router = r + } +} + // Query is routing table query type Query interface { // Options returns query options @@ -52,6 +61,7 @@ func NewQuery(opts ...QueryOption) Query { Service: "*", Gateway: "*", Network: "*", + Router: "*", } for _, o := range opts { @@ -67,8 +77,3 @@ func NewQuery(opts ...QueryOption) Query { func (q *query) Options() QueryOptions { return q.opts } - -// String prints routing table query in human readable form -func (q query) String() string { - return "query" -} diff --git a/router/route.go b/router/route.go index fa9dbc72..f2768403 100644 --- a/router/route.go +++ b/router/route.go @@ -7,9 +7,9 @@ import ( var ( // DefaultLink is default network link DefaultLink = "local" - // DefaultLocalMetric is default route cost metric for the local network + // DefaultLocalMetric is default route cost for a local route DefaultLocalMetric = 1 - // DefaultNetworkMetric is default route cost metric for the micro network + // DefaultNetworkMetric is default route cost for a network route DefaultNetworkMetric = 10 ) @@ -23,6 +23,8 @@ type Route struct { Gateway string // Network is network address Network string + // Router is router id + Router string // Link is network link Link string // Metric is the route cost metric @@ -33,6 +35,6 @@ type Route struct { func (r *Route) Hash() uint64 { h := fnv.New64() h.Reset() - h.Write([]byte(r.Service + r.Address + r.Gateway + r.Network + r.Link)) + h.Write([]byte(r.Service + r.Address + r.Gateway + r.Network + r.Router + r.Link)) return h.Sum64() } diff --git a/router/table.go b/router/table.go index 6b34bb00..42a1be47 100644 --- a/router/table.go +++ b/router/table.go @@ -8,7 +8,14 @@ import ( "github.com/google/uuid" ) -// table is an in memory routing table +var ( + // ErrRouteNotFound is returned when no route was found in the routing table + ErrRouteNotFound = errors.New("route not found") + // ErrDuplicateRoute is returned when the route already exists + ErrDuplicateRoute = errors.New("duplicate route") +) + +// table is an in-memory routing table type table struct { sync.RWMutex // routes stores service routes @@ -25,6 +32,19 @@ func newTable(opts ...Option) *table { } } +// sendEvent sends events to all subscribed watchers +func (t *table) sendEvent(e *Event) { + t.RLock() + defer t.RUnlock() + + for _, w := range t.watchers { + select { + case w.resChan <- e: + case <-w.done: + } + } +} + // Create creates new route in the routing table func (t *table) Create(r Route) error { service := r.Service @@ -106,21 +126,23 @@ func (t *table) List() ([]Route, error) { return routes, nil } -// isMatch checks if the route matches given network and router -func isMatch(route Route, network, router string) bool { - if network == "*" || network == route.Network { - if router == "*" || router == route.Gateway { - return true +// isMatch checks if the route matches given query options +func isMatch(route Route, gateway, network, router string) bool { + if gateway == "*" || gateway == route.Gateway { + if network == "*" || network == route.Network { + if router == "*" || router == route.Router { + return true + } } } return false } // findRoutes finds all the routes for given network and router and returns them -func findRoutes(routes map[uint64]Route, network, router string) []Route { +func findRoutes(routes map[uint64]Route, gateway, network, router string) []Route { var results []Route for _, route := range routes { - if isMatch(route, network, router) { + if isMatch(route, gateway, network, router) { results = append(results, route) } } @@ -136,13 +158,13 @@ func (t *table) Query(q Query) ([]Route, error) { if _, ok := t.routes[q.Options().Service]; !ok { return nil, ErrRouteNotFound } - return findRoutes(t.routes[q.Options().Service], q.Options().Network, q.Options().Gateway), nil + return findRoutes(t.routes[q.Options().Service], q.Options().Gateway, q.Options().Network, q.Options().Router), nil } var results []Route // search through all destinations for _, routes := range t.routes { - results = append(results, findRoutes(routes, q.Options().Network, q.Options().Gateway)...) + results = append(results, findRoutes(routes, q.Options().Gateway, q.Options().Network, q.Options().Router)...) } return results, nil @@ -181,23 +203,3 @@ func (t *table) Watch(opts ...WatchOption) (Watcher, error) { return w, nil } - -// sendEvent sends events to all subscribed watchers -func (t *table) sendEvent(e *Event) { - t.RLock() - defer t.RUnlock() - - for _, w := range t.watchers { - select { - case w.resChan <- e: - case <-w.done: - } - } -} - -var ( - // ErrRouteNotFound is returned when no route was found in the routing table - ErrRouteNotFound = errors.New("route not found") - // ErrDuplicateRoute is returned when the route already exists - ErrDuplicateRoute = errors.New("duplicate route") -) diff --git a/router/table_test.go b/router/table_test.go index 16c73f2f..d989ee3b 100644 --- a/router/table_test.go +++ b/router/table_test.go @@ -9,6 +9,7 @@ func testSetup() (*table, Route) { Service: "dest.svc", Gateway: "dest.gw", Network: "dest.network", + Router: "src.router", Link: "det.link", Metric: 10, } @@ -109,11 +110,13 @@ func TestQuery(t *testing.T) { svc := []string{"svc1", "svc2", "svc3"} net := []string{"net1", "net2", "net1"} gw := []string{"gw1", "gw2", "gw3"} + rtr := []string{"rtr1", "rt2", "rt3"} for i := 0; i < len(svc); i++ { route.Service = svc[i] route.Network = net[i] route.Gateway = gw[i] + route.Router = rtr[i] if err := table.Create(route); err != nil { t.Errorf("error adding route: %s", err) } @@ -127,8 +130,9 @@ func TestQuery(t *testing.T) { t.Errorf("error looking up routes: %s", err) } - // query particular net - query = NewQuery(QueryNetwork("net1")) + // query routes particular network + network := "net1" + query = NewQuery(QueryNetwork(network)) routes, err = table.Query(query) if err != nil { @@ -139,7 +143,13 @@ func TestQuery(t *testing.T) { t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes)) } - // query particular gateway + for _, route := range routes { + if route.Network != network { + t.Errorf("incorrect route returned. Expected network: %s, found: %s", network, route.Network) + } + } + + // query routes for particular gateway gateway := "gw1" query = NewQuery(QueryGateway(gateway)) @@ -156,11 +166,28 @@ func TestQuery(t *testing.T) { t.Errorf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway) } - // query particular route - network := "net1" + // query routes for particular router + router := "rtr1" + query = NewQuery(QueryRouter(router)) + + routes, err = table.Query(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != 1 { + t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) + } + + if routes[0].Router != router { + t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router) + } + + // query particular gateway and network query = NewQuery( QueryGateway(gateway), QueryNetwork(network), + QueryRouter(router), ) routes, err = table.Query(query) @@ -180,7 +207,11 @@ func TestQuery(t *testing.T) { t.Errorf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network) } - // bullshit route query + if routes[0].Router != router { + t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router) + } + + // non-existen route query query = NewQuery(QueryService("foobar")) routes, err = table.Query(query) diff --git a/router/watcher.go b/router/watcher.go index f2c8beac..148b88d4 100644 --- a/router/watcher.go +++ b/router/watcher.go @@ -6,6 +6,11 @@ import ( "time" ) +var ( + // ErrWatcherStopped is returned when routing table watcher has been stopped + ErrWatcherStopped = errors.New("watcher stopped") +) + // EventType defines routing table event type EventType int @@ -42,9 +47,6 @@ type Event struct { Route Route } -// WatchOption is used to define what routes to watch in the table -type WatchOption func(*WatchOptions) - // Watcher defines routing table watcher interface // Watcher returns updates to the routing table type Watcher interface { @@ -56,7 +58,11 @@ type Watcher interface { Stop() } +// WatchOption is used to define what routes to watch in the table +type WatchOption func(*WatchOptions) + // WatchOptions are table watcher options +// TODO: expand the options to watch based on other criteria type WatchOptions struct { // Service allows to watch specific service routes Service string @@ -70,6 +76,7 @@ func WatchService(s string) WatchOption { } } +// tableWatcher implements routing table Watcher type tableWatcher struct { sync.RWMutex id string @@ -113,8 +120,3 @@ func (w *tableWatcher) Stop() { close(w.done) } } - -var ( - // ErrWatcherStopped is returned when routing table watcher has been stopped - ErrWatcherStopped = errors.New("watcher stopped") -)