Compare commits
9 Commits
v3.10.106
...
15e9310368
| Author | SHA1 | Date | |
|---|---|---|---|
| 15e9310368 | |||
|
|
16d8cf3434 | ||
| 9704ef2e5e | |||
| 94e8f90f00 | |||
| 34d1587881 | |||
| bf4143cde5 | |||
| 36b7b9f5fb | |||
| ae97023092 | |||
| 115ca6a018 |
@@ -1,24 +1,26 @@
|
|||||||
name: lint
|
name: lint
|
||||||
|
|
||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
|
types: [opened, reopened, closed, synchronize]
|
||||||
branches:
|
branches:
|
||||||
- master
|
- master
|
||||||
- v3
|
- v3
|
||||||
|
- v4
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
lint:
|
lint:
|
||||||
name: lint
|
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: setup-go
|
- name: setup-go
|
||||||
uses: actions/setup-go@v3
|
uses: actions/setup-go@v5
|
||||||
with:
|
with:
|
||||||
go-version: 1.21
|
go-version: 'stable'
|
||||||
- name: checkout
|
- name: checkout
|
||||||
uses: actions/checkout@v3
|
uses: actions/checkout@v3
|
||||||
- name: deps
|
- name: deps
|
||||||
run: go get -v -d ./...
|
run: go get -v -d ./...
|
||||||
- name: lint
|
- name: lint
|
||||||
uses: https://github.com/golangci/golangci-lint-action@v3.4.0
|
uses: https://github.com/golangci/golangci-lint-action@v6
|
||||||
continue-on-error: true
|
|
||||||
with:
|
with:
|
||||||
version: v1.52
|
version: v1.62.2
|
||||||
@@ -1,22 +1,30 @@
|
|||||||
name: pr
|
name: test
|
||||||
|
|
||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
|
types: [opened, reopened, closed, synchronize]
|
||||||
branches:
|
branches:
|
||||||
- master
|
- master
|
||||||
- v3
|
- v3
|
||||||
|
- v4
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
- v3
|
||||||
|
- v4
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
test:
|
test:
|
||||||
name: test
|
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
|
- name: setup-go
|
||||||
|
uses: actions/setup-go@v5
|
||||||
|
with:
|
||||||
|
go-version: 'stable'
|
||||||
- name: checkout
|
- name: checkout
|
||||||
uses: actions/checkout@v3
|
uses: actions/checkout@v3
|
||||||
- name: setup-go
|
|
||||||
uses: actions/setup-go@v3
|
|
||||||
with:
|
|
||||||
go-version: 1.21
|
|
||||||
- name: deps
|
- name: deps
|
||||||
run: go get -v -t -d ./...
|
run: go get -v -d ./...
|
||||||
- name: test
|
- name: test
|
||||||
env:
|
env:
|
||||||
INTEGRATION_TESTS: yes
|
INTEGRATION_TESTS: yes
|
||||||
@@ -1,44 +1,5 @@
|
|||||||
run:
|
run:
|
||||||
concurrency: 4
|
concurrency: 8
|
||||||
deadline: 5m
|
deadline: 5m
|
||||||
issues-exit-code: 1
|
issues-exit-code: 1
|
||||||
tests: true
|
tests: true
|
||||||
|
|
||||||
linters-settings:
|
|
||||||
govet:
|
|
||||||
check-shadowing: true
|
|
||||||
enable:
|
|
||||||
- fieldalignment
|
|
||||||
|
|
||||||
linters:
|
|
||||||
enable:
|
|
||||||
- govet
|
|
||||||
- deadcode
|
|
||||||
- errcheck
|
|
||||||
- govet
|
|
||||||
- ineffassign
|
|
||||||
- staticcheck
|
|
||||||
- structcheck
|
|
||||||
- typecheck
|
|
||||||
- unused
|
|
||||||
- varcheck
|
|
||||||
- bodyclose
|
|
||||||
- gci
|
|
||||||
- goconst
|
|
||||||
- gocritic
|
|
||||||
- gosimple
|
|
||||||
- gofmt
|
|
||||||
- gofumpt
|
|
||||||
- goimports
|
|
||||||
- revive
|
|
||||||
- gosec
|
|
||||||
- makezero
|
|
||||||
- misspell
|
|
||||||
- nakedret
|
|
||||||
- nestif
|
|
||||||
- nilerr
|
|
||||||
- noctx
|
|
||||||
- prealloc
|
|
||||||
- unconvert
|
|
||||||
- unparam
|
|
||||||
disable-all: false
|
|
||||||
|
|||||||
@@ -46,6 +46,12 @@ type Broker interface {
|
|||||||
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
|
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
|
||||||
// String type of broker
|
// String type of broker
|
||||||
String() string
|
String() string
|
||||||
|
// Live returns broker liveness
|
||||||
|
Live() bool
|
||||||
|
// Ready returns broker readiness
|
||||||
|
Ready() bool
|
||||||
|
// Health returns broker health
|
||||||
|
Health() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -339,6 +339,18 @@ func (m *memoryBroker) Name() string {
|
|||||||
return m.opts.Name
|
return m.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *memoryBroker) Live() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryBroker) Ready() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryBroker) Health() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (m *memoryEvent) Topic() string {
|
func (m *memoryEvent) Topic() string {
|
||||||
return m.topic
|
return m.topic
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,18 @@ func NewBroker(opts ...Option) *NoopBroker {
|
|||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *NoopBroker) Health() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *NoopBroker) Live() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *NoopBroker) Ready() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (b *NoopBroker) Name() string {
|
func (b *NoopBroker) Name() string {
|
||||||
return b.opts.Name
|
return b.opts.Name
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -298,7 +298,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o
|
|||||||
// call backoff first. Someone may want an initial start delay
|
// call backoff first. Someone may want an initial start delay
|
||||||
t, err := callOpts.Backoff(ctx, req, i)
|
t, err := callOpts.Backoff(ctx, req, i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// only sleep if greater than 0
|
// only sleep if greater than 0
|
||||||
@@ -312,7 +312,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o
|
|||||||
// TODO apply any filtering here
|
// TODO apply any filtering here
|
||||||
routes, err = n.opts.Lookup(ctx, req, callOpts)
|
routes, err = n.opts.Lookup(ctx, req, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// balance the list of nodes
|
// balance the list of nodes
|
||||||
@@ -466,7 +466,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
|
|||||||
// call backoff first. Someone may want an initial start delay
|
// call backoff first. Someone may want an initial start delay
|
||||||
t, cerr := callOpts.Backoff(ctx, req, i)
|
t, cerr := callOpts.Backoff(ctx, req, i)
|
||||||
if cerr != nil {
|
if cerr != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", cerr.Error())
|
return nil, errors.InternalServerError("go.micro.client", "%s", cerr.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// only sleep if greater than 0
|
// only sleep if greater than 0
|
||||||
@@ -480,7 +480,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
|
|||||||
// TODO apply any filtering here
|
// TODO apply any filtering here
|
||||||
routes, err = n.opts.Lookup(ctx, req, callOpts)
|
routes, err = n.opts.Lookup(ctx, req, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
return nil, errors.InternalServerError("go.micro.client", "%s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// balance the list of nodes
|
// balance the list of nodes
|
||||||
@@ -609,13 +609,13 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
|
|||||||
// use codec for payload
|
// use codec for payload
|
||||||
cf, err := n.newCodec(p.ContentType())
|
cf, err := n.newCodec(p.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the body
|
// set the body
|
||||||
b, err := cf.Marshal(p.Payload())
|
b, err := cf.Marshal(p.Payload())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", "%s", err.Error())
|
||||||
}
|
}
|
||||||
body = b
|
body = b
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ import (
|
|||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v3/meter"
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
|
||||||
"go.unistack.org/micro/v3/options"
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v3/register"
|
||||||
"go.unistack.org/micro/v3/router"
|
"go.unistack.org/micro/v3/router"
|
||||||
@@ -22,8 +21,6 @@ import (
|
|||||||
|
|
||||||
// Options holds client options
|
// Options holds client options
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// Transport used for transfer messages
|
|
||||||
Transport transport.Transport
|
|
||||||
// Selector used to select needed address
|
// Selector used to select needed address
|
||||||
Selector selector.Selector
|
Selector selector.Selector
|
||||||
// Logger used to log messages
|
// Logger used to log messages
|
||||||
@@ -194,18 +191,16 @@ func NewOptions(opts ...Option) Options {
|
|||||||
Retry: DefaultRetry,
|
Retry: DefaultRetry,
|
||||||
Retries: DefaultRetries,
|
Retries: DefaultRetries,
|
||||||
RequestTimeout: DefaultRequestTimeout,
|
RequestTimeout: DefaultRequestTimeout,
|
||||||
DialTimeout: transport.DefaultDialTimeout,
|
|
||||||
},
|
},
|
||||||
Lookup: LookupRoute,
|
Lookup: LookupRoute,
|
||||||
PoolSize: DefaultPoolSize,
|
PoolSize: DefaultPoolSize,
|
||||||
PoolTTL: DefaultPoolTTL,
|
PoolTTL: DefaultPoolTTL,
|
||||||
Selector: random.NewSelector(),
|
Selector: random.NewSelector(),
|
||||||
Logger: logger.DefaultLogger,
|
Logger: logger.DefaultLogger,
|
||||||
Broker: broker.DefaultBroker,
|
Broker: broker.DefaultBroker,
|
||||||
Meter: meter.DefaultMeter,
|
Meter: meter.DefaultMeter,
|
||||||
Tracer: tracer.DefaultTracer,
|
Tracer: tracer.DefaultTracer,
|
||||||
Router: router.DefaultRouter,
|
Router: router.DefaultRouter,
|
||||||
Transport: transport.DefaultTransport,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
@@ -278,13 +273,6 @@ func PoolTTL(d time.Duration) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transport to use for communication e.g http, rabbitmq, etc
|
|
||||||
func Transport(t transport.Transport) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Transport = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register sets the routers register
|
// Register sets the routers register
|
||||||
func Register(r register.Register) Option {
|
func Register(r register.Register) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
@@ -334,14 +322,6 @@ func TLSConfig(t *tls.Config) Option {
|
|||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
// set the internal tls
|
// set the internal tls
|
||||||
o.TLSConfig = t
|
o.TLSConfig = t
|
||||||
|
|
||||||
// set the default transport if one is not
|
|
||||||
// already set. Required for Init call below.
|
|
||||||
|
|
||||||
// set the transport tls
|
|
||||||
_ = o.Transport.Init(
|
|
||||||
transport.TLSConfig(t),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -507,13 +487,6 @@ func WithAuthToken(t string) CallOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNetwork is a CallOption which sets the network attribute
|
|
||||||
func WithNetwork(n string) CallOption {
|
|
||||||
return func(o *CallOptions) {
|
|
||||||
o.Network = n
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithRouter sets the router to use for this call
|
// WithRouter sets the router to use for this call
|
||||||
func WithRouter(r router.Router) CallOption {
|
func WithRouter(r router.Router) CallOption {
|
||||||
return func(o *CallOptions) {
|
return func(o *CallOptions) {
|
||||||
|
|||||||
@@ -38,4 +38,10 @@ type Cluster interface {
|
|||||||
Broadcast(ctx context.Context, msg Message, filter ...string) error
|
Broadcast(ctx context.Context, msg Message, filter ...string) error
|
||||||
// Unicast send message to single member in cluster
|
// Unicast send message to single member in cluster
|
||||||
Unicast(ctx context.Context, node Node, msg Message) error
|
Unicast(ctx context.Context, node Node, msg Message) error
|
||||||
|
// Live returns cluster liveness
|
||||||
|
Live() bool
|
||||||
|
// Ready returns cluster readiness
|
||||||
|
Ready() bool
|
||||||
|
// Health returns cluster health
|
||||||
|
Health() bool
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ package config
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -139,7 +138,7 @@ var (
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := fn(ctx, c); err != nil {
|
if err := fn(ctx, c); err != nil {
|
||||||
c.Options().Logger.Error(ctx, fmt.Sprintf("%s BeforeLoad error", c.String()), err)
|
c.Options().Logger.Error(ctx, c.String()+" BeforeLoad error", err)
|
||||||
if !c.Options().AllowFail {
|
if !c.Options().AllowFail {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -154,7 +153,7 @@ var (
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := fn(ctx, c); err != nil {
|
if err := fn(ctx, c); err != nil {
|
||||||
c.Options().Logger.Error(ctx, fmt.Sprintf("%s AfterLoad error", c.String()), err)
|
c.Options().Logger.Error(ctx, c.String()+" AfterLoad error", err)
|
||||||
if !c.Options().AllowFail {
|
if !c.Options().AllowFail {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -169,7 +168,7 @@ var (
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := fn(ctx, c); err != nil {
|
if err := fn(ctx, c); err != nil {
|
||||||
c.Options().Logger.Error(ctx, fmt.Sprintf("%s BeforeSave error", c.String()), err)
|
c.Options().Logger.Error(ctx, c.String()+" BeforeSave error", err)
|
||||||
if !c.Options().AllowFail {
|
if !c.Options().AllowFail {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -184,7 +183,7 @@ var (
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := fn(ctx, c); err != nil {
|
if err := fn(ctx, c); err != nil {
|
||||||
c.Options().Logger.Error(ctx, fmt.Sprintf("%s AfterSave error", c.String()), err)
|
c.Options().Logger.Error(ctx, c.String()+" AfterSave error", err)
|
||||||
if !c.Options().AllowFail {
|
if !c.Options().AllowFail {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -199,7 +198,7 @@ var (
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := fn(ctx, c); err != nil {
|
if err := fn(ctx, c); err != nil {
|
||||||
c.Options().Logger.Error(ctx, fmt.Sprintf("%s BeforeInit error", c.String()), err)
|
c.Options().Logger.Error(ctx, c.String()+" BeforeInit error", err)
|
||||||
if !c.Options().AllowFail {
|
if !c.Options().AllowFail {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -214,7 +213,7 @@ var (
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := fn(ctx, c); err != nil {
|
if err := fn(ctx, c); err != nil {
|
||||||
c.Options().Logger.Error(ctx, fmt.Sprintf("%s AfterInit error", c.String(), err), err)
|
c.Options().Logger.Error(ctx, c.String()+" AfterInit error", err)
|
||||||
if !c.Options().AllowFail {
|
if !c.Options().AllowFail {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package errors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
er "errors"
|
er "errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -26,7 +27,7 @@ func TestMarshalJSON(t *testing.T) {
|
|||||||
func TestEmpty(t *testing.T) {
|
func TestEmpty(t *testing.T) {
|
||||||
msg := "test"
|
msg := "test"
|
||||||
var err *Error
|
var err *Error
|
||||||
err = FromError(fmt.Errorf(msg))
|
err = FromError(errors.New(msg))
|
||||||
if err.Detail != msg {
|
if err.Detail != msg {
|
||||||
t.Fatalf("invalid error %v", err)
|
t.Fatalf("invalid error %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,6 +80,13 @@ func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithAddFields add fields for the logger
|
||||||
|
func WithAddFields(fields ...interface{}) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Fields = append(o.Fields, fields...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithFields set default fields for the logger
|
// WithFields set default fields for the logger
|
||||||
func WithFields(fields ...interface{}) Option {
|
func WithFields(fields ...interface{}) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
|||||||
@@ -46,11 +46,11 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
|
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
|
||||||
return h.WithAttrs(attrs)
|
return h.h.WithAttrs(attrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *wrapper) WithGroup(name string) slog.Handler {
|
func (h *wrapper) WithGroup(name string) slog.Handler {
|
||||||
return h.WithGroup(name)
|
return h.h.WithGroup(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
||||||
@@ -89,7 +89,6 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type slogLogger struct {
|
type slogLogger struct {
|
||||||
leveler *slog.LevelVar
|
|
||||||
handler *wrapper
|
handler *wrapper
|
||||||
opts logger.Options
|
opts logger.Options
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
|||||||
@@ -15,6 +15,34 @@ import (
|
|||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestWithAddFields(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
l := NewLogger(logger.WithLevel(logger.InfoLevel), logger.WithOutput(buf))
|
||||||
|
if err := l.Init(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
l.Info(ctx, "msg1")
|
||||||
|
|
||||||
|
if err := l.Init(logger.WithAddFields("key1", "val1")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
l.Info(ctx, "msg2")
|
||||||
|
|
||||||
|
if err := l.Init(logger.WithAddFields("key2", "val2")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
l.Info(ctx, "msg3")
|
||||||
|
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"key1"`)) {
|
||||||
|
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"key2"`)) {
|
||||||
|
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMultipleFieldsWithLevel(t *testing.T) {
|
func TestMultipleFieldsWithLevel(t *testing.T) {
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
buf := bytes.NewBuffer(nil)
|
buf := bytes.NewBuffer(nil)
|
||||||
|
|||||||
@@ -36,14 +36,14 @@ var (
|
|||||||
circularShortBytes = []byte("<shown>")
|
circularShortBytes = []byte("<shown>")
|
||||||
invalidAngleBytes = []byte("<invalid>")
|
invalidAngleBytes = []byte("<invalid>")
|
||||||
filteredBytes = []byte("<filtered>")
|
filteredBytes = []byte("<filtered>")
|
||||||
openBracketBytes = []byte("[")
|
// openBracketBytes = []byte("[")
|
||||||
closeBracketBytes = []byte("]")
|
// closeBracketBytes = []byte("]")
|
||||||
percentBytes = []byte("%")
|
percentBytes = []byte("%")
|
||||||
precisionBytes = []byte(".")
|
precisionBytes = []byte(".")
|
||||||
openAngleBytes = []byte("<")
|
openAngleBytes = []byte("<")
|
||||||
closeAngleBytes = []byte(">")
|
closeAngleBytes = []byte(">")
|
||||||
openMapBytes = []byte("{")
|
openMapBytes = []byte("{")
|
||||||
closeMapBytes = []byte("}")
|
closeMapBytes = []byte("}")
|
||||||
)
|
)
|
||||||
|
|
||||||
type protoMessage interface {
|
type protoMessage interface {
|
||||||
|
|||||||
@@ -82,12 +82,12 @@ func TestTagged(t *testing.T) {
|
|||||||
func TestTaggedNested(t *testing.T) {
|
func TestTaggedNested(t *testing.T) {
|
||||||
type val struct {
|
type val struct {
|
||||||
key string `logger:"take"`
|
key string `logger:"take"`
|
||||||
val string `logger:"omit"`
|
// val string `logger:"omit"`
|
||||||
unk string
|
unk string
|
||||||
}
|
}
|
||||||
type str struct {
|
type str struct {
|
||||||
key string `logger:"omit"`
|
// key string `logger:"omit"`
|
||||||
val *val `logger:"take"`
|
val *val `logger:"take"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var iface interface{}
|
var iface interface{}
|
||||||
|
|||||||
@@ -55,10 +55,7 @@ func NewContext(ctx context.Context, md Metadata) context.Context {
|
|||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
ctx = context.WithValue(ctx, mdKey{}, &rawMetadata{md})
|
return context.WithValue(ctx, mdKey{}, &rawMetadata{md})
|
||||||
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{})
|
|
||||||
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{})
|
|
||||||
return ctx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetOutgoingContext modify outgoing context with given metadata
|
// SetOutgoingContext modify outgoing context with given metadata
|
||||||
@@ -90,11 +87,7 @@ func NewIncomingContext(ctx context.Context, md Metadata) context.Context {
|
|||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{md})
|
return context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{md})
|
||||||
if v, ok := ctx.Value(mdOutgoingKey{}).(*rawMetadata); !ok || v == nil {
|
|
||||||
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{})
|
|
||||||
}
|
|
||||||
return ctx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutgoingContext creates a new context with outcoming metadata attached
|
// NewOutgoingContext creates a new context with outcoming metadata attached
|
||||||
@@ -102,11 +95,7 @@ func NewOutgoingContext(ctx context.Context, md Metadata) context.Context {
|
|||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{md})
|
return context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{md})
|
||||||
if v, ok := ctx.Value(mdIncomingKey{}).(*rawMetadata); !ok || v == nil {
|
|
||||||
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{})
|
|
||||||
}
|
|
||||||
return ctx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendOutgoingContext apends new md to context
|
// AppendOutgoingContext apends new md to context
|
||||||
|
|||||||
@@ -5,6 +5,28 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestMultipleUsage(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
md := New(0)
|
||||||
|
md.Set("key1_1", "val1_1", "key1_2", "val1_2", "key1_3", "val1_3")
|
||||||
|
ctx = NewIncomingContext(ctx, Copy(md))
|
||||||
|
ctx = NewOutgoingContext(ctx, Copy(md))
|
||||||
|
imd, _ := FromIncomingContext(ctx)
|
||||||
|
omd, _ := FromOutgoingContext(ctx)
|
||||||
|
_ = func(x context.Context) context.Context {
|
||||||
|
m, _ := FromIncomingContext(x)
|
||||||
|
m.Del("key1_2")
|
||||||
|
return ctx
|
||||||
|
}(ctx)
|
||||||
|
_ = func(x context.Context) context.Context {
|
||||||
|
m, _ := FromIncomingContext(x)
|
||||||
|
m.Del("key1_3")
|
||||||
|
return ctx
|
||||||
|
}(ctx)
|
||||||
|
t.Logf("imd %#+v", imd)
|
||||||
|
t.Logf("omd %#+v", omd)
|
||||||
|
}
|
||||||
|
|
||||||
func TestMetadataSetMultiple(t *testing.T) {
|
func TestMetadataSetMultiple(t *testing.T) {
|
||||||
md := New(4)
|
md := New(4)
|
||||||
md.Set("key1", "val1", "key2", "val2", "key3")
|
md.Set("key1", "val1", "key2", "val2", "key3")
|
||||||
@@ -58,6 +80,14 @@ func TestPassing(t *testing.T) {
|
|||||||
ctx = NewIncomingContext(ctx, md1)
|
ctx = NewIncomingContext(ctx, md1)
|
||||||
testCtx(ctx)
|
testCtx(ctx)
|
||||||
md, ok := FromOutgoingContext(ctx)
|
md, ok := FromOutgoingContext(ctx)
|
||||||
|
if ok {
|
||||||
|
t.Fatalf("create outgoing context")
|
||||||
|
}
|
||||||
|
_ = md
|
||||||
|
|
||||||
|
ctx = NewOutgoingContext(ctx, New(1))
|
||||||
|
testCtx(ctx)
|
||||||
|
md, ok = FromOutgoingContext(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("missing metadata from outgoing context")
|
t.Fatalf("missing metadata from outgoing context")
|
||||||
}
|
}
|
||||||
|
|||||||
36
micro.go
36
micro.go
@@ -65,6 +65,8 @@ func As(b any, target any) bool {
|
|||||||
break
|
break
|
||||||
case targetType.Implements(routerType):
|
case targetType.Implements(routerType):
|
||||||
break
|
break
|
||||||
|
case targetType.Implements(tracerType):
|
||||||
|
break
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@@ -76,19 +78,21 @@ func As(b any, target any) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
var brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem()
|
var (
|
||||||
var loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem()
|
brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem()
|
||||||
var clientType = reflect.TypeOf((*client.Client)(nil)).Elem()
|
loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem()
|
||||||
var serverType = reflect.TypeOf((*server.Server)(nil)).Elem()
|
clientType = reflect.TypeOf((*client.Client)(nil)).Elem()
|
||||||
var codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem()
|
serverType = reflect.TypeOf((*server.Server)(nil)).Elem()
|
||||||
var flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem()
|
codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem()
|
||||||
var fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem()
|
flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem()
|
||||||
var meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem()
|
fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem()
|
||||||
var registerType = reflect.TypeOf((*register.Register)(nil)).Elem()
|
meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem()
|
||||||
var resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem()
|
registerType = reflect.TypeOf((*register.Register)(nil)).Elem()
|
||||||
var routerType = reflect.TypeOf((*router.Router)(nil)).Elem()
|
resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem()
|
||||||
var selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem()
|
routerType = reflect.TypeOf((*router.Router)(nil)).Elem()
|
||||||
var storeType = reflect.TypeOf((*store.Store)(nil)).Elem()
|
selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem()
|
||||||
var syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem()
|
storeType = reflect.TypeOf((*store.Store)(nil)).Elem()
|
||||||
var tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem()
|
syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem()
|
||||||
var serviceType = reflect.TypeOf((*Service)(nil)).Elem()
|
tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem()
|
||||||
|
serviceType = reflect.TypeOf((*Service)(nil)).Elem()
|
||||||
|
)
|
||||||
|
|||||||
@@ -66,6 +66,12 @@ type bro struct {
|
|||||||
|
|
||||||
func (p *bro) Name() string { return p.name }
|
func (p *bro) Name() string { return p.name }
|
||||||
|
|
||||||
|
func (p *bro) Live() bool { return true }
|
||||||
|
|
||||||
|
func (p *bro) Ready() bool { return true }
|
||||||
|
|
||||||
|
func (p *bro) Health() bool { return true }
|
||||||
|
|
||||||
func (p *bro) Init(opts ...broker.Option) error { return nil }
|
func (p *bro) Init(opts ...broker.Option) error { return nil }
|
||||||
|
|
||||||
// Options returns broker options
|
// Options returns broker options
|
||||||
|
|||||||
@@ -45,6 +45,18 @@ type (
|
|||||||
tunnelAddr struct{}
|
tunnelAddr struct{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (t *tunBroker) Live() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tunBroker) Ready() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tunBroker) Health() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (t *tunBroker) Init(opts ...broker.Option) error {
|
func (t *tunBroker) Init(opts ...broker.Option) error {
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&t.opts)
|
o(&t.opts)
|
||||||
|
|||||||
@@ -269,7 +269,7 @@ func Logger(l logger.Logger, opts ...LoggerOption) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, trc := range o.Tracers {
|
for _, trc := range o.Tracers {
|
||||||
for _, ot := range lopts.tracers {
|
for _, ot := range lopts.tracers {
|
||||||
if trc.Name() == ot || all {
|
if trc.Name() == ot || all {
|
||||||
@@ -294,8 +294,8 @@ type loggerOptions struct {
|
|||||||
brokers []string
|
brokers []string
|
||||||
registers []string
|
registers []string
|
||||||
stores []string
|
stores []string
|
||||||
meters []string
|
// meters []string
|
||||||
tracers []string
|
tracers []string
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|||||||
@@ -469,9 +469,7 @@ func serviceToRecord(s *register.Service, ttl time.Duration) *record {
|
|||||||
}
|
}
|
||||||
|
|
||||||
endpoints := make([]*register.Endpoint, len(s.Endpoints))
|
endpoints := make([]*register.Endpoint, len(s.Endpoints))
|
||||||
for i, e := range s.Endpoints {
|
copy(endpoints, s.Endpoints)
|
||||||
endpoints[i] = e
|
|
||||||
}
|
|
||||||
|
|
||||||
return &record{
|
return &record{
|
||||||
Name: s.Name,
|
Name: s.Name,
|
||||||
|
|||||||
@@ -290,27 +290,25 @@ func TestWatcher(t *testing.T) {
|
|||||||
|
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
m := NewRegister()
|
m := NewRegister()
|
||||||
m.Init()
|
_ = m.Init()
|
||||||
m.Connect(ctx)
|
_ = m.Connect(ctx)
|
||||||
wc, err := m.Watch(ctx)
|
wc, err := m.Watch(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("cant watch: %v", err)
|
t.Fatalf("cant watch: %v", err)
|
||||||
}
|
}
|
||||||
defer wc.Stop()
|
defer wc.Stop()
|
||||||
|
|
||||||
|
cherr := make(chan error, 10)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
_, err := wc.Next()
|
||||||
_, err := wc.Next()
|
if err != nil {
|
||||||
if err != nil {
|
cherr <- fmt.Errorf("unexpected err %v", err)
|
||||||
t.Fatal("unexpected err", err)
|
|
||||||
}
|
|
||||||
// t.Logf("changes %#+v", ch.Service)
|
|
||||||
wc.Stop()
|
|
||||||
wg.Done()
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
// t.Logf("changes %#+v", ch.Service)
|
||||||
|
wc.Stop()
|
||||||
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if err := m.Register(ctx, testSrv); err != nil {
|
if err := m.Register(ctx, testSrv); err != nil {
|
||||||
|
|||||||
@@ -29,17 +29,32 @@ var (
|
|||||||
// and an abstraction over varying implementations
|
// and an abstraction over varying implementations
|
||||||
// {consul, etcd, zookeeper, ...}
|
// {consul, etcd, zookeeper, ...}
|
||||||
type Register interface {
|
type Register interface {
|
||||||
|
// Name returns register name
|
||||||
Name() string
|
Name() string
|
||||||
|
// Init initialize register
|
||||||
Init(...Option) error
|
Init(...Option) error
|
||||||
|
// Options returns options for register
|
||||||
Options() Options
|
Options() Options
|
||||||
|
// Connect initialize connect to register
|
||||||
Connect(context.Context) error
|
Connect(context.Context) error
|
||||||
|
// Disconnect initialize discconection from register
|
||||||
Disconnect(context.Context) error
|
Disconnect(context.Context) error
|
||||||
|
// Register service in registry
|
||||||
Register(context.Context, *Service, ...RegisterOption) error
|
Register(context.Context, *Service, ...RegisterOption) error
|
||||||
|
// Deregister service from registry
|
||||||
Deregister(context.Context, *Service, ...DeregisterOption) error
|
Deregister(context.Context, *Service, ...DeregisterOption) error
|
||||||
|
// LookupService in registry
|
||||||
LookupService(context.Context, string, ...LookupOption) ([]*Service, error)
|
LookupService(context.Context, string, ...LookupOption) ([]*Service, error)
|
||||||
|
// ListServices in registry
|
||||||
ListServices(context.Context, ...ListOption) ([]*Service, error)
|
ListServices(context.Context, ...ListOption) ([]*Service, error)
|
||||||
|
// Watch registry events
|
||||||
Watch(context.Context, ...WatchOption) (Watcher, error)
|
Watch(context.Context, ...WatchOption) (Watcher, error)
|
||||||
|
// String returns registry string representation
|
||||||
String() string
|
String() string
|
||||||
|
// Live returns register liveness
|
||||||
|
// Live() bool
|
||||||
|
// Ready returns register readiness
|
||||||
|
// Ready() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service holds service register info
|
// Service holds service register info
|
||||||
|
|||||||
@@ -12,9 +12,9 @@ import (
|
|||||||
|
|
||||||
// Resolver is a DNS network resolve
|
// Resolver is a DNS network resolve
|
||||||
type Resolver struct {
|
type Resolver struct {
|
||||||
sync.RWMutex
|
|
||||||
goresolver *net.Resolver
|
goresolver *net.Resolver
|
||||||
Address string
|
Address string
|
||||||
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resolve tries to resolve endpoint address
|
// Resolve tries to resolve endpoint address
|
||||||
@@ -39,12 +39,12 @@ func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) {
|
|||||||
return []*resolver.Record{rec}, nil
|
return []*resolver.Record{rec}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
r.RLock()
|
r.mu.RLock()
|
||||||
goresolver := r.goresolver
|
goresolver := r.goresolver
|
||||||
r.RUnlock()
|
r.mu.RUnlock()
|
||||||
|
|
||||||
if goresolver == nil {
|
if goresolver == nil {
|
||||||
r.Lock()
|
r.mu.Lock()
|
||||||
r.goresolver = &net.Resolver{
|
r.goresolver = &net.Resolver{
|
||||||
Dial: func(ctx context.Context, _ string, _ string) (net.Conn, error) {
|
Dial: func(ctx context.Context, _ string, _ string) (net.Conn, error) {
|
||||||
d := net.Dialer{
|
d := net.Dialer{
|
||||||
@@ -53,7 +53,7 @@ func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) {
|
|||||||
return d.DialContext(ctx, "udp", r.Address)
|
return d.DialContext(ctx, "udp", r.Address)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
r.Unlock()
|
r.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
addrs, err := goresolver.LookupIP(context.TODO(), "ip", host)
|
addrs, err := goresolver.LookupIP(context.TODO(), "ip", host)
|
||||||
|
|||||||
@@ -121,6 +121,18 @@ func (n *noopServer) newCodec(contentType string) (codec.Codec, error) {
|
|||||||
return nil, codec.ErrUnknownContentType
|
return nil, codec.ErrUnknownContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *noopServer) Live() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopServer) Ready() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopServer) Health() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (n *noopServer) Handle(handler Handler) error {
|
func (n *noopServer) Handle(handler Handler) error {
|
||||||
n.h = handler
|
n.h = handler
|
||||||
return nil
|
return nil
|
||||||
@@ -159,7 +171,6 @@ type rpcMessage struct {
|
|||||||
header metadata.Metadata
|
header metadata.Metadata
|
||||||
topic string
|
topic string
|
||||||
contentType string
|
contentType string
|
||||||
body []byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcMessage) ContentType() string {
|
func (r *rpcMessage) ContentType() string {
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ func TestNoopSub(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel))
|
_ = logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel))
|
||||||
s := server.NewServer(
|
s := server.NewServer(
|
||||||
server.Broker(b),
|
server.Broker(b),
|
||||||
server.Codec("application/octet-stream", codec.NewCodec()),
|
server.Codec("application/octet-stream", codec.NewCodec()),
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ import (
|
|||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v3/meter"
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
|
||||||
"go.unistack.org/micro/v3/options"
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v3/register"
|
||||||
msync "go.unistack.org/micro/v3/sync"
|
msync "go.unistack.org/micro/v3/sync"
|
||||||
@@ -37,8 +36,6 @@ type Options struct {
|
|||||||
Logger logger.Logger
|
Logger logger.Logger
|
||||||
// Meter holds the meter
|
// Meter holds the meter
|
||||||
Meter meter.Meter
|
Meter meter.Meter
|
||||||
// Transport holds the transport
|
|
||||||
Transport transport.Transport
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
// Router for requests
|
// Router for requests
|
||||||
@@ -100,7 +97,6 @@ func NewOptions(opts ...Option) Options {
|
|||||||
Tracer: tracer.DefaultTracer,
|
Tracer: tracer.DefaultTracer,
|
||||||
Broker: broker.DefaultBroker,
|
Broker: broker.DefaultBroker,
|
||||||
Register: register.DefaultRegister,
|
Register: register.DefaultRegister,
|
||||||
Transport: transport.DefaultTransport,
|
|
||||||
Address: DefaultAddress,
|
Address: DefaultAddress,
|
||||||
Name: DefaultName,
|
Name: DefaultName,
|
||||||
Version: DefaultVersion,
|
Version: DefaultVersion,
|
||||||
@@ -209,13 +205,6 @@ func Tracer(t tracer.Tracer) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transport mechanism for communication e.g http, rabbitmq, etc
|
|
||||||
func Transport(t transport.Transport) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Transport = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Metadata associated with the server
|
// Metadata associated with the server
|
||||||
func Metadata(md metadata.Metadata) Option {
|
func Metadata(md metadata.Metadata) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
@@ -249,14 +238,6 @@ func TLSConfig(t *tls.Config) Option {
|
|||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
// set the internal tls
|
// set the internal tls
|
||||||
o.TLSConfig = t
|
o.TLSConfig = t
|
||||||
|
|
||||||
// set the default transport if one is not
|
|
||||||
// already set. Required for Init call below.
|
|
||||||
|
|
||||||
// set the transport tls
|
|
||||||
_ = o.Transport.Init(
|
|
||||||
transport.TLSConfig(t),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -62,6 +62,12 @@ type Server interface {
|
|||||||
Stop() error
|
Stop() error
|
||||||
// Server implementation
|
// Server implementation
|
||||||
String() string
|
String() string
|
||||||
|
// Live returns server liveness
|
||||||
|
Live() bool
|
||||||
|
// Ready returns server readiness
|
||||||
|
Ready() bool
|
||||||
|
// Health returns server health
|
||||||
|
Health() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
84
service.go
84
service.go
@@ -1,9 +1,11 @@
|
|||||||
// Package micro is a pluggable framework for microservices
|
// Package micro is a pluggable framework for microservices
|
||||||
package micro // import "go.unistack.org/micro/v3"
|
package micro
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/KimMachineGun/automemlimit/memlimit"
|
"github.com/KimMachineGun/automemlimit/memlimit"
|
||||||
"go.uber.org/automaxprocs/maxprocs"
|
"go.uber.org/automaxprocs/maxprocs"
|
||||||
@@ -17,11 +19,12 @@ import (
|
|||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v3/server"
|
||||||
"go.unistack.org/micro/v3/store"
|
"go.unistack.org/micro/v3/store"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v3/tracer"
|
||||||
|
utildns "go.unistack.org/micro/v3/util/dns"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
maxprocs.Set()
|
_, _ = maxprocs.Set()
|
||||||
memlimit.SetGoMemLimitWithOpts(
|
_, _ = memlimit.SetGoMemLimitWithOpts(
|
||||||
memlimit.WithRatio(0.9),
|
memlimit.WithRatio(0.9),
|
||||||
memlimit.WithProvider(
|
memlimit.WithProvider(
|
||||||
memlimit.ApplyFallback(
|
memlimit.ApplyFallback(
|
||||||
@@ -30,6 +33,8 @@ func init() {
|
|||||||
),
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
net.DefaultResolver = utildns.NewNetResolver(utildns.Timeout(1 * time.Second))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service is an interface that wraps the lower level components.
|
// Service is an interface that wraps the lower level components.
|
||||||
@@ -72,8 +77,14 @@ type Service interface {
|
|||||||
Start() error
|
Start() error
|
||||||
// Stop the service
|
// Stop the service
|
||||||
Stop() error
|
Stop() error
|
||||||
// The service implementation
|
// String service representation
|
||||||
String() string
|
String() string
|
||||||
|
// Live returns service liveness
|
||||||
|
Live() bool
|
||||||
|
// Ready returns service readiness
|
||||||
|
Ready() bool
|
||||||
|
// Health returns service health
|
||||||
|
Health() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterHandler is syntactic sugar for registering a handler
|
// RegisterHandler is syntactic sugar for registering a handler
|
||||||
@@ -101,9 +112,7 @@ func (s *service) Name() string {
|
|||||||
return s.opts.Name
|
return s.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init initialises options. Additionally it calls cmd.Init
|
// Init initialises options.
|
||||||
// which parses command line flags. cmd.Init is only called
|
|
||||||
// on first Init.
|
|
||||||
//
|
//
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func (s *service) Init(opts ...Option) error {
|
func (s *service) Init(opts ...Option) error {
|
||||||
@@ -252,6 +261,63 @@ func (s *service) String() string {
|
|||||||
return s.opts.Name
|
return s.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *service) Live() bool {
|
||||||
|
for _, v := range s.opts.Brokers {
|
||||||
|
if !v.Live() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, v := range s.opts.Servers {
|
||||||
|
if !v.Live() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, v := range s.opts.Stores {
|
||||||
|
if !v.Live() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *service) Ready() bool {
|
||||||
|
for _, v := range s.opts.Brokers {
|
||||||
|
if !v.Ready() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, v := range s.opts.Servers {
|
||||||
|
if !v.Ready() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, v := range s.opts.Stores {
|
||||||
|
if !v.Ready() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *service) Health() bool {
|
||||||
|
for _, v := range s.opts.Brokers {
|
||||||
|
if !v.Health() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, v := range s.opts.Servers {
|
||||||
|
if !v.Health() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, v := range s.opts.Stores {
|
||||||
|
if !v.Health() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func (s *service) Start() error {
|
func (s *service) Start() error {
|
||||||
var err error
|
var err error
|
||||||
@@ -281,10 +347,6 @@ func (s *service) Start() error {
|
|||||||
config.Loggers[0].Info(s.opts.Context, fmt.Sprintf("starting [service] %s version %s", s.Options().Name, s.Options().Version))
|
config.Loggers[0].Info(s.opts.Context, fmt.Sprintf("starting [service] %s version %s", s.Options().Name, s.Options().Version))
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(s.opts.Servers) == 0 {
|
|
||||||
return fmt.Errorf("cant start nil server")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, reg := range s.opts.Registers {
|
for _, reg := range s.opts.Registers {
|
||||||
if err = reg.Connect(s.opts.Context); err != nil {
|
if err = reg.Connect(s.opts.Context); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -149,6 +149,18 @@ func (m *memoryStore) Name() string {
|
|||||||
return m.opts.Name
|
return m.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *memoryStore) Live() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryStore) Ready() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryStore) Health() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
|
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
|
||||||
if m.opts.LazyConnect {
|
if m.opts.LazyConnect {
|
||||||
if err := m.connect(ctx); err != nil {
|
if err := m.connect(ctx); err != nil {
|
||||||
@@ -279,3 +291,16 @@ func (m *memoryStore) connect(ctx context.Context) error {
|
|||||||
m.isConnected.CompareAndSwap(0, 1)
|
m.isConnected.CompareAndSwap(0, 1)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *memoryStore) Watch(ctx context.Context, opts ...store.WatchOption) (store.Watcher, error) {
|
||||||
|
return &watcher{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type watcher struct{}
|
||||||
|
|
||||||
|
func (w *watcher) Next() (store.Event, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watcher) Stop() {
|
||||||
|
}
|
||||||
|
|||||||
@@ -2,14 +2,18 @@ package store
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/options"
|
"go.unistack.org/micro/v3/options"
|
||||||
|
"go.unistack.org/micro/v3/util/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ Store = (*noopStore)(nil)
|
var _ Store = (*noopStore)(nil)
|
||||||
|
|
||||||
type noopStore struct {
|
type noopStore struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
watchers map[string]Watcher
|
||||||
funcRead FuncRead
|
funcRead FuncRead
|
||||||
funcWrite FuncWrite
|
funcWrite FuncWrite
|
||||||
funcExists FuncExists
|
funcExists FuncExists
|
||||||
@@ -19,6 +23,18 @@ type noopStore struct {
|
|||||||
isConnected atomic.Int32
|
isConnected atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Live() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Ready() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Health() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func NewStore(opts ...Option) *noopStore {
|
func NewStore(opts ...Option) *noopStore {
|
||||||
options := NewOptions(opts...)
|
options := NewOptions(opts...)
|
||||||
return &noopStore{opts: options}
|
return &noopStore{opts: options}
|
||||||
@@ -182,3 +198,41 @@ func (n *noopStore) connect(ctx context.Context) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type watcher struct {
|
||||||
|
exit chan bool
|
||||||
|
id string
|
||||||
|
ch chan Event
|
||||||
|
opts WatchOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *noopStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
|
||||||
|
id, err := id.New()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
wo, err := NewWatchOptions(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// construct the watcher
|
||||||
|
w := &watcher{
|
||||||
|
exit: make(chan bool),
|
||||||
|
ch: make(chan Event),
|
||||||
|
id: id,
|
||||||
|
opts: wo,
|
||||||
|
}
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
m.watchers[w.id] = w
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
return w, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watcher) Next() (Event, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watcher) Stop() {
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,9 +4,11 @@ package store
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
ErrWatcherStopped = errors.New("watcher stopped")
|
||||||
// ErrNotConnected is returned when a store is not connected
|
// ErrNotConnected is returned when a store is not connected
|
||||||
ErrNotConnected = errors.New("not conected")
|
ErrNotConnected = errors.New("not conected")
|
||||||
// ErrNotFound is returned when a key doesn't exist
|
// ErrNotFound is returned when a key doesn't exist
|
||||||
@@ -43,6 +45,14 @@ type Store interface {
|
|||||||
Disconnect(ctx context.Context) error
|
Disconnect(ctx context.Context) error
|
||||||
// String returns the name of the implementation.
|
// String returns the name of the implementation.
|
||||||
String() string
|
String() string
|
||||||
|
// Watch returns events watcher
|
||||||
|
Watch(ctx context.Context, opts ...WatchOption) (Watcher, error)
|
||||||
|
// Live returns store liveness
|
||||||
|
Live() bool
|
||||||
|
// Ready returns store readiness
|
||||||
|
Ready() bool
|
||||||
|
// Health returns store health
|
||||||
|
Health() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
@@ -57,3 +67,45 @@ type (
|
|||||||
FuncList func(ctx context.Context, opts ...ListOption) ([]string, error)
|
FuncList func(ctx context.Context, opts ...ListOption) ([]string, error)
|
||||||
HookList func(next FuncList) FuncList
|
HookList func(next FuncList) FuncList
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type EventType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
EventTypeUnknown = iota
|
||||||
|
EventTypeConnect
|
||||||
|
EventTypeDisconnect
|
||||||
|
EventTypeOpError
|
||||||
|
)
|
||||||
|
|
||||||
|
type Event interface {
|
||||||
|
Timestamp() time.Time
|
||||||
|
Error() error
|
||||||
|
Type() EventType
|
||||||
|
}
|
||||||
|
|
||||||
|
type Watcher interface {
|
||||||
|
// Next is a blocking call
|
||||||
|
Next() (Event, error)
|
||||||
|
// Stop stops the watcher
|
||||||
|
Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
type WatchOption func(*WatchOptions) error
|
||||||
|
|
||||||
|
type WatchOptions struct{}
|
||||||
|
|
||||||
|
func NewWatchOptions(opts ...WatchOption) (WatchOptions, error) {
|
||||||
|
options := WatchOptions{}
|
||||||
|
var err error
|
||||||
|
for _, o := range opts {
|
||||||
|
if err = o(&options); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return options, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func Watch(context.Context) (Watcher, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -67,16 +67,18 @@ func (w *NamespaceStore) String() string {
|
|||||||
return w.s.String()
|
return w.s.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// type NamespaceWrapper struct{}
|
func (w *NamespaceStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
|
||||||
|
return w.s.Watch(ctx, opts...)
|
||||||
// func NewNamespaceWrapper() Wrapper {
|
}
|
||||||
// return &NamespaceWrapper{}
|
|
||||||
// }
|
func (w *NamespaceStore) Live() bool {
|
||||||
|
return w.s.Live()
|
||||||
/*
|
}
|
||||||
func (w *OmitWrapper) Logf(fn LogfFunc) LogfFunc {
|
|
||||||
return func(ctx context.Context, level Level, msg string, args ...interface{}) {
|
func (w *NamespaceStore) Ready() bool {
|
||||||
fn(ctx, level, msg, getArgs(args)...)
|
return w.s.Ready()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) Health() bool {
|
||||||
|
return w.s.Health()
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|||||||
377
util/dns/cache.go
Normal file
377
util/dns/cache.go
Normal file
@@ -0,0 +1,377 @@
|
|||||||
|
package dns
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DialFunc is a [net.Resolver.Dial] function.
|
||||||
|
type DialFunc func(ctx context.Context, network, address string) (net.Conn, error)
|
||||||
|
|
||||||
|
// NewNetResolver creates a caching [net.Resolver] that uses parent to resolve names.
|
||||||
|
func NewNetResolver(opts ...Option) *net.Resolver {
|
||||||
|
options := Options{Resolver: &net.Resolver{}}
|
||||||
|
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &net.Resolver{
|
||||||
|
PreferGo: true,
|
||||||
|
StrictErrors: options.Resolver.StrictErrors,
|
||||||
|
Dial: NewNetDialer(options.Resolver.Dial, append(opts, Resolver(options.Resolver))...),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewNetDialer adds caching to a [net.Resolver.Dial] function.
|
||||||
|
func NewNetDialer(parent DialFunc, opts ...Option) DialFunc {
|
||||||
|
cache := cache{dial: parent, opts: Options{}}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&cache.opts)
|
||||||
|
}
|
||||||
|
if cache.opts.MaxCacheEntries == 0 {
|
||||||
|
cache.opts.MaxCacheEntries = DefaultMaxCacheEntries
|
||||||
|
}
|
||||||
|
return func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||||
|
conn := &dnsConn{}
|
||||||
|
conn.roundTrip = cachingRoundTrip(&cache, network, address)
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const DefaultMaxCacheEntries = 300
|
||||||
|
|
||||||
|
// A Option customizes the resolver cache.
|
||||||
|
type Option func(*Options)
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
Resolver *net.Resolver
|
||||||
|
MaxCacheEntries int
|
||||||
|
MaxCacheTTL time.Duration
|
||||||
|
MinCacheTTL time.Duration
|
||||||
|
NegativeCache bool
|
||||||
|
PreferIPV4 bool
|
||||||
|
PreferIPV6 bool
|
||||||
|
Timeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxCacheEntries sets the maximum number of entries to cache.
|
||||||
|
// If zero, [DefaultMaxCacheEntries] is used; negative means no limit.
|
||||||
|
func MaxCacheEntries(n int) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.MaxCacheEntries = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxCacheTTL sets the maximum time-to-live for entries in the cache.
|
||||||
|
func MaxCacheTTL(td time.Duration) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.MaxCacheTTL = td
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MinCacheTTL sets the minimum time-to-live for entries in the cache.
|
||||||
|
func MinCacheTTL(td time.Duration) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.MinCacheTTL = td
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NegativeCache sets whether to cache negative responses.
|
||||||
|
func NegativeCache(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.NegativeCache = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timeout sets upstream *net.Resolver timeout
|
||||||
|
func Timeout(td time.Duration) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Timeout = td
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolver sets upstream *net.Resolver.
|
||||||
|
func Resolver(r *net.Resolver) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Resolver = r
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PreferIPV4 resolve ipv4 records.
|
||||||
|
func PreferIPV4(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.PreferIPV4 = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PreferIPV6 resolve ipv4 records.
|
||||||
|
func PreferIPV6(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.PreferIPV6 = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type cache struct {
|
||||||
|
sync.RWMutex
|
||||||
|
|
||||||
|
dial DialFunc
|
||||||
|
entries map[string]cacheEntry
|
||||||
|
|
||||||
|
opts Options
|
||||||
|
}
|
||||||
|
|
||||||
|
type cacheEntry struct {
|
||||||
|
deadline time.Time
|
||||||
|
value string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) put(req string, res string) {
|
||||||
|
// ignore uncacheable/unparseable answers
|
||||||
|
if invalid(req, res) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// ignore errors (if requested)
|
||||||
|
if nameError(res) && !c.opts.NegativeCache {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// ignore uncacheable/unparseable answers
|
||||||
|
ttl := getTTL(res)
|
||||||
|
if ttl <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// adjust TTL
|
||||||
|
if ttl < c.opts.MinCacheTTL {
|
||||||
|
ttl = c.opts.MinCacheTTL
|
||||||
|
}
|
||||||
|
// maxTTL overrides minTTL
|
||||||
|
if ttl > c.opts.MaxCacheTTL && c.opts.MaxCacheTTL != 0 {
|
||||||
|
ttl = c.opts.MaxCacheTTL
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
if c.entries == nil {
|
||||||
|
c.entries = make(map[string]cacheEntry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// do some cache evition
|
||||||
|
var tested, evicted int
|
||||||
|
for k, e := range c.entries {
|
||||||
|
if time.Until(e.deadline) <= 0 {
|
||||||
|
// delete expired entry
|
||||||
|
delete(c.entries, k)
|
||||||
|
evicted++
|
||||||
|
}
|
||||||
|
tested++
|
||||||
|
|
||||||
|
if tested < 8 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if evicted == 0 && c.opts.MaxCacheEntries > 0 && len(c.entries) >= c.opts.MaxCacheEntries {
|
||||||
|
// delete at least one entry
|
||||||
|
delete(c.entries, k)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove message IDs
|
||||||
|
c.entries[req[2:]] = cacheEntry{
|
||||||
|
deadline: time.Now().Add(ttl),
|
||||||
|
value: res[2:],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) get(req string) (res string) {
|
||||||
|
// ignore invalid messages
|
||||||
|
if len(req) < 12 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
if req[2] >= 0x7f {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
c.RLock()
|
||||||
|
defer c.RUnlock()
|
||||||
|
|
||||||
|
if c.entries == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove message ID
|
||||||
|
entry, ok := c.entries[req[2:]]
|
||||||
|
if ok && time.Until(entry.deadline) > 0 {
|
||||||
|
// prepend correct ID
|
||||||
|
return req[:2] + entry.value
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func invalid(req string, res string) bool {
|
||||||
|
if len(req) < 12 || len(res) < 12 { // header size
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if req[0] != res[0] || req[1] != res[1] { // IDs match
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if req[2] >= 0x7f || res[2] < 0x7f { // query, response
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if req[2]&0x7a != 0 || res[2]&0x7a != 0 { // standard query, not truncated
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if res[3]&0xf != 0 && res[3]&0xf != 3 { // no error, or name error
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func nameError(res string) bool {
|
||||||
|
return res[3]&0xf == 3
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTTL(msg string) time.Duration {
|
||||||
|
ttl := math.MaxInt32
|
||||||
|
|
||||||
|
qdcount := getUint16(msg[4:])
|
||||||
|
ancount := getUint16(msg[6:])
|
||||||
|
nscount := getUint16(msg[8:])
|
||||||
|
arcount := getUint16(msg[10:])
|
||||||
|
rdcount := ancount + nscount + arcount
|
||||||
|
|
||||||
|
msg = msg[12:] // skip header
|
||||||
|
|
||||||
|
// skip questions
|
||||||
|
for i := 0; i < qdcount; i++ {
|
||||||
|
name := getNameLen(msg)
|
||||||
|
if name < 0 || name+4 > len(msg) {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
msg = msg[name+4:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse records
|
||||||
|
for i := 0; i < rdcount; i++ {
|
||||||
|
name := getNameLen(msg)
|
||||||
|
if name < 0 || name+10 > len(msg) {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
rtyp := getUint16(msg[name+0:])
|
||||||
|
rttl := getUint32(msg[name+4:])
|
||||||
|
rlen := getUint16(msg[name+8:])
|
||||||
|
if name+10+rlen > len(msg) {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
// skip EDNS OPT since it doesn't have a TTL
|
||||||
|
if rtyp != 41 && rttl < ttl {
|
||||||
|
ttl = rttl
|
||||||
|
}
|
||||||
|
msg = msg[name+10+rlen:]
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Duration(ttl) * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
func getNameLen(msg string) int {
|
||||||
|
i := 0
|
||||||
|
for i < len(msg) {
|
||||||
|
if msg[i] == 0 {
|
||||||
|
// end of name
|
||||||
|
i += 1
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if msg[i] >= 0xc0 {
|
||||||
|
// compressed name
|
||||||
|
i += 2
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if msg[i] >= 0x40 {
|
||||||
|
// reserved
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
i += int(msg[i] + 1)
|
||||||
|
}
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
func getUint16(s string) int {
|
||||||
|
return int(s[1]) | int(s[0])<<8
|
||||||
|
}
|
||||||
|
|
||||||
|
func getUint32(s string) int {
|
||||||
|
return int(s[3]) | int(s[2])<<8 | int(s[1])<<16 | int(s[0])<<24
|
||||||
|
}
|
||||||
|
|
||||||
|
func cachingRoundTrip(cache *cache, network, address string) roundTripper {
|
||||||
|
return func(ctx context.Context, req string) (res string, err error) {
|
||||||
|
// check cache
|
||||||
|
if res := cache.get(req); res != "" {
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case cache.opts.PreferIPV4 && cache.opts.PreferIPV6:
|
||||||
|
network = "udp"
|
||||||
|
case cache.opts.PreferIPV4:
|
||||||
|
network = "udp4"
|
||||||
|
case cache.opts.PreferIPV6:
|
||||||
|
network = "udp6"
|
||||||
|
default:
|
||||||
|
network = "udp"
|
||||||
|
}
|
||||||
|
|
||||||
|
if cache.opts.Timeout > 0 {
|
||||||
|
var cancel func()
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, cache.opts.Timeout)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// dial connection
|
||||||
|
var conn net.Conn
|
||||||
|
if cache.dial != nil {
|
||||||
|
conn, err = cache.dial(ctx, network, address)
|
||||||
|
} else {
|
||||||
|
var d net.Dialer
|
||||||
|
conn, err = d.DialContext(ctx, network, address)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
conn.Close()
|
||||||
|
}()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if t, ok := ctx.Deadline(); ok {
|
||||||
|
err = conn.SetDeadline(t)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// send request
|
||||||
|
err = writeMessage(conn, req)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// read response
|
||||||
|
res, err = readMessage(conn)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// cache response
|
||||||
|
cache.put(req, res)
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
16
util/dns/cache_test.go
Normal file
16
util/dns/cache_test.go
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
package dns
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCache(t *testing.T) {
|
||||||
|
net.DefaultResolver = NewNetResolver(PreferIPV4(true))
|
||||||
|
|
||||||
|
addrs, err := net.LookupHost("unistack.org")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Logf("addrs %v", addrs)
|
||||||
|
}
|
||||||
183
util/dns/conn.go
Normal file
183
util/dns/conn.go
Normal file
@@ -0,0 +1,183 @@
|
|||||||
|
package dns
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type dnsConn struct {
|
||||||
|
sync.Mutex
|
||||||
|
|
||||||
|
ibuf bytes.Buffer
|
||||||
|
obuf bytes.Buffer
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
deadline time.Time
|
||||||
|
roundTrip roundTripper
|
||||||
|
}
|
||||||
|
|
||||||
|
type roundTripper func(ctx context.Context, req string) (res string, err error)
|
||||||
|
|
||||||
|
func (c *dnsConn) Read(b []byte) (n int, err error) {
|
||||||
|
imsg, n, err := c.drainBuffers(b)
|
||||||
|
if n != 0 || err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := c.childContext()
|
||||||
|
omsg, err := c.roundTrip(ctx, imsg)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.fillBuffer(b, omsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) Write(b []byte) (n int, err error) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
return c.ibuf.Write(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) Close() error {
|
||||||
|
c.Lock()
|
||||||
|
cancel := c.cancel
|
||||||
|
c.Unlock()
|
||||||
|
|
||||||
|
if cancel != nil {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) LocalAddr() net.Addr {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) RemoteAddr() net.Addr {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) SetDeadline(t time.Time) error {
|
||||||
|
var err error
|
||||||
|
if err = c.SetReadDeadline(t); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = c.SetWriteDeadline(t); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) SetReadDeadline(t time.Time) error {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
c.deadline = t
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) SetWriteDeadline(t time.Time) error {
|
||||||
|
// writes do not timeout
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) drainBuffers(b []byte) (string, int, error) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
// drain the output buffer
|
||||||
|
if c.obuf.Len() > 0 {
|
||||||
|
n, err := c.obuf.Read(b)
|
||||||
|
return "", n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// otherwise, get the next message from the input buffer
|
||||||
|
sz := c.ibuf.Next(2)
|
||||||
|
if len(sz) < 2 {
|
||||||
|
return "", 0, io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
|
||||||
|
size := int64(sz[0])<<8 | int64(sz[1])
|
||||||
|
|
||||||
|
var str strings.Builder
|
||||||
|
_, err := io.CopyN(&str, &c.ibuf, size)
|
||||||
|
if err == io.EOF {
|
||||||
|
return "", 0, io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return "", 0, err
|
||||||
|
}
|
||||||
|
return str.String(), 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) fillBuffer(b []byte, str string) (int, error) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
c.obuf.WriteByte(byte(len(str) >> 8))
|
||||||
|
c.obuf.WriteByte(byte(len(str)))
|
||||||
|
c.obuf.WriteString(str)
|
||||||
|
return c.obuf.Read(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dnsConn) childContext() (context.Context, context.CancelFunc) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
if c.ctx == nil {
|
||||||
|
c.ctx, c.cancel = context.WithCancel(context.Background())
|
||||||
|
}
|
||||||
|
return context.WithDeadline(c.ctx, c.deadline)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeMessage(conn net.Conn, msg string) error {
|
||||||
|
var buf []byte
|
||||||
|
if _, ok := conn.(net.PacketConn); ok {
|
||||||
|
buf = []byte(msg)
|
||||||
|
} else {
|
||||||
|
buf = make([]byte, len(msg)+2)
|
||||||
|
buf[0] = byte(len(msg) >> 8)
|
||||||
|
buf[1] = byte(len(msg))
|
||||||
|
copy(buf[2:], msg)
|
||||||
|
}
|
||||||
|
// SHOULD do a single write on TCP (RFC 7766, section 8).
|
||||||
|
// MUST do a single write on UDP.
|
||||||
|
_, err := conn.Write(buf)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func readMessage(c net.Conn) (string, error) {
|
||||||
|
if _, ok := c.(net.PacketConn); ok {
|
||||||
|
// RFC 1035 specifies 512 as the maximum message size for DNS over UDP.
|
||||||
|
// RFC 6891 OTOH suggests 4096 as the maximum payload size for EDNS.
|
||||||
|
b := make([]byte, 4096)
|
||||||
|
n, err := c.Read(b)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return string(b[:n]), nil
|
||||||
|
} else {
|
||||||
|
var sz [2]byte
|
||||||
|
_, err := io.ReadFull(c, sz[:])
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
size := int64(sz[0])<<8 | int64(sz[1])
|
||||||
|
|
||||||
|
var str strings.Builder
|
||||||
|
_, err = io.CopyN(&str, c, size)
|
||||||
|
if err == io.EOF {
|
||||||
|
return "", io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return str.String(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,40 +0,0 @@
|
|||||||
// Package io is for io management
|
|
||||||
package io
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
type rwc struct {
|
|
||||||
socket transport.Socket
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rwc) Read(p []byte) (n int, err error) {
|
|
||||||
m := new(transport.Message)
|
|
||||||
if err := r.socket.Recv(m); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
copy(p, m.Body)
|
|
||||||
return len(m.Body), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rwc) Write(p []byte) (n int, err error) {
|
|
||||||
err = r.socket.Send(&transport.Message{
|
|
||||||
Body: p,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return len(p), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rwc) Close() error {
|
|
||||||
return r.socket.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewRWC returns a new ReadWriteCloser
|
|
||||||
func NewRWC(sock transport.Socket) io.ReadWriteCloser {
|
|
||||||
return &rwc{sock}
|
|
||||||
}
|
|
||||||
@@ -16,7 +16,6 @@ type Ticker struct {
|
|||||||
C chan time.Time
|
C chan time.Time
|
||||||
min int64
|
min int64
|
||||||
max int64
|
max int64
|
||||||
exp int64
|
|
||||||
exit bool
|
exit bool
|
||||||
rng rand.Rand
|
rng rand.Rand
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,118 +0,0 @@
|
|||||||
package pool
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
|
||||||
"go.unistack.org/micro/v3/util/id"
|
|
||||||
)
|
|
||||||
|
|
||||||
type pool struct {
|
|
||||||
tr transport.Transport
|
|
||||||
conns map[string][]*poolConn
|
|
||||||
size int
|
|
||||||
ttl time.Duration
|
|
||||||
sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
type poolConn struct {
|
|
||||||
created time.Time
|
|
||||||
transport.Client
|
|
||||||
id string
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPool(options Options) *pool {
|
|
||||||
return &pool{
|
|
||||||
size: options.Size,
|
|
||||||
tr: options.Transport,
|
|
||||||
ttl: options.TTL,
|
|
||||||
conns: make(map[string][]*poolConn),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pool) Close() error {
|
|
||||||
p.Lock()
|
|
||||||
for k, c := range p.conns {
|
|
||||||
for _, conn := range c {
|
|
||||||
conn.Client.Close()
|
|
||||||
}
|
|
||||||
delete(p.conns, k)
|
|
||||||
}
|
|
||||||
p.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NoOp the Close since we manage it
|
|
||||||
func (p *poolConn) Close() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *poolConn) ID() string {
|
|
||||||
return p.id
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *poolConn) Created() time.Time {
|
|
||||||
return p.created
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pool) Get(ctx context.Context, addr string, opts ...transport.DialOption) (Conn, error) {
|
|
||||||
p.Lock()
|
|
||||||
conns := p.conns[addr]
|
|
||||||
|
|
||||||
// while we have conns check age and then return one
|
|
||||||
// otherwise we'll create a new conn
|
|
||||||
for len(conns) > 0 {
|
|
||||||
conn := conns[len(conns)-1]
|
|
||||||
conns = conns[:len(conns)-1]
|
|
||||||
p.conns[addr] = conns
|
|
||||||
|
|
||||||
// if conn is old kill it and move on
|
|
||||||
if d := time.Since(conn.Created()); d > p.ttl {
|
|
||||||
conn.Client.Close()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// we got a good conn, lets unlock and return it
|
|
||||||
p.Unlock()
|
|
||||||
|
|
||||||
return conn, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
p.Unlock()
|
|
||||||
|
|
||||||
// create new conn
|
|
||||||
c, err := p.tr.Dial(ctx, addr, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
id, err := id.New()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &poolConn{
|
|
||||||
Client: c,
|
|
||||||
id: id,
|
|
||||||
created: time.Now(),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pool) Release(conn Conn, err error) error {
|
|
||||||
// don't store the conn if it has errored
|
|
||||||
if err != nil {
|
|
||||||
return conn.(*poolConn).Client.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// otherwise put it back for reuse
|
|
||||||
p.Lock()
|
|
||||||
conns := p.conns[conn.Remote()]
|
|
||||||
if len(conns) >= p.size {
|
|
||||||
p.Unlock()
|
|
||||||
return conn.(*poolConn).Client.Close()
|
|
||||||
}
|
|
||||||
p.conns[conn.Remote()] = append(conns, conn.(*poolConn))
|
|
||||||
p.Unlock()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -1,92 +0,0 @@
|
|||||||
//go:build ignore
|
|
||||||
// +build ignore
|
|
||||||
|
|
||||||
package pool
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
|
||||||
"go.unistack.org/micro/v3/network/transport/memory"
|
|
||||||
)
|
|
||||||
|
|
||||||
func testPool(t *testing.T, size int, ttl time.Duration) {
|
|
||||||
// mock transport
|
|
||||||
tr := memory.NewTransport()
|
|
||||||
|
|
||||||
options := Options{
|
|
||||||
TTL: ttl,
|
|
||||||
Size: size,
|
|
||||||
Transport: tr,
|
|
||||||
}
|
|
||||||
// zero pool
|
|
||||||
p := newPool(options)
|
|
||||||
|
|
||||||
// listen
|
|
||||||
l, err := tr.Listen(":0")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer l.Close()
|
|
||||||
|
|
||||||
// accept loop
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
if err := l.Accept(func(s transport.Socket) {
|
|
||||||
for {
|
|
||||||
var msg transport.Message
|
|
||||||
if err := s.Recv(&msg); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := s.Send(&msg); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
// get a conn
|
|
||||||
c, err := p.Get(l.Addr())
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := &transport.Message{
|
|
||||||
Body: []byte(`hello world`),
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.Send(msg); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var rcv transport.Message
|
|
||||||
|
|
||||||
if err := c.Recv(&rcv); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(rcv.Body) != string(msg.Body) {
|
|
||||||
t.Fatalf("got %v, expected %v", rcv.Body, msg.Body)
|
|
||||||
}
|
|
||||||
|
|
||||||
// release the conn
|
|
||||||
p.Release(c, nil)
|
|
||||||
|
|
||||||
p.Lock()
|
|
||||||
if i := len(p.conns[l.Addr()]); i > size {
|
|
||||||
p.Unlock()
|
|
||||||
t.Fatalf("pool size %d is greater than expected %d", i, size)
|
|
||||||
}
|
|
||||||
p.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestClientPool(t *testing.T) {
|
|
||||||
testPool(t, 0, time.Minute)
|
|
||||||
testPool(t, 2, time.Minute)
|
|
||||||
}
|
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
package pool
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Options struct
|
|
||||||
type Options struct {
|
|
||||||
Transport transport.Transport
|
|
||||||
TTL time.Duration
|
|
||||||
Size int
|
|
||||||
}
|
|
||||||
|
|
||||||
// Option func signature
|
|
||||||
type Option func(*Options)
|
|
||||||
|
|
||||||
// Size sets the size
|
|
||||||
func Size(i int) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Size = i
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Transport sets the transport
|
|
||||||
func Transport(t transport.Transport) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Transport = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TTL specifies ttl
|
|
||||||
func TTL(t time.Duration) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.TTL = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
// Package pool is a connection pool
|
|
||||||
package pool
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Pool is an interface for connection pooling
|
|
||||||
type Pool interface {
|
|
||||||
// Close the pool
|
|
||||||
Close() error
|
|
||||||
// Get a connection
|
|
||||||
Get(ctx context.Context, addr string, opts ...transport.DialOption) (Conn, error)
|
|
||||||
// Release the connection
|
|
||||||
Release(c Conn, status error) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// Conn conn pool interface
|
|
||||||
type Conn interface {
|
|
||||||
// unique id of connection
|
|
||||||
ID() string
|
|
||||||
// time it was created
|
|
||||||
Created() time.Time
|
|
||||||
// embedded connection
|
|
||||||
transport.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPool creates new connection pool
|
|
||||||
func NewPool(opts ...Option) Pool {
|
|
||||||
options := Options{}
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&options)
|
|
||||||
}
|
|
||||||
return newPool(options)
|
|
||||||
}
|
|
||||||
@@ -91,7 +91,7 @@ func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if mapper, ok := dst.(map[string]interface{}); ok {
|
if mapper, ok := dst.(map[string]interface{}); ok {
|
||||||
dst = mergeMap(mapper, mp, 0)
|
mergeMap(mapper, mp, 0)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,38 @@
|
|||||||
package reflect
|
package reflect
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestMergeMapStringInterface(t *testing.T) {
|
||||||
|
var dst interface{} //nolint:gosimple
|
||||||
|
dst = map[string]interface{}{
|
||||||
|
"xx": 11,
|
||||||
|
}
|
||||||
|
|
||||||
|
src := map[string]interface{}{
|
||||||
|
"zz": "aa",
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := Merge(dst, src); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mp, ok := dst.(map[string]interface{})
|
||||||
|
if !ok || mp == nil {
|
||||||
|
t.Fatalf("xxx %#+v\n", dst)
|
||||||
|
}
|
||||||
|
|
||||||
|
if fmt.Sprintf("%v", mp["xx"]) != "11" {
|
||||||
|
t.Fatalf("xxx zzzz %#+v", mp)
|
||||||
|
}
|
||||||
|
|
||||||
|
if fmt.Sprintf("%v", mp["zz"]) != "aa" {
|
||||||
|
t.Fatalf("xxx zzzz %#+v", mp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMergeMap(t *testing.T) {
|
func TestMergeMap(t *testing.T) {
|
||||||
src := map[string]interface{}{
|
src := map[string]interface{}{
|
||||||
"skey1": "sval1",
|
"skey1": "sval1",
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ type DigitalOceanMetadata struct {
|
|||||||
func (stfs *DigitalOceanMetadata) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (stfs *DigitalOceanMetadata) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
switch r.URL.Path {
|
switch r.URL.Path {
|
||||||
case "/metadata/v1.json":
|
case "/metadata/v1.json":
|
||||||
json.NewEncoder(w).Encode(stfs.Metadata.V1)
|
_ = json.NewEncoder(w).Encode(stfs.Metadata.V1)
|
||||||
default:
|
default:
|
||||||
fs := FileServer(stfs, "json", time.Now())
|
fs := FileServer(stfs, "json", time.Now())
|
||||||
idx := strings.Index(r.URL.Path[1:], "/")
|
idx := strings.Index(r.URL.Path[1:], "/")
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ type EC2Metadata struct {
|
|||||||
InstanceType string `json:"instance-type"`
|
InstanceType string `json:"instance-type"`
|
||||||
LocalHostname string `json:"local-hostname"`
|
LocalHostname string `json:"local-hostname"`
|
||||||
LocalIPv4 string `json:"local-ipv4"`
|
LocalIPv4 string `json:"local-ipv4"`
|
||||||
kernelID int `json:"kernel-id"`
|
KernelID int `json:"kernel-id"`
|
||||||
Placement string `json:"placement"`
|
Placement string `json:"placement"`
|
||||||
AvailabilityZone string `json:"availability-zone"`
|
AvailabilityZone string `json:"availability-zone"`
|
||||||
ProductCodes string `json:"product-codes"`
|
ProductCodes string `json:"product-codes"`
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ func (fs *fs) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
f, err := fs.Open(r.URL.Path)
|
f, err := fs.Open(r.URL.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
w.Write([]byte(err.Error()))
|
_, _ = w.Write([]byte(err.Error()))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.Header().Set("Content-Type", "application/octet-stream")
|
w.Header().Set("Content-Type", "application/octet-stream")
|
||||||
@@ -67,9 +67,9 @@ func (fi *fileInfo) Name() string {
|
|||||||
|
|
||||||
func (fi *fileInfo) Mode() os.FileMode {
|
func (fi *fileInfo) Mode() os.FileMode {
|
||||||
if strings.HasSuffix(fi.name, "/") {
|
if strings.HasSuffix(fi.name, "/") {
|
||||||
return os.FileMode(0755) | os.ModeDir
|
return os.FileMode(0o755) | os.ModeDir
|
||||||
}
|
}
|
||||||
return os.FileMode(0644)
|
return os.FileMode(0o644)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi *fileInfo) IsDir() bool {
|
func (fi *fileInfo) IsDir() bool {
|
||||||
@@ -112,15 +112,14 @@ func (f *file) Readdir(count int) ([]os.FileInfo, error) {
|
|||||||
func (f *file) Seek(offset int64, whence int) (int64, error) {
|
func (f *file) Seek(offset int64, whence int) (int64, error) {
|
||||||
// log.Printf("seek %d %d %s\n", offset, whence, f.name)
|
// log.Printf("seek %d %d %s\n", offset, whence, f.name)
|
||||||
switch whence {
|
switch whence {
|
||||||
case os.SEEK_SET:
|
case io.SeekStart:
|
||||||
f.offset = offset
|
f.offset = offset
|
||||||
case os.SEEK_CUR:
|
case io.SeekCurrent:
|
||||||
f.offset += offset
|
f.offset += offset
|
||||||
case os.SEEK_END:
|
case io.SeekEnd:
|
||||||
f.offset = int64(len(f.data)) + offset
|
f.offset = int64(len(f.data)) + offset
|
||||||
}
|
}
|
||||||
return f.offset, nil
|
return f.offset, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *file) Stat() (os.FileInfo, error) {
|
func (f *file) Stat() (os.FileInfo, error) {
|
||||||
@@ -222,6 +221,7 @@ func getValue(name string, iface interface{}, tag string) ([]byte, error) {
|
|||||||
return nil, fmt.Errorf("failed to find %s in interface %T", name, iface)
|
return nil, fmt.Errorf("failed to find %s in interface %T", name, iface)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
func hasValidType(obj interface{}, types []reflect.Kind) bool {
|
func hasValidType(obj interface{}, types []reflect.Kind) bool {
|
||||||
for _, t := range types {
|
for _, t := range types {
|
||||||
if reflect.TypeOf(obj).Kind() == t {
|
if reflect.TypeOf(obj).Kind() == t {
|
||||||
@@ -231,6 +231,7 @@ func hasValidType(obj interface{}, types []reflect.Kind) bool {
|
|||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
func reflectValue(obj interface{}) reflect.Value {
|
func reflectValue(obj interface{}) reflect.Value {
|
||||||
var val reflect.Value
|
var val reflect.Value
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package structfs
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io/ioutil"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -61,7 +61,7 @@ var doOrig = []byte(`{
|
|||||||
}
|
}
|
||||||
`)
|
`)
|
||||||
|
|
||||||
func server(t *testing.T) {
|
func server(t *testing.T, ch chan error) {
|
||||||
stfs := DigitalOceanMetadata{}
|
stfs := DigitalOceanMetadata{}
|
||||||
err := json.Unmarshal(doOrig, &stfs.Metadata.V1)
|
err := json.Unmarshal(doOrig, &stfs.Metadata.V1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -71,7 +71,7 @@ func server(t *testing.T) {
|
|||||||
http.Handle("/metadata/v1/", FileServer(&stfs, "json", time.Now()))
|
http.Handle("/metadata/v1/", FileServer(&stfs, "json", time.Now()))
|
||||||
http.Handle("/metadata/v1.json", &stfs)
|
http.Handle("/metadata/v1.json", &stfs)
|
||||||
go func() {
|
go func() {
|
||||||
t.Fatal(http.ListenAndServe("127.0.0.1:8080", nil))
|
ch <- http.ListenAndServe("127.0.0.1:8080", nil)
|
||||||
}()
|
}()
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
}
|
}
|
||||||
@@ -82,13 +82,14 @@ func get(path string) ([]byte, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
return ioutil.ReadAll(res.Body)
|
return io.ReadAll(res.Body)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAll(t *testing.T) {
|
func TestAll(t *testing.T) {
|
||||||
server(t)
|
ch := make(chan error)
|
||||||
|
server(t, ch)
|
||||||
|
|
||||||
var tests = []struct {
|
tests := []struct {
|
||||||
in string
|
in string
|
||||||
out string
|
out string
|
||||||
}{
|
}{
|
||||||
@@ -100,34 +101,44 @@ func TestAll(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
buf, err := get(tt.in)
|
select {
|
||||||
|
case err := <-ch:
|
||||||
|
t.Fatal(err)
|
||||||
|
default:
|
||||||
|
buf, err := get(tt.in)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if string(buf) != tt.out {
|
||||||
|
t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-ch:
|
||||||
|
t.Fatal(err)
|
||||||
|
default:
|
||||||
|
doTest, err := get("http://127.0.0.1:8080/metadata/v1.json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if string(buf) != tt.out {
|
|
||||||
t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out)
|
oSt := DigitalOceanMetadata{}
|
||||||
|
err = json.Unmarshal(doOrig, &oSt.Metadata.V1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
nSt := DigitalOceanMetadata{}
|
||||||
|
|
||||||
|
err = json.Unmarshal(doTest, &nSt.Metadata.V1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(oSt, nSt) {
|
||||||
|
t.Fatalf("%v not match %v", oSt, nSt)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
doTest, err := get("http://127.0.0.1:8080/metadata/v1.json")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
oSt := DigitalOceanMetadata{}
|
|
||||||
err = json.Unmarshal(doOrig, &oSt.Metadata.V1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
nSt := DigitalOceanMetadata{}
|
|
||||||
|
|
||||||
err = json.Unmarshal(doTest, &nSt.Metadata.V1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(oSt, nSt) {
|
|
||||||
t.Fatalf("%v not match %v", oSt, nSt)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ type Duration int64
|
|||||||
|
|
||||||
func ParseDuration(s string) (time.Duration, error) {
|
func ParseDuration(s string) (time.Duration, error) {
|
||||||
if s == "" {
|
if s == "" {
|
||||||
return 0, fmt.Errorf(`time: invalid duration "` + s + `"`)
|
return 0, errors.New(`time: invalid duration "` + s + `"`)
|
||||||
}
|
}
|
||||||
|
|
||||||
var p int
|
var p int
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ func TestMarshalYAML(t *testing.T) {
|
|||||||
|
|
||||||
func TestUnmarshalYAML(t *testing.T) {
|
func TestUnmarshalYAML(t *testing.T) {
|
||||||
type str struct {
|
type str struct {
|
||||||
TTL Duration `yaml:"ttl"`
|
TTL *Duration `yaml:"ttl"`
|
||||||
}
|
}
|
||||||
v := &str{}
|
v := &str{}
|
||||||
var err error
|
var err error
|
||||||
@@ -31,14 +31,14 @@ func TestUnmarshalYAML(t *testing.T) {
|
|||||||
err = yaml.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
|
err = yaml.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if v.TTL != 10000000 {
|
} else if *(v.TTL) != 10000000 {
|
||||||
t.Fatalf("invalid duration %v != 10000000", v.TTL)
|
t.Fatalf("invalid duration %v != 10000000", v.TTL)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = yaml.Unmarshal([]byte(`{"ttl":"1y"}`), v)
|
err = yaml.Unmarshal([]byte(`{"ttl":"1y"}`), v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if v.TTL != 31622400000000000 {
|
} else if *(v.TTL) != 31622400000000000 {
|
||||||
t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
|
t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,19 +39,16 @@ func newStatsMeter() {
|
|||||||
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
|
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for range ticker.C {
|
||||||
select {
|
poolsMu.Lock()
|
||||||
case <-ticker.C:
|
for _, st := range pools {
|
||||||
poolsMu.Lock()
|
stats := st.Stats()
|
||||||
for _, st := range pools {
|
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
|
||||||
stats := st.Stats()
|
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
|
||||||
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
|
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
|
||||||
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
|
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
|
||||||
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
|
|
||||||
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
|
|
||||||
}
|
|
||||||
poolsMu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
poolsMu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user