Compare commits

..

56 Commits

Author SHA1 Message Date
6794ea9871 Merge pull request 'client/noop: fix MessageMetadata option' (#274) from client-noop-metadata into v3
Reviewed-on: #274
2023-10-26 03:07:12 +03:00
089e7b6812 client/noop: fix MessageMetadata option
All checks were successful
lint / lint (pull_request) Successful in 1m18s
pr / test (pull_request) Successful in 1m1s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-10-26 03:06:39 +03:00
1c703f0f0c Merge pull request 'errors: add IsRetrayable func' (#273) from errors into v3
Reviewed-on: #273
2023-10-25 10:24:58 +03:00
d167c8c67c cleanup
All checks were successful
lint / lint (pull_request) Successful in 1m7s
pr / test (pull_request) Successful in 1m2s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-10-25 02:36:52 +03:00
df4f96a2d8 errors: add IsRetrayable func
All checks were successful
lint / lint (pull_request) Successful in 1m18s
pr / test (pull_request) Successful in 1m3s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-10-23 02:35:10 +03:00
fac3b20bd4 Merge pull request 'util/reflect: add Equal func with ability to skip some fields' (#244) from util-reflect into v3
Reviewed-on: #244
2023-09-12 11:45:26 +03:00
7c6bd98498 util/reflect: add Equal func with ability to skip some fields
All checks were successful
pr / test (pull_request) Successful in 1m4s
lint / lint (pull_request) Successful in 1m10s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-09-12 10:31:45 +03:00
23e1174f25 Merge pull request 'tracer: improve' (#241) from tracing into v3
Reviewed-on: #241
2023-09-08 13:40:51 +03:00
52bed214cf tracer: improve
Some checks failed
lint / lint (pull_request) Failing after 1m31s
pr / test (pull_request) Failing after 2m44s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-09-08 13:40:01 +03:00
64c4f5f47e Merge pull request 'tracer: tweaks for span tags and naming' (#239) from tracing into v3
Reviewed-on: #239
2023-09-01 14:58:15 +03:00
036c612137 tracer: tweaks for span tags and naming
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-09-01 14:58:15 +03:00
ca80e3ecf2 Merge pull request 'tracer: improve tracing info' (#238) from tracing into v3
Reviewed-on: #238
2023-09-01 08:41:46 +03:00
18e7bb41ca tracer: improve tracing info
Some checks failed
lint / lint (pull_request) Failing after 1m29s
pr / test (pull_request) Failing after 2m37s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-09-01 08:41:23 +03:00
8e72fb1c35 Merge pull request 'add util/test' (#235) from util-test into v3
Reviewed-on: #235
2023-08-07 18:35:31 +03:00
17f21a03f4 add util/test
Some checks failed
lint / lint (pull_request) Failing after 1m28s
pr / test (pull_request) Failing after 2m35s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-08-07 18:33:23 +03:00
a076d43a26 add util/test
Some checks failed
lint / lint (pull_request) Failing after 1m31s
pr / test (pull_request) Failing after 2m33s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-08-07 18:32:29 +03:00
de6efaee0b Merge pull request 'config/default: add micro:generate uuid/id' (#232) from config-default-gen into v3
Reviewed-on: #232
2023-07-13 20:27:13 +03:00
9e0e657003 config/default: add micro:generate uuid/id
Some checks failed
lint / lint (pull_request) Failing after 1m28s
pr / test (pull_request) Failing after 2m35s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-07-13 20:26:47 +03:00
be5f9ab77f Merge pull request 'tracer: add Flush method' (#225) from traceimp into v3
Reviewed-on: #225
2023-07-04 00:26:33 +03:00
144dca0cae tracer: add Flush method
Some checks failed
pr / test (pull_request) Failing after 2m42s
lint / lint (pull_request) Failing after 1m29s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-07-04 00:25:41 +03:00
75173560e3 Merge pull request 'util/time: ParseDuration fix' (#222) from timefix into v3
Reviewed-on: #222
2023-05-29 14:04:41 +03:00
9b3bccd1f1 util/time: ParseDuration fix
All checks were successful
lint / lint (pull_request) Successful in 1m0s
pr / test (pull_request) Successful in 58s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-29 14:02:06 +03:00
ce125b77c1 Merge pull request 'util/time: fix duration parsing' (#219) from timefeature into v3
Reviewed-on: #219
2023-05-27 23:55:51 +03:00
2ee8d4ed46 util/time: fix duration parsing
Some checks failed
lint / lint (pull_request) Successful in 59s
pr / test (pull_request) Failing after 1m0s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-27 23:55:08 +03:00
f58781d076 Merge pull request 'server/noop: fix graceful unsubscribe' (#218) from unsubfix into v3
Reviewed-on: #218
2023-05-25 23:19:26 +03:00
e1af4aa3a4 server/noop: fix graceful unsubscribe
All checks were successful
pr / test (pull_request) Successful in 1m2s
lint / lint (pull_request) Successful in 59s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-25 23:18:47 +03:00
1d5e795443 Merge pull request 'move RawMessage to codec package' (#208) from rawmsg into v3
Reviewed-on: #208
2023-04-02 14:15:02 +03:00
a3a434d923 move RawMessage to codec package
All checks were successful
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-02 14:10:57 +03:00
bcc06054f1 Merge pull request 'updates' (#207) from updates into v3
Reviewed-on: #207
2023-03-24 00:29:33 +03:00
270d26f1ae test
All checks were successful
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-24 00:25:47 +03:00
646212cc08 retest
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 17:09:40 +03:00
00c2c749db retest
Some checks failed
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 16:46:41 +03:00
2dbada0e94 retest
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 16:43:33 +03:00
7b8f4410fb retest
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 16:36:38 +03:00
45ebef5544 retest
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 16:31:42 +03:00
cf4cac0733 retest
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 16:29:58 +03:00
50d60b5825 retest
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 16:24:47 +03:00
46ef491764 retest
Some checks failed
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 16:17:44 +03:00
a51b8b8102 retest
Some checks failed
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 16:11:52 +03:00
15aac48f1e retest
Some checks failed
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 16:08:51 +03:00
078069b2d7 retest
Some checks failed
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 16:01:27 +03:00
258812304a retest
Some checks failed
autoapprove
automerge
test
lint
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 15:12:32 +03:00
da5d50db5b retest
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 14:48:27 +03:00
384e4d113d retest
All checks were successful
test
lint
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 14:43:23 +03:00
dfd1da7f0d retest
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 14:26:03 +03:00
8e5015e580 retest
Some checks failed
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 13:41:59 +03:00
bd0c309b71 retest
Some checks failed
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 13:38:39 +03:00
b4f0c3e29a retest
Some checks failed
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 13:29:21 +03:00
8fddaa0455 retest
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 13:26:42 +03:00
2710c269a8 actions
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 13:21:59 +03:00
70ea93e466 actions
Some checks failed
automerge
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 13:12:10 +03:00
a87d0ab1c1 update deps
Some checks failed
autoapprove
automerge
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-19 00:43:15 +03:00
2e5e102719 Merge pull request 'config: another fix for Default funcs' (#193) from defaults-fix into v3
Reviewed-on: #193
2023-03-16 07:14:41 +03:00
36e492314d config: another fix for Default funcs
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-16 07:14:21 +03:00
0c78873277 Merge pull request 'config: fix Default funcs' (#192) from config-fix into v3
Reviewed-on: #192
2023-03-15 22:53:21 +03:00
7f57dc09d3 config: fix Default funcs
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-03-15 22:51:40 +03:00
54 changed files with 2018 additions and 221 deletions

View File

@@ -0,0 +1,18 @@
---
name: Bug report
about: For reporting bugs in micro
title: "[BUG]"
labels: ''
assignees: ''
---
**Describe the bug**
1. What are you trying to do?
2. What did you expect to happen?
3. What happens instead?
**How to reproduce the bug:**
If possible, please include a minimal code snippet here.

View File

@@ -0,0 +1,17 @@
---
name: Feature request / Enhancement
about: If you have a need not served by micro
title: "[FEATURE]"
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Additional context**
Add any other context or screenshots about the feature request here.

View File

@@ -0,0 +1,8 @@
---
name: Question
about: Ask a question about micro
title: ''
labels: ''
assignees: ''
---

.gitea/autoapprove.yml (new file, 28 lines)
View File

@@ -0,0 +1,28 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
workflow_run:
workflows: ["prbuild"]
types:
- completed
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
"chmod +x ./tea",
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
"./tea pr --repo ${{ github.event.repository.name }}"
]
if: github.actor == 'vtolstov'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

.gitea/workflows/lint.yml (new file, 24 lines)
View File

@@ -0,0 +1,24 @@
name: lint
on:
pull_request:
branches:
- master
- v3
jobs:
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: setup-go
uses: https://gitea.com/actions/setup-go@v3
with:
go-version: 1.18
- name: checkout
uses: https://gitea.com/actions/checkout@v3
- name: deps
run: go get -v -d ./...
- name: lint
uses: https://github.com/golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
version: v1.52

.gitea/workflows/pr.yml (new file, 23 lines)
View File

@@ -0,0 +1,23 @@
name: pr
on:
pull_request:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: checkout
uses: https://gitea.com/actions/checkout@v3
- name: setup-go
uses: https://gitea.com/actions/setup-go@v3
with:
go-version: 1.18
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...

View File

@@ -0,0 +1,9 @@
## Pull Request template
Please, go through these steps before clicking submit on this PR.
1. Give a descriptive title to your PR.
2. Provide a description of your changes.
3. Make sure you have some relevant tests.
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).
**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**

View File

@@ -37,11 +37,4 @@ jobs:
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true
version: v1.30

View File

@@ -5,6 +5,7 @@ import (
"context"
"errors"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
)
@@ -85,33 +86,12 @@ type Event interface {
SetError(err error)
}
// RawMessage is a raw encoded JSON value.
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
type RawMessage []byte
// MarshalJSON returns m as the JSON encoding of m.
func (m *RawMessage) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
}
return *m, nil
}
// UnmarshalJSON sets *m to a copy of data.
func (m *RawMessage) UnmarshalJSON(data []byte) error {
if m == nil {
return errors.New("RawMessage UnmarshalJSON on nil pointer")
}
*m = append((*m)[0:0], data...)
return nil
}
// Message is used to transfer data
type Message struct {
// Header contains message metadata
Header metadata.Metadata
// Body contains message body
Body RawMessage
Body codec.RawMessage
}
// NewMessage create broker message with topic filled

View File

@@ -490,6 +490,13 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
if !ok {
md = metadata.New(0)
}
iter := p.Metadata().Iterator()
var k, v string
for iter.Next(&k, &v) {
md.Set(k, v)
}
md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()

View File

@@ -84,3 +84,24 @@ func MarshalAppend(buf []byte, c Codec, v interface{}, opts ...Option) ([]byte,
return append(buf, mbuf...), nil
}
// RawMessage is a raw encoded JSON value.
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
type RawMessage []byte
// MarshalJSON returns m as the JSON encoding of m.
func (m *RawMessage) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
}
return *m, nil
}
// UnmarshalJSON sets *m to a copy of data.
func (m *RawMessage) UnmarshalJSON(data []byte) error {
if m == nil {
return errors.New("RawMessage UnmarshalJSON on nil pointer")
}
*m = append((*m)[0:0], data...)
return nil
}
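A minimal usage sketch for the relocated codec.RawMessage type: because it implements json.Marshaler and json.Unmarshaler, decoding of a field can be deferred until a handler knows the concrete type. The Envelope struct and sample payload below are illustrative assumptions, not part of the change.

package main

import (
	"encoding/json"
	"fmt"

	"go.unistack.org/micro/v3/codec"
)

// Envelope keeps Body encoded until a handler decides how to decode it.
type Envelope struct {
	Topic string           `json:"topic"`
	Body  codec.RawMessage `json:"body"`
}

func main() {
	raw := []byte(`{"topic":"greeter","body":{"name":"alice"}}`)

	var env Envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		panic(err)
	}

	// Body still holds the original JSON bytes of the "body" field.
	fmt.Println(string(env.Body)) // {"name":"alice"}
}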

View File

@@ -127,6 +127,9 @@ var (
// DefaultBeforeLoad default func that runs before config Load
DefaultBeforeLoad = func(ctx context.Context, c Config) error {
for _, fn := range c.Options().BeforeLoad {
if fn == nil {
return nil
}
if err := fn(ctx, c); err != nil {
c.Options().Logger.Errorf(ctx, "%s BeforeLoad err: %v", c.String(), err)
if !c.Options().AllowFail {
@@ -139,6 +142,9 @@ var (
// DefaultAfterLoad default func that runs after config Load
DefaultAfterLoad = func(ctx context.Context, c Config) error {
for _, fn := range c.Options().AfterLoad {
if fn == nil {
return nil
}
if err := fn(ctx, c); err != nil {
c.Options().Logger.Errorf(ctx, "%s AfterLoad err: %v", c.String(), err)
if !c.Options().AllowFail {
@@ -151,6 +157,9 @@ var (
// DefaultBeforeSave default func that runs befora config Save
DefaultBeforeSave = func(ctx context.Context, c Config) error {
for _, fn := range c.Options().BeforeSave {
if fn == nil {
return nil
}
if err := fn(ctx, c); err != nil {
c.Options().Logger.Errorf(ctx, "%s BeforeSave err: %v", c.String(), err)
if !c.Options().AllowFail {
@@ -163,6 +172,9 @@ var (
// DefaultAfterSave default func that runs after config Save
DefaultAfterSave = func(ctx context.Context, c Config) error {
for _, fn := range c.Options().AfterSave {
if fn == nil {
return nil
}
if err := fn(ctx, c); err != nil {
c.Options().Logger.Errorf(ctx, "%s AfterSave err: %v", c.String(), err)
if !c.Options().AllowFail {
@@ -175,6 +187,9 @@ var (
// DefaultBeforeInit default func that runs befora config Init
DefaultBeforeInit = func(ctx context.Context, c Config) error {
for _, fn := range c.Options().BeforeInit {
if fn == nil {
return nil
}
if err := fn(ctx, c); err != nil {
c.Options().Logger.Errorf(ctx, "%s BeforeInit err: %v", c.String(), err)
if !c.Options().AllowFail {
@@ -187,6 +202,9 @@ var (
// DefaultAfterInit default func that runs after config Init
DefaultAfterInit = func(ctx context.Context, c Config) error {
for _, fn := range c.Options().AfterSave {
if fn == nil {
return nil
}
if err := fn(ctx, c); err != nil {
c.Options().Logger.Errorf(ctx, "%s AfterInit err: %v", c.String(), err)
if !c.Options().AllowFail {

View File

@@ -7,7 +7,9 @@ import (
"strings"
"time"
"github.com/google/uuid"
"github.com/imdario/mergo"
mid "go.unistack.org/micro/v3/util/id"
rutil "go.unistack.org/micro/v3/util/reflect"
mtime "go.unistack.org/micro/v3/util/time"
)
@@ -124,6 +126,20 @@ func fillValue(value reflect.Value, val string) error {
}
value.Set(reflect.ValueOf(v))
case reflect.String:
switch val {
case "micro:generate uuid":
uid, err := uuid.NewRandom()
if err != nil {
return err
}
val = uid.String()
case "micro:generate id":
uid, err := mid.New()
if err != nil {
return err
}
val = uid
}
value.Set(reflect.ValueOf(val))
case reflect.Float32:
v, err := strconv.ParseFloat(val, 32)

View File

@@ -7,6 +7,7 @@ import (
"time"
"go.unistack.org/micro/v3/config"
mid "go.unistack.org/micro/v3/util/id"
mtime "go.unistack.org/micro/v3/util/time"
)
@@ -14,9 +15,12 @@ type cfg struct {
StringValue string `default:"string_value"`
IgnoreValue string `json:"-"`
StructValue *cfgStructValue
IntValue int `default:"99"`
DurationValue time.Duration `default:"10s"`
MDurationValue mtime.Duration `default:"10s"`
IntValue int `default:"99"`
DurationValue time.Duration `default:"10s"`
MDurationValue mtime.Duration `default:"10s"`
MapValue map[string]bool `default:"key1=true,key2=false"`
UUIDValue string `default:"micro:generate uuid"`
IDValue string `default:"micro:generate id"`
}
type cfgStructValue struct {
@@ -67,6 +71,21 @@ func TestDefault(t *testing.T) {
if conf.StringValue != "after_load" {
t.Fatal("AfterLoad option not working")
}
if len(conf.MapValue) != 2 {
t.Fatalf("map value invalid: %#+v\n", conf.MapValue)
}
if conf.UUIDValue == "" {
t.Fatalf("uuid value empty")
} else if len(conf.UUIDValue) != 36 {
t.Fatalf("uuid value invalid: %s", conf.UUIDValue)
}
if conf.IDValue == "" {
t.Fatalf("id value empty")
} else if len(conf.IDValue) != mid.DefaultSize {
t.Fatalf("id value invalid: %s", conf.IDValue)
}
_ = conf
// t.Logf("%#+v\n", conf)
}

View File

@@ -4,11 +4,17 @@ package errors // import "go.unistack.org/micro/v3/errors"
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
@@ -340,3 +346,135 @@ func addslashes(str string) string {
}
return buf.String()
}
type retryableError struct {
err error
}
// Retryable returns error that can be retried later
func Retryable(err error) error {
return &retryableError{err: err}
}
type IsRetryableFunc func(error) bool
var (
RetrayableOracleErrors = []IsRetryableFunc{
func(err error) bool {
errmsg := err.Error()
switch {
case strings.Contains(errmsg, `ORA-`):
return true
case strings.Contains(errmsg, `can not assign`):
return true
case strings.Contains(errmsg, `can't assign`):
return true
}
return false
},
}
RetrayablePostgresErrors = []IsRetryableFunc{
func(err error) bool {
errmsg := err.Error()
switch {
case strings.Contains(errmsg, `number of field descriptions must equal number of`):
return true
case strings.Contains(errmsg, `not a pointer`):
return true
case strings.Contains(errmsg, `values, but dst struct has only`):
return true
case strings.Contains(errmsg, `struct doesn't have corresponding row field`):
return true
case strings.Contains(errmsg, `cannot find field`):
return true
case strings.Contains(errmsg, `cannot scan`) || strings.Contains(errmsg, `cannot convert`):
return true
case strings.Contains(errmsg, `failed to connect to`):
return true
}
return false
},
}
RetryableMicroErrors = []IsRetryableFunc{
func(err error) bool {
switch verr := err.(type) {
case *Error:
switch verr.Code {
case 401, 403, 408, 500, 501, 502, 503, 504:
return true
default:
return false
}
case *retryableError:
return true
}
return false
},
}
RetryableGoErrors = []IsRetryableFunc{
func(err error) bool {
switch verr := err.(type) {
case interface{ SafeToRetry() bool }:
return verr.SafeToRetry()
case interface{ Timeout() bool }:
return verr.Timeout()
}
switch {
case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
return true
case errors.Is(err, context.DeadlineExceeded):
return true
case errors.Is(err, io.ErrClosedPipe), errors.Is(err, io.ErrShortBuffer), errors.Is(err, io.ErrShortWrite):
return true
}
return false
},
}
RetryableGrpcErrors = []IsRetryableFunc{
func(err error) bool {
st, ok := status.FromError(err)
if !ok {
return false
}
switch st.Code() {
case codes.Unavailable, codes.ResourceExhausted:
return true
case codes.DeadlineExceeded:
return true
case codes.Internal:
switch {
case strings.Contains(st.Message(), `transport: received the unexpected content-type "text/html; charset=UTF-8"`):
return true
case strings.Contains(st.Message(), io.ErrUnexpectedEOF.Error()):
return true
case strings.Contains(st.Message(), `stream terminated by RST_STREAM with error code: INTERNAL_ERROR`):
return true
}
}
return false
},
}
)
// Unwrap provides error wrapping
func (e *retryableError) Unwrap() error {
return e.err
}
// Error returns the error string
func (e *retryableError) Error() string {
if e.err == nil {
return ""
}
return e.err.Error()
}
// IsRetryable checks error for ability to retry later
func IsRetryable(err error, fns ...IsRetryableFunc) bool {
for _, fn := range fns {
if ok := fn(err); ok {
return true
}
}
return false
}
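A minimal sketch of how the new retry helpers could be combined by a caller. callBackend is a hypothetical operation, and the particular mix of rule sets is an assumption; any function matching IsRetryableFunc can be passed to IsRetryable.

package main

import (
	"fmt"

	"go.unistack.org/micro/v3/errors"
)

// callBackend stands in for an operation that may fail transiently.
func callBackend() error {
	// Wrapping with Retryable marks the error as retryable for RetryableMicroErrors.
	return errors.Retryable(fmt.Errorf("temporary outage"))
}

func main() {
	err := callBackend()

	// Combine the rule sets relevant to this caller.
	fns := append(errors.RetryableMicroErrors, errors.RetryableGoErrors...)

	if errors.IsRetryable(err, fns...) {
		fmt.Println("retry later")
		return
	}
	fmt.Println("giving up")
}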

View File

@@ -8,6 +8,13 @@ import (
"testing"
)
func TestIsRetrayable(t *testing.T) {
err := fmt.Errorf("ORA-")
if !IsRetryable(err, RetrayableOracleErrors...) {
t.Fatalf("IsRetrayable not works")
}
}
func TestMarshalJSON(t *testing.T) {
e := InternalServerError("id", "err: %v", fmt.Errorf("err: %v", `xxx: "UNIX_TIMESTAMP": invalid identifier`))
_, err := json.Marshal(e)

go.mod (18 changed lines)
View File

@@ -1,10 +1,20 @@
module go.unistack.org/micro/v3
go 1.16
go 1.19
require (
github.com/imdario/mergo v0.3.13
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/google/uuid v1.3.0
github.com/imdario/mergo v0.3.15
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35
gopkg.in/yaml.v3 v3.0.1 // indirect
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
golang.org/x/sync v0.3.0
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect
)

go.sum (33 changed lines)
View File

@@ -1,10 +1,33 @@
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ=
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -39,8 +39,6 @@ func FromOutgoingContext(ctx context.Context) (Metadata, bool) {
// FromContext returns metadata from the given context
// returned metadata shoud not be modified or race condition happens
//
// Deprecated: use FromIncomingContext or FromOutgoingContext
func FromContext(ctx context.Context) (Metadata, bool) {
if ctx == nil {
return nil, false
@@ -53,8 +51,6 @@ func FromContext(ctx context.Context) (Metadata, bool) {
}
// NewContext creates a new context with the given metadata
//
// Deprecated: use NewIncomingContext or NewOutgoingContext
func NewContext(ctx context.Context, md Metadata) context.Context {
if ctx == nil {
ctx = context.Background()

View File

@@ -19,6 +19,8 @@ var (
HeaderTimeout = "Micro-Timeout"
// HeaderAuthorization specifies Authorization header
HeaderAuthorization = "Authorization"
// HeaderXRequestID specifies request id
HeaderXRequestID = "X-Request-Id"
)
// Metadata is our way of representing request headers internally.

View File

@@ -211,6 +211,7 @@ func Stores(s ...store.Store) Option {
}
// Logger set the logger to use
//
//nolint:gocyclo
func Logger(l logger.Logger, opts ...LoggerOption) Option {
return func(o *Options) error {
@@ -329,6 +330,7 @@ func Meters(m ...meter.Meter) Option {
// Register sets the register for the service
// and the underlying components
//
//nolint:gocyclo
func Register(r register.Register, opts ...RegisterOption) Option {
return func(o *Options) error {
@@ -403,6 +405,7 @@ func RegisterBroker(n string) RegisterOption {
}
// Tracer sets the tracer
//
//nolint:gocyclo
func Tracer(t tracer.Tracer, opts ...TracerOption) Option {
return func(o *Options) error {

options/hooks.go (new file, 31 lines)
View File

@@ -0,0 +1,31 @@
package options // import "go.unistack.org/micro/v3/options"
// Hook func interface
type Hook interface{}
// Hooks func slice
type Hooks []Hook
// Append is used to add hooks
func (hs *Hooks) Append(h ...Hook) {
*hs = append(*hs, h...)
}
// Replace is used to set hooks
func (hs *Hooks) Replace(h ...Hook) {
*hs = h
}
// EachNext is used to itearate over hooks forward
func (hs *Hooks) EachNext(fn func(Hook)) {
for idx := 0; idx < len(*hs); idx++ {
fn((*hs)[idx])
}
}
// EachPrev is used to iterate over hooks backward
func (hs *Hooks) EachPrev(fn func(Hook)) {
for idx := len(*hs) - 1; idx >= 0; idx-- {
fn((*hs)[idx])
}
}

options/hooks_test.go (new file, 65 lines)
View File

@@ -0,0 +1,65 @@
package options
import "testing"
func TestHooks_Append(t *testing.T) {
fn1 := func() {}
fn2 := func() {}
hs := &Hooks{}
hs.Append(fn1, fn2)
if len(*hs) != 2 {
t.Fatalf("unexpected Append error")
}
}
func TestHooks_Replace(t *testing.T) {
fn1 := func() {}
fn2 := func() {}
hs := &Hooks{}
hs.Append(fn1, fn2, fn1)
if len(*hs) != 3 {
t.Fatalf("unexpected Append error")
}
hs.Replace(fn1, fn2)
if len(*hs) != 2 {
t.Fatalf("unexpected Replace error")
}
}
func TestHooks_EachNext(t *testing.T) {
n := 5
fn1 := func() {
n *= 2
}
fn2 := func() {
n -= 10
}
hs := &Hooks{}
hs.Append(fn1, fn2)
hs.EachNext(func(h Hook) {
h.(func())()
})
if n != 0 {
t.Fatalf("unexpected EachNext")
}
}
func TestHooks_EachPrev(t *testing.T) {
n := 5
fn1 := func() {
n *= 2
}
fn2 := func() {
n -= 10
}
hs := &Hooks{}
hs.Append(fn2, fn1)
hs.EachPrev(func(h Hook) {
h.(func())()
})
if n != 0 {
t.Fatalf("unexpected EachPrev")
}
}

View File

@@ -202,39 +202,6 @@ func (n *noopServer) Register() error {
n.Lock()
defer n.Unlock()
cx := config.Context
var sub broker.Subscriber
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
}
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
n.registered = true
if cacheService {
n.rsvc = service
@@ -366,6 +333,10 @@ func (n *noopServer) Start() error {
}
}
if err := n.subscribe(); err != nil {
return err
}
go func() {
t := new(time.Ticker)
@@ -449,6 +420,45 @@ func (n *noopServer) Start() error {
return nil
}
func (n *noopServer) subscribe() error {
config := n.Options()
cx := config.Context
var err error
var sub broker.Subscriber
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
}
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (n *noopServer) Stop() error {
n.RLock()
if !n.started {

View File

@@ -13,6 +13,7 @@ import (
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/network/transport"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/tracer"
"go.unistack.org/micro/v3/util/id"
@@ -83,6 +84,8 @@ type Options struct {
MaxConn int
// DeregisterAttempts holds the number of deregister attempts before error
DeregisterAttempts int
// Hooks may contains SubscriberWrapper, HandlerWrapper or Server func wrapper
Hooks options.Hooks
}
// NewOptions returns new options struct with default or passed values

View File

@@ -191,7 +191,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
}
//nolint:gocyclo
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
return func(ps broker.Events) (err error) {
defer func() {
if r := recover(); r != nil {
@@ -309,7 +309,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
}
//nolint:gocyclo
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) {
defer func() {
if r := recover(); r != nil {

View File

@@ -4,23 +4,37 @@ import (
"context"
)
var _ Tracer = (*noopTracer)(nil)
type noopTracer struct {
opts Options
opts Options
spans []Span
}
func (t *noopTracer) Spans() []Span {
return t.spans
}
func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) {
options := NewSpanOptions(opts...)
span := &noopSpan{
name: name,
ctx: ctx,
tracer: t,
opts: NewSpanOptions(opts...),
labels: options.Labels,
kind: options.Kind,
}
if span.ctx == nil {
span.ctx = context.Background()
}
t.spans = append(t.spans, span)
return NewSpanContext(ctx, span), span
}
func (t *noopTracer) Flush(ctx context.Context) error {
return nil
}
func (t *noopTracer) Init(opts ...Option) error {
for _, o := range opts {
o(&t.opts)
@@ -32,13 +46,21 @@ func (t *noopTracer) Name() string {
return t.opts.Name
}
type noopEvent struct {
name string
labels []interface{}
}
type noopSpan struct {
ctx context.Context
tracer Tracer
name string
opts SpanOptions
status SpanStatus
statusMsg string
events []*noopEvent
labels []interface{}
logs []interface{}
kind SpanKind
status SpanStatus
}
func (s *noopSpan) Finish(opts ...SpanOption) {
@@ -53,22 +75,24 @@ func (s *noopSpan) Tracer() Tracer {
}
func (s *noopSpan) AddEvent(name string, opts ...EventOption) {
options := NewEventOptions(opts...)
s.events = append(s.events, &noopEvent{name: name, labels: options.Labels})
}
func (s *noopSpan) SetName(name string) {
s.name = name
}
func (s *noopSpan) SetLabels(labels ...interface{}) {
s.opts.Labels = labels
func (s *noopSpan) AddLogs(kv ...interface{}) {
s.logs = append(s.logs, kv...)
}
func (s *noopSpan) AddLabels(labels ...interface{}) {
s.opts.Labels = append(s.opts.Labels, labels...)
func (s *noopSpan) AddLabels(kv ...interface{}) {
s.labels = append(s.labels, kv...)
}
func (s *noopSpan) Kind() SpanKind {
return s.opts.Kind
return s.kind
}
func (s *noopSpan) Status() (SpanStatus, string) {

View File

@@ -91,14 +91,22 @@ type SpanOptions struct {
type SpanOption func(o *SpanOptions)
// EventOptions contains event options
type EventOptions struct{}
type EventOptions struct {
Labels []interface{}
}
// EventOption func signature
type EventOption func(o *EventOptions)
func WithSpanLabels(labels ...interface{}) SpanOption {
func WithEventLabels(kv ...interface{}) EventOption {
return func(o *EventOptions) {
o.Labels = kv
}
}
func WithSpanLabels(kv ...interface{}) SpanOption {
return func(o *SpanOptions) {
o.Labels = labels
o.Labels = kv
}
}
@@ -128,6 +136,15 @@ func Logger(l logger.Logger) Option {
}
}
// NewEventOptions returns default EventOptions
func NewEventOptions(opts ...EventOption) EventOptions {
options := EventOptions{}
for _, o := range opts {
o(&options)
}
return options
}
// NewSpanOptions returns default SpanOptions
func NewSpanOptions(opts ...SpanOption) SpanOptions {
options := SpanOptions{

View File

@@ -3,6 +3,8 @@ package tracer // import "go.unistack.org/micro/v3/tracer"
import (
"context"
"fmt"
"sort"
)
// DefaultTracer is the global default tracer
@@ -16,6 +18,8 @@ type Tracer interface {
Init(...Option) error
// Start a trace
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
// Flush flushes spans
Flush(ctx context.Context) error
}
type Span interface {
@@ -23,8 +27,6 @@ type Span interface {
Tracer() Tracer
// Finish complete and send span
Finish(opts ...SpanOption)
// AddEvent add event to span
AddEvent(name string, opts ...EventOption)
// Context return context with span
Context() context.Context
// SetName set the span name
@@ -33,10 +35,46 @@ type Span interface {
SetStatus(status SpanStatus, msg string)
// Status returns span status and msg
Status() (SpanStatus, string)
// SetLabels set the span labels
SetLabels(labels ...interface{})
// AddLabels append the span labels
AddLabels(labels ...interface{})
// AddLabels append labels to span
AddLabels(kv ...interface{})
// AddEvent append event to span
AddEvent(name string, opts ...EventOption)
// AddEvent append event to span
AddLogs(kv ...interface{})
// Kind returns span kind
Kind() SpanKind
}
// sort labels alphabeticaly by label name
type byKey []interface{}
func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) }
func (k byKey) Swap(i, j int) {
k[i*2], k[j*2] = k[j*2], k[i*2]
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
}
func UniqLabels(labels []interface{}) []interface{} {
if len(labels)%2 == 1 {
labels = labels[:len(labels)-1]
}
if len(labels) > 2 {
sort.Sort(byKey(labels))
idx := 0
for {
if labels[idx] == labels[idx+2] {
copy(labels[idx:], labels[idx+2:])
labels = labels[:len(labels)-2]
} else {
idx += 2
}
if idx+2 >= len(labels) {
break
}
}
}
return labels
}

tracer/tracer_test.go (new file, 13 lines)
View File

@@ -0,0 +1,13 @@
package tracer
import (
"testing"
)
func TestUniqLabels(t *testing.T) {
labels := []interface{}{"key1", "val1", "key1", "val2"}
labels = UniqLabels(labels)
if labels[1] != "val2" {
t.Fatalf("UniqLabels not works")
}
}

View File

@@ -4,6 +4,7 @@ package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper"
import (
"context"
"fmt"
"strings"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/metadata"
@@ -11,107 +12,87 @@ import (
"go.unistack.org/micro/v3/tracer"
)
var DefaultHeadersExctract = []string{metadata.HeaderXRequestID}
func ExtractDefaultLabels(md metadata.Metadata) []interface{} {
labels := make([]interface{}, 0, len(DefaultHeadersExctract))
for _, k := range DefaultHeadersExctract {
if v, ok := md.Get(k); ok {
labels = append(labels, strings.ToLower(k), v)
}
}
return labels
}
var (
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method()))
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]interface{}, 0, len(md)+1)
for k, v := range md {
labels = append(labels, k, v)
}
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
labels = append(labels, "error", err.Error())
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
labels = append(labels, "kind", sp.Kind())
sp.SetLabels(labels...)
sp.AddLabels(labels...)
}
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Stream %s.%s", req.Service(), req.Method()))
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]interface{}, 0, len(md))
for k, v := range md {
labels = append(labels, k, v)
}
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
labels = append(labels, "error", err.Error())
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
labels = append(labels, "kind", sp.Kind())
sp.SetLabels(labels...)
sp.AddLabels(labels...)
}
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Publish %s", msg.Topic()))
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]interface{}, 0, len(md))
for k, v := range md {
labels = append(labels, k, v)
}
labels = append(labels, ExtractDefaultLabels(md)...)
}
labels = append(labels, ExtractDefaultLabels(msg.Metadata())...)
if err != nil {
labels = append(labels, "error", err.Error())
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
labels = append(labels, "kind", sp.Kind())
sp.SetLabels(labels...)
sp.AddLabels(labels...)
}
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Handler %s.%s", req.Service(), req.Method()))
var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]interface{}, 0, len(md))
for k, v := range md {
labels = append(labels, k, v)
}
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
labels = append(labels, "error", err.Error())
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
labels = append(labels, "kind", sp.Kind())
sp.SetLabels(labels...)
sp.AddLabels(labels...)
}
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Subscriber %s", msg.Topic()))
var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]interface{}, 0, len(md))
for k, v := range md {
labels = append(labels, k, v)
}
labels = append(labels, ExtractDefaultLabels(md)...)
}
labels = append(labels, ExtractDefaultLabels(msg.Header())...)
if err != nil {
labels = append(labels, "error", err.Error())
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
labels = append(labels, "kind", sp.Kind())
sp.SetLabels(labels...)
sp.AddLabels(labels...)
}
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method()))
sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method()))
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]interface{}, 0, len(md))
for k, v := range md {
labels = append(labels, k, v)
}
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
labels = append(labels, "error", err.Error())
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
labels = append(labels, "kind", sp.Kind())
sp.SetLabels(labels...)
sp.AddLabels(labels...)
}
DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"}
@@ -241,16 +222,22 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{
}
}
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindClient))
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary",
),
)
defer sp.Finish()
err := ot.Client.Call(ctx, req, rsp, opts...)
err := ot.Client.Call(nctx, req, rsp, opts...)
for _, o := range ot.opts.ClientCallObservers {
o(ctx, req, rsp, opts, sp, err)
o(nctx, req, rsp, opts, sp, err)
}
return err
@@ -264,32 +251,36 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...clie
}
}
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindClient))
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "stream",
),
)
defer sp.Finish()
stream, err := ot.Client.Stream(ctx, req, opts...)
stream, err := ot.Client.Stream(nctx, req, opts...)
for _, o := range ot.opts.ClientStreamObservers {
o(ctx, req, opts, stream, sp, err)
o(nctx, req, opts, stream, sp, err)
}
return stream, err
}
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindProducer))
}
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" publish", tracer.WithSpanKind(tracer.SpanKindProducer))
defer sp.Finish()
err := ot.Client.Publish(ctx, msg, opts...)
sp.AddLabels("messaging.destination.name", msg.Topic())
sp.AddLabels("messaging.operation", "publish")
err := ot.Client.Publish(nctx, msg, opts...)
for _, o := range ot.opts.ClientPublishObservers {
o(ctx, msg, opts, sp, err)
o(nctx, msg, opts, sp, err)
}
return err
@@ -303,32 +294,41 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i
}
}
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindServer))
callType := "unary"
if req.Stream() {
callType = "stream"
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", callType,
),
)
defer sp.Finish()
err := ot.serverHandler(ctx, req, rsp)
err := ot.serverHandler(nctx, req, rsp)
for _, o := range ot.opts.ServerHandlerObservers {
o(ctx, req, rsp, sp, err)
o(nctx, req, rsp, sp, err)
}
return err
}
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindConsumer))
}
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" process", tracer.WithSpanKind(tracer.SpanKindConsumer))
defer sp.Finish()
err := ot.serverSubscriber(ctx, msg)
sp.AddLabels("messaging.operation", "process")
sp.AddLabels("messaging.source.name", msg.Topic())
err := ot.serverSubscriber(nctx, msg)
for _, o := range ot.opts.ServerSubscriberObservers {
o(ctx, msg, sp, err)
o(nctx, msg, sp, err)
}
return err
@@ -366,16 +366,23 @@ func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.
}
}
sp, ok := tracer.SpanFromContext(ctx)
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindClient))
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary",
),
)
defer sp.Finish()
err := ot.clientCallFunc(ctx, addr, req, rsp, opts)
err := ot.clientCallFunc(nctx, addr, req, rsp, opts)
for _, o := range ot.opts.ClientCallFuncObservers {
o(ctx, addr, req, rsp, opts, sp, err)
o(nctx, addr, req, rsp, opts, sp, err)
}
return err
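A small sketch of the new ExtractDefaultLabels helper on its own: it picks the headers listed in DefaultHeadersExctract (by default Micro's X-Request-Id) out of request metadata and lower-cases the keys. The metadata values here are illustrative.

package main

import (
	"fmt"

	"go.unistack.org/micro/v3/metadata"
	"go.unistack.org/micro/v3/tracer/wrapper"
)

func main() {
	md := metadata.New(1)
	md.Set(metadata.HeaderXRequestID, "req-123")

	// Only headers present in wrapper.DefaultHeadersExctract are extracted.
	labels := wrapper.ExtractDefaultLabels(md)
	fmt.Println(labels) // [x-request-id req-123]
}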

util/grpc/tracer.go (new file, 271 lines)
View File

@@ -0,0 +1,271 @@
package grpc_util
import (
"context"
"net"
"strconv"
"strings"
"sync/atomic"
"time"
"go.unistack.org/micro/v3/tracer"
grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
type gRPCContextKey struct{}
type gRPCContext struct {
messagesReceived int64
messagesSent int64
}
type Options struct {
Tracer tracer.Tracer
}
type Option func(*Options)
func Tracer(tr tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = tr
}
}
// NewServerHandler creates a stats.Handler for gRPC server.
func NewServerHandler(opts ...Option) stats.Handler {
options := Options{Tracer: tracer.DefaultTracer}
for _, o := range opts {
o(&options)
}
h := &serverHandler{
opts: options,
}
return h
}
type serverHandler struct {
opts Options
}
// TagRPC can attach some information to the given context.
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := parseFullMethod(info.FullMethodName)
attrs = append(attrs, "rpc.system", "grpc")
ctx, _ = h.opts.Tracer.Start(
ctx,
name,
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(attrs...),
)
gctx := gRPCContext{}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
}
// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
}
// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
if span, ok := tracer.SpanFromContext(ctx); ok {
attrs := peerAttr(peerFromCtx(ctx))
span.AddLabels(attrs...)
}
return ctx
}
// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
}
type clientHandler struct {
opts Options
}
// NewClientHandler creates a stats.Handler for gRPC client.
func NewClientHandler(opts ...Option) stats.Handler {
options := Options{Tracer: tracer.DefaultTracer}
for _, o := range opts {
o(&options)
}
h := &clientHandler{
opts: options,
}
return h
}
// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := parseFullMethod(info.FullMethodName)
attrs = append(attrs, "rpc.system", "grpc", "rpc.flavor", "grpc", "rpc.call", info.FullMethodName)
ctx, _ = h.opts.Tracer.Start(
ctx,
name,
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(attrs...),
)
gctx := gRPCContext{}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
}
// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
}
// TagConn can attach some information to the given context.
func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
// TODO
if span, ok := tracer.SpanFromContext(ctx); ok {
attrs := peerAttr(cti.RemoteAddr.String())
span.AddLabels(attrs...)
}
return ctx
}
// HandleConn processes the Conn stats.
func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}
func handleRPC(ctx context.Context, rs stats.RPCStats) {
span, ok := tracer.SpanFromContext(ctx)
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
var messageID int64
if rs.IsClient() {
span.AddLabels("span.kind", "client")
} else {
span.AddLabels("span.kind", "server")
}
switch rs := rs.(type) {
case *stats.Begin:
if rs.IsClientStream || rs.IsServerStream {
span.AddLabels("rpc.call_type", "stream")
} else {
span.AddLabels("rpc.call_type", "unary")
}
span.AddEvent("message",
tracer.WithEventLabels(
"message.begin_time", rs.BeginTime.Format(time.RFC3339),
),
)
case *stats.InPayload:
if gctx != nil {
messageID = atomic.AddInt64(&gctx.messagesReceived, 1)
}
if ok {
span.AddEvent("message",
tracer.WithEventLabels(
"message.recv_time", rs.RecvTime.Format(time.RFC3339),
"message.type", "RECEIVED",
"message.id", messageID,
"message.compressed_size", rs.CompressedLength,
"message.uncompressed_size", rs.Length,
),
)
}
case *stats.OutPayload:
if gctx != nil {
messageID = atomic.AddInt64(&gctx.messagesSent, 1)
}
if ok {
span.AddEvent("message",
tracer.WithEventLabels(
"message.sent_time", rs.SentTime.Format(time.RFC3339),
"message.type", "SENT",
"message.id", messageID,
"message.compressed_size", rs.CompressedLength,
"message.uncompressed_size", rs.Length,
),
)
}
case *stats.End:
if ok {
span.AddEvent("message",
tracer.WithEventLabels(
"message.begin_time", rs.BeginTime.Format(time.RFC3339),
"message.end_time", rs.EndTime.Format(time.RFC3339),
),
)
if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(tracer.SpanStatusError, s.Message())
span.AddLabels("rpc.grpc.status_code", s.Code())
} else {
span.AddLabels("rpc.grpc.status_code", grpc_codes.OK)
}
span.Finish()
}
default:
return
}
}
func parseFullMethod(fullMethod string) (string, []interface{}) {
if !strings.HasPrefix(fullMethod, "/") {
// Invalid format, does not follow `/package.service/method`.
return fullMethod, nil
}
name := fullMethod[1:]
pos := strings.LastIndex(name, "/")
if pos < 0 {
// Invalid format, does not follow `/package.service/method`.
return name, nil
}
service, method := name[:pos], name[pos+1:]
var attrs []interface{}
if service != "" {
attrs = append(attrs, "rpc.service", service)
}
if method != "" {
attrs = append(attrs, "rpc.method", method)
}
return name, attrs
}
func peerAttr(addr string) []interface{} {
host, p, err := net.SplitHostPort(addr)
if err != nil {
return nil
}
if host == "" {
host = "127.0.0.1"
}
port, err := strconv.Atoi(p)
if err != nil {
return nil
}
var attr []interface{}
if ip := net.ParseIP(host); ip != nil {
attr = []interface{}{
"net.sock.peer.addr", host,
"net.sock.peer.port", port,
}
} else {
attr = []interface{}{
"net.peer.name", host,
"net.peer.port", port,
}
}
return attr
}
func peerFromCtx(ctx context.Context) string {
p, ok := peer.FromContext(ctx)
if !ok {
return ""
}
return p.Addr.String()
}
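A minimal wiring sketch for the new gRPC stats handlers, assuming the package is imported as grpc_util. grpc.StatsHandler and grpc.WithStatsHandler are the standard gRPC hooks for attaching a stats.Handler; the address is a placeholder.

package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"go.unistack.org/micro/v3/tracer"
	grpc_util "go.unistack.org/micro/v3/util/grpc"
)

func main() {
	// Server side: every RPC gets a server-kind span via the default tracer.
	srv := grpc.NewServer(
		grpc.StatsHandler(grpc_util.NewServerHandler(grpc_util.Tracer(tracer.DefaultTracer))),
	)
	defer srv.Stop()

	// Client side: the analogous handler is attached as a dial option.
	conn, err := grpc.Dial("127.0.0.1:9090",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithStatsHandler(grpc_util.NewClientHandler()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}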

util/http/clienttracer.go (new file, 253 lines)
View File

@@ -0,0 +1,253 @@
//
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package http
import (
"context"
"crypto/tls"
"net/http/httptrace"
"net/textproto"
"strings"
"sync"
"go.unistack.org/micro/v3/tracer"
)
const (
httpStatus = "http.status"
httpHeaderMIME = "http.mime"
httpRemoteAddr = "http.remote"
httpLocalAddr = "http.local"
httpHost = "http.host"
)
var hookMap = map[string]string{
"http.dns": "http.getconn",
"http.connect": "http.getconn",
"http.tls": "http.getconn",
}
func parentHook(hook string) string {
if strings.HasPrefix(hook, "http.connect") {
return hookMap["http.connect"]
}
return hookMap[hook]
}
type clientTracer struct {
context.Context
tr tracer.Tracer
activeHooks map[string]context.Context
root tracer.Span
mtx sync.Mutex
}
func NewClientTrace(ctx context.Context, tr tracer.Tracer) *httptrace.ClientTrace {
ct := &clientTracer{
Context: ctx,
activeHooks: make(map[string]context.Context),
tr: tr,
}
return &httptrace.ClientTrace{
GetConn: ct.getConn,
GotConn: ct.gotConn,
PutIdleConn: ct.putIdleConn,
GotFirstResponseByte: ct.gotFirstResponseByte,
Got100Continue: ct.got100Continue,
Got1xxResponse: ct.got1xxResponse,
DNSStart: ct.dnsStart,
DNSDone: ct.dnsDone,
ConnectStart: ct.connectStart,
ConnectDone: ct.connectDone,
TLSHandshakeStart: ct.tlsHandshakeStart,
TLSHandshakeDone: ct.tlsHandshakeDone,
WroteHeaderField: ct.wroteHeaderField,
WroteHeaders: ct.wroteHeaders,
Wait100Continue: ct.wait100Continue,
WroteRequest: ct.wroteRequest,
}
}
func (ct *clientTracer) start(hook, spanName string, attrs ...interface{}) {
ct.mtx.Lock()
defer ct.mtx.Unlock()
if hookCtx, found := ct.activeHooks[hook]; !found {
var sp tracer.Span
ct.activeHooks[hook], sp = ct.tr.Start(ct.getParentContext(hook), spanName, tracer.WithSpanLabels(attrs...), tracer.WithSpanKind(tracer.SpanKindClient))
if ct.root == nil {
ct.root = sp
}
} else {
// end was called before start finished, add the start attributes and end the span here
if span, ok := tracer.SpanFromContext(hookCtx); ok {
span.AddLabels(attrs...)
span.Finish()
}
delete(ct.activeHooks, hook)
}
}
func (ct *clientTracer) end(hook string, err error, attrs ...interface{}) {
ct.mtx.Lock()
defer ct.mtx.Unlock()
if ctx, ok := ct.activeHooks[hook]; ok { // nolint:nestif
if span, ok := tracer.SpanFromContext(ctx); ok {
if err != nil {
span.SetStatus(tracer.SpanStatusError, err.Error())
}
span.AddLabels(attrs...)
span.Finish()
}
delete(ct.activeHooks, hook)
} else {
// start is not finished before end is called.
// Start a span here with the ending attributes that will be finished when start finishes.
// Yes, it's backwards. v0v
ctx, span := ct.tr.Start(ct.getParentContext(hook), hook, tracer.WithSpanLabels(attrs...), tracer.WithSpanKind(tracer.SpanKindClient))
if err != nil {
span.SetStatus(tracer.SpanStatusError, err.Error())
}
ct.activeHooks[hook] = ctx
}
}
func (ct *clientTracer) getParentContext(hook string) context.Context {
ctx, ok := ct.activeHooks[parentHook(hook)]
if !ok {
return ct.Context
}
return ctx
}
func (ct *clientTracer) span(hook string) (tracer.Span, bool) {
ct.mtx.Lock()
defer ct.mtx.Unlock()
if ctx, ok := ct.activeHooks[hook]; ok {
return tracer.SpanFromContext(ctx)
}
return nil, false
}
func (ct *clientTracer) getConn(host string) {
ct.start("http.getconn", "http.getconn", httpHost, host)
}
func (ct *clientTracer) gotConn(info httptrace.GotConnInfo) {
ct.end("http.getconn",
nil,
httpRemoteAddr, info.Conn.RemoteAddr().String(),
httpLocalAddr, info.Conn.LocalAddr().String(),
)
}
func (ct *clientTracer) putIdleConn(err error) {
ct.end("http.receive", err)
}
func (ct *clientTracer) gotFirstResponseByte() {
ct.start("http.receive", "http.receive")
}
func (ct *clientTracer) dnsStart(info httptrace.DNSStartInfo) {
ct.start("http.dns", "http.dns", httpHost, info.Host)
}
func (ct *clientTracer) dnsDone(info httptrace.DNSDoneInfo) {
ct.end("http.dns", info.Err)
}
func (ct *clientTracer) connectStart(network, addr string) {
_ = network
ct.start("http.connect."+addr, "http.connect", httpRemoteAddr, addr)
}
func (ct *clientTracer) connectDone(network, addr string, err error) {
_ = network
ct.end("http.connect."+addr, err)
}
func (ct *clientTracer) tlsHandshakeStart() {
ct.start("http.tls", "http.tls")
}
func (ct *clientTracer) tlsHandshakeDone(_ tls.ConnectionState, err error) {
ct.end("http.tls", err)
}
func (ct *clientTracer) wroteHeaderField(k string, v []string) {
if sp, ok := ct.span("http.headers"); !ok || sp == nil {
ct.start("http.headers", "http.headers")
}
ct.root.AddLabels("http."+strings.ToLower(k), sliceToString(v))
}
func (ct *clientTracer) wroteHeaders() {
ct.start("http.send", "http.send")
}
func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) {
if info.Err != nil {
ct.root.SetStatus(tracer.SpanStatusError, info.Err.Error())
}
ct.end("http.send", info.Err)
}
func (ct *clientTracer) got100Continue() {
if sp, ok := ct.span("http.receive"); ok && sp != nil {
sp.AddEvent("GOT 100 - Continue")
}
}
func (ct *clientTracer) wait100Continue() {
if sp, ok := ct.span("http.receive"); ok && sp != nil {
sp.AddEvent("GOT 100 - Wait")
}
}
func (ct *clientTracer) got1xxResponse(code int, header textproto.MIMEHeader) error {
if sp, ok := ct.span("http.receive"); ok && sp != nil {
sp.AddEvent("GOT 1xx",
tracer.WithEventLabels(
httpStatus, code,
httpHeaderMIME, sm2s(header),
),
)
}
return nil
}
func sliceToString(value []string) string {
if len(value) == 0 {
return "undefined"
}
return strings.Join(value, ",")
}
func sm2s(value map[string][]string) string {
var buf strings.Builder
for k, v := range value {
if buf.Len() != 0 {
buf.WriteString(",")
}
buf.WriteString(k)
buf.WriteString("=")
buf.WriteString(sliceToString(v))
}
return buf.String()
}
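
For context, a minimal sketch of how this client trace might be attached to an outgoing request. It assumes the package is imported as go.unistack.org/micro/v3/util/http (aliased here as mhttp) and that tr is an already configured tracer.Tracer; httptrace.WithClientTrace is the standard-library hook point.

package main

import (
	"context"
	"net/http"
	"net/http/httptrace"

	"go.unistack.org/micro/v3/tracer"
	mhttp "go.unistack.org/micro/v3/util/http"
)

func call(ctx context.Context, tr tracer.Tracer) error {
	// root span for the whole call; the per-phase spans become its children
	ctx, sp := tr.Start(ctx, "example.Call")
	defer sp.Finish()

	// attach the httptrace hooks so getconn/dns/tls/send/receive each get a span
	ctx = httptrace.WithClientTrace(ctx, mhttp.NewClientTrace(ctx, tr))

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://127.0.0.1:8080/healthz", nil)
	if err != nil {
		return err
	}
	rsp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	return rsp.Body.Close()
}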


@@ -5,8 +5,32 @@ import (
"testing"
)
func TestTrieBackwards(t *testing.T) {
_ = &Trie{}
}
func TestTrieRPC(t *testing.T) {
var err error
type handler struct {
name string
}
tr := NewTrie()
if err = tr.Insert([]string{"helloworld"}, "Call", &handler{name: "helloworld.Call"}); err != nil {
t.Fatal(err)
}
if err = tr.Insert([]string{"helloworld"}, "Stream", &handler{name: "helloworld.Stream"}); err != nil {
t.Fatal(err)
}
h, _, err := tr.Search("helloworld", "Call")
if err != nil {
t.Fatalf("unexpected error %v", err)
}
if h.(*handler).name != "helloworld.Call" {
t.Fatalf("invalid handler %v", h)
}
h, _, err = tr.Search("helloworld", "Stream")
if err != nil {
t.Fatalf("unexpected error %v", err)
}
if h.(*handler).name != "helloworld.Stream" {
t.Fatalf("invalid handler %v", h)
}
}
func TestTrieWildcardPathPrefix(t *testing.T) {


@@ -508,3 +508,74 @@ func FieldName(name string) string {
return string(newstr)
}
func Equal(src interface{}, dst interface{}, excptFields ...string) bool {
srcVal := reflect.ValueOf(src)
dstVal := reflect.ValueOf(dst)
switch srcVal.Kind() {
case reflect.Array, reflect.Slice:
for i := 0; i < srcVal.Len(); i++ {
e := srcVal.Index(i).Interface()
a := dstVal.Index(i).Interface()
if !Equal(e, a, excptFields...) {
return false
}
}
return true
case reflect.Map:
for i := 0; i < len(srcVal.MapKeys()); i++ {
key := srcVal.MapKeys()[i]
keyStr := fmt.Sprintf("%v", key.Interface())
if stringContains(keyStr, excptFields) {
continue
}
s := srcVal.MapIndex(key)
d := dstVal.MapIndex(key)
if !Equal(s.Interface(), d.Interface(), excptFields...) {
return false
}
}
return true
case reflect.Struct, reflect.Interface:
for i := 0; i < srcVal.NumField(); i++ {
typeField := srcVal.Type().Field(i)
if stringContains(typeField.Name, excptFields) {
continue
}
s := srcVal.Field(i)
d := dstVal.FieldByName(typeField.Name)
if s.CanInterface() && d.CanInterface() {
if !Equal(s.Interface(), d.Interface(), excptFields...) {
return false
}
} else {
return false
}
}
return true
case reflect.Ptr:
if srcVal.IsNil() {
return dstVal.IsNil()
}
s := srcVal.Elem()
d := reflect.Indirect(dstVal)
if s.CanInterface() && d.CanInterface() {
return Equal(s.Interface(), d.Interface(), excptFields...)
}
return false
case reflect.String, reflect.Int, reflect.Int64, reflect.Float32, reflect.Float64, reflect.Bool:
return src == dst
default:
return srcVal.Interface() == dstVal.Interface()
}
}
func stringContains(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
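
A quick sketch of what the new Equal helper allows (struct and values invented for illustration): fields or map keys listed in excptFields are skipped during the recursive comparison.

type account struct {
	ID      string
	Name    string
	Updated string
}

src := &account{ID: "1", Name: "demo", Updated: "2023-09-12"}
dst := &account{ID: "1", Name: "demo", Updated: "2023-09-13"}

// Updated differs, but it is listed as an exception, so the values compare equal
ok := Equal(src, dst, "Updated") // true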


@@ -133,3 +133,16 @@ func TestMergeNested(t *testing.T) {
t.Fatalf("merge error: %#+v", dst.Nested)
}
}
func TestEqual(t *testing.T) {
type tstr struct {
Key1 string
Key2 string
}
src := &tstr{Key1: "val1", Key2: "micro:generate"}
dst := &tstr{Key1: "val1", Key2: "val2"}
if !Equal(src, dst, "Key2") {
t.Fatal("invalid Equal test")
}
}

util/test/test.go Normal file

@@ -0,0 +1,505 @@
package test
import (
"bufio"
"bytes"
"context"
"database/sql/driver"
"encoding/csv"
"fmt"
"io"
"os"
"path"
"path/filepath"
"reflect"
"strings"
"time"
sqlmock "github.com/DATA-DOG/go-sqlmock"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/metadata"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
var ErrUnknownContentType = fmt.Errorf("unknown content type")
type Extension struct {
Ext []string
}
var (
// ExtToTypes map file extension to content type
ExtToTypes = map[string][]string{
"json": {"application/json", "application/grpc+json"},
"yaml": {"application/yaml", "application/yml", "text/yaml", "text/yml"},
"yml": {"application/yaml", "application/yml", "text/yaml", "text/yml"},
"proto": {"application/grpc", "application/grpc+proto", "application/proto"},
}
// DefaultExts specifies default file extensions to load data
DefaultExts = []string{"csv", "json", "yaml", "yml", "proto"}
// Codecs map to detect codec for test file or request content type
Codecs map[string]codec.Codec
// ResponseCompareFunc used to compare actual response with test case data
ResponseCompareFunc = func(expectRsp []byte, testRsp interface{}, expectCodec codec.Codec, testCodec codec.Codec) error {
var err error
expectMap := make(map[string]interface{})
if err = expectCodec.Unmarshal(expectRsp, &expectMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
testMap := make(map[string]interface{})
switch v := testRsp.(type) {
case *codec.Frame:
if err = testCodec.Unmarshal(v.Data, &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
case *errors.Error:
if err = expectCodec.Unmarshal([]byte(v.Error()), &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
case error:
st, ok := status.FromError(v)
if !ok {
return v
}
me := errors.Parse(st.Message())
if me.Code != 0 {
if err = expectCodec.Unmarshal([]byte(me.Error()), &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
break
}
for _, se := range st.Details() {
switch ne := se.(type) {
case proto.Message:
buf, err := testCodec.Marshal(ne)
if err != nil {
return fmt.Errorf("failed to marshal err: %w", err)
}
if err = testCodec.Unmarshal(buf, &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
default:
return st.Err()
}
}
case interface{ GRPCStatus() *status.Status }:
st := v.GRPCStatus()
me := errors.Parse(st.Message())
if me.Code != 0 {
if err = expectCodec.Unmarshal([]byte(me.Error()), &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
break
}
case *status.Status:
me := errors.Parse(v.Message())
if me.Code != 0 {
if err = expectCodec.Unmarshal([]byte(me.Error()), &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
break
}
for _, se := range v.Details() {
switch ne := se.(type) {
case proto.Message:
buf, err := testCodec.Marshal(ne)
if err != nil {
return fmt.Errorf("failed to marshal err: %w", err)
}
if err = testCodec.Unmarshal(buf, &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
default:
return v.Err()
}
}
}
if !reflect.DeepEqual(expectMap, testMap) {
return fmt.Errorf("test: %s != rsp: %s", expectMap, testMap)
}
return nil
}
)
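
Note that the package-level Codecs map is declared but not populated; callers are expected to fill it with their content-type to codec mapping before running tests. A hedged sketch (mycodec stands for whatever codec.Codec implementation the project already uses, e.g. a JSON codec):

// mycodec is assumed to be an existing codec.Codec implementation provided by the caller.
Codecs = map[string]codec.Codec{
	"application/json":      mycodec,
	"application/grpc+json": mycodec,
}
// ResponseCompareFunc is also a package-level variable and can likewise be
// replaced with a project-specific comparison if needed.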
func FromCSVString(columns []*sqlmock.Column, rows *sqlmock.Rows, s string) *sqlmock.Rows {
res := strings.NewReader(strings.TrimSpace(s))
csvReader := csv.NewReader(res)
for {
res, err := csvReader.Read()
if err != nil || res == nil {
break
}
var row []driver.Value
for i, v := range res {
item := CSVColumnParser(strings.TrimSpace(v))
if null, nullOk := columns[i].IsNullable(); null && nullOk && item == nil {
row = append(row, nil)
} else {
row = append(row, item)
}
}
rows = rows.AddRow(row...)
}
return rows
}
func CSVColumnParser(s string) []byte {
switch {
case strings.ToLower(s) == "null":
return nil
case s == "":
return nil
}
return []byte(s)
}
func NewResponseFromFile(rspfile string) (*codec.Frame, error) {
rspbuf, err := os.ReadFile(rspfile)
if err != nil {
return nil, err
}
return &codec.Frame{Data: rspbuf}, nil
}
func getCodec(codecs map[string]codec.Codec, ext string) (codec.Codec, error) {
var c codec.Codec
if cts, ok := ExtToTypes[ext]; ok {
for _, t := range cts {
if c, ok = codecs[t]; ok {
return c, nil
}
}
}
return nil, ErrUnknownContentType
}
func getContentType(codecs map[string]codec.Codec, ext string) (string, error) {
if cts, ok := ExtToTypes[ext]; ok {
for _, t := range cts {
if _, ok = codecs[t]; ok {
return t, nil
}
}
}
return "", ErrUnknownContentType
}
func getExt(name string) string {
ext := filepath.Ext(name)
if len(ext) > 0 && ext[0] == '.' {
ext = ext[1:]
}
return ext
}
func getNameWithoutExt(name string) string {
return strings.TrimSuffix(name, filepath.Ext(name))
}
func NewRequestFromFile(c client.Client, reqfile string) (client.Request, error) {
reqbuf, err := os.ReadFile(reqfile)
if err != nil {
return nil, err
}
endpoint := path.Base(path.Dir(reqfile))
if idx := strings.Index(endpoint, "_"); idx > 0 {
endpoint = endpoint[idx+1:]
}
ext := getExt(reqfile)
ct, err := getContentType(c.Options().Codecs, ext)
if err != nil {
return nil, err
}
req := c.NewRequest("test", endpoint, &codec.Frame{Data: reqbuf}, client.RequestContentType(ct))
return req, nil
}
func SQLFromFile(m sqlmock.Sqlmock, name string) error {
fp, err := os.Open(name)
if err != nil {
return err
}
defer fp.Close()
return SQLFromReader(m, fp)
}
func SQLFromBytes(m sqlmock.Sqlmock, buf []byte) error {
return SQLFromReader(m, bytes.NewReader(buf))
}
func SQLFromString(m sqlmock.Sqlmock, buf string) error {
return SQLFromReader(m, strings.NewReader(buf))
}
func SQLFromReader(m sqlmock.Sqlmock, r io.Reader) error {
var rows *sqlmock.Rows
var exp *sqlmock.ExpectedQuery
var columns []*sqlmock.Column
br := bufio.NewReader(r)
for {
s, err := br.ReadString('\n')
if err != nil && err != io.EOF {
return err
} else if err == io.EOF && len(s) == 0 {
if rows != nil && exp != nil {
exp.WillReturnRows(rows)
}
return nil
}
if s[0] != '#' {
r := csv.NewReader(strings.NewReader(s))
r.Comma = ','
var records [][]string
records, err = r.ReadAll()
if err != nil {
return err
}
if rows == nil && len(columns) > 0 {
rows = m.NewRowsWithColumnDefinition(columns...)
} else {
for idx := 0; idx < len(records); idx++ {
if len(columns) == 0 {
return fmt.Errorf("csv file not valid, does not have %q line", "# columns ")
}
rows = FromCSVString(columns, rows, strings.Join(records[idx], ","))
}
}
continue
}
if rows != nil {
exp.WillReturnRows(rows)
rows = nil
}
switch {
case strings.HasPrefix(strings.ToLower(s[2:]), "columns"):
for _, field := range strings.Split(s[2+len("columns")+1:], ",") {
args := strings.Split(field, "|")
column := sqlmock.NewColumn(args[0]).Nullable(false)
if len(args) > 1 {
for _, arg := range args {
switch arg {
case "BOOLEAN", "BOOL":
column = column.OfType("BOOL", false)
case "NUMBER", "DECIMAL":
column = column.OfType("DECIMAL", float64(0.0)).WithPrecisionAndScale(10, 4)
case "VARCHAR":
column = column.OfType("VARCHAR", nil)
case "NULL":
column = column.Nullable(true)
}
}
}
columns = append(columns, column)
}
case strings.HasPrefix(strings.ToLower(s[2:]), "begin"):
m.ExpectBegin()
case strings.HasPrefix(strings.ToLower(s[2:]), "commit"):
m.ExpectCommit()
case strings.HasPrefix(strings.ToLower(s[2:]), "rollback"):
m.ExpectRollback()
case strings.HasPrefix(strings.ToLower(s[2:]), "exec "):
m.ExpectExec(s[2+len("exec "):])
case strings.HasPrefix(strings.ToLower(s[2:]), "query "):
exp = m.ExpectQuery(s[2+len("query "):])
}
}
}
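
The reader consumes a small line-oriented script: lines starting with "# " are directives (columns, begin, commit, rollback, exec, query), the first plain CSV line after "# columns" acts as a header that materializes the row set, and subsequent CSV lines become the returned rows. A rough usage sketch with an invented table and values:

db, mock, err := sqlmock.New()
if err != nil {
	// handle error
}
defer db.Close()

script := `# begin
# query select \* from accounts;
# columns id|VARCHAR,name|VARCHAR
id,name
1,demo
# commit
`
if err := SQLFromString(mock, script); err != nil {
	// handle error
}
// mock now expects Begin, the select (returning one row: 1,demo) and Commit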
func Run(ctx context.Context, c client.Client, m sqlmock.Sqlmock, dir string, exts []string) error {
tcases, err := GetCases(dir, exts)
if err != nil {
return err
}
g, gctx := errgroup.WithContext(ctx)
if !strings.Contains(dir, "parallel") {
g.SetLimit(1)
}
for _, tcase := range tcases {
for _, dbfile := range tcase.dbfiles {
if err = SQLFromFile(m, dbfile); err != nil {
return err
}
}
tc := tcase
g.Go(func() error {
var xrid string
var gerr error
treq, err := NewRequestFromFile(c, tc.reqfile)
if err != nil {
gerr = fmt.Errorf("failed to read request from file %s err: %w", tc.reqfile, err)
return gerr
}
xrid = fmt.Sprintf("%s-%d", treq.Endpoint(), time.Now().Unix())
defer func() {
if gerr == nil {
fmt.Printf("test %s xrid: %s status: success\n", filepath.Dir(tc.reqfile), xrid)
} else {
fmt.Printf("test %s xrid: %s status: failure error: %v\n", filepath.Dir(tc.reqfile), xrid, err)
}
}()
data := &codec.Frame{}
md := metadata.New(1)
md.Set("X-Request-Id", xrid)
cerr := c.Call(metadata.NewOutgoingContext(gctx, md), treq, data, client.WithContentType(treq.ContentType()))
var rspfile string
if tc.errfile != "" {
rspfile = tc.errfile
} else if tc.rspfile != "" {
rspfile = tc.rspfile
} else {
gerr = fmt.Errorf("errfile and rspfile is empty")
return gerr
}
expectRsp, err := NewResponseFromFile(rspfile)
if err != nil {
gerr = fmt.Errorf("failed to read response from file %s err: %w", rspfile, err)
return gerr
}
testCodec, err := getCodec(Codecs, getExt(tc.reqfile))
if err != nil {
gerr = fmt.Errorf("failed to get response file codec err: %w", err)
return gerr
}
expectCodec, err := getCodec(Codecs, getExt(rspfile))
if err != nil {
gerr = fmt.Errorf("failed to get response file codec err: %w", err)
return gerr
}
if cerr == nil && tc.errfile != "" {
gerr = fmt.Errorf("expected err %s not happened", expectRsp.Data)
return gerr
} else if cerr != nil && tc.errfile != "" {
if err = ResponseCompareFunc(expectRsp.Data, cerr, expectCodec, testCodec); err != nil {
gerr = err
return gerr
}
} else if cerr != nil && tc.errfile == "" {
gerr = cerr
return gerr
} else if cerr == nil && tc.errfile == "" {
if err = ResponseCompareFunc(expectRsp.Data, data, expectCodec, testCodec); err != nil {
gerr = err
return gerr
}
}
/*
cf, err := getCodec(c.Options().Codecs, getExt(tc.rspfile))
if err != nil {
return err
}
*/
return nil
})
}
return g.Wait()
}
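
Run discovers its cases via GetCases below: each case directory (for example testdata/result/01_firstcase/ in this commit) holds a *_req.* file, a *_rsp.* or *_err.* file and optional *_db.csv fixtures, and directories whose path contains "parallel" are executed concurrently. A hedged sketch of a caller-side test (the client is assumed to be already wired to the service under test and is shown here as a parameter):

func runCases(t *testing.T, c client.Client) {
	t.Helper()

	db, mock, err := sqlmock.New()
	if err != nil {
		t.Fatal(err)
	}
	defer db.Close()

	// nil exts means DefaultExts (csv, json, yaml, yml, proto)
	if err := Run(context.TODO(), c, mock, "testdata/", nil); err != nil {
		t.Fatal(err)
	}
}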
type Case struct {
dbfiles []string
reqfile string
rspfile string
errfile string
}
func GetCases(dir string, exts []string) ([]Case, error) {
var tcases []Case
entries, err := os.ReadDir(dir)
if len(entries) == 0 && err != nil {
return tcases, err
}
if exts == nil {
exts = DefaultExts
}
var dirs []string
var dbfiles []string
var reqfile, rspfile, errfile string
for _, entry := range entries {
if entry.IsDir() {
dirs = append(dirs, filepath.Join(dir, entry.Name()))
continue
}
if info, err := entry.Info(); err != nil {
return tcases, err
} else if !info.Mode().IsRegular() {
continue
}
for _, ext := range exts {
if getExt(entry.Name()) == ext {
name := getNameWithoutExt(entry.Name())
switch {
case strings.HasSuffix(name, "_db"):
dbfiles = append(dbfiles, filepath.Join(dir, entry.Name()))
case strings.HasSuffix(name, "_req"):
reqfile = filepath.Join(dir, entry.Name())
case strings.HasSuffix(name, "_rsp"):
rspfile = filepath.Join(dir, entry.Name())
case strings.HasSuffix(name, "_err"):
errfile = filepath.Join(dir, entry.Name())
}
}
}
}
if reqfile != "" && (rspfile != "" || errfile != "") {
tcases = append(tcases, Case{dbfiles: dbfiles, reqfile: reqfile, rspfile: rspfile, errfile: errfile})
}
for _, dir = range dirs {
ntcases, err := GetCases(dir, exts)
if len(ntcases) == 0 && err != nil {
return tcases, err
} else if len(ntcases) == 0 {
continue
}
tcases = append(tcases, ntcases...)
}
return tcases, nil
}

util/test/test_test.go Normal file

@@ -0,0 +1,72 @@
package test
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
)
func Test_SQLFromFile(t *testing.T) {
ctx := context.TODO()
db, c, err := sqlmock.New()
if err != nil {
t.Fatal(err)
}
defer db.Close()
if err = SQLFromFile(c, "testdata/result/01_firstcase/Call_db.csv"); err != nil {
t.Fatal(err)
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
t.Fatal(err)
}
rows, err := tx.QueryContext(ctx, "select * from test;")
if err != nil {
t.Fatal(err)
}
for rows.Next() {
var id int64
var name string
err = rows.Scan(&id, &name)
if err != nil {
t.Fatal(err)
}
if id != 1 || name != "test" {
t.Fatalf("invalid rows %v %v", id, name)
}
}
if err = rows.Close(); err != nil {
t.Fatal(err)
}
if err = rows.Err(); err != nil {
t.Fatal(err)
}
if err = tx.Commit(); err != nil {
t.Fatal(err)
}
if err = c.ExpectationsWereMet(); err != nil {
t.Fatal(err)
}
}
func Test_GetCases(t *testing.T) {
files, err := GetCases("testdata/", nil)
if err != nil {
t.Fatal(err)
}
if len(files) == 0 {
t.Fatalf("no files matching")
}
if n := len(files); n != 1 {
t.Fatalf("invalid number of test cases %d", n)
}
}


@@ -0,0 +1,6 @@
# begin
# query select \* from test;
# columns id|VARCHAR,name|VARCHAR
id,name
1,test
# commit


@@ -0,0 +1 @@
{}


@@ -0,0 +1 @@
{}


@@ -2,7 +2,9 @@ package time
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
)
@@ -13,39 +15,42 @@ func ParseDuration(s string) (time.Duration, error) {
return 0, fmt.Errorf(`time: invalid duration "` + s + `"`)
}
var p int
var hours int
loop:
for i, r := range s {
switch r {
case 's', 'm':
break loop
case 'h':
d, err := strconv.Atoi(s[p:i])
if err != nil {
return 0, errors.New("time: invalid duration " + s)
}
hours += d
p = i + 1
case 'd':
d, err := strconv.Atoi(s[p:i])
if err != nil {
return 0, errors.New("time: invalid duration " + s)
}
hours += d * 24
p = i + 1
case 'y':
n, err := strconv.Atoi(s[p:i])
if err != nil {
return 0, errors.New("time: invalid duration " + s)
}
var d int
for j := n - 1; j >= 0; j-- {
d += time.Date(time.Now().Year()+j, time.December, 31, 0, 0, 0, 0, time.Local).YearDay()
}
hours += d * 24
p = i + 1
}
}
return time.ParseDuration(fmt.Sprintf("%dh%s", hours, s[p:]))
}
func (d Duration) MarshalJSON() ([]byte, error) {
@@ -62,7 +67,7 @@ func (d *Duration) UnmarshalJSON(b []byte) error {
*d = Duration(time.Duration(value))
return nil
case string:
dv, err := ParseDuration(value)
if err != nil {
return err
}

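The rewritten ParseDuration folds d and y components into whole hours before delegating to time.ParseDuration, which is why the updated test below expects 340h for "14d4h" (14*24h + 4h) and a full year of hours for "1y". A small usage sketch:

d, err := ParseDuration("14d4h")
if err != nil {
	// handle error
}
fmt.Println(d) // 340h0m0s (14*24h + 4h)

d, _ = ParseDuration("1y")
fmt.Println(d) // 8760h0m0s for a 365-day year (8784h0m0s in a leap year)
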

@@ -23,27 +23,34 @@ func TestUnmarshalJSON(t *testing.T) {
TTL Duration `json:"ttl"`
}
v := &str{}
var err error
err = json.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
if err != nil {
t.Fatal(err)
} else if v.TTL != 10000000 {
t.Fatalf("invalid duration %v != 10000000", v.TTL)
}
err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
if err != nil {
t.Fatal(err)
} else if v.TTL != 31536000000000000 {
t.Fatalf("invalid duration %v != 31536000000000000", v.TTL)
}
}
func TestParseDuration(t *testing.T) {
var td time.Duration
var err error
t.Skip()
td, err = ParseDuration("14d4h")
if err != nil {
t.Fatalf("ParseDuration error: %v", err)
}
if td.String() != "336h0m0s" {
t.Fatalf("ParseDuration 14d != 336h0m0s : %s", td.String())
if td.String() != "340h0m0s" {
t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String())
}
td, err = ParseDuration("1y")
if err != nil {
t.Fatalf("ParseDuration error: %v", err)