Revert "grpc: avoid allocations for each message (#1939)" (#1941)

This reverts commit 2a23224d911b0aeccf0614afb937a18ad7d59bca.
This commit is contained in:
Asim Aslam 2020-08-18 14:44:29 +01:00 committed by Vasiliy Tolstov
parent a8144c0d7c
commit 67f3adcafd

44
grpc.go
View File

@ -64,8 +64,6 @@ type grpcServer struct {
// registry service instance // registry service instance
rsvc *registry.Service rsvc *registry.Service
codecs map[string]encoding.Codec
} }
func init() { func init() {
@ -75,19 +73,22 @@ func init() {
} }
func newGRPCServer(opts ...server.Option) server.Server { func newGRPCServer(opts ...server.Option) server.Server {
options := newOptions(opts...)
// create a grpc server // create a grpc server
srv := &grpcServer{ srv := &grpcServer{
opts: server.Options{}, opts: options,
rpc: &rServer{ rpc: &rServer{
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
}, },
handlers: make(map[string]server.Handler), handlers: make(map[string]server.Handler),
subscribers: make(map[*subscriber][]broker.Subscriber), subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error), exit: make(chan chan error),
wg: wait(options.Context),
} }
// configure the grpc server // configure the grpc server
srv.configure(opts...) srv.configure()
return srv return srv
} }
@ -109,8 +110,8 @@ func (g *grpcServer) configure(opts ...server.Option) {
g.Lock() g.Lock()
defer g.Unlock() defer g.Unlock()
// Don't reprocess if server created // Don't reprocess where there's no config
if g.srv != nil { if len(opts) == 0 && g.srv != nil {
return return
} }
@ -120,22 +121,6 @@ func (g *grpcServer) configure(opts ...server.Option) {
g.wg = wait(g.opts.Context) g.wg = wait(g.opts.Context)
g.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs))
for k, v := range defaultGRPCCodecs {
g.codecs[k] = v
}
var codecs map[string]encoding.Codec
if g.opts.Context != nil {
if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil {
codecs = v
}
}
for k, v := range codecs {
g.codecs[k] = v
}
maxMsgSize := g.getMaxMsgSize() maxMsgSize := g.getMaxMsgSize()
gopts := []grpc.ServerOption{ gopts := []grpc.ServerOption{
@ -534,13 +519,18 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m
} }
func (g *grpcServer) newGRPCCodec(contentType string) (encoding.Codec, error) { func (g *grpcServer) newGRPCCodec(contentType string) (encoding.Codec, error) {
g.RLock() codecs := make(map[string]encoding.Codec)
defer g.RUnlock() if g.opts.Context != nil {
if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil {
if c, ok := g.codecs[contentType]; ok { codecs = v
}
}
if c, ok := codecs[contentType]; ok {
return c, nil
}
if c, ok := defaultGRPCCodecs[contentType]; ok {
return c, nil return c, nil
} }
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
} }