Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-04-26 19:14:58 +03:00
parent a4683c0b78
commit c9660346e5
9 changed files with 93 additions and 73 deletions

13
.github/stale.sh vendored
View File

@ -1,13 +0,0 @@
#!/bin/bash -ex
export PATH=$PATH:$(pwd)/bin
export GO111MODULE=on
export GOBIN=$(pwd)/bin
#go get github.com/rvflash/goup@v0.4.1
#goup -v ./...
#go get github.com/psampaz/go-mod-outdated@v0.6.0
go list -u -m -mod=mod -json all | go-mod-outdated -update -direct -ci || true
#go list -u -m -json all | go-mod-outdated -update

44
.golangci.yml Normal file
View File

@ -0,0 +1,44 @@
run:
concurrency: 4
deadline: 5m
issues-exit-code: 1
tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

View File

@ -36,19 +36,17 @@ func (w *wrapGrpcCodec) String() string {
} }
func (w *wrapGrpcCodec) Marshal(v interface{}) ([]byte, error) { func (w *wrapGrpcCodec) Marshal(v interface{}) ([]byte, error) {
switch m := v.(type) { if m, ok := v.(*codec.Frame); ok {
case *codec.Frame:
return m.Data, nil return m.Data, nil
} }
return w.Codec.Marshal(v) return w.Codec.Marshal(v)
} }
func (w wrapGrpcCodec) Unmarshal(d []byte, v interface{}) error { func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}) error {
if d == nil || v == nil { if d == nil || v == nil {
return nil return nil
} }
switch m := v.(type) { if m, ok := v.(*codec.Frame); ok {
case *codec.Frame:
m.Data = d m.Data = d
return nil return nil
} }
@ -69,7 +67,7 @@ type grpcCodec struct {
*/ */
func (g *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error { func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error {
/* /*
if m == nil { if m == nil {
m = codec.NewMessage(codec.Request) m = codec.NewMessage(codec.Request)
@ -92,20 +90,19 @@ func (g *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.Me
return nil return nil
} }
func (g *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error { func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error {
// caller has requested a frame // caller has requested a frame
switch m := v.(type) { if m, ok := v.(*codec.Frame); ok {
case *codec.Frame:
_, err := conn.Read(m.Data) _, err := conn.Read(m.Data)
return err return err
} }
return codec.ErrInvalidMessage return codec.ErrInvalidMessage
} }
func (g *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error { func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error {
// if we don't have a body // if we don't have a body
if v != nil { if v != nil {
b, err := g.Marshal(v) b, err := w.Marshal(v)
if err != nil { if err != nil {
return err return err
} }

22
grpc.go
View File

@ -9,7 +9,6 @@ import (
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
@ -24,11 +23,9 @@ import (
) )
type grpcClient struct { type grpcClient struct {
opts client.Options opts client.Options
codecs map[string]encoding.Codec pool *pool
pool *pool init bool
once atomic.Value
init bool
sync.RWMutex sync.RWMutex
} }
@ -42,7 +39,7 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
} }
// default config // default config
tlsConfig := &tls.Config{} tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
defaultCreds := grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) defaultCreds := grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
// check if the address is prepended with https // check if the address is prepended with https
@ -120,7 +117,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
g.pool.release(addr, cc, grr) g.pool.release(cc, grr)
}() }()
ch := make(chan error, 1) ch := make(chan error, 1)
@ -128,7 +125,8 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
go func() { go func() {
grpcCallOptions := []grpc.CallOption{ grpcCallOptions := []grpc.CallOption{
grpc.ForceCodec(&wrapMicroCodec{cf}), grpc.ForceCodec(&wrapMicroCodec{cf}),
grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name())} grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name()),
}
if opts := g.getGrpcCallOptions(); opts != nil { if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...) grpcCallOptions = append(grpcCallOptions, opts...)
} }
@ -220,7 +218,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// cancel the context // cancel the context
cancel() cancel()
// release the connection // release the connection
g.pool.release(addr, cc, err) g.pool.release(cc, err)
// now return the error // now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
@ -242,13 +240,13 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
}, },
conn: cc, conn: cc,
close: func(err error) { close: func(err error) {
// cancel the context if an error occured // cancel the context if an error occurred
if err != nil { if err != nil {
cancel() cancel()
} }
// defer execution of release // defer execution of release
g.pool.release(addr, cc, err) g.pool.release(cc, err)
}, },
} }

View File

@ -10,16 +10,12 @@ import (
) )
type pool struct { type pool struct {
size int conns map[string]*streamsPool
ttl int64 size int
ttl int64
// max streams on a *poolConn
maxStreams int maxStreams int
// max idle conns maxIdle int
maxIdle int
sync.Mutex sync.Mutex
conns map[string]*streamsPool
} }
type streamsPool struct { type streamsPool struct {
@ -34,21 +30,16 @@ type streamsPool struct {
} }
type poolConn struct { type poolConn struct {
// grpc conn err error
*grpc.ClientConn *grpc.ClientConn
err error next *poolConn
addr string
// pool and streams pool
pool *pool pool *pool
sp *streamsPool sp *streamsPool
pre *poolConn
addr string
streams int streams int
created int64 created int64
in bool
// list
pre *poolConn
next *poolConn
in bool
} }
func newPool(size int, ttl time.Duration, idle int, ms int) *pool { func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
@ -140,7 +131,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn = &poolConn{cc, nil, addr, p, sp, 1, time.Now().Unix(), nil, nil, false} conn = &poolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
// add conn to streams pool // add conn to streams pool
p.Lock() p.Lock()
@ -152,7 +143,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
return conn, nil return conn, nil
} }
func (p *pool) release(addr string, conn *poolConn, err error) { func (p *pool) release(conn *poolConn, err error) {
p.Lock() p.Lock()
p, sp, created := conn.pool, conn.sp, conn.created p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn // try to add conn
@ -188,7 +179,7 @@ func (p *pool) release(addr string, conn *poolConn, err error) {
} }
func (conn *poolConn) Close() { func (conn *poolConn) Close() {
conn.pool.release(conn.addr, conn, conn.err) conn.pool.release(conn, conn.err)
} }
func removeConn(conn *poolConn) { func removeConn(conn *poolConn) {

View File

@ -5,9 +5,9 @@ import (
) )
type grpcEvent struct { type grpcEvent struct {
payload interface{}
topic string topic string
contentType string contentType string
payload interface{}
} }
func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {

View File

@ -28,13 +28,6 @@ var (
) )
type poolMaxStreams struct{} type poolMaxStreams struct{}
type poolMaxIdle struct{}
type codecsKey struct{}
type tlsAuth struct{}
type maxRecvMsgSizeKey struct{}
type maxSendMsgSizeKey struct{}
type grpcDialOptions struct{}
type grpcCallOptions struct{}
// maximum streams on a connectioin // maximum streams on a connectioin
func PoolMaxStreams(n int) client.Option { func PoolMaxStreams(n int) client.Option {
@ -46,6 +39,8 @@ func PoolMaxStreams(n int) client.Option {
} }
} }
type poolMaxIdle struct{}
// maximum idle conns of a pool // maximum idle conns of a pool
func PoolMaxIdle(d int) client.Option { func PoolMaxIdle(d int) client.Option {
return func(o *client.Options) { return func(o *client.Options) {
@ -56,6 +51,8 @@ func PoolMaxIdle(d int) client.Option {
} }
} }
type codecsKey struct{}
// gRPC Codec to be used to encode/decode requests for a given content type // gRPC Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c encoding.Codec) client.Option { func Codec(contentType string, c encoding.Codec) client.Option {
return func(o *client.Options) { return func(o *client.Options) {
@ -71,6 +68,8 @@ func Codec(contentType string, c encoding.Codec) client.Option {
} }
} }
type maxRecvMsgSizeKey struct{}
// //
// MaxRecvMsgSize set the maximum size of message that client can receive. // MaxRecvMsgSize set the maximum size of message that client can receive.
// //
@ -83,6 +82,8 @@ func MaxRecvMsgSize(s int) client.Option {
} }
} }
type maxSendMsgSizeKey struct{}
// //
// MaxSendMsgSize set the maximum size of message that client can send. // MaxSendMsgSize set the maximum size of message that client can send.
// //
@ -95,6 +96,8 @@ func MaxSendMsgSize(s int) client.Option {
} }
} }
type grpcDialOptions struct{}
// //
// DialOptions to be used to configure gRPC dial options // DialOptions to be used to configure gRPC dial options
// //
@ -107,6 +110,8 @@ func DialOptions(opts ...grpc.DialOption) client.CallOption {
} }
} }
type grpcCallOptions struct{}
// //
// CallOptions to be used to configure gRPC call options // CallOptions to be used to configure gRPC call options
// //

View File

@ -9,12 +9,12 @@ import (
) )
type grpcRequest struct { type grpcRequest struct {
request interface{}
codec codec.Codec
service string service string
method string method string
contentType string contentType string
request interface{}
opts client.RequestOptions opts client.RequestOptions
codec codec.Codec
} }
// service Struct.Method /service.Struct/Method // service Struct.Method /service.Struct/Method

View File

@ -11,17 +11,15 @@ import (
// Implements the streamer interface // Implements the streamer interface
type grpcStream struct { type grpcStream struct {
// embed so we can access if need be
grpc.ClientStream grpc.ClientStream
context context.Context
sync.RWMutex
closed bool
err error err error
conn *poolConn
request client.Request request client.Request
response client.Response response client.Response
context context.Context
close func(err error) close func(err error)
conn *poolConn
sync.RWMutex
closed bool
} }
func (g *grpcStream) Context() context.Context { func (g *grpcStream) Context() context.Context {