From 1fbc056dd4d9619a00a84b56fe3c917c0614e876 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 8 Apr 2020 12:50:19 +0300 Subject: [PATCH] minimize allocations (#1472) * server: minimize allocations on re-register Signed-off-by: Vasiliy Tolstov * server: stop old instance before Init() Signed-off-by: Vasiliy Tolstov * client/grpc: fix allocations in protobuf marshal Signed-off-by: Vasiliy Tolstov * codec/json: fix allocations in protobuf marshal Signed-off-by: Vasiliy Tolstov * remove stop from init Signed-off-by: Vasiliy Tolstov * codec/grpc: expose MaxMessageSize Signed-off-by: Vasiliy Tolstov * codec: use buffer pool Signed-off-by: Vasiliy Tolstov * metadata: minimize reallocations Signed-off-by: Vasiliy Tolstov * util/wrapper: use metadata helper Signed-off-by: Vasiliy Tolstov * registry/cache: move logs to debug level Signed-off-by: Vasiliy Tolstov * server: move logs to debug level Signed-off-by: Vasiliy Tolstov * server: cache service only when Advertise is ip addr Signed-off-by: Vasiliy Tolstov * server: use metadata.Copy Signed-off-by: Vasiliy Tolstov --- client/grpc/codec.go | 19 +++++-- codec/bytes/marshaler.go | 6 +- codec/grpc/util.go | 6 +- codec/json/marshaler.go | 17 +++++- codec/proto/marshaler.go | 32 ++++++++++- metadata/metadata.go | 6 +- registry/cache/cache.go | 8 +-- server/grpc/grpc.go | 79 ++++++++++++++++++-------- server/rpc_server.go | 116 +++++++++++++++++++++++++++++---------- util/wrapper/wrapper.go | 14 +---- 10 files changed, 220 insertions(+), 83 deletions(-) diff --git a/client/grpc/codec.go b/client/grpc/codec.go index a7ee99fc..e5a6730e 100644 --- a/client/grpc/codec.go +++ b/client/grpc/codec.go @@ -11,6 +11,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/micro/go-micro/v2/codec" "github.com/micro/go-micro/v2/codec/bytes" + "github.com/oxtoacart/bpool" "google.golang.org/grpc" "google.golang.org/grpc/encoding" ) @@ -23,6 +24,9 @@ type wrapCodec struct{ encoding.Codec } var jsonpbMarshaler = &jsonpb.Marshaler{} var useNumber bool +// create buffer pool with 16 instances each preallocated with 256 bytes +var bufferPool = bpool.NewSizedBufferPool(16, 256) + var ( defaultGRPCCodecs = map[string]encoding.Codec{ "application/json": jsonCodec{}, @@ -106,14 +110,19 @@ func (bytesCodec) Name() string { } func (jsonCodec) Marshal(v interface{}) ([]byte, error) { - if pb, ok := v.(proto.Message); ok { - s, err := jsonpbMarshaler.MarshalToString(pb) - - return []byte(s), err - } if b, ok := v.(*bytes.Frame); ok { return b.Data, nil } + + if pb, ok := v.(proto.Message); ok { + buf := bufferPool.Get() + defer bufferPool.Put(buf) + if err := jsonpbMarshaler.Marshal(buf, pb); err != nil { + return nil, err + } + return buf.Bytes(), nil + } + return json.Marshal(v) } diff --git a/codec/bytes/marshaler.go b/codec/bytes/marshaler.go index 86af8984..d15e8d75 100644 --- a/codec/bytes/marshaler.go +++ b/codec/bytes/marshaler.go @@ -1,7 +1,7 @@ package bytes import ( - "errors" + "github.com/micro/go-micro/v2/codec" ) type Marshaler struct{} @@ -20,7 +20,7 @@ func (n Marshaler) Marshal(v interface{}) ([]byte, error) { case *Message: return ve.Body, nil } - return nil, errors.New("invalid message") + return nil, codec.ErrInvalidMessage } func (n Marshaler) Unmarshal(d []byte, v interface{}) error { @@ -30,7 +30,7 @@ func (n Marshaler) Unmarshal(d []byte, v interface{}) error { case *Message: ve.Body = d } - return errors.New("invalid message") + return codec.ErrInvalidMessage } func (n Marshaler) String() string { diff --git a/codec/grpc/util.go b/codec/grpc/util.go index 04c5ee38..6ba77e22 100644 --- a/codec/grpc/util.go +++ b/codec/grpc/util.go @@ -7,7 +7,7 @@ import ( ) var ( - maxMessageSize = 1024 * 1024 * 4 + MaxMessageSize = 1024 * 1024 * 4 // 4Mb maxInt = int(^uint(0) >> 1) ) @@ -34,8 +34,8 @@ func decode(r io.Reader) (uint8, []byte, error) { if int64(length) > int64(maxInt) { return cf, nil, fmt.Errorf("grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt) } - if int(length) > maxMessageSize { - return cf, nil, fmt.Errorf("grpc: received message larger than max (%d vs. %d)", length, maxMessageSize) + if int(length) > MaxMessageSize { + return cf, nil, fmt.Errorf("grpc: received message larger than max (%d vs. %d)", length, MaxMessageSize) } msg := make([]byte, int(length)) diff --git a/codec/json/marshaler.go b/codec/json/marshaler.go index a13e5f39..2f6bcd54 100644 --- a/codec/json/marshaler.go +++ b/codec/json/marshaler.go @@ -1,21 +1,36 @@ package json import ( + "bytes" "encoding/json" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" + "github.com/oxtoacart/bpool" ) +var jsonpbMarshaler = &jsonpb.Marshaler{} + +// create buffer pool with 16 instances each preallocated with 256 bytes +var bufferPool = bpool.NewSizedBufferPool(16, 256) + type Marshaler struct{} func (j Marshaler) Marshal(v interface{}) ([]byte, error) { + if pb, ok := v.(proto.Message); ok { + buf := bufferPool.Get() + defer bufferPool.Put(buf) + if err := jsonpbMarshaler.Marshal(buf, pb); err != nil { + return nil, err + } + return buf.Bytes(), nil + } return json.Marshal(v) } func (j Marshaler) Unmarshal(d []byte, v interface{}) error { if pb, ok := v.(proto.Message); ok { - return jsonpb.UnmarshalString(string(d), pb) + return jsonpb.Unmarshal(bytes.NewReader(d), pb) } return json.Unmarshal(d, v) } diff --git a/codec/proto/marshaler.go b/codec/proto/marshaler.go index 8f9eee2a..82acc4b2 100644 --- a/codec/proto/marshaler.go +++ b/codec/proto/marshaler.go @@ -1,17 +1,45 @@ package proto import ( + "bytes" + "github.com/golang/protobuf/proto" + "github.com/micro/go-micro/v2/codec" + "github.com/oxtoacart/bpool" ) +// create buffer pool with 16 instances each preallocated with 256 bytes +var bufferPool = bpool.NewSizedBufferPool(16, 256) + type Marshaler struct{} func (Marshaler) Marshal(v interface{}) ([]byte, error) { - return proto.Marshal(v.(proto.Message)) + pb, ok := v.(proto.Message) + if !ok { + return nil, codec.ErrInvalidMessage + } + + // looks not good, but allows to reuse underlining bytes + buf := bufferPool.Get() + pbuf := proto.NewBuffer(buf.Bytes()) + defer func() { + bufferPool.Put(bytes.NewBuffer(pbuf.Bytes())) + }() + + if err := pbuf.Marshal(pb); err != nil { + return nil, err + } + + return pbuf.Bytes(), nil } func (Marshaler) Unmarshal(data []byte, v interface{}) error { - return proto.Unmarshal(data, v.(proto.Message)) + pb, ok := v.(proto.Message) + if !ok { + return codec.ErrInvalidMessage + } + + return proto.Unmarshal(data, pb) } func (Marshaler) String() string { diff --git a/metadata/metadata.go b/metadata/metadata.go index 96693cf6..bd539314 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -34,7 +34,7 @@ func (md Metadata) Delete(key string) { // Copy makes a copy of the metadata func Copy(md Metadata) Metadata { - cmd := make(Metadata) + cmd := make(Metadata, len(md)) for k, v := range md { cmd[k] = v } @@ -86,7 +86,7 @@ func FromContext(ctx context.Context) (Metadata, bool) { } // capitalise all values - newMD := make(map[string]string, len(md)) + newMD := make(Metadata, len(md)) for k, v := range md { newMD[strings.Title(k)] = v } @@ -105,7 +105,7 @@ func MergeContext(ctx context.Context, patchMd Metadata, overwrite bool) context ctx = context.Background() } md, _ := ctx.Value(MetadataKey{}).(Metadata) - cmd := make(Metadata) + cmd := make(Metadata, len(md)) for k, v := range md { cmd[k] = v } diff --git a/registry/cache/cache.go b/registry/cache/cache.go index a68e32ef..eb12d16d 100644 --- a/registry/cache/cache.go +++ b/registry/cache/cache.go @@ -339,8 +339,8 @@ func (c *cache) run() { c.setStatus(err) if a > 3 { - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Info("rcache: ", err, " backing off ", d) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debug("rcache: ", err, " backing off ", d) } a = 0 } @@ -364,8 +364,8 @@ func (c *cache) run() { c.setStatus(err) if b > 3 { - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Info("rcache: ", err, " backing off ", d) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debug("rcache: ", err, " backing off ", d) } b = 0 } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 2b0dc913..dd711d3f 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -59,6 +59,9 @@ type grpcServer struct { started bool // used for first registration registered bool + + // registry service instance + rsvc *registry.Service } func init() { @@ -102,6 +105,9 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se } func (g *grpcServer) configure(opts ...server.Option) { + g.Lock() + defer g.Unlock() + // Don't reprocess where there's no config if len(opts) == 0 && g.srv != nil { return @@ -127,6 +133,7 @@ func (g *grpcServer) configure(opts ...server.Option) { gopts = append(gopts, opts...) } + g.rsvc = nil g.srv = grpc.NewServer(gopts...) } @@ -559,11 +566,24 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error { } func (g *grpcServer) Register() error { + + g.RLock() + rsvc := g.rsvc + config := g.opts + g.RUnlock() + + // if service already filled, reuse it and return early + if rsvc != nil { + rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} + if err := config.Registry.Register(rsvc, rOpts...); err != nil { + return err + } + return nil + } + var err error var advt, host, port string - - // parse address for host, port - config := g.opts + var cacheService bool // check the advertise address first // if it exists then use it, otherwise @@ -584,16 +604,17 @@ func (g *grpcServer) Register() error { host = advt } + if ip := net.ParseIP(host); ip != nil { + cacheService = true + } + addr, err := addr.Extract(host) if err != nil { return err } // make copy of metadata - md := make(meta.Metadata) - for k, v := range config.Metadata { - md[k] = v - } + md := meta.Copy(config.Metadata) // register service node := ®istry.Node{ @@ -646,13 +667,13 @@ func (g *grpcServer) Register() error { Endpoints: endpoints, } - g.Lock() + g.RLock() registered := g.registered - g.Unlock() + g.RUnlock() if !registered { - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) } } @@ -671,6 +692,9 @@ func (g *grpcServer) Register() error { g.Lock() defer g.Unlock() + if cacheService { + g.rsvc = service + } g.registered = true for sb := range g.subscribers { @@ -688,8 +712,8 @@ func (g *grpcServer) Register() error { opts = append(opts, broker.DisableAutoAck()) } - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Subscribing to topic: %s", sb.Topic()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debug("Subscribing to topic: %s", sb.Topic()) } sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) if err != nil { @@ -705,7 +729,9 @@ func (g *grpcServer) Deregister() error { var err error var advt, host, port string + g.RLock() config := g.opts + g.RUnlock() // check the advertise address first // if it exists then use it, otherwise @@ -742,14 +768,15 @@ func (g *grpcServer) Deregister() error { Nodes: []*registry.Node{node}, } - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Deregistering node: %s", node.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Deregistering node: %s", node.Id) } if err := config.Registry.Deregister(service); err != nil { return err } g.Lock() + g.rsvc = nil if !g.registered { g.Unlock() @@ -760,8 +787,8 @@ func (g *grpcServer) Deregister() error { for sb, subs := range g.subscribers { for _, sub := range subs { - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Unsubscribing from topic: %s", sub.Topic()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Unsubscribing from topic: %s", sub.Topic()) } sub.Unsubscribe() } @@ -819,11 +846,14 @@ func (g *grpcServer) Start() error { if len(g.subscribers) > 0 { // connect to the broker if err := config.Broker.Connect(); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err) + } return err } - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) } } @@ -900,11 +930,15 @@ func (g *grpcServer) Start() error { // close transport ch <- nil - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) } // disconnect broker - config.Broker.Disconnect() + if err := config.Broker.Disconnect(); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err) + } + } }() // mark the server as started @@ -930,6 +964,7 @@ func (g *grpcServer) Stop() error { select { case err = <-ch: g.Lock() + g.rsvc = nil g.started = false g.Unlock() } diff --git a/server/rpc_server.go b/server/rpc_server.go index 4619fa64..5bb5740d 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -40,6 +40,8 @@ type rpcServer struct { subscriber broker.Subscriber // graceful exit wg *sync.WaitGroup + + rsvc *registry.Service } func newRpcServer(opts ...Option) Server { @@ -459,10 +461,11 @@ func (s *rpcServer) Options() Options { func (s *rpcServer) Init(opts ...Option) error { s.Lock() + defer s.Unlock() + for _, opt := range opts { opt(&s.opts) } - // update router if its the default if s.opts.Router == nil { r := newRpcRouter() @@ -472,7 +475,8 @@ func (s *rpcServer) Init(opts ...Option) error { s.router = r } - s.Unlock() + s.rsvc = nil + return nil } @@ -510,11 +514,24 @@ func (s *rpcServer) Subscribe(sb Subscriber) error { } func (s *rpcServer) Register() error { + + s.RLock() + rsvc := s.rsvc + config := s.Options() + s.RUnlock() + + if rsvc != nil { + rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} + if err := config.Registry.Register(rsvc, rOpts...); err != nil { + return err + } + + return nil + } + var err error var advt, host, port string - - // parse address for host, port - config := s.Options() + var cacheService bool // check the advertise address first // if it exists then use it, otherwise @@ -535,16 +552,17 @@ func (s *rpcServer) Register() error { host = advt } + if ip := net.ParseIP(host); ip != nil { + cacheService = true + } + addr, err := addr.Extract(host) if err != nil { return err } // make copy of metadata - md := make(metadata.Metadata) - for k, v := range config.Metadata { - md[k] = v - } + md := metadata.Copy(config.Metadata) // mq-rpc(eg. nats) doesn't need the port. its addr is queue name. if port != "" { @@ -612,7 +630,9 @@ func (s *rpcServer) Register() error { s.RUnlock() if !registered { - log.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + log.Debugf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) + } } // create registry options @@ -630,6 +650,9 @@ func (s *rpcServer) Register() error { s.Lock() defer s.Unlock() + if cacheService { + s.rsvc = service + } s.registered = true // set what we're advertising s.opts.Advertise = addr @@ -665,8 +688,9 @@ func (s *rpcServer) Register() error { if err != nil { return err } - log.Infof("Subscribing to topic: %s", sub.Topic()) - + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + log.Debugf("Subscribing to topic: %s", sub.Topic()) + } s.subscribers[sb] = []broker.Subscriber{sub} } @@ -677,7 +701,9 @@ func (s *rpcServer) Deregister() error { var err error var advt, host, port string + s.RLock() config := s.Options() + s.RUnlock() // check the advertise address first // if it exists then use it, otherwise @@ -719,12 +745,15 @@ func (s *rpcServer) Deregister() error { Nodes: []*registry.Node{node}, } - log.Infof("Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + log.Debugf("Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id) + } if err := config.Registry.Deregister(service); err != nil { return err } s.Lock() + s.rsvc = nil if !s.registered { s.Unlock() @@ -741,7 +770,9 @@ func (s *rpcServer) Deregister() error { for sb, subs := range s.subscribers { for _, sub := range subs { - log.Infof("Unsubscribing %s from topic: %s", node.Id, sub.Topic()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + log.Debugf("Unsubscribing %s from topic: %s", node.Id, sub.Topic()) + } sub.Unsubscribe() } s.subscribers[sb] = nil @@ -767,7 +798,9 @@ func (s *rpcServer) Start() error { return err } - log.Infof("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + log.Debugf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr()) + } // swap address s.Lock() @@ -775,22 +808,31 @@ func (s *rpcServer) Start() error { s.opts.Address = ts.Addr() s.Unlock() + bname := config.Broker.String() + // connect to the broker if err := config.Broker.Connect(); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + log.Errorf("Broker [%s] connect error: %v", bname, err) + } return err } - bname := config.Broker.String() - - log.Infof("Broker [%s] Connected to %s", bname, config.Broker.Address()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + log.Debugf("Broker [%s] Connected to %s", bname, config.Broker.Address()) + } // use RegisterCheck func before register if err = s.opts.RegisterCheck(s.opts.Context); err != nil { - log.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + log.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err) + } } else { // announce self to the world if err = s.Register(); err != nil { - log.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + log.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err) + } } } @@ -811,7 +853,9 @@ func (s *rpcServer) Start() error { // check the error and backoff default: if err != nil { - log.Errorf("Accept error: %v", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + log.Errorf("Accept error: %v", err) + } time.Sleep(time.Second) continue } @@ -844,17 +888,25 @@ func (s *rpcServer) Start() error { s.RUnlock() rerr := s.opts.RegisterCheck(s.opts.Context) if rerr != nil && registered { - log.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + log.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err) + } // deregister self in case of error if err := s.Deregister(); err != nil { - log.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + log.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err) + } } } else if rerr != nil && !registered { - log.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + log.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err) + } continue } if err := s.Register(); err != nil { - log.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + log.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err) + } } // wait for exit case ch = <-s.exit: @@ -870,7 +922,9 @@ func (s *rpcServer) Start() error { if registered { // deregister self if err := s.Deregister(); err != nil { - log.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + log.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err) + } } } @@ -886,9 +940,15 @@ func (s *rpcServer) Start() error { // close transport listener ch <- ts.Close() - log.Infof("Broker [%s] Disconnected from %s", bname, config.Broker.Address()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + log.Debugf("Broker [%s] Disconnected from %s", bname, config.Broker.Address()) + } // disconnect the broker - config.Broker.Disconnect() + if err := config.Broker.Disconnect(); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + log.Errorf("Broker [%s] Disconnect error: %v", bname, err) + } + } // swap back address s.Lock() diff --git a/util/wrapper/wrapper.go b/util/wrapper/wrapper.go index 100f2f2b..505772b6 100644 --- a/util/wrapper/wrapper.go +++ b/util/wrapper/wrapper.go @@ -35,18 +35,8 @@ var ( ) func (c *clientWrapper) setHeaders(ctx context.Context) context.Context { - // copy metadata - mda, _ := metadata.FromContext(ctx) - md := metadata.Copy(mda) - - // set headers - for k, v := range c.headers { - if _, ok := md[k]; !ok { - md[k] = v - } - } - - return metadata.NewContext(ctx, md) + // don't overwrite keys + return metadata.MergeContext(ctx, c.headers, false) } func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {