From 6beae23afdd9536130dc42894094c960236e5be4 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 20 Aug 2019 12:48:51 +0100 Subject: [PATCH 01/13] First commit. Outline of the default network. --- network/default.go | 50 +++++++++++++++++++++++++++ network/link/link.go | 11 +++--- network/network.go | 31 +++++++++++++++++ network/options.go | 48 +++++++++++++++++++++++++ network/resolver/dns/dns.go | 1 + network/resolver/http/http.go | 1 + network/resolver/registry/registry.go | 1 + network/resolver/resolver.go | 2 +- router/router.go | 22 ++++++------ 9 files changed, 150 insertions(+), 17 deletions(-) create mode 100644 network/default.go create mode 100644 network/options.go diff --git a/network/default.go b/network/default.go new file mode 100644 index 00000000..29265a86 --- /dev/null +++ b/network/default.go @@ -0,0 +1,50 @@ +package network + +import ( + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/server" +) + +// network implements Network interface +type network struct { + // options configure the network + options Options +} + +// newNetwork returns a new network node +func newNetwork(opts ...Option) Network { + options := DefaultOptions() + + for _, o := range opts { + o(&options) + } + + return &network{ + options: options, + } +} + +// Name returns network name +func (n *network) Name() string { + return n.options.Name +} + +// Connect connects the network +func (n *network) Connect() error { + return nil +} + +// Close closes network connection +func (n *network) Close() error { + return nil +} + +// Client returns network client +func (n *network) Client() client.Client { + return nil +} + +// Server returns network server +func (n *network) Server() server.Server { + return nil +} diff --git a/network/link/link.go b/network/link/link.go index 3acfd772..1c42831a 100644 --- a/network/link/link.go +++ b/network/link/link.go @@ -8,8 +8,13 @@ import ( "github.com/micro/go-micro/transport" ) +var ( + // ErrLinkClosed is returned when attempting i/o operation on the closed link + ErrLinkClosed = errors.New("link closed") +) + // Link is a layer on top of a transport socket with the -// buffering send and recv queue's with the ability to +// buffering send and recv queues with the ability to // measure the actual transport link and reconnect if // an address is specified. type Link interface { @@ -28,10 +33,6 @@ type Link interface { Length() int } -var ( - ErrLinkClosed = errors.New("link closed") -) - // NewLink creates a new link on top of a socket func NewLink(opts ...options.Option) Link { return newLink(options.NewOptions(opts...)) diff --git a/network/network.go b/network/network.go index 15d8155f..44fcbee4 100644 --- a/network/network.go +++ b/network/network.go @@ -1,2 +1,33 @@ // Package network is for creating internetworks package network + +import ( + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/server" +) + +var ( + // DefaultName is default network name + DefaultName = "go.micro.network" + // DefaultAddress is default network address + DefaultAddress = ":0" +) + +// Network is micro network +type Network interface { + // Name of the network + Name() string + // Connect starts the resolver and tunnel server + Connect() error + // Close stops the tunnel and resolving + Close() error + // Client is micro client + Client() client.Client + // Server is micro server + Server() server.Server +} + +// NewNetwork returns a new network interface +func NewNetwork(opts ...Option) Network { + return newNetwork(opts...) +} diff --git a/network/options.go b/network/options.go new file mode 100644 index 00000000..02bc3b1e --- /dev/null +++ b/network/options.go @@ -0,0 +1,48 @@ +package network + +import ( + "github.com/micro/go-micro/network/resolver" + "github.com/micro/go-micro/network/resolver/dns" +) + +type Option func(*Options) + +// Options configure network +type Options struct { + // Name of the network + Name string + // Address to bind to + Address string + // Resolver is network resolver + Resolver resolver.Resolver +} + +// Name is the network name +func Name(n string) Option { + return func(o *Options) { + o.Name = n + } +} + +// Address is the network address +func Address(a string) Option { + return func(o *Options) { + o.Address = a + } +} + +// Resolver is the network resolver +func Resolver(r resolver.Resolver) Option { + return func(o *Options) { + o.Resolver = r + } +} + +// DefaultOptions returns network default options +func DefaultOptions() Options { + return Options{ + Name: DefaultName, + Address: DefaultAddress, + Resolver: &dns.Resolver{}, + } +} diff --git a/network/resolver/dns/dns.go b/network/resolver/dns/dns.go index ba109656..3cfa2746 100644 --- a/network/resolver/dns/dns.go +++ b/network/resolver/dns/dns.go @@ -8,6 +8,7 @@ import ( "github.com/micro/go-micro/network/resolver" ) +// Resolver is a DNS network resolve type Resolver struct{} // Resolve assumes ID is a domain name e.g micro.mu diff --git a/network/resolver/http/http.go b/network/resolver/http/http.go index b6025a59..055662a5 100644 --- a/network/resolver/http/http.go +++ b/network/resolver/http/http.go @@ -10,6 +10,7 @@ import ( "github.com/micro/go-micro/network/resolver" ) +// Resolver is a HTTP network resolver type Resolver struct { // If not set, defaults to http Proto string diff --git a/network/resolver/registry/registry.go b/network/resolver/registry/registry.go index 9fe80bc5..20fffc18 100644 --- a/network/resolver/registry/registry.go +++ b/network/resolver/registry/registry.go @@ -6,6 +6,7 @@ import ( "github.com/micro/go-micro/registry" ) +// Resolver is a registry network resolver type Resolver struct { // Registry is the registry to use otherwise we use the defaul Registry registry.Registry diff --git a/network/resolver/resolver.go b/network/resolver/resolver.go index 2eb0b2a2..369269df 100644 --- a/network/resolver/resolver.go +++ b/network/resolver/resolver.go @@ -5,7 +5,7 @@ package resolver // via the name to connect to. This is done based on Network.Name(). // Before we can be part of any network, we have to connect to it. type Resolver interface { - // Resolve returns a list of addresses for an name + // Resolve returns a list of addresses for a name Resolve(name string) ([]*Record, error) } diff --git a/router/router.go b/router/router.go index 7d7ab0de..c5320590 100644 --- a/router/router.go +++ b/router/router.go @@ -5,6 +5,17 @@ import ( "time" ) +var ( + // DefaultAddress is default router address + DefaultAddress = ":9093" + // DefaultName is default router service name + DefaultName = "go.micro.router" + // DefaultNetwork is default micro network + DefaultNetwork = "go.micro" + // DefaultRouter is default network router + DefaultRouter = NewRouter() +) + // Router is an interface for a routing control plane type Router interface { // Init initializes the router with options @@ -125,17 +136,6 @@ type Advert struct { Events []*Event } -var ( - // DefaultAddress is default router address - DefaultAddress = ":9093" - // DefaultName is default router service name - DefaultName = "go.micro.router" - // DefaultNetwork is default micro network - DefaultNetwork = "go.micro" - // DefaultRouter is default network router - DefaultRouter = NewRouter() -) - // NewRouter creates new Router and returns it func NewRouter(opts ...Option) Router { return newRouter(opts...) From 30dd3f54f0e949655c9d8f89c7ef46c26b850c39 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 20 Aug 2019 21:11:27 +0100 Subject: [PATCH 02/13] Make router.Table docs consistent --- router/router.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/router/router.go b/router/router.go index c5320590..8b6618d1 100644 --- a/router/router.go +++ b/router/router.go @@ -42,16 +42,17 @@ type Router interface { String() string } +// Table is an interface for routing table type Table interface { // Create new route in the routing table Create(Route) error - // Delete deletes existing route from the routing table + // Delete existing route from the routing table Delete(Route) error - // Update updates route in the routing table + // Update route in the routing table Update(Route) error - // List returns the list of all routes in the table + // List all routes in the table List() ([]Route, error) - // Query queries routes in the routing table + // Query routes in the routing table Query(Query) ([]Route, error) } From fcec6e8eae22f3de9aef377aa0f83fd414e43940 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 20 Aug 2019 21:12:21 +0100 Subject: [PATCH 03/13] First attempt to implement default network interface --- network/default.go | 296 ++++++++++++++++++++++++++++++++++++++++++++- network/network.go | 4 + network/options.go | 34 ++++++ 3 files changed, 331 insertions(+), 3 deletions(-) diff --git a/network/default.go b/network/default.go index 29265a86..48d52fd5 100644 --- a/network/default.go +++ b/network/default.go @@ -1,14 +1,47 @@ package network import ( + "sync" + "time" + + "github.com/gogo/protobuf/proto" "github.com/micro/go-micro/client" + "github.com/micro/go-micro/proxy" + "github.com/micro/go-micro/router" + pb "github.com/micro/go-micro/router/proto" "github.com/micro/go-micro/server" + "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/tunnel" + tr "github.com/micro/go-micro/tunnel/transport" + "github.com/micro/go-micro/util/log" +) + +var ( + // ControlChannel is the name of the tunnel channel for passing contron message + ControlChannel = "control-msg" ) // network implements Network interface type network struct { // options configure the network + // TODO: we might end up embedding options Options + // rtr is network router + router.Router + // prx is network proxy + proxy.Proxy + // tun is network tunnel + tunnel.Tunnel + // srv is network server + srv server.Server + // client is network client + client client.Client + + sync.RWMutex + // connected marks the network as connected + connected bool + // closed closes the network + closed chan bool } // newNetwork returns a new network node @@ -19,8 +52,33 @@ func newNetwork(opts ...Option) Network { o(&options) } + // init tunnel address to the network bind address + options.Tunnel.Init( + tunnel.Address(options.Address), + ) + + // create tunnel client with tunnel transport + tunTransport := transport.NewTransport( + tr.WithTunnel(options.Tunnel), + ) + + // srv is network server + srv := server.NewServer( + server.Transport(tunTransport), + ) + + // client is network client + client := client.NewClient( + client.Transport(tunTransport), + ) + return &network{ options: options, + Router: options.Router, + Proxy: options.Proxy, + Tunnel: options.Tunnel, + srv: srv, + client: client, } } @@ -29,22 +87,254 @@ func (n *network) Name() string { return n.options.Name } +// Address returns network bind address +func (n *network) Address() string { + return n.options.Address +} + +func (n *network) resolveNodes() ([]string, error) { + // resolve the network address to network nodes + records, err := n.options.Resolver.Resolve(n.options.Name) + if err != nil { + return nil, err + } + + // collect network node addresses + nodes := make([]string, len(records)) + for i, record := range records { + nodes[i] = record.Address + } + + return nodes, nil +} + +func (n *network) resolve() { + resolve := time.NewTicker(ResolveTime) + defer resolve.Stop() + + for { + select { + case <-n.closed: + return + case <-resolve.C: + nodes, err := n.resolveNodes() + if err != nil { + log.Debugf("Network failed to resolve nodes: %v", err) + continue + } + // initialize the tunnel + n.Tunnel.Init( + tunnel.Nodes(nodes...), + ) + } + } +} + +func (n *network) process(client transport.Client) { + for { + m := new(transport.Message) + if err := client.Recv(m); err != nil { + // TODO: should we bail here? + log.Debugf("Network advert receive error: %v", err) + return + } + + // switch on type of message and take action + switch m.Header["Micro-Tunnel"] { + case n.Router.Options().Id: + // NOTE: this should not happen + // skip local adverts + continue + default: + pbAdvert := &pb.Advert{} + if err := proto.Unmarshal(m.Body, pbAdvert); err != nil { + continue + } + + var events []*router.Event + for _, event := range pbAdvert.Events { + route := router.Route{ + Service: event.Route.Service, + Address: event.Route.Address, + Gateway: event.Route.Gateway, + Network: event.Route.Network, + Link: event.Route.Link, + Metric: int(event.Route.Metric), + } + e := &router.Event{ + Type: router.EventType(event.Type), + Timestamp: time.Unix(0, pbAdvert.Timestamp), + Route: route, + } + events = append(events, e) + } + advert := &router.Advert{ + Id: pbAdvert.Id, + Type: router.AdvertType(pbAdvert.Type), + Timestamp: time.Unix(0, pbAdvert.Timestamp), + TTL: time.Duration(pbAdvert.Ttl), + Events: events, + } + + if err := n.Router.Process(advert); err != nil { + log.Debugf("Network failed to process advert %s: %v", advert.Id, err) + continue + } + } + } +} + +// advertise advertises routes to the network +func (n *network) advertise(client transport.Client, advertChan <-chan *router.Advert) { + for { + select { + // process local adverts and randomly fire them at other nodes + case advert := <-advertChan: + // create a proto advert + var events []*pb.Event + for _, event := range advert.Events { + route := &pb.Route{ + Service: event.Route.Service, + Address: event.Route.Address, + Gateway: event.Route.Gateway, + Network: event.Route.Network, + Link: event.Route.Link, + Metric: int64(event.Route.Metric), + } + e := &pb.Event{ + Type: pb.EventType(event.Type), + Timestamp: event.Timestamp.UnixNano(), + Route: route, + } + events = append(events, e) + } + pbAdvert := &pb.Advert{ + Id: advert.Id, + Type: pb.AdvertType(advert.Type), + Timestamp: advert.Timestamp.UnixNano(), + Events: events, + } + body, err := proto.Marshal(pbAdvert) + if err != nil { + // TODO: should we bail here? + log.Debugf("Network failed to marshal message: %v", err) + continue + } + // create transport message and chuck it down the pipe + m := transport.Message{ + Header: map[string]string{ + "Micro-Method": "advert", + }, + Body: body, + } + if err := client.Send(&m); err != nil { + log.Debugf("Network failed to send advert %s: %v", pbAdvert.Id, err) + continue + } + case <-n.closed: + return + } + } +} + // Connect connects the network func (n *network) Connect() error { + n.Lock() + defer n.Unlock() + + // return if already connected + if n.connected { + return nil + } + + // try to resolve network nodes + nodes, err := n.resolveNodes() + if err != nil { + return err + } + + // connect network tunnel + if err := n.Tunnel.Connect(); err != nil { + return err + } + + // initialize the tunnel to resolved nodes + n.Tunnel.Init( + tunnel.Nodes(nodes...), + ) + + // dial into ControlChannel to send route adverts + client, err := n.Tunnel.Dial(ControlChannel) + if err != nil { + // TODO: should we stop the tunnel here? + return err + } + + // create closed channel + n.closed = make(chan bool) + + // keep resolving network nodes + go n.resolve() + + // TODO: do we assume the router has been started? + // start advertising routes + advertChan, err := n.options.Router.Advertise() + if err != nil { + return err + } + + // advertise routes + go n.advertise(client, advertChan) + // process routes + go n.process(client) + + // set connected to true + n.connected = true + + return nil +} + +func (n *network) close() error { + // stop the router + if err := n.Router.Stop(); err != nil { + return err + } + + // close the tunnel + if err := n.Tunnel.Close(); err != nil { + return err + } + return nil } // Close closes network connection func (n *network) Close() error { - return nil + n.Lock() + defer n.Unlock() + + if !n.connected { + return nil + } + + select { + case <-n.closed: + return nil + default: + close(n.closed) + // set connected to false + n.connected = false + } + + return n.close() } // Client returns network client func (n *network) Client() client.Client { - return nil + return n.client } // Server returns network server func (n *network) Server() server.Server { - return nil + return n.srv } diff --git a/network/network.go b/network/network.go index 44fcbee4..0b018d44 100644 --- a/network/network.go +++ b/network/network.go @@ -2,6 +2,8 @@ package network import ( + "time" + "github.com/micro/go-micro/client" "github.com/micro/go-micro/server" ) @@ -11,6 +13,8 @@ var ( DefaultName = "go.micro.network" // DefaultAddress is default network address DefaultAddress = ":0" + // ResolveTime ddefines the time we periodically resolve network nodes + ResolveTime = 1 * time.Minute ) // Network is micro network diff --git a/network/options.go b/network/options.go index 02bc3b1e..4c737fe1 100644 --- a/network/options.go +++ b/network/options.go @@ -3,6 +3,10 @@ package network import ( "github.com/micro/go-micro/network/resolver" "github.com/micro/go-micro/network/resolver/dns" + "github.com/micro/go-micro/proxy" + "github.com/micro/go-micro/proxy/mucp" + "github.com/micro/go-micro/router" + "github.com/micro/go-micro/tunnel" ) type Option func(*Options) @@ -13,6 +17,12 @@ type Options struct { Name string // Address to bind to Address string + // Tunnel is network tunnel + Tunnel tunnel.Tunnel + // Router is network router + Router router.Router + // Proxy is network proxy + Proxy proxy.Proxy // Resolver is network resolver Resolver resolver.Resolver } @@ -31,6 +41,27 @@ func Address(a string) Option { } } +// Tunnel sets the network tunnel +func Tunnel(t tunnel.Tunnel) Option { + return func(o *Options) { + o.Tunnel = t + } +} + +// Router sets the network router +func Router(r router.Router) Option { + return func(o *Options) { + o.Router = r + } +} + +// Proxy sets the network proxy +func Proxy(p proxy.Proxy) Option { + return func(o *Options) { + o.Proxy = p + } +} + // Resolver is the network resolver func Resolver(r resolver.Resolver) Option { return func(o *Options) { @@ -43,6 +74,9 @@ func DefaultOptions() Options { return Options{ Name: DefaultName, Address: DefaultAddress, + Tunnel: tunnel.NewTunnel(), + Router: router.DefaultRouter, + Proxy: mucp.NewProxy(), Resolver: &dns.Resolver{}, } } From a6e1287b27fb91eee371774fb3aae224292ecb7c Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 20 Aug 2019 21:15:02 +0100 Subject: [PATCH 04/13] Replaced incorrect proto import path --- network/default.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/default.go b/network/default.go index 48d52fd5..ed23a7d7 100644 --- a/network/default.go +++ b/network/default.go @@ -4,7 +4,7 @@ import ( "sync" "time" - "github.com/gogo/protobuf/proto" + "github.com/golang/protobuf/proto" "github.com/micro/go-micro/client" "github.com/micro/go-micro/proxy" "github.com/micro/go-micro/router" From 6c1f1d66f7b9bd98189a18f5d7984683b963ad28 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 20 Aug 2019 21:30:25 +0100 Subject: [PATCH 05/13] Switch received messages on the right header --- network/default.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/network/default.go b/network/default.go index ed23a7d7..d5f3f8ce 100644 --- a/network/default.go +++ b/network/default.go @@ -140,12 +140,8 @@ func (n *network) process(client transport.Client) { } // switch on type of message and take action - switch m.Header["Micro-Tunnel"] { - case n.Router.Options().Id: - // NOTE: this should not happen - // skip local adverts - continue - default: + switch m.Header["Micro-Method"] { + case "advert": pbAdvert := &pb.Advert{} if err := proto.Unmarshal(m.Body, pbAdvert); err != nil { continue From a09d5d2e9af37a12da3618096f60f8c99e99e11b Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 21 Aug 2019 19:16:18 +0100 Subject: [PATCH 06/13] Add Address method. Start and Stop router/server. --- network/default.go | 31 +++++++++++++++++++++++++------ network/network.go | 4 +++- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/network/default.go b/network/default.go index d5f3f8ce..ba31d7c5 100644 --- a/network/default.go +++ b/network/default.go @@ -6,25 +6,25 @@ import ( "github.com/golang/protobuf/proto" "github.com/micro/go-micro/client" + rtr "github.com/micro/go-micro/client/selector/router" "github.com/micro/go-micro/proxy" "github.com/micro/go-micro/router" pb "github.com/micro/go-micro/router/proto" "github.com/micro/go-micro/server" "github.com/micro/go-micro/transport" "github.com/micro/go-micro/tunnel" - tr "github.com/micro/go-micro/tunnel/transport" + trn "github.com/micro/go-micro/tunnel/transport" "github.com/micro/go-micro/util/log" ) var ( // ControlChannel is the name of the tunnel channel for passing contron message - ControlChannel = "control-msg" + ControlChannel = "control" ) // network implements Network interface type network struct { // options configure the network - // TODO: we might end up embedding options Options // rtr is network router router.Router @@ -59,7 +59,7 @@ func newNetwork(opts ...Option) Network { // create tunnel client with tunnel transport tunTransport := transport.NewTransport( - tr.WithTunnel(options.Tunnel), + trn.WithTunnel(options.Tunnel), ) // srv is network server @@ -70,6 +70,11 @@ func newNetwork(opts ...Option) Network { // client is network client client := client.NewClient( client.Transport(tunTransport), + client.Selector( + rtr.NewSelector( + rtr.WithRouter(options.Router), + ), + ), ) return &network{ @@ -89,7 +94,7 @@ func (n *network) Name() string { // Address returns network bind address func (n *network) Address() string { - return n.options.Address + return n.Tunnel.Address() } func (n *network) resolveNodes() ([]string, error) { @@ -272,7 +277,11 @@ func (n *network) Connect() error { // keep resolving network nodes go n.resolve() - // TODO: do we assume the router has been started? + // start the router + if err := n.options.Router.Start(); err != nil { + return err + } + // start advertising routes advertChan, err := n.options.Router.Advertise() if err != nil { @@ -284,6 +293,11 @@ func (n *network) Connect() error { // process routes go n.process(client) + // start the server + if err := n.srv.Start(); err != nil { + return err + } + // set connected to true n.connected = true @@ -291,6 +305,11 @@ func (n *network) Connect() error { } func (n *network) close() error { + // stop the server + if err := n.srv.Stop(); err != nil { + return err + } + // stop the router if err := n.Router.Stop(); err != nil { return err diff --git a/network/network.go b/network/network.go index 0b018d44..3663876f 100644 --- a/network/network.go +++ b/network/network.go @@ -13,7 +13,7 @@ var ( DefaultName = "go.micro.network" // DefaultAddress is default network address DefaultAddress = ":0" - // ResolveTime ddefines the time we periodically resolve network nodes + // ResolveTime ddefines the time to periodically resolve network nodes ResolveTime = 1 * time.Minute ) @@ -21,6 +21,8 @@ var ( type Network interface { // Name of the network Name() string + // Address returns network bind address + Address() string // Connect starts the resolver and tunnel server Connect() error // Close stops the tunnel and resolving From e1599b0f17fc81f77a5500d8a0288410993476ff Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 21 Aug 2019 19:21:23 +0100 Subject: [PATCH 07/13] Set server name. Set default network name. --- network/default.go | 1 + network/network.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/network/default.go b/network/default.go index ba31d7c5..969aed5f 100644 --- a/network/default.go +++ b/network/default.go @@ -64,6 +64,7 @@ func newNetwork(opts ...Option) Network { // srv is network server srv := server.NewServer( + server.Name(DefaultName), server.Transport(tunTransport), ) diff --git a/network/network.go b/network/network.go index 3663876f..46801e0c 100644 --- a/network/network.go +++ b/network/network.go @@ -10,7 +10,7 @@ import ( var ( // DefaultName is default network name - DefaultName = "go.micro.network" + DefaultName = "go.micro" // DefaultAddress is default network address DefaultAddress = ":0" // ResolveTime ddefines the time to periodically resolve network nodes From db89fc4efeee172ebd6726cd0ec7cd6fe9807bd1 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 21 Aug 2019 19:22:50 +0100 Subject: [PATCH 08/13] Set server name to the correct value. --- network/default.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/default.go b/network/default.go index 969aed5f..56f82a17 100644 --- a/network/default.go +++ b/network/default.go @@ -64,7 +64,7 @@ func newNetwork(opts ...Option) Network { // srv is network server srv := server.NewServer( - server.Name(DefaultName), + server.Name(options.Name), server.Transport(tunTransport), ) From e53484302ce6d1c0b83622c4702e24c5ae8dbd3f Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 22 Aug 2019 13:17:32 +0100 Subject: [PATCH 09/13] Added ControlChannel tunnel.Listener to process incoming messages --- network/default.go | 107 +++++++++++++++++++++++++++++---------------- 1 file changed, 70 insertions(+), 37 deletions(-) diff --git a/network/default.go b/network/default.go index 56f82a17..bab14f13 100644 --- a/network/default.go +++ b/network/default.go @@ -136,52 +136,80 @@ func (n *network) resolve() { } } -func (n *network) process(client transport.Client) { +func (n *network) handleConn(conn tunnel.Conn, msg chan *transport.Message) { for { m := new(transport.Message) - if err := client.Recv(m); err != nil { + if err := conn.Recv(m); err != nil { // TODO: should we bail here? - log.Debugf("Network advert receive error: %v", err) + log.Debugf("Network tunnel advert receive error: %v", err) return } - // switch on type of message and take action - switch m.Header["Micro-Method"] { - case "advert": - pbAdvert := &pb.Advert{} - if err := proto.Unmarshal(m.Body, pbAdvert); err != nil { - continue - } + select { + case msg <- m: + case <-n.closed: + return + } + } +} - var events []*router.Event - for _, event := range pbAdvert.Events { - route := router.Route{ - Service: event.Route.Service, - Address: event.Route.Address, - Gateway: event.Route.Gateway, - Network: event.Route.Network, - Link: event.Route.Link, - Metric: int(event.Route.Metric), +func (n *network) process(l tunnel.Listener) { + // receive control message queue + recv := make(chan *transport.Message, 128) + + // accept a connection + conn, err := l.Accept() + if err != nil { + // TODO: handle this + log.Debugf("Network tunnel accept error: %v", err) + return + } + + go n.handleConn(conn, recv) + + for { + select { + case m := <-recv: + // switch on type of message and take action + switch m.Header["Micro-Method"] { + case "advert": + pbAdvert := &pb.Advert{} + if err := proto.Unmarshal(m.Body, pbAdvert); err != nil { + continue } - e := &router.Event{ - Type: router.EventType(event.Type), + + var events []*router.Event + for _, event := range pbAdvert.Events { + route := router.Route{ + Service: event.Route.Service, + Address: event.Route.Address, + Gateway: event.Route.Gateway, + Network: event.Route.Network, + Link: event.Route.Link, + Metric: int(event.Route.Metric), + } + e := &router.Event{ + Type: router.EventType(event.Type), + Timestamp: time.Unix(0, pbAdvert.Timestamp), + Route: route, + } + events = append(events, e) + } + advert := &router.Advert{ + Id: pbAdvert.Id, + Type: router.AdvertType(pbAdvert.Type), Timestamp: time.Unix(0, pbAdvert.Timestamp), - Route: route, + TTL: time.Duration(pbAdvert.Ttl), + Events: events, } - events = append(events, e) - } - advert := &router.Advert{ - Id: pbAdvert.Id, - Type: router.AdvertType(pbAdvert.Type), - Timestamp: time.Unix(0, pbAdvert.Timestamp), - TTL: time.Duration(pbAdvert.Ttl), - Events: events, - } - if err := n.Router.Process(advert); err != nil { - log.Debugf("Network failed to process advert %s: %v", advert.Id, err) - continue + if err := n.Router.Process(advert); err != nil { + log.Debugf("Network failed to process advert %s: %v", advert.Id, err) + continue + } } + case <-n.closed: + return } } } @@ -268,7 +296,12 @@ func (n *network) Connect() error { // dial into ControlChannel to send route adverts client, err := n.Tunnel.Dial(ControlChannel) if err != nil { - // TODO: should we stop the tunnel here? + return err + } + + // listen on ControlChannel + listener, err := n.Tunnel.Listen(ControlChannel) + if err != nil { return err } @@ -291,8 +324,8 @@ func (n *network) Connect() error { // advertise routes go n.advertise(client, advertChan) - // process routes - go n.process(client) + // accept and process routes + go n.process(listener) // start the server if err := n.srv.Start(); err != nil { From 8c3eec9f2aced5d057ad63a100fc3131dbd589c7 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 22 Aug 2019 18:57:20 +0100 Subject: [PATCH 10/13] Set the default resolver to registry --- network/options.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/options.go b/network/options.go index 4c737fe1..8bc36972 100644 --- a/network/options.go +++ b/network/options.go @@ -2,7 +2,7 @@ package network import ( "github.com/micro/go-micro/network/resolver" - "github.com/micro/go-micro/network/resolver/dns" + "github.com/micro/go-micro/network/resolver/registry" "github.com/micro/go-micro/proxy" "github.com/micro/go-micro/proxy/mucp" "github.com/micro/go-micro/router" @@ -77,6 +77,6 @@ func DefaultOptions() Options { Tunnel: tunnel.NewTunnel(), Router: router.DefaultRouter, Proxy: mucp.NewProxy(), - Resolver: &dns.Resolver{}, + Resolver: ®istry.Resolver{}, } } From 9448d7c164411fa325a55f1344bb9d141261da65 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 23 Aug 2019 16:01:57 +0100 Subject: [PATCH 11/13] Set Route.Network to "network" and Router.Gateway to network.Address --- network/default.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/network/default.go b/network/default.go index bab14f13..e4512172 100644 --- a/network/default.go +++ b/network/default.go @@ -226,8 +226,8 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A route := &pb.Route{ Service: event.Route.Service, Address: event.Route.Address, - Gateway: event.Route.Gateway, - Network: event.Route.Network, + Gateway: n.options.Address, + Network: "network", Link: event.Route.Link, Metric: int64(event.Route.Metric), } @@ -257,6 +257,7 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A }, Body: body, } + if err := client.Send(&m); err != nil { log.Debugf("Network failed to send advert %s: %v", pbAdvert.Id, err) continue From 88e47b9b066d45d1f6230c9693396ef3273f8b2e Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 23 Aug 2019 17:48:14 +0100 Subject: [PATCH 12/13] Dont bail when unable to resolve network nodes. --- network/default.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/default.go b/network/default.go index e4512172..c7f6a3ab 100644 --- a/network/default.go +++ b/network/default.go @@ -281,7 +281,7 @@ func (n *network) Connect() error { // try to resolve network nodes nodes, err := n.resolveNodes() if err != nil { - return err + log.Debugf("Network failed to resolve nodes: %v", err) } // connect network tunnel From ed8d28c9ab4eaf4a87689722756ccd328d251b8a Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 23 Aug 2019 21:08:18 +0100 Subject: [PATCH 13/13] Set Route.Link to "network" not Route.Network. Oops! --- network/default.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/network/default.go b/network/default.go index c7f6a3ab..be49f9f8 100644 --- a/network/default.go +++ b/network/default.go @@ -223,12 +223,13 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A // create a proto advert var events []*pb.Event for _, event := range advert.Events { + // NOTE: we override the Gateway and Link fields here route := &pb.Route{ Service: event.Route.Service, Address: event.Route.Address, Gateway: n.options.Address, - Network: "network", - Link: event.Route.Link, + Network: event.Route.Network, + Link: "network", Metric: int64(event.Route.Metric), } e := &pb.Event{