Compare commits

..

7 Commits

Author SHA1 Message Date
2adba9d0da export grpc conn pool
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-14 18:09:07 +03:00
9d70c4dd34 Merge pull request 'fixup md' (#131) from pubmdfix into v3
Some checks failed
build / test (push) Failing after 1m28s
build / lint (push) Failing after 2m36s
codeql / analyze (go) (push) Failing after 2m50s
Reviewed-on: #131
2023-12-21 00:17:46 +03:00
b9704903f2 fixup md
Some checks failed
codeql / analyze (go) (pull_request) Failing after 2m39s
prbuild / test (pull_request) Failing after 1m28s
prbuild / lint (pull_request) Failing after 2m40s
autoapprove / autoapprove (pull_request) Failing after 1m25s
automerge / automerge (pull_request) Failing after 3s
dependabot-automerge / automerge (pull_request) Has been skipped
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-12-21 00:17:25 +03:00
4eda58e404 Merge pull request 'fix MessageMetadata option' (#130) from client-metadata into v3
Some checks failed
build / test (push) Failing after 1m0s
build / lint (push) Successful in 13s
codeql / analyze (go) (push) Failing after 1m1s
Reviewed-on: #130
2023-10-26 03:15:02 +03:00
0f8d0a1123 fix MessageMetadata option
Some checks failed
codeql / analyze (go) (pull_request) Failing after 1m18s
prbuild / test (pull_request) Failing after 1m4s
prbuild / lint (pull_request) Successful in 22s
autoapprove / autoapprove (pull_request) Failing after 8s
automerge / automerge (pull_request) Failing after 3s
dependabot-automerge / automerge (pull_request) Has been skipped
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-10-26 03:12:32 +03:00
603b855e2a Merge pull request 'dialopts fix' (#129) from dialoptsfix into v3
Some checks failed
build / test (push) Failing after 6s
build / lint (push) Failing after 5s
codeql / analyze (go) (push) Failing after 6s
Reviewed-on: #129
2023-06-23 12:24:12 +03:00
f2cfd562c3 dialopts fix
Some checks failed
autoapprove / autoapprove (pull_request) Failing after 6s
automerge / automerge (pull_request) Failing after 4s
dependabot-automerge / automerge (pull_request) Has been skipped
codeql / analyze (go) (pull_request) Failing after 6s
prbuild / test (pull_request) Failing after 6s
prbuild / lint (pull_request) Failing after 5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-06-23 12:23:44 +03:00
4 changed files with 52 additions and 44 deletions

14
go.mod
View File

@@ -3,15 +3,15 @@ module go.unistack.org/micro-client-grpc/v3
go 1.20
require (
go.unistack.org/micro/v3 v3.10.22
google.golang.org/grpc v1.55.0
go.unistack.org/micro/v3 v3.10.28
google.golang.org/grpc v1.59.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/protobuf v1.30.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/protobuf v1.31.0 // indirect
)

28
go.sum
View File

@@ -3,20 +3,20 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
go.unistack.org/micro/v3 v3.10.22 h1:04QQTo+sxK5pttZfqPbhiuZkZMrdJTfiALoqamci6IQ=
go.unistack.org/micro/v3 v3.10.22/go.mod h1:XIArw29f0b3uvF4cq96X/nQt2f0J2OGnjh8J+DBbC0s=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
go.unistack.org/micro/v3 v3.10.28 h1:/87lGekrmi0/66pioy+Nh8lVUBBYnVqKoHiNYX5OmMI=
go.unistack.org/micro/v3 v3.10.28/go.mod h1:eUgtvbtiiz6te93m0ZdmoecbitWwjdBmmr84srmEIKA=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s=
google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=

40
grpc.go
View File

@@ -30,7 +30,7 @@ const (
)
type grpcClient struct {
pool *pool
pool *ConnPool
opts client.Options
sync.RWMutex
init bool
@@ -118,6 +118,9 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
),
}
if opts := g.getGrpcDialOptions(g.opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
@@ -130,13 +133,13 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
}
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...)
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
}
defer func() {
// defer execution of release
g.pool.release(cc, grr)
g.pool.Put(cc, grr)
}()
ch := make(chan error, 1)
@@ -239,7 +242,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
}
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...)
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
}
@@ -272,7 +275,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// cancel the context
cancel()
// release the connection
g.pool.release(cc, err)
g.pool.Put(cc, err)
// now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
}
@@ -300,7 +303,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
}
// defer execution of release
g.pool.release(cc, err)
g.pool.Put(cc, err)
},
}
@@ -715,6 +718,10 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
exchange = v
}
// get the exchange
if len(options.Exchange) > 0 {
exchange = options.Exchange
}
msgs := make([]*broker.Message, 0, len(ps))
@@ -726,6 +733,16 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
for _, p := range ps {
md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
md.Set(metadata.HeaderTopic, topic)
iter := p.Metadata().Iterator()
var k, v string
for iter.Next(&k, &v) {
md.Set(k, v)
}
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {
@@ -744,15 +761,6 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
body = b
}
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
for k, v := range p.Metadata() {
md.Set(k, v)
}
md.Set(metadata.HeaderTopic, topic)
msgs = append(msgs, &broker.Message{Header: md, Body: body})
}
@@ -825,7 +833,7 @@ func NewClient(opts ...client.Option) client.Client {
opts: options,
}
rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
c := client.Client(rc)

View File

@@ -10,7 +10,7 @@ import (
"google.golang.org/grpc/connectivity"
)
type pool struct {
type ConnPool struct {
conns map[string]*streamsPool
size int
ttl int64
@@ -34,7 +34,7 @@ type poolConn struct {
err error
*grpc.ClientConn
next *poolConn
pool *pool
pool *ConnPool
sp *streamsPool
pre *poolConn
addr string
@@ -43,14 +43,14 @@ type poolConn struct {
in bool
}
func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool {
if ms <= 0 {
ms = 1
}
if idle < 0 {
idle = 0
}
return &pool{
return &ConnPool{
size: size,
ttl: int64(ttl.Seconds()),
maxStreams: ms,
@@ -59,7 +59,7 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
}
}
func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
if strings.HasPrefix(addr, "http") {
addr = addr[strings.Index(addr, ":")+3:]
}
@@ -147,7 +147,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
return conn, nil
}
func (p *pool) release(conn *poolConn, err error) {
func (p *ConnPool) Put(conn *poolConn, err error) {
p.Lock()
p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn
@@ -183,7 +183,7 @@ func (p *pool) release(conn *poolConn, err error) {
}
func (conn *poolConn) Close() {
conn.pool.release(conn, conn.err)
conn.pool.Put(conn, conn.err)
}
func removeConn(conn *poolConn) {