Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
2adba9d0da | |||
9d70c4dd34 | |||
b9704903f2 | |||
4eda58e404 | |||
0f8d0a1123 |
14
go.mod
14
go.mod
@@ -3,15 +3,15 @@ module go.unistack.org/micro-client-grpc/v3
|
|||||||
go 1.20
|
go 1.20
|
||||||
|
|
||||||
require (
|
require (
|
||||||
go.unistack.org/micro/v3 v3.10.22
|
go.unistack.org/micro/v3 v3.10.28
|
||||||
google.golang.org/grpc v1.55.0
|
google.golang.org/grpc v1.59.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/golang/protobuf v1.5.3 // indirect
|
github.com/golang/protobuf v1.5.3 // indirect
|
||||||
golang.org/x/net v0.8.0 // indirect
|
golang.org/x/net v0.17.0 // indirect
|
||||||
golang.org/x/sys v0.6.0 // indirect
|
golang.org/x/sys v0.13.0 // indirect
|
||||||
golang.org/x/text v0.8.0 // indirect
|
golang.org/x/text v0.13.0 // indirect
|
||||||
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
|
||||||
google.golang.org/protobuf v1.30.0 // indirect
|
google.golang.org/protobuf v1.31.0 // indirect
|
||||||
)
|
)
|
||||||
|
28
go.sum
28
go.sum
@@ -3,20 +3,20 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
|
|||||||
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||||
go.unistack.org/micro/v3 v3.10.22 h1:04QQTo+sxK5pttZfqPbhiuZkZMrdJTfiALoqamci6IQ=
|
go.unistack.org/micro/v3 v3.10.28 h1:/87lGekrmi0/66pioy+Nh8lVUBBYnVqKoHiNYX5OmMI=
|
||||||
go.unistack.org/micro/v3 v3.10.22/go.mod h1:XIArw29f0b3uvF4cq96X/nQt2f0J2OGnjh8J+DBbC0s=
|
go.unistack.org/micro/v3 v3.10.28/go.mod h1:eUgtvbtiiz6te93m0ZdmoecbitWwjdBmmr84srmEIKA=
|
||||||
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
|
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||||
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
|
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||||
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
|
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
|
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||||
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo=
|
||||||
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc=
|
||||||
google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
|
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
|
||||||
google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
|
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
|
||||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
|
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
|
||||||
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||||
|
37
grpc.go
37
grpc.go
@@ -30,7 +30,7 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type grpcClient struct {
|
type grpcClient struct {
|
||||||
pool *pool
|
pool *ConnPool
|
||||||
opts client.Options
|
opts client.Options
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
init bool
|
init bool
|
||||||
@@ -133,13 +133,13 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
|||||||
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
|
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
|
||||||
}
|
}
|
||||||
|
|
||||||
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...)
|
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
// defer execution of release
|
// defer execution of release
|
||||||
g.pool.release(cc, grr)
|
g.pool.Put(cc, grr)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
ch := make(chan error, 1)
|
ch := make(chan error, 1)
|
||||||
@@ -242,7 +242,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
|
|||||||
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
|
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
|
||||||
}
|
}
|
||||||
|
|
||||||
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...)
|
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
||||||
}
|
}
|
||||||
@@ -275,7 +275,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
|
|||||||
// cancel the context
|
// cancel the context
|
||||||
cancel()
|
cancel()
|
||||||
// release the connection
|
// release the connection
|
||||||
g.pool.release(cc, err)
|
g.pool.Put(cc, err)
|
||||||
// now return the error
|
// now return the error
|
||||||
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
|
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
|
||||||
}
|
}
|
||||||
@@ -303,7 +303,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
|
|||||||
}
|
}
|
||||||
|
|
||||||
// defer execution of release
|
// defer execution of release
|
||||||
g.pool.release(cc, err)
|
g.pool.Put(cc, err)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -718,6 +718,10 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|||||||
if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
|
if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
|
||||||
exchange = v
|
exchange = v
|
||||||
}
|
}
|
||||||
|
// get the exchange
|
||||||
|
if len(options.Exchange) > 0 {
|
||||||
|
exchange = options.Exchange
|
||||||
|
}
|
||||||
|
|
||||||
msgs := make([]*broker.Message, 0, len(ps))
|
msgs := make([]*broker.Message, 0, len(ps))
|
||||||
|
|
||||||
@@ -729,6 +733,16 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|||||||
for _, p := range ps {
|
for _, p := range ps {
|
||||||
md := metadata.Copy(omd)
|
md := metadata.Copy(omd)
|
||||||
md[metadata.HeaderContentType] = p.ContentType()
|
md[metadata.HeaderContentType] = p.ContentType()
|
||||||
|
topic := p.Topic()
|
||||||
|
if len(exchange) > 0 {
|
||||||
|
topic = exchange
|
||||||
|
}
|
||||||
|
md.Set(metadata.HeaderTopic, topic)
|
||||||
|
iter := p.Metadata().Iterator()
|
||||||
|
var k, v string
|
||||||
|
for iter.Next(&k, &v) {
|
||||||
|
md.Set(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
// passed in raw data
|
// passed in raw data
|
||||||
if d, ok := p.Payload().(*codec.Frame); ok {
|
if d, ok := p.Payload().(*codec.Frame); ok {
|
||||||
@@ -747,15 +761,6 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|||||||
body = b
|
body = b
|
||||||
}
|
}
|
||||||
|
|
||||||
topic := p.Topic()
|
|
||||||
if len(exchange) > 0 {
|
|
||||||
topic = exchange
|
|
||||||
}
|
|
||||||
|
|
||||||
for k, v := range p.Metadata() {
|
|
||||||
md.Set(k, v)
|
|
||||||
}
|
|
||||||
md.Set(metadata.HeaderTopic, topic)
|
|
||||||
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -828,7 +833,7 @@ func NewClient(opts ...client.Option) client.Client {
|
|||||||
opts: options,
|
opts: options,
|
||||||
}
|
}
|
||||||
|
|
||||||
rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
|
rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
|
||||||
|
|
||||||
c := client.Client(rc)
|
c := client.Client(rc)
|
||||||
|
|
||||||
|
14
grpc_pool.go
14
grpc_pool.go
@@ -10,7 +10,7 @@ import (
|
|||||||
"google.golang.org/grpc/connectivity"
|
"google.golang.org/grpc/connectivity"
|
||||||
)
|
)
|
||||||
|
|
||||||
type pool struct {
|
type ConnPool struct {
|
||||||
conns map[string]*streamsPool
|
conns map[string]*streamsPool
|
||||||
size int
|
size int
|
||||||
ttl int64
|
ttl int64
|
||||||
@@ -34,7 +34,7 @@ type poolConn struct {
|
|||||||
err error
|
err error
|
||||||
*grpc.ClientConn
|
*grpc.ClientConn
|
||||||
next *poolConn
|
next *poolConn
|
||||||
pool *pool
|
pool *ConnPool
|
||||||
sp *streamsPool
|
sp *streamsPool
|
||||||
pre *poolConn
|
pre *poolConn
|
||||||
addr string
|
addr string
|
||||||
@@ -43,14 +43,14 @@ type poolConn struct {
|
|||||||
in bool
|
in bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
|
func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool {
|
||||||
if ms <= 0 {
|
if ms <= 0 {
|
||||||
ms = 1
|
ms = 1
|
||||||
}
|
}
|
||||||
if idle < 0 {
|
if idle < 0 {
|
||||||
idle = 0
|
idle = 0
|
||||||
}
|
}
|
||||||
return &pool{
|
return &ConnPool{
|
||||||
size: size,
|
size: size,
|
||||||
ttl: int64(ttl.Seconds()),
|
ttl: int64(ttl.Seconds()),
|
||||||
maxStreams: ms,
|
maxStreams: ms,
|
||||||
@@ -59,7 +59,7 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
|
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
|
||||||
if strings.HasPrefix(addr, "http") {
|
if strings.HasPrefix(addr, "http") {
|
||||||
addr = addr[strings.Index(addr, ":")+3:]
|
addr = addr[strings.Index(addr, ":")+3:]
|
||||||
}
|
}
|
||||||
@@ -147,7 +147,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) release(conn *poolConn, err error) {
|
func (p *ConnPool) Put(conn *poolConn, err error) {
|
||||||
p.Lock()
|
p.Lock()
|
||||||
p, sp, created := conn.pool, conn.sp, conn.created
|
p, sp, created := conn.pool, conn.sp, conn.created
|
||||||
// try to add conn
|
// try to add conn
|
||||||
@@ -183,7 +183,7 @@ func (p *pool) release(conn *poolConn, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (conn *poolConn) Close() {
|
func (conn *poolConn) Close() {
|
||||||
conn.pool.release(conn, conn.err)
|
conn.pool.Put(conn, conn.err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func removeConn(conn *poolConn) {
|
func removeConn(conn *poolConn) {
|
||||||
|
Reference in New Issue
Block a user