diff --git a/network/default.go b/network/default.go new file mode 100644 index 00000000..be49f9f8 --- /dev/null +++ b/network/default.go @@ -0,0 +1,391 @@ +package network + +import ( + "sync" + "time" + + "github.com/golang/protobuf/proto" + "github.com/micro/go-micro/client" + rtr "github.com/micro/go-micro/client/selector/router" + "github.com/micro/go-micro/proxy" + "github.com/micro/go-micro/router" + pb "github.com/micro/go-micro/router/proto" + "github.com/micro/go-micro/server" + "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/tunnel" + trn "github.com/micro/go-micro/tunnel/transport" + "github.com/micro/go-micro/util/log" +) + +var ( + // ControlChannel is the name of the tunnel channel for passing contron message + ControlChannel = "control" +) + +// network implements Network interface +type network struct { + // options configure the network + options Options + // rtr is network router + router.Router + // prx is network proxy + proxy.Proxy + // tun is network tunnel + tunnel.Tunnel + // srv is network server + srv server.Server + // client is network client + client client.Client + + sync.RWMutex + // connected marks the network as connected + connected bool + // closed closes the network + closed chan bool +} + +// newNetwork returns a new network node +func newNetwork(opts ...Option) Network { + options := DefaultOptions() + + for _, o := range opts { + o(&options) + } + + // init tunnel address to the network bind address + options.Tunnel.Init( + tunnel.Address(options.Address), + ) + + // create tunnel client with tunnel transport + tunTransport := transport.NewTransport( + trn.WithTunnel(options.Tunnel), + ) + + // srv is network server + srv := server.NewServer( + server.Name(options.Name), + server.Transport(tunTransport), + ) + + // client is network client + client := client.NewClient( + client.Transport(tunTransport), + client.Selector( + rtr.NewSelector( + rtr.WithRouter(options.Router), + ), + ), + ) + + return &network{ + options: options, + Router: options.Router, + Proxy: options.Proxy, + Tunnel: options.Tunnel, + srv: srv, + client: client, + } +} + +// Name returns network name +func (n *network) Name() string { + return n.options.Name +} + +// Address returns network bind address +func (n *network) Address() string { + return n.Tunnel.Address() +} + +func (n *network) resolveNodes() ([]string, error) { + // resolve the network address to network nodes + records, err := n.options.Resolver.Resolve(n.options.Name) + if err != nil { + return nil, err + } + + // collect network node addresses + nodes := make([]string, len(records)) + for i, record := range records { + nodes[i] = record.Address + } + + return nodes, nil +} + +func (n *network) resolve() { + resolve := time.NewTicker(ResolveTime) + defer resolve.Stop() + + for { + select { + case <-n.closed: + return + case <-resolve.C: + nodes, err := n.resolveNodes() + if err != nil { + log.Debugf("Network failed to resolve nodes: %v", err) + continue + } + // initialize the tunnel + n.Tunnel.Init( + tunnel.Nodes(nodes...), + ) + } + } +} + +func (n *network) handleConn(conn tunnel.Conn, msg chan *transport.Message) { + for { + m := new(transport.Message) + if err := conn.Recv(m); err != nil { + // TODO: should we bail here? + log.Debugf("Network tunnel advert receive error: %v", err) + return + } + + select { + case msg <- m: + case <-n.closed: + return + } + } +} + +func (n *network) process(l tunnel.Listener) { + // receive control message queue + recv := make(chan *transport.Message, 128) + + // accept a connection + conn, err := l.Accept() + if err != nil { + // TODO: handle this + log.Debugf("Network tunnel accept error: %v", err) + return + } + + go n.handleConn(conn, recv) + + for { + select { + case m := <-recv: + // switch on type of message and take action + switch m.Header["Micro-Method"] { + case "advert": + pbAdvert := &pb.Advert{} + if err := proto.Unmarshal(m.Body, pbAdvert); err != nil { + continue + } + + var events []*router.Event + for _, event := range pbAdvert.Events { + route := router.Route{ + Service: event.Route.Service, + Address: event.Route.Address, + Gateway: event.Route.Gateway, + Network: event.Route.Network, + Link: event.Route.Link, + Metric: int(event.Route.Metric), + } + e := &router.Event{ + Type: router.EventType(event.Type), + Timestamp: time.Unix(0, pbAdvert.Timestamp), + Route: route, + } + events = append(events, e) + } + advert := &router.Advert{ + Id: pbAdvert.Id, + Type: router.AdvertType(pbAdvert.Type), + Timestamp: time.Unix(0, pbAdvert.Timestamp), + TTL: time.Duration(pbAdvert.Ttl), + Events: events, + } + + if err := n.Router.Process(advert); err != nil { + log.Debugf("Network failed to process advert %s: %v", advert.Id, err) + continue + } + } + case <-n.closed: + return + } + } +} + +// advertise advertises routes to the network +func (n *network) advertise(client transport.Client, advertChan <-chan *router.Advert) { + for { + select { + // process local adverts and randomly fire them at other nodes + case advert := <-advertChan: + // create a proto advert + var events []*pb.Event + for _, event := range advert.Events { + // NOTE: we override the Gateway and Link fields here + route := &pb.Route{ + Service: event.Route.Service, + Address: event.Route.Address, + Gateway: n.options.Address, + Network: event.Route.Network, + Link: "network", + Metric: int64(event.Route.Metric), + } + e := &pb.Event{ + Type: pb.EventType(event.Type), + Timestamp: event.Timestamp.UnixNano(), + Route: route, + } + events = append(events, e) + } + pbAdvert := &pb.Advert{ + Id: advert.Id, + Type: pb.AdvertType(advert.Type), + Timestamp: advert.Timestamp.UnixNano(), + Events: events, + } + body, err := proto.Marshal(pbAdvert) + if err != nil { + // TODO: should we bail here? + log.Debugf("Network failed to marshal message: %v", err) + continue + } + // create transport message and chuck it down the pipe + m := transport.Message{ + Header: map[string]string{ + "Micro-Method": "advert", + }, + Body: body, + } + + if err := client.Send(&m); err != nil { + log.Debugf("Network failed to send advert %s: %v", pbAdvert.Id, err) + continue + } + case <-n.closed: + return + } + } +} + +// Connect connects the network +func (n *network) Connect() error { + n.Lock() + defer n.Unlock() + + // return if already connected + if n.connected { + return nil + } + + // try to resolve network nodes + nodes, err := n.resolveNodes() + if err != nil { + log.Debugf("Network failed to resolve nodes: %v", err) + } + + // connect network tunnel + if err := n.Tunnel.Connect(); err != nil { + return err + } + + // initialize the tunnel to resolved nodes + n.Tunnel.Init( + tunnel.Nodes(nodes...), + ) + + // dial into ControlChannel to send route adverts + client, err := n.Tunnel.Dial(ControlChannel) + if err != nil { + return err + } + + // listen on ControlChannel + listener, err := n.Tunnel.Listen(ControlChannel) + if err != nil { + return err + } + + // create closed channel + n.closed = make(chan bool) + + // keep resolving network nodes + go n.resolve() + + // start the router + if err := n.options.Router.Start(); err != nil { + return err + } + + // start advertising routes + advertChan, err := n.options.Router.Advertise() + if err != nil { + return err + } + + // advertise routes + go n.advertise(client, advertChan) + // accept and process routes + go n.process(listener) + + // start the server + if err := n.srv.Start(); err != nil { + return err + } + + // set connected to true + n.connected = true + + return nil +} + +func (n *network) close() error { + // stop the server + if err := n.srv.Stop(); err != nil { + return err + } + + // stop the router + if err := n.Router.Stop(); err != nil { + return err + } + + // close the tunnel + if err := n.Tunnel.Close(); err != nil { + return err + } + + return nil +} + +// Close closes network connection +func (n *network) Close() error { + n.Lock() + defer n.Unlock() + + if !n.connected { + return nil + } + + select { + case <-n.closed: + return nil + default: + close(n.closed) + // set connected to false + n.connected = false + } + + return n.close() +} + +// Client returns network client +func (n *network) Client() client.Client { + return n.client +} + +// Server returns network server +func (n *network) Server() server.Server { + return n.srv +} diff --git a/network/link/link.go b/network/link/link.go index 3acfd772..1c42831a 100644 --- a/network/link/link.go +++ b/network/link/link.go @@ -8,8 +8,13 @@ import ( "github.com/micro/go-micro/transport" ) +var ( + // ErrLinkClosed is returned when attempting i/o operation on the closed link + ErrLinkClosed = errors.New("link closed") +) + // Link is a layer on top of a transport socket with the -// buffering send and recv queue's with the ability to +// buffering send and recv queues with the ability to // measure the actual transport link and reconnect if // an address is specified. type Link interface { @@ -28,10 +33,6 @@ type Link interface { Length() int } -var ( - ErrLinkClosed = errors.New("link closed") -) - // NewLink creates a new link on top of a socket func NewLink(opts ...options.Option) Link { return newLink(options.NewOptions(opts...)) diff --git a/network/network.go b/network/network.go index 15d8155f..46801e0c 100644 --- a/network/network.go +++ b/network/network.go @@ -1,2 +1,39 @@ // Package network is for creating internetworks package network + +import ( + "time" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/server" +) + +var ( + // DefaultName is default network name + DefaultName = "go.micro" + // DefaultAddress is default network address + DefaultAddress = ":0" + // ResolveTime ddefines the time to periodically resolve network nodes + ResolveTime = 1 * time.Minute +) + +// Network is micro network +type Network interface { + // Name of the network + Name() string + // Address returns network bind address + Address() string + // Connect starts the resolver and tunnel server + Connect() error + // Close stops the tunnel and resolving + Close() error + // Client is micro client + Client() client.Client + // Server is micro server + Server() server.Server +} + +// NewNetwork returns a new network interface +func NewNetwork(opts ...Option) Network { + return newNetwork(opts...) +} diff --git a/network/options.go b/network/options.go new file mode 100644 index 00000000..8bc36972 --- /dev/null +++ b/network/options.go @@ -0,0 +1,82 @@ +package network + +import ( + "github.com/micro/go-micro/network/resolver" + "github.com/micro/go-micro/network/resolver/registry" + "github.com/micro/go-micro/proxy" + "github.com/micro/go-micro/proxy/mucp" + "github.com/micro/go-micro/router" + "github.com/micro/go-micro/tunnel" +) + +type Option func(*Options) + +// Options configure network +type Options struct { + // Name of the network + Name string + // Address to bind to + Address string + // Tunnel is network tunnel + Tunnel tunnel.Tunnel + // Router is network router + Router router.Router + // Proxy is network proxy + Proxy proxy.Proxy + // Resolver is network resolver + Resolver resolver.Resolver +} + +// Name is the network name +func Name(n string) Option { + return func(o *Options) { + o.Name = n + } +} + +// Address is the network address +func Address(a string) Option { + return func(o *Options) { + o.Address = a + } +} + +// Tunnel sets the network tunnel +func Tunnel(t tunnel.Tunnel) Option { + return func(o *Options) { + o.Tunnel = t + } +} + +// Router sets the network router +func Router(r router.Router) Option { + return func(o *Options) { + o.Router = r + } +} + +// Proxy sets the network proxy +func Proxy(p proxy.Proxy) Option { + return func(o *Options) { + o.Proxy = p + } +} + +// Resolver is the network resolver +func Resolver(r resolver.Resolver) Option { + return func(o *Options) { + o.Resolver = r + } +} + +// DefaultOptions returns network default options +func DefaultOptions() Options { + return Options{ + Name: DefaultName, + Address: DefaultAddress, + Tunnel: tunnel.NewTunnel(), + Router: router.DefaultRouter, + Proxy: mucp.NewProxy(), + Resolver: ®istry.Resolver{}, + } +} diff --git a/network/resolver/dns/dns.go b/network/resolver/dns/dns.go index ba109656..3cfa2746 100644 --- a/network/resolver/dns/dns.go +++ b/network/resolver/dns/dns.go @@ -8,6 +8,7 @@ import ( "github.com/micro/go-micro/network/resolver" ) +// Resolver is a DNS network resolve type Resolver struct{} // Resolve assumes ID is a domain name e.g micro.mu diff --git a/network/resolver/http/http.go b/network/resolver/http/http.go index b6025a59..055662a5 100644 --- a/network/resolver/http/http.go +++ b/network/resolver/http/http.go @@ -10,6 +10,7 @@ import ( "github.com/micro/go-micro/network/resolver" ) +// Resolver is a HTTP network resolver type Resolver struct { // If not set, defaults to http Proto string diff --git a/network/resolver/registry/registry.go b/network/resolver/registry/registry.go index 9fe80bc5..20fffc18 100644 --- a/network/resolver/registry/registry.go +++ b/network/resolver/registry/registry.go @@ -6,6 +6,7 @@ import ( "github.com/micro/go-micro/registry" ) +// Resolver is a registry network resolver type Resolver struct { // Registry is the registry to use otherwise we use the defaul Registry registry.Registry diff --git a/network/resolver/resolver.go b/network/resolver/resolver.go index 2eb0b2a2..369269df 100644 --- a/network/resolver/resolver.go +++ b/network/resolver/resolver.go @@ -5,7 +5,7 @@ package resolver // via the name to connect to. This is done based on Network.Name(). // Before we can be part of any network, we have to connect to it. type Resolver interface { - // Resolve returns a list of addresses for an name + // Resolve returns a list of addresses for a name Resolve(name string) ([]*Record, error) } diff --git a/router/router.go b/router/router.go index 7d7ab0de..8b6618d1 100644 --- a/router/router.go +++ b/router/router.go @@ -5,6 +5,17 @@ import ( "time" ) +var ( + // DefaultAddress is default router address + DefaultAddress = ":9093" + // DefaultName is default router service name + DefaultName = "go.micro.router" + // DefaultNetwork is default micro network + DefaultNetwork = "go.micro" + // DefaultRouter is default network router + DefaultRouter = NewRouter() +) + // Router is an interface for a routing control plane type Router interface { // Init initializes the router with options @@ -31,16 +42,17 @@ type Router interface { String() string } +// Table is an interface for routing table type Table interface { // Create new route in the routing table Create(Route) error - // Delete deletes existing route from the routing table + // Delete existing route from the routing table Delete(Route) error - // Update updates route in the routing table + // Update route in the routing table Update(Route) error - // List returns the list of all routes in the table + // List all routes in the table List() ([]Route, error) - // Query queries routes in the routing table + // Query routes in the routing table Query(Query) ([]Route, error) } @@ -125,17 +137,6 @@ type Advert struct { Events []*Event } -var ( - // DefaultAddress is default router address - DefaultAddress = ":9093" - // DefaultName is default router service name - DefaultName = "go.micro.router" - // DefaultNetwork is default micro network - DefaultNetwork = "go.micro" - // DefaultRouter is default network router - DefaultRouter = NewRouter() -) - // NewRouter creates new Router and returns it func NewRouter(opts ...Option) Router { return newRouter(opts...)