Compare commits

...

17 Commits
v3.11.1 ... v3

Author SHA1 Message Date
25618a3859 update deps
Some checks failed
build / lint (push) Failing after 8s
build / test (push) Failing after 8s
codeql / analyze (go) (push) Failing after 10s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 18:03:39 +03:00
b22f150601 update to latest micro
Some checks failed
build / test (push) Failing after 7s
build / lint (push) Failing after 8s
codeql / analyze (go) (push) Failing after 12s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-17 13:11:39 +03:00
71bcb63b60 fixup trace stream
Some checks failed
codeql / analyze (go) (push) Failing after 1m44s
build / test (push) Failing after 1m49s
build / lint (push) Successful in 9m17s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-23 00:24:46 +03:00
698bfbc6f1 meter and tracing
Some checks failed
build / test (push) Failing after 1m45s
codeql / analyze (go) (push) Failing after 1m44s
build / lint (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-23 00:20:29 +03:00
209042de3a Merge pull request 'issue_132' (#134) from devstigneev/micro-client-grpc:issue_132 into v3
Some checks failed
build / test (push) Failing after 1m40s
codeql / analyze (go) (push) Failing after 2m4s
build / lint (push) Successful in 9m23s
Reviewed-on: #134
2024-02-29 23:07:33 +03:00
990685628d Merge remote-tracking branch 'origin/v3' into issue_132
Some checks failed
automerge / automerge (pull_request) Has been skipped
dependabot-automerge / automerge (pull_request) Has been skipped
autoapprove / autoapprove (pull_request) Successful in 10s
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled
# Conflicts:
#	grpc.go
2024-02-29 17:29:50 +03:00
0f8ead6acc add defaultConfigService
Some checks failed
automerge / automerge (pull_request) Has been skipped
dependabot-automerge / automerge (pull_request) Has been skipped
autoapprove / autoapprove (pull_request) Successful in 10s
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled
2024-02-29 16:58:44 +03:00
2fbcee7b59 export grpc pool conn
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-14 19:23:01 +03:00
19a469c4e2 export grpc pool conn
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-14 19:14:56 +03:00
2a6a93a792 export grpc conn pool
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-14 18:09:34 +03:00
2adba9d0da export grpc conn pool
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-14 18:09:07 +03:00
9d70c4dd34 Merge pull request 'fixup md' (#131) from pubmdfix into v3
Some checks failed
build / test (push) Failing after 1m28s
build / lint (push) Failing after 2m36s
codeql / analyze (go) (push) Failing after 2m50s
Reviewed-on: #131
2023-12-21 00:17:46 +03:00
b9704903f2 fixup md
Some checks failed
codeql / analyze (go) (pull_request) Failing after 2m39s
prbuild / test (pull_request) Failing after 1m28s
prbuild / lint (pull_request) Failing after 2m40s
autoapprove / autoapprove (pull_request) Failing after 1m25s
automerge / automerge (pull_request) Failing after 3s
dependabot-automerge / automerge (pull_request) Has been skipped
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-12-21 00:17:25 +03:00
4eda58e404 Merge pull request 'fix MessageMetadata option' (#130) from client-metadata into v3
Some checks failed
build / test (push) Failing after 1m0s
build / lint (push) Successful in 13s
codeql / analyze (go) (push) Failing after 1m1s
Reviewed-on: #130
2023-10-26 03:15:02 +03:00
0f8d0a1123 fix MessageMetadata option
Some checks failed
codeql / analyze (go) (pull_request) Failing after 1m18s
prbuild / test (pull_request) Failing after 1m4s
prbuild / lint (pull_request) Successful in 22s
autoapprove / autoapprove (pull_request) Failing after 8s
automerge / automerge (pull_request) Failing after 3s
dependabot-automerge / automerge (pull_request) Has been skipped
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-10-26 03:12:32 +03:00
603b855e2a Merge pull request 'dialopts fix' (#129) from dialoptsfix into v3
Some checks failed
build / test (push) Failing after 6s
build / lint (push) Failing after 5s
codeql / analyze (go) (push) Failing after 6s
Reviewed-on: #129
2023-06-23 12:24:12 +03:00
f2cfd562c3 dialopts fix
Some checks failed
autoapprove / autoapprove (pull_request) Failing after 6s
automerge / automerge (pull_request) Failing after 4s
dependabot-automerge / automerge (pull_request) Has been skipped
codeql / analyze (go) (pull_request) Failing after 6s
prbuild / test (pull_request) Failing after 6s
prbuild / lint (pull_request) Failing after 5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-06-23 12:23:44 +03:00
9 changed files with 231 additions and 178 deletions

24
.gitignore vendored Normal file
View File

@ -0,0 +1,24 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

View File

@ -1,8 +1,6 @@
package grpc package grpc
import ( import (
"io"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/codec"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
@ -65,63 +63,3 @@ func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}, opts ...codec.Option)
} }
return w.Codec.Unmarshal(d, v) return w.Codec.Unmarshal(d, v)
} }
/*
type grpcCodec struct {
grpc.ServerStream
// headers
id string
target string
method string
endpoint string
c encoding.Codec
}
*/
func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error {
/*
if m == nil {
m = codec.NewMessage(codec.Request)
}
if md, ok := metadata.FromIncomingContext(g.ServerStream.Context()); ok {
if m.Header == nil {
m.Header = meta.New(len(md))
}
for k, v := range md {
m.Header[k] = strings.Join(v, ",")
}
}
m.Id = g.id
m.Target = g.target
m.Method = g.method
m.Endpoint = g.endpoint
*/
return nil
}
func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error {
// caller has requested a frame
if m, ok := v.(*codec.Frame); ok {
_, err := conn.Read(m.Data)
return err
}
return codec.ErrInvalidMessage
}
func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error {
// if we don't have a body
if v != nil {
b, err := w.Marshal(v)
if err != nil {
return err
}
m.Body = b
}
// write the body using the framing codec
_, err := conn.Write(m.Body)
return err
}

20
go.mod
View File

@ -1,17 +1,19 @@
module go.unistack.org/micro-client-grpc/v3 module go.unistack.org/micro-client-grpc/v3
go 1.20 go 1.21
toolchain go1.23.1
require ( require (
go.unistack.org/micro/v3 v3.10.22 go.unistack.org/micro/v3 v3.10.91
google.golang.org/grpc v1.55.0 google.golang.org/grpc v1.67.0
) )
require ( require (
github.com/golang/protobuf v1.5.3 // indirect go.unistack.org/micro-proto/v3 v3.4.1 // indirect
golang.org/x/net v0.8.0 // indirect golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.6.0 // indirect golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.8.0 // indirect golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/protobuf v1.30.0 // indirect google.golang.org/protobuf v1.34.2 // indirect
) )

40
go.sum
View File

@ -1,22 +1,18 @@
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= go.unistack.org/micro/v3 v3.10.91 h1:vuJY4tXwpqimwIkEJ3TozMYNVQQs+C5QMlQWPgSY/YM=
go.unistack.org/micro/v3 v3.10.22 h1:04QQTo+sxK5pttZfqPbhiuZkZMrdJTfiALoqamci6IQ= go.unistack.org/micro/v3 v3.10.91/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
go.unistack.org/micro/v3 v3.10.22/go.mod h1:XIArw29f0b3uvF4cq96X/nQt2f0J2OGnjh8J+DBbC0s= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=

194
grpc.go
View File

@ -1,5 +1,5 @@
// Package grpc provides a gRPC client // Package grpc provides a gRPC client for micro framework
package grpc // import "go.unistack.org/micro-client-grpc/v3" package grpc
import ( import (
"context" "context"
@ -8,6 +8,7 @@ import (
"net" "net"
"os" "os"
"reflect" "reflect"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -17,12 +18,17 @@ import (
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/selector" "go.unistack.org/micro/v3/selector"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/tracer"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
gmetadata "google.golang.org/grpc/metadata" gmetadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
) )
const ( const (
@ -30,7 +36,11 @@ const (
) )
type grpcClient struct { type grpcClient struct {
pool *pool funcPublish client.FuncPublish
funcBatchPublish client.FuncBatchPublish
funcCall client.FuncCall
funcStream client.FuncStream
pool *ConnPool
opts client.Options opts client.Options
sync.RWMutex sync.RWMutex
init bool init bool
@ -98,6 +108,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
maxRecvMsgSize := g.maxRecvMsgSizeValue() maxRecvMsgSize := g.maxRecvMsgSizeValue()
maxSendMsgSize := g.maxSendMsgSizeValue() maxSendMsgSize := g.maxSendMsgSizeValue()
cfgService := g.serviceConfig()
var grr error var grr error
@ -116,8 +127,12 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize),
), ),
grpc.WithDefaultServiceConfig(cfgService),
} }
if opts := g.getGrpcDialOptions(g.opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
if opts := g.getGrpcDialOptions(opts.Context); opts != nil { if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...) grpcDialOptions = append(grpcDialOptions, opts...)
} }
@ -130,13 +145,13 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer)) grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
} }
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
g.pool.release(cc, grr) g.pool.Put(cc, grr)
}() }()
ch := make(chan error, 1) ch := make(chan error, 1)
@ -218,6 +233,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
maxRecvMsgSize := g.maxRecvMsgSizeValue() maxRecvMsgSize := g.maxRecvMsgSizeValue()
maxSendMsgSize := g.maxSendMsgSizeValue() maxSendMsgSize := g.maxSendMsgSizeValue()
cfgService := g.serviceConfig()
grpcDialOptions := []grpc.DialOption{ grpcDialOptions := []grpc.DialOption{
g.secure(addr), g.secure(addr),
@ -225,6 +241,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize),
), ),
grpc.WithDefaultServiceConfig(cfgService),
} }
if opts := g.getGrpcDialOptions(opts.Context); opts != nil { if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
@ -239,7 +256,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer)) grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
} }
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -272,7 +289,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// cancel the context // cancel the context
cancel() cancel()
// release the connection // release the connection
g.pool.release(cc, err) g.pool.Put(cc, err)
// now return the error // now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
@ -300,7 +317,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
} }
// defer execution of release // defer execution of release
g.pool.release(cc, err) g.pool.Put(cc, err)
}, },
} }
@ -369,6 +386,17 @@ func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType return nil, codec.ErrUnknownContentType
} }
func (g *grpcClient) serviceConfig() string {
if g.opts.Context == nil {
return DefaultServiceConfig
}
v := g.opts.Context.Value(serviceConfigKey{})
if v == nil {
return DefaultServiceConfig
}
return v.(string)
}
func (g *grpcClient) Init(opts ...client.Option) error { func (g *grpcClient) Init(opts ...client.Option) error {
if len(opts) == 0 && g.init { if len(opts) == 0 && g.init {
return nil return nil
@ -388,26 +416,23 @@ func (g *grpcClient) Init(opts ...client.Option) error {
g.pool.Unlock() g.pool.Unlock()
} }
if err := g.opts.Broker.Init(); err != nil { g.funcCall = g.fnCall
return err g.funcStream = g.fnStream
} g.funcPublish = g.fnPublish
if err := g.opts.Tracer.Init(); err != nil { g.funcBatchPublish = g.fnBatchPublish
return err
}
if err := g.opts.Router.Init(); err != nil {
return err
}
if err := g.opts.Logger.Init(); err != nil {
return err
}
if err := g.opts.Meter.Init(); err != nil {
return err
}
if err := g.opts.Transport.Init(); err != nil {
return err
}
g.init = true g.opts.Hooks.EachNext(func(hook options.Hook) {
switch h := hook.(type) {
case client.HookCall:
g.funcCall = h(g.funcCall)
case client.HookStream:
g.funcStream = h(g.funcStream)
case client.HookPublish:
g.funcPublish = h(g.funcPublish)
case client.HookBatchPublish:
g.funcBatchPublish = h(g.funcBatchPublish)
}
})
return nil return nil
} }
@ -430,6 +455,32 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
} else if rsp == nil { } else if rsp == nil {
return errors.InternalServerError("go.micro.client", "rsp is nil") return errors.InternalServerError("go.micro.client", "rsp is nil")
} }
ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
err := g.funcCall(ctx, req, rsp, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := errors.FromError(err); me == nil {
sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
}
return err
}
func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// make a copy of call opts // make a copy of call opts
callOpts := g.opts.CallOptions callOpts := g.opts.CallOptions
@ -461,11 +512,6 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
// make copy of call method // make copy of call method
gcall := g.call gcall := g.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
gcall = callOpts.CallWrappers[i-1](gcall)
}
// use the router passed as a call option, or fallback to the rpc clients router // use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil { if callOpts.Router == nil {
callOpts.Router = g.opts.Router callOpts.Router = g.opts.Router
@ -565,6 +611,31 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
} }
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
stream, err := g.funcStream(ctx, req, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := status.Convert(err); me == nil {
sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code()))).Inc()
}
return stream, err
}
func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
// make a copy of call opts // make a copy of call opts
callOpts := g.opts.CallOptions callOpts := g.opts.CallOptions
for _, opt := range opts { for _, opt := range opts {
@ -583,11 +654,6 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
// make a copy of stream // make a copy of stream
gstream := g.stream gstream := g.stream
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
gstream = callOpts.CallWrappers[i-1](gstream)
}
// use the router passed as a call option, or fallback to the rpc clients router // use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil { if callOpts.Router == nil {
callOpts.Router = g.opts.Router callOpts.Router = g.opts.Router
@ -698,10 +764,18 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
} }
func (g *grpcClient) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { func (g *grpcClient) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
return g.funcBatchPublish(ctx, ps, opts...)
}
func (g *grpcClient) fnBatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
return g.publish(ctx, ps, opts...) return g.publish(ctx, ps, opts...)
} }
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
return g.funcPublish(ctx, p, opts...)
}
func (g *grpcClient) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
return g.publish(ctx, []client.Message{p}, opts...) return g.publish(ctx, []client.Message{p}, opts...)
} }
@ -715,6 +789,10 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
if v, ok := os.LookupEnv("MICRO_PROXY"); ok { if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
exchange = v exchange = v
} }
// get the exchange
if len(options.Exchange) > 0 {
exchange = options.Exchange
}
msgs := make([]*broker.Message, 0, len(ps)) msgs := make([]*broker.Message, 0, len(ps))
@ -726,6 +804,16 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
for _, p := range ps { for _, p := range ps {
md := metadata.Copy(omd) md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType() md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
md.Set(metadata.HeaderTopic, topic)
iter := p.Metadata().Iterator()
var k, v string
for iter.Next(&k, &v) {
md.Set(k, v)
}
// passed in raw data // passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok { if d, ok := p.Payload().(*codec.Frame); ok {
@ -743,16 +831,6 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
} }
body = b body = b
} }
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
for k, v := range p.Metadata() {
md.Set(k, v)
}
md.Set(metadata.HeaderTopic, topic)
msgs = append(msgs, &broker.Message{Header: md, Body: body}) msgs = append(msgs, &broker.Message{Header: md, Body: body})
} }
@ -821,23 +899,16 @@ func NewClient(opts ...client.Option) client.Client {
options.ContentType = DefaultContentType options.ContentType = DefaultContentType
} }
rc := &grpcClient{ c := &grpcClient{
opts: options, opts: options,
} }
rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) c.pool = NewConnPool(options.PoolSize, options.PoolTTL, c.poolMaxIdle(), c.poolMaxStreams())
c := client.Client(rc) if c.opts.Context != nil {
if codecs, ok := c.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
// wrap in reverse
for i := len(options.Wrappers); i > 0; i-- {
c = options.Wrappers[i-1](c)
}
if rc.opts.Context != nil {
if codecs, ok := rc.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
for k, v := range codecs { for k, v := range codecs {
rc.opts.Codecs[k] = &wrapGrpcCodec{v} c.opts.Codecs[k] = &wrapGrpcCodec{v}
} }
} }
} }
@ -846,5 +917,10 @@ func NewClient(opts ...client.Option) client.Client {
encoding.RegisterCodec(&wrapMicroCodec{k}) encoding.RegisterCodec(&wrapMicroCodec{k})
} }
c.funcCall = c.fnCall
c.funcStream = c.fnStream
c.funcPublish = c.fnPublish
c.funcBatchPublish = c.fnBatchPublish
return c return c
} }

View File

@ -10,7 +10,7 @@ import (
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
) )
type pool struct { type ConnPool struct {
conns map[string]*streamsPool conns map[string]*streamsPool
size int size int
ttl int64 ttl int64
@ -21,36 +21,36 @@ type pool struct {
type streamsPool struct { type streamsPool struct {
// head of list // head of list
head *poolConn head *PoolConn
// busy conns list // busy conns list
busy *poolConn busy *PoolConn
// the siza of list // the siza of list
count int count int
// idle conn // idle conn
idle int idle int
} }
type poolConn struct { type PoolConn struct {
err error err error
*grpc.ClientConn *grpc.ClientConn
next *poolConn next *PoolConn
pool *pool pool *ConnPool
sp *streamsPool sp *streamsPool
pre *poolConn pre *PoolConn
addr string addr string
streams int streams int
created int64 created int64
in bool in bool
} }
func newPool(size int, ttl time.Duration, idle int, ms int) *pool { func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool {
if ms <= 0 { if ms <= 0 {
ms = 1 ms = 1
} }
if idle < 0 { if idle < 0 {
idle = 0 idle = 0
} }
return &pool{ return &ConnPool{
size: size, size: size,
ttl: int64(ttl.Seconds()), ttl: int64(ttl.Seconds()),
maxStreams: ms, maxStreams: ms,
@ -59,7 +59,7 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
} }
} }
func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*PoolConn, error) {
if strings.HasPrefix(addr, "http") { if strings.HasPrefix(addr, "http") {
addr = addr[strings.Index(addr, ":")+3:] addr = addr[strings.Index(addr, ":")+3:]
} }
@ -67,7 +67,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
p.Lock() p.Lock()
sp, ok := p.conns[addr] sp, ok := p.conns[addr]
if !ok { if !ok {
sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0} sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
p.conns[addr] = sp p.conns[addr] = sp
} }
// while we have conns check streams and then return one // while we have conns check streams and then return one
@ -135,7 +135,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn = &poolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false} conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
// add conn to streams pool // add conn to streams pool
p.Lock() p.Lock()
@ -147,7 +147,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
return conn, nil return conn, nil
} }
func (p *pool) release(conn *poolConn, err error) { func (p *ConnPool) Put(conn *PoolConn, err error) {
p.Lock() p.Lock()
p, sp, created := conn.pool, conn.sp, conn.created p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn // try to add conn
@ -182,11 +182,11 @@ func (p *pool) release(conn *poolConn, err error) {
p.Unlock() p.Unlock()
} }
func (conn *poolConn) Close() { func (conn *PoolConn) Close() {
conn.pool.release(conn, conn.err) conn.pool.Put(conn, conn.err)
} }
func removeConn(conn *poolConn) { func removeConn(conn *PoolConn) {
if conn.pre != nil { if conn.pre != nil {
conn.pre.next = conn.next conn.pre.next = conn.next
} }
@ -199,7 +199,7 @@ func removeConn(conn *poolConn) {
conn.sp.count-- conn.sp.count--
} }
func addConnAfter(conn *poolConn, after *poolConn) { func addConnAfter(conn *PoolConn, after *PoolConn) {
conn.next = after.next conn.next = after.next
conn.pre = after conn.pre = after
if after.next != nil { if after.next != nil {

View File

@ -10,7 +10,7 @@ import (
) )
var ( var (
// DefaultPoolMaxStreams maximum streams on a connectioin // DefaultPoolMaxStreams maximum streams on a connection
// (20) // (20)
DefaultPoolMaxStreams = 20 DefaultPoolMaxStreams = 20
@ -25,6 +25,9 @@ var (
// DefaultMaxSendMsgSize maximum message that client can send // DefaultMaxSendMsgSize maximum message that client can send
// (4 MB). // (4 MB).
DefaultMaxSendMsgSize = 1024 * 1024 * 4 DefaultMaxSendMsgSize = 1024 * 1024 * 4
// DefaultServiceConfig enable load balancing
DefaultServiceConfig = `{"loadBalancingPolicy":"round_robin"}`
) )
type poolMaxStreams struct{} type poolMaxStreams struct{}
@ -115,3 +118,14 @@ func CallOptions(opts ...grpc.CallOption) client.CallOption {
o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts) o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts)
} }
} }
type serviceConfigKey struct{}
func ServiceConfig(str string) client.CallOption {
return func(options *client.CallOptions) {
if options.Context == nil {
options.Context = context.Background()
}
options.Context = context.WithValue(options.Context, serviceConfigKey{}, str)
}
}

View File

@ -9,7 +9,7 @@ import (
) )
type response struct { type response struct {
conn *poolConn conn *PoolConn
stream grpc.ClientStream stream grpc.ClientStream
codec codec.Codec codec codec.Codec
} }
@ -34,9 +34,5 @@ func (r *response) Header() metadata.Metadata {
// Read the undecoded response // Read the undecoded response
func (r *response) Read() ([]byte, error) { func (r *response) Read() ([]byte, error) {
f := &codec.Frame{} return nil, nil
if err := r.codec.ReadBody(&wrapStream{r.stream}, f); err != nil {
return nil, err
}
return f.Data, nil
} }

View File

@ -6,6 +6,7 @@ import (
"sync" "sync"
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/tracer"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -17,7 +18,7 @@ type grpcStream struct {
request client.Request request client.Request
response client.Response response client.Response
close func(err error) close func(err error)
conn *poolConn conn *PoolConn
sync.RWMutex sync.RWMutex
closed bool closed bool
} }
@ -111,6 +112,12 @@ func (g *grpcStream) Close() error {
return nil return nil
} }
if sp, ok := tracer.SpanFromContext(g.context); ok && sp != nil {
if g.err != nil {
sp.SetStatus(tracer.SpanStatusError, g.err.Error())
}
sp.Finish()
}
// close the connection // close the connection
g.closed = true g.closed = true
g.close(g.err) g.close(g.err)