From 98ba3b2788a9d7ce068bdbba2b46d69cf32788a8 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 20 Aug 2020 14:45:47 +0300 Subject: [PATCH] grpc: avoid allocations for each message (#11) * grpc: avoid allocations for each message * fix tests for api/router Signed-off-by: Vasiliy Tolstov --- api/router/router_test.go | 5 +++++ client/grpc/grpc.go | 40 +++++++++++++++++++++++------------ server/grpc/grpc.go | 44 ++++++++++++++++++++++++--------------- 3 files changed, 59 insertions(+), 30 deletions(-) diff --git a/api/router/router_test.go b/api/router/router_test.go index e74a1032..36af155c 100644 --- a/api/router/router_test.go +++ b/api/router/router_test.go @@ -15,6 +15,8 @@ import ( "github.com/unistack-org/micro/v3/api/router" rregistry "github.com/unistack-org/micro/v3/api/router/registry" rstatic "github.com/unistack-org/micro/v3/api/router/static" + "github.com/unistack-org/micro/v3/broker" + bmemory "github.com/unistack-org/micro/v3/broker/memory" "github.com/unistack-org/micro/v3/client" gcli "github.com/unistack-org/micro/v3/client/grpc" rmemory "github.com/unistack-org/micro/v3/registry/memory" @@ -50,10 +52,12 @@ func (s *testServer) CallPcreInvalid(ctx context.Context, req *pb.Request, rsp * func initial(t *testing.T) (server.Server, client.Client) { r := rmemory.NewRegistry() + b := bmemory.NewBroker(broker.Registry(r)) // create a new client s := gsrv.NewServer( server.Name("foo"), + server.Broker(b), server.Registry(r), ) @@ -64,6 +68,7 @@ func initial(t *testing.T) (server.Server, client.Client) { // create a new server c := gcli.NewClient( client.Router(rtr), + client.Broker(b), ) h := &testServer{} diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index fd5dbcd8..37b9205d 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -8,6 +8,7 @@ import ( "net" "reflect" "strings" + "sync" "sync/atomic" "time" @@ -24,9 +25,11 @@ import ( ) type grpcClient struct { - opts client.Options - pool *pool - once atomic.Value + opts client.Options + codecs map[string]encoding.Codec + pool *pool + once atomic.Value + sync.RWMutex } func init() { @@ -301,18 +304,13 @@ func (g *grpcClient) maxSendMsgSizeValue() int { } func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) { - codecs := make(map[string]encoding.Codec) - if g.opts.Context != nil { - if v := g.opts.Context.Value(codecsKey{}); v != nil { - codecs = v.(map[string]encoding.Codec) - } - } - if c, ok := codecs[contentType]; ok { - return wrapCodec{c}, nil - } - if c, ok := defaultGRPCCodecs[contentType]; ok { + g.RLock() + defer g.RUnlock() + + if c, ok := g.codecs[contentType]; ok { return wrapCodec{c}, nil } + return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } @@ -721,6 +719,22 @@ func newClient(opts ...client.Option) client.Client { c = options.Wrappers[i-1](c) } + rc.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs)) + for k, v := range defaultGRPCCodecs { + rc.codecs[k] = v + } + + var codecs map[string]encoding.Codec + if rc.opts.Context != nil { + if v := rc.opts.Context.Value(codecsKey{}); v != nil { + codecs = v.(map[string]encoding.Codec) + } + } + + for k, v := range codecs { + rc.codecs[k] = v + } + return c } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 3dd26411..6e015bce 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -64,6 +64,8 @@ type grpcServer struct { // registry service instance rsvc *registry.Service + + codecs map[string]encoding.Codec } func init() { @@ -73,22 +75,19 @@ func init() { } func newGRPCServer(opts ...server.Option) server.Server { - options := newOptions(opts...) - // create a grpc server srv := &grpcServer{ - opts: options, + opts: server.Options{}, rpc: &rServer{ serviceMap: make(map[string]*service), }, handlers: make(map[string]server.Handler), subscribers: make(map[*subscriber][]broker.Subscriber), exit: make(chan chan error), - wg: wait(options.Context), } // configure the grpc server - srv.configure() + srv.configure(opts...) return srv } @@ -110,8 +109,8 @@ func (g *grpcServer) configure(opts ...server.Option) { g.Lock() defer g.Unlock() - // Don't reprocess where there's no config - if len(opts) == 0 && g.srv != nil { + // Don't reprocess if server created + if g.srv != nil { return } @@ -121,6 +120,22 @@ func (g *grpcServer) configure(opts ...server.Option) { g.wg = wait(g.opts.Context) + g.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs)) + for k, v := range defaultGRPCCodecs { + g.codecs[k] = v + } + + var codecs map[string]encoding.Codec + if g.opts.Context != nil { + if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil { + codecs = v + } + } + + for k, v := range codecs { + g.codecs[k] = v + } + maxMsgSize := g.getMaxMsgSize() gopts := []grpc.ServerOption{ @@ -519,18 +534,13 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m } func (g *grpcServer) newGRPCCodec(contentType string) (encoding.Codec, error) { - codecs := make(map[string]encoding.Codec) - if g.opts.Context != nil { - if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil { - codecs = v - } - } - if c, ok := codecs[contentType]; ok { - return c, nil - } - if c, ok := defaultGRPCCodecs[contentType]; ok { + g.RLock() + defer g.RUnlock() + + if c, ok := g.codecs[contentType]; ok { return c, nil } + return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) }