codec rewrite

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-11-26 01:18:35 +03:00
parent 60ad0903b9
commit 7ef49b03ba
7 changed files with 100 additions and 287 deletions

58
grpc.go
View File

@@ -12,12 +12,11 @@ import (
"sync/atomic"
"time"
raw "github.com/unistack-org/micro-codec-bytes"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/metadata"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
@@ -32,12 +31,6 @@ type grpcClient struct {
sync.RWMutex
}
func init() {
encoding.RegisterCodec(wrapCodec{jsonCodec{}})
encoding.RegisterCodec(wrapCodec{protoCodec{}})
encoding.RegisterCodec(wrapCodec{bytesCodec{}})
}
// secure returns the dial option for whether its a secure or insecure connection
func (g *grpcClient) secure(addr string) grpc.DialOption {
// first we check if theres'a tls config
@@ -136,8 +129,8 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
go func() {
grpcCallOptions := []grpc.CallOption{
grpc.ForceCodec(cf),
grpc.CallContentSubtype(cf.Name())}
grpc.ForceCodec(&wrapMicroCodec{cf}),
grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name())}
if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
}
@@ -191,7 +184,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
}
defer cancel()
wc := wrapCodec{cf}
wc := &wrapMicroCodec{cf}
grpcDialOptions := []grpc.DialOption{
g.secure(addr),
@@ -214,7 +207,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpcCallOptions := []grpc.CallOption{
grpc.ForceCodec(wc),
grpc.CallContentSubtype(cf.Name()),
grpc.CallContentSubtype(wc.Name()),
}
if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
@@ -234,14 +227,9 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
}
codec := &grpcCodec{
s: st,
c: wc,
}
// set request codec
if r, ok := req.(*grpcRequest); ok {
r.codec = codec
r.codec = cf
}
// setup the stream response
@@ -253,7 +241,6 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
conn: cc,
stream: st,
codec: cf,
gcodec: codec,
},
conn: cc,
close: func(err error) {
@@ -317,14 +304,14 @@ func (g *grpcClient) maxSendMsgSizeValue() int {
return v.(int)
}
func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) {
func (g *grpcClient) newGRPCCodec(ct string) (codec.Codec, error) {
g.RLock()
defer g.RUnlock()
if c, ok := g.codecs[contentType]; ok {
return wrapCodec{c}, nil
if c, ok := g.opts.Codecs[ct]; ok {
return c, nil
}
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
return nil, codec.ErrUnknownContentType
}
func (g *grpcClient) Init(opts ...client.Option) error {
@@ -632,7 +619,7 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
md["Micro-Topic"] = p.Topic()
// passed in raw data
if d, ok := p.Payload().(*raw.Frame); ok {
if d, ok := p.Payload().(*codec.Frame); ok {
body = d.Data
} else {
// use codec for payload
@@ -706,18 +693,13 @@ func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption {
}
func newClient(opts ...client.Option) client.Client {
options := client.NewOptions()
options := client.NewOptions(opts...)
// default content type for grpc
options.ContentType = "application/grpc+proto"
for _, o := range opts {
o(&options)
}
rc := &grpcClient{
opts: options,
}
rc.once.Store(false)
rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
@@ -728,20 +710,16 @@ func newClient(opts ...client.Option) client.Client {
c = options.Wrappers[i-1](c)
}
rc.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs))
for k, v := range defaultGRPCCodecs {
rc.codecs[k] = v
}
var codecs map[string]encoding.Codec
if rc.opts.Context != nil {
if v := rc.opts.Context.Value(codecsKey{}); v != nil {
codecs = v.(map[string]encoding.Codec)
if codecs, ok := rc.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
for k, v := range codecs {
rc.opts.Codecs[k] = &wrapGrpcCodec{v}
}
}
}
for k, v := range codecs {
rc.codecs[k] = v
for _, k := range options.Codecs {
encoding.RegisterCodec(&wrapMicroCodec{k})
}
return c