Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-04-26 19:17:29 +03:00
committed by GitHub
parent 08def4d244
commit 510fa4b379
9 changed files with 93 additions and 73 deletions

22
grpc.go
View File

@@ -9,7 +9,6 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/unistack-org/micro/v3/broker"
@@ -24,11 +23,9 @@ import (
)
type grpcClient struct {
opts client.Options
codecs map[string]encoding.Codec
pool *pool
once atomic.Value
init bool
opts client.Options
pool *pool
init bool
sync.RWMutex
}
@@ -42,7 +39,7 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
}
// default config
tlsConfig := &tls.Config{}
tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
defaultCreds := grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
// check if the address is prepended with https
@@ -120,7 +117,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
}
defer func() {
// defer execution of release
g.pool.release(addr, cc, grr)
g.pool.release(cc, grr)
}()
ch := make(chan error, 1)
@@ -128,7 +125,8 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
go func() {
grpcCallOptions := []grpc.CallOption{
grpc.ForceCodec(&wrapMicroCodec{cf}),
grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name())}
grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name()),
}
if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
}
@@ -220,7 +218,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// cancel the context
cancel()
// release the connection
g.pool.release(addr, cc, err)
g.pool.release(cc, err)
// now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
}
@@ -242,13 +240,13 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
},
conn: cc,
close: func(err error) {
// cancel the context if an error occured
// cancel the context if an error occurred
if err != nil {
cancel()
}
// defer execution of release
g.pool.release(addr, cc, err)
g.pool.release(cc, err)
},
}