Route has changed to accomodate Link, Service and Address
This commit is contained in:
parent
449aa0a339
commit
70665e5a7d
@ -105,11 +105,12 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e
|
||||
// take route action on each service node
|
||||
for _, node := range service.Nodes {
|
||||
route := table.Route{
|
||||
Destination: service.Name,
|
||||
Gateway: node.Address,
|
||||
Router: r.opts.Address,
|
||||
Network: r.opts.Network,
|
||||
Metric: table.DefaultLocalMetric,
|
||||
Service: service.Name,
|
||||
Address: node.Address,
|
||||
Gateway: "",
|
||||
Network: r.opts.Network,
|
||||
Link: table.DefaultLink,
|
||||
Metric: table.DefaultLocalMetric,
|
||||
}
|
||||
switch action {
|
||||
case "insert", "create":
|
||||
@ -439,11 +440,11 @@ func (r *router) Advertise() (<-chan *Advert, error) {
|
||||
if r.opts.Gateway != "" {
|
||||
// note, the only non-default value is the gateway
|
||||
route := table.Route{
|
||||
Destination: "*",
|
||||
Gateway: r.opts.Gateway,
|
||||
Router: "*",
|
||||
Network: "*",
|
||||
Metric: table.DefaultLocalMetric,
|
||||
Service: "*",
|
||||
Address: "*",
|
||||
Gateway: r.opts.Gateway,
|
||||
Network: "*",
|
||||
Metric: table.DefaultLocalMetric,
|
||||
}
|
||||
if err := r.opts.Table.Add(route); err != nil {
|
||||
return nil, fmt.Errorf("failed adding default gateway route: %s", err)
|
||||
@ -530,14 +531,8 @@ func (r *router) Update(a *Advert) error {
|
||||
})
|
||||
|
||||
for _, event := range events {
|
||||
// we extract the route from advertisement and update the routing table
|
||||
route := table.Route{
|
||||
Destination: event.Route.Destination,
|
||||
Gateway: event.Route.Gateway,
|
||||
Router: event.Route.Router,
|
||||
Network: event.Route.Network,
|
||||
Metric: event.Route.Metric,
|
||||
}
|
||||
// create a copy of the route
|
||||
route := event.Route
|
||||
if err := r.opts.Table.Update(route); err != nil {
|
||||
return fmt.Errorf("failed updating routing table: %v", err)
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ var (
|
||||
// DefaultAddress is default router address
|
||||
DefaultAddress = ":9093"
|
||||
// DefaultNetwork is default micro network
|
||||
DefaultNetwork = "micro.mu"
|
||||
DefaultNetwork = "go.micro"
|
||||
)
|
||||
|
||||
// Options are router options
|
||||
@ -19,10 +19,10 @@ type Options struct {
|
||||
ID string
|
||||
// Address is router address
|
||||
Address string
|
||||
// Network is micro network
|
||||
Network string
|
||||
// Gateway is micro network gateway
|
||||
Gateway string
|
||||
// Network is micro network
|
||||
Network string
|
||||
// Registry is the local registry
|
||||
Registry registry.Registry
|
||||
// Table is routing table
|
||||
@ -43,13 +43,6 @@ func Address(a string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Network sets router network
|
||||
func Network(n string) Option {
|
||||
return func(o *Options) {
|
||||
o.Network = n
|
||||
}
|
||||
}
|
||||
|
||||
// Gateway sets network gateway
|
||||
func Gateway(g string) Option {
|
||||
return func(o *Options) {
|
||||
@ -57,6 +50,13 @@ func Gateway(g string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Network sets router network
|
||||
func Network(n string) Option {
|
||||
return func(o *Options) {
|
||||
o.Network = n
|
||||
}
|
||||
}
|
||||
|
||||
// RoutingTable sets the routing table
|
||||
func RoutingTable(t table.Table) Option {
|
||||
return func(o *Options) {
|
||||
|
@ -57,23 +57,23 @@ func (t *table) Options() TableOptions {
|
||||
|
||||
// Add adds a route to the routing table
|
||||
func (t *table) Add(r Route) error {
|
||||
destAddr := r.Destination
|
||||
service := r.Service
|
||||
sum := r.Hash()
|
||||
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
// check if there are any routes in the table for the route destination
|
||||
if _, ok := t.m[destAddr]; !ok {
|
||||
t.m[destAddr] = make(map[uint64]Route)
|
||||
t.m[destAddr][sum] = r
|
||||
if _, ok := t.m[service]; !ok {
|
||||
t.m[service] = make(map[uint64]Route)
|
||||
t.m[service][sum] = r
|
||||
go t.sendEvent(&Event{Type: Insert, Route: r})
|
||||
return nil
|
||||
}
|
||||
|
||||
// add new route to the table for the route destination
|
||||
if _, ok := t.m[destAddr][sum]; !ok {
|
||||
t.m[destAddr][sum] = r
|
||||
if _, ok := t.m[service][sum]; !ok {
|
||||
t.m[service][sum] = r
|
||||
go t.sendEvent(&Event{Type: Insert, Route: r})
|
||||
return nil
|
||||
}
|
||||
@ -83,17 +83,17 @@ func (t *table) Add(r Route) error {
|
||||
|
||||
// Delete deletes the route from the routing table
|
||||
func (t *table) Delete(r Route) error {
|
||||
destAddr := r.Destination
|
||||
service := r.Service
|
||||
sum := r.Hash()
|
||||
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
if _, ok := t.m[destAddr]; !ok {
|
||||
if _, ok := t.m[service]; !ok {
|
||||
return ErrRouteNotFound
|
||||
}
|
||||
|
||||
delete(t.m[destAddr], sum)
|
||||
delete(t.m[service], sum)
|
||||
go t.sendEvent(&Event{Type: Delete, Route: r})
|
||||
|
||||
return nil
|
||||
@ -101,20 +101,20 @@ func (t *table) Delete(r Route) error {
|
||||
|
||||
// Update updates routing table with the new route
|
||||
func (t *table) Update(r Route) error {
|
||||
destAddr := r.Destination
|
||||
service := r.Service
|
||||
sum := r.Hash()
|
||||
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
// check if the route destination has any routes in the table
|
||||
if _, ok := t.m[destAddr]; !ok {
|
||||
if _, ok := t.m[service]; !ok {
|
||||
return ErrRouteNotFound
|
||||
}
|
||||
|
||||
// if the route has been found update it
|
||||
if _, ok := t.m[destAddr][sum]; ok {
|
||||
t.m[destAddr][sum] = r
|
||||
if _, ok := t.m[service][sum]; ok {
|
||||
t.m[service][sum] = r
|
||||
go t.sendEvent(&Event{Type: Update, Route: r})
|
||||
return nil
|
||||
}
|
||||
@ -140,7 +140,7 @@ func (t *table) List() ([]Route, error) {
|
||||
// isMatch checks if the route matches given network and router
|
||||
func isMatch(route Route, network, router string) bool {
|
||||
if network == "*" || network == route.Network {
|
||||
if router == "*" || router == route.Router {
|
||||
if router == "*" || router == route.Gateway {
|
||||
return true
|
||||
}
|
||||
}
|
||||
@ -163,18 +163,18 @@ func (t *table) Lookup(q Query) ([]Route, error) {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
if q.Options().Destination != "*" {
|
||||
if q.Options().Service != "*" {
|
||||
// no routes found for the destination and query policy is not a DiscardIfNone
|
||||
if _, ok := t.m[q.Options().Destination]; !ok && q.Options().Policy != DiscardIfNone {
|
||||
if _, ok := t.m[q.Options().Service]; !ok && q.Options().Policy != DiscardIfNone {
|
||||
return nil, ErrRouteNotFound
|
||||
}
|
||||
return findRoutes(t.m[q.Options().Destination], q.Options().Network, q.Options().Router), nil
|
||||
return findRoutes(t.m[q.Options().Service], q.Options().Network, q.Options().Gateway), nil
|
||||
}
|
||||
|
||||
var results []Route
|
||||
// search through all destinations
|
||||
for _, routes := range t.m {
|
||||
results = append(results, findRoutes(routes, q.Options().Network, q.Options().Router)...)
|
||||
results = append(results, findRoutes(routes, q.Options().Network, q.Options().Gateway)...)
|
||||
}
|
||||
|
||||
return results, nil
|
||||
@ -184,7 +184,7 @@ func (t *table) Lookup(q Query) ([]Route, error) {
|
||||
func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
|
||||
// by default watch everything
|
||||
wopts := WatchOptions{
|
||||
Destination: "*",
|
||||
Service: "*",
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
@ -244,15 +244,16 @@ func (t *table) String() string {
|
||||
|
||||
// create nice table printing structure
|
||||
table := tablewriter.NewWriter(sb)
|
||||
table.SetHeader([]string{"Destination", "Gateway", "Router", "Network", "Metric"})
|
||||
table.SetHeader([]string{"Service", "Address", "Gateway", "Network", "Link", "Metric"})
|
||||
|
||||
for _, destRoute := range t.m {
|
||||
for _, route := range destRoute {
|
||||
strRoute := []string{
|
||||
route.Destination,
|
||||
route.Service,
|
||||
route.Address,
|
||||
route.Gateway,
|
||||
route.Router,
|
||||
route.Network,
|
||||
route.Link,
|
||||
fmt.Sprintf("%d", route.Metric),
|
||||
}
|
||||
table.Append(strRoute)
|
||||
|
@ -6,11 +6,11 @@ func testSetup() (Table, Route) {
|
||||
table := NewTable()
|
||||
|
||||
route := Route{
|
||||
Destination: "dest.svc",
|
||||
Gateway: "dest.gw",
|
||||
Router: "dest.router",
|
||||
Network: "dest.network",
|
||||
Metric: 10,
|
||||
Service: "dest.svc",
|
||||
Gateway: "dest.gw",
|
||||
Network: "dest.network",
|
||||
Link: "det.link",
|
||||
Metric: 10,
|
||||
}
|
||||
|
||||
return table, route
|
||||
@ -34,7 +34,7 @@ func TestAdd(t *testing.T) {
|
||||
testTableSize += 1
|
||||
|
||||
if table.Size() != testTableSize {
|
||||
t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size())
|
||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
||||
}
|
||||
|
||||
// adding the same route under Insert policy must error
|
||||
@ -53,15 +53,15 @@ func TestDelete(t *testing.T) {
|
||||
testTableSize += 1
|
||||
|
||||
// should fail to delete non-existant route
|
||||
prevDest := route.Destination
|
||||
route.Destination = "randDest"
|
||||
prevSvc := route.Service
|
||||
route.Service = "randDest"
|
||||
|
||||
if err := table.Delete(route); err != ErrRouteNotFound {
|
||||
t.Errorf("error deleting route. Expected error: %s, found: %s", ErrRouteNotFound, err)
|
||||
t.Errorf("error deleting route. Expected: %s, found: %s", ErrRouteNotFound, err)
|
||||
}
|
||||
|
||||
// we should be able to delete the existing route
|
||||
route.Destination = prevDest
|
||||
route.Service = prevSvc
|
||||
|
||||
if err := table.Delete(route); err != nil {
|
||||
t.Errorf("error deleting route: %s", err)
|
||||
@ -69,7 +69,7 @@ func TestDelete(t *testing.T) {
|
||||
testTableSize -= 1
|
||||
|
||||
if table.Size() != testTableSize {
|
||||
t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size())
|
||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
||||
}
|
||||
}
|
||||
|
||||
@ -91,28 +91,28 @@ func TestUpdate(t *testing.T) {
|
||||
|
||||
// the size of the table should not change as we're only updating the metric of an existing route
|
||||
if table.Size() != testTableSize {
|
||||
t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size())
|
||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
||||
}
|
||||
|
||||
// this should error as the destination does not exist
|
||||
route.Destination = "rand.dest"
|
||||
route.Service = "rand.dest"
|
||||
|
||||
if err := table.Update(route); err != ErrRouteNotFound {
|
||||
t.Errorf("error updating route. Expected error: %s, found: %s", ErrRouteNotFound, err)
|
||||
}
|
||||
|
||||
if table.Size() != testTableSize {
|
||||
t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size())
|
||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
||||
}
|
||||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
table, route := testSetup()
|
||||
|
||||
dest := []string{"one.svc", "two.svc", "three.svc"}
|
||||
svc := []string{"one.svc", "two.svc", "three.svc"}
|
||||
|
||||
for i := 0; i < len(dest); i++ {
|
||||
route.Destination = dest[i]
|
||||
for i := 0; i < len(svc); i++ {
|
||||
route.Service = svc[i]
|
||||
if err := table.Add(route); err != nil {
|
||||
t.Errorf("error adding route: %s", err)
|
||||
}
|
||||
@ -123,26 +123,26 @@ func TestList(t *testing.T) {
|
||||
t.Errorf("error listing routes: %s", err)
|
||||
}
|
||||
|
||||
if len(routes) != len(dest) {
|
||||
t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(dest), len(routes))
|
||||
if len(routes) != len(svc) {
|
||||
t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(svc), len(routes))
|
||||
}
|
||||
|
||||
if len(routes) != table.Size() {
|
||||
t.Errorf("mismatch number of routes and table size. Routes: %d, Size: %d", len(routes), table.Size())
|
||||
t.Errorf("mismatch number of routes and table size. Expected: %d, found: %d", len(routes), table.Size())
|
||||
}
|
||||
}
|
||||
|
||||
func TestLookup(t *testing.T) {
|
||||
table, route := testSetup()
|
||||
|
||||
dest := []string{"svc1", "svc2", "svc3"}
|
||||
svc := []string{"svc1", "svc2", "svc3"}
|
||||
net := []string{"net1", "net2", "net1"}
|
||||
rtr := []string{"router1", "router2", "router3"}
|
||||
gw := []string{"gw1", "gw2", "gw3"}
|
||||
|
||||
for i := 0; i < len(dest); i++ {
|
||||
route.Destination = dest[i]
|
||||
for i := 0; i < len(svc); i++ {
|
||||
route.Service = svc[i]
|
||||
route.Network = net[i]
|
||||
route.Router = rtr[i]
|
||||
route.Gateway = gw[i]
|
||||
if err := table.Add(route); err != nil {
|
||||
t.Errorf("error adding route: %s", err)
|
||||
}
|
||||
@ -157,7 +157,7 @@ func TestLookup(t *testing.T) {
|
||||
}
|
||||
|
||||
if len(routes) != table.Size() {
|
||||
t.Errorf("incorrect number of routes returned. expected: %d, found: %d", table.Size(), len(routes))
|
||||
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", table.Size(), len(routes))
|
||||
}
|
||||
|
||||
// query particular net
|
||||
@ -169,12 +169,12 @@ func TestLookup(t *testing.T) {
|
||||
}
|
||||
|
||||
if len(routes) != 2 {
|
||||
t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 2, len(routes))
|
||||
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
|
||||
}
|
||||
|
||||
// query particular router
|
||||
router := "router1"
|
||||
query = NewQuery(QueryRouter(router))
|
||||
// query particular gateway
|
||||
gateway := "gw1"
|
||||
query = NewQuery(QueryGateway(gateway))
|
||||
|
||||
routes, err = table.Lookup(query)
|
||||
if err != nil {
|
||||
@ -182,17 +182,17 @@ func TestLookup(t *testing.T) {
|
||||
}
|
||||
|
||||
if len(routes) != 1 {
|
||||
t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 1, len(routes))
|
||||
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
|
||||
}
|
||||
|
||||
if routes[0].Router != router {
|
||||
t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router)
|
||||
if routes[0].Gateway != gateway {
|
||||
t.Errorf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
|
||||
}
|
||||
|
||||
// query particular route
|
||||
network := "net1"
|
||||
query = NewQuery(
|
||||
QueryRouter(router),
|
||||
QueryGateway(gateway),
|
||||
QueryNetwork(network),
|
||||
)
|
||||
|
||||
@ -202,11 +202,11 @@ func TestLookup(t *testing.T) {
|
||||
}
|
||||
|
||||
if len(routes) != 1 {
|
||||
t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 1, len(routes))
|
||||
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
|
||||
}
|
||||
|
||||
if routes[0].Router != router {
|
||||
t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router)
|
||||
if routes[0].Gateway != gateway {
|
||||
t.Errorf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
|
||||
}
|
||||
|
||||
if routes[0].Network != network {
|
||||
@ -214,7 +214,7 @@ func TestLookup(t *testing.T) {
|
||||
}
|
||||
|
||||
// bullshit route query
|
||||
query = NewQuery(QueryDestination("foobar"))
|
||||
query = NewQuery(QueryService("foobar"))
|
||||
|
||||
routes, err = table.Lookup(query)
|
||||
if err != nil {
|
||||
@ -222,6 +222,6 @@ func TestLookup(t *testing.T) {
|
||||
}
|
||||
|
||||
if len(routes) != 0 {
|
||||
t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 0, len(routes))
|
||||
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
|
||||
}
|
||||
}
|
||||
|
@ -34,34 +34,34 @@ type QueryOption func(*QueryOptions)
|
||||
|
||||
// QueryOptions are routing table query options
|
||||
type QueryOptions struct {
|
||||
// Destination is destination address
|
||||
Destination string
|
||||
// Service is destination service name
|
||||
Service string
|
||||
// Gateway is route gateway
|
||||
Gateway string
|
||||
// Network is network address
|
||||
Network string
|
||||
// Router is router address
|
||||
Router string
|
||||
// Policy is query lookup policy
|
||||
Policy LookupPolicy
|
||||
}
|
||||
|
||||
// QueryDestination sets destination address
|
||||
func QueryDestination(d string) QueryOption {
|
||||
// QueryService sets destination address
|
||||
func QueryService(s string) QueryOption {
|
||||
return func(o *QueryOptions) {
|
||||
o.Destination = d
|
||||
o.Service = s
|
||||
}
|
||||
}
|
||||
|
||||
// QueryGateway sets route gateway
|
||||
func QueryGateway(g string) QueryOption {
|
||||
return func(o *QueryOptions) {
|
||||
o.Gateway = g
|
||||
}
|
||||
}
|
||||
|
||||
// QueryNetwork sets route network address
|
||||
func QueryNetwork(a string) QueryOption {
|
||||
func QueryNetwork(n string) QueryOption {
|
||||
return func(o *QueryOptions) {
|
||||
o.Network = a
|
||||
}
|
||||
}
|
||||
|
||||
// QueryRouter sets route router address
|
||||
func QueryRouter(r string) QueryOption {
|
||||
return func(o *QueryOptions) {
|
||||
o.Router = r
|
||||
o.Network = n
|
||||
}
|
||||
}
|
||||
|
||||
@ -89,10 +89,10 @@ func NewQuery(opts ...QueryOption) Query {
|
||||
// default options
|
||||
// NOTE: by default we use DefaultNetworkMetric
|
||||
qopts := QueryOptions{
|
||||
Destination: "*",
|
||||
Router: "*",
|
||||
Network: "*",
|
||||
Policy: DiscardIfNone,
|
||||
Service: "*",
|
||||
Gateway: "*",
|
||||
Network: "*",
|
||||
Policy: DiscardIfNone,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
@ -116,12 +116,12 @@ func (q query) String() string {
|
||||
|
||||
// create nice table printing structure
|
||||
table := tablewriter.NewWriter(sb)
|
||||
table.SetHeader([]string{"Destination", "Network", "Router", "Policy"})
|
||||
table.SetHeader([]string{"Service", "Gateway", "Network", "Policy"})
|
||||
|
||||
strQuery := []string{
|
||||
q.opts.Destination,
|
||||
q.opts.Service,
|
||||
q.opts.Gateway,
|
||||
q.opts.Network,
|
||||
q.opts.Router,
|
||||
fmt.Sprintf("%s", q.opts.Policy),
|
||||
}
|
||||
table.Append(strQuery)
|
||||
|
@ -9,6 +9,8 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultLink is default network link
|
||||
DefaultLink = "local"
|
||||
// DefaultLocalMetric is default route cost metric for the local network
|
||||
DefaultLocalMetric = 1
|
||||
// DefaultNetworkMetric is default route cost metric for the micro network
|
||||
@ -17,14 +19,16 @@ var (
|
||||
|
||||
// Route is network route
|
||||
type Route struct {
|
||||
// Destination is destination address
|
||||
Destination string
|
||||
// Service is destination service name
|
||||
Service string
|
||||
// Address is service node address
|
||||
Address string
|
||||
// Gateway is route gateway
|
||||
Gateway string
|
||||
// Network is network address
|
||||
Network string
|
||||
// Router is the router address
|
||||
Router string
|
||||
// Link is network link
|
||||
Link string
|
||||
// Metric is the route cost metric
|
||||
Metric int
|
||||
}
|
||||
@ -33,7 +37,7 @@ type Route struct {
|
||||
func (r *Route) Hash() uint64 {
|
||||
h := fnv.New64()
|
||||
h.Reset()
|
||||
h.Write([]byte(r.Destination + r.Gateway + r.Network))
|
||||
h.Write([]byte(r.Service + r.Address + r.Gateway + r.Network + r.Link))
|
||||
|
||||
return h.Sum64()
|
||||
}
|
||||
@ -45,13 +49,14 @@ func (r Route) String() string {
|
||||
|
||||
// create nice table printing structure
|
||||
table := tablewriter.NewWriter(sb)
|
||||
table.SetHeader([]string{"Destination", "Gateway", "Router", "Network", "Metric"})
|
||||
table.SetHeader([]string{"Service", "Address", "Gateway", "Network", "Link", "Metric"})
|
||||
|
||||
strRoute := []string{
|
||||
r.Destination,
|
||||
r.Service,
|
||||
r.Address,
|
||||
r.Gateway,
|
||||
r.Router,
|
||||
r.Network,
|
||||
r.Link,
|
||||
fmt.Sprintf("%d", r.Metric),
|
||||
}
|
||||
table.Append(strRoute)
|
||||
|
@ -53,7 +53,7 @@ type Event struct {
|
||||
|
||||
// String prints human readable Event
|
||||
func (e Event) String() string {
|
||||
return fmt.Sprintf("[EVENT] %s:\nRoute:\n%s", e.Type, e.Route)
|
||||
return fmt.Sprintf("[EVENT] time: %s type: %s", e.Timestamp, e.Type)
|
||||
}
|
||||
|
||||
// WatchOption is used to define what routes to watch in the table
|
||||
@ -72,15 +72,15 @@ type Watcher interface {
|
||||
|
||||
// WatchOptions are table watcher options
|
||||
type WatchOptions struct {
|
||||
// Specify destination address to watch
|
||||
Destination string
|
||||
// Service allows to watch specific service routes
|
||||
Service string
|
||||
}
|
||||
|
||||
// WatchDestination sets what destination to watch
|
||||
// Destination is usually microservice name
|
||||
func WatchDestination(d string) WatchOption {
|
||||
// WatchService sets what service routes to watch
|
||||
// Service is the microservice name
|
||||
func WatchService(s string) WatchOption {
|
||||
return func(o *WatchOptions) {
|
||||
o.Destination = d
|
||||
o.Service = s
|
||||
}
|
||||
}
|
||||
|
||||
@ -97,8 +97,8 @@ func (w *tableWatcher) Next() (*Event, error) {
|
||||
for {
|
||||
select {
|
||||
case res := <-w.resChan:
|
||||
switch w.opts.Destination {
|
||||
case res.Route.Destination, "*":
|
||||
switch w.opts.Service {
|
||||
case res.Route.Service, "*":
|
||||
return res, nil
|
||||
default:
|
||||
log.Logf("no table watcher available to receive the event")
|
||||
@ -130,10 +130,10 @@ func (w tableWatcher) String() string {
|
||||
sb := &strings.Builder{}
|
||||
|
||||
table := tablewriter.NewWriter(sb)
|
||||
table.SetHeader([]string{"Destination"})
|
||||
table.SetHeader([]string{"Service"})
|
||||
|
||||
data := []string{
|
||||
w.opts.Destination,
|
||||
w.opts.Service,
|
||||
}
|
||||
table.Append(data)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user