From e83572b9941e0497db4a2e4be7b384df57687f90 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Jun 2019 18:51:52 +0100 Subject: [PATCH] Add working grpc proxy config --- codec.go | 28 ++++++++++------------------ grpc.go | 20 ++++++++++++-------- request.go | 20 +++++--------------- response.go | 10 ++++++++-- 4 files changed, 35 insertions(+), 43 deletions(-) diff --git a/codec.go b/codec.go index 97f0551..62f3038 100644 --- a/codec.go +++ b/codec.go @@ -10,8 +10,8 @@ import ( "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/protorpc" - "google.golang.org/grpc/encoding" "google.golang.org/grpc" + "google.golang.org/grpc/encoding" ) type jsonCodec struct{} @@ -123,9 +123,9 @@ func (jsonCodec) Name() string { type grpcCodec struct { // headers - id string - target string - method string + id string + target string + method string endpoint string s grpc.ClientStream @@ -154,27 +154,19 @@ func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { } func (g *grpcCodec) ReadBody(v interface{}) error { - frame := &bytes.Frame{} - if err := g.s.RecvMsg(frame); err != nil { - return err + if f, ok := v.(*bytes.Frame); ok { + return g.s.RecvMsg(f) } - return g.c.Unmarshal(frame.Data, v) + return g.s.RecvMsg(v) } func (g *grpcCodec) Write(m *codec.Message, v interface{}) error { // if we don't have a body - if len(m.Body) == 0 { - b, err := g.c.Marshal(v) - if err != nil { - return err - } - m.Body = b + if v != nil { + return g.s.SendMsg(v) } - - // create an encoded frame - frame := &bytes.Frame{m.Body} // write the body using the framing codec - return g.s.SendMsg(frame) + return g.s.SendMsg(&bytes.Frame{m.Body}) } func (g *grpcCodec) Close() error { diff --git a/grpc.go b/grpc.go index 7a8aacf..ac1dc82 100644 --- a/grpc.go +++ b/grpc.go @@ -32,8 +32,9 @@ type grpcClient struct { } func init() { - encoding.RegisterCodec(jsonCodec{}) - encoding.RegisterCodec(bytesCodec{}) + encoding.RegisterCodec(wrapCodec{jsonCodec{}}) + encoding.RegisterCodec(wrapCodec{jsonCodec{}}) + encoding.RegisterCodec(wrapCodec{bytesCodec{}}) } // secure returns the dial option for whether its a secure or insecure connection @@ -129,7 +130,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R ch := make(chan error, 1) go func() { - err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.ForceCodec(cf)) + err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpc.ForceCodec(cf)) ch <- microError(err) }() @@ -191,23 +192,26 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client ServerStreams: true, } - st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body())) + st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint())) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } + codec := &grpcCodec{ + s: st, + c: wc, + } + // set request codec if r, ok := req.(*grpcRequest); ok { - r.codec = &grpcCodec{ - s: st, - c: wc, - } + r.codec = codec } rsp := &response{ conn: cc, stream: st, codec: cf, + gcodec: codec, } return &grpcStream{ diff --git a/request.go b/request.go index cb14a33..94b44da 100644 --- a/request.go +++ b/request.go @@ -2,7 +2,6 @@ package grpc import ( "fmt" - "reflect" "strings" "github.com/micro/go-micro/client" @@ -18,30 +17,21 @@ type grpcRequest struct { codec codec.Codec } -func methodToGRPC(method string, request interface{}) string { +// service Struct.Method /service.Struct/Method +func methodToGRPC(service, method string) string { // no method or already grpc method if len(method) == 0 || method[0] == '/' { return method } - // can't operate on nil request - t := reflect.TypeOf(request) - if t == nil { - return method - } - // dereference - if t.Kind() == reflect.Ptr { - t = t.Elem() - } - // get package name - pParts := strings.Split(t.PkgPath(), "/") - pkg := pParts[len(pParts)-1] + // assume method is Foo.Bar mParts := strings.Split(method, ".") if len(mParts) != 2 { return method } + // return /pkg.Foo/Bar - return fmt.Sprintf("/%s.%s/%s", pkg, mParts[0], mParts[1]) + return fmt.Sprintf("/%s.%s/%s", service, mParts[0], mParts[1]) } func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request { diff --git a/response.go b/response.go index 7ef7224..c870fad 100644 --- a/response.go +++ b/response.go @@ -4,6 +4,7 @@ import ( "strings" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/bytes" "google.golang.org/grpc" "google.golang.org/grpc/encoding" ) @@ -12,11 +13,12 @@ type response struct { conn *grpc.ClientConn stream grpc.ClientStream codec encoding.Codec + gcodec codec.Codec } // Read the response func (r *response) Codec() codec.Reader { - return nil + return r.gcodec } // read the header @@ -34,5 +36,9 @@ func (r *response) Header() map[string]string { // Read the undecoded response func (r *response) Read() ([]byte, error) { - return nil, nil + f := &bytes.Frame{} + if err := r.gcodec.ReadBody(f); err != nil { + return nil, err + } + return f.Data, nil }