From 9704ef2e5e8151ff1c1f212f1f77f1dd9c002fd7 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 6 Dec 2024 19:05:27 +0300 Subject: [PATCH 1/8] fix pipeline (#365) Co-authored-by: Aleksandr Tolstikhin Reviewed-on: https://git.unistack.org/unistack-org/micro/pulls/365 Co-authored-by: Vasiliy Tolstov Co-committed-by: Vasiliy Tolstov --- .gitea/workflows/lint.yml | 24 --------- .gitea/workflows/pipeline.yml | 40 ++++++++++++++ .gitea/workflows/pr.yml | 23 -------- .golangci.yml | 41 +-------------- client/noop.go | 12 ++--- errors/errors_test.go | 3 +- logger/slog/slog.go | 5 +- logger/unwrap/unwrap.go | 16 +++--- logger/unwrap/unwrap_test.go | 6 +-- metadata/metadata_test.go | 1 + micro.go | 36 +++++++------ options.go | 6 +-- register/memory/memory.go | 4 +- register/memory/memory_test.go | 20 ++++--- server/noop.go | 1 - server/noop_test.go | 2 +- service.go | 4 +- store/store.go | 4 ++ util/dns/conn.go | 9 +++- util/jitter/ticker.go | 1 - util/reflect/reflect.go | 2 +- util/reflect/reflect_test.go | 29 ++++++++++ util/structfs/metadata_digitalocean.go | 2 +- util/structfs/metadata_ec2.go | 2 +- util/structfs/structfs.go | 15 +++--- util/structfs/structfs_test.go | 73 +++++++++++++++----------- util/time/duration.go | 2 +- util/xpool/pool.go | 21 ++++---- 28 files changed, 202 insertions(+), 202 deletions(-) delete mode 100644 .gitea/workflows/lint.yml create mode 100644 .gitea/workflows/pipeline.yml delete mode 100644 .gitea/workflows/pr.yml diff --git a/.gitea/workflows/lint.yml b/.gitea/workflows/lint.yml deleted file mode 100644 index 1cc293dc..00000000 --- a/.gitea/workflows/lint.yml +++ /dev/null @@ -1,24 +0,0 @@ -name: lint -on: - pull_request: - branches: - - master - - v3 -jobs: - lint: - name: lint - runs-on: ubuntu-latest - steps: - - name: setup-go - uses: actions/setup-go@v3 - with: - go-version: 1.21 - - name: checkout - uses: actions/checkout@v3 - - name: deps - run: go get -v -d ./... - - name: lint - uses: https://github.com/golangci/golangci-lint-action@v3.4.0 - continue-on-error: true - with: - version: v1.52 diff --git a/.gitea/workflows/pipeline.yml b/.gitea/workflows/pipeline.yml new file mode 100644 index 00000000..31f97f3b --- /dev/null +++ b/.gitea/workflows/pipeline.yml @@ -0,0 +1,40 @@ +name: pipeline +on: + pull_request: + branches: + - master + - v3 + - v4 +jobs: + lint: + name: lint + runs-on: ubuntu-latest + steps: + - name: setup-go + uses: actions/setup-go@v5 + with: + go-version: 'stable' + - name: checkout + uses: actions/checkout@v3 + - name: deps + run: go get -v -d ./... + - name: lint + uses: https://github.com/golangci/golangci-lint-action@v6 + with: + version: v1.62.2 + test: + name: test + runs-on: ubuntu-latest + steps: + - name: setup-go + uses: actions/setup-go@v5 + with: + go-version: 'stable' + - name: checkout + uses: actions/checkout@v3 + - name: deps + run: go get -v -d ./... + - name: test + env: + INTEGRATION_TESTS: yes + run: go test -mod readonly -v ./... diff --git a/.gitea/workflows/pr.yml b/.gitea/workflows/pr.yml deleted file mode 100644 index b3e67b06..00000000 --- a/.gitea/workflows/pr.yml +++ /dev/null @@ -1,23 +0,0 @@ -name: pr -on: - pull_request: - branches: - - master - - v3 -jobs: - test: - name: test - runs-on: ubuntu-latest - steps: - - name: checkout - uses: actions/checkout@v3 - - name: setup-go - uses: actions/setup-go@v3 - with: - go-version: 1.21 - - name: deps - run: go get -v -t -d ./... - - name: test - env: - INTEGRATION_TESTS: yes - run: go test -mod readonly -v ./... diff --git a/.golangci.yml b/.golangci.yml index 524fc7f8..2bb1c300 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,44 +1,5 @@ run: - concurrency: 4 + concurrency: 8 deadline: 5m issues-exit-code: 1 tests: true - -linters-settings: - govet: - check-shadowing: true - enable: - - fieldalignment - -linters: - enable: - - govet - - deadcode - - errcheck - - govet - - ineffassign - - staticcheck - - structcheck - - typecheck - - unused - - varcheck - - bodyclose - - gci - - goconst - - gocritic - - gosimple - - gofmt - - gofumpt - - goimports - - revive - - gosec - - makezero - - misspell - - nakedret - - nestif - - nilerr - - noctx - - prealloc - - unconvert - - unparam - disable-all: false diff --git a/client/noop.go b/client/noop.go index 5fb8d571..ad0fc2a8 100644 --- a/client/noop.go +++ b/client/noop.go @@ -298,7 +298,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + return errors.InternalServerError("go.micro.client", "%s", err.Error()) } // only sleep if greater than 0 @@ -312,7 +312,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o // TODO apply any filtering here routes, err = n.opts.Lookup(ctx, req, callOpts) if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + return errors.InternalServerError("go.micro.client", "%s", err.Error()) } // balance the list of nodes @@ -466,7 +466,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti // call backoff first. Someone may want an initial start delay t, cerr := callOpts.Backoff(ctx, req, i) if cerr != nil { - return nil, errors.InternalServerError("go.micro.client", cerr.Error()) + return nil, errors.InternalServerError("go.micro.client", "%s", cerr.Error()) } // only sleep if greater than 0 @@ -480,7 +480,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti // TODO apply any filtering here routes, err = n.opts.Lookup(ctx, req, callOpts) if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + return nil, errors.InternalServerError("go.micro.client", "%s", err.Error()) } // balance the list of nodes @@ -609,13 +609,13 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO // use codec for payload cf, err := n.newCodec(p.ContentType()) if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + return errors.InternalServerError("go.micro.client", "%s", err.Error()) } // set the body b, err := cf.Marshal(p.Payload()) if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + return errors.InternalServerError("go.micro.client", "%s", err.Error()) } body = b } diff --git a/errors/errors_test.go b/errors/errors_test.go index 03c264c2..dd476637 100644 --- a/errors/errors_test.go +++ b/errors/errors_test.go @@ -2,6 +2,7 @@ package errors import ( "encoding/json" + "errors" er "errors" "fmt" "net/http" @@ -26,7 +27,7 @@ func TestMarshalJSON(t *testing.T) { func TestEmpty(t *testing.T) { msg := "test" var err *Error - err = FromError(fmt.Errorf(msg)) + err = FromError(errors.New(msg)) if err.Detail != msg { t.Fatalf("invalid error %v", err) } diff --git a/logger/slog/slog.go b/logger/slog/slog.go index 4d3e8c06..4923e6a5 100644 --- a/logger/slog/slog.go +++ b/logger/slog/slog.go @@ -46,11 +46,11 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error { } func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler { - return h.WithAttrs(attrs) + return h.h.WithAttrs(attrs) } func (h *wrapper) WithGroup(name string) slog.Handler { - return h.WithGroup(name) + return h.h.WithGroup(name) } func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { @@ -89,7 +89,6 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { } type slogLogger struct { - leveler *slog.LevelVar handler *wrapper opts logger.Options mu sync.RWMutex diff --git a/logger/unwrap/unwrap.go b/logger/unwrap/unwrap.go index 93be44d4..5cfcf167 100644 --- a/logger/unwrap/unwrap.go +++ b/logger/unwrap/unwrap.go @@ -36,14 +36,14 @@ var ( circularShortBytes = []byte("") invalidAngleBytes = []byte("") filteredBytes = []byte("") - openBracketBytes = []byte("[") - closeBracketBytes = []byte("]") - percentBytes = []byte("%") - precisionBytes = []byte(".") - openAngleBytes = []byte("<") - closeAngleBytes = []byte(">") - openMapBytes = []byte("{") - closeMapBytes = []byte("}") + // openBracketBytes = []byte("[") + // closeBracketBytes = []byte("]") + percentBytes = []byte("%") + precisionBytes = []byte(".") + openAngleBytes = []byte("<") + closeAngleBytes = []byte(">") + openMapBytes = []byte("{") + closeMapBytes = []byte("}") ) type protoMessage interface { diff --git a/logger/unwrap/unwrap_test.go b/logger/unwrap/unwrap_test.go index 3161c342..af55db0a 100644 --- a/logger/unwrap/unwrap_test.go +++ b/logger/unwrap/unwrap_test.go @@ -82,12 +82,12 @@ func TestTagged(t *testing.T) { func TestTaggedNested(t *testing.T) { type val struct { key string `logger:"take"` - val string `logger:"omit"` + // val string `logger:"omit"` unk string } type str struct { - key string `logger:"omit"` - val *val `logger:"take"` + // key string `logger:"omit"` + val *val `logger:"take"` } var iface interface{} diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 28cf93b7..7b6eb885 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -83,6 +83,7 @@ func TestPassing(t *testing.T) { if ok { t.Fatalf("create outgoing context") } + _ = md ctx = NewOutgoingContext(ctx, New(1)) testCtx(ctx) diff --git a/micro.go b/micro.go index 20295601..e2dc1e62 100644 --- a/micro.go +++ b/micro.go @@ -65,6 +65,8 @@ func As(b any, target any) bool { break case targetType.Implements(routerType): break + case targetType.Implements(tracerType): + break default: return false } @@ -76,19 +78,21 @@ func As(b any, target any) bool { return false } -var brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem() -var loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem() -var clientType = reflect.TypeOf((*client.Client)(nil)).Elem() -var serverType = reflect.TypeOf((*server.Server)(nil)).Elem() -var codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem() -var flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem() -var fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem() -var meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem() -var registerType = reflect.TypeOf((*register.Register)(nil)).Elem() -var resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem() -var routerType = reflect.TypeOf((*router.Router)(nil)).Elem() -var selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem() -var storeType = reflect.TypeOf((*store.Store)(nil)).Elem() -var syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem() -var tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem() -var serviceType = reflect.TypeOf((*Service)(nil)).Elem() +var ( + brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem() + loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem() + clientType = reflect.TypeOf((*client.Client)(nil)).Elem() + serverType = reflect.TypeOf((*server.Server)(nil)).Elem() + codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem() + flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem() + fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem() + meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem() + registerType = reflect.TypeOf((*register.Register)(nil)).Elem() + resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem() + routerType = reflect.TypeOf((*router.Router)(nil)).Elem() + selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem() + storeType = reflect.TypeOf((*store.Store)(nil)).Elem() + syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem() + tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem() + serviceType = reflect.TypeOf((*Service)(nil)).Elem() +) diff --git a/options.go b/options.go index 97adbf5e..cee1b9bb 100644 --- a/options.go +++ b/options.go @@ -269,7 +269,7 @@ func Logger(l logger.Logger, opts ...LoggerOption) Option { } } } - + for _, trc := range o.Tracers { for _, ot := range lopts.tracers { if trc.Name() == ot || all { @@ -294,8 +294,8 @@ type loggerOptions struct { brokers []string registers []string stores []string - meters []string - tracers []string + // meters []string + tracers []string } /* diff --git a/register/memory/memory.go b/register/memory/memory.go index ed241c4f..1784a53c 100644 --- a/register/memory/memory.go +++ b/register/memory/memory.go @@ -469,9 +469,7 @@ func serviceToRecord(s *register.Service, ttl time.Duration) *record { } endpoints := make([]*register.Endpoint, len(s.Endpoints)) - for i, e := range s.Endpoints { - endpoints[i] = e - } + copy(endpoints, s.Endpoints) return &record{ Name: s.Name, diff --git a/register/memory/memory_test.go b/register/memory/memory_test.go index 2210b3f6..04502460 100644 --- a/register/memory/memory_test.go +++ b/register/memory/memory_test.go @@ -290,27 +290,25 @@ func TestWatcher(t *testing.T) { ctx := context.TODO() m := NewRegister() - m.Init() - m.Connect(ctx) + _ = m.Init() + _ = m.Connect(ctx) wc, err := m.Watch(ctx) if err != nil { t.Fatalf("cant watch: %v", err) } defer wc.Stop() + cherr := make(chan error, 10) var wg sync.WaitGroup wg.Add(1) go func() { - for { - _, err := wc.Next() - if err != nil { - t.Fatal("unexpected err", err) - } - // t.Logf("changes %#+v", ch.Service) - wc.Stop() - wg.Done() - return + _, err := wc.Next() + if err != nil { + cherr <- fmt.Errorf("unexpected err %v", err) } + // t.Logf("changes %#+v", ch.Service) + wc.Stop() + wg.Done() }() if err := m.Register(ctx, testSrv); err != nil { diff --git a/server/noop.go b/server/noop.go index 86fcfeb9..8fd15791 100644 --- a/server/noop.go +++ b/server/noop.go @@ -171,7 +171,6 @@ type rpcMessage struct { header metadata.Metadata topic string contentType string - body []byte } func (r *rpcMessage) ContentType() string { diff --git a/server/noop_test.go b/server/noop_test.go index 2bad82e6..8b44a1e0 100644 --- a/server/noop_test.go +++ b/server/noop_test.go @@ -38,7 +38,7 @@ func TestNoopSub(t *testing.T) { t.Fatal(err) } - logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel)) + _ = logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel)) s := server.NewServer( server.Broker(b), server.Codec("application/octet-stream", codec.NewCodec()), diff --git a/service.go b/service.go index 3c782035..469e823c 100644 --- a/service.go +++ b/service.go @@ -23,8 +23,8 @@ import ( ) func init() { - maxprocs.Set() - memlimit.SetGoMemLimitWithOpts( + _, _ = maxprocs.Set() + _, _ = memlimit.SetGoMemLimitWithOpts( memlimit.WithRatio(0.9), memlimit.WithProvider( memlimit.ApplyFallback( diff --git a/store/store.go b/store/store.go index c5b8ac3e..fcd0c987 100644 --- a/store/store.go +++ b/store/store.go @@ -105,3 +105,7 @@ func NewWatchOptions(opts ...WatchOption) (WatchOptions, error) { return options, err } + +func Watch(context.Context) (Watcher, error) { + return nil, nil +} diff --git a/util/dns/conn.go b/util/dns/conn.go index f6057a96..8d71ad46 100644 --- a/util/dns/conn.go +++ b/util/dns/conn.go @@ -66,8 +66,13 @@ func (c *dnsConn) RemoteAddr() net.Addr { } func (c *dnsConn) SetDeadline(t time.Time) error { - c.SetReadDeadline(t) - c.SetWriteDeadline(t) + var err error + if err = c.SetReadDeadline(t); err != nil { + return err + } + if err = c.SetWriteDeadline(t); err != nil { + return err + } return nil } diff --git a/util/jitter/ticker.go b/util/jitter/ticker.go index b8d06ee5..34e9d65f 100644 --- a/util/jitter/ticker.go +++ b/util/jitter/ticker.go @@ -16,7 +16,6 @@ type Ticker struct { C chan time.Time min int64 max int64 - exp int64 exit bool rng rand.Rand } diff --git a/util/reflect/reflect.go b/util/reflect/reflect.go index 729b4e02..859beb20 100644 --- a/util/reflect/reflect.go +++ b/util/reflect/reflect.go @@ -91,7 +91,7 @@ func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error { } if mapper, ok := dst.(map[string]interface{}); ok { - dst = mergeMap(mapper, mp, 0) + mergeMap(mapper, mp, 0) return nil } diff --git a/util/reflect/reflect_test.go b/util/reflect/reflect_test.go index c11d7736..2b2a10b0 100644 --- a/util/reflect/reflect_test.go +++ b/util/reflect/reflect_test.go @@ -1,9 +1,38 @@ package reflect import ( + "fmt" "testing" ) +func TestMergeMapStringInterface(t *testing.T) { + var dst interface{} //nolint:gosimple + dst = map[string]interface{}{ + "xx": 11, + } + + src := map[string]interface{}{ + "zz": "aa", + } + + if err := Merge(dst, src); err != nil { + t.Fatal(err) + } + + mp, ok := dst.(map[string]interface{}) + if !ok || mp == nil { + t.Fatalf("xxx %#+v\n", dst) + } + + if fmt.Sprintf("%v", mp["xx"]) != "11" { + t.Fatalf("xxx zzzz %#+v", mp) + } + + if fmt.Sprintf("%v", mp["zz"]) != "aa" { + t.Fatalf("xxx zzzz %#+v", mp) + } +} + func TestMergeMap(t *testing.T) { src := map[string]interface{}{ "skey1": "sval1", diff --git a/util/structfs/metadata_digitalocean.go b/util/structfs/metadata_digitalocean.go index 93ced79c..5effe043 100644 --- a/util/structfs/metadata_digitalocean.go +++ b/util/structfs/metadata_digitalocean.go @@ -56,7 +56,7 @@ type DigitalOceanMetadata struct { func (stfs *DigitalOceanMetadata) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/metadata/v1.json": - json.NewEncoder(w).Encode(stfs.Metadata.V1) + _ = json.NewEncoder(w).Encode(stfs.Metadata.V1) default: fs := FileServer(stfs, "json", time.Now()) idx := strings.Index(r.URL.Path[1:], "/") diff --git a/util/structfs/metadata_ec2.go b/util/structfs/metadata_ec2.go index 6c0f63aa..07be6e42 100644 --- a/util/structfs/metadata_ec2.go +++ b/util/structfs/metadata_ec2.go @@ -12,7 +12,7 @@ type EC2Metadata struct { InstanceType string `json:"instance-type"` LocalHostname string `json:"local-hostname"` LocalIPv4 string `json:"local-ipv4"` - kernelID int `json:"kernel-id"` + KernelID int `json:"kernel-id"` Placement string `json:"placement"` AvailabilityZone string `json:"availability-zone"` ProductCodes string `json:"product-codes"` diff --git a/util/structfs/structfs.go b/util/structfs/structfs.go index 755c691c..a26a6bdf 100644 --- a/util/structfs/structfs.go +++ b/util/structfs/structfs.go @@ -27,7 +27,7 @@ func (fs *fs) ServeHTTP(w http.ResponseWriter, r *http.Request) { f, err := fs.Open(r.URL.Path) if err != nil { w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(err.Error())) + _, _ = w.Write([]byte(err.Error())) return } w.Header().Set("Content-Type", "application/octet-stream") @@ -67,9 +67,9 @@ func (fi *fileInfo) Name() string { func (fi *fileInfo) Mode() os.FileMode { if strings.HasSuffix(fi.name, "/") { - return os.FileMode(0755) | os.ModeDir + return os.FileMode(0o755) | os.ModeDir } - return os.FileMode(0644) + return os.FileMode(0o644) } func (fi *fileInfo) IsDir() bool { @@ -112,15 +112,14 @@ func (f *file) Readdir(count int) ([]os.FileInfo, error) { func (f *file) Seek(offset int64, whence int) (int64, error) { // log.Printf("seek %d %d %s\n", offset, whence, f.name) switch whence { - case os.SEEK_SET: + case io.SeekStart: f.offset = offset - case os.SEEK_CUR: + case io.SeekCurrent: f.offset += offset - case os.SEEK_END: + case io.SeekEnd: f.offset = int64(len(f.data)) + offset } return f.offset, nil - } func (f *file) Stat() (os.FileInfo, error) { @@ -222,6 +221,7 @@ func getValue(name string, iface interface{}, tag string) ([]byte, error) { return nil, fmt.Errorf("failed to find %s in interface %T", name, iface) } +/* func hasValidType(obj interface{}, types []reflect.Kind) bool { for _, t := range types { if reflect.TypeOf(obj).Kind() == t { @@ -231,6 +231,7 @@ func hasValidType(obj interface{}, types []reflect.Kind) bool { return false } +*/ func reflectValue(obj interface{}) reflect.Value { var val reflect.Value diff --git a/util/structfs/structfs_test.go b/util/structfs/structfs_test.go index 7abf8edb..8f84dd9d 100644 --- a/util/structfs/structfs_test.go +++ b/util/structfs/structfs_test.go @@ -2,7 +2,7 @@ package structfs import ( "encoding/json" - "io/ioutil" + "io" "net/http" "reflect" "testing" @@ -61,7 +61,7 @@ var doOrig = []byte(`{ } `) -func server(t *testing.T) { +func server(t *testing.T, ch chan error) { stfs := DigitalOceanMetadata{} err := json.Unmarshal(doOrig, &stfs.Metadata.V1) if err != nil { @@ -71,7 +71,7 @@ func server(t *testing.T) { http.Handle("/metadata/v1/", FileServer(&stfs, "json", time.Now())) http.Handle("/metadata/v1.json", &stfs) go func() { - t.Fatal(http.ListenAndServe("127.0.0.1:8080", nil)) + ch <- http.ListenAndServe("127.0.0.1:8080", nil) }() time.Sleep(2 * time.Second) } @@ -82,13 +82,14 @@ func get(path string) ([]byte, error) { return nil, err } defer res.Body.Close() - return ioutil.ReadAll(res.Body) + return io.ReadAll(res.Body) } func TestAll(t *testing.T) { - server(t) + ch := make(chan error) + server(t, ch) - var tests = []struct { + tests := []struct { in string out string }{ @@ -100,34 +101,44 @@ func TestAll(t *testing.T) { } for _, tt := range tests { - buf, err := get(tt.in) + select { + case err := <-ch: + t.Fatal(err) + default: + buf, err := get(tt.in) + if err != nil { + t.Fatal(err) + } + if string(buf) != tt.out { + t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out) + } + } + } + + select { + case err := <-ch: + t.Fatal(err) + default: + doTest, err := get("http://127.0.0.1:8080/metadata/v1.json") if err != nil { t.Fatal(err) } - if string(buf) != tt.out { - t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out) + + oSt := DigitalOceanMetadata{} + err = json.Unmarshal(doOrig, &oSt.Metadata.V1) + if err != nil { + t.Fatal(err) + } + + nSt := DigitalOceanMetadata{} + + err = json.Unmarshal(doTest, &nSt.Metadata.V1) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(oSt, nSt) { + t.Fatalf("%v not match %v", oSt, nSt) } } - - doTest, err := get("http://127.0.0.1:8080/metadata/v1.json") - if err != nil { - t.Fatal(err) - } - - oSt := DigitalOceanMetadata{} - err = json.Unmarshal(doOrig, &oSt.Metadata.V1) - if err != nil { - t.Fatal(err) - } - - nSt := DigitalOceanMetadata{} - - err = json.Unmarshal(doTest, &nSt.Metadata.V1) - if err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual(oSt, nSt) { - t.Fatalf("%v not match %v", oSt, nSt) - } } diff --git a/util/time/duration.go b/util/time/duration.go index 49550e5e..33acb1a9 100644 --- a/util/time/duration.go +++ b/util/time/duration.go @@ -14,7 +14,7 @@ type Duration int64 func ParseDuration(s string) (time.Duration, error) { if s == "" { - return 0, fmt.Errorf(`time: invalid duration "` + s + `"`) + return 0, errors.New(`time: invalid duration "` + s + `"`) } var p int diff --git a/util/xpool/pool.go b/util/xpool/pool.go index b80cd482..d38b3a08 100644 --- a/util/xpool/pool.go +++ b/util/xpool/pool.go @@ -39,19 +39,16 @@ func newStatsMeter() { ticker := time.NewTicker(meter.DefaultMeterStatsInterval) defer ticker.Stop() - for { - select { - case <-ticker.C: - poolsMu.Lock() - for _, st := range pools { - stats := st.Stats() - meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get) - meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put) - meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis) - meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret) - } - poolsMu.Unlock() + for range ticker.C { + poolsMu.Lock() + for _, st := range pools { + stats := st.Stats() + meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get) + meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put) + meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis) + meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret) } + poolsMu.Unlock() } } From 16d8cf3434680605f6553934dd4657ef44c3fb01 Mon Sep 17 00:00:00 2001 From: Aleksandr Tolstikhin Date: Sat, 7 Dec 2024 02:37:12 +0700 Subject: [PATCH 2/8] Update actions --- .../workflows/{pipeline.yml => job_lint.yml} | 22 +++---------- .gitea/workflows/job_test.yml | 31 +++++++++++++++++++ 2 files changed, 35 insertions(+), 18 deletions(-) rename .gitea/workflows/{pipeline.yml => job_lint.yml} (52%) create mode 100644 .gitea/workflows/job_test.yml diff --git a/.gitea/workflows/pipeline.yml b/.gitea/workflows/job_lint.yml similarity index 52% rename from .gitea/workflows/pipeline.yml rename to .gitea/workflows/job_lint.yml index 31f97f3b..ee3238f2 100644 --- a/.gitea/workflows/pipeline.yml +++ b/.gitea/workflows/job_lint.yml @@ -1,13 +1,15 @@ -name: pipeline +name: lint + on: pull_request: + types: [opened, reopened, closed, synchronize] branches: - master - v3 - v4 + jobs: lint: - name: lint runs-on: ubuntu-latest steps: - name: setup-go @@ -22,19 +24,3 @@ jobs: uses: https://github.com/golangci/golangci-lint-action@v6 with: version: v1.62.2 - test: - name: test - runs-on: ubuntu-latest - steps: - - name: setup-go - uses: actions/setup-go@v5 - with: - go-version: 'stable' - - name: checkout - uses: actions/checkout@v3 - - name: deps - run: go get -v -d ./... - - name: test - env: - INTEGRATION_TESTS: yes - run: go test -mod readonly -v ./... diff --git a/.gitea/workflows/job_test.yml b/.gitea/workflows/job_test.yml new file mode 100644 index 00000000..4223043f --- /dev/null +++ b/.gitea/workflows/job_test.yml @@ -0,0 +1,31 @@ +name: test + +on: + pull_request: + types: [opened, reopened, closed, synchronize] + branches: + - master + - v3 + - v4 + push: + branches: + - master + - v3 + - v4 + +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: setup-go + uses: actions/setup-go@v5 + with: + go-version: 'stable' + - name: checkout + uses: actions/checkout@v3 + - name: deps + run: go get -v -d ./... + - name: test + env: + INTEGRATION_TESTS: yes + run: go test -mod readonly -v ./... From 2df259b5b8bf7d4cb534803aa8d9a0e50646d1f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B8=D0=B9=20=D0=A2=D0=BE?= =?UTF-8?q?=D0=BB=D1=81=D1=82=D0=BE=D0=B2?= Date: Fri, 6 Dec 2024 23:11:32 +0300 Subject: [PATCH 3/8] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20.gitea/workflows/job=5Ftest.yml?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitea/workflows/job_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitea/workflows/job_test.yml b/.gitea/workflows/job_test.yml index 4223043f..15716f1d 100644 --- a/.gitea/workflows/job_test.yml +++ b/.gitea/workflows/job_test.yml @@ -2,7 +2,7 @@ name: test on: pull_request: - types: [opened, reopened, closed, synchronize] + types: [opened, reopened, synchronize] branches: - master - v3 From aa360dcf51fa34a37fe58cc7abeb9d505fe43960 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B8=D0=B9=20=D0=A2=D0=BE?= =?UTF-8?q?=D0=BB=D1=81=D1=82=D0=BE=D0=B2?= Date: Fri, 6 Dec 2024 23:11:48 +0300 Subject: [PATCH 4/8] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20.gitea/workflows/job=5Flint.yml?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitea/workflows/job_lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitea/workflows/job_lint.yml b/.gitea/workflows/job_lint.yml index ee3238f2..1194df7d 100644 --- a/.gitea/workflows/job_lint.yml +++ b/.gitea/workflows/job_lint.yml @@ -2,7 +2,7 @@ name: lint on: pull_request: - types: [opened, reopened, closed, synchronize] + types: [opened, reopened, synchronize] branches: - master - v3 From 9213fd212f8574b9564be7d62959322629d469ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D1=81=D0=B8=D0=BB=D0=B8=D0=B9=20=D0=A2=D0=BE?= =?UTF-8?q?=D0=BB=D1=81=D1=82=D0=BE=D0=B2?= Date: Fri, 6 Dec 2024 23:13:24 +0300 Subject: [PATCH 5/8] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20.gitea/workflows/job=5Flint.yml?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitea/workflows/job_lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitea/workflows/job_lint.yml b/.gitea/workflows/job_lint.yml index 1194df7d..a47bf298 100644 --- a/.gitea/workflows/job_lint.yml +++ b/.gitea/workflows/job_lint.yml @@ -23,4 +23,4 @@ jobs: - name: lint uses: https://github.com/golangci/golangci-lint-action@v6 with: - version: v1.62.2 + version: 'latest' From 0e66688f8f5c05b94b93d4806211d5f842a772cc Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 7 Dec 2024 14:18:25 +0300 Subject: [PATCH 6/8] logger/slog: add option to pass slog.Handler Signed-off-by: Vasiliy Tolstov --- logger/slog/slog.go | 20 +++++++++++++++++++- store/store.go | 2 +- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/logger/slog/slog.go b/logger/slog/slog.go index 4923e6a5..8a4260f1 100644 --- a/logger/slog/slog.go +++ b/logger/slog/slog.go @@ -171,7 +171,19 @@ func (s *slogLogger) Init(opts ...logger.Option) error { } attrs, _ := s.argsAttrs(s.opts.Fields) - s.handler = &wrapper{h: slog.NewJSONHandler(s.opts.Out, handleOpt).WithAttrs(attrs)} + + var h slog.Handler + if s.opts.Context != nil { + if v, ok := s.opts.Context.Value(handlerKey{}).(slog.Handler); ok && v != nil { + h = v + } + } + + if h == nil { + h = slog.NewJSONHandler(s.opts.Out, handleOpt) + } + + s.handler = &wrapper{h: h.WithAttrs(attrs)} s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level))) s.mu.Unlock() @@ -329,3 +341,9 @@ func (s *slogLogger) argsAttrs(args []interface{}) ([]slog.Attr, error) { return attrs, err } + +type handlerKey struct{} + +func WithHandler(h slog.Handler) logger.Option { + return logger.SetOption(handlerKey{}, h) +} diff --git a/store/store.go b/store/store.go index fcd0c987..d93b58d3 100644 --- a/store/store.go +++ b/store/store.go @@ -10,7 +10,7 @@ import ( var ( ErrWatcherStopped = errors.New("watcher stopped") // ErrNotConnected is returned when a store is not connected - ErrNotConnected = errors.New("not conected") + ErrNotConnected = errors.New("not connected") // ErrNotFound is returned when a key doesn't exist ErrNotFound = errors.New("not found") // ErrInvalidKey is returned when a key has empty or have invalid format From d9b822deffb74fef3b6e01687fce31d267af4238 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 7 Dec 2024 16:16:45 +0300 Subject: [PATCH 7/8] logger/slog: add ability to pass func that creates slog.Handler compatible interface Signed-off-by: Vasiliy Tolstov --- logger/slog/slog.go | 17 +++++++++++++++++ logger/slog/slog_test.go | 18 ++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/logger/slog/slog.go b/logger/slog/slog.go index 8a4260f1..534675f5 100644 --- a/logger/slog/slog.go +++ b/logger/slog/slog.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "os" + "reflect" "regexp" "runtime" "strconv" @@ -177,6 +178,16 @@ func (s *slogLogger) Init(opts ...logger.Option) error { if v, ok := s.opts.Context.Value(handlerKey{}).(slog.Handler); ok && v != nil { h = v } + + if fn := s.opts.Context.Value(handlerFnKey{}); fn != nil { + if rfn := reflect.ValueOf(fn); rfn.Kind() == reflect.Func { + if ret := rfn.Call([]reflect.Value{reflect.ValueOf(s.opts.Out), reflect.ValueOf(handleOpt)}); len(ret) == 1 { + if iface, ok := ret[0].Interface().(slog.Handler); ok && iface != nil { + h = iface + } + } + } + } } if h == nil { @@ -347,3 +358,9 @@ type handlerKey struct{} func WithHandler(h slog.Handler) logger.Option { return logger.SetOption(handlerKey{}, h) } + +type handlerFnKey struct{} + +func WithHandlerFunc(fn any) logger.Option { + return logger.SetOption(handlerFnKey{}, fn) +} diff --git a/logger/slog/slog_test.go b/logger/slog/slog_test.go index ba9b69ba..24a60bde 100644 --- a/logger/slog/slog_test.go +++ b/logger/slog/slog_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log" + "log/slog" "strings" "testing" @@ -15,6 +16,23 @@ import ( "go.unistack.org/micro/v3/logger" ) +func TestWithHandlerFunc(t *testing.T) { + ctx := context.TODO() + buf := bytes.NewBuffer(nil) + l := NewLogger(logger.WithLevel(logger.InfoLevel), logger.WithOutput(buf), + WithHandlerFunc(slog.NewTextHandler), + ) + if err := l.Init(); err != nil { + t.Fatal(err) + } + + l.Info(ctx, "msg1") + + if !bytes.Contains(buf.Bytes(), []byte(`msg=msg1`)) { + t.Fatalf("logger error not works, buf contains: %s", buf.Bytes()) + } +} + func TestWithAddFields(t *testing.T) { ctx := context.TODO() buf := bytes.NewBuffer(nil) From b6a0e4d983816d9b1cff12c0a67fd90f35c4ec17 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Mon, 9 Dec 2024 00:41:08 +0300 Subject: [PATCH 8/8] add metrics for dns Signed-off-by: Vasiliy Tolstov --- client/noop.go | 4 ++-- semconv/cache.go | 14 ++++++++++++++ semconv/store.go | 2 +- service.go | 5 ++++- util/dns/cache.go | 35 ++++++++++++++++++++++++++++++++++- util/dns/cache_test.go | 8 +++++++- util/dns/conn.go | 12 +++++------- 7 files changed, 67 insertions(+), 13 deletions(-) create mode 100644 semconv/cache.go diff --git a/client/noop.go b/client/noop.go index ad0fc2a8..dc24e1ec 100644 --- a/client/noop.go +++ b/client/noop.go @@ -222,7 +222,7 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt ts := time.Now() n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() var sp tracer.Span - ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", + ctx, sp = n.opts.Tracer.Start(ctx, "rpc-client", tracer.WithSpanKind(tracer.SpanKindClient), tracer.WithSpanLabels("endpoint", req.Endpoint()), ) @@ -385,7 +385,7 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption ts := time.Now() n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() var sp tracer.Span - ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", + ctx, sp = n.opts.Tracer.Start(ctx, "rpc-client", tracer.WithSpanKind(tracer.SpanKindClient), tracer.WithSpanLabels("endpoint", req.Endpoint()), ) diff --git a/semconv/cache.go b/semconv/cache.go new file mode 100644 index 00000000..d23d0cb5 --- /dev/null +++ b/semconv/cache.go @@ -0,0 +1,14 @@ +package semconv + +var ( + // CacheRequestDurationSeconds specifies meter metric name + CacheRequestDurationSeconds = "micro_cache_request_duration_seconds" + // CacheRequestLatencyMicroseconds specifies meter metric name + CacheRequestLatencyMicroseconds = "micro_cache_request_latency_microseconds" + // CacheRequestTotal specifies meter metric name + CacheRequestTotal = "micro_cache_request_total" + // CacheRequestInflight specifies meter metric name + CacheRequestInflight = "micro_cache_request_inflight" + // CacheItemsTotal specifies total cache items + CacheItemsTotal = "micro_cache_items_total" +) diff --git a/semconv/store.go b/semconv/store.go index a9044f48..f1f2e684 100644 --- a/semconv/store.go +++ b/semconv/store.go @@ -3,7 +3,7 @@ package semconv var ( // StoreRequestDurationSeconds specifies meter metric name StoreRequestDurationSeconds = "micro_store_request_duration_seconds" - // ClientRequestLatencyMicroseconds specifies meter metric name + // StoreRequestLatencyMicroseconds specifies meter metric name StoreRequestLatencyMicroseconds = "micro_store_request_latency_microseconds" // StoreRequestTotal specifies meter metric name StoreRequestTotal = "micro_store_request_total" diff --git a/service.go b/service.go index 469e823c..b5b053f8 100644 --- a/service.go +++ b/service.go @@ -34,7 +34,10 @@ func init() { ), ) - net.DefaultResolver = utildns.NewNetResolver(utildns.Timeout(1 * time.Second)) + net.DefaultResolver = utildns.NewNetResolver( + utildns.Timeout(1*time.Second), + utildns.MinCacheTTL(5*time.Second), + ) } // Service is an interface that wraps the lower level components. diff --git a/util/dns/cache.go b/util/dns/cache.go index 051cc041..3322b698 100644 --- a/util/dns/cache.go +++ b/util/dns/cache.go @@ -6,6 +6,9 @@ import ( "net" "sync" "time" + + "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v3/semconv" ) // DialFunc is a [net.Resolver.Dial] function. @@ -19,6 +22,11 @@ func NewNetResolver(opts ...Option) *net.Resolver { o(&options) } + if options.Meter == nil { + options.Meter = meter.DefaultMeter + opts = append(opts, Meter(options.Meter)) + } + return &net.Resolver{ PreferGo: true, StrictErrors: options.Resolver.StrictErrors, @@ -56,6 +64,7 @@ type Options struct { PreferIPV4 bool PreferIPV6 bool Timeout time.Duration + Meter meter.Meter } // MaxCacheEntries sets the maximum number of entries to cache. @@ -87,6 +96,13 @@ func NegativeCache(b bool) Option { } } +// Meter sets meter.Meter +func Meter(m meter.Meter) Option { + return func(o *Options) { + o.Meter = m + } +} + // Timeout sets upstream *net.Resolver timeout func Timeout(td time.Duration) Option { return func(o *Options) { @@ -156,7 +172,6 @@ func (c *cache) put(req string, res string) { } c.Lock() - defer c.Unlock() if c.entries == nil { c.entries = make(map[string]cacheEntry) } @@ -165,6 +180,8 @@ func (c *cache) put(req string, res string) { var tested, evicted int for k, e := range c.entries { if time.Until(e.deadline) <= 0 { + c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Dec() + c.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "evict").Inc() // delete expired entry delete(c.entries, k) evicted++ @@ -175,6 +192,8 @@ func (c *cache) put(req string, res string) { continue } if evicted == 0 && c.opts.MaxCacheEntries > 0 && len(c.entries) >= c.opts.MaxCacheEntries { + c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Dec() + c.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "evict").Inc() // delete at least one entry delete(c.entries, k) } @@ -186,6 +205,9 @@ func (c *cache) put(req string, res string) { deadline: time.Now().Add(ttl), value: res[2:], } + + c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Inc() + c.Unlock() } func (c *cache) get(req string) (res string) { @@ -210,6 +232,7 @@ func (c *cache) get(req string) (res string) { // prepend correct ID return req[:2] + entry.value } + return "" } @@ -310,10 +333,19 @@ func getUint32(s string) int { func cachingRoundTrip(cache *cache, network, address string) roundTripper { return func(ctx context.Context, req string) (res string, err error) { + cache.opts.Meter.Counter(semconv.CacheRequestInflight, "type", "dns").Inc() + defer cache.opts.Meter.Counter(semconv.CacheRequestInflight, "type", "dns").Dec() // check cache if res := cache.get(req); res != "" { + cache.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "get", "status", "hit").Inc() return res, nil } + cache.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "get", "status", "miss").Inc() + ts := time.Now() + defer func() { + cache.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "type", "dns", "method", "get").UpdateDuration(ts) + cache.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "type", "dns", "method", "get").UpdateDuration(ts) + }() switch { case cache.opts.PreferIPV4 && cache.opts.PreferIPV6: @@ -340,6 +372,7 @@ func cachingRoundTrip(cache *cache, network, address string) roundTripper { var d net.Dialer conn, err = d.DialContext(ctx, network, address) } + if err != nil { return "", err } diff --git a/util/dns/cache_test.go b/util/dns/cache_test.go index 6e2fb1d1..1c939145 100644 --- a/util/dns/cache_test.go +++ b/util/dns/cache_test.go @@ -12,5 +12,11 @@ func TestCache(t *testing.T) { if err != nil { t.Fatal(err) } - t.Logf("addrs %v", addrs) + + addrs, err = net.LookupHost("unistack.org") + if err != nil { + t.Fatal(err) + } + + _ = addrs } diff --git a/util/dns/conn.go b/util/dns/conn.go index 8d71ad46..8cbb95e7 100644 --- a/util/dns/conn.go +++ b/util/dns/conn.go @@ -11,15 +11,13 @@ import ( ) type dnsConn struct { - sync.Mutex - - ibuf bytes.Buffer - obuf bytes.Buffer - + deadline time.Time ctx context.Context cancel context.CancelFunc - deadline time.Time roundTrip roundTripper + ibuf bytes.Buffer + obuf bytes.Buffer + sync.Mutex } type roundTripper func(ctx context.Context, req string) (res string, err error) @@ -78,8 +76,8 @@ func (c *dnsConn) SetDeadline(t time.Time) error { func (c *dnsConn) SetReadDeadline(t time.Time) error { c.Lock() - defer c.Unlock() c.deadline = t + c.Unlock() return nil }