diff --git a/network/default.go b/network/default.go index 456be806..88203a48 100644 --- a/network/default.go +++ b/network/default.go @@ -44,15 +44,17 @@ type network struct { // lease generates a new lease with a node id/address // TODO: use a consensus mechanism, pool or some deterministic -// unique prefixing method. +// unique addressing method. func (n *network) lease() *pb.Lease { // create the id id := uuid.New().String() // create a timestamp now := time.Now().UnixNano() - // create the address + + // create the address by hashing the id and timestamp h := sha256.New() h.Write([]byte(fmt.Sprintf("%s-%d\n", id, now))) + // magic new address address := fmt.Sprintf("%x", h.Sum(nil)) // return the node @@ -62,6 +64,7 @@ func (n *network) lease() *pb.Lease { Node: &pb.Node{ Id: id, Address: address, + Network: n.id, }, } } @@ -104,7 +107,10 @@ func (n *network) Connect() (Node, error) { return newNode(n) } -// TODO: establish links for peering networks +// Peer is used to establish a link between two networks. +// e.g micro.mu connects to example.com and share routes +// This is done by creating a new node on both networks +// and creating a link between them. func (n *network) Peer(Network) (Link, error) { // New network was created using NewNetwork after receiving routes from a different node @@ -125,9 +131,13 @@ func (n *network) Peer(Network) (Link, error) { func newNetwork(opts ...options.Option) *network { options := options.NewOptions(opts...) - // new network instance + // new network instance with defaults net := &network{ - id: DefaultId, + Options: options, + id: DefaultId, + router: router.DefaultRouter, + proxy: new(mucp.Proxy), + resolver: new(nreg.Resolver), } // get network id @@ -140,24 +150,18 @@ func newNetwork(opts ...options.Option) *network { r, ok := options.Values().Get("network.router") if ok { net.router = r.(router.Router) - } else { - net.router = router.DefaultRouter } // get proxy p, ok := options.Values().Get("network.proxy") if ok { net.proxy = p.(proxy.Proxy) - } else { - net.proxy = new(mucp.Proxy) } // get resolver res, ok := options.Values().Get("network.resolver") if ok { net.resolver = res.(resolver.Resolver) - } else { - net.resolver = new(nreg.Resolver) } return net diff --git a/network/link.go b/network/link.go index 5fb6b3e5..c6c1d023 100644 --- a/network/link.go +++ b/network/link.go @@ -1,30 +1,38 @@ package network import ( + "errors" + "fmt" + "io" "sync" + gproto "github.com/golang/protobuf/proto" + "github.com/micro/go-micro/codec" pb "github.com/micro/go-micro/network/proto" + "github.com/micro/go-micro/transport" ) type link struct { // the embedded node *node + sync.RWMutex + // the link id id string - // queue buffer for this link + // the send queue to the socket queue chan *Message + // codec we use to marshal things + codec codec.Marshaler + // the socket for this link - socket *socket + socket transport.Socket // the lease for this link lease *pb.Lease - // length and weight of the link - mtx sync.RWMutex - // determines the cost of the link // based on queue length and roundtrip length int @@ -33,25 +41,252 @@ type link struct { // link methods -// bring up the link -func (l *link) up() error { - // TODO: manage the length/weight of the link - return l.socket.accept() +// process processe messages on the send queue +func (l *link) process() { + for { + select { + case m := <-l.queue: + if err := l.send(m, nil); err != nil { + return + } + } + } } -// kill the link -func (l *link) down() error { - return l.socket.close() +// accept waits for the connect message from the remote end +// if it receives anything else it throws an error +func (l *link) accept() error { + for { + m := new(transport.Message) + err := l.socket.Recv(m) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + // TODO: pick a reliable header + event := m.Header["Micro-Method"] + + switch event { + // connect event + case "Connect": + // process connect events from network.Connect() + // these are new connections to join the network + + // decode the connection event + conn := new(pb.Connect) + if err := l.codec.Unmarshal(m.Body, conn); err != nil { + // skip error + continue + } + + // get the existing lease if it exists + lease := conn.Lease + // if there's no lease create a new one + if lease == nil { + // create a new lease/node + lease = l.node.network.lease() + } + + // send back a lease offer for the node + if err := l.send(&Message{ + Header: map[string]string{ + "Micro-Method": "Lease", + }, + }, lease); err != nil { + return err + } + + // the lease is saved + l.Lock() + l.lease = lease + l.Unlock() + + // we've connected + // start processing the messages + go l.process() + + return nil + case "Close": + l.Close() + return errors.New("connection closed") + default: + return errors.New("unknown method: " + event) + } + } +} + +// connect sends a connect request and waits on a lease. +// this is for a new connection. in the event we send +// an existing lease, the same lease should be returned. +// if it differs then we assume our address for this link +// is different... +func (l *link) connect() error { + // get the current lease + l.RLock() + lease := l.lease + l.RUnlock() + + // send a lease request + if err := l.send(&Message{ + Header: map[string]string{ + "Micro-Method": "Connect", + }, + }, &pb.Connect{Lease: lease}); err != nil { + return err + } + + // create the new things + tm := new(Message) + newLease := new(pb.Lease) + + // wait for a response, hopefully a lease + if err := l.recv(tm, newLease); err != nil { + return err + } + + event := tm.Header["Micro-Method"] + + // check the method + switch event { + case "Lease": + // save the lease + l.Lock() + l.lease = newLease + l.Unlock() + + // start processing the messages + go l.process() + case "Close": + l.socket.Close() + return errors.New("connection closed") + default: + return errors.New("unable to attain lease") + } + + return nil +} + +// send a message over the link +func (l *link) send(m *Message, v interface{}) error { + tm := new(transport.Message) + tm.Header = m.Header + tm.Body = m.Body + + // set the body if not nil + // we're assuming this is network message + if v != nil { + // encode the data + b, err := l.codec.Marshal(v) + if err != nil { + return err + } + + // set the content type + tm.Header["Content-Type"] = "application/protobuf" + // set the marshalled body + tm.Body = b + } + + fmt.Printf("link %s sending %+v %+v\n", l.id, m, v) + + // send via the transport socket + return l.socket.Send(&transport.Message{ + Header: m.Header, + Body: m.Body, + }) +} + +// recv a message on the link +func (l *link) recv(m *Message, v interface{}) error { + if m.Header == nil { + m.Header = make(map[string]string) + } + + tm := new(transport.Message) + + // receive the transport message + if err := l.socket.Recv(tm); err != nil { + return err + } + + fmt.Printf("link %s receiving %+v %+v\n", l.id, tm, v) + + // set the message + m.Header = tm.Header + m.Body = tm.Body + + // bail early + if v == nil { + return nil + } + + // try unmarshal the body + // skip if there's no content-type + if tm.Header["Content-Type"] != "application/protobuf" { + return nil + } + + // return unmarshalled + return l.codec.Unmarshal(m.Body, v.(gproto.Message)) +} + +// Close the link +func (l *link) Close() error { + // send a final close message + l.socket.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Method": "Close", + }, + }) + // close the socket + return l.socket.Close() +} + +// returns the node id +func (l *link) Id() string { + l.RLock() + defer l.RUnlock() + if l.lease == nil { + return "" + } + return l.lease.Node.Id +} + +// Address of the node we're connected to +func (l *link) Address() string { + l.RLock() + defer l.RUnlock() + if l.lease == nil { + return l.socket.Remote() + } + // the node in the lease + return l.lease.Node.Address } func (l *link) Length() int { - l.mtx.RLock() - defer l.mtx.RUnlock() + l.RLock() + defer l.RUnlock() return l.length } func (l *link) Weight() int { - l.mtx.RLock() - defer l.mtx.RUnlock() + l.RLock() + defer l.RUnlock() return l.weight } + +func (l *link) Accept() (*Message, error) { + m := new(Message) + err := l.recv(m, nil) + if err != nil { + return nil, err + } + return m, nil +} + +func (l *link) Send(m *Message) error { + return l.send(m, nil) +} diff --git a/network/node.go b/network/node.go index d9ebadd2..914a602b 100644 --- a/network/node.go +++ b/network/node.go @@ -1,7 +1,11 @@ package network import ( + "errors" + "fmt" + "net" "runtime/debug" + "strconv" "sync" "time" @@ -9,6 +13,7 @@ import ( "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/util/addr" "github.com/micro/go-micro/util/log" pb "github.com/micro/go-micro/network/proto" @@ -20,7 +25,7 @@ type node struct { // closed channel closed chan bool - mtx sync.RWMutex + sync.RWMutex // the node id id string @@ -40,32 +45,46 @@ type node struct { // leases for connections to us // link id:link links map[string]*link + + // messages received over links + recv chan *Message + // messages received over links + send chan *Message } // network methods func newNode(n *network) (*node, error) { // create a new node - node := new(node) - // closed channel - node.closed = make(chan bool) - // set the nodes network - node.network = n - - // initially we have no id - // create an id and address - // TODO: create a real unique id and address - // lease := n.lease() - // set the node id - // node.id = lease.Node.Id + node := &node{ + // the links + links: make(map[string]*link), + // closed channel + closed: make(chan bool), + // set the nodes network + network: n, + // set the default transport + transport: transport.DefaultTransport, + // set the default registry + registry: registry.DefaultRegistry, + // receive channel for accepted connections + recv: make(chan *Message, 128), + // send channel for accepted connections + send: make(chan *Message, 128), + } // get the transport we're going to use for our tunnels + // TODO: set to quic or tunnel or something else t, ok := n.Options.Values().Get("network.transport") if ok { node.transport = t.(transport.Transport) - } else { - // TODO: set to quic - node.transport = transport.DefaultTransport + } + + // register the node with the registry for the network + // TODO: use a registrar or something else for local things + r, ok := n.Options.Values().Get("network.registry") + if ok { + node.registry = r.(registry.Registry) } // we listen on a random address, this is not advertised @@ -77,8 +96,6 @@ func newNode(n *network) (*node, error) { // set the listener node.listener = l - // TODO: this should be an overlay address - // ideally received via some dhcp style broadcast node.address = l.Addr() // TODO: start the router and broadcast advertisements @@ -90,36 +107,51 @@ func newNode(n *network) (*node, error) { // process any incoming messages on the listener // this is our inbound network connection - node.accept(l) + go node.accept(l) - // register the node with the registry for the network - // TODO: use a registrar or something else for local things - r, ok := n.Options.Values().Get("network.registry") - if ok { - node.registry = r.(registry.Registry) - } else { - node.registry = registry.DefaultRegistry - } + // process any messages being sent by node.Send + // forwards to every link we have + go node.process() // lookup the network to see if there's any nodes records := n.lookup(node.registry) - // should we actually do this? + // assuming if there are no records, we are the first + // we set ourselves a lease. should we actually do this? if len(records) == 0 { // set your own node id lease := n.lease() node.id = lease.Node.Id } - // register self with the network registry + var port int + // TODO: this should be an overlay address + // ideally received via some dhcp style broadcast + host, pp, err := net.SplitHostPort(l.Addr()) + if err == nil { + pt, _ := strconv.Atoi(pp) + port = pt + } + + // some horrible things are happening + if host == "::" { + host = "" + } + // set the address + addr, _ := addr.Extract(host) + + node.address = fmt.Sprintf("%s:%d", addr, port) + + // register self with the registry using network: prefix // this is a local registry of nodes separate to the resolver // maybe consolidate registry/resolver - // TODO: find a way to do this via gossip or something else + // TODO: find a way to do this via gossip or something like + // a registrar or tld or whatever if err := node.registry.Register(®istry.Service{ // register with the network id Name: "network:" + n.Id(), Nodes: []*registry.Node{ - {Id: node.id, Address: node.address}, + {Id: node.id, Address: addr, Port: port}, }, }); err != nil { node.Close() @@ -134,7 +166,20 @@ func newNode(n *network) (*node, error) { // wait forever to connect // TODO: do something with the links we receive - <-linkChan + link := <-linkChan + + // process this link + go node.manage(link) + + go func() { + // process any further new links + select { + case l := <-linkChan: + go node.manage(l) + case <-node.closed: + return + } + }() return node, nil } @@ -158,28 +203,82 @@ func (n *node) accept(l transport.Listener) error { }() // create a new link - // generate a new link link := &link{ + // link has a unique id + id: uuid.New().String(), + // proto marshaler + codec: proto.Marshaler{}, + // link has a socket + socket: sock, + // for generating leases, node: n, - id: uuid.New().String(), + // the send queue, + queue: make(chan *Message, 128), } - // create a new network socket - sk := new(socket) - sk.node = n - sk.codec = proto.Marshaler{} - sk.socket = sock - // set link socket - link.socket = sk + log.Debugf("Accepting connection from %s", link.socket.Remote()) - // accept messages on the socket - // blocks forever or until error - if err := link.up(); err != nil { - // TODO: delete link + // wait for the link to be connected + // the remote end will send "Connect" + // and we will return a "Lease" + if err := link.accept(); err != nil { + return } + + log.Debugf("Accepted link from %s", link.socket.Remote()) + + // save with the remote address as the key + // where we attempt to connect to nodes + // we do not connect to the same thing + n.Lock() + n.links[link.socket.Remote()] = link + n.Unlock() + + // manage the link for its lifetime + n.manage(link) }) } +// processes the send queue +func (n *node) process() { + for { + select { + case <-n.closed: + return + // process outbound messages on the send queue + // these messages are received from n.Send + case m := <-n.send: + // queue the message on each link + // TODO: more than likely use proxy + n.RLock() + for _, l := range n.links { + l.queue <- m + } + n.RUnlock() + } + } +} + +func (n *node) manage(l *link) { + // now process inbound messages on the link + // assumption is this handles everything else + for { + // get a message on the link + m := new(Message) + if err := l.recv(m, nil); err != nil { + // ??? + return + } + + select { + case <-n.closed: + return + // send to the recv channel e.g node.Accept() + case n.recv <- m: + } + } +} + // connect attempts to periodically connect to new nodes in the network. // It will only do this if it has less than 3 connections. this method // is called by network.Connect and fired in a go routine after establishing @@ -188,30 +287,37 @@ func (n *node) accept(l transport.Listener) error { func (n *node) connect(linkChan chan *link) { // TODO: adjustable ticker t := time.NewTicker(time.Second) + var lease *pb.Lease for { select { + // exit when told to do so + case <-n.closed: + return // on every tick check the number of links and then attempt // to connect to new nodes if we don't have sufficient links case <-t.C: - n.mtx.RLock() + n.RLock() // only start processing if we have less than 3 links if len(n.links) > 2 { - n.mtx.RUnlock() + n.RUnlock() continue } // get a list of link addresses so we don't reconnect // to the ones we're already connected to nodes := map[string]bool{} - for _, l := range n.links { - nodes[l.lease.Node.Address] = true + for addr, _ := range n.links { + // id is the lookup address used to connect + nodes[addr] = true } - n.mtx.RUnlock() + // unlock our read lock + n.RUnlock() + // lookup records for our network records := n.network.lookup(n.registry) // for each record check we haven't already got a connection @@ -224,53 +330,53 @@ func (n *node) connect(linkChan chan *link) { for _, record := range records { // skip existing connections if nodes[record.Address] { + log.Debugf("Skipping connection to %s", record.Address) continue } // attempt to connect and create a link + log.Debugf("Dialing connection to %s", record.Address) + // connect to the node - s, err := n.transport.Dial(record.Address) + sock, err := n.transport.Dial(record.Address) if err != nil { + log.Debugf("Dialing connection error %v", err) continue } - // create a new socket - sk := &socket{ - node: n, - codec: &proto.Marshaler{}, - socket: s, - } - - // broadcast a "connect" request and get back "lease" - // this is your tunnel to the outside world and to the network - // then push updates and messages over this link - // first connect will not have a lease so we get one with node id/address - l, err := sk.connect(lease) - if err != nil { - s.Close() - continue - } - - // set lease for next time - lease = l - // create a new link with the lease and socket link := &link{ + codec: &proto.Marshaler{}, id: uuid.New().String(), lease: lease, - node: n, + socket: sock, queue: make(chan *Message, 128), - socket: sk, } - // bring up the link - go link.up() + log.Debugf("Connecting link to %s", record.Address) + // connect the link: + // this broadcasts a "connect" request and gets back a "lease" + // this is the tunnel to the outside world and to the network + // then push updates and messages over this link + // first connect will not have a lease so we get one with node id/address + if err := link.connect(); err != nil { + // shit + link.Close() + continue + } + + log.Debugf("Connected link to %s", record.Address) + + n.Lock() + // set lease for next time we connect to anything else + // we want to use the same lease for that. in future + // we may have to expire the lease + lease = link.lease // save the new link - n.mtx.Lock() - n.links[link.id] = link - n.mtx.Unlock() + n.links[link.socket.Remote()] = link + n.Unlock() // drop this down the link channel to the network // so it can manage the links @@ -280,8 +386,6 @@ func (n *node) connect(linkChan chan *link) { default: } } - case <-n.closed: - return } } } @@ -296,14 +400,17 @@ func (n *node) Close() error { case <-n.closed: return nil default: + // mark as closed close(n.closed) + // shutdown all the links - n.mtx.Lock() + n.Lock() for id, link := range n.links { - link.down() + link.Close() delete(n.links, id) } - n.mtx.Unlock() + n.Unlock() + // deregister self n.registry.Deregister(®istry.Service{ Name: "network:" + n.network.Id(), @@ -311,14 +418,29 @@ func (n *node) Close() error { {Id: n.id, Address: n.address}, }, }) + + // shutdown the listener return n.listener.Close() } return nil } +// Accept receives the incoming messages from all links func (n *node) Accept() (*Message, error) { // process the inbound cruft - + for { + select { + case m, ok := <-n.recv: + if !ok { + return nil, errors.New("connection closed") + } + // return the message + return m, nil + case <-n.closed: + return nil, errors.New("connection closed") + } + } + // we never get here return nil, nil } @@ -326,22 +448,13 @@ func (n *node) Network() string { return n.network.id } +// Send propagates a message over all links. This should probably use its proxy. func (n *node) Send(m *Message) error { - n.mtx.RLock() - defer n.mtx.RUnlock() - - var gerr error - - // send to all links - // TODO: be smarter - for _, link := range n.links { - // TODO: process the error, do some link flap detection - // blackhole the connection, etc - if err := link.socket.send(m, nil); err != nil { - gerr = err - continue - } + select { + case <-n.closed: + return errors.New("connection closed") + case n.send <- m: + // send the message } - - return gerr + return nil } diff --git a/network/resolver/registry/registry.go b/network/resolver/registry/registry.go index c7ef796a..09228970 100644 --- a/network/resolver/registry/registry.go +++ b/network/resolver/registry/registry.go @@ -2,6 +2,8 @@ package registry import ( + "fmt" + "github.com/micro/go-micro/network/resolver" "github.com/micro/go-micro/registry" ) @@ -27,8 +29,13 @@ func (r *Resolver) Resolve(id string) ([]*resolver.Record, error) { for _, service := range services { for _, node := range service.Nodes { + addr := node.Address + // such a hack + if node.Port > 0 { + addr = fmt.Sprintf("%s:%d", node.Address, node.Port) + } records = append(records, &resolver.Record{ - Address: node.Address, + Address: addr, }) } } diff --git a/network/socket.go b/network/socket.go deleted file mode 100644 index 3b6de23a..00000000 --- a/network/socket.go +++ /dev/null @@ -1,176 +0,0 @@ -package network - -import ( - "io" - - gproto "github.com/golang/protobuf/proto" - "github.com/google/uuid" - "github.com/micro/go-micro/codec" - "github.com/micro/go-micro/transport" - - pb "github.com/micro/go-micro/network/proto" -) - -type socket struct { - node *node - codec codec.Marshaler - socket transport.Socket -} - -func (s *socket) close() error { - return s.socket.Close() -} - -// accept is the state machine that processes messages on the socket -func (s *socket) accept() error { - for { - m := new(transport.Message) - err := s.socket.Recv(m) - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - // TODO: pick a reliable header - event := m.Header["Micro-Method"] - - switch event { - // connect event - case "connect": - // process connect events from network.Connect() - // these are new connections to join the network - - // decode the connection event - conn := new(pb.Connect) - if err := s.codec.Unmarshal(m.Body, conn); err != nil { - // skip error - continue - } - - // get the existing lease if it exists - lease := conn.Lease - if lease == nil { - // create a new lease/node - lease = s.node.network.lease() - } - - // send back a lease offer for the node - if err := s.send(&Message{ - Header: map[string]string{ - "Micro-Method": "lease", - }, - }, lease); err != nil { - return err - } - - // record this mapping of socket to node/lease - s.node.mtx.Lock() - id := uuid.New().String() - s.node.links[id] = &link{ - node: s.node, - id: id, - lease: lease, - queue: make(chan *Message, 128), - socket: s, - } - s.node.mtx.Unlock() - // a route update - case "route": - // process router events - - // received a lease - case "lease": - // no op as we don't process lease events on existing connections - // these are in response to a connect message - default: - // process all other messages - } - } -} - -// connect sends a connect request and waits on a lease. -// this is for a new connection. in the event we send -// an existing lease, the same lease should be returned. -// if it differs then we assume our address for this link -// is different... -func (s *socket) connect(l *pb.Lease) (*pb.Lease, error) { - // send a lease request - if err := s.send(&Message{ - Header: map[string]string{ - "Micro-Method": "connect", - }, - }, &pb.Connect{Lease: l}); err != nil { - return nil, err - } - - // create the new things - tm := new(Message) - lease := new(pb.Lease) - - // wait for a lease response - if err := s.recv(tm, lease); err != nil { - return nil, err - } - - return lease, nil -} - -func (s *socket) send(m *Message, v interface{}) error { - tm := new(transport.Message) - tm.Header = m.Header - tm.Body = m.Body - - // set the body if not nil - // we're assuming this is network message - if v != nil { - // encode the data - b, err := s.codec.Marshal(v) - if err != nil { - return err - } - - // set the content type - tm.Header["Content-Type"] = "application/protobuf" - // set the marshalled body - tm.Body = b - } - - // send via the transport socket - return s.socket.Send(&transport.Message{ - Header: m.Header, - Body: m.Body, - }) -} - -func (s *socket) recv(m *Message, v interface{}) error { - if m.Header == nil { - m.Header = make(map[string]string) - } - - tm := new(transport.Message) - - // receive the transport message - if err := s.socket.Recv(tm); err != nil { - return err - } - - // set the message - m.Header = tm.Header - m.Body = tm.Body - - // bail early - if v == nil { - return nil - } - - // try unmarshal the body - // skip if there's no content-type - if tm.Header["Content-Type"] != "application/protobuf" { - return nil - } - - // return unmarshalled - return s.codec.Unmarshal(m.Body, v.(gproto.Message)) -}