This reverts commit 2a23224d911b0aeccf0614afb937a18ad7d59bca.
This commit is contained in:
parent
2a23224d91
commit
19ef225b2f
@ -8,7 +8,6 @@ import (
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -25,11 +24,9 @@ import (
|
||||
)
|
||||
|
||||
type grpcClient struct {
|
||||
opts client.Options
|
||||
codecs map[string]encoding.Codec
|
||||
pool *pool
|
||||
once atomic.Value
|
||||
sync.RWMutex
|
||||
opts client.Options
|
||||
pool *pool
|
||||
once atomic.Value
|
||||
}
|
||||
|
||||
func init() {
|
||||
@ -304,13 +301,18 @@ func (g *grpcClient) maxSendMsgSizeValue() int {
|
||||
}
|
||||
|
||||
func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) {
|
||||
g.RLock()
|
||||
defer g.RUnlock()
|
||||
|
||||
if c, ok := g.codecs[contentType]; ok {
|
||||
codecs := make(map[string]encoding.Codec)
|
||||
if g.opts.Context != nil {
|
||||
if v := g.opts.Context.Value(codecsKey{}); v != nil {
|
||||
codecs = v.(map[string]encoding.Codec)
|
||||
}
|
||||
}
|
||||
if c, ok := codecs[contentType]; ok {
|
||||
return wrapCodec{c}, nil
|
||||
}
|
||||
if c, ok := defaultGRPCCodecs[contentType]; ok {
|
||||
return wrapCodec{c}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
|
||||
}
|
||||
|
||||
@ -719,22 +721,6 @@ func newClient(opts ...client.Option) client.Client {
|
||||
c = options.Wrappers[i-1](c)
|
||||
}
|
||||
|
||||
rc.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs))
|
||||
for k, v := range defaultGRPCCodecs {
|
||||
rc.codecs[k] = v
|
||||
}
|
||||
|
||||
var codecs map[string]encoding.Codec
|
||||
if rc.opts.Context != nil {
|
||||
if v := rc.opts.Context.Value(codecsKey{}); v != nil {
|
||||
codecs = v.(map[string]encoding.Codec)
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range codecs {
|
||||
rc.codecs[k] = v
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
|
@ -64,8 +64,6 @@ type grpcServer struct {
|
||||
|
||||
// registry service instance
|
||||
rsvc *registry.Service
|
||||
|
||||
codecs map[string]encoding.Codec
|
||||
}
|
||||
|
||||
func init() {
|
||||
@ -75,19 +73,22 @@ func init() {
|
||||
}
|
||||
|
||||
func newGRPCServer(opts ...server.Option) server.Server {
|
||||
options := newOptions(opts...)
|
||||
|
||||
// create a grpc server
|
||||
srv := &grpcServer{
|
||||
opts: server.Options{},
|
||||
opts: options,
|
||||
rpc: &rServer{
|
||||
serviceMap: make(map[string]*service),
|
||||
},
|
||||
handlers: make(map[string]server.Handler),
|
||||
subscribers: make(map[*subscriber][]broker.Subscriber),
|
||||
exit: make(chan chan error),
|
||||
wg: wait(options.Context),
|
||||
}
|
||||
|
||||
// configure the grpc server
|
||||
srv.configure(opts...)
|
||||
srv.configure()
|
||||
|
||||
return srv
|
||||
}
|
||||
@ -109,8 +110,8 @@ func (g *grpcServer) configure(opts ...server.Option) {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
|
||||
// Don't reprocess if server created
|
||||
if g.srv != nil {
|
||||
// Don't reprocess where there's no config
|
||||
if len(opts) == 0 && g.srv != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@ -120,22 +121,6 @@ func (g *grpcServer) configure(opts ...server.Option) {
|
||||
|
||||
g.wg = wait(g.opts.Context)
|
||||
|
||||
g.codecs = make(map[string]encoding.Codec, len(defaultGRPCCodecs))
|
||||
for k, v := range defaultGRPCCodecs {
|
||||
g.codecs[k] = v
|
||||
}
|
||||
|
||||
var codecs map[string]encoding.Codec
|
||||
if g.opts.Context != nil {
|
||||
if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil {
|
||||
codecs = v
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range codecs {
|
||||
g.codecs[k] = v
|
||||
}
|
||||
|
||||
maxMsgSize := g.getMaxMsgSize()
|
||||
|
||||
gopts := []grpc.ServerOption{
|
||||
@ -534,13 +519,18 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m
|
||||
}
|
||||
|
||||
func (g *grpcServer) newGRPCCodec(contentType string) (encoding.Codec, error) {
|
||||
g.RLock()
|
||||
defer g.RUnlock()
|
||||
|
||||
if c, ok := g.codecs[contentType]; ok {
|
||||
codecs := make(map[string]encoding.Codec)
|
||||
if g.opts.Context != nil {
|
||||
if v, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && v != nil {
|
||||
codecs = v
|
||||
}
|
||||
}
|
||||
if c, ok := codecs[contentType]; ok {
|
||||
return c, nil
|
||||
}
|
||||
if c, ok := defaultGRPCCodecs[contentType]; ok {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user