diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 19cdc564..2fea1b83 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -15,9 +15,10 @@ import ( ) type rpcCodec struct { - socket transport.Socket - codec codec.Codec - first bool + socket transport.Socket + codec codec.Codec + first bool + protocol string req *transport.Message buf *readWriteCloser @@ -157,12 +158,27 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod rbuf: bytes.NewBuffer(nil), wbuf: bytes.NewBuffer(nil), } + r := &rpcCodec{ - buf: rwc, - codec: c(rwc), - req: req, - socket: socket, + buf: rwc, + codec: c(rwc), + req: req, + socket: socket, + protocol: "mucp", } + + // if grpc pre-load the buffer + // TODO: remove this terrible hack + switch r.codec.String() { + case "grpc": + // set as first + r.first = true + // write the body + rwc.rbuf.Write(req.Body) + // set the protocol + r.protocol = "grpc" + } + return r } @@ -173,27 +189,33 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { Body: c.req.Body, } - var tm transport.Message + // first message could be pre-loaded + if !c.first { + var tm transport.Message - // read off the socket - if err := c.socket.Recv(&tm); err != nil { - return err - } - // reset the read buffer - c.buf.rbuf.Reset() + // read off the socket + if err := c.socket.Recv(&tm); err != nil { + return err + } + // reset the read buffer + c.buf.rbuf.Reset() - // write the body to the buffer - if _, err := c.buf.rbuf.Write(tm.Body); err != nil { - return err + // write the body to the buffer + if _, err := c.buf.rbuf.Write(tm.Body); err != nil { + return err + } + + // set the message header + m.Header = tm.Header + // set the message body + m.Body = tm.Body + + // set req + c.req = &tm } - // set the message header - m.Header = tm.Header - // set the message body - m.Body = tm.Body - - // set req - c.req = &tm + // disable first + c.first = false // set some internal things getHeaders(&m) @@ -293,5 +315,5 @@ func (c *rpcCodec) Close() error { } func (c *rpcCodec) String() string { - return "rpc" + return c.protocol } diff --git a/server/rpc_server.go b/server/rpc_server.go index 6023481e..43c4bae5 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -213,9 +213,11 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { } rcodec := newRpcCodec(&msg, psock, cf) + protocol := rcodec.String() // check stream id var stream bool + if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 { stream = true } @@ -265,6 +267,13 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // process the outbound messages from the socket go func(id string, psock *socket.Socket) { defer func() { + // TODO: don't hack this but if its grpc just break out of the stream + // We do this because the underlying connection is h2 and its a stream + switch protocol { + case "grpc": + sock.Close() + } + wg.Done() }() diff --git a/transport/http_transport.go b/transport/http_transport.go index 0e1d5823..e6fd1b05 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -363,6 +363,9 @@ func (h *httpTransportSocket) Close() error { // close the channel close(h.closed) + // close the buffer + h.r.Body.Close() + // close the connection if h.r.ProtoMajor == 1 { return h.conn.Close() @@ -436,9 +439,6 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error { closed: make(chan bool), } - // cleanup - //defer sock.Close() - // execute the socket fn(sock) })