From 76265b618bfb041e147006a446ce1eb1d5a0c805 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 17 Jun 2019 20:05:58 +0100 Subject: [PATCH] add cruft --- codec.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ grpc.go | 13 ++++++++- 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/codec.go b/codec.go index 4994db7..97f0551 100644 --- a/codec.go +++ b/codec.go @@ -2,13 +2,16 @@ package grpc import ( "fmt" + "strings" "github.com/golang/protobuf/proto" "github.com/json-iterator/go" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/protorpc" "google.golang.org/grpc/encoding" + "google.golang.org/grpc" ) type jsonCodec struct{} @@ -52,7 +55,28 @@ func (w wrapCodec) String() string { return w.Codec.Name() } +func (w wrapCodec) Marshal(v interface{}) ([]byte, error) { + b, ok := v.(*bytes.Frame) + if ok { + return b.Data, nil + } + return w.Codec.Marshal(v) +} + +func (w wrapCodec) Unmarshal(data []byte, v interface{}) error { + b, ok := v.(*bytes.Frame) + if ok { + b.Data = data + return nil + } + return w.Codec.Unmarshal(data, v) +} + func (protoCodec) Marshal(v interface{}) ([]byte, error) { + b, ok := v.(*bytes.Frame) + if ok { + return b.Data, nil + } return proto.Marshal(v.(proto.Message)) } @@ -96,3 +120,67 @@ func (jsonCodec) Unmarshal(data []byte, v interface{}) error { func (jsonCodec) Name() string { return "json" } + +type grpcCodec struct { + // headers + id string + target string + method string + endpoint string + + s grpc.ClientStream + c encoding.Codec +} + +func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { + md, err := g.s.Header() + if err != nil { + return err + } + if m == nil { + m = new(codec.Message) + } + if m.Header == nil { + m.Header = make(map[string]string) + } + for k, v := range md { + m.Header[k] = strings.Join(v, ",") + } + m.Id = g.id + m.Target = g.target + m.Method = g.method + m.Endpoint = g.endpoint + return nil +} + +func (g *grpcCodec) ReadBody(v interface{}) error { + frame := &bytes.Frame{} + if err := g.s.RecvMsg(frame); err != nil { + return err + } + return g.c.Unmarshal(frame.Data, v) +} + +func (g *grpcCodec) Write(m *codec.Message, v interface{}) error { + // if we don't have a body + if len(m.Body) == 0 { + b, err := g.c.Marshal(v) + if err != nil { + return err + } + m.Body = b + } + + // create an encoded frame + frame := &bytes.Frame{m.Body} + // write the body using the framing codec + return g.s.SendMsg(frame) +} + +func (g *grpcCodec) Close() error { + return g.s.CloseSend() +} + +func (g *grpcCodec) String() string { + return g.c.Name() +} diff --git a/grpc.go b/grpc.go index 9ed206c..ebdbed5 100644 --- a/grpc.go +++ b/grpc.go @@ -177,7 +177,10 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client dialCtx, cancel = context.WithCancel(ctx) } defer cancel() - cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), g.secure()) + + wc := wrapCodec{cf} + + cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)), g.secure()) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -193,6 +196,14 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } + // set request codec + if r, ok := req.(*grpcRequest); ok { + r.codec = &grpcCodec{ + s: st, + c: wc, + } + } + rsp := &response{ conn: cc, stream: st,