Add working grpc proxy config

This commit is contained in:
Asim Aslam 2019-06-18 18:51:52 +01:00 committed by Vasiliy Tolstov
parent 869c6ec764
commit e83572b994
4 changed files with 35 additions and 43 deletions

View File

@ -10,8 +10,8 @@ import (
"github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/codec/protorpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding"
) )
type jsonCodec struct{} type jsonCodec struct{}
@ -154,27 +154,19 @@ func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
} }
func (g *grpcCodec) ReadBody(v interface{}) error { func (g *grpcCodec) ReadBody(v interface{}) error {
frame := &bytes.Frame{} if f, ok := v.(*bytes.Frame); ok {
if err := g.s.RecvMsg(frame); err != nil { return g.s.RecvMsg(f)
return err
} }
return g.c.Unmarshal(frame.Data, v) return g.s.RecvMsg(v)
} }
func (g *grpcCodec) Write(m *codec.Message, v interface{}) error { func (g *grpcCodec) Write(m *codec.Message, v interface{}) error {
// if we don't have a body // if we don't have a body
if len(m.Body) == 0 { if v != nil {
b, err := g.c.Marshal(v) return g.s.SendMsg(v)
if err != nil {
return err
} }
m.Body = b
}
// create an encoded frame
frame := &bytes.Frame{m.Body}
// write the body using the framing codec // write the body using the framing codec
return g.s.SendMsg(frame) return g.s.SendMsg(&bytes.Frame{m.Body})
} }
func (g *grpcCodec) Close() error { func (g *grpcCodec) Close() error {

18
grpc.go
View File

@ -32,8 +32,9 @@ type grpcClient struct {
} }
func init() { func init() {
encoding.RegisterCodec(jsonCodec{}) encoding.RegisterCodec(wrapCodec{jsonCodec{}})
encoding.RegisterCodec(bytesCodec{}) encoding.RegisterCodec(wrapCodec{jsonCodec{}})
encoding.RegisterCodec(wrapCodec{bytesCodec{}})
} }
// secure returns the dial option for whether its a secure or insecure connection // secure returns the dial option for whether its a secure or insecure connection
@ -129,7 +130,7 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
ch := make(chan error, 1) ch := make(chan error, 1)
go func() { go func() {
err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.ForceCodec(cf)) err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpc.ForceCodec(cf))
ch <- microError(err) ch <- microError(err)
}() }()
@ -191,23 +192,26 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
ServerStreams: true, ServerStreams: true,
} }
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body())) st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()))
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
// set request codec codec := &grpcCodec{
if r, ok := req.(*grpcRequest); ok {
r.codec = &grpcCodec{
s: st, s: st,
c: wc, c: wc,
} }
// set request codec
if r, ok := req.(*grpcRequest); ok {
r.codec = codec
} }
rsp := &response{ rsp := &response{
conn: cc, conn: cc,
stream: st, stream: st,
codec: cf, codec: cf,
gcodec: codec,
} }
return &grpcStream{ return &grpcStream{

View File

@ -2,7 +2,6 @@ package grpc
import ( import (
"fmt" "fmt"
"reflect"
"strings" "strings"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
@ -18,30 +17,21 @@ type grpcRequest struct {
codec codec.Codec codec codec.Codec
} }
func methodToGRPC(method string, request interface{}) string { // service Struct.Method /service.Struct/Method
func methodToGRPC(service, method string) string {
// no method or already grpc method // no method or already grpc method
if len(method) == 0 || method[0] == '/' { if len(method) == 0 || method[0] == '/' {
return method return method
} }
// can't operate on nil request
t := reflect.TypeOf(request)
if t == nil {
return method
}
// dereference
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
// get package name
pParts := strings.Split(t.PkgPath(), "/")
pkg := pParts[len(pParts)-1]
// assume method is Foo.Bar // assume method is Foo.Bar
mParts := strings.Split(method, ".") mParts := strings.Split(method, ".")
if len(mParts) != 2 { if len(mParts) != 2 {
return method return method
} }
// return /pkg.Foo/Bar // return /pkg.Foo/Bar
return fmt.Sprintf("/%s.%s/%s", pkg, mParts[0], mParts[1]) return fmt.Sprintf("/%s.%s/%s", service, mParts[0], mParts[1])
} }
func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request { func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request {

View File

@ -4,6 +4,7 @@ import (
"strings" "strings"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
) )
@ -12,11 +13,12 @@ type response struct {
conn *grpc.ClientConn conn *grpc.ClientConn
stream grpc.ClientStream stream grpc.ClientStream
codec encoding.Codec codec encoding.Codec
gcodec codec.Codec
} }
// Read the response // Read the response
func (r *response) Codec() codec.Reader { func (r *response) Codec() codec.Reader {
return nil return r.gcodec
} }
// read the header // read the header
@ -34,5 +36,9 @@ func (r *response) Header() map[string]string {
// Read the undecoded response // Read the undecoded response
func (r *response) Read() ([]byte, error) { func (r *response) Read() ([]byte, error) {
return nil, nil f := &bytes.Frame{}
if err := r.gcodec.ReadBody(f); err != nil {
return nil, err
}
return f.Data, nil
} }