grpc: avoid allocations for each message (#1939)
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		
							
								
								
									
										44
									
								
								grpc.go
									
									
									
									
									
								
							
							
						
						
									
										44
									
								
								grpc.go
									
									
									
									
									
								
							| @@ -64,6 +64,8 @@ type grpcServer struct { | ||||
|  | ||||
| 	// registry service instance | ||||
| 	rsvc *registry.Service | ||||
|  | ||||
| 	codecs map[string]encoding.Codec | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| @@ -73,22 +75,19 @@ func init() { | ||||
| } | ||||
|  | ||||
| func newGRPCServer(opts ...server.Option) server.Server { | ||||
| 	options := newOptions(opts...) | ||||
|  | ||||
| 	// create a grpc server | ||||
| 	srv := &grpcServer{ | ||||
| 		opts: options, | ||||
| 		opts: server.Options{}, | ||||
| 		rpc: &rServer{ | ||||
| 			serviceMap: make(map[string]*service), | ||||
| 		}, | ||||
| 		handlers:    make(map[string]server.Handler), | ||||
| 		subscribers: make(map[*subscriber][]broker.Subscriber), | ||||
| 		exit:        make(chan chan error), | ||||
| 		wg:          wait(options.Context), | ||||
| 	} | ||||
|  | ||||
| 	// configure the grpc server | ||||
| 	srv.configure() | ||||
| 	srv.configure(opts...) | ||||
|  | ||||
| 	return srv | ||||
| } | ||||
| @@ -110,8 +109,8 @@ func (g *grpcServer) configure(opts ...server.Option) { | ||||
| 	g.Lock() | ||||
| 	defer g.Unlock() | ||||
|  | ||||
| 	// Don't reprocess where there's no config | ||||
| 	if len(opts) == 0 && g.srv != nil { | ||||
| 	// Don't reprocess if server created | ||||
| 	if g.srv != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -121,6 +120,22 @@ func (g *grpcServer) configure(opts ...server.Option) { | ||||
|  | ||||
| 	g.wg = wait(g.opts.Context) | ||||
|  | ||||
| 	g.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs)) | ||||
| 	for k, v := range defaultGRPCCodecs { | ||||
| 		g.codecs[k] = v | ||||
| 	} | ||||
|  | ||||
| 	var codecs map[string]encoding.Codec | ||||
| 	if g.opts.Context != nil { | ||||
| 		if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil { | ||||
| 			codecs = v | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	for k, v := range codecs { | ||||
| 		g.codecs[k] = v | ||||
| 	} | ||||
|  | ||||
| 	maxMsgSize := g.getMaxMsgSize() | ||||
|  | ||||
| 	gopts := []grpc.ServerOption{ | ||||
| @@ -519,18 +534,13 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m | ||||
| } | ||||
|  | ||||
| func (g *grpcServer) newGRPCCodec(contentType string) (encoding.Codec, error) { | ||||
| 	codecs := make(map[string]encoding.Codec) | ||||
| 	if g.opts.Context != nil { | ||||
| 		if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil { | ||||
| 			codecs = v | ||||
| 		} | ||||
| 	} | ||||
| 	if c, ok := codecs[contentType]; ok { | ||||
| 		return c, nil | ||||
| 	} | ||||
| 	if c, ok := defaultGRPCCodecs[contentType]; ok { | ||||
| 	g.RLock() | ||||
| 	defer g.RUnlock() | ||||
|  | ||||
| 	if c, ok := g.codecs[contentType]; ok { | ||||
| 		return c, nil | ||||
| 	} | ||||
|  | ||||
| 	return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user