add cruft

This commit is contained in:
Asim Aslam 2019-06-17 20:05:58 +01:00 committed by Vasiliy Tolstov
parent 0ecc1d6197
commit 76265b618b
2 changed files with 100 additions and 1 deletions

View File

@ -2,13 +2,16 @@ package grpc
import ( import (
"fmt" "fmt"
"strings"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/json-iterator/go" "github.com/json-iterator/go"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/codec/protorpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
"google.golang.org/grpc"
) )
type jsonCodec struct{} type jsonCodec struct{}
@ -52,7 +55,28 @@ func (w wrapCodec) String() string {
return w.Codec.Name() return w.Codec.Name()
} }
func (w wrapCodec) Marshal(v interface{}) ([]byte, error) {
b, ok := v.(*bytes.Frame)
if ok {
return b.Data, nil
}
return w.Codec.Marshal(v)
}
func (w wrapCodec) Unmarshal(data []byte, v interface{}) error {
b, ok := v.(*bytes.Frame)
if ok {
b.Data = data
return nil
}
return w.Codec.Unmarshal(data, v)
}
func (protoCodec) Marshal(v interface{}) ([]byte, error) { func (protoCodec) Marshal(v interface{}) ([]byte, error) {
b, ok := v.(*bytes.Frame)
if ok {
return b.Data, nil
}
return proto.Marshal(v.(proto.Message)) return proto.Marshal(v.(proto.Message))
} }
@ -96,3 +120,67 @@ func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
func (jsonCodec) Name() string { func (jsonCodec) Name() string {
return "json" return "json"
} }
type grpcCodec struct {
// headers
id string
target string
method string
endpoint string
s grpc.ClientStream
c encoding.Codec
}
func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
md, err := g.s.Header()
if err != nil {
return err
}
if m == nil {
m = new(codec.Message)
}
if m.Header == nil {
m.Header = make(map[string]string)
}
for k, v := range md {
m.Header[k] = strings.Join(v, ",")
}
m.Id = g.id
m.Target = g.target
m.Method = g.method
m.Endpoint = g.endpoint
return nil
}
func (g *grpcCodec) ReadBody(v interface{}) error {
frame := &bytes.Frame{}
if err := g.s.RecvMsg(frame); err != nil {
return err
}
return g.c.Unmarshal(frame.Data, v)
}
func (g *grpcCodec) Write(m *codec.Message, v interface{}) error {
// if we don't have a body
if len(m.Body) == 0 {
b, err := g.c.Marshal(v)
if err != nil {
return err
}
m.Body = b
}
// create an encoded frame
frame := &bytes.Frame{m.Body}
// write the body using the framing codec
return g.s.SendMsg(frame)
}
func (g *grpcCodec) Close() error {
return g.s.CloseSend()
}
func (g *grpcCodec) String() string {
return g.c.Name()
}

13
grpc.go
View File

@ -177,7 +177,10 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
dialCtx, cancel = context.WithCancel(ctx) dialCtx, cancel = context.WithCancel(ctx)
} }
defer cancel() defer cancel()
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), g.secure())
wc := wrapCodec{cf}
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)), g.secure())
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -193,6 +196,14 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
// set request codec
if r, ok := req.(*grpcRequest); ok {
r.codec = &grpcCodec{
s: st,
c: wc,
}
}
rsp := &response{ rsp := &response{
conn: cc, conn: cc,
stream: st, stream: st,