diff --git a/.gitea/pkgdashcli.yaml b/.gitea/pkgdashcli.yaml new file mode 100644 index 00000000..28cdda1d --- /dev/null +++ b/.gitea/pkgdashcli.yaml @@ -0,0 +1,3 @@ +branches: + - master + - v3 \ No newline at end of file diff --git a/.gitea/workflows/autoupdate.yml b/.gitea/workflows/autoupdate.yml new file mode 100644 index 00000000..f67ff748 --- /dev/null +++ b/.gitea/workflows/autoupdate.yml @@ -0,0 +1,24 @@ +on: + push: + branches: + - 'main' + - 'master' + - 'v3' + schedule: + - cron: '* * * * *' + #- cron: '@hourly' + +jobs: + autoupdate: + runs-on: ubuntu-latest + steps: + - name: setup-go + uses: https://gitea.com/actions/setup-go@v3 + with: + go-version: 1.21 + - name: checkout + uses: https://gitea.com/actions/checkout@v3 + - name: get pkgdashcli + run: GOPROXY=direct GONOSUMDB="git.unistack.org/*" GONOPROXY="git.unistack.org/*" GOBIN=/bin go install git.unistack.org/unistack-org/pkgdash/cmd/pkgdashcli@latest + - name: pkgdashcli check + run: /bin/pkgdashcli check \ No newline at end of file diff --git a/.gitea/workflows/lint.yml b/.gitea/workflows/lint.yml index 8da88945..aece5490 100644 --- a/.gitea/workflows/lint.yml +++ b/.gitea/workflows/lint.yml @@ -12,7 +12,7 @@ jobs: - name: setup-go uses: https://gitea.com/actions/setup-go@v3 with: - go-version: 1.18 + go-version: 1.21 - name: checkout uses: https://gitea.com/actions/checkout@v3 - name: deps diff --git a/.gitea/workflows/pr.yml b/.gitea/workflows/pr.yml index ba947f0c..520b5350 100644 --- a/.gitea/workflows/pr.yml +++ b/.gitea/workflows/pr.yml @@ -14,7 +14,7 @@ jobs: - name: setup-go uses: https://gitea.com/actions/setup-go@v3 with: - go-version: 1.18 + go-version: 1.21 - name: deps run: go get -v -t -d ./... - name: test diff --git a/config/default.go b/config/default.go index 9032b02b..53175018 100644 --- a/config/default.go +++ b/config/default.go @@ -8,7 +8,7 @@ import ( "time" "github.com/google/uuid" - "github.com/imdario/mergo" + "dario.cat/mergo" "go.unistack.org/micro/v4/options" mid "go.unistack.org/micro/v4/util/id" rutil "go.unistack.org/micro/v4/util/reflect" diff --git a/config/options.go b/config/options.go index 667d3262..eaa8b15e 100644 --- a/config/options.go +++ b/config/options.go @@ -63,9 +63,9 @@ func NewOptions(opts ...options.Option) Options { // LoadOptions struct type LoadOptions struct { Struct interface{} + Context context.Context Override bool Append bool - Context context.Context } // NewLoadOptions create LoadOptions struct with provided opts diff --git a/go.mod b/go.mod index 3216d4c6..56644783 100644 --- a/go.mod +++ b/go.mod @@ -3,20 +3,20 @@ module go.unistack.org/micro/v4 go 1.20 require ( + dario.cat/mergo v1.0.0 github.com/DATA-DOG/go-sqlmock v1.5.0 - github.com/google/uuid v1.3.0 - github.com/imdario/mergo v0.3.15 + github.com/google/uuid v1.3.1 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/sync v0.3.0 golang.org/x/sys v0.12.0 - google.golang.org/grpc v1.57.0 + google.golang.org/grpc v1.58.2 google.golang.org/protobuf v1.31.0 - ) require ( github.com/golang/protobuf v1.5.3 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect + golang.org/x/net v0.15.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect ) diff --git a/go.sum b/go.sum index 812c2f7d..d27f068a 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= +dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= @@ -5,27 +7,26 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM= -github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= -golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= +google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= +google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= diff --git a/logger/wrapper/wrapper.go b/logger/wrapper/wrapper.go index af8050e0..eb7cda00 100644 --- a/logger/wrapper/wrapper.go +++ b/logger/wrapper/wrapper.go @@ -7,12 +7,13 @@ import ( "go.unistack.org/micro/v4/client" "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/options" "go.unistack.org/micro/v4/server" ) var ( // DefaultClientCallObserver called by wrapper in client Call - DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string { + DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []options.Option, err error) []string { labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} if err != nil { labels = append(labels, "error", err.Error()) @@ -21,7 +22,7 @@ var ( } // DefaultClientStreamObserver called by wrapper in client Stream - DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string { + DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []options.Option, stream client.Stream, err error) []string { labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} if err != nil { labels = append(labels, "error", err.Error()) @@ -60,9 +61,9 @@ type lWrapper struct { type ( // ClientCallObserver func signature - ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string + ClientCallObserver func(context.Context, client.Request, interface{}, []options.Option, error) []string // ClientStreamObserver func signature - ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string + ClientStreamObserver func(context.Context, client.Request, []options.Option, client.Stream, error) []string // ClientCallFuncObserver func signature ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string // ServerHandlerObserver func signature @@ -167,7 +168,7 @@ func SkipEndpoints(eps ...string) Option { } } -func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { +func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...options.Option) error { err := l.Client.Call(ctx, req, rsp, opts...) endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) @@ -190,7 +191,7 @@ func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{} return err } -func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { +func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...options.Option) (client.Stream, error) { stream, err := l.Client.Stream(ctx, req, opts...) endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) diff --git a/meter/wrapper/wrapper.go b/meter/wrapper/wrapper.go index d3379ce7..26f8cd0f 100644 --- a/meter/wrapper/wrapper.go +++ b/meter/wrapper/wrapper.go @@ -1,4 +1,4 @@ -package wrapper // import "go.unistack.org/micro/v4/meter/wrapper" +package wrapper import ( "context" @@ -7,6 +7,7 @@ import ( "go.unistack.org/micro/v4/client" "go.unistack.org/micro/v4/meter" + "go.unistack.org/micro/v4/options" "go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/server" ) @@ -24,7 +25,7 @@ var ( // Options struct type Options struct { Meter meter.Meter - lopts []meter.Option + lopts []options.Option SkipEndpoints []string } @@ -35,7 +36,7 @@ type Option func(*Options) func NewOptions(opts ...Option) Options { options := Options{ Meter: meter.DefaultMeter, - lopts: make([]meter.Option, 0, 5), + lopts: make([]options.Option, 0, 5), SkipEndpoints: DefaultSkipEndpoints, } for _, o := range opts { @@ -137,7 +138,7 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, return err } -func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { +func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...options.Option) error { endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) for _, ep := range w.opts.SkipEndpoints { if ep == endpoint { @@ -167,7 +168,7 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, return err } -func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { +func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...options.Option) (client.Stream, error) { endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) for _, ep := range w.opts.SkipEndpoints { if ep == endpoint { diff --git a/store/memory_test.go b/store/memory_test.go index 9e738247..4d6c67ff 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -5,12 +5,13 @@ import ( "testing" "time" + "go.unistack.org/micro/v4/options" "go.unistack.org/micro/v4/store" ) func TestMemoryReInit(t *testing.T) { - s := store.NewStore(store.Namespace("aaa")) - if err := s.Init(store.Namespace("")); err != nil { + s := store.NewStore(options.Namespace("aaa")) + if err := s.Init(options.Namespace("")); err != nil { t.Fatal(err) } if len(s.Options().Namespace) > 0 { @@ -28,7 +29,7 @@ func TestMemoryBasic(t *testing.T) { func TestMemoryPrefix(t *testing.T) { s := store.NewStore() - if err := s.Init(store.Namespace("some-prefix")); err != nil { + if err := s.Init(options.Namespace("some-prefix")); err != nil { t.Fatal(err) } basictest(s, t) @@ -36,7 +37,7 @@ func TestMemoryPrefix(t *testing.T) { func TestMemoryNamespace(t *testing.T) { s := store.NewStore() - if err := s.Init(store.Namespace("some-namespace")); err != nil { + if err := s.Init(options.Namespace("some-namespace")); err != nil { t.Fatal(err) } basictest(s, t) @@ -44,7 +45,7 @@ func TestMemoryNamespace(t *testing.T) { func TestMemoryNamespacePrefix(t *testing.T) { s := store.NewStore() - if err := s.Init(store.Namespace("some-namespace")); err != nil { + if err := s.Init(options.Namespace("some-namespace")); err != nil { t.Fatal(err) } basictest(s, t) diff --git a/tracer/noop.go b/tracer/noop.go index 77f53191..7fd2cc0a 100644 --- a/tracer/noop.go +++ b/tracer/noop.go @@ -9,19 +9,27 @@ import ( var _ Tracer = (*noopTracer)(nil) type noopTracer struct { - opts Options + opts Options + spans []Span +} + +func (t *noopTracer) Spans() []Span { + return t.spans } func (t *noopTracer) Start(ctx context.Context, name string, opts ...options.Option) (context.Context, Span) { + options := NewSpanOptions(opts...) span := &noopSpan{ name: name, ctx: ctx, tracer: t, - opts: NewSpanOptions(opts...), + labels: options.Labels, + kind: options.Kind, } if span.ctx == nil { span.ctx = context.Background() } + t.spans = append(t.spans, span) return NewSpanContext(ctx, span), span } @@ -40,13 +48,21 @@ func (t *noopTracer) Name() string { return t.opts.Name } +type noopEvent struct { + name string + labels []interface{} +} + type noopSpan struct { ctx context.Context tracer Tracer name string - opts SpanOptions - status SpanStatus statusMsg string + events []*noopEvent + labels []interface{} + logs []interface{} + kind SpanKind + status SpanStatus } func (s *noopSpan) Finish(opts ...options.Option) { @@ -61,22 +77,24 @@ func (s *noopSpan) Tracer() Tracer { } func (s *noopSpan) AddEvent(name string, opts ...options.Option) { + options := NewEventOptions(opts...) + s.events = append(s.events, &noopEvent{name: name, labels: options.Labels}) } func (s *noopSpan) SetName(name string) { s.name = name } -func (s *noopSpan) SetLabels(labels ...interface{}) { - s.opts.Labels = labels +func (s *noopSpan) AddLogs(kv ...interface{}) { + s.logs = append(s.logs, kv...) } -func (s *noopSpan) AddLabels(labels ...interface{}) { - s.opts.Labels = append(s.opts.Labels, labels...) +func (s *noopSpan) AddLabels(kv ...interface{}) { + s.labels = append(s.labels, kv...) } func (s *noopSpan) Kind() SpanKind { - return s.opts.Kind + return s.kind } func (s *noopSpan) Status() (SpanStatus, string) { diff --git a/tracer/options.go b/tracer/options.go index 1405a5d8..dd185ce7 100644 --- a/tracer/options.go +++ b/tracer/options.go @@ -159,6 +159,15 @@ func NewSpanOptions(opts ...options.Option) SpanOptions { return options } +// NewEventOptions returns default EventOptions +func NewEventOptions(opts ...options.Option) EventOptions { + options := EventOptions{} + for _, o := range opts { + o(&options) + } + return options +} + // NewOptions returns default options func NewOptions(opts ...options.Option) Options { options := Options{ diff --git a/tracer/tracer.go b/tracer/tracer.go index 442113c9..387cdee5 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -3,6 +3,8 @@ package tracer // import "go.unistack.org/micro/v4/tracer" import ( "context" + "fmt" + "sort" "go.unistack.org/micro/v4/options" ) @@ -27,8 +29,6 @@ type Span interface { Tracer() Tracer // Finish complete and send span Finish(opts ...options.Option) - // AddEvent add event to span - AddEvent(name string, opts ...options.Option) // Context return context with span Context() context.Context // SetName set the span name @@ -37,10 +37,45 @@ type Span interface { SetStatus(status SpanStatus, msg string) // Status returns span status and msg Status() (SpanStatus, string) - // SetLabels set the span labels - SetLabels(labels ...interface{}) - // AddLabels append the span labels - AddLabels(labels ...interface{}) + // AddLabels append labels to span + AddLabels(kv ...interface{}) + // AddEvent append event to span + AddEvent(name string, opts ...options.Option) + // AddLogs append logs to span + AddLogs(kv ...interface{}) // Kind returns span kind Kind() SpanKind } + +// sort labels alphabeticaly by label name +type byKey []interface{} + +func (k byKey) Len() int { return len(k) / 2 } +func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) } +func (k byKey) Swap(i, j int) { + k[i*2], k[j*2] = k[j*2], k[i*2] + k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1] +} + +func UniqLabels(labels []interface{}) []interface{} { + if len(labels)%2 == 1 { + labels = labels[:len(labels)-1] + } + if len(labels) > 2 { + sort.Sort(byKey(labels)) + + idx := 0 + for { + if labels[idx] == labels[idx+2] { + copy(labels[idx:], labels[idx+2:]) + labels = labels[:len(labels)-2] + } else { + idx += 2 + } + if idx+2 >= len(labels) { + break + } + } + } + return labels +} diff --git a/tracer/wrapper/wrapper.go b/tracer/wrapper/wrapper.go index 29cea56b..34d20c9a 100644 --- a/tracer/wrapper/wrapper.go +++ b/tracer/wrapper/wrapper.go @@ -4,6 +4,7 @@ package wrapper // import "go.unistack.org/micro/v4/tracer/wrapper" import ( "context" "fmt" + "strings" "go.unistack.org/micro/v4/client" "go.unistack.org/micro/v4/metadata" @@ -14,11 +15,11 @@ import ( var DefaultHeadersExctract = []string{metadata.HeaderTopic, metadata.HeaderEndpoint, metadata.HeaderService, metadata.HeaderXRequestID} -func extractLabels(md metadata.Metadata) []string { - labels := make([]string, 0, 5) +func ExtractDefaultLabels(md metadata.Metadata) []interface{} { + labels := make([]interface{}, 0, len(DefaultHeadersExctract)) for _, k := range DefaultHeadersExctract { if v, ok := md.Get(k); ok { - labels = append(labels, k, v) + labels = append(labels, strings.ToLower(k), v) } } return labels @@ -29,7 +30,7 @@ var ( sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, extractLabels(md)) + labels = append(labels, ExtractDefaultLabels(md)...) } if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) @@ -38,10 +39,9 @@ var ( } DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []options.Option, stream client.Stream, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Stream %s.%s", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, extractLabels(md)) + labels = append(labels, ExtractDefaultLabels(md)...) } if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) @@ -50,10 +50,9 @@ var ( } DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Handler %s.%s", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromIncomingContext(ctx); ok { - labels = append(labels, extractLabels(md)) + labels = append(labels, ExtractDefaultLabels(md)...) } if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) @@ -62,10 +61,10 @@ var ( } DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) + sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, extractLabels(md)) + labels = append(labels, ExtractDefaultLabels(md)...) } if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) @@ -177,23 +176,22 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{ } } - sp, ok := tracer.SpanFromContext(ctx) - if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", - tracer.WithSpanKind(tracer.SpanKindClient), - tracer.WithSpanLabels( - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", "unary", - ), - ) - } + nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), + tracer.WithSpanKind(tracer.SpanKindClient), + options.Labels( + "rpc.service", req.Service(), + "rpc.method", req.Method(), + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", "unary", + ), + ) defer sp.Finish() - err := ot.Client.Call(ctx, req, rsp, opts...) + err := ot.Client.Call(nctx, req, rsp, opts...) for _, o := range ot.opts.ClientCallObservers { - o(ctx, req, rsp, opts, sp, err) + o(nctx, req, rsp, opts, sp, err) } return err @@ -207,23 +205,22 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...opti } } - sp, ok := tracer.SpanFromContext(ctx) - if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", - tracer.WithSpanKind(tracer.SpanKindClient), - tracer.WithSpanLabels( - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", "stream", - ), - ) - } + nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), + tracer.WithSpanKind(tracer.SpanKindClient), + options.Labels( + "rpc.service", req.Service(), + "rpc.method", req.Method(), + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", "stream", + ), + ) defer sp.Finish() - stream, err := ot.Client.Stream(ctx, req, opts...) + stream, err := ot.Client.Stream(nctx, req, opts...) for _, o := range ot.opts.ClientStreamObservers { - o(ctx, req, opts, stream, sp, err) + o(nctx, req, opts, stream, sp, err) } return stream, err @@ -242,24 +239,22 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i callType = "stream" } - sp, ok := tracer.SpanFromContext(ctx) - if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-server", - tracer.WithSpanKind(tracer.SpanKindServer), - tracer.WithSpanLabels( - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", callType, - ), - ) - } - + nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()), + tracer.WithSpanKind(tracer.SpanKindServer), + options.Labels( + "rpc.service", req.Service(), + "rpc.method", req.Method(), + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", callType, + ), + ) defer sp.Finish() - err := ot.serverHandler(ctx, req, rsp) + err := ot.serverHandler(nctx, req, rsp) for _, o := range ot.opts.ServerHandlerObservers { - o(ctx, req, rsp, sp, err) + o(nctx, req, rsp, sp, err) } return err @@ -297,23 +292,23 @@ func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client. } } - sp, ok := tracer.SpanFromContext(ctx) - if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", - tracer.WithSpanKind(tracer.SpanKindClient), - tracer.WithSpanLabels( - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", "unary", - ), - ) - } + nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), + tracer.WithSpanKind(tracer.SpanKindClient), + options.Labels( + "rpc.service", req.Service(), + "rpc.method", req.Method(), + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", "unary", + ), + ) + defer sp.Finish() - err := ot.clientCallFunc(ctx, addr, req, rsp, opts) + err := ot.clientCallFunc(nctx, addr, req, rsp, opts) for _, o := range ot.opts.ClientCallFuncObservers { - o(ctx, addr, req, rsp, opts, sp, err) + o(nctx, addr, req, rsp, opts, sp, err) } return err diff --git a/util/grpc/tracer.go b/util/grpc/tracer.go new file mode 100644 index 00000000..3a9dd524 --- /dev/null +++ b/util/grpc/tracer.go @@ -0,0 +1,256 @@ +package grpc_util + +import ( + "context" + "net" + "strconv" + "strings" + "sync/atomic" + "time" + + "go.unistack.org/micro/v4/options" + "go.unistack.org/micro/v4/tracer" + grpc_codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" +) + +type gRPCContextKey struct{} + +type gRPCContext struct { + messagesReceived int64 + messagesSent int64 +} + +type Options struct { + Tracer tracer.Tracer +} + +// NewServerHandler creates a stats.Handler for gRPC server. +func NewServerHandler(tr tracer.Tracer) stats.Handler { + h := &serverHandler{ + tr: tr, + } + return h +} + +type serverHandler struct { + tr tracer.Tracer +} + +// TagRPC can attach some information to the given context. +func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + name, attrs := parseFullMethod(info.FullMethodName) + attrs = append(attrs, "rpc.system", "grpc") + ctx, _ = h.tr.Start( + ctx, + name, + tracer.WithSpanKind(tracer.SpanKindServer), + options.Labels(attrs...), + ) + + gctx := gRPCContext{} + return context.WithValue(ctx, gRPCContextKey{}, &gctx) +} + +// HandleRPC processes the RPC stats. +func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + handleRPC(ctx, rs) +} + +// TagConn can attach some information to the given context. +func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + if span, ok := tracer.SpanFromContext(ctx); ok { + attrs := peerAttr(peerFromCtx(ctx)) + span.AddLabels(attrs...) + } + return ctx +} + +// HandleConn processes the Conn stats. +func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { +} + +type clientHandler struct { + tr tracer.Tracer +} + +// NewClientHandler creates a stats.Handler for gRPC client. +func NewClientHandler(tr tracer.Tracer) stats.Handler { + h := &clientHandler{ + tr: tr, + } + return h +} + +// TagRPC can attach some information to the given context. +func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + name, attrs := parseFullMethod(info.FullMethodName) + attrs = append(attrs, "rpc.system", "grpc", "rpc.flavor", "grpc", "rpc.call", info.FullMethodName) + ctx, _ = h.tr.Start( + ctx, + name, + tracer.WithSpanKind(tracer.SpanKindClient), + options.Labels(attrs...), + ) + + gctx := gRPCContext{} + + return context.WithValue(ctx, gRPCContextKey{}, &gctx) +} + +// HandleRPC processes the RPC stats. +func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + handleRPC(ctx, rs) +} + +// TagConn can attach some information to the given context. +func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context { + // TODO + if span, ok := tracer.SpanFromContext(ctx); ok { + attrs := peerAttr(cti.RemoteAddr.String()) + span.AddLabels(attrs...) + } + return ctx +} + +// HandleConn processes the Conn stats. +func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { + // no-op +} + +func handleRPC(ctx context.Context, rs stats.RPCStats) { + span, ok := tracer.SpanFromContext(ctx) + gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) + var messageID int64 + if rs.IsClient() { + span.AddLabels("span.kind", "client") + } else { + span.AddLabels("span.kind", "server") + } + + switch rs := rs.(type) { + case *stats.Begin: + if rs.IsClientStream || rs.IsServerStream { + span.AddLabels("rpc.call_type", "stream") + } else { + span.AddLabels("rpc.call_type", "unary") + } + span.AddEvent("message", + options.Labels( + "message.begin_time", rs.BeginTime.Format(time.RFC3339), + ), + ) + case *stats.InPayload: + if gctx != nil { + messageID = atomic.AddInt64(&gctx.messagesReceived, 1) + } + if ok { + span.AddEvent("message", + options.Labels( + "message.recv_time", rs.RecvTime.Format(time.RFC3339), + "message.type", "RECEIVED", + "message.id", messageID, + "message.compressed_size", rs.CompressedLength, + "message.uncompressed_size", rs.Length, + ), + ) + } + case *stats.OutPayload: + if gctx != nil { + messageID = atomic.AddInt64(&gctx.messagesSent, 1) + } + if ok { + span.AddEvent("message", + options.Labels( + "message.sent_time", rs.SentTime.Format(time.RFC3339), + "message.type", "SENT", + "message.id", messageID, + "message.compressed_size", rs.CompressedLength, + "message.uncompressed_size", rs.Length, + ), + ) + } + case *stats.End: + if ok { + span.AddEvent("message", + options.Labels( + "message.begin_time", rs.BeginTime.Format(time.RFC3339), + "message.end_time", rs.EndTime.Format(time.RFC3339), + ), + ) + if rs.Error != nil { + s, _ := status.FromError(rs.Error) + span.SetStatus(tracer.SpanStatusError, s.Message()) + span.AddLabels("rpc.grpc.status_code", s.Code()) + } else { + span.AddLabels("rpc.grpc.status_code", grpc_codes.OK) + } + span.Finish() + } + default: + return + } +} + +func parseFullMethod(fullMethod string) (string, []interface{}) { + if !strings.HasPrefix(fullMethod, "/") { + // Invalid format, does not follow `/package.service/method`. + return fullMethod, nil + } + name := fullMethod[1:] + pos := strings.LastIndex(name, "/") + if pos < 0 { + // Invalid format, does not follow `/package.service/method`. + return name, nil + } + service, method := name[:pos], name[pos+1:] + + var attrs []interface{} + if service != "" { + attrs = append(attrs, "rpc.service", service) + } + if method != "" { + attrs = append(attrs, "rpc.method", method) + } + return name, attrs +} + +func peerAttr(addr string) []interface{} { + host, p, err := net.SplitHostPort(addr) + if err != nil { + return nil + } + + if host == "" { + host = "127.0.0.1" + } + port, err := strconv.Atoi(p) + if err != nil { + return nil + } + + var attr []interface{} + if ip := net.ParseIP(host); ip != nil { + attr = []interface{}{ + "net.sock.peer.addr", host, + "net.sock.peer.port", port, + } + } else { + attr = []interface{}{ + "net.peer.name", host, + "net.peer.port", port, + } + } + + return attr +} + +func peerFromCtx(ctx context.Context) string { + p, ok := peer.FromContext(ctx) + if !ok { + return "" + } + return p.Addr.String() +} diff --git a/util/http/clienttracer.go b/util/http/clienttracer.go new file mode 100644 index 00000000..4684453a --- /dev/null +++ b/util/http/clienttracer.go @@ -0,0 +1,254 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "context" + "crypto/tls" + "net/http/httptrace" + "net/textproto" + "strings" + "sync" + + "go.unistack.org/micro/v4/options" + "go.unistack.org/micro/v4/tracer" +) + +const ( + httpStatus = "http.status" + httpHeaderMIME = "http.mime" + httpRemoteAddr = "http.remote" + httpLocalAddr = "http.local" + httpHost = "http.host" +) + +var hookMap = map[string]string{ + "http.dns": "http.getconn", + "http.connect": "http.getconn", + "http.tls": "http.getconn", +} + +func parentHook(hook string) string { + if strings.HasPrefix(hook, "http.connect") { + return hookMap["http.connect"] + } + return hookMap[hook] +} + +type clientTracer struct { + context.Context + tr tracer.Tracer + activeHooks map[string]context.Context + root tracer.Span + mtx sync.Mutex +} + +func NewClientTrace(ctx context.Context, tr tracer.Tracer) *httptrace.ClientTrace { + ct := &clientTracer{ + Context: ctx, + activeHooks: make(map[string]context.Context), + tr: tr, + } + + return &httptrace.ClientTrace{ + GetConn: ct.getConn, + GotConn: ct.gotConn, + PutIdleConn: ct.putIdleConn, + GotFirstResponseByte: ct.gotFirstResponseByte, + Got100Continue: ct.got100Continue, + Got1xxResponse: ct.got1xxResponse, + DNSStart: ct.dnsStart, + DNSDone: ct.dnsDone, + ConnectStart: ct.connectStart, + ConnectDone: ct.connectDone, + TLSHandshakeStart: ct.tlsHandshakeStart, + TLSHandshakeDone: ct.tlsHandshakeDone, + WroteHeaderField: ct.wroteHeaderField, + WroteHeaders: ct.wroteHeaders, + Wait100Continue: ct.wait100Continue, + WroteRequest: ct.wroteRequest, + } +} + +func (ct *clientTracer) start(hook, spanName string, attrs ...interface{}) { + ct.mtx.Lock() + defer ct.mtx.Unlock() + + if hookCtx, found := ct.activeHooks[hook]; !found { + var sp tracer.Span + ct.activeHooks[hook], sp = ct.tr.Start(ct.getParentContext(hook), spanName, options.Labels(attrs...), tracer.WithSpanKind(tracer.SpanKindClient)) + if ct.root == nil { + ct.root = sp + } + } else { + // end was called before start finished, add the start attributes and end the span here + if span, ok := tracer.SpanFromContext(hookCtx); ok { + span.AddLabels(attrs...) + span.Finish() + } + + delete(ct.activeHooks, hook) + } +} + +func (ct *clientTracer) end(hook string, err error, attrs ...interface{}) { + ct.mtx.Lock() + defer ct.mtx.Unlock() + if ctx, ok := ct.activeHooks[hook]; ok { // nolint:nestif + if span, ok := tracer.SpanFromContext(ctx); ok { + if err != nil { + span.SetStatus(tracer.SpanStatusError, err.Error()) + } + span.AddLabels(attrs...) + span.Finish() + } + delete(ct.activeHooks, hook) + } else { + // start is not finished before end is called. + // Start a span here with the ending attributes that will be finished when start finishes. + // Yes, it's backwards. v0v + ctx, span := ct.tr.Start(ct.getParentContext(hook), hook, options.Labels(attrs...), tracer.WithSpanKind(tracer.SpanKindClient)) + if err != nil { + span.SetStatus(tracer.SpanStatusError, err.Error()) + } + ct.activeHooks[hook] = ctx + } +} + +func (ct *clientTracer) getParentContext(hook string) context.Context { + ctx, ok := ct.activeHooks[parentHook(hook)] + if !ok { + return ct.Context + } + return ctx +} + +func (ct *clientTracer) span(hook string) (tracer.Span, bool) { + ct.mtx.Lock() + defer ct.mtx.Unlock() + if ctx, ok := ct.activeHooks[hook]; ok { + return tracer.SpanFromContext(ctx) + } + return nil, false +} + +func (ct *clientTracer) getConn(host string) { + ct.start("http.getconn", "http.getconn", httpHost, host) +} + +func (ct *clientTracer) gotConn(info httptrace.GotConnInfo) { + ct.end("http.getconn", + nil, + httpRemoteAddr, info.Conn.RemoteAddr().String(), + httpLocalAddr, info.Conn.LocalAddr().String(), + ) +} + +func (ct *clientTracer) putIdleConn(err error) { + ct.end("http.receive", err) +} + +func (ct *clientTracer) gotFirstResponseByte() { + ct.start("http.receive", "http.receive") +} + +func (ct *clientTracer) dnsStart(info httptrace.DNSStartInfo) { + ct.start("http.dns", "http.dns", httpHost, info.Host) +} + +func (ct *clientTracer) dnsDone(info httptrace.DNSDoneInfo) { + ct.end("http.dns", info.Err) +} + +func (ct *clientTracer) connectStart(network, addr string) { + _ = network + ct.start("http.connect."+addr, "http.connect", httpRemoteAddr, addr) +} + +func (ct *clientTracer) connectDone(network, addr string, err error) { + _ = network + ct.end("http.connect."+addr, err) +} + +func (ct *clientTracer) tlsHandshakeStart() { + ct.start("http.tls", "http.tls") +} + +func (ct *clientTracer) tlsHandshakeDone(_ tls.ConnectionState, err error) { + ct.end("http.tls", err) +} + +func (ct *clientTracer) wroteHeaderField(k string, v []string) { + if sp, ok := ct.span("http.headers"); !ok || sp == nil { + ct.start("http.headers", "http.headers") + } + ct.root.AddLabels("http."+strings.ToLower(k), sliceToString(v)) +} + +func (ct *clientTracer) wroteHeaders() { + ct.start("http.send", "http.send") +} + +func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) { + if info.Err != nil { + ct.root.SetStatus(tracer.SpanStatusError, info.Err.Error()) + } + ct.end("http.send", info.Err) +} + +func (ct *clientTracer) got100Continue() { + if sp, ok := ct.span("http.receive"); ok && sp != nil { + sp.AddEvent("GOT 100 - Continue") + } +} + +func (ct *clientTracer) wait100Continue() { + if sp, ok := ct.span("http.receive"); ok && sp != nil { + sp.AddEvent("GOT 100 - Wait") + } +} + +func (ct *clientTracer) got1xxResponse(code int, header textproto.MIMEHeader) error { + if sp, ok := ct.span("http.receive"); ok && sp != nil { + sp.AddEvent("GOT 1xx", + options.Labels( + httpStatus, code, + httpHeaderMIME, sm2s(header), + ), + ) + } + return nil +} + +func sliceToString(value []string) string { + if len(value) == 0 { + return "undefined" + } + return strings.Join(value, ",") +} + +func sm2s(value map[string][]string) string { + var buf strings.Builder + for k, v := range value { + if buf.Len() != 0 { + buf.WriteString(",") + } + buf.WriteString(k) + buf.WriteString("=") + buf.WriteString(sliceToString(v)) + } + return buf.String() +} diff --git a/util/reflect/reflect.go b/util/reflect/reflect.go index 031dcc72..7d34b068 100644 --- a/util/reflect/reflect.go +++ b/util/reflect/reflect.go @@ -508,3 +508,74 @@ func FieldName(name string) string { return string(newstr) } + +func Equal(src interface{}, dst interface{}, excptFields ...string) bool { + srcVal := reflect.ValueOf(src) + dstVal := reflect.ValueOf(dst) + + switch srcVal.Kind() { + case reflect.Array, reflect.Slice: + for i := 0; i < srcVal.Len(); i++ { + e := srcVal.Index(i).Interface() + a := dstVal.Index(i).Interface() + if !Equal(e, a, excptFields...) { + return false + } + } + return true + case reflect.Map: + for i := 0; i < len(srcVal.MapKeys()); i++ { + key := srcVal.MapKeys()[i] + keyStr := fmt.Sprintf("%v", key.Interface()) + if stringContains(keyStr, excptFields) { + continue + } + s := srcVal.MapIndex(key) + d := dstVal.MapIndex(key) + if !Equal(s.Interface(), d.Interface(), excptFields...) { + return false + } + } + return true + case reflect.Struct, reflect.Interface: + for i := 0; i < srcVal.NumField(); i++ { + typeField := srcVal.Type().Field(i) + if stringContains(typeField.Name, excptFields) { + continue + } + s := srcVal.Field(i) + d := dstVal.FieldByName(typeField.Name) + if s.CanInterface() && d.CanInterface() { + if !Equal(s.Interface(), d.Interface(), excptFields...) { + return false + } + } else { + return false + } + } + return true + case reflect.Ptr: + if srcVal.IsNil() { + return dstVal.IsNil() + } + s := srcVal.Elem() + d := reflect.Indirect(dstVal) + if s.CanInterface() && d.CanInterface() { + return Equal(s.Interface(), d.Interface(), excptFields...) + } + return false + case reflect.String, reflect.Int, reflect.Int64, reflect.Float32, reflect.Float64, reflect.Bool: + return src == dst + default: + return srcVal.Interface() == dstVal.Interface() + } +} + +func stringContains(a string, list []string) bool { + for _, b := range list { + if b == a { + return true + } + } + return false +} diff --git a/util/reflect/reflect_test.go b/util/reflect/reflect_test.go index b475bc08..a2cef0de 100644 --- a/util/reflect/reflect_test.go +++ b/util/reflect/reflect_test.go @@ -133,3 +133,16 @@ func TestMergeNested(t *testing.T) { t.Fatalf("merge error: %#+v", dst.Nested) } } + +func TestEqual(t *testing.T) { + type tstr struct { + Key1 string + Key2 string + } + + src := &tstr{Key1: "val1", Key2: "micro:generate"} + dst := &tstr{Key1: "val1", Key2: "val2"} + if !Equal(src, dst, "Key2") { + t.Fatal("invalid Equal test") + } +} diff --git a/util/test/test.go b/util/test/test.go index 56aa63a5..44120d09 100644 --- a/util/test/test.go +++ b/util/test/test.go @@ -20,6 +20,7 @@ import ( "go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/errors" "go.unistack.org/micro/v4/metadata" + "go.unistack.org/micro/v4/options" "golang.org/x/sync/errgroup" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -226,7 +227,7 @@ func NewRequestFromFile(c client.Client, reqfile string) (client.Request, error) return nil, err } - req := c.NewRequest("test", endpoint, &codec.Frame{Data: reqbuf}, client.RequestContentType(ct)) + req := c.NewRequest("test", endpoint, &codec.Frame{Data: reqbuf}, options.ContentType(ct)) return req, nil } @@ -373,7 +374,7 @@ func Run(ctx context.Context, c client.Client, m sqlmock.Sqlmock, dir string, ex data := &codec.Frame{} md := metadata.New(1) md.Set("X-Request-Id", xrid) - cerr := c.Call(metadata.NewOutgoingContext(gctx, md), treq, data, client.WithContentType(treq.ContentType())) + cerr := c.Call(metadata.NewOutgoingContext(gctx, md), treq, data, options.ContentType(treq.ContentType())) var rspfile string