diff --git a/server/grpc/codec.go b/server/grpc/codec.go index a1d4a659..8de1b8bf 100644 --- a/server/grpc/codec.go +++ b/server/grpc/codec.go @@ -180,5 +180,5 @@ func (g *grpcCodec) Close() error { } func (g *grpcCodec) String() string { - return g.c.Name() + return "grpc" } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 11f48630..1113f842 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -248,6 +248,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { contentType: ct, method: fmt.Sprintf("%s.%s", serviceName, methodName), codec: codec, + stream: true, } response := &rpcResponse{ @@ -385,9 +386,11 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, } return status.New(statusCode, statusDesc).Err() } + if err := stream.SendMsg(replyv.Interface()); err != nil { return err } + return status.New(statusCode, statusDesc).Err() } } diff --git a/server/rpc_router.go b/server/rpc_router.go index d607f5f8..b67417bf 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -346,7 +346,9 @@ func (router *router) readRequest(r Request) (service *service, mtype *methodTyp } // is it a streaming request? then we don't read the body if mtype.stream { - cc.ReadBody(nil) + if cc.(codec.Codec).String() != "grpc" { + cc.ReadBody(nil) + } return }