diff --git a/transport/http_transport.go b/transport/http_transport.go index 8e2e3d8a..796502a5 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -42,6 +42,7 @@ type httpTransportClient struct { } type httpTransportSocket struct { + ht *httpTransport r chan *http.Request conn net.Conn once sync.Once @@ -51,6 +52,7 @@ type httpTransportSocket struct { } type httpTransportListener struct { + ht *httpTransport listener net.Listener } @@ -144,6 +146,11 @@ func (h *httpTransportClient) Send(m *Message) error { } h.Unlock() + // set deadline if its greater than 0 + if h.ht.opts.Deadline > time.Duration(0) { + h.conn.SetDeadline(time.Now().Add(h.ht.opts.Deadline)) + } + return req.Write(h.conn) } @@ -163,6 +170,11 @@ func (h *httpTransportClient) Recv(m *Message) error { return io.EOF } + // set deadline if its greater than 0 + if h.ht.opts.Deadline > time.Duration(0) { + h.conn.SetDeadline(time.Now().Add(h.ht.opts.Deadline)) + } + rsp, err := http.ReadResponse(h.buff, r) if err != nil { return err @@ -212,6 +224,11 @@ func (h *httpTransportSocket) Recv(m *Message) error { return errors.New("message passed in is nil") } + // set deadline if its greater than 0 + if h.ht.opts.Deadline > time.Duration(0) { + h.conn.SetDeadline(time.Now().Add(h.ht.opts.Deadline)) + } + r, err := http.ReadRequest(h.buff) if err != nil { return err @@ -271,6 +288,11 @@ func (h *httpTransportSocket) Send(m *Message) error { default: } + // set deadline if its greater than 0 + if h.ht.opts.Deadline > time.Duration(0) { + h.conn.SetDeadline(time.Now().Add(h.ht.opts.Deadline)) + } + return rsp.Write(h.conn) } @@ -337,6 +359,7 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error { } sock := &httpTransportSocket{ + ht: h.ht, conn: c, buff: bufio.NewReader(c), r: make(chan *http.Request, 1), @@ -444,6 +467,7 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err } return &httpTransportListener{ + ht: h, listener: l, }, nil } diff --git a/transport/http_transport_test.go b/transport/http_transport_test.go index 87c1202d..de5f9b73 100644 --- a/transport/http_transport_test.go +++ b/transport/http_transport_test.go @@ -4,6 +4,7 @@ import ( "io" "strings" "testing" + "time" ) func expectedPort(t *testing.T, expected string, lsn Listener) { @@ -176,3 +177,68 @@ func TestHTTPTransportError(t *testing.T) { close(done) } + +func TestHTTPTransportDeadline(t *testing.T) { + tr := NewTransport(Deadline(time.Millisecond * 100)) + + l, err := tr.Listen(":0") + if err != nil { + t.Errorf("Unexpected listen err: %v", err) + } + defer l.Close() + + done := make(chan bool) + + fn := func(sock Socket) { + defer func() { + sock.Close() + close(done) + }() + + go func() { + select { + case <-done: + return + case <-time.After(time.Second): + t.Fatal("deadline not executed") + } + }() + + for { + var m Message + + if err := sock.Recv(&m); err != nil { + return + } + } + } + + go func() { + if err := l.Accept(fn); err != nil { + select { + case <-done: + default: + t.Errorf("Unexpected accept err: %v", err) + } + } + }() + + c, err := tr.Dial(l.Addr()) + if err != nil { + t.Errorf("Unexpected dial err: %v", err) + } + defer c.Close() + + m := Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + if err := c.Send(&m); err != nil { + t.Errorf("Unexpected send err: %v", err) + } + + <-done +} diff --git a/transport/options.go b/transport/options.go index 8d0ecc54..f0ef62e3 100644 --- a/transport/options.go +++ b/transport/options.go @@ -11,7 +11,8 @@ type Options struct { Addrs []string Secure bool TLSConfig *tls.Config - + // Deadline sets the time to wait to Send/Recv + Deadline time.Duration // Other options for implementations of the interface // can be stored in a context Context context.Context @@ -45,6 +46,13 @@ func Addrs(addrs ...string) Option { } } +// Deadline sets the time to wait for Send/Recv execution +func Deadline(t time.Duration) Option { + return func(o *Options) { + o.Deadline = t + } +} + // Use secure communication. If TLSConfig is not specified we // use InsecureSkipVerify and generate a self signed cert func Secure(b bool) Option {