Merge remote-tracking branch 'origin/v3' into v3

# Conflicts:
#	client/noop.go
#	errors/errors_test.go
#	logger/unwrap/unwrap_test.go
#	register/memory/memory_test.go
#	server/noop_test.go
#	service.go
#	util/dns/cache.go
#	util/dns/conn.go
#	util/structfs/metadata_ec2.go
#	util/time/duration.go
This commit is contained in:
Денис Евстигнеев 2024-12-09 13:18:13 +03:00
commit 5a9558d0a3
25 changed files with 248 additions and 123 deletions

View File

@ -1,24 +1,26 @@
name: lint name: lint
on: on:
pull_request: pull_request:
types: [opened, reopened, synchronize]
branches: branches:
- master - master
- v3 - v3
- v4
jobs: jobs:
lint: lint:
name: lint
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: setup-go - name: setup-go
uses: actions/setup-go@v3 uses: actions/setup-go@v5
with: with:
go-version: 1.21 go-version: 'stable'
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v3
- name: deps - name: deps
run: go get -v -d ./... run: go get -v -d ./...
- name: lint - name: lint
uses: https://github.com/golangci/golangci-lint-action@v3.4.0 uses: https://github.com/golangci/golangci-lint-action@v6
continue-on-error: true
with: with:
version: v1.52 version: 'latest'

View File

@ -1,22 +1,30 @@
name: pr name: test
on: on:
pull_request: pull_request:
types: [opened, reopened, synchronize]
branches: branches:
- master - master
- v3 - v3
- v4
push:
branches:
- master
- v3
- v4
jobs: jobs:
test: test:
name: test
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: setup-go
uses: actions/setup-go@v5
with:
go-version: 'stable'
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v3
- name: setup-go
uses: actions/setup-go@v3
with:
go-version: 1.21
- name: deps - name: deps
run: go get -v -t -d ./... run: go get -v -d ./...
- name: test - name: test
env: env:
INTEGRATION_TESTS: yes INTEGRATION_TESTS: yes

View File

@ -1,44 +1,5 @@
run: run:
concurrency: 4 concurrency: 8
deadline: 5m deadline: 5m
issues-exit-code: 1 issues-exit-code: 1
tests: true tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- revive
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

View File

@ -222,7 +222,7 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
ts := time.Now() ts := time.Now()
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span var sp tracer.Span
ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", ctx, sp = n.opts.Tracer.Start(ctx, "rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient), tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()), tracer.WithSpanLabels("endpoint", req.Endpoint()),
) )
@ -385,7 +385,7 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
ts := time.Now() ts := time.Now()
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span var sp tracer.Span
ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", ctx, sp = n.opts.Tracer.Start(ctx, "rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient), tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()), tracer.WithSpanLabels("endpoint", req.Endpoint()),
) )

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"log/slog" "log/slog"
"os" "os"
"reflect"
"regexp" "regexp"
"runtime" "runtime"
"strconv" "strconv"
@ -171,7 +172,29 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
} }
attrs, _ := s.argsAttrs(s.opts.Fields) attrs, _ := s.argsAttrs(s.opts.Fields)
s.handler = &wrapper{h: slog.NewJSONHandler(s.opts.Out, handleOpt).WithAttrs(attrs)}
var h slog.Handler
if s.opts.Context != nil {
if v, ok := s.opts.Context.Value(handlerKey{}).(slog.Handler); ok && v != nil {
h = v
}
if fn := s.opts.Context.Value(handlerFnKey{}); fn != nil {
if rfn := reflect.ValueOf(fn); rfn.Kind() == reflect.Func {
if ret := rfn.Call([]reflect.Value{reflect.ValueOf(s.opts.Out), reflect.ValueOf(handleOpt)}); len(ret) == 1 {
if iface, ok := ret[0].Interface().(slog.Handler); ok && iface != nil {
h = iface
}
}
}
}
}
if h == nil {
h = slog.NewJSONHandler(s.opts.Out, handleOpt)
}
s.handler = &wrapper{h: h.WithAttrs(attrs)}
s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level))) s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level)))
s.mu.Unlock() s.mu.Unlock()
@ -329,3 +352,15 @@ func (s *slogLogger) argsAttrs(args []interface{}) ([]slog.Attr, error) {
return attrs, err return attrs, err
} }
type handlerKey struct{}
func WithHandler(h slog.Handler) logger.Option {
return logger.SetOption(handlerKey{}, h)
}
type handlerFnKey struct{}
func WithHandlerFunc(fn any) logger.Option {
return logger.SetOption(handlerFnKey{}, fn)
}

View File

@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"log/slog"
"strings" "strings"
"testing" "testing"
@ -14,6 +15,23 @@ import (
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
) )
func TestWithHandlerFunc(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(logger.WithLevel(logger.InfoLevel), logger.WithOutput(buf),
WithHandlerFunc(slog.NewTextHandler),
)
if err := l.Init(); err != nil {
t.Fatal(err)
}
l.Info(ctx, "msg1")
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg1`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
}
}
func TestWithAddFields(t *testing.T) { func TestWithAddFields(t *testing.T) {
ctx := context.TODO() ctx := context.TODO()
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)

View File

@ -36,8 +36,8 @@ var (
circularShortBytes = []byte("<shown>") circularShortBytes = []byte("<shown>")
invalidAngleBytes = []byte("<invalid>") invalidAngleBytes = []byte("<invalid>")
filteredBytes = []byte("<filtered>") filteredBytes = []byte("<filtered>")
openBracketBytes = []byte("[") // openBracketBytes = []byte("[")
closeBracketBytes = []byte("]") // closeBracketBytes = []byte("]")
percentBytes = []byte("%") percentBytes = []byte("%")
precisionBytes = []byte(".") precisionBytes = []byte(".")
openAngleBytes = []byte("<") openAngleBytes = []byte("<")

View File

@ -82,12 +82,12 @@ func TestTagged(t *testing.T) {
func TestTaggedNested(t *testing.T) { func TestTaggedNested(t *testing.T) {
type val struct { type val struct {
key string `logger:"take"` key string `logger:"take"`
val string `logger:"omit"` // val string `logger:"omit"`
unk string unk string
} }
type str struct { type str struct {
// key string `logger:"omit"`
val *val `logger:"take"` val *val `logger:"take"`
key string `logger:"omit"`
} }
var iface interface{} var iface interface{}

View File

@ -83,6 +83,7 @@ func TestPassing(t *testing.T) {
if ok { if ok {
t.Fatalf("create outgoing context") t.Fatalf("create outgoing context")
} }
_ = md
ctx = NewOutgoingContext(ctx, New(1)) ctx = NewOutgoingContext(ctx, New(1))
testCtx(ctx) testCtx(ctx)

View File

@ -65,6 +65,8 @@ func As(b any, target any) bool {
break break
case targetType.Implements(routerType): case targetType.Implements(routerType):
break break
case targetType.Implements(tracerType):
break
default: default:
return false return false
} }

View File

@ -294,7 +294,7 @@ type loggerOptions struct {
brokers []string brokers []string
registers []string registers []string
stores []string stores []string
meters []string // meters []string
tracers []string tracers []string
} }

View File

@ -302,19 +302,17 @@ func TestWatcher(t *testing.T) {
} }
defer wc.Stop() defer wc.Stop()
cherr := make(chan error, 10)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { go func() {
for {
_, err := wc.Next() _, err := wc.Next()
if err != nil { if err != nil {
t.Fatal("unexpected err", err) cherr <- fmt.Errorf("unexpected err %v", err)
} }
// t.Logf("changes %#+v", ch.Service) // t.Logf("changes %#+v", ch.Service)
wc.Stop() wc.Stop()
wg.Done() wg.Done()
return
}
}() }()
if err := m.Register(ctx, testSrv); err != nil { if err := m.Register(ctx, testSrv); err != nil {

14
semconv/cache.go Normal file
View File

@ -0,0 +1,14 @@
package semconv
var (
// CacheRequestDurationSeconds specifies meter metric name
CacheRequestDurationSeconds = "micro_cache_request_duration_seconds"
// CacheRequestLatencyMicroseconds specifies meter metric name
CacheRequestLatencyMicroseconds = "micro_cache_request_latency_microseconds"
// CacheRequestTotal specifies meter metric name
CacheRequestTotal = "micro_cache_request_total"
// CacheRequestInflight specifies meter metric name
CacheRequestInflight = "micro_cache_request_inflight"
// CacheItemsTotal specifies total cache items
CacheItemsTotal = "micro_cache_items_total"
)

View File

@ -3,7 +3,7 @@ package semconv
var ( var (
// StoreRequestDurationSeconds specifies meter metric name // StoreRequestDurationSeconds specifies meter metric name
StoreRequestDurationSeconds = "micro_store_request_duration_seconds" StoreRequestDurationSeconds = "micro_store_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name // StoreRequestLatencyMicroseconds specifies meter metric name
StoreRequestLatencyMicroseconds = "micro_store_request_latency_microseconds" StoreRequestLatencyMicroseconds = "micro_store_request_latency_microseconds"
// StoreRequestTotal specifies meter metric name // StoreRequestTotal specifies meter metric name
StoreRequestTotal = "micro_store_request_total" StoreRequestTotal = "micro_store_request_total"

View File

@ -171,7 +171,6 @@ type rpcMessage struct {
header metadata.Metadata header metadata.Metadata
topic string topic string
contentType string contentType string
body []byte
} }
func (r *rpcMessage) ContentType() string { func (r *rpcMessage) ContentType() string {

View File

@ -23,10 +23,8 @@ import (
) )
func init() { func init() {
if _, err := maxprocs.Set(); err != nil { _, _ = maxprocs.Set()
panic(err) _, _ = memlimit.SetGoMemLimitWithOpts(
}
if _, err := memlimit.SetGoMemLimitWithOpts(
memlimit.WithRatio(0.9), memlimit.WithRatio(0.9),
memlimit.WithProvider( memlimit.WithProvider(
memlimit.ApplyFallback( memlimit.ApplyFallback(
@ -34,11 +32,12 @@ func init() {
memlimit.FromSystem, memlimit.FromSystem,
), ),
), ),
); err != nil { )
panic(err)
}
net.DefaultResolver = utildns.NewNetResolver(utildns.Timeout(1 * time.Second)) net.DefaultResolver = utildns.NewNetResolver(
utildns.Timeout(1*time.Second),
utildns.MinCacheTTL(5*time.Second),
)
} }
// Service is an interface that wraps the lower level components. // Service is an interface that wraps the lower level components.

View File

@ -105,3 +105,7 @@ func NewWatchOptions(opts ...WatchOption) (WatchOptions, error) {
return options, err return options, err
} }
func Watch(context.Context) (Watcher, error) {
return nil, nil
}

View File

@ -6,6 +6,9 @@ import (
"net" "net"
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/semconv"
) )
// DialFunc is a [net.Resolver.Dial] function. // DialFunc is a [net.Resolver.Dial] function.
@ -19,6 +22,11 @@ func NewNetResolver(opts ...Option) *net.Resolver {
o(&options) o(&options)
} }
if options.Meter == nil {
options.Meter = meter.DefaultMeter
opts = append(opts, Meter(options.Meter))
}
return &net.Resolver{ return &net.Resolver{
PreferGo: true, PreferGo: true,
StrictErrors: options.Resolver.StrictErrors, StrictErrors: options.Resolver.StrictErrors,
@ -56,6 +64,7 @@ type Options struct {
PreferIPV4 bool PreferIPV4 bool
PreferIPV6 bool PreferIPV6 bool
Timeout time.Duration Timeout time.Duration
Meter meter.Meter
} }
// MaxCacheEntries sets the maximum number of entries to cache. // MaxCacheEntries sets the maximum number of entries to cache.
@ -87,6 +96,13 @@ func NegativeCache(b bool) Option {
} }
} }
// Meter sets meter.Meter
func Meter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
// Timeout sets upstream *net.Resolver timeout // Timeout sets upstream *net.Resolver timeout
func Timeout(td time.Duration) Option { func Timeout(td time.Duration) Option {
return func(o *Options) { return func(o *Options) {
@ -156,7 +172,6 @@ func (c *cache) put(req string, res string) {
} }
c.Lock() c.Lock()
defer c.Unlock()
if c.entries == nil { if c.entries == nil {
c.entries = make(map[string]cacheEntry) c.entries = make(map[string]cacheEntry)
} }
@ -165,6 +180,8 @@ func (c *cache) put(req string, res string) {
var tested, evicted int var tested, evicted int
for k, e := range c.entries { for k, e := range c.entries {
if time.Until(e.deadline) <= 0 { if time.Until(e.deadline) <= 0 {
c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Dec()
c.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "evict").Inc()
// delete expired entry // delete expired entry
delete(c.entries, k) delete(c.entries, k)
evicted++ evicted++
@ -175,6 +192,8 @@ func (c *cache) put(req string, res string) {
continue continue
} }
if evicted == 0 && c.opts.MaxCacheEntries > 0 && len(c.entries) >= c.opts.MaxCacheEntries { if evicted == 0 && c.opts.MaxCacheEntries > 0 && len(c.entries) >= c.opts.MaxCacheEntries {
c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Dec()
c.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "evict").Inc()
// delete at least one entry // delete at least one entry
delete(c.entries, k) delete(c.entries, k)
} }
@ -186,6 +205,9 @@ func (c *cache) put(req string, res string) {
deadline: time.Now().Add(ttl), deadline: time.Now().Add(ttl),
value: res[2:], value: res[2:],
} }
c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Inc()
c.Unlock()
} }
func (c *cache) get(req string) (res string) { func (c *cache) get(req string) (res string) {
@ -210,6 +232,7 @@ func (c *cache) get(req string) (res string) {
// prepend correct ID // prepend correct ID
return req[:2] + entry.value return req[:2] + entry.value
} }
return "" return ""
} }
@ -310,10 +333,18 @@ func getUint32(s string) int {
func cachingRoundTrip(cache *cache, network, address string) roundTripper { func cachingRoundTrip(cache *cache, network, address string) roundTripper {
return func(ctx context.Context, req string) (res string, err error) { return func(ctx context.Context, req string) (res string, err error) {
cache.opts.Meter.Counter(semconv.CacheRequestInflight, "type", "dns").Inc()
defer cache.opts.Meter.Counter(semconv.CacheRequestInflight, "type", "dns").Dec()
// check cache // check cache
if res = cache.get(req); res != "" { if res = cache.get(req); res != "" {
return res, nil return res, nil
} }
cache.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "get", "status", "miss").Inc()
ts := time.Now()
defer func() {
cache.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "type", "dns", "method", "get").UpdateDuration(ts)
cache.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "type", "dns", "method", "get").UpdateDuration(ts)
}()
switch { switch {
case cache.opts.PreferIPV4 && cache.opts.PreferIPV6: case cache.opts.PreferIPV4 && cache.opts.PreferIPV6:
@ -340,6 +371,7 @@ func cachingRoundTrip(cache *cache, network, address string) roundTripper {
var d net.Dialer var d net.Dialer
conn, err = d.DialContext(ctx, network, address) conn, err = d.DialContext(ctx, network, address)
} }
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -12,5 +12,11 @@ func TestCache(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Logf("addrs %v", addrs)
addrs, err = net.LookupHost("unistack.org")
if err != nil {
t.Fatal(err)
}
_ = addrs
} }

View File

@ -67,15 +67,20 @@ func (c *dnsConn) RemoteAddr() net.Addr {
} }
func (c *dnsConn) SetDeadline(t time.Time) error { func (c *dnsConn) SetDeadline(t time.Time) error {
_ = c.SetReadDeadline(t) var err error
_ = c.SetWriteDeadline(t) if err = c.SetReadDeadline(t); err != nil {
return err
}
if err = c.SetWriteDeadline(t); err != nil {
return err
}
return nil return nil
} }
func (c *dnsConn) SetReadDeadline(t time.Time) error { func (c *dnsConn) SetReadDeadline(t time.Time) error {
c.Lock() c.Lock()
defer c.Unlock()
c.deadline = t c.deadline = t
c.Unlock()
return nil return nil
} }

View File

@ -16,7 +16,6 @@ type Ticker struct {
C chan time.Time C chan time.Time
min int64 min int64
max int64 max int64
exp int64
exit bool exit bool
rng rand.Rand rng rand.Rand
} }

View File

@ -91,7 +91,7 @@ func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
} }
if mapper, ok := dst.(map[string]interface{}); ok { if mapper, ok := dst.(map[string]interface{}); ok {
dst = mergeMap(mapper, mp, 0) mergeMap(mapper, mp, 0)
return nil return nil
} }

View File

@ -1,9 +1,38 @@
package reflect package reflect
import ( import (
"fmt"
"testing" "testing"
) )
func TestMergeMapStringInterface(t *testing.T) {
var dst interface{} //nolint:gosimple
dst = map[string]interface{}{
"xx": 11,
}
src := map[string]interface{}{
"zz": "aa",
}
if err := Merge(dst, src); err != nil {
t.Fatal(err)
}
mp, ok := dst.(map[string]interface{})
if !ok || mp == nil {
t.Fatalf("xxx %#+v\n", dst)
}
if fmt.Sprintf("%v", mp["xx"]) != "11" {
t.Fatalf("xxx zzzz %#+v", mp)
}
if fmt.Sprintf("%v", mp["zz"]) != "aa" {
t.Fatalf("xxx zzzz %#+v", mp)
}
}
func TestMergeMap(t *testing.T) { func TestMergeMap(t *testing.T) {
src := map[string]interface{}{ src := map[string]interface{}{
"skey1": "sval1", "skey1": "sval1",

View File

@ -221,6 +221,7 @@ func getValue(name string, iface interface{}, tag string) ([]byte, error) {
return nil, fmt.Errorf("failed to find %s in interface %T", name, iface) return nil, fmt.Errorf("failed to find %s in interface %T", name, iface)
} }
/*
func hasValidType(obj interface{}, types []reflect.Kind) bool { func hasValidType(obj interface{}, types []reflect.Kind) bool {
for _, t := range types { for _, t := range types {
if reflect.TypeOf(obj).Kind() == t { if reflect.TypeOf(obj).Kind() == t {
@ -230,6 +231,7 @@ func hasValidType(obj interface{}, types []reflect.Kind) bool {
return false return false
} }
*/
func reflectValue(obj interface{}) reflect.Value { func reflectValue(obj interface{}) reflect.Value {
var val reflect.Value var val reflect.Value

View File

@ -61,7 +61,7 @@ var doOrig = []byte(`{
} }
`) `)
func server(t *testing.T) { func server(t *testing.T, ch chan error) {
stfs := DigitalOceanMetadata{} stfs := DigitalOceanMetadata{}
err := json.Unmarshal(doOrig, &stfs.Metadata.V1) err := json.Unmarshal(doOrig, &stfs.Metadata.V1)
if err != nil { if err != nil {
@ -71,7 +71,7 @@ func server(t *testing.T) {
http.Handle("/metadata/v1/", FileServer(&stfs, "json", time.Now())) http.Handle("/metadata/v1/", FileServer(&stfs, "json", time.Now()))
http.Handle("/metadata/v1.json", &stfs) http.Handle("/metadata/v1.json", &stfs)
go func() { go func() {
t.Fatal(http.ListenAndServe("127.0.0.1:8080", nil)) ch <- http.ListenAndServe("127.0.0.1:8080", nil)
}() }()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
} }
@ -86,7 +86,8 @@ func get(path string) ([]byte, error) {
} }
func TestAll(t *testing.T) { func TestAll(t *testing.T) {
server(t) ch := make(chan error)
server(t, ch)
tests := []struct { tests := []struct {
in string in string
@ -100,6 +101,10 @@ func TestAll(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
select {
case err := <-ch:
t.Fatal(err)
default:
buf, err := get(tt.in) buf, err := get(tt.in)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -108,7 +113,12 @@ func TestAll(t *testing.T) {
t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out) t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out)
} }
} }
}
select {
case err := <-ch:
t.Fatal(err)
default:
doTest, err := get("http://127.0.0.1:8080/metadata/v1.json") doTest, err := get("http://127.0.0.1:8080/metadata/v1.json")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -130,4 +140,5 @@ func TestAll(t *testing.T) {
if !reflect.DeepEqual(oSt, nSt) { if !reflect.DeepEqual(oSt, nSt) {
t.Fatalf("%v not match %v", oSt, nSt) t.Fatalf("%v not match %v", oSt, nSt)
} }
}
} }