Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
e3fe27105b |
62
codec.go
62
codec.go
@ -1,6 +1,8 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/encoding"
|
"google.golang.org/grpc/encoding"
|
||||||
@ -63,3 +65,63 @@ func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}, opts ...codec.Option)
|
|||||||
}
|
}
|
||||||
return w.Codec.Unmarshal(d, v)
|
return w.Codec.Unmarshal(d, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
type grpcCodec struct {
|
||||||
|
grpc.ServerStream
|
||||||
|
// headers
|
||||||
|
id string
|
||||||
|
target string
|
||||||
|
method string
|
||||||
|
endpoint string
|
||||||
|
|
||||||
|
c encoding.Codec
|
||||||
|
}
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error {
|
||||||
|
/*
|
||||||
|
if m == nil {
|
||||||
|
m = codec.NewMessage(codec.Request)
|
||||||
|
}
|
||||||
|
|
||||||
|
if md, ok := metadata.FromIncomingContext(g.ServerStream.Context()); ok {
|
||||||
|
if m.Header == nil {
|
||||||
|
m.Header = meta.New(len(md))
|
||||||
|
}
|
||||||
|
for k, v := range md {
|
||||||
|
m.Header[k] = strings.Join(v, ",")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Id = g.id
|
||||||
|
m.Target = g.target
|
||||||
|
m.Method = g.method
|
||||||
|
m.Endpoint = g.endpoint
|
||||||
|
*/
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error {
|
||||||
|
// caller has requested a frame
|
||||||
|
if m, ok := v.(*codec.Frame); ok {
|
||||||
|
_, err := conn.Read(m.Data)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return codec.ErrInvalidMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error {
|
||||||
|
// if we don't have a body
|
||||||
|
if v != nil {
|
||||||
|
b, err := w.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m.Body = b
|
||||||
|
}
|
||||||
|
// write the body using the framing codec
|
||||||
|
_, err := conn.Write(m.Body)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
17
go.mod
17
go.mod
@ -1,19 +1,8 @@
|
|||||||
module go.unistack.org/micro-client-grpc/v3
|
module go.unistack.org/micro-client-grpc/v3
|
||||||
|
|
||||||
go 1.21
|
go 1.16
|
||||||
|
|
||||||
toolchain go1.23.1
|
|
||||||
|
|
||||||
require (
|
require (
|
||||||
go.unistack.org/micro/v3 v3.10.91
|
go.unistack.org/micro/v3 v3.10.22
|
||||||
google.golang.org/grpc v1.67.0
|
google.golang.org/grpc v1.52.3
|
||||||
)
|
|
||||||
|
|
||||||
require (
|
|
||||||
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
|
|
||||||
golang.org/x/net v0.29.0 // indirect
|
|
||||||
golang.org/x/sys v0.25.0 // indirect
|
|
||||||
golang.org/x/text v0.18.0 // indirect
|
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
|
|
||||||
google.golang.org/protobuf v1.34.2 // indirect
|
|
||||||
)
|
)
|
||||||
|
170
grpc.go
170
grpc.go
@ -1,5 +1,5 @@
|
|||||||
// Package grpc provides a gRPC client for micro framework
|
// Package grpc provides a gRPC client
|
||||||
package grpc
|
package grpc // import "go.unistack.org/micro-client-grpc/v3"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -8,7 +8,6 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -18,17 +17,12 @@ import (
|
|||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
"go.unistack.org/micro/v3/errors"
|
"go.unistack.org/micro/v3/errors"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"go.unistack.org/micro/v3/options"
|
|
||||||
"go.unistack.org/micro/v3/selector"
|
"go.unistack.org/micro/v3/selector"
|
||||||
"go.unistack.org/micro/v3/semconv"
|
|
||||||
"go.unistack.org/micro/v3/tracer"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
"google.golang.org/grpc/encoding"
|
"google.golang.org/grpc/encoding"
|
||||||
gmetadata "google.golang.org/grpc/metadata"
|
gmetadata "google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/status"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -36,12 +30,8 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type grpcClient struct {
|
type grpcClient struct {
|
||||||
funcPublish client.FuncPublish
|
pool *ConnPool
|
||||||
funcBatchPublish client.FuncBatchPublish
|
opts client.Options
|
||||||
funcCall client.FuncCall
|
|
||||||
funcStream client.FuncStream
|
|
||||||
pool *ConnPool
|
|
||||||
opts client.Options
|
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
init bool
|
init bool
|
||||||
}
|
}
|
||||||
@ -130,9 +120,6 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
|||||||
grpc.WithDefaultServiceConfig(cfgService),
|
grpc.WithDefaultServiceConfig(cfgService),
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts := g.getGrpcDialOptions(g.opts.Context); opts != nil {
|
|
||||||
grpcDialOptions = append(grpcDialOptions, opts...)
|
|
||||||
}
|
|
||||||
if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
|
if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
|
||||||
grpcDialOptions = append(grpcDialOptions, opts...)
|
grpcDialOptions = append(grpcDialOptions, opts...)
|
||||||
}
|
}
|
||||||
@ -416,23 +403,26 @@ func (g *grpcClient) Init(opts ...client.Option) error {
|
|||||||
g.pool.Unlock()
|
g.pool.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
g.funcCall = g.fnCall
|
if err := g.opts.Broker.Init(); err != nil {
|
||||||
g.funcStream = g.fnStream
|
return err
|
||||||
g.funcPublish = g.fnPublish
|
}
|
||||||
g.funcBatchPublish = g.fnBatchPublish
|
if err := g.opts.Tracer.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := g.opts.Router.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := g.opts.Logger.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := g.opts.Meter.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := g.opts.Transport.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
g.opts.Hooks.EachNext(func(hook options.Hook) {
|
g.init = true
|
||||||
switch h := hook.(type) {
|
|
||||||
case client.HookCall:
|
|
||||||
g.funcCall = h(g.funcCall)
|
|
||||||
case client.HookStream:
|
|
||||||
g.funcStream = h(g.funcStream)
|
|
||||||
case client.HookPublish:
|
|
||||||
g.funcPublish = h(g.funcPublish)
|
|
||||||
case client.HookBatchPublish:
|
|
||||||
g.funcBatchPublish = h(g.funcBatchPublish)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -455,32 +445,6 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
} else if rsp == nil {
|
} else if rsp == nil {
|
||||||
return errors.InternalServerError("go.micro.client", "rsp is nil")
|
return errors.InternalServerError("go.micro.client", "rsp is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
ts := time.Now()
|
|
||||||
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
|
||||||
var sp tracer.Span
|
|
||||||
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
|
||||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
|
||||||
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
|
||||||
)
|
|
||||||
err := g.funcCall(ctx, req, rsp, opts...)
|
|
||||||
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
|
|
||||||
te := time.Since(ts)
|
|
||||||
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
|
||||||
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
|
||||||
|
|
||||||
if me := errors.FromError(err); me == nil {
|
|
||||||
sp.Finish()
|
|
||||||
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
|
|
||||||
} else {
|
|
||||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
|
||||||
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
|
||||||
// make a copy of call opts
|
// make a copy of call opts
|
||||||
callOpts := g.opts.CallOptions
|
callOpts := g.opts.CallOptions
|
||||||
|
|
||||||
@ -512,6 +476,11 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
|
|||||||
// make copy of call method
|
// make copy of call method
|
||||||
gcall := g.call
|
gcall := g.call
|
||||||
|
|
||||||
|
// wrap the call in reverse
|
||||||
|
for i := len(callOpts.CallWrappers); i > 0; i-- {
|
||||||
|
gcall = callOpts.CallWrappers[i-1](gcall)
|
||||||
|
}
|
||||||
|
|
||||||
// use the router passed as a call option, or fallback to the rpc clients router
|
// use the router passed as a call option, or fallback to the rpc clients router
|
||||||
if callOpts.Router == nil {
|
if callOpts.Router == nil {
|
||||||
callOpts.Router = g.opts.Router
|
callOpts.Router = g.opts.Router
|
||||||
@ -611,31 +580,6 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
ts := time.Now()
|
|
||||||
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
|
||||||
var sp tracer.Span
|
|
||||||
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
|
||||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
|
||||||
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
|
||||||
)
|
|
||||||
stream, err := g.funcStream(ctx, req, opts...)
|
|
||||||
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
|
|
||||||
te := time.Since(ts)
|
|
||||||
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
|
||||||
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
|
||||||
|
|
||||||
if me := status.Convert(err); me == nil {
|
|
||||||
sp.Finish()
|
|
||||||
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
|
|
||||||
} else {
|
|
||||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
|
||||||
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code()))).Inc()
|
|
||||||
}
|
|
||||||
|
|
||||||
return stream, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
|
||||||
// make a copy of call opts
|
// make a copy of call opts
|
||||||
callOpts := g.opts.CallOptions
|
callOpts := g.opts.CallOptions
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
@ -654,6 +598,11 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
|
|||||||
// make a copy of stream
|
// make a copy of stream
|
||||||
gstream := g.stream
|
gstream := g.stream
|
||||||
|
|
||||||
|
// wrap the call in reverse
|
||||||
|
for i := len(callOpts.CallWrappers); i > 0; i-- {
|
||||||
|
gstream = callOpts.CallWrappers[i-1](gstream)
|
||||||
|
}
|
||||||
|
|
||||||
// use the router passed as a call option, or fallback to the rpc clients router
|
// use the router passed as a call option, or fallback to the rpc clients router
|
||||||
if callOpts.Router == nil {
|
if callOpts.Router == nil {
|
||||||
callOpts.Router = g.opts.Router
|
callOpts.Router = g.opts.Router
|
||||||
@ -764,18 +713,10 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
func (g *grpcClient) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
||||||
return g.funcBatchPublish(ctx, ps, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *grpcClient) fnBatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
|
||||||
return g.publish(ctx, ps, opts...)
|
return g.publish(ctx, ps, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||||
return g.funcPublish(ctx, p, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *grpcClient) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
|
||||||
return g.publish(ctx, []client.Message{p}, opts...)
|
return g.publish(ctx, []client.Message{p}, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -789,10 +730,6 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|||||||
if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
|
if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
|
||||||
exchange = v
|
exchange = v
|
||||||
}
|
}
|
||||||
// get the exchange
|
|
||||||
if len(options.Exchange) > 0 {
|
|
||||||
exchange = options.Exchange
|
|
||||||
}
|
|
||||||
|
|
||||||
msgs := make([]*broker.Message, 0, len(ps))
|
msgs := make([]*broker.Message, 0, len(ps))
|
||||||
|
|
||||||
@ -804,16 +741,6 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|||||||
for _, p := range ps {
|
for _, p := range ps {
|
||||||
md := metadata.Copy(omd)
|
md := metadata.Copy(omd)
|
||||||
md[metadata.HeaderContentType] = p.ContentType()
|
md[metadata.HeaderContentType] = p.ContentType()
|
||||||
topic := p.Topic()
|
|
||||||
if len(exchange) > 0 {
|
|
||||||
topic = exchange
|
|
||||||
}
|
|
||||||
md.Set(metadata.HeaderTopic, topic)
|
|
||||||
iter := p.Metadata().Iterator()
|
|
||||||
var k, v string
|
|
||||||
for iter.Next(&k, &v) {
|
|
||||||
md.Set(k, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
// passed in raw data
|
// passed in raw data
|
||||||
if d, ok := p.Payload().(*codec.Frame); ok {
|
if d, ok := p.Payload().(*codec.Frame); ok {
|
||||||
@ -831,6 +758,16 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|||||||
}
|
}
|
||||||
body = b
|
body = b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
topic := p.Topic()
|
||||||
|
if len(exchange) > 0 {
|
||||||
|
topic = exchange
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range p.Metadata() {
|
||||||
|
md.Set(k, v)
|
||||||
|
}
|
||||||
|
md.Set(metadata.HeaderTopic, topic)
|
||||||
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -899,16 +836,22 @@ func NewClient(opts ...client.Option) client.Client {
|
|||||||
options.ContentType = DefaultContentType
|
options.ContentType = DefaultContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
c := &grpcClient{
|
rc := &grpcClient{
|
||||||
opts: options,
|
opts: options,
|
||||||
}
|
}
|
||||||
|
|
||||||
c.pool = NewConnPool(options.PoolSize, options.PoolTTL, c.poolMaxIdle(), c.poolMaxStreams())
|
rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
|
||||||
|
c := client.Client(rc)
|
||||||
|
|
||||||
if c.opts.Context != nil {
|
// wrap in reverse
|
||||||
if codecs, ok := c.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
|
for i := len(options.Wrappers); i > 0; i-- {
|
||||||
|
c = options.Wrappers[i-1](c)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rc.opts.Context != nil {
|
||||||
|
if codecs, ok := rc.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
|
||||||
for k, v := range codecs {
|
for k, v := range codecs {
|
||||||
c.opts.Codecs[k] = &wrapGrpcCodec{v}
|
rc.opts.Codecs[k] = &wrapGrpcCodec{v}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -917,10 +860,5 @@ func NewClient(opts ...client.Option) client.Client {
|
|||||||
encoding.RegisterCodec(&wrapMicroCodec{k})
|
encoding.RegisterCodec(&wrapMicroCodec{k})
|
||||||
}
|
}
|
||||||
|
|
||||||
c.funcCall = c.fnCall
|
|
||||||
c.funcStream = c.fnStream
|
|
||||||
c.funcPublish = c.fnPublish
|
|
||||||
c.funcBatchPublish = c.fnBatchPublish
|
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
@ -98,8 +98,8 @@ func MaxSendMsgSize(s int) client.Option {
|
|||||||
type grpcDialOptions struct{}
|
type grpcDialOptions struct{}
|
||||||
|
|
||||||
// DialOptions to be used to configure gRPC dial options
|
// DialOptions to be used to configure gRPC dial options
|
||||||
func DialOptions(opts ...grpc.DialOption) client.Option {
|
func DialOptions(opts ...grpc.DialOption) client.CallOption {
|
||||||
return func(o *client.Options) {
|
return func(o *client.CallOptions) {
|
||||||
if o.Context == nil {
|
if o.Context == nil {
|
||||||
o.Context = context.Background()
|
o.Context = context.Background()
|
||||||
}
|
}
|
||||||
|
@ -34,5 +34,9 @@ func (r *response) Header() metadata.Metadata {
|
|||||||
|
|
||||||
// Read the undecoded response
|
// Read the undecoded response
|
||||||
func (r *response) Read() ([]byte, error) {
|
func (r *response) Read() ([]byte, error) {
|
||||||
return nil, nil
|
f := &codec.Frame{}
|
||||||
|
if err := r.codec.ReadBody(&wrapStream{r.stream}, f); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return f.Data, nil
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v3/client"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -112,12 +111,6 @@ func (g *grpcStream) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if sp, ok := tracer.SpanFromContext(g.context); ok && sp != nil {
|
|
||||||
if g.err != nil {
|
|
||||||
sp.SetStatus(tracer.SpanStatusError, g.err.Error())
|
|
||||||
}
|
|
||||||
sp.Finish()
|
|
||||||
}
|
|
||||||
// close the connection
|
// close the connection
|
||||||
g.closed = true
|
g.closed = true
|
||||||
g.close(g.err)
|
g.close(g.err)
|
||||||
|
Loading…
Reference in New Issue
Block a user