diff --git a/network/default.go b/network/default.go index 29e0bbb2..bc4024f6 100644 --- a/network/default.go +++ b/network/default.go @@ -1,144 +1,136 @@ package network import ( - "crypto/sha256" "fmt" + "io" "sync" - "time" "github.com/google/uuid" "github.com/micro/go-micro/config/options" - "github.com/micro/go-micro/network/proxy" - "github.com/micro/go-micro/network/proxy/mucp" - "github.com/micro/go-micro/network/resolver" - "github.com/micro/go-micro/network/router" - "github.com/micro/go-micro/registry" - - pb "github.com/micro/go-micro/network/proto" - nreg "github.com/micro/go-micro/network/resolver/registry" + "github.com/micro/go-micro/network/transport" + "github.com/micro/go-micro/util/addr" ) +// default network implementation type network struct { options.Options - // resolver use to connect to the network - resolver resolver.Resolver - - // router used to find routes in the network - router router.Router - - // proxy used to route through the network - proxy proxy.Proxy - - // name of this network + // name of the network name string - // links maintained for this network - // based on peers not nodes. maybe maintain - // node separately or note that links have nodes - mtx sync.RWMutex - links []Link + // transport + transport transport.Transport } -// network methods +type listener struct { + // start accepting once + once sync.Once + // close channel to close the connection + closed chan bool + // the listener + listener transport.Listener + // the connection queue + conns chan Conn +} -// lease generates a new lease with a node id/address -// TODO: use a consensus mechanism, pool or some deterministic -// unique addressing method. -func (n *network) lease(muid string) *pb.Lease { - // create the id - id := uuid.New().String() - // create a timestamp - now := time.Now().UnixNano() - - // create the address by hashing the id and timestamp - h := sha256.New() - h.Write([]byte(fmt.Sprintf("%s-%d\n", id, now))) - // magic new address - address := fmt.Sprintf("%x", h.Sum(nil)) - - // return the node - return &pb.Lease{ - Id: id, - Timestamp: now, - Node: &pb.Node{ - Muid: muid, - Id: id, - Address: address, - Network: n.name, +func (n *network) Create() (*Node, error) { + ip, err := addr.Extract("") + if err != nil { + return nil, err + } + return &Node{ + Id: fmt.Sprintf("%s-%s", n.name, uuid.New().String()), + Address: ip, + Metadata: map[string]string{ + "network": n.Name(), }, - } -} - -// lookup returns a list of network records in priority order of local -func (n *network) lookup(r registry.Registry) []*resolver.Record { - // create a registry resolver to find local nodes - rr := nreg.Resolver{Registry: r} - - // get all the nodes for the network that are local - localRecords, err := rr.Resolve(n.Name()) - if err != nil { - // we're not in a good place here - } - - // if its a local network we never try lookup anything else - if n.Name() == "local" { - return localRecords - } - - // now resolve incrementally based on resolvers specified - networkRecords, err := n.resolver.Resolve(n.Name()) - if err != nil { - // still not in a good place - } - - // return aggregate records - return append(localRecords, networkRecords...) + }, nil } func (n *network) Name() string { return n.name } -// Connect connects to the network and returns a new node. -// The node is the callers connection to the network. They -// should advertise this address to people. Anyone else -// on the network should be able to route to it. -func (n *network) Connect() (Node, error) { - return newNode(n) +func (n *network) Connect(node *Node) (Conn, error) { + c, err := n.transport.Dial(node.Address) + if err != nil { + return nil, err + } + return newLink(c.(transport.Socket)), nil } -// Peer is used to establish a link between two networks. -// e.g micro.mu connects to example.com and share routes -// This is done by creating a new node on both networks -// and creating a link between them. -func (n *network) Peer(Network) (Link, error) { - // New network was created using NewNetwork after receiving routes from a different node - - // Connect to the new network and be assigned a node - - // Transfer data between the networks - - // take other resolver - // order: registry (local), ...resolver - // resolve the network - - // periodically connect to nodes resolved in the network - // and add to the network links - return nil, nil +func (n *network) Listen(node *Node) (Listener, error) { + l, err := n.transport.Listen(node.Address) + if err != nil { + return nil, err + } + return newListener(l), nil +} + +func (l *listener) process() { + if err := l.listener.Accept(l.accept); err != nil { + // close the listener + l.Close() + } +} + +func (l *listener) accept(sock transport.Socket) { + // create a new link and pass it through + link := newLink(sock) + + // send it + l.conns <- link + + // wait for it to be closed + select { + case <-l.closed: + return + case <-link.closed: + return + } +} + +func (l *listener) Address() string { + return l.listener.Addr() +} + +func (l *listener) Close() error { + select { + case <-l.closed: + return nil + default: + close(l.closed) + } + return nil +} + +func (l *listener) Accept() (Conn, error) { + l.once.Do(func() { + // TODO: catch the error + go l.process() + }) + select { + case c := <-l.conns: + return c, nil + case <-l.closed: + return nil, io.EOF + } +} + +func newListener(l transport.Listener) *listener { + return &listener{ + closed: make(chan bool), + conns: make(chan Conn), + listener: l, + } } -// newNetwork returns a new network interface func newNetwork(opts ...options.Option) *network { options := options.NewOptions(opts...) - // new network instance with defaults net := &network{ - Options: options, - name: DefaultName, - router: router.DefaultRouter, - proxy: new(mucp.Proxy), - resolver: new(nreg.Resolver), + name: DefaultName, + transport: transport.DefaultTransport, } // get network name @@ -147,22 +139,10 @@ func newNetwork(opts ...options.Option) *network { net.name = name.(string) } - // get router - r, ok := options.Values().Get("network.router") + // get network transport + t, ok := options.Values().Get("network.transport") if ok { - net.router = r.(router.Router) - } - - // get proxy - p, ok := options.Values().Get("network.proxy") - if ok { - net.proxy = p.(proxy.Proxy) - } - - // get resolver - res, ok := options.Values().Get("network.resolver") - if ok { - net.resolver = res.(resolver.Resolver) + net.transport = t.(transport.Transport) } return net diff --git a/network/default_test.go b/network/default_test.go new file mode 100644 index 00000000..da10fdc8 --- /dev/null +++ b/network/default_test.go @@ -0,0 +1,86 @@ +package network + +import ( + "io" + "testing" +) + +func TestNetwork(t *testing.T) { + // create a new network + n := newNetwork() + + // create a new node + node, err := n.Create() + if err != nil { + t.Fatal(err) + } + + // set ourselves a random port + node.Address = node.Address + ":0" + + l, err := n.Listen(node) + if err != nil { + t.Fatal(err) + } + + wait := make(chan error) + + go func() { + var gerr error + + for { + c, err := l.Accept() + if err != nil { + gerr = err + break + } + m := new(Message) + if err := c.Recv(m); err != nil { + gerr = err + break + } + if err := c.Send(m); err != nil { + gerr = err + break + } + } + + wait <- gerr + }() + + node.Address = l.Address() + + // connect to the node + conn, err := n.Connect(node) + if err != nil { + t.Fatal(err) + } + + // send a message + if err := conn.Send(&Message{ + Header: map[string]string{"Foo": "bar"}, + Body: []byte(`hello world`), + }); err != nil { + t.Fatal(err) + } + + m := new(Message) + // send a message + if err := conn.Recv(m); err != nil { + t.Fatal(err) + } + + if m.Header["Foo"] != "bar" { + t.Fatalf("Received unexpected message %+v", m) + } + + // close the listener + l.Close() + + // get listener error + err = <-wait + + if err != io.EOF { + t.Fatal(err) + } +} diff --git a/network/link.go b/network/link.go index 656abe24..e805e55c 100644 --- a/network/link.go +++ b/network/link.go @@ -5,18 +5,11 @@ import ( "io" "sync" - gproto "github.com/golang/protobuf/proto" "github.com/google/uuid" - "github.com/micro/go-micro/codec" - "github.com/micro/go-micro/codec/proto" - pb "github.com/micro/go-micro/network/proto" "github.com/micro/go-micro/network/transport" ) type link struct { - // the embedded node - *node - closed chan bool sync.RWMutex @@ -29,15 +22,9 @@ type link struct { // the recv queue to the socket recvQueue chan *Message - // codec we use to marshal things - codec codec.Marshaler - // the socket for this link socket transport.Socket - // the lease for this link - lease *pb.Lease - // determines the cost of the link // based on queue length and roundtrip length int @@ -48,17 +35,16 @@ var ( ErrLinkClosed = errors.New("link closed") ) -func newLink(n *node, sock transport.Socket, lease *pb.Lease) *link { - return &link{ +func newLink(sock transport.Socket) *link { + l := &link{ id: uuid.New().String(), - closed: make(chan bool), - codec: &proto.Marshaler{}, - node: n, - lease: lease, socket: sock, + closed: make(chan bool), sendQueue: make(chan *Message, 128), recvQueue: make(chan *Message, 128), } + go l.process() + return l } // link methods @@ -69,13 +55,7 @@ func (l *link) process() { go func() { for { m := new(Message) - if err := l.recv(m, nil); err != nil { - return - } - - // check if it's an internal close method - if m.Header["Micro-Method"] == "close" { - l.Close() + if err := l.recv(m); err != nil { return } @@ -90,7 +70,7 @@ func (l *link) process() { for { select { case m := <-l.sendQueue: - if err := l.send(m, nil); err != nil { + if err := l.send(m); err != nil { return } case <-l.closed: @@ -99,169 +79,17 @@ func (l *link) process() { } } -// accept waits for the connect message from the remote end -// if it receives anything else it throws an error -func (l *link) accept() error { - for { - m := new(transport.Message) - err := l.socket.Recv(m) - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - // TODO: pick a reliable header - event := m.Header["Micro-Method"] - - switch event { - // connect event - case "connect": - // process connect events from network.Connect() - // these are new connections to join the network - - // decode the connection event - conn := new(pb.Connect) - // expecting a connect message - if err := l.codec.Unmarshal(m.Body, conn); err != nil { - // skip error - continue - } - - // no micro id close the link - if len(conn.Muid) == 0 { - l.Close() - return errors.New("invalid muid " + conn.Muid) - } - - // get the existing lease if it exists - lease := conn.Lease - // if there's no lease create a new one - if lease == nil { - // create a new lease/node - lease = l.node.network.lease(conn.Muid) - } - - // check if we connected to ourself - if conn.Muid == l.node.muid { - // check our own leasae - l.node.Lock() - if l.node.lease == nil { - l.node.lease = lease - } - l.node.Unlock() - } - - // set the author to our own muid - lease.Author = l.node.muid - - // send back a lease offer for the node - if err := l.send(&Message{ - Header: map[string]string{ - "Micro-Method": "lease", - }, - }, lease); err != nil { - return err - } - - // the lease is saved - l.Lock() - l.lease = lease - l.Unlock() - - // we've connected - // start processing the messages - go l.process() - return nil - case "close": - l.Close() - return io.EOF - default: - return errors.New("unknown method: " + event) - } - } -} - -// connect sends a connect request and waits on a lease. -// this is for a new connection. in the event we send -// an existing lease, the same lease should be returned. -// if it differs then we assume our address for this link -// is different... -func (l *link) connect() error { - // get the current lease - l.RLock() - lease := l.lease - l.RUnlock() - - // send a lease request - if err := l.send(&Message{ - Header: map[string]string{ - "Micro-Method": "connect", - }, - }, &pb.Connect{Muid: l.node.muid, Lease: lease}); err != nil { - return err - } - - // create the new things - tm := new(Message) - newLease := new(pb.Lease) - - // wait for a response, hopefully a lease - if err := l.recv(tm, newLease); err != nil { - return err - } - - event := tm.Header["Micro-Method"] - - // check the method - switch event { - case "lease": - // save the lease - l.Lock() - l.lease = newLease - l.Unlock() - - // start processing the messages - go l.process() - case "close": - l.Close() - return io.EOF - default: - l.Close() - return errors.New("unable to attain lease") - } - - return nil -} - // send a message over the link -func (l *link) send(m *Message, v interface{}) error { +func (l *link) send(m *Message) error { tm := new(transport.Message) tm.Header = m.Header tm.Body = m.Body - - // set the body if not nil - // we're assuming this is network message - if v != nil { - // encode the data - b, err := l.codec.Marshal(v) - if err != nil { - return err - } - - // set the content type - tm.Header["Content-Type"] = "application/protobuf" - // set the marshalled body - tm.Body = b - } - // send via the transport socket return l.socket.Send(tm) } // recv a message on the link -func (l *link) recv(m *Message, v interface{}) error { +func (l *link) recv(m *Message) error { if m.Header == nil { m.Header = make(map[string]string) } @@ -277,19 +105,7 @@ func (l *link) recv(m *Message, v interface{}) error { m.Header = tm.Header m.Body = tm.Body - // bail early - if v == nil { - return nil - } - - // try unmarshal the body - // skip if there's no content-type - if tm.Header["Content-Type"] != "application/protobuf" { - return nil - } - - // return unmarshalled - return l.codec.Unmarshal(m.Body, v.(gproto.Message)) + return nil } // Close the link @@ -299,35 +115,27 @@ func (l *link) Close() error { return nil default: close(l.closed) + return l.socket.Close() } - - // send a final close message - return l.socket.Send(&transport.Message{ - Header: map[string]string{ - "Micro-Method": "close", - }, - }) } // returns the node id func (l *link) Id() string { l.RLock() defer l.RUnlock() - if l.lease == nil { - return "" - } - return l.lease.Node.Id + return l.id } -// Address of the node we're connected to -func (l *link) Address() string { +func (l *link) Remote() string { l.RLock() defer l.RUnlock() - if l.lease == nil { - return l.socket.Remote() - } - // the node in the lease - return l.lease.Node.Address + return l.socket.Remote() +} + +func (l *link) Local() string { + l.RLock() + defer l.RUnlock() + return l.socket.Local() } func (l *link) Length() int { @@ -341,15 +149,16 @@ func (l *link) Weight() int { } // Accept accepts a message on the socket -func (l *link) Accept() (*Message, error) { +func (l *link) Recv(m *Message) error { select { case <-l.closed: - return nil, io.EOF - case m := <-l.recvQueue: - return m, nil + return io.EOF + case rm := <-l.recvQueue: + *m = *rm + return nil } // never reach - return nil, nil + return nil } // Send sends a message on the socket immediately diff --git a/network/network.go b/network/network.go index f9b32090..12dc6b8c 100644 --- a/network/network.go +++ b/network/network.go @@ -10,46 +10,46 @@ import ( // is responsible for routing messages to the correct services. type Network interface { options.Options + // Create starts the network + Create() (*Node, error) // Name of the network Name() string - // Connect to the network - Connect() (Node, error) - // Peer with a neighboring network - Peer(Network) (Link, error) + // Connect to a node + Connect(*Node) (Conn, error) + // Listen for connections + Listen(*Node) (Listener, error) } -// Node represents a single node on a network -type Node interface { - // Id of the node - Id() string - // Address of the node +type Node struct { + Id string + Address string + Metadata map[string]string +} + +type Listener interface { Address() string - // The network of the node - Network() string - // Close the network connection Close() error - // Accept messages on the network - Accept() (*Message, error) - // Send a message to the network + Accept() (Conn, error) +} + +type Conn interface { + // Unique id of the connection + Id() string + // Close the connection + Close() error + // Send a message Send(*Message) error + // Receive a message + Recv(*Message) error + // The remote node + Remote() string + // The local node + Local() string } -// Link is a connection between one network and another -type Link interface { - // remote node the link is peered with - Node - // length defines the speed or distance of the link - Length() int - // weight defines the saturation or usage of the link - Weight() int -} - -// Message is the base type for opaque data type Message struct { - // Headers which provide local/remote info Header map[string]string - // The opaque data being sent - Body []byte + Body []byte } var ( diff --git a/network/transport/mucp/listener.go b/network/transport/mucp/listener.go deleted file mode 100644 index ceb7aae0..00000000 --- a/network/transport/mucp/listener.go +++ /dev/null @@ -1,44 +0,0 @@ -package mucp - -import ( - "github.com/micro/go-micro/network/transport" -) - -type listener struct { - // stream id - id string - // address of the listener - addr string - // close channel - closed chan bool - // accept socket - accept chan *socket -} - -func (n *listener) Addr() string { - return n.addr -} - -func (n *listener) Close() error { - select { - case <-n.closed: - default: - close(n.closed) - } - return nil -} - -func (n *listener) Accept(fn func(s transport.Socket)) error { - for { - select { - case <-n.closed: - return nil - case s, ok := <-n.accept: - if !ok { - return nil - } - go fn(s) - } - } - return nil -} diff --git a/network/transport/mucp/network.go b/network/transport/mucp/network.go deleted file mode 100644 index ae0046b9..00000000 --- a/network/transport/mucp/network.go +++ /dev/null @@ -1,342 +0,0 @@ -// Package mucp provides a mucp network transport -package mucp - -import ( - "context" - "crypto/sha256" - "errors" - "fmt" - "sync" - - "github.com/micro/go-micro/network" - "github.com/micro/go-micro/network/transport" -) - -type networkKey struct{} - -// Transport is a mucp transport. It should only -// be created with NewTransport and cast to -// *Transport if there's a need to close it. -type Transport struct { - options transport.Options - - // the network interface - network network.Network - - // protect all the things - sync.RWMutex - - // connect - connected bool - // connected node - node network.Node - // the send channel - send chan *message - // close channel - closed chan bool - - // sockets - sockets map[string]*socket - // listeners - listeners map[string]*listener -} - -func (n *Transport) newListener(addr string) *listener { - // hash the id - h := sha256.New() - h.Write([]byte(addr)) - id := fmt.Sprintf("%x", h.Sum(nil)) - - // create the listener - l := &listener{ - id: id, - addr: addr, - closed: make(chan bool), - accept: make(chan *socket, 128), - } - - // save it - n.Lock() - n.listeners[id] = l - n.Unlock() - - return l -} - -func (n *Transport) getListener(id string) (*listener, bool) { - // get the listener - n.RLock() - s, ok := n.listeners[id] - n.RUnlock() - return s, ok -} - -func (n *Transport) getSocket(id string) (*socket, bool) { - // get the socket - n.RLock() - s, ok := n.sockets[id] - n.RUnlock() - return s, ok -} - -func (n *Transport) newSocket(id string) *socket { - // hash the id - h := sha256.New() - h.Write([]byte(id)) - id = fmt.Sprintf("%x", h.Sum(nil)) - - // new socket - s := &socket{ - id: id, - closed: make(chan bool), - recv: make(chan *message, 128), - send: n.send, - } - - // save socket - n.Lock() - n.sockets[id] = s - n.Unlock() - - // return socket - return s -} - -// process outgoing messages -func (n *Transport) process() { - // manage the send buffer - // all pseudo sockets throw everything down this - for { - select { - case msg := <-n.send: - netmsg := &network.Message{ - Header: msg.data.Header, - Body: msg.data.Body, - } - - // set the stream id on the outgoing message - netmsg.Header["Micro-Stream"] = msg.id - - // send the message via the interface - if err := n.node.Send(netmsg); err != nil { - // no op - // TODO: do something - } - case <-n.closed: - return - } - } -} - -// process incoming messages -func (n *Transport) listen() { - for { - // process anything via the net interface - msg, err := n.node.Accept() - if err != nil { - return - } - - // a stream id - id := msg.Header["Micro-Stream"] - - // get the socket - s, exists := n.getSocket(id) - if !exists { - // get the listener - l, ok := n.getListener(id) - // there's no socket and there's no listener - if !ok { - continue - } - - // listener is closed - select { - case <-l.closed: - // delete it - n.Lock() - delete(n.listeners, l.id) - n.Unlock() - continue - default: - } - - // no socket, create one - s = n.newSocket(id) - // set remote address - s.remote = msg.Header["Remote"] - - // drop that to the listener - // TODO: non blocking - l.accept <- s - } - - // is the socket closed? - select { - case <-s.closed: - // closed - delete(n.sockets, id) - continue - default: - // process - } - - tmsg := &transport.Message{ - Header: msg.Header, - Body: msg.Body, - } - - // TODO: don't block on queuing - // append to recv backlog - s.recv <- &message{id: id, data: tmsg} - } -} - -func (n *Transport) Init(opts ...transport.Option) error { - for _, o := range opts { - o(&n.options) - } - return nil -} - -func (n *Transport) Options() transport.Options { - return n.options -} - -// Close the tunnel -func (n *Transport) Close() error { - n.Lock() - defer n.Unlock() - - if !n.connected { - return nil - } - - select { - case <-n.closed: - return nil - default: - // close all the sockets - for _, s := range n.sockets { - s.Close() - } - for _, l := range n.listeners { - l.Close() - } - // close the connection - close(n.closed) - // close node connection - n.node.Close() - // reset connected - n.connected = false - } - - return nil -} - -// Connect the tunnel -func (n *Transport) Connect() error { - n.Lock() - defer n.Unlock() - - // already connected - if n.connected { - return nil - } - - // get a new node - node, err := n.network.Connect() - if err != nil { - return err - } - - // set as connected - n.connected = true - // create new close channel - n.closed = make(chan bool) - // save node - n.node = node - - // process messages to be sent - go n.process() - // process incoming messages - go n.listen() - - return nil -} - -// Dial an address -func (n *Transport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { - if err := n.Connect(); err != nil { - return nil, err - } - - // create new socket - s := n.newSocket(addr) - // set remote - s.remote = addr - // set local - n.RLock() - s.local = n.node.Address() - n.RUnlock() - - return s, nil -} - -func (n *Transport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) { - // check existing listeners - n.RLock() - for _, l := range n.listeners { - if l.addr == addr { - n.RUnlock() - return nil, errors.New("already listening on " + addr) - } - } - n.RUnlock() - - // try to connect to the network - if err := n.Connect(); err != nil { - return nil, err - } - - return n.newListener(addr), nil -} - -func (n *Transport) String() string { - return "network" -} - -// NewTransport creates a new network transport -func NewTransport(opts ...transport.Option) transport.Transport { - options := transport.Options{ - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - // get the network interface - n, ok := options.Context.Value(networkKey{}).(network.Network) - if !ok { - n = network.DefaultNetwork - } - - return &Transport{ - options: options, - network: n, - send: make(chan *message, 128), - closed: make(chan bool), - sockets: make(map[string]*socket), - } -} - -// WithNetwork sets the network interface -func WithNetwork(n network.Network) transport.Option { - return func(o *transport.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, networkKey{}, n) - } -} diff --git a/network/transport/mucp/socket.go b/network/transport/mucp/socket.go deleted file mode 100644 index 862da50a..00000000 --- a/network/transport/mucp/socket.go +++ /dev/null @@ -1,80 +0,0 @@ -package mucp - -import ( - "errors" - - "github.com/micro/go-micro/network/transport" -) - -// socket is our pseudo socket for transport.Socket -type socket struct { - // socket id based on Micro-Stream - id string - // closed - closed chan bool - // remote addr - remote string - // local addr - local string - // send chan - send chan *message - // recv chan - recv chan *message -} - -// message is sent over the send channel -type message struct { - // socket id - id string - // transport data - data *transport.Message -} - -func (s *socket) Remote() string { - return s.remote -} - -func (s *socket) Local() string { - return s.local -} - -func (s *socket) Id() string { - return s.id -} - -func (s *socket) Send(m *transport.Message) error { - select { - case <-s.closed: - return errors.New("socket is closed") - default: - // no op - } - // append to backlog - s.send <- &message{id: s.id, data: m} - return nil -} - -func (s *socket) Recv(m *transport.Message) error { - select { - case <-s.closed: - return errors.New("socket is closed") - default: - // no op - } - // recv from backlog - msg := <-s.recv - // set message - *m = *msg.data - // return nil - return nil -} - -func (s *socket) Close() error { - select { - case <-s.closed: - // no op - default: - close(s.closed) - } - return nil -} diff --git a/network/transport/mucp/socket_test.go b/network/transport/mucp/socket_test.go deleted file mode 100644 index a9dfbd5a..00000000 --- a/network/transport/mucp/socket_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package mucp - -import ( - "testing" - - "github.com/micro/go-micro/network/transport" -) - -func TestTunnelSocket(t *testing.T) { - s := &socket{ - id: "1", - closed: make(chan bool), - remote: "remote", - local: "local", - send: make(chan *message, 1), - recv: make(chan *message, 1), - } - - // check addresses local and remote - if s.Local() != s.local { - t.Fatalf("Expected s.Local %s got %s", s.local, s.Local()) - } - if s.Remote() != s.remote { - t.Fatalf("Expected s.Remote %s got %s", s.remote, s.Remote()) - } - - // send a message - s.Send(&transport.Message{Header: map[string]string{}}) - - // get sent message - msg := <-s.send - - if msg.id != s.id { - t.Fatalf("Expected sent message id %s got %s", s.id, msg.id) - } - - // recv a message - msg.data.Header["Foo"] = "bar" - s.recv <- msg - - m := new(transport.Message) - s.Recv(m) - - // check header - if m.Header["Foo"] != "bar" { - t.Fatalf("Did not receive correct message %+v", m) - } - - // close the connection - s.Close() - - // check connection - err := s.Send(m) - if err == nil { - t.Fatal("Expected closed connection") - } - err = s.Recv(m) - if err == nil { - t.Fatal("Expected closed connection") - } -}