Compare commits
6 Commits
v3.10.109
...
15e9310368
| Author | SHA1 | Date | |
|---|---|---|---|
| 15e9310368 | |||
|
|
16d8cf3434 | ||
| 9704ef2e5e | |||
| 94e8f90f00 | |||
| 34d1587881 | |||
| bf4143cde5 |
@@ -1,24 +1,26 @@
|
|||||||
name: lint
|
name: lint
|
||||||
|
|
||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
|
types: [opened, reopened, closed, synchronize]
|
||||||
branches:
|
branches:
|
||||||
- master
|
- master
|
||||||
- v3
|
- v3
|
||||||
|
- v4
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
lint:
|
lint:
|
||||||
name: lint
|
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: setup-go
|
- name: setup-go
|
||||||
uses: actions/setup-go@v3
|
uses: actions/setup-go@v5
|
||||||
with:
|
with:
|
||||||
go-version: 1.21
|
go-version: 'stable'
|
||||||
- name: checkout
|
- name: checkout
|
||||||
uses: actions/checkout@v3
|
uses: actions/checkout@v3
|
||||||
- name: deps
|
- name: deps
|
||||||
run: go get -v -d ./...
|
run: go get -v -d ./...
|
||||||
- name: lint
|
- name: lint
|
||||||
uses: https://github.com/golangci/golangci-lint-action@v3.4.0
|
uses: https://github.com/golangci/golangci-lint-action@v6
|
||||||
continue-on-error: true
|
|
||||||
with:
|
with:
|
||||||
version: v1.52
|
version: v1.62.2
|
||||||
@@ -1,22 +1,30 @@
|
|||||||
name: pr
|
name: test
|
||||||
|
|
||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
|
types: [opened, reopened, closed, synchronize]
|
||||||
branches:
|
branches:
|
||||||
- master
|
- master
|
||||||
- v3
|
- v3
|
||||||
|
- v4
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
- v3
|
||||||
|
- v4
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
test:
|
test:
|
||||||
name: test
|
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
|
- name: setup-go
|
||||||
|
uses: actions/setup-go@v5
|
||||||
|
with:
|
||||||
|
go-version: 'stable'
|
||||||
- name: checkout
|
- name: checkout
|
||||||
uses: actions/checkout@v3
|
uses: actions/checkout@v3
|
||||||
- name: setup-go
|
|
||||||
uses: actions/setup-go@v3
|
|
||||||
with:
|
|
||||||
go-version: 1.21
|
|
||||||
- name: deps
|
- name: deps
|
||||||
run: go get -v -t -d ./...
|
run: go get -v -d ./...
|
||||||
- name: test
|
- name: test
|
||||||
env:
|
env:
|
||||||
INTEGRATION_TESTS: yes
|
INTEGRATION_TESTS: yes
|
||||||
@@ -1,44 +1,5 @@
|
|||||||
run:
|
run:
|
||||||
concurrency: 4
|
concurrency: 8
|
||||||
deadline: 5m
|
deadline: 5m
|
||||||
issues-exit-code: 1
|
issues-exit-code: 1
|
||||||
tests: true
|
tests: true
|
||||||
|
|
||||||
linters-settings:
|
|
||||||
govet:
|
|
||||||
check-shadowing: true
|
|
||||||
enable:
|
|
||||||
- fieldalignment
|
|
||||||
|
|
||||||
linters:
|
|
||||||
enable:
|
|
||||||
- govet
|
|
||||||
- deadcode
|
|
||||||
- errcheck
|
|
||||||
- govet
|
|
||||||
- ineffassign
|
|
||||||
- staticcheck
|
|
||||||
- structcheck
|
|
||||||
- typecheck
|
|
||||||
- unused
|
|
||||||
- varcheck
|
|
||||||
- bodyclose
|
|
||||||
- gci
|
|
||||||
- goconst
|
|
||||||
- gocritic
|
|
||||||
- gosimple
|
|
||||||
- gofmt
|
|
||||||
- gofumpt
|
|
||||||
- goimports
|
|
||||||
- revive
|
|
||||||
- gosec
|
|
||||||
- makezero
|
|
||||||
- misspell
|
|
||||||
- nakedret
|
|
||||||
- nestif
|
|
||||||
- nilerr
|
|
||||||
- noctx
|
|
||||||
- prealloc
|
|
||||||
- unconvert
|
|
||||||
- unparam
|
|
||||||
disable-all: false
|
|
||||||
|
|||||||
@@ -298,7 +298,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o
|
|||||||
// call backoff first. Someone may want an initial start delay
|
// call backoff first. Someone may want an initial start delay
|
||||||
t, err := callOpts.Backoff(ctx, req, i)
|
t, err := callOpts.Backoff(ctx, req, i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// only sleep if greater than 0
|
// only sleep if greater than 0
|
||||||
@@ -312,7 +312,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o
|
|||||||
// TODO apply any filtering here
|
// TODO apply any filtering here
|
||||||
routes, err = n.opts.Lookup(ctx, req, callOpts)
|
routes, err = n.opts.Lookup(ctx, req, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// balance the list of nodes
|
// balance the list of nodes
|
||||||
@@ -466,7 +466,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
|
|||||||
// call backoff first. Someone may want an initial start delay
|
// call backoff first. Someone may want an initial start delay
|
||||||
t, cerr := callOpts.Backoff(ctx, req, i)
|
t, cerr := callOpts.Backoff(ctx, req, i)
|
||||||
if cerr != nil {
|
if cerr != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", cerr.Error())
|
return nil, errors.InternalServerError("go.micro.client", "%s", cerr.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// only sleep if greater than 0
|
// only sleep if greater than 0
|
||||||
@@ -480,7 +480,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
|
|||||||
// TODO apply any filtering here
|
// TODO apply any filtering here
|
||||||
routes, err = n.opts.Lookup(ctx, req, callOpts)
|
routes, err = n.opts.Lookup(ctx, req, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
return nil, errors.InternalServerError("go.micro.client", "%s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// balance the list of nodes
|
// balance the list of nodes
|
||||||
@@ -609,13 +609,13 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
|
|||||||
// use codec for payload
|
// use codec for payload
|
||||||
cf, err := n.newCodec(p.ContentType())
|
cf, err := n.newCodec(p.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the body
|
// set the body
|
||||||
b, err := cf.Marshal(p.Payload())
|
b, err := cf.Marshal(p.Payload())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%s", err.Error())
|
||||||
}
|
}
|
||||||
body = b
|
body = b
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package errors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
er "errors"
|
er "errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -26,7 +27,7 @@ func TestMarshalJSON(t *testing.T) {
|
|||||||
func TestEmpty(t *testing.T) {
|
func TestEmpty(t *testing.T) {
|
||||||
msg := "test"
|
msg := "test"
|
||||||
var err *Error
|
var err *Error
|
||||||
err = FromError(fmt.Errorf(msg))
|
err = FromError(errors.New(msg))
|
||||||
if err.Detail != msg {
|
if err.Detail != msg {
|
||||||
t.Fatalf("invalid error %v", err)
|
t.Fatalf("invalid error %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,11 +46,11 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
|
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
|
||||||
return h.WithAttrs(attrs)
|
return h.h.WithAttrs(attrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *wrapper) WithGroup(name string) slog.Handler {
|
func (h *wrapper) WithGroup(name string) slog.Handler {
|
||||||
return h.WithGroup(name)
|
return h.h.WithGroup(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
||||||
@@ -89,7 +89,6 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type slogLogger struct {
|
type slogLogger struct {
|
||||||
leveler *slog.LevelVar
|
|
||||||
handler *wrapper
|
handler *wrapper
|
||||||
opts logger.Options
|
opts logger.Options
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
|||||||
@@ -36,14 +36,14 @@ var (
|
|||||||
circularShortBytes = []byte("<shown>")
|
circularShortBytes = []byte("<shown>")
|
||||||
invalidAngleBytes = []byte("<invalid>")
|
invalidAngleBytes = []byte("<invalid>")
|
||||||
filteredBytes = []byte("<filtered>")
|
filteredBytes = []byte("<filtered>")
|
||||||
openBracketBytes = []byte("[")
|
// openBracketBytes = []byte("[")
|
||||||
closeBracketBytes = []byte("]")
|
// closeBracketBytes = []byte("]")
|
||||||
percentBytes = []byte("%")
|
percentBytes = []byte("%")
|
||||||
precisionBytes = []byte(".")
|
precisionBytes = []byte(".")
|
||||||
openAngleBytes = []byte("<")
|
openAngleBytes = []byte("<")
|
||||||
closeAngleBytes = []byte(">")
|
closeAngleBytes = []byte(">")
|
||||||
openMapBytes = []byte("{")
|
openMapBytes = []byte("{")
|
||||||
closeMapBytes = []byte("}")
|
closeMapBytes = []byte("}")
|
||||||
)
|
)
|
||||||
|
|
||||||
type protoMessage interface {
|
type protoMessage interface {
|
||||||
|
|||||||
@@ -82,12 +82,12 @@ func TestTagged(t *testing.T) {
|
|||||||
func TestTaggedNested(t *testing.T) {
|
func TestTaggedNested(t *testing.T) {
|
||||||
type val struct {
|
type val struct {
|
||||||
key string `logger:"take"`
|
key string `logger:"take"`
|
||||||
val string `logger:"omit"`
|
// val string `logger:"omit"`
|
||||||
unk string
|
unk string
|
||||||
}
|
}
|
||||||
type str struct {
|
type str struct {
|
||||||
key string `logger:"omit"`
|
// key string `logger:"omit"`
|
||||||
val *val `logger:"take"`
|
val *val `logger:"take"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var iface interface{}
|
var iface interface{}
|
||||||
|
|||||||
@@ -55,10 +55,7 @@ func NewContext(ctx context.Context, md Metadata) context.Context {
|
|||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
ctx = context.WithValue(ctx, mdKey{}, &rawMetadata{md})
|
return context.WithValue(ctx, mdKey{}, &rawMetadata{md})
|
||||||
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{})
|
|
||||||
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{})
|
|
||||||
return ctx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetOutgoingContext modify outgoing context with given metadata
|
// SetOutgoingContext modify outgoing context with given metadata
|
||||||
@@ -90,11 +87,7 @@ func NewIncomingContext(ctx context.Context, md Metadata) context.Context {
|
|||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{md})
|
return context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{md})
|
||||||
if v, ok := ctx.Value(mdOutgoingKey{}).(*rawMetadata); !ok || v == nil {
|
|
||||||
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{})
|
|
||||||
}
|
|
||||||
return ctx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutgoingContext creates a new context with outcoming metadata attached
|
// NewOutgoingContext creates a new context with outcoming metadata attached
|
||||||
@@ -102,11 +95,7 @@ func NewOutgoingContext(ctx context.Context, md Metadata) context.Context {
|
|||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{md})
|
return context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{md})
|
||||||
if v, ok := ctx.Value(mdIncomingKey{}).(*rawMetadata); !ok || v == nil {
|
|
||||||
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{})
|
|
||||||
}
|
|
||||||
return ctx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendOutgoingContext apends new md to context
|
// AppendOutgoingContext apends new md to context
|
||||||
|
|||||||
@@ -80,6 +80,14 @@ func TestPassing(t *testing.T) {
|
|||||||
ctx = NewIncomingContext(ctx, md1)
|
ctx = NewIncomingContext(ctx, md1)
|
||||||
testCtx(ctx)
|
testCtx(ctx)
|
||||||
md, ok := FromOutgoingContext(ctx)
|
md, ok := FromOutgoingContext(ctx)
|
||||||
|
if ok {
|
||||||
|
t.Fatalf("create outgoing context")
|
||||||
|
}
|
||||||
|
_ = md
|
||||||
|
|
||||||
|
ctx = NewOutgoingContext(ctx, New(1))
|
||||||
|
testCtx(ctx)
|
||||||
|
md, ok = FromOutgoingContext(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("missing metadata from outgoing context")
|
t.Fatalf("missing metadata from outgoing context")
|
||||||
}
|
}
|
||||||
|
|||||||
36
micro.go
36
micro.go
@@ -65,6 +65,8 @@ func As(b any, target any) bool {
|
|||||||
break
|
break
|
||||||
case targetType.Implements(routerType):
|
case targetType.Implements(routerType):
|
||||||
break
|
break
|
||||||
|
case targetType.Implements(tracerType):
|
||||||
|
break
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@@ -76,19 +78,21 @@ func As(b any, target any) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
var brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem()
|
var (
|
||||||
var loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem()
|
brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem()
|
||||||
var clientType = reflect.TypeOf((*client.Client)(nil)).Elem()
|
loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem()
|
||||||
var serverType = reflect.TypeOf((*server.Server)(nil)).Elem()
|
clientType = reflect.TypeOf((*client.Client)(nil)).Elem()
|
||||||
var codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem()
|
serverType = reflect.TypeOf((*server.Server)(nil)).Elem()
|
||||||
var flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem()
|
codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem()
|
||||||
var fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem()
|
flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem()
|
||||||
var meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem()
|
fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem()
|
||||||
var registerType = reflect.TypeOf((*register.Register)(nil)).Elem()
|
meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem()
|
||||||
var resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem()
|
registerType = reflect.TypeOf((*register.Register)(nil)).Elem()
|
||||||
var routerType = reflect.TypeOf((*router.Router)(nil)).Elem()
|
resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem()
|
||||||
var selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem()
|
routerType = reflect.TypeOf((*router.Router)(nil)).Elem()
|
||||||
var storeType = reflect.TypeOf((*store.Store)(nil)).Elem()
|
selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem()
|
||||||
var syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem()
|
storeType = reflect.TypeOf((*store.Store)(nil)).Elem()
|
||||||
var tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem()
|
syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem()
|
||||||
var serviceType = reflect.TypeOf((*Service)(nil)).Elem()
|
tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem()
|
||||||
|
serviceType = reflect.TypeOf((*Service)(nil)).Elem()
|
||||||
|
)
|
||||||
|
|||||||
@@ -269,7 +269,7 @@ func Logger(l logger.Logger, opts ...LoggerOption) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, trc := range o.Tracers {
|
for _, trc := range o.Tracers {
|
||||||
for _, ot := range lopts.tracers {
|
for _, ot := range lopts.tracers {
|
||||||
if trc.Name() == ot || all {
|
if trc.Name() == ot || all {
|
||||||
@@ -294,8 +294,8 @@ type loggerOptions struct {
|
|||||||
brokers []string
|
brokers []string
|
||||||
registers []string
|
registers []string
|
||||||
stores []string
|
stores []string
|
||||||
meters []string
|
// meters []string
|
||||||
tracers []string
|
tracers []string
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|||||||
@@ -469,9 +469,7 @@ func serviceToRecord(s *register.Service, ttl time.Duration) *record {
|
|||||||
}
|
}
|
||||||
|
|
||||||
endpoints := make([]*register.Endpoint, len(s.Endpoints))
|
endpoints := make([]*register.Endpoint, len(s.Endpoints))
|
||||||
for i, e := range s.Endpoints {
|
copy(endpoints, s.Endpoints)
|
||||||
endpoints[i] = e
|
|
||||||
}
|
|
||||||
|
|
||||||
return &record{
|
return &record{
|
||||||
Name: s.Name,
|
Name: s.Name,
|
||||||
|
|||||||
@@ -290,27 +290,25 @@ func TestWatcher(t *testing.T) {
|
|||||||
|
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
m := NewRegister()
|
m := NewRegister()
|
||||||
m.Init()
|
_ = m.Init()
|
||||||
m.Connect(ctx)
|
_ = m.Connect(ctx)
|
||||||
wc, err := m.Watch(ctx)
|
wc, err := m.Watch(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("cant watch: %v", err)
|
t.Fatalf("cant watch: %v", err)
|
||||||
}
|
}
|
||||||
defer wc.Stop()
|
defer wc.Stop()
|
||||||
|
|
||||||
|
cherr := make(chan error, 10)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
_, err := wc.Next()
|
||||||
_, err := wc.Next()
|
if err != nil {
|
||||||
if err != nil {
|
cherr <- fmt.Errorf("unexpected err %v", err)
|
||||||
t.Fatal("unexpected err", err)
|
|
||||||
}
|
|
||||||
// t.Logf("changes %#+v", ch.Service)
|
|
||||||
wc.Stop()
|
|
||||||
wg.Done()
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
// t.Logf("changes %#+v", ch.Service)
|
||||||
|
wc.Stop()
|
||||||
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if err := m.Register(ctx, testSrv); err != nil {
|
if err := m.Register(ctx, testSrv); err != nil {
|
||||||
|
|||||||
@@ -12,9 +12,9 @@ import (
|
|||||||
|
|
||||||
// Resolver is a DNS network resolve
|
// Resolver is a DNS network resolve
|
||||||
type Resolver struct {
|
type Resolver struct {
|
||||||
sync.RWMutex
|
|
||||||
goresolver *net.Resolver
|
goresolver *net.Resolver
|
||||||
Address string
|
Address string
|
||||||
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resolve tries to resolve endpoint address
|
// Resolve tries to resolve endpoint address
|
||||||
@@ -39,12 +39,12 @@ func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) {
|
|||||||
return []*resolver.Record{rec}, nil
|
return []*resolver.Record{rec}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
r.RLock()
|
r.mu.RLock()
|
||||||
goresolver := r.goresolver
|
goresolver := r.goresolver
|
||||||
r.RUnlock()
|
r.mu.RUnlock()
|
||||||
|
|
||||||
if goresolver == nil {
|
if goresolver == nil {
|
||||||
r.Lock()
|
r.mu.Lock()
|
||||||
r.goresolver = &net.Resolver{
|
r.goresolver = &net.Resolver{
|
||||||
Dial: func(ctx context.Context, _ string, _ string) (net.Conn, error) {
|
Dial: func(ctx context.Context, _ string, _ string) (net.Conn, error) {
|
||||||
d := net.Dialer{
|
d := net.Dialer{
|
||||||
@@ -53,7 +53,7 @@ func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) {
|
|||||||
return d.DialContext(ctx, "udp", r.Address)
|
return d.DialContext(ctx, "udp", r.Address)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
r.Unlock()
|
r.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
addrs, err := goresolver.LookupIP(context.TODO(), "ip", host)
|
addrs, err := goresolver.LookupIP(context.TODO(), "ip", host)
|
||||||
|
|||||||
@@ -171,7 +171,6 @@ type rpcMessage struct {
|
|||||||
header metadata.Metadata
|
header metadata.Metadata
|
||||||
topic string
|
topic string
|
||||||
contentType string
|
contentType string
|
||||||
body []byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcMessage) ContentType() string {
|
func (r *rpcMessage) ContentType() string {
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ func TestNoopSub(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel))
|
_ = logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel))
|
||||||
s := server.NewServer(
|
s := server.NewServer(
|
||||||
server.Broker(b),
|
server.Broker(b),
|
||||||
server.Codec("application/octet-stream", codec.NewCodec()),
|
server.Codec("application/octet-stream", codec.NewCodec()),
|
||||||
|
|||||||
@@ -3,7 +3,9 @@ package micro
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/KimMachineGun/automemlimit/memlimit"
|
"github.com/KimMachineGun/automemlimit/memlimit"
|
||||||
"go.uber.org/automaxprocs/maxprocs"
|
"go.uber.org/automaxprocs/maxprocs"
|
||||||
@@ -17,11 +19,12 @@ import (
|
|||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v3/server"
|
||||||
"go.unistack.org/micro/v3/store"
|
"go.unistack.org/micro/v3/store"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v3/tracer"
|
||||||
|
utildns "go.unistack.org/micro/v3/util/dns"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
maxprocs.Set()
|
_, _ = maxprocs.Set()
|
||||||
memlimit.SetGoMemLimitWithOpts(
|
_, _ = memlimit.SetGoMemLimitWithOpts(
|
||||||
memlimit.WithRatio(0.9),
|
memlimit.WithRatio(0.9),
|
||||||
memlimit.WithProvider(
|
memlimit.WithProvider(
|
||||||
memlimit.ApplyFallback(
|
memlimit.ApplyFallback(
|
||||||
@@ -30,6 +33,8 @@ func init() {
|
|||||||
),
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
net.DefaultResolver = utildns.NewNetResolver(utildns.Timeout(1 * time.Second))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service is an interface that wraps the lower level components.
|
// Service is an interface that wraps the lower level components.
|
||||||
|
|||||||
@@ -105,3 +105,7 @@ func NewWatchOptions(opts ...WatchOption) (WatchOptions, error) {
|
|||||||
|
|
||||||
return options, err
|
return options, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Watch(context.Context) (Watcher, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|||||||
377
util/dns/cache.go
Normal file
377
util/dns/cache.go
Normal file
@@ -0,0 +1,377 @@
|
|||||||
|
package dns
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DialFunc is a [net.Resolver.Dial] function.
|
||||||
|
type DialFunc func(ctx context.Context, network, address string) (net.Conn, error)
|
||||||
|
|
||||||
|
// NewNetResolver creates a caching [net.Resolver] that uses parent to resolve names.
|
||||||
|
func NewNetResolver(opts ...Option) *net.Resolver {
|
||||||
|
options := Options{Resolver: &net.Resolver{}}
|
||||||
|
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &net.Resolver{
|
||||||
|
PreferGo: true,
|
||||||
|
StrictErrors: options.Resolver.StrictErrors,
|
||||||
|
Dial: NewNetDialer(options.Resolver.Dial, append(opts, Resolver(options.Resolver))...),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewNetDialer adds caching to a [net.Resolver.Dial] function.
|
||||||
|
func NewNetDialer(parent DialFunc, opts ...Option) DialFunc {
|
||||||
|
cache := cache{dial: parent, opts: Options{}}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&cache.opts)
|
||||||
|
}
|
||||||
|
if cache.opts.MaxCacheEntries == 0 {
|
||||||
|
cache.opts.MaxCacheEntries = DefaultMaxCacheEntries
|
||||||
|
}
|
||||||
|
return func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||||
|
conn := &dnsConn{}
|
||||||
|
conn.roundTrip = cachingRoundTrip(&cache, network, address)
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const DefaultMaxCacheEntries = 300
|
||||||
|
|
||||||
|
// A Option customizes the resolver cache.
|
||||||
|
type Option func(*Options)
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
Resolver *net.Resolver
|
||||||
|
MaxCacheEntries int
|
||||||
|
MaxCacheTTL time.Duration
|
||||||
|
MinCacheTTL time.Duration
|
||||||
|
NegativeCache bool
|
||||||
|
PreferIPV4 bool
|
||||||
|
PreferIPV6 bool
|
||||||
|
Timeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxCacheEntries sets the maximum number of entries to cache.
|
||||||
|
// If zero, [DefaultMaxCacheEntries] is used; negative means no limit.
|
||||||
|
func MaxCacheEntries(n int) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.MaxCacheEntries = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxCacheTTL sets the maximum time-to-live for entries in the cache.
|
||||||
|
func MaxCacheTTL(td time.Duration) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.MaxCacheTTL = td
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MinCacheTTL sets the minimum time-to-live for entries in the cache.
|
||||||
|
func MinCacheTTL(td time.Duration) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.MinCacheTTL = td
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NegativeCache sets whether to cache negative responses.
|
||||||
|
func NegativeCache(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.NegativeCache = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timeout sets upstream *net.Resolver timeout
|
||||||
|
func Timeout(td time.Duration) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Timeout = td
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolver sets upstream *net.Resolver.
|
||||||
|
func Resolver(r *net.Resolver) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Resolver = r
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PreferIPV4 resolve ipv4 records.
|
||||||
|
func PreferIPV4(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.PreferIPV4 = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PreferIPV6 resolve ipv4 records.
|
||||||
|
func PreferIPV6(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.PreferIPV6 = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type cache struct {
|
||||||
|
sync.RWMutex
|
||||||
|
|
||||||
|
dial DialFunc
|
||||||
|
entries map[string]cacheEntry
|
||||||
|
|
||||||
|
opts Options
|
||||||
|
}
|
||||||
|
|
||||||
|
type cacheEntry struct {
|
||||||
|
deadline time.Time
|
||||||
|
value string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) put(req string, res string) {
|
||||||
|
// ignore uncacheable/unparseable answers
|
||||||
|
if invalid(req, res) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// ignore errors (if requested)
|
||||||
|
if nameError(res) && !c.opts.NegativeCache {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// ignore uncacheable/unparseable answers
|
||||||
|
ttl := getTTL(res)
|
||||||
|
if ttl <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// adjust TTL
|
||||||
|
if ttl < c.opts.MinCacheTTL {
|
||||||
|
ttl = c.opts.MinCacheTTL
|
||||||
|
}
|
||||||
|
// maxTTL overrides minTTL
|
||||||
|
if ttl > c.opts.MaxCacheTTL && c.opts.MaxCacheTTL != 0 {
|
||||||
|
ttl = c.opts.MaxCacheTTL
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
if c.entries == nil {
|
||||||
|
c.entries = make(map[string]cacheEntry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// do some cache evition
|
||||||
|
var tested, evicted int
|
||||||
|
for k, e := range c.entries {
|
||||||
|
if time.Until(e.deadline) <= 0 {
|
||||||
|
// delete expired entry
|
||||||
|
delete(c.entries, k)
|
||||||
|
evicted++
|
||||||
|
}
|
||||||
|
tested++
|
||||||
|
|
||||||
|
if tested < 8 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if evicted == 0 && c.opts.MaxCacheEntries > 0 && len(c.entries) >= c.opts.MaxCacheEntries {
|
||||||
|
// delete at least one entry
|
||||||
|
delete(c.entries, k)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove message IDs
|
||||||
|
c.entries[req[2:]] = cacheEntry{
|
||||||
|
deadline: time.Now().Add(ttl),
|
||||||
|
value: res[2:],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) get(req string) (res string) {
|
||||||
|
// ignore invalid messages
|
||||||
|
if len(req) < 12 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
if req[2] >= 0x7f {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
c.RLock()
|
||||||
|
defer c.RUnlock()
|
||||||
|
|
||||||
|
if c.entries == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove message ID
|
||||||
|
entry, ok := c.entries[req[2:]]
|
||||||
|
if ok && time.Until(entry.deadline) > 0 {
|
||||||
|
// prepend correct ID
|
||||||
|
return req[:2] + entry.value
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func invalid(req string, res string) bool {
|
||||||
|
if len(req) < 12 || len(res) < 12 { // header size
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if req[0] != res[0] || req[1] != res[1] { // IDs match
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if req[2] >= 0x7f || res[2] < 0x7f { // query, response
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if req[2]&0x7a != 0 || res[2]&0x7a != 0 { // standard query, not truncated
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if res[3]&0xf != 0 && res[3]&0xf != 3 { // no error, or name error
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func nameError(res string) bool {
|
||||||
|
return res[3]&0xf == 3
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTTL(msg string) time.Duration {
|
||||||
|
ttl := math.MaxInt32
|
||||||
|
|
||||||
|
qdcount := getUint16(msg[4:])
|
||||||
|
ancount := getUint16(msg[6:])
|
||||||
|
nscount := getUint16(msg[8:])
|
||||||
|
arcount := getUint16(msg[10:])
|
||||||
|
rdcount := ancount + nscount + arcount
|
||||||
|
|
||||||
|
msg = msg[12:] // skip header
|
||||||
|
|
||||||
|
// skip questions
|
||||||
|
for i := 0; i < qdcount; i++ {
|
||||||
|
name := getNameLen(msg)
|
||||||
|
if name < 0 || name+4 > len(msg) {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
msg = msg[name+4:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse records
|
||||||
|
for i := 0; i < rdcount; i++ {
|
||||||
|
name := getNameLen(msg)
|
||||||
|
if name < 0 || name+10 > len(msg) {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
rtyp := getUint16(msg[name+0:])
|
||||||
|
rttl := getUint32(msg[name+4:])
|
||||||
|
rlen := getUint16(msg[name+8:])
|
||||||
|
if name+10+rlen > len(msg) {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
// skip EDNS OPT since it doesn't have a TTL
|
||||||
|
if rtyp != 41 && rttl < ttl {
|
||||||
|
ttl = rttl
|
||||||
|
}
|
||||||
|
msg = msg[name+10+rlen:]
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Duration(ttl) * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
func getNameLen(msg string) int {
|
||||||
|
i := 0
|
||||||
|
for i < len(msg) {
|
||||||
|
if msg[i] == 0 {
|
||||||
|
// end of name
|
||||||
|
i += 1
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if msg[i] >= 0xc0 {
|
||||||
|
// compressed name
|
||||||
|
i += 2
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if msg[i] >= 0x40 {
|
||||||
|
// reserved
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
i += int(msg[i] + 1)
|
||||||
|
}
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
func getUint16(s string) int {
|
||||||
|
return int(s[1]) | int(s[0])<<8
|
||||||
|
}
|
||||||
|
|
||||||
|
func getUint32(s string) int {
|
||||||
|
return int(s[3]) | int(s[2])<<8 | int(s[1])<<16 | int(s[0])<<24
|
||||||
|
}
|
||||||
|
|
||||||
|
func cachingRoundTrip(cache *cache, network, address string) roundTripper {
|
||||||
|
return func(ctx context.Context, req string) (res string, err error) {
|
||||||
|
// check cache
|
||||||
|
if res := cache.get(req); res != "" {
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case cache.opts.PreferIPV4 && cache.opts.PreferIPV6:
|
||||||
|
network = "udp"
|
||||||
|
case cache.opts.PreferIPV4:
|
||||||
|
network = "udp4"
|
||||||
|
case cache.opts.PreferIPV6:
|
||||||
|
network = "udp6"
|
||||||
|
default:
|
||||||
|
network = "udp"
|
||||||
|
}
|
||||||
|
|
||||||
|
if cache.opts.Timeout > 0 {
|
||||||
|
var cancel func()
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, cache.opts.Timeout)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// dial connection
|
||||||
|
var conn net.Conn
|
||||||
|
if cache.dial != nil {
|
||||||
|
conn, err = cache.dial(ctx, network, address)
|
||||||
|
} else {
|
||||||
|
var d net.Dialer
|
||||||
|
conn, err = d.DialContext(ctx, network, address)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
conn.Close()
|
||||||
|
}()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if t, ok := ctx.Deadline(); ok {
|
||||||
|
err = conn.SetDeadline(t)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// send request
|
||||||
|
err = writeMessage(conn, req)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// read response
|
||||||
|
res, err = readMessage(conn)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// cache response
|
||||||
|
cache.put(req, res)
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
16
util/dns/cache_test.go
Normal file
16
util/dns/cache_test.go
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
package dns
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCache(t *testing.T) {
|
||||||
|
net.DefaultResolver = NewNetResolver(PreferIPV4(true))
|
||||||
|
|
||||||
|
addrs, err := net.LookupHost("unistack.org")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Logf("addrs %v", addrs)
|
||||||
|
}
|
||||||
183
util/dns/conn.go
Normal file
183
util/dns/conn.go
Normal file
@@ -0,0 +1,183 @@
|
|||||||
|
package dns
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type dnsConn struct {
|
||||||
|
sync.Mutex
|
||||||
|
|
||||||
|
ibuf bytes.Buffer
|
||||||
|
obuf bytes.Buffer
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
deadline time.Time
|
||||||
|
roundTrip roundTripper
|
||||||
|
}
|
||||||
|
|
||||||
|
type roundTripper func(ctx context.Context, req string) (res string, err error)
|
||||||
|
|
||||||
|
func (c *dnsConn) Read(b []byte) (n int, err error) {
|
||||||
|
imsg, n, err := c.drainBuffers(b)
|
||||||
|
if n != 0 || err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := c.childContext()
|
||||||
|
omsg, err := c.roundTrip(ctx, imsg)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.fillBuffer(b, omsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) Write(b []byte) (n int, err error) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
return c.ibuf.Write(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) Close() error {
|
||||||
|
c.Lock()
|
||||||
|
cancel := c.cancel
|
||||||
|
c.Unlock()
|
||||||
|
|
||||||
|
if cancel != nil {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) LocalAddr() net.Addr {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) RemoteAddr() net.Addr {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) SetDeadline(t time.Time) error {
|
||||||
|
var err error
|
||||||
|
if err = c.SetReadDeadline(t); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = c.SetWriteDeadline(t); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) SetReadDeadline(t time.Time) error {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
c.deadline = t
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) SetWriteDeadline(t time.Time) error {
|
||||||
|
// writes do not timeout
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) drainBuffers(b []byte) (string, int, error) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
// drain the output buffer
|
||||||
|
if c.obuf.Len() > 0 {
|
||||||
|
n, err := c.obuf.Read(b)
|
||||||
|
return "", n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// otherwise, get the next message from the input buffer
|
||||||
|
sz := c.ibuf.Next(2)
|
||||||
|
if len(sz) < 2 {
|
||||||
|
return "", 0, io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
|
||||||
|
size := int64(sz[0])<<8 | int64(sz[1])
|
||||||
|
|
||||||
|
var str strings.Builder
|
||||||
|
_, err := io.CopyN(&str, &c.ibuf, size)
|
||||||
|
if err == io.EOF {
|
||||||
|
return "", 0, io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return "", 0, err
|
||||||
|
}
|
||||||
|
return str.String(), 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) fillBuffer(b []byte, str string) (int, error) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
c.obuf.WriteByte(byte(len(str) >> 8))
|
||||||
|
c.obuf.WriteByte(byte(len(str)))
|
||||||
|
c.obuf.WriteString(str)
|
||||||
|
return c.obuf.Read(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) childContext() (context.Context, context.CancelFunc) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
if c.ctx == nil {
|
||||||
|
c.ctx, c.cancel = context.WithCancel(context.Background())
|
||||||
|
}
|
||||||
|
return context.WithDeadline(c.ctx, c.deadline)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeMessage(conn net.Conn, msg string) error {
|
||||||
|
var buf []byte
|
||||||
|
if _, ok := conn.(net.PacketConn); ok {
|
||||||
|
buf = []byte(msg)
|
||||||
|
} else {
|
||||||
|
buf = make([]byte, len(msg)+2)
|
||||||
|
buf[0] = byte(len(msg) >> 8)
|
||||||
|
buf[1] = byte(len(msg))
|
||||||
|
copy(buf[2:], msg)
|
||||||
|
}
|
||||||
|
// SHOULD do a single write on TCP (RFC 7766, section 8).
|
||||||
|
// MUST do a single write on UDP.
|
||||||
|
_, err := conn.Write(buf)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func readMessage(c net.Conn) (string, error) {
|
||||||
|
if _, ok := c.(net.PacketConn); ok {
|
||||||
|
// RFC 1035 specifies 512 as the maximum message size for DNS over UDP.
|
||||||
|
// RFC 6891 OTOH suggests 4096 as the maximum payload size for EDNS.
|
||||||
|
b := make([]byte, 4096)
|
||||||
|
n, err := c.Read(b)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return string(b[:n]), nil
|
||||||
|
} else {
|
||||||
|
var sz [2]byte
|
||||||
|
_, err := io.ReadFull(c, sz[:])
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
size := int64(sz[0])<<8 | int64(sz[1])
|
||||||
|
|
||||||
|
var str strings.Builder
|
||||||
|
_, err = io.CopyN(&str, c, size)
|
||||||
|
if err == io.EOF {
|
||||||
|
return "", io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return str.String(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -16,7 +16,6 @@ type Ticker struct {
|
|||||||
C chan time.Time
|
C chan time.Time
|
||||||
min int64
|
min int64
|
||||||
max int64
|
max int64
|
||||||
exp int64
|
|
||||||
exit bool
|
exit bool
|
||||||
rng rand.Rand
|
rng rand.Rand
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -91,7 +91,7 @@ func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if mapper, ok := dst.(map[string]interface{}); ok {
|
if mapper, ok := dst.(map[string]interface{}); ok {
|
||||||
dst = mergeMap(mapper, mp, 0)
|
mergeMap(mapper, mp, 0)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,38 @@
|
|||||||
package reflect
|
package reflect
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestMergeMapStringInterface(t *testing.T) {
|
||||||
|
var dst interface{} //nolint:gosimple
|
||||||
|
dst = map[string]interface{}{
|
||||||
|
"xx": 11,
|
||||||
|
}
|
||||||
|
|
||||||
|
src := map[string]interface{}{
|
||||||
|
"zz": "aa",
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := Merge(dst, src); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mp, ok := dst.(map[string]interface{})
|
||||||
|
if !ok || mp == nil {
|
||||||
|
t.Fatalf("xxx %#+v\n", dst)
|
||||||
|
}
|
||||||
|
|
||||||
|
if fmt.Sprintf("%v", mp["xx"]) != "11" {
|
||||||
|
t.Fatalf("xxx zzzz %#+v", mp)
|
||||||
|
}
|
||||||
|
|
||||||
|
if fmt.Sprintf("%v", mp["zz"]) != "aa" {
|
||||||
|
t.Fatalf("xxx zzzz %#+v", mp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMergeMap(t *testing.T) {
|
func TestMergeMap(t *testing.T) {
|
||||||
src := map[string]interface{}{
|
src := map[string]interface{}{
|
||||||
"skey1": "sval1",
|
"skey1": "sval1",
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ type DigitalOceanMetadata struct {
|
|||||||
func (stfs *DigitalOceanMetadata) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (stfs *DigitalOceanMetadata) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
switch r.URL.Path {
|
switch r.URL.Path {
|
||||||
case "/metadata/v1.json":
|
case "/metadata/v1.json":
|
||||||
json.NewEncoder(w).Encode(stfs.Metadata.V1)
|
_ = json.NewEncoder(w).Encode(stfs.Metadata.V1)
|
||||||
default:
|
default:
|
||||||
fs := FileServer(stfs, "json", time.Now())
|
fs := FileServer(stfs, "json", time.Now())
|
||||||
idx := strings.Index(r.URL.Path[1:], "/")
|
idx := strings.Index(r.URL.Path[1:], "/")
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ type EC2Metadata struct {
|
|||||||
InstanceType string `json:"instance-type"`
|
InstanceType string `json:"instance-type"`
|
||||||
LocalHostname string `json:"local-hostname"`
|
LocalHostname string `json:"local-hostname"`
|
||||||
LocalIPv4 string `json:"local-ipv4"`
|
LocalIPv4 string `json:"local-ipv4"`
|
||||||
kernelID int `json:"kernel-id"`
|
KernelID int `json:"kernel-id"`
|
||||||
Placement string `json:"placement"`
|
Placement string `json:"placement"`
|
||||||
AvailabilityZone string `json:"availability-zone"`
|
AvailabilityZone string `json:"availability-zone"`
|
||||||
ProductCodes string `json:"product-codes"`
|
ProductCodes string `json:"product-codes"`
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ func (fs *fs) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
f, err := fs.Open(r.URL.Path)
|
f, err := fs.Open(r.URL.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
w.Write([]byte(err.Error()))
|
_, _ = w.Write([]byte(err.Error()))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.Header().Set("Content-Type", "application/octet-stream")
|
w.Header().Set("Content-Type", "application/octet-stream")
|
||||||
@@ -67,9 +67,9 @@ func (fi *fileInfo) Name() string {
|
|||||||
|
|
||||||
func (fi *fileInfo) Mode() os.FileMode {
|
func (fi *fileInfo) Mode() os.FileMode {
|
||||||
if strings.HasSuffix(fi.name, "/") {
|
if strings.HasSuffix(fi.name, "/") {
|
||||||
return os.FileMode(0755) | os.ModeDir
|
return os.FileMode(0o755) | os.ModeDir
|
||||||
}
|
}
|
||||||
return os.FileMode(0644)
|
return os.FileMode(0o644)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *fileInfo) IsDir() bool {
|
func (fi *fileInfo) IsDir() bool {
|
||||||
@@ -112,15 +112,14 @@ func (f *file) Readdir(count int) ([]os.FileInfo, error) {
|
|||||||
func (f *file) Seek(offset int64, whence int) (int64, error) {
|
func (f *file) Seek(offset int64, whence int) (int64, error) {
|
||||||
// log.Printf("seek %d %d %s\n", offset, whence, f.name)
|
// log.Printf("seek %d %d %s\n", offset, whence, f.name)
|
||||||
switch whence {
|
switch whence {
|
||||||
case os.SEEK_SET:
|
case io.SeekStart:
|
||||||
f.offset = offset
|
f.offset = offset
|
||||||
case os.SEEK_CUR:
|
case io.SeekCurrent:
|
||||||
f.offset += offset
|
f.offset += offset
|
||||||
case os.SEEK_END:
|
case io.SeekEnd:
|
||||||
f.offset = int64(len(f.data)) + offset
|
f.offset = int64(len(f.data)) + offset
|
||||||
}
|
}
|
||||||
return f.offset, nil
|
return f.offset, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *file) Stat() (os.FileInfo, error) {
|
func (f *file) Stat() (os.FileInfo, error) {
|
||||||
@@ -222,6 +221,7 @@ func getValue(name string, iface interface{}, tag string) ([]byte, error) {
|
|||||||
return nil, fmt.Errorf("failed to find %s in interface %T", name, iface)
|
return nil, fmt.Errorf("failed to find %s in interface %T", name, iface)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
func hasValidType(obj interface{}, types []reflect.Kind) bool {
|
func hasValidType(obj interface{}, types []reflect.Kind) bool {
|
||||||
for _, t := range types {
|
for _, t := range types {
|
||||||
if reflect.TypeOf(obj).Kind() == t {
|
if reflect.TypeOf(obj).Kind() == t {
|
||||||
@@ -231,6 +231,7 @@ func hasValidType(obj interface{}, types []reflect.Kind) bool {
|
|||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
func reflectValue(obj interface{}) reflect.Value {
|
func reflectValue(obj interface{}) reflect.Value {
|
||||||
var val reflect.Value
|
var val reflect.Value
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package structfs
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io/ioutil"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -61,7 +61,7 @@ var doOrig = []byte(`{
|
|||||||
}
|
}
|
||||||
`)
|
`)
|
||||||
|
|
||||||
func server(t *testing.T) {
|
func server(t *testing.T, ch chan error) {
|
||||||
stfs := DigitalOceanMetadata{}
|
stfs := DigitalOceanMetadata{}
|
||||||
err := json.Unmarshal(doOrig, &stfs.Metadata.V1)
|
err := json.Unmarshal(doOrig, &stfs.Metadata.V1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -71,7 +71,7 @@ func server(t *testing.T) {
|
|||||||
http.Handle("/metadata/v1/", FileServer(&stfs, "json", time.Now()))
|
http.Handle("/metadata/v1/", FileServer(&stfs, "json", time.Now()))
|
||||||
http.Handle("/metadata/v1.json", &stfs)
|
http.Handle("/metadata/v1.json", &stfs)
|
||||||
go func() {
|
go func() {
|
||||||
t.Fatal(http.ListenAndServe("127.0.0.1:8080", nil))
|
ch <- http.ListenAndServe("127.0.0.1:8080", nil)
|
||||||
}()
|
}()
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
}
|
}
|
||||||
@@ -82,13 +82,14 @@ func get(path string) ([]byte, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
return ioutil.ReadAll(res.Body)
|
return io.ReadAll(res.Body)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAll(t *testing.T) {
|
func TestAll(t *testing.T) {
|
||||||
server(t)
|
ch := make(chan error)
|
||||||
|
server(t, ch)
|
||||||
|
|
||||||
var tests = []struct {
|
tests := []struct {
|
||||||
in string
|
in string
|
||||||
out string
|
out string
|
||||||
}{
|
}{
|
||||||
@@ -100,34 +101,44 @@ func TestAll(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
buf, err := get(tt.in)
|
select {
|
||||||
|
case err := <-ch:
|
||||||
|
t.Fatal(err)
|
||||||
|
default:
|
||||||
|
buf, err := get(tt.in)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if string(buf) != tt.out {
|
||||||
|
t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-ch:
|
||||||
|
t.Fatal(err)
|
||||||
|
default:
|
||||||
|
doTest, err := get("http://127.0.0.1:8080/metadata/v1.json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if string(buf) != tt.out {
|
|
||||||
t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out)
|
oSt := DigitalOceanMetadata{}
|
||||||
|
err = json.Unmarshal(doOrig, &oSt.Metadata.V1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
nSt := DigitalOceanMetadata{}
|
||||||
|
|
||||||
|
err = json.Unmarshal(doTest, &nSt.Metadata.V1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(oSt, nSt) {
|
||||||
|
t.Fatalf("%v not match %v", oSt, nSt)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
doTest, err := get("http://127.0.0.1:8080/metadata/v1.json")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
oSt := DigitalOceanMetadata{}
|
|
||||||
err = json.Unmarshal(doOrig, &oSt.Metadata.V1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
nSt := DigitalOceanMetadata{}
|
|
||||||
|
|
||||||
err = json.Unmarshal(doTest, &nSt.Metadata.V1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(oSt, nSt) {
|
|
||||||
t.Fatalf("%v not match %v", oSt, nSt)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ type Duration int64
|
|||||||
|
|
||||||
func ParseDuration(s string) (time.Duration, error) {
|
func ParseDuration(s string) (time.Duration, error) {
|
||||||
if s == "" {
|
if s == "" {
|
||||||
return 0, fmt.Errorf(`time: invalid duration "` + s + `"`)
|
return 0, errors.New(`time: invalid duration "` + s + `"`)
|
||||||
}
|
}
|
||||||
|
|
||||||
var p int
|
var p int
|
||||||
|
|||||||
@@ -39,19 +39,16 @@ func newStatsMeter() {
|
|||||||
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
|
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for range ticker.C {
|
||||||
select {
|
poolsMu.Lock()
|
||||||
case <-ticker.C:
|
for _, st := range pools {
|
||||||
poolsMu.Lock()
|
stats := st.Stats()
|
||||||
for _, st := range pools {
|
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
|
||||||
stats := st.Stats()
|
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
|
||||||
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
|
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
|
||||||
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
|
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
|
||||||
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
|
|
||||||
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
|
|
||||||
}
|
|
||||||
poolsMu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
poolsMu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user