Compare commits

...

5 Commits

Author SHA1 Message Date
510fa4b379 lint (#32)
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-26 19:17:29 +03:00
Renovate Bot
08def4d244 Update module github.com/unistack-org/micro/v3 to v3.3.16 2021-04-20 11:52:53 +00:00
a4683c0b78 drop AuthTLS option and use client TLSConfig option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-20 12:21:40 +03:00
Renovate Bot
637e3df24a Update module github.com/unistack-org/micro/v3 to v3.3.15 2021-04-19 14:55:53 +00:00
Renovate Bot
36c7fa6a23 Update module github.com/unistack-org/micro/v3 to v3.3.14 2021-04-19 04:53:53 +00:00
11 changed files with 107 additions and 91 deletions

13
.github/stale.sh vendored
View File

@@ -1,13 +0,0 @@
#!/bin/bash -ex
export PATH=$PATH:$(pwd)/bin
export GO111MODULE=on
export GOBIN=$(pwd)/bin
#go get github.com/rvflash/goup@v0.4.1
#goup -v ./...
#go get github.com/psampaz/go-mod-outdated@v0.6.0
go list -u -m -mod=mod -json all | go-mod-outdated -update -direct -ci || true
#go list -u -m -json all | go-mod-outdated -update

44
.golangci.yml Normal file
View File

@@ -0,0 +1,44 @@
run:
concurrency: 4
deadline: 5m
issues-exit-code: 1
tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

View File

@@ -36,19 +36,17 @@ func (w *wrapGrpcCodec) String() string {
} }
func (w *wrapGrpcCodec) Marshal(v interface{}) ([]byte, error) { func (w *wrapGrpcCodec) Marshal(v interface{}) ([]byte, error) {
switch m := v.(type) { if m, ok := v.(*codec.Frame); ok {
case *codec.Frame:
return m.Data, nil return m.Data, nil
} }
return w.Codec.Marshal(v) return w.Codec.Marshal(v)
} }
func (w wrapGrpcCodec) Unmarshal(d []byte, v interface{}) error { func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}) error {
if d == nil || v == nil { if d == nil || v == nil {
return nil return nil
} }
switch m := v.(type) { if m, ok := v.(*codec.Frame); ok {
case *codec.Frame:
m.Data = d m.Data = d
return nil return nil
} }
@@ -69,7 +67,7 @@ type grpcCodec struct {
*/ */
func (g *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error { func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error {
/* /*
if m == nil { if m == nil {
m = codec.NewMessage(codec.Request) m = codec.NewMessage(codec.Request)
@@ -92,20 +90,19 @@ func (g *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.Me
return nil return nil
} }
func (g *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error { func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error {
// caller has requested a frame // caller has requested a frame
switch m := v.(type) { if m, ok := v.(*codec.Frame); ok {
case *codec.Frame:
_, err := conn.Read(m.Data) _, err := conn.Read(m.Data)
return err return err
} }
return codec.ErrInvalidMessage return codec.ErrInvalidMessage
} }
func (g *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error { func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error {
// if we don't have a body // if we don't have a body
if v != nil { if v != nil {
b, err := g.Marshal(v) b, err := w.Marshal(v)
if err != nil { if err != nil {
return err return err
} }

2
go.mod
View File

@@ -4,7 +4,7 @@ go 1.16
require ( require (
github.com/google/go-cmp v0.5.1 // indirect github.com/google/go-cmp v0.5.1 // indirect
github.com/unistack-org/micro/v3 v3.3.13 github.com/unistack-org/micro/v3 v3.3.16
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d // indirect google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d // indirect
google.golang.org/grpc v1.37.0 google.golang.org/grpc v1.37.0

10
go.sum
View File

@@ -57,6 +57,12 @@ github.com/unistack-org/micro/v3 v3.3.11 h1:Jr0gAw5lLqgddiHKQeWUOUeP6ZqgRhz52EA9
github.com/unistack-org/micro/v3 v3.3.11/go.mod h1:PPCt675o3HPcODFbJ4iRWPmQFAk1WQ+asQSOb/syq6U= github.com/unistack-org/micro/v3 v3.3.11/go.mod h1:PPCt675o3HPcODFbJ4iRWPmQFAk1WQ+asQSOb/syq6U=
github.com/unistack-org/micro/v3 v3.3.13 h1:y4bDDkbwnjgOckrhFkC6D/o42tr75X33UbrB+Ko0M68= github.com/unistack-org/micro/v3 v3.3.13 h1:y4bDDkbwnjgOckrhFkC6D/o42tr75X33UbrB+Ko0M68=
github.com/unistack-org/micro/v3 v3.3.13/go.mod h1:98hNcMXp/WyWJwLwCuwrhN1Jm7aCWaRNsMfRjK8Fq+Y= github.com/unistack-org/micro/v3 v3.3.13/go.mod h1:98hNcMXp/WyWJwLwCuwrhN1Jm7aCWaRNsMfRjK8Fq+Y=
github.com/unistack-org/micro/v3 v3.3.14 h1:CAkDMjHZT8/D6GGF5h3gK84m6tlWZC17IGPb2GkAn/4=
github.com/unistack-org/micro/v3 v3.3.14/go.mod h1:ETGcQQUcjxGaD44LUMX+0fgo8Loh7ExldfIPLvfUmDo=
github.com/unistack-org/micro/v3 v3.3.15 h1:rj+spzhezCg4gmj1nuF0FRGixC51/7xFACOchB/23/E=
github.com/unistack-org/micro/v3 v3.3.15/go.mod h1:ETGcQQUcjxGaD44LUMX+0fgo8Loh7ExldfIPLvfUmDo=
github.com/unistack-org/micro/v3 v3.3.16 h1:v0h/oC0TO2n1djQJeOjD2jNEqKkiykwI6cpflEVTlQE=
github.com/unistack-org/micro/v3 v3.3.16/go.mod h1:ETGcQQUcjxGaD44LUMX+0fgo8Loh7ExldfIPLvfUmDo=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -76,6 +82,8 @@ golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c h1:KHUzaHIpjWVlVVNh65G3hhuj3
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210415231046-e915ea6b2b7d h1:BgJvlyh+UqCUaPlscHJ+PN8GcpfrFdr7NHjd1JL0+Gs=
golang.org/x/net v0.0.0-20210415231046-e915ea6b2b7d/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -92,6 +100,8 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=

33
grpc.go
View File

@@ -9,7 +9,6 @@ import (
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
@@ -24,28 +23,23 @@ import (
) )
type grpcClient struct { type grpcClient struct {
opts client.Options opts client.Options
codecs map[string]encoding.Codec pool *pool
pool *pool init bool
once atomic.Value
init bool
sync.RWMutex sync.RWMutex
} }
// secure returns the dial option for whether its a secure or insecure connection // secure returns the dial option for whether its a secure or insecure connection
func (g *grpcClient) secure(addr string) grpc.DialOption { func (g *grpcClient) secure(addr string) grpc.DialOption {
// first we check if theres'a tls config // first we check if theres'a tls config
if g.opts.Context != nil { if g.opts.TLSConfig != nil {
if v := g.opts.Context.Value(tlsAuth{}); v != nil { creds := credentials.NewTLS(g.opts.TLSConfig)
tls := v.(*tls.Config) // return tls config if it exists
creds := credentials.NewTLS(tls) return grpc.WithTransportCredentials(creds)
// return tls config if it exists
return grpc.WithTransportCredentials(creds)
}
} }
// default config // default config
tlsConfig := &tls.Config{} tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
defaultCreds := grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) defaultCreds := grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
// check if the address is prepended with https // check if the address is prepended with https
@@ -123,7 +117,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
g.pool.release(addr, cc, grr) g.pool.release(cc, grr)
}() }()
ch := make(chan error, 1) ch := make(chan error, 1)
@@ -131,7 +125,8 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
go func() { go func() {
grpcCallOptions := []grpc.CallOption{ grpcCallOptions := []grpc.CallOption{
grpc.ForceCodec(&wrapMicroCodec{cf}), grpc.ForceCodec(&wrapMicroCodec{cf}),
grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name())} grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name()),
}
if opts := g.getGrpcCallOptions(); opts != nil { if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...) grpcCallOptions = append(grpcCallOptions, opts...)
} }
@@ -223,7 +218,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// cancel the context // cancel the context
cancel() cancel()
// release the connection // release the connection
g.pool.release(addr, cc, err) g.pool.release(cc, err)
// now return the error // now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
@@ -245,13 +240,13 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
}, },
conn: cc, conn: cc,
close: func(err error) { close: func(err error) {
// cancel the context if an error occured // cancel the context if an error occurred
if err != nil { if err != nil {
cancel() cancel()
} }
// defer execution of release // defer execution of release
g.pool.release(addr, cc, err) g.pool.release(cc, err)
}, },
} }

View File

@@ -10,16 +10,12 @@ import (
) )
type pool struct { type pool struct {
size int conns map[string]*streamsPool
ttl int64 size int
ttl int64
// max streams on a *poolConn
maxStreams int maxStreams int
// max idle conns maxIdle int
maxIdle int
sync.Mutex sync.Mutex
conns map[string]*streamsPool
} }
type streamsPool struct { type streamsPool struct {
@@ -34,21 +30,16 @@ type streamsPool struct {
} }
type poolConn struct { type poolConn struct {
// grpc conn err error
*grpc.ClientConn *grpc.ClientConn
err error next *poolConn
addr string
// pool and streams pool
pool *pool pool *pool
sp *streamsPool sp *streamsPool
pre *poolConn
addr string
streams int streams int
created int64 created int64
in bool
// list
pre *poolConn
next *poolConn
in bool
} }
func newPool(size int, ttl time.Duration, idle int, ms int) *pool { func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
@@ -140,7 +131,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn = &poolConn{cc, nil, addr, p, sp, 1, time.Now().Unix(), nil, nil, false} conn = &poolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
// add conn to streams pool // add conn to streams pool
p.Lock() p.Lock()
@@ -152,7 +143,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
return conn, nil return conn, nil
} }
func (p *pool) release(addr string, conn *poolConn, err error) { func (p *pool) release(conn *poolConn, err error) {
p.Lock() p.Lock()
p, sp, created := conn.pool, conn.sp, conn.created p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn // try to add conn
@@ -188,7 +179,7 @@ func (p *pool) release(addr string, conn *poolConn, err error) {
} }
func (conn *poolConn) Close() { func (conn *poolConn) Close() {
conn.pool.release(conn.addr, conn, conn.err) conn.pool.release(conn, conn.err)
} }
func removeConn(conn *poolConn) { func removeConn(conn *poolConn) {

View File

@@ -5,9 +5,9 @@ import (
) )
type grpcEvent struct { type grpcEvent struct {
payload interface{}
topic string topic string
contentType string contentType string
payload interface{}
} }
func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {

View File

@@ -3,7 +3,6 @@ package grpc
import ( import (
"context" "context"
"crypto/tls"
"github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/client"
"google.golang.org/grpc" "google.golang.org/grpc"
@@ -29,13 +28,6 @@ var (
) )
type poolMaxStreams struct{} type poolMaxStreams struct{}
type poolMaxIdle struct{}
type codecsKey struct{}
type tlsAuth struct{}
type maxRecvMsgSizeKey struct{}
type maxSendMsgSizeKey struct{}
type grpcDialOptions struct{}
type grpcCallOptions struct{}
// maximum streams on a connectioin // maximum streams on a connectioin
func PoolMaxStreams(n int) client.Option { func PoolMaxStreams(n int) client.Option {
@@ -47,6 +39,8 @@ func PoolMaxStreams(n int) client.Option {
} }
} }
type poolMaxIdle struct{}
// maximum idle conns of a pool // maximum idle conns of a pool
func PoolMaxIdle(d int) client.Option { func PoolMaxIdle(d int) client.Option {
return func(o *client.Options) { return func(o *client.Options) {
@@ -57,6 +51,8 @@ func PoolMaxIdle(d int) client.Option {
} }
} }
type codecsKey struct{}
// gRPC Codec to be used to encode/decode requests for a given content type // gRPC Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c encoding.Codec) client.Option { func Codec(contentType string, c encoding.Codec) client.Option {
return func(o *client.Options) { return func(o *client.Options) {
@@ -72,15 +68,7 @@ func Codec(contentType string, c encoding.Codec) client.Option {
} }
} }
// AuthTLS should be used to setup a secure authentication using TLS type maxRecvMsgSizeKey struct{}
func AuthTLS(t *tls.Config) client.Option {
return func(o *client.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, tlsAuth{}, t)
}
}
// //
// MaxRecvMsgSize set the maximum size of message that client can receive. // MaxRecvMsgSize set the maximum size of message that client can receive.
@@ -94,6 +82,8 @@ func MaxRecvMsgSize(s int) client.Option {
} }
} }
type maxSendMsgSizeKey struct{}
// //
// MaxSendMsgSize set the maximum size of message that client can send. // MaxSendMsgSize set the maximum size of message that client can send.
// //
@@ -106,6 +96,8 @@ func MaxSendMsgSize(s int) client.Option {
} }
} }
type grpcDialOptions struct{}
// //
// DialOptions to be used to configure gRPC dial options // DialOptions to be used to configure gRPC dial options
// //
@@ -118,6 +110,8 @@ func DialOptions(opts ...grpc.DialOption) client.CallOption {
} }
} }
type grpcCallOptions struct{}
// //
// CallOptions to be used to configure gRPC call options // CallOptions to be used to configure gRPC call options
// //

View File

@@ -9,12 +9,12 @@ import (
) )
type grpcRequest struct { type grpcRequest struct {
request interface{}
codec codec.Codec
service string service string
method string method string
contentType string contentType string
request interface{}
opts client.RequestOptions opts client.RequestOptions
codec codec.Codec
} }
// service Struct.Method /service.Struct/Method // service Struct.Method /service.Struct/Method

View File

@@ -11,17 +11,15 @@ import (
// Implements the streamer interface // Implements the streamer interface
type grpcStream struct { type grpcStream struct {
// embed so we can access if need be
grpc.ClientStream grpc.ClientStream
context context.Context
sync.RWMutex
closed bool
err error err error
conn *poolConn
request client.Request request client.Request
response client.Response response client.Response
context context.Context
close func(err error) close func(err error)
conn *poolConn
sync.RWMutex
closed bool
} }
func (g *grpcStream) Context() context.Context { func (g *grpcStream) Context() context.Context {