Compare commits

..

1 Commits
v3 ... master

Author SHA1 Message Date
e3fe27105b Merge pull request 'add defaultConfigService' (#133) from devstigneev/micro-client-grpc:issue_132 into master
Some checks failed
build / test (push) Successful in 1m38s
build / lint (push) Successful in 9m26s
codeql / analyze (go) (push) Failing after 13m42s
Reviewed-on: #133
2024-02-29 17:07:00 +03:00
7 changed files with 1154 additions and 158 deletions

View File

@ -1,6 +1,8 @@
package grpc package grpc
import ( import (
"io"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/codec"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
@ -63,3 +65,63 @@ func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}, opts ...codec.Option)
} }
return w.Codec.Unmarshal(d, v) return w.Codec.Unmarshal(d, v)
} }
/*
type grpcCodec struct {
grpc.ServerStream
// headers
id string
target string
method string
endpoint string
c encoding.Codec
}
*/
func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error {
/*
if m == nil {
m = codec.NewMessage(codec.Request)
}
if md, ok := metadata.FromIncomingContext(g.ServerStream.Context()); ok {
if m.Header == nil {
m.Header = meta.New(len(md))
}
for k, v := range md {
m.Header[k] = strings.Join(v, ",")
}
}
m.Id = g.id
m.Target = g.target
m.Method = g.method
m.Endpoint = g.endpoint
*/
return nil
}
func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error {
// caller has requested a frame
if m, ok := v.(*codec.Frame); ok {
_, err := conn.Read(m.Data)
return err
}
return codec.ErrInvalidMessage
}
func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error {
// if we don't have a body
if v != nil {
b, err := w.Marshal(v)
if err != nil {
return err
}
m.Body = b
}
// write the body using the framing codec
_, err := conn.Write(m.Body)
return err
}

17
go.mod
View File

@ -1,19 +1,8 @@
module go.unistack.org/micro-client-grpc/v3 module go.unistack.org/micro-client-grpc/v3
go 1.21 go 1.16
toolchain go1.23.1
require ( require (
go.unistack.org/micro/v3 v3.10.91 go.unistack.org/micro/v3 v3.10.22
google.golang.org/grpc v1.67.0 google.golang.org/grpc v1.52.3
)
require (
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/protobuf v1.34.2 // indirect
) )

1046
go.sum

File diff suppressed because it is too large Load Diff

170
grpc.go
View File

@ -1,5 +1,5 @@
// Package grpc provides a gRPC client for micro framework // Package grpc provides a gRPC client
package grpc package grpc // import "go.unistack.org/micro-client-grpc/v3"
import ( import (
"context" "context"
@ -8,7 +8,6 @@ import (
"net" "net"
"os" "os"
"reflect" "reflect"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -18,17 +17,12 @@ import (
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/selector" "go.unistack.org/micro/v3/selector"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/tracer"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
gmetadata "google.golang.org/grpc/metadata" gmetadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
) )
const ( const (
@ -36,12 +30,8 @@ const (
) )
type grpcClient struct { type grpcClient struct {
funcPublish client.FuncPublish pool *ConnPool
funcBatchPublish client.FuncBatchPublish opts client.Options
funcCall client.FuncCall
funcStream client.FuncStream
pool *ConnPool
opts client.Options
sync.RWMutex sync.RWMutex
init bool init bool
} }
@ -130,9 +120,6 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpc.WithDefaultServiceConfig(cfgService), grpc.WithDefaultServiceConfig(cfgService),
} }
if opts := g.getGrpcDialOptions(g.opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
if opts := g.getGrpcDialOptions(opts.Context); opts != nil { if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...) grpcDialOptions = append(grpcDialOptions, opts...)
} }
@ -416,23 +403,26 @@ func (g *grpcClient) Init(opts ...client.Option) error {
g.pool.Unlock() g.pool.Unlock()
} }
g.funcCall = g.fnCall if err := g.opts.Broker.Init(); err != nil {
g.funcStream = g.fnStream return err
g.funcPublish = g.fnPublish }
g.funcBatchPublish = g.fnBatchPublish if err := g.opts.Tracer.Init(); err != nil {
return err
}
if err := g.opts.Router.Init(); err != nil {
return err
}
if err := g.opts.Logger.Init(); err != nil {
return err
}
if err := g.opts.Meter.Init(); err != nil {
return err
}
if err := g.opts.Transport.Init(); err != nil {
return err
}
g.opts.Hooks.EachNext(func(hook options.Hook) { g.init = true
switch h := hook.(type) {
case client.HookCall:
g.funcCall = h(g.funcCall)
case client.HookStream:
g.funcStream = h(g.funcStream)
case client.HookPublish:
g.funcPublish = h(g.funcPublish)
case client.HookBatchPublish:
g.funcBatchPublish = h(g.funcBatchPublish)
}
})
return nil return nil
} }
@ -455,32 +445,6 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
} else if rsp == nil { } else if rsp == nil {
return errors.InternalServerError("go.micro.client", "rsp is nil") return errors.InternalServerError("go.micro.client", "rsp is nil")
} }
ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
err := g.funcCall(ctx, req, rsp, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := errors.FromError(err); me == nil {
sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
}
return err
}
func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// make a copy of call opts // make a copy of call opts
callOpts := g.opts.CallOptions callOpts := g.opts.CallOptions
@ -512,6 +476,11 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
// make copy of call method // make copy of call method
gcall := g.call gcall := g.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
gcall = callOpts.CallWrappers[i-1](gcall)
}
// use the router passed as a call option, or fallback to the rpc clients router // use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil { if callOpts.Router == nil {
callOpts.Router = g.opts.Router callOpts.Router = g.opts.Router
@ -611,31 +580,6 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
} }
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
stream, err := g.funcStream(ctx, req, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := status.Convert(err); me == nil {
sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code()))).Inc()
}
return stream, err
}
func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
// make a copy of call opts // make a copy of call opts
callOpts := g.opts.CallOptions callOpts := g.opts.CallOptions
for _, opt := range opts { for _, opt := range opts {
@ -654,6 +598,11 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
// make a copy of stream // make a copy of stream
gstream := g.stream gstream := g.stream
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
gstream = callOpts.CallWrappers[i-1](gstream)
}
// use the router passed as a call option, or fallback to the rpc clients router // use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil { if callOpts.Router == nil {
callOpts.Router = g.opts.Router callOpts.Router = g.opts.Router
@ -764,18 +713,10 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
} }
func (g *grpcClient) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { func (g *grpcClient) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
return g.funcBatchPublish(ctx, ps, opts...)
}
func (g *grpcClient) fnBatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
return g.publish(ctx, ps, opts...) return g.publish(ctx, ps, opts...)
} }
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
return g.funcPublish(ctx, p, opts...)
}
func (g *grpcClient) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
return g.publish(ctx, []client.Message{p}, opts...) return g.publish(ctx, []client.Message{p}, opts...)
} }
@ -789,10 +730,6 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
if v, ok := os.LookupEnv("MICRO_PROXY"); ok { if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
exchange = v exchange = v
} }
// get the exchange
if len(options.Exchange) > 0 {
exchange = options.Exchange
}
msgs := make([]*broker.Message, 0, len(ps)) msgs := make([]*broker.Message, 0, len(ps))
@ -804,16 +741,6 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
for _, p := range ps { for _, p := range ps {
md := metadata.Copy(omd) md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType() md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
md.Set(metadata.HeaderTopic, topic)
iter := p.Metadata().Iterator()
var k, v string
for iter.Next(&k, &v) {
md.Set(k, v)
}
// passed in raw data // passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok { if d, ok := p.Payload().(*codec.Frame); ok {
@ -831,6 +758,16 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
} }
body = b body = b
} }
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
for k, v := range p.Metadata() {
md.Set(k, v)
}
md.Set(metadata.HeaderTopic, topic)
msgs = append(msgs, &broker.Message{Header: md, Body: body}) msgs = append(msgs, &broker.Message{Header: md, Body: body})
} }
@ -899,16 +836,22 @@ func NewClient(opts ...client.Option) client.Client {
options.ContentType = DefaultContentType options.ContentType = DefaultContentType
} }
c := &grpcClient{ rc := &grpcClient{
opts: options, opts: options,
} }
c.pool = NewConnPool(options.PoolSize, options.PoolTTL, c.poolMaxIdle(), c.poolMaxStreams()) rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
c := client.Client(rc)
if c.opts.Context != nil { // wrap in reverse
if codecs, ok := c.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil { for i := len(options.Wrappers); i > 0; i-- {
c = options.Wrappers[i-1](c)
}
if rc.opts.Context != nil {
if codecs, ok := rc.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
for k, v := range codecs { for k, v := range codecs {
c.opts.Codecs[k] = &wrapGrpcCodec{v} rc.opts.Codecs[k] = &wrapGrpcCodec{v}
} }
} }
} }
@ -917,10 +860,5 @@ func NewClient(opts ...client.Option) client.Client {
encoding.RegisterCodec(&wrapMicroCodec{k}) encoding.RegisterCodec(&wrapMicroCodec{k})
} }
c.funcCall = c.fnCall
c.funcStream = c.fnStream
c.funcPublish = c.fnPublish
c.funcBatchPublish = c.fnBatchPublish
return c return c
} }

View File

@ -98,8 +98,8 @@ func MaxSendMsgSize(s int) client.Option {
type grpcDialOptions struct{} type grpcDialOptions struct{}
// DialOptions to be used to configure gRPC dial options // DialOptions to be used to configure gRPC dial options
func DialOptions(opts ...grpc.DialOption) client.Option { func DialOptions(opts ...grpc.DialOption) client.CallOption {
return func(o *client.Options) { return func(o *client.CallOptions) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
} }

View File

@ -34,5 +34,9 @@ func (r *response) Header() metadata.Metadata {
// Read the undecoded response // Read the undecoded response
func (r *response) Read() ([]byte, error) { func (r *response) Read() ([]byte, error) {
return nil, nil f := &codec.Frame{}
if err := r.codec.ReadBody(&wrapStream{r.stream}, f); err != nil {
return nil, err
}
return f.Data, nil
} }

View File

@ -6,7 +6,6 @@ import (
"sync" "sync"
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/tracer"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -112,12 +111,6 @@ func (g *grpcStream) Close() error {
return nil return nil
} }
if sp, ok := tracer.SpanFromContext(g.context); ok && sp != nil {
if g.err != nil {
sp.SetStatus(tracer.SpanStatusError, g.err.Error())
}
sp.Finish()
}
// close the connection // close the connection
g.closed = true g.closed = true
g.close(g.err) g.close(g.err)