From 19ef225b2f124d9a42860835c2afdc57150d83eb Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Aug 2020 14:44:29 +0100 Subject: [PATCH] Revert "grpc: avoid allocations for each message (#1939)" (#1941) This reverts commit 2a23224d911b0aeccf0614afb937a18ad7d59bca. --- client/grpc/grpc.go | 40 +++++++++++++--------------------------- server/grpc/grpc.go | 44 +++++++++++++++++--------------------------- 2 files changed, 30 insertions(+), 54 deletions(-) diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index ef0c9bef..f230fa46 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -8,7 +8,6 @@ import ( "net" "reflect" "strings" - "sync" "sync/atomic" "time" @@ -25,11 +24,9 @@ import ( ) type grpcClient struct { - opts client.Options - codecs map[string]encoding.Codec - pool *pool - once atomic.Value - sync.RWMutex + opts client.Options + pool *pool + once atomic.Value } func init() { @@ -304,13 +301,18 @@ func (g *grpcClient) maxSendMsgSizeValue() int { } func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) { - g.RLock() - defer g.RUnlock() - - if c, ok := g.codecs[contentType]; ok { + codecs := make(map[string]encoding.Codec) + if g.opts.Context != nil { + if v := g.opts.Context.Value(codecsKey{}); v != nil { + codecs = v.(map[string]encoding.Codec) + } + } + if c, ok := codecs[contentType]; ok { + return wrapCodec{c}, nil + } + if c, ok := defaultGRPCCodecs[contentType]; ok { return wrapCodec{c}, nil } - return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } @@ -719,22 +721,6 @@ func newClient(opts ...client.Option) client.Client { c = options.Wrappers[i-1](c) } - rc.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs)) - for k, v := range defaultGRPCCodecs { - rc.codecs[k] = v - } - - var codecs map[string]encoding.Codec - if rc.opts.Context != nil { - if v := rc.opts.Context.Value(codecsKey{}); v != nil { - codecs = v.(map[string]encoding.Codec) - } - } - - for k, v := range codecs { - rc.codecs[k] = v - } - return c } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 98ce29db..1de8a83e 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -64,8 +64,6 @@ type grpcServer struct { // registry service instance rsvc *registry.Service - - codecs map[string]encoding.Codec } func init() { @@ -75,19 +73,22 @@ func init() { } func newGRPCServer(opts ...server.Option) server.Server { + options := newOptions(opts...) + // create a grpc server srv := &grpcServer{ - opts: server.Options{}, + opts: options, rpc: &rServer{ serviceMap: make(map[string]*service), }, handlers: make(map[string]server.Handler), subscribers: make(map[*subscriber][]broker.Subscriber), exit: make(chan chan error), + wg: wait(options.Context), } // configure the grpc server - srv.configure(opts...) + srv.configure() return srv } @@ -109,8 +110,8 @@ func (g *grpcServer) configure(opts ...server.Option) { g.Lock() defer g.Unlock() - // Don't reprocess if server created - if g.srv != nil { + // Don't reprocess where there's no config + if len(opts) == 0 && g.srv != nil { return } @@ -120,22 +121,6 @@ func (g *grpcServer) configure(opts ...server.Option) { g.wg = wait(g.opts.Context) - g.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs)) - for k, v := range defaultGRPCCodecs { - g.codecs[k] = v - } - - var codecs map[string]encoding.Codec - if g.opts.Context != nil { - if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil { - codecs = v - } - } - - for k, v := range codecs { - g.codecs[k] = v - } - maxMsgSize := g.getMaxMsgSize() gopts := []grpc.ServerOption{ @@ -534,13 +519,18 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m } func (g *grpcServer) newGRPCCodec(contentType string) (encoding.Codec, error) { - g.RLock() - defer g.RUnlock() - - if c, ok := g.codecs[contentType]; ok { + codecs := make(map[string]encoding.Codec) + if g.opts.Context != nil { + if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil { + codecs = v + } + } + if c, ok := codecs[contentType]; ok { + return c, nil + } + if c, ok := defaultGRPCCodecs[contentType]; ok { return c, nil } - return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) }