Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-10-28 10:07:30 +03:00
parent 131916baca
commit 15a5d7d2cd
4 changed files with 13 additions and 22 deletions

23
grpc.go
View File

@@ -75,14 +75,13 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string]string
header = make(map[string]string)
if md, ok := metadata.FromContext(ctx); ok {
header = make(map[string]string, len(md))
for k, v := range md {
header[strings.ToLower(k)] = v
}
} else {
header = make(map[string]string)
header = make(map[string]string, 2)
}
// set timeout in nanoseconds
@@ -574,7 +573,10 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
return nil, verr
}
g.opts.Selector.Record(node, err)
if rerr := g.opts.Selector.Record(node, err); rerr != nil {
return nil, rerr
}
return stream, err
}
@@ -618,20 +620,9 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
}
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
var options client.PublishOptions
var body []byte
// fail early on connect error
if !g.once.Load().(bool) {
if err := g.opts.Broker.Connect(); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
g.once.Store(true)
}
for _, o := range opts {
o(&options)
}
options := client.NewPublishOptions(opts...)
md, ok := metadata.FromContext(ctx)
if !ok {
@@ -664,7 +655,7 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
topic = options.Exchange
}
return g.opts.Broker.Publish(topic, &broker.Message{
return g.opts.Broker.Publish(ctx, topic, &broker.Message{
Header: md,
Body: body,
}, broker.PublishContext(options.Context))