issue_132 #134

Merged
vtolstov merged 4 commits from devstigneev/micro-client-grpc:issue_132 into v3 2024-02-29 23:07:34 +03:00
4 changed files with 47 additions and 1037 deletions
Showing only changes of commit 990685628d - Show all commits

15
go.mod
View File

@ -1,8 +1,17 @@
module go.unistack.org/micro-client-grpc/v3 module go.unistack.org/micro-client-grpc/v3
go 1.16 go 1.20
require ( require (
go.unistack.org/micro/v3 v3.10.22 go.unistack.org/micro/v3 v3.10.28
google.golang.org/grpc v1.52.3 google.golang.org/grpc v1.59.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/protobuf v1.31.0 // indirect
) )

1038
go.sum

File diff suppressed because it is too large Load Diff

27
grpc.go
View File

@ -120,6 +120,9 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpc.WithDefaultServiceConfig(cfgService), grpc.WithDefaultServiceConfig(cfgService),
} }
if opts := g.getGrpcDialOptions(g.opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
if opts := g.getGrpcDialOptions(opts.Context); opts != nil { if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...) grpcDialOptions = append(grpcDialOptions, opts...)
} }
@ -730,6 +733,10 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
if v, ok := os.LookupEnv("MICRO_PROXY"); ok { if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
exchange = v exchange = v
} }
// get the exchange
if len(options.Exchange) > 0 {
exchange = options.Exchange
}
msgs := make([]*broker.Message, 0, len(ps)) msgs := make([]*broker.Message, 0, len(ps))
@ -741,6 +748,16 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
for _, p := range ps { for _, p := range ps {
md := metadata.Copy(omd) md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType() md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
md.Set(metadata.HeaderTopic, topic)
iter := p.Metadata().Iterator()
var k, v string
for iter.Next(&k, &v) {
md.Set(k, v)
}
// passed in raw data // passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok { if d, ok := p.Payload().(*codec.Frame); ok {
@ -758,16 +775,6 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
} }
body = b body = b
} }
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
for k, v := range p.Metadata() {
md.Set(k, v)
}
md.Set(metadata.HeaderTopic, topic)
msgs = append(msgs, &broker.Message{Header: md, Body: body}) msgs = append(msgs, &broker.Message{Header: md, Body: body})
} }

View File

@ -98,8 +98,8 @@ func MaxSendMsgSize(s int) client.Option {
type grpcDialOptions struct{} type grpcDialOptions struct{}
// DialOptions to be used to configure gRPC dial options // DialOptions to be used to configure gRPC dial options
func DialOptions(opts ...grpc.DialOption) client.CallOption { func DialOptions(opts ...grpc.DialOption) client.Option {
return func(o *client.CallOptions) { return func(o *client.Options) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
} }