diff --git a/server/rpc_codec_test.go b/server/rpc_codec_test.go index 76f5094e..9ef8e7fa 100644 --- a/server/rpc_codec_test.go +++ b/server/rpc_codec_test.go @@ -15,6 +15,8 @@ type testCodec struct { } type testSocket struct { + local string + remote string } // TestCodecWriteError simulates what happens when a codec is unable @@ -87,6 +89,14 @@ func (c *testCodec) String() string { return "string" } +func (s testSocket) Local() string { + return s.local +} + +func (s testSocket) Remote() string { + return s.remote +} + func (s testSocket) Recv(message *transport.Message) error { return nil } diff --git a/transport/http_transport.go b/transport/http_transport.go index e2ee8344..514287b7 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -40,6 +40,10 @@ type httpTransportClient struct { r chan *http.Request bl []*http.Request buff *bufio.Reader + + // local/remote ip + local string + remote string } type httpTransportSocket struct { @@ -51,6 +55,10 @@ type httpTransportSocket struct { conn net.Conn // for the first request ch chan *http.Request + + // local/remote ip + local string + remote string } type httpTransportListener struct { @@ -62,6 +70,14 @@ func (b *buffer) Close() error { return nil } +func (h *httpTransportClient) Local() string { + return h.local +} + +func (h *httpTransportClient) Remote() string { + return h.remote +} + func (h *httpTransportClient) Send(m *Message) error { header := make(http.Header) @@ -173,6 +189,14 @@ func (h *httpTransportClient) Close() error { return err } +func (h *httpTransportSocket) Local() string { + return h.local +} + +func (h *httpTransportSocket) Remote() string { + return h.remote +} + func (h *httpTransportSocket) Recv(m *Message) error { if m == nil { return errors.New("message passed in is nil") @@ -368,12 +392,14 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error { ch <- r fn(&httpTransportSocket{ - ht: h.ht, - w: w, - r: r, - rw: buf, - ch: ch, - conn: con, + ht: h.ht, + w: w, + r: r, + rw: buf, + ch: ch, + conn: con, + local: h.Addr(), + remote: r.RemoteAddr, }) }) @@ -430,6 +456,8 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) { buff: bufio.NewReader(conn), dialOpts: dopts, r: make(chan *http.Request, 1), + local: conn.LocalAddr().String(), + remote: conn.RemoteAddr().String(), }, nil } diff --git a/transport/mock/mock.go b/transport/mock/mock.go index 594af71f..c522c1cd 100644 --- a/transport/mock/mock.go +++ b/transport/mock/mock.go @@ -18,6 +18,9 @@ type mockSocket struct { exit chan bool // listener exit lexit chan bool + + local string + remote string } type mockClient struct { @@ -51,6 +54,14 @@ func (ms *mockSocket) Recv(m *transport.Message) error { return nil } +func (ms *mockSocket) Local() string { + return ms.local +} + +func (ms *mockSocket) Remote() string { + return ms.remote +} + func (ms *mockSocket) Send(m *transport.Message) error { select { case <-ms.exit: @@ -93,10 +104,12 @@ func (m *mockListener) Accept(fn func(transport.Socket)) error { return nil case c := <-m.conn: go fn(&mockSocket{ - lexit: c.lexit, - exit: c.exit, - send: c.recv, - recv: c.send, + lexit: c.lexit, + exit: c.exit, + send: c.recv, + recv: c.send, + local: c.Remote(), + remote: c.Local(), }) } } @@ -118,10 +131,12 @@ func (m *mockTransport) Dial(addr string, opts ...transport.DialOption) (transpo client := &mockClient{ &mockSocket{ - send: make(chan *transport.Message), - recv: make(chan *transport.Message), - exit: make(chan bool), - lexit: listener.exit, + send: make(chan *transport.Message), + recv: make(chan *transport.Message), + exit: make(chan bool), + lexit: listener.exit, + local: addr, + remote: addr, }, options, } diff --git a/transport/transport.go b/transport/transport.go index 313a4196..4967c81a 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -14,6 +14,8 @@ type Socket interface { Recv(*Message) error Send(*Message) error Close() error + Local() string + Remote() string } type Client interface {