embed grpc server stream and client so they can be accessed (#1916)
This commit is contained in:
parent
9365b1fe9b
commit
c258ff3ca4
10
codec.go
10
codec.go
@ -128,18 +128,18 @@ func (bytesCodec) Name() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type grpcCodec struct {
|
type grpcCodec struct {
|
||||||
|
grpc.ServerStream
|
||||||
// headers
|
// headers
|
||||||
id string
|
id string
|
||||||
target string
|
target string
|
||||||
method string
|
method string
|
||||||
endpoint string
|
endpoint string
|
||||||
|
|
||||||
s grpc.ServerStream
|
|
||||||
c encoding.Codec
|
c encoding.Codec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
||||||
md, _ := metadata.FromIncomingContext(g.s.Context())
|
md, _ := metadata.FromIncomingContext(g.ServerStream.Context())
|
||||||
if m == nil {
|
if m == nil {
|
||||||
m = new(codec.Message)
|
m = new(codec.Message)
|
||||||
}
|
}
|
||||||
@ -159,9 +159,9 @@ func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
|||||||
func (g *grpcCodec) ReadBody(v interface{}) error {
|
func (g *grpcCodec) ReadBody(v interface{}) error {
|
||||||
// caller has requested a frame
|
// caller has requested a frame
|
||||||
if f, ok := v.(*bytes.Frame); ok {
|
if f, ok := v.(*bytes.Frame); ok {
|
||||||
return g.s.RecvMsg(f)
|
return g.ServerStream.RecvMsg(f)
|
||||||
}
|
}
|
||||||
return g.s.RecvMsg(v)
|
return g.ServerStream.RecvMsg(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcCodec) Write(m *codec.Message, v interface{}) error {
|
func (g *grpcCodec) Write(m *codec.Message, v interface{}) error {
|
||||||
@ -174,7 +174,7 @@ func (g *grpcCodec) Write(m *codec.Message, v interface{}) error {
|
|||||||
m.Body = b
|
m.Body = b
|
||||||
}
|
}
|
||||||
// write the body using the framing codec
|
// write the body using the framing codec
|
||||||
return g.s.SendMsg(&bytes.Frame{Data: m.Body})
|
return g.ServerStream.SendMsg(&bytes.Frame{Data: m.Body})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcCodec) Close() error {
|
func (g *grpcCodec) Close() error {
|
||||||
|
20
grpc.go
20
grpc.go
@ -265,11 +265,11 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
|||||||
return errors.InternalServerError(g.opts.Name, err.Error())
|
return errors.InternalServerError(g.opts.Name, err.Error())
|
||||||
}
|
}
|
||||||
codec := &grpcCodec{
|
codec := &grpcCodec{
|
||||||
method: fmt.Sprintf("%s.%s", serviceName, methodName),
|
ServerStream: stream,
|
||||||
endpoint: fmt.Sprintf("%s.%s", serviceName, methodName),
|
method: fmt.Sprintf("%s.%s", serviceName, methodName),
|
||||||
target: g.opts.Name,
|
endpoint: fmt.Sprintf("%s.%s", serviceName, methodName),
|
||||||
s: stream,
|
target: g.opts.Name,
|
||||||
c: cc,
|
c: cc,
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a client.Request
|
// create a client.Request
|
||||||
@ -394,8 +394,10 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service,
|
|||||||
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
|
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
|
||||||
fn = g.opts.HdlrWrappers[i-1](fn)
|
fn = g.opts.HdlrWrappers[i-1](fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
statusCode := codes.OK
|
statusCode := codes.OK
|
||||||
statusDesc := ""
|
statusDesc := ""
|
||||||
|
|
||||||
// execute the handler
|
// execute the handler
|
||||||
if appErr := fn(ctx, r, replyv.Interface()); appErr != nil {
|
if appErr := fn(ctx, r, replyv.Interface()); appErr != nil {
|
||||||
var errStatus *status.Status
|
var errStatus *status.Status
|
||||||
@ -411,6 +413,7 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service,
|
|||||||
// micro.Error now proto based and we can attach it to grpc status
|
// micro.Error now proto based and we can attach it to grpc status
|
||||||
statusCode = microError(verr)
|
statusCode = microError(verr)
|
||||||
statusDesc = verr.Error()
|
statusDesc = verr.Error()
|
||||||
|
|
||||||
errStatus, err = status.New(statusCode, statusDesc).WithDetails(perr)
|
errStatus, err = status.New(statusCode, statusDesc).WithDetails(perr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -428,6 +431,7 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service,
|
|||||||
statusCode = convertCode(verr)
|
statusCode = convertCode(verr)
|
||||||
statusDesc = verr.Error()
|
statusDesc = verr.Error()
|
||||||
errStatus = status.New(statusCode, statusDesc)
|
errStatus = status.New(statusCode, statusDesc)
|
||||||
|
fmt.Printf("Responding with :%v\n", errStatus)
|
||||||
}
|
}
|
||||||
|
|
||||||
return errStatus.Err()
|
return errStatus.Err()
|
||||||
@ -436,6 +440,7 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service,
|
|||||||
if err := stream.SendMsg(replyv.Interface()); err != nil {
|
if err := stream.SendMsg(replyv.Interface()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return status.New(statusCode, statusDesc).Err()
|
return status.New(statusCode, statusDesc).Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -451,8 +456,8 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m
|
|||||||
}
|
}
|
||||||
|
|
||||||
ss := &rpcStream{
|
ss := &rpcStream{
|
||||||
request: r,
|
ServerStream: stream,
|
||||||
s: stream,
|
request: r,
|
||||||
}
|
}
|
||||||
|
|
||||||
function := mtype.method.Func
|
function := mtype.method.Func
|
||||||
@ -507,6 +512,7 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m
|
|||||||
statusDesc = verr.Error()
|
statusDesc = verr.Error()
|
||||||
errStatus = status.New(statusCode, statusDesc)
|
errStatus = status.New(statusCode, statusDesc)
|
||||||
}
|
}
|
||||||
|
|
||||||
return errStatus.Err()
|
return errStatus.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
10
stream.go
10
stream.go
@ -9,7 +9,9 @@ import (
|
|||||||
|
|
||||||
// rpcStream implements a server side Stream.
|
// rpcStream implements a server side Stream.
|
||||||
type rpcStream struct {
|
type rpcStream struct {
|
||||||
s grpc.ServerStream
|
// embed the grpc stream so we can access it
|
||||||
|
grpc.ServerStream
|
||||||
|
|
||||||
request server.Request
|
request server.Request
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -26,13 +28,13 @@ func (r *rpcStream) Request() server.Request {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcStream) Context() context.Context {
|
func (r *rpcStream) Context() context.Context {
|
||||||
return r.s.Context()
|
return r.ServerStream.Context()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcStream) Send(m interface{}) error {
|
func (r *rpcStream) Send(m interface{}) error {
|
||||||
return r.s.SendMsg(m)
|
return r.ServerStream.SendMsg(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcStream) Recv(m interface{}) error {
|
func (r *rpcStream) Recv(m interface{}) error {
|
||||||
return r.s.RecvMsg(m)
|
return r.ServerStream.RecvMsg(m)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user