client/grpc: dont use codec for raw bytes payload (#1847)
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
cb2fc6e24d
commit
456d0c25f6
29
grpc.go
29
grpc.go
@ -618,6 +618,16 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
|||||||
|
|
||||||
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||||
var options client.PublishOptions
|
var options client.PublishOptions
|
||||||
|
var body []byte
|
||||||
|
|
||||||
|
// fail early on connect error
|
||||||
|
if !g.once.Load().(bool) {
|
||||||
|
if err := g.opts.Broker.Connect(); err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
|
}
|
||||||
|
g.once.Store(true)
|
||||||
|
}
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
@ -629,17 +639,15 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
|
|||||||
md["Content-Type"] = p.ContentType()
|
md["Content-Type"] = p.ContentType()
|
||||||
md["Micro-Topic"] = p.Topic()
|
md["Micro-Topic"] = p.Topic()
|
||||||
|
|
||||||
cf, err := g.newGRPCCodec(p.ContentType())
|
|
||||||
if err != nil {
|
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
var body []byte
|
|
||||||
|
|
||||||
// passed in raw data
|
// passed in raw data
|
||||||
if d, ok := p.Payload().(*raw.Frame); ok {
|
if d, ok := p.Payload().(*raw.Frame); ok {
|
||||||
body = d.Data
|
body = d.Data
|
||||||
} else {
|
} else {
|
||||||
|
// use codec for payload
|
||||||
|
cf, err := g.newGRPCCodec(p.ContentType())
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
|
}
|
||||||
// set the body
|
// set the body
|
||||||
b, err := cf.Marshal(p.Payload())
|
b, err := cf.Marshal(p.Payload())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -648,13 +656,6 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
|
|||||||
body = b
|
body = b
|
||||||
}
|
}
|
||||||
|
|
||||||
if !g.once.Load().(bool) {
|
|
||||||
if err = g.opts.Broker.Connect(); err != nil {
|
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
|
||||||
}
|
|
||||||
g.once.Store(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
topic := p.Topic()
|
topic := p.Topic()
|
||||||
|
|
||||||
// get the exchange
|
// get the exchange
|
||||||
|
Loading…
Reference in New Issue
Block a user