diff --git a/codec/grpc/grpc.go b/codec/grpc/grpc.go index 9f2951bb..86630772 100644 --- a/codec/grpc/grpc.go +++ b/codec/grpc/grpc.go @@ -94,6 +94,17 @@ func (c *Codec) Write(m *codec.Message, b interface{}) error { m.Header[":status"] = "200" m.Header["grpc-status"] = "0" // m.Header["grpc-message"] = "" + case codec.Error: + m.Header["Trailer"] = "grpc-status, grpc-message" + // micro end of stream + if m.Error == "EOS" { + m.Header["grpc-status"] = "0" + } else { + m.Header["grpc-message"] = m.Error + m.Header["grpc-status"] = "13" + } + + return nil } // marshal content diff --git a/server/rpc_server.go b/server/rpc_server.go index 9e83f288..6023481e 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -154,34 +154,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { sockets[id] = psock mtx.Unlock() - // wait for processing to exit - wg.Add(1) - - // process the outbound messages from the socket - go func(id string, psock *socket.Socket) { - defer func() { - wg.Done() - }() - - for { - // get the message from our internal handler/stream - m := new(transport.Message) - if err := psock.Process(m); err != nil { - // delete the socket - mtx.Lock() - delete(sockets, id) - mtx.Unlock() - return - } - - // send the message back over the socket - if err := sock.Send(m); err != nil { - return - } - - } - }(id, psock) - // now walk the usual path // we use this Timeout header to set a server deadline @@ -287,6 +259,33 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { r = rpcRouter{handler} } + // wait for processing to exit + wg.Add(1) + + // process the outbound messages from the socket + go func(id string, psock *socket.Socket) { + defer func() { + wg.Done() + }() + + for { + // get the message from our internal handler/stream + m := new(transport.Message) + if err := psock.Process(m); err != nil { + // delete the socket + mtx.Lock() + delete(sockets, id) + mtx.Unlock() + return + } + + // send the message back over the socket + if err := sock.Send(m); err != nil { + return + } + } + }(id, psock) + // serve the request in a go routine as this may be a stream go func(id string, psock *socket.Socket) { defer psock.Close() diff --git a/transport/http_transport.go b/transport/http_transport.go index 0852eaad..0e1d5823 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -324,6 +324,9 @@ func (h *httpTransportSocket) Send(m *Message) error { // write request _, err := h.w.Write(m.Body) + // flush the trailers + h.w.(http.Flusher).Flush() + return err }