diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index f230fa46..ef0c9bef 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -8,6 +8,7 @@ import ( "net" "reflect" "strings" + "sync" "sync/atomic" "time" @@ -24,9 +25,11 @@ import ( ) type grpcClient struct { - opts client.Options - pool *pool - once atomic.Value + opts client.Options + codecs map[string]encoding.Codec + pool *pool + once atomic.Value + sync.RWMutex } func init() { @@ -301,18 +304,13 @@ func (g *grpcClient) maxSendMsgSizeValue() int { } func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) { - codecs := make(map[string]encoding.Codec) - if g.opts.Context != nil { - if v := g.opts.Context.Value(codecsKey{}); v != nil { - codecs = v.(map[string]encoding.Codec) - } - } - if c, ok := codecs[contentType]; ok { - return wrapCodec{c}, nil - } - if c, ok := defaultGRPCCodecs[contentType]; ok { + g.RLock() + defer g.RUnlock() + + if c, ok := g.codecs[contentType]; ok { return wrapCodec{c}, nil } + return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } @@ -721,6 +719,22 @@ func newClient(opts ...client.Option) client.Client { c = options.Wrappers[i-1](c) } + rc.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs)) + for k, v := range defaultGRPCCodecs { + rc.codecs[k] = v + } + + var codecs map[string]encoding.Codec + if rc.opts.Context != nil { + if v := rc.opts.Context.Value(codecsKey{}); v != nil { + codecs = v.(map[string]encoding.Codec) + } + } + + for k, v := range codecs { + rc.codecs[k] = v + } + return c } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 1de8a83e..98ce29db 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -64,6 +64,8 @@ type grpcServer struct { // registry service instance rsvc *registry.Service + + codecs map[string]encoding.Codec } func init() { @@ -73,22 +75,19 @@ func init() { } func newGRPCServer(opts ...server.Option) server.Server { - options := newOptions(opts...) - // create a grpc server srv := &grpcServer{ - opts: options, + opts: server.Options{}, rpc: &rServer{ serviceMap: make(map[string]*service), }, handlers: make(map[string]server.Handler), subscribers: make(map[*subscriber][]broker.Subscriber), exit: make(chan chan error), - wg: wait(options.Context), } // configure the grpc server - srv.configure() + srv.configure(opts...) return srv } @@ -110,8 +109,8 @@ func (g *grpcServer) configure(opts ...server.Option) { g.Lock() defer g.Unlock() - // Don't reprocess where there's no config - if len(opts) == 0 && g.srv != nil { + // Don't reprocess if server created + if g.srv != nil { return } @@ -121,6 +120,22 @@ func (g *grpcServer) configure(opts ...server.Option) { g.wg = wait(g.opts.Context) + g.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs)) + for k, v := range defaultGRPCCodecs { + g.codecs[k] = v + } + + var codecs map[string]encoding.Codec + if g.opts.Context != nil { + if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil { + codecs = v + } + } + + for k, v := range codecs { + g.codecs[k] = v + } + maxMsgSize := g.getMaxMsgSize() gopts := []grpc.ServerOption{ @@ -519,18 +534,13 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m } func (g *grpcServer) newGRPCCodec(contentType string) (encoding.Codec, error) { - codecs := make(map[string]encoding.Codec) - if g.opts.Context != nil { - if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil { - codecs = v - } - } - if c, ok := codecs[contentType]; ok { - return c, nil - } - if c, ok := defaultGRPCCodecs[contentType]; ok { + g.RLock() + defer g.RUnlock() + + if c, ok := g.codecs[contentType]; ok { return c, nil } + return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) }