diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9e16696 --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib +bin + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +# General +.DS_Store +.idea +.vscode \ No newline at end of file diff --git a/grpc.go b/grpc.go index 2a5f3dd..8fe5de6 100644 --- a/grpc.go +++ b/grpc.go @@ -98,6 +98,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, maxRecvMsgSize := g.maxRecvMsgSizeValue() maxSendMsgSize := g.maxSendMsgSizeValue() + cfgService := g.serviceConfig() var grr error @@ -116,6 +117,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize), ), + grpc.WithDefaultServiceConfig(cfgService), } if opts := g.getGrpcDialOptions(g.opts.Context); opts != nil { @@ -221,6 +223,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request maxRecvMsgSize := g.maxRecvMsgSizeValue() maxSendMsgSize := g.maxSendMsgSizeValue() + cfgService := g.serviceConfig() grpcDialOptions := []grpc.DialOption{ g.secure(addr), @@ -228,6 +231,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize), ), + grpc.WithDefaultServiceConfig(cfgService), } if opts := g.getGrpcDialOptions(opts.Context); opts != nil { @@ -372,6 +376,17 @@ func (g *grpcClient) newCodec(ct string) (codec.Codec, error) { return nil, codec.ErrUnknownContentType } +func (g *grpcClient) serviceConfig() string { + if g.opts.Context == nil { + return DefaultServiceConfig + } + v := g.opts.Context.Value(serviceConfigKey{}) + if v == nil { + return DefaultServiceConfig + } + return v.(string) +} + func (g *grpcClient) Init(opts ...client.Option) error { if len(opts) == 0 && g.init { return nil @@ -760,7 +775,6 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c } body = b } - msgs = append(msgs, &broker.Message{Header: md, Body: body}) } @@ -834,7 +848,6 @@ func NewClient(opts ...client.Option) client.Client { } rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) - c := client.Client(rc) // wrap in reverse diff --git a/options.go b/options.go index 0404438..02e56a8 100644 --- a/options.go +++ b/options.go @@ -25,6 +25,9 @@ var ( // DefaultMaxSendMsgSize maximum message that client can send // (4 MB). DefaultMaxSendMsgSize = 1024 * 1024 * 4 + + // DefaultServiceConfig enable load balancing + DefaultServiceConfig = `{"loadBalancingPolicy":"round_robin"}` ) type poolMaxStreams struct{} @@ -115,3 +118,14 @@ func CallOptions(opts ...grpc.CallOption) client.CallOption { o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts) } } + +type serviceConfigKey struct{} + +func ServiceConfig(str string) client.CallOption { + return func(options *client.CallOptions) { + if options.Context == nil { + options.Context = context.Background() + } + options.Context = context.WithValue(options.Context, serviceConfigKey{}, str) + } +}