diff --git a/.gitea/workflows/lint.yml b/.gitea/workflows/job_lint.yml similarity index 57% rename from .gitea/workflows/lint.yml rename to .gitea/workflows/job_lint.yml index 1cc293dc..a47bf298 100644 --- a/.gitea/workflows/lint.yml +++ b/.gitea/workflows/job_lint.yml @@ -1,24 +1,26 @@ name: lint + on: pull_request: + types: [opened, reopened, synchronize] branches: - master - v3 + - v4 + jobs: lint: - name: lint runs-on: ubuntu-latest steps: - name: setup-go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: - go-version: 1.21 + go-version: 'stable' - name: checkout uses: actions/checkout@v3 - name: deps run: go get -v -d ./... - name: lint - uses: https://github.com/golangci/golangci-lint-action@v3.4.0 - continue-on-error: true + uses: https://github.com/golangci/golangci-lint-action@v6 with: - version: v1.52 + version: 'latest' diff --git a/.gitea/workflows/pr.yml b/.gitea/workflows/job_test.yml similarity index 59% rename from .gitea/workflows/pr.yml rename to .gitea/workflows/job_test.yml index b3e67b06..15716f1d 100644 --- a/.gitea/workflows/pr.yml +++ b/.gitea/workflows/job_test.yml @@ -1,22 +1,30 @@ -name: pr +name: test + on: pull_request: + types: [opened, reopened, synchronize] branches: - master - v3 + - v4 + push: + branches: + - master + - v3 + - v4 + jobs: test: - name: test runs-on: ubuntu-latest steps: + - name: setup-go + uses: actions/setup-go@v5 + with: + go-version: 'stable' - name: checkout uses: actions/checkout@v3 - - name: setup-go - uses: actions/setup-go@v3 - with: - go-version: 1.21 - name: deps - run: go get -v -t -d ./... + run: go get -v -d ./... - name: test env: INTEGRATION_TESTS: yes diff --git a/.golangci.yml b/.golangci.yml index 524fc7f8..2bb1c300 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,44 +1,5 @@ run: - concurrency: 4 + concurrency: 8 deadline: 5m issues-exit-code: 1 tests: true - -linters-settings: - govet: - check-shadowing: true - enable: - - fieldalignment - -linters: - enable: - - govet - - deadcode - - errcheck - - govet - - ineffassign - - staticcheck - - structcheck - - typecheck - - unused - - varcheck - - bodyclose - - gci - - goconst - - gocritic - - gosimple - - gofmt - - gofumpt - - goimports - - revive - - gosec - - makezero - - misspell - - nakedret - - nestif - - nilerr - - noctx - - prealloc - - unconvert - - unparam - disable-all: false diff --git a/client/noop.go b/client/noop.go index 7d570fca..db960e62 100644 --- a/client/noop.go +++ b/client/noop.go @@ -222,7 +222,7 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt ts := time.Now() n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() var sp tracer.Span - ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", + ctx, sp = n.opts.Tracer.Start(ctx, "rpc-client", tracer.WithSpanKind(tracer.SpanKindClient), tracer.WithSpanLabels("endpoint", req.Endpoint()), ) @@ -385,7 +385,7 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption ts := time.Now() n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() var sp tracer.Span - ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", + ctx, sp = n.opts.Tracer.Start(ctx, "rpc-client", tracer.WithSpanKind(tracer.SpanKindClient), tracer.WithSpanLabels("endpoint", req.Endpoint()), ) diff --git a/logger/slog/slog.go b/logger/slog/slog.go index ddb46955..ad0aacb1 100644 --- a/logger/slog/slog.go +++ b/logger/slog/slog.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "os" + "reflect" "regexp" "runtime" "strconv" @@ -171,7 +172,29 @@ func (s *slogLogger) Init(opts ...logger.Option) error { } attrs, _ := s.argsAttrs(s.opts.Fields) - s.handler = &wrapper{h: slog.NewJSONHandler(s.opts.Out, handleOpt).WithAttrs(attrs)} + + var h slog.Handler + if s.opts.Context != nil { + if v, ok := s.opts.Context.Value(handlerKey{}).(slog.Handler); ok && v != nil { + h = v + } + + if fn := s.opts.Context.Value(handlerFnKey{}); fn != nil { + if rfn := reflect.ValueOf(fn); rfn.Kind() == reflect.Func { + if ret := rfn.Call([]reflect.Value{reflect.ValueOf(s.opts.Out), reflect.ValueOf(handleOpt)}); len(ret) == 1 { + if iface, ok := ret[0].Interface().(slog.Handler); ok && iface != nil { + h = iface + } + } + } + } + } + + if h == nil { + h = slog.NewJSONHandler(s.opts.Out, handleOpt) + } + + s.handler = &wrapper{h: h.WithAttrs(attrs)} s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level))) s.mu.Unlock() @@ -329,3 +352,15 @@ func (s *slogLogger) argsAttrs(args []interface{}) ([]slog.Attr, error) { return attrs, err } + +type handlerKey struct{} + +func WithHandler(h slog.Handler) logger.Option { + return logger.SetOption(handlerKey{}, h) +} + +type handlerFnKey struct{} + +func WithHandlerFunc(fn any) logger.Option { + return logger.SetOption(handlerFnKey{}, fn) +} diff --git a/logger/slog/slog_test.go b/logger/slog/slog_test.go index 7350a8f3..6aa73e81 100644 --- a/logger/slog/slog_test.go +++ b/logger/slog/slog_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log" + "log/slog" "strings" "testing" @@ -14,6 +15,23 @@ import ( "go.unistack.org/micro/v3/metadata" ) +func TestWithHandlerFunc(t *testing.T) { + ctx := context.TODO() + buf := bytes.NewBuffer(nil) + l := NewLogger(logger.WithLevel(logger.InfoLevel), logger.WithOutput(buf), + WithHandlerFunc(slog.NewTextHandler), + ) + if err := l.Init(); err != nil { + t.Fatal(err) + } + + l.Info(ctx, "msg1") + + if !bytes.Contains(buf.Bytes(), []byte(`msg=msg1`)) { + t.Fatalf("logger error not works, buf contains: %s", buf.Bytes()) + } +} + func TestWithAddFields(t *testing.T) { ctx := context.TODO() buf := bytes.NewBuffer(nil) diff --git a/logger/unwrap/unwrap.go b/logger/unwrap/unwrap.go index f1f615a7..8bab179d 100644 --- a/logger/unwrap/unwrap.go +++ b/logger/unwrap/unwrap.go @@ -36,14 +36,14 @@ var ( circularShortBytes = []byte("") invalidAngleBytes = []byte("") filteredBytes = []byte("") - openBracketBytes = []byte("[") - closeBracketBytes = []byte("]") - percentBytes = []byte("%") - precisionBytes = []byte(".") - openAngleBytes = []byte("<") - closeAngleBytes = []byte(">") - openMapBytes = []byte("{") - closeMapBytes = []byte("}") + // openBracketBytes = []byte("[") + // closeBracketBytes = []byte("]") + percentBytes = []byte("%") + precisionBytes = []byte(".") + openAngleBytes = []byte("<") + closeAngleBytes = []byte(">") + openMapBytes = []byte("{") + closeMapBytes = []byte("}") ) type protoMessage interface { diff --git a/logger/unwrap/unwrap_test.go b/logger/unwrap/unwrap_test.go index 79f816e8..af55db0a 100644 --- a/logger/unwrap/unwrap_test.go +++ b/logger/unwrap/unwrap_test.go @@ -82,12 +82,12 @@ func TestTagged(t *testing.T) { func TestTaggedNested(t *testing.T) { type val struct { key string `logger:"take"` - val string `logger:"omit"` + // val string `logger:"omit"` unk string } type str struct { - val *val `logger:"take"` - key string `logger:"omit"` + // key string `logger:"omit"` + val *val `logger:"take"` } var iface interface{} diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 84738f81..a0932d39 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -83,6 +83,7 @@ func TestPassing(t *testing.T) { if ok { t.Fatalf("create outgoing context") } + _ = md ctx = NewOutgoingContext(ctx, New(1)) testCtx(ctx) diff --git a/micro.go b/micro.go index 9a9f637d..e2dc1e62 100644 --- a/micro.go +++ b/micro.go @@ -65,6 +65,8 @@ func As(b any, target any) bool { break case targetType.Implements(routerType): break + case targetType.Implements(tracerType): + break default: return false } diff --git a/options.go b/options.go index a7172acc..cee1b9bb 100644 --- a/options.go +++ b/options.go @@ -294,8 +294,8 @@ type loggerOptions struct { brokers []string registers []string stores []string - meters []string - tracers []string + // meters []string + tracers []string } /* diff --git a/register/memory/memory_test.go b/register/memory/memory_test.go index 8d2619ec..03928dcb 100644 --- a/register/memory/memory_test.go +++ b/register/memory/memory_test.go @@ -302,19 +302,17 @@ func TestWatcher(t *testing.T) { } defer wc.Stop() + cherr := make(chan error, 10) var wg sync.WaitGroup wg.Add(1) go func() { - for { - _, err := wc.Next() - if err != nil { - t.Fatal("unexpected err", err) - } - // t.Logf("changes %#+v", ch.Service) - wc.Stop() - wg.Done() - return + _, err := wc.Next() + if err != nil { + cherr <- fmt.Errorf("unexpected err %v", err) } + // t.Logf("changes %#+v", ch.Service) + wc.Stop() + wg.Done() }() if err := m.Register(ctx, testSrv); err != nil { diff --git a/semconv/cache.go b/semconv/cache.go new file mode 100644 index 00000000..d23d0cb5 --- /dev/null +++ b/semconv/cache.go @@ -0,0 +1,14 @@ +package semconv + +var ( + // CacheRequestDurationSeconds specifies meter metric name + CacheRequestDurationSeconds = "micro_cache_request_duration_seconds" + // CacheRequestLatencyMicroseconds specifies meter metric name + CacheRequestLatencyMicroseconds = "micro_cache_request_latency_microseconds" + // CacheRequestTotal specifies meter metric name + CacheRequestTotal = "micro_cache_request_total" + // CacheRequestInflight specifies meter metric name + CacheRequestInflight = "micro_cache_request_inflight" + // CacheItemsTotal specifies total cache items + CacheItemsTotal = "micro_cache_items_total" +) diff --git a/semconv/store.go b/semconv/store.go index a9044f48..f1f2e684 100644 --- a/semconv/store.go +++ b/semconv/store.go @@ -3,7 +3,7 @@ package semconv var ( // StoreRequestDurationSeconds specifies meter metric name StoreRequestDurationSeconds = "micro_store_request_duration_seconds" - // ClientRequestLatencyMicroseconds specifies meter metric name + // StoreRequestLatencyMicroseconds specifies meter metric name StoreRequestLatencyMicroseconds = "micro_store_request_latency_microseconds" // StoreRequestTotal specifies meter metric name StoreRequestTotal = "micro_store_request_total" diff --git a/server/noop.go b/server/noop.go index d19ffa17..d742706c 100644 --- a/server/noop.go +++ b/server/noop.go @@ -171,7 +171,6 @@ type rpcMessage struct { header metadata.Metadata topic string contentType string - body []byte } func (r *rpcMessage) ContentType() string { diff --git a/service.go b/service.go index cf661c44..b5b053f8 100644 --- a/service.go +++ b/service.go @@ -23,10 +23,8 @@ import ( ) func init() { - if _, err := maxprocs.Set(); err != nil { - panic(err) - } - if _, err := memlimit.SetGoMemLimitWithOpts( + _, _ = maxprocs.Set() + _, _ = memlimit.SetGoMemLimitWithOpts( memlimit.WithRatio(0.9), memlimit.WithProvider( memlimit.ApplyFallback( @@ -34,11 +32,12 @@ func init() { memlimit.FromSystem, ), ), - ); err != nil { - panic(err) - } + ) - net.DefaultResolver = utildns.NewNetResolver(utildns.Timeout(1 * time.Second)) + net.DefaultResolver = utildns.NewNetResolver( + utildns.Timeout(1*time.Second), + utildns.MinCacheTTL(5*time.Second), + ) } // Service is an interface that wraps the lower level components. diff --git a/store/store.go b/store/store.go index 9c02d649..531a022c 100644 --- a/store/store.go +++ b/store/store.go @@ -105,3 +105,7 @@ func NewWatchOptions(opts ...WatchOption) (WatchOptions, error) { return options, err } + +func Watch(context.Context) (Watcher, error) { + return nil, nil +} diff --git a/util/dns/cache.go b/util/dns/cache.go index de184819..f9b06bde 100644 --- a/util/dns/cache.go +++ b/util/dns/cache.go @@ -6,6 +6,9 @@ import ( "net" "sync" "time" + + "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v3/semconv" ) // DialFunc is a [net.Resolver.Dial] function. @@ -19,6 +22,11 @@ func NewNetResolver(opts ...Option) *net.Resolver { o(&options) } + if options.Meter == nil { + options.Meter = meter.DefaultMeter + opts = append(opts, Meter(options.Meter)) + } + return &net.Resolver{ PreferGo: true, StrictErrors: options.Resolver.StrictErrors, @@ -56,6 +64,7 @@ type Options struct { PreferIPV4 bool PreferIPV6 bool Timeout time.Duration + Meter meter.Meter } // MaxCacheEntries sets the maximum number of entries to cache. @@ -87,6 +96,13 @@ func NegativeCache(b bool) Option { } } +// Meter sets meter.Meter +func Meter(m meter.Meter) Option { + return func(o *Options) { + o.Meter = m + } +} + // Timeout sets upstream *net.Resolver timeout func Timeout(td time.Duration) Option { return func(o *Options) { @@ -156,7 +172,6 @@ func (c *cache) put(req string, res string) { } c.Lock() - defer c.Unlock() if c.entries == nil { c.entries = make(map[string]cacheEntry) } @@ -165,6 +180,8 @@ func (c *cache) put(req string, res string) { var tested, evicted int for k, e := range c.entries { if time.Until(e.deadline) <= 0 { + c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Dec() + c.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "evict").Inc() // delete expired entry delete(c.entries, k) evicted++ @@ -175,6 +192,8 @@ func (c *cache) put(req string, res string) { continue } if evicted == 0 && c.opts.MaxCacheEntries > 0 && len(c.entries) >= c.opts.MaxCacheEntries { + c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Dec() + c.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "evict").Inc() // delete at least one entry delete(c.entries, k) } @@ -186,6 +205,9 @@ func (c *cache) put(req string, res string) { deadline: time.Now().Add(ttl), value: res[2:], } + + c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Inc() + c.Unlock() } func (c *cache) get(req string) (res string) { @@ -210,6 +232,7 @@ func (c *cache) get(req string) (res string) { // prepend correct ID return req[:2] + entry.value } + return "" } @@ -310,10 +333,18 @@ func getUint32(s string) int { func cachingRoundTrip(cache *cache, network, address string) roundTripper { return func(ctx context.Context, req string) (res string, err error) { + cache.opts.Meter.Counter(semconv.CacheRequestInflight, "type", "dns").Inc() + defer cache.opts.Meter.Counter(semconv.CacheRequestInflight, "type", "dns").Dec() // check cache if res = cache.get(req); res != "" { return res, nil } + cache.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "get", "status", "miss").Inc() + ts := time.Now() + defer func() { + cache.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "type", "dns", "method", "get").UpdateDuration(ts) + cache.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "type", "dns", "method", "get").UpdateDuration(ts) + }() switch { case cache.opts.PreferIPV4 && cache.opts.PreferIPV6: @@ -340,6 +371,7 @@ func cachingRoundTrip(cache *cache, network, address string) roundTripper { var d net.Dialer conn, err = d.DialContext(ctx, network, address) } + if err != nil { return "", err } diff --git a/util/dns/cache_test.go b/util/dns/cache_test.go index 6e2fb1d1..1c939145 100644 --- a/util/dns/cache_test.go +++ b/util/dns/cache_test.go @@ -12,5 +12,11 @@ func TestCache(t *testing.T) { if err != nil { t.Fatal(err) } - t.Logf("addrs %v", addrs) + + addrs, err = net.LookupHost("unistack.org") + if err != nil { + t.Fatal(err) + } + + _ = addrs } diff --git a/util/dns/conn.go b/util/dns/conn.go index e7e11eea..e8b5f426 100644 --- a/util/dns/conn.go +++ b/util/dns/conn.go @@ -67,15 +67,20 @@ func (c *dnsConn) RemoteAddr() net.Addr { } func (c *dnsConn) SetDeadline(t time.Time) error { - _ = c.SetReadDeadline(t) - _ = c.SetWriteDeadline(t) + var err error + if err = c.SetReadDeadline(t); err != nil { + return err + } + if err = c.SetWriteDeadline(t); err != nil { + return err + } return nil } func (c *dnsConn) SetReadDeadline(t time.Time) error { c.Lock() - defer c.Unlock() c.deadline = t + c.Unlock() return nil } diff --git a/util/jitter/ticker.go b/util/jitter/ticker.go index f4184899..b5b86f70 100644 --- a/util/jitter/ticker.go +++ b/util/jitter/ticker.go @@ -16,7 +16,6 @@ type Ticker struct { C chan time.Time min int64 max int64 - exp int64 exit bool rng rand.Rand } diff --git a/util/reflect/reflect.go b/util/reflect/reflect.go index 729b4e02..859beb20 100644 --- a/util/reflect/reflect.go +++ b/util/reflect/reflect.go @@ -91,7 +91,7 @@ func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error { } if mapper, ok := dst.(map[string]interface{}); ok { - dst = mergeMap(mapper, mp, 0) + mergeMap(mapper, mp, 0) return nil } diff --git a/util/reflect/reflect_test.go b/util/reflect/reflect_test.go index ccb78e49..cbfc13fc 100644 --- a/util/reflect/reflect_test.go +++ b/util/reflect/reflect_test.go @@ -1,9 +1,38 @@ package reflect import ( + "fmt" "testing" ) +func TestMergeMapStringInterface(t *testing.T) { + var dst interface{} //nolint:gosimple + dst = map[string]interface{}{ + "xx": 11, + } + + src := map[string]interface{}{ + "zz": "aa", + } + + if err := Merge(dst, src); err != nil { + t.Fatal(err) + } + + mp, ok := dst.(map[string]interface{}) + if !ok || mp == nil { + t.Fatalf("xxx %#+v\n", dst) + } + + if fmt.Sprintf("%v", mp["xx"]) != "11" { + t.Fatalf("xxx zzzz %#+v", mp) + } + + if fmt.Sprintf("%v", mp["zz"]) != "aa" { + t.Fatalf("xxx zzzz %#+v", mp) + } +} + func TestMergeMap(t *testing.T) { src := map[string]interface{}{ "skey1": "sval1", diff --git a/util/structfs/structfs.go b/util/structfs/structfs.go index 462200db..3cffc750 100644 --- a/util/structfs/structfs.go +++ b/util/structfs/structfs.go @@ -221,6 +221,7 @@ func getValue(name string, iface interface{}, tag string) ([]byte, error) { return nil, fmt.Errorf("failed to find %s in interface %T", name, iface) } +/* func hasValidType(obj interface{}, types []reflect.Kind) bool { for _, t := range types { if reflect.TypeOf(obj).Kind() == t { @@ -230,6 +231,7 @@ func hasValidType(obj interface{}, types []reflect.Kind) bool { return false } +*/ func reflectValue(obj interface{}) reflect.Value { var val reflect.Value diff --git a/util/structfs/structfs_test.go b/util/structfs/structfs_test.go index a245f3b1..ccf18bad 100644 --- a/util/structfs/structfs_test.go +++ b/util/structfs/structfs_test.go @@ -61,7 +61,7 @@ var doOrig = []byte(`{ } `) -func server(t *testing.T) { +func server(t *testing.T, ch chan error) { stfs := DigitalOceanMetadata{} err := json.Unmarshal(doOrig, &stfs.Metadata.V1) if err != nil { @@ -71,7 +71,7 @@ func server(t *testing.T) { http.Handle("/metadata/v1/", FileServer(&stfs, "json", time.Now())) http.Handle("/metadata/v1.json", &stfs) go func() { - t.Fatal(http.ListenAndServe("127.0.0.1:8080", nil)) + ch <- http.ListenAndServe("127.0.0.1:8080", nil) }() time.Sleep(2 * time.Second) } @@ -86,7 +86,8 @@ func get(path string) ([]byte, error) { } func TestAll(t *testing.T) { - server(t) + ch := make(chan error) + server(t, ch) tests := []struct { in string @@ -100,34 +101,44 @@ func TestAll(t *testing.T) { } for _, tt := range tests { - buf, err := get(tt.in) + select { + case err := <-ch: + t.Fatal(err) + default: + buf, err := get(tt.in) + if err != nil { + t.Fatal(err) + } + if string(buf) != tt.out { + t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out) + } + } + } + + select { + case err := <-ch: + t.Fatal(err) + default: + doTest, err := get("http://127.0.0.1:8080/metadata/v1.json") if err != nil { t.Fatal(err) } - if string(buf) != tt.out { - t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out) + + oSt := DigitalOceanMetadata{} + err = json.Unmarshal(doOrig, &oSt.Metadata.V1) + if err != nil { + t.Fatal(err) + } + + nSt := DigitalOceanMetadata{} + + err = json.Unmarshal(doTest, &nSt.Metadata.V1) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(oSt, nSt) { + t.Fatalf("%v not match %v", oSt, nSt) } } - - doTest, err := get("http://127.0.0.1:8080/metadata/v1.json") - if err != nil { - t.Fatal(err) - } - - oSt := DigitalOceanMetadata{} - err = json.Unmarshal(doOrig, &oSt.Metadata.V1) - if err != nil { - t.Fatal(err) - } - - nSt := DigitalOceanMetadata{} - - err = json.Unmarshal(doTest, &nSt.Metadata.V1) - if err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual(oSt, nSt) { - t.Fatalf("%v not match %v", oSt, nSt) - } }