Compare commits

..

8 Commits

Author SHA1 Message Date
614b740c56 split files
Some checks failed
lint / lint (pull_request) Successful in 2m27s
test / test (pull_request) Failing after 17m13s
coverage / build (pull_request) Failing after 17m30s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:46:47 +03:00
bc011a2e7f split files
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:44:29 +03:00
f92e18897a implement driver
Some checks failed
lint / lint (pull_request) Successful in 2m43s
test / test (pull_request) Successful in 4m53s
coverage / build (pull_request) Failing after 17m18s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:27:42 +03:00
022326ddc4 move
Some checks failed
lint / lint (pull_request) Successful in 3m27s
test / test (pull_request) Successful in 4m49s
coverage / build (pull_request) Failing after 17m28s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-19 11:31:11 +03:00
e053eeac74 add default node state criterion
Some checks failed
lint / lint (pull_request) Successful in 2m36s
test / test (pull_request) Failing after 17m14s
coverage / build (pull_request) Failing after 17m30s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-19 11:26:16 +03:00
6c6916a050 initial hasql support
Some checks failed
lint / lint (pull_request) Successful in 4m23s
test / test (pull_request) Failing after 17m14s
coverage / build (pull_request) Failing after 17m29s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-18 17:16:20 +03:00
ea84ac094f Merge branch 'v4' into hasql
Some checks failed
test / test (pull_request) Failing after 17m58s
coverage / build (pull_request) Failing after 18m40s
lint / lint (pull_request) Failing after 1m41s
2025-09-18 14:35:10 +03:00
2886a7fe8a initial hasql support
Some checks failed
test / test (pull_request) Failing after 19m47s
lint / lint (pull_request) Failing after 19m59s
coverage / build (pull_request) Failing after 20m4s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-18 14:34:48 +03:00
18 changed files with 220 additions and 416 deletions

View File

@@ -25,7 +25,7 @@ jobs:
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1) dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
echo "src_hash=$src_hash" echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash" echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" -a "$src_hash" != "" -a "$dst_hash" != "" ]; then if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT echo "sync_needed=true" >> $GITHUB_OUTPUT
else else
echo "sync_needed=false" >> $GITHUB_OUTPUT echo "sync_needed=false" >> $GITHUB_OUTPUT

View File

@@ -1,5 +1,5 @@
# Micro # Micro
![Coverage](https://img.shields.io/badge/Coverage-33.6%25-yellow) ![Coverage](https://img.shields.io/badge/Coverage-33.8%25-yellow)
[![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview) [![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v4)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush) [![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v4)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)

View File

@@ -41,11 +41,11 @@ type Broker interface {
// Disconnect disconnect from broker // Disconnect disconnect from broker
Disconnect(ctx context.Context) error Disconnect(ctx context.Context) error
// NewMessage create new broker message to publish. // NewMessage create new broker message to publish.
NewMessage(ctx context.Context, hdr metadata.Metadata, body any, opts ...MessageOption) (Message, error) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error)
// Publish message to broker topic // Publish message to broker topic
Publish(ctx context.Context, topic string, messages ...Message) error Publish(ctx context.Context, topic string, messages ...Message) error
// Subscribe subscribes to topic message via handler // Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
// String type of broker // String type of broker
String() string String() string
// Live returns broker liveness // Live returns broker liveness
@@ -59,7 +59,7 @@ type Broker interface {
type ( type (
FuncPublish func(ctx context.Context, topic string, messages ...Message) error FuncPublish func(ctx context.Context, topic string, messages ...Message) error
HookPublish func(next FuncPublish) FuncPublish HookPublish func(next FuncPublish) FuncPublish
FuncSubscribe func(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error) FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
HookSubscribe func(next FuncSubscribe) FuncSubscribe HookSubscribe func(next FuncSubscribe) FuncSubscribe
) )
@@ -75,7 +75,7 @@ type Message interface {
Body() []byte Body() []byte
// Unmarshal try to decode message body to dst. // Unmarshal try to decode message body to dst.
// This is helper method that uses codec.Unmarshal. // This is helper method that uses codec.Unmarshal.
Unmarshal(dst any, opts ...codec.Option) error Unmarshal(dst interface{}, opts ...codec.Option) error
// Ack acknowledge message if supported. // Ack acknowledge message if supported.
Ack() error Ack() error
} }

View File

@@ -18,6 +18,7 @@ import (
type Options struct { type Options struct {
// Name holds the broker name // Name holds the broker name
Name string Name string
// Tracer used for tracing // Tracer used for tracing
Tracer tracer.Tracer Tracer tracer.Tracer
// Register can be used for clustering // Register can be used for clustering
@@ -30,20 +31,23 @@ type Options struct {
Meter meter.Meter Meter meter.Meter
// Context holds external options // Context holds external options
Context context.Context Context context.Context
// Wait waits for a collection of goroutines to finish // Wait waits for a collection of goroutines to finish
Wait *sync.WaitGroup Wait *sync.WaitGroup
// TLSConfig holds tls.TLSConfig options // TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config TLSConfig *tls.Config
// Addrs holds the broker address // Addrs holds the broker address
Addrs []string Addrs []string
// Hooks can be run before broker Publishing and message processing in Subscribe // Hooks can be run before broker Publish/BatchPublish and
// Subscribe/BatchSubscribe methods
Hooks options.Hooks Hooks options.Hooks
// GracefulTimeout contains time to wait to finish in flight requests // GracefulTimeout contains time to wait to finish in flight requests
GracefulTimeout time.Duration GracefulTimeout time.Duration
// ContentType will be used if no content-type set when creating message // ContentType will be used if no content-type set when creating message
ContentType string ContentType string
// ErrorHandler specifies handler for all broker errors handling subscriber
ErrorHandler any
} }
// NewOptions create new Options // NewOptions create new Options
@@ -76,12 +80,6 @@ func Context(ctx context.Context) Option {
} }
} }
func GracefulTimeout(t time.Duration) Option {
return func(o *Options) {
o.GracefulTimeout = t
}
}
// ContentType used by default if not specified // ContentType used by default if not specified
func ContentType(ct string) Option { func ContentType(ct string) Option {
return func(o *Options) { return func(o *Options) {
@@ -89,13 +87,6 @@ func ContentType(ct string) Option {
} }
} }
// ErrorHandler handles errors in broker
func ErrorHandler(h any) Option {
return func(o *Options) {
o.ErrorHandler = h
}
}
// MessageOptions struct // MessageOptions struct
type MessageOptions struct { type MessageOptions struct {
// ContentType for message body // ContentType for message body

View File

@@ -15,6 +15,11 @@ import (
"go.unistack.org/micro/v4/tracer" "go.unistack.org/micro/v4/tracer"
) )
// DefaultCodecs will be used to encode/decode data
var DefaultCodecs = map[string]codec.Codec{
"application/octet-stream": codec.NewCodec(),
}
type noopClient struct { type noopClient struct {
funcCall FuncCall funcCall FuncCall
funcStream FuncStream funcStream FuncStream

View File

@@ -161,7 +161,7 @@ func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
Context: context.Background(), Context: context.Background(),
ContentType: DefaultContentType, ContentType: DefaultContentType,
Codecs: make(map[string]codec.Codec), Codecs: DefaultCodecs,
CallOptions: CallOptions{ CallOptions: CallOptions{
Context: context.Background(), Context: context.Background(),
Backoff: DefaultBackoff, Backoff: DefaultBackoff,

30
go.mod
View File

@@ -1,33 +1,35 @@
module go.unistack.org/micro/v4 module go.unistack.org/micro/v4
go 1.25 go 1.24
require ( require (
dario.cat/mergo v1.0.2 dario.cat/mergo v1.0.1
github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/KimMachineGun/automemlimit v0.7.5 github.com/KimMachineGun/automemlimit v0.7.0
github.com/goccy/go-yaml v1.18.0 github.com/goccy/go-yaml v1.17.1
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/matoous/go-nanoid v1.5.1 github.com/matoous/go-nanoid v1.5.1
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
github.com/spf13/cast v1.10.0 github.com/spf13/cast v1.7.1
github.com/stretchr/testify v1.11.1 github.com/stretchr/testify v1.10.0
go.uber.org/atomic v1.11.0 go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.6.0
go.unistack.org/micro-proto/v4 v4.1.0 go.unistack.org/micro-proto/v4 v4.1.0
golang.org/x/sync v0.17.0 golang.org/x/sync v0.10.0
golang.yandex/hasql/v2 v2.1.0 golang.yandex/hasql/v2 v2.1.0
google.golang.org/grpc v1.76.0 google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.10 google.golang.org/protobuf v1.36.3
) )
require ( require (
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect
golang.org/x/sys v0.37.0 // indirect golang.org/x/net v0.34.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect golang.org/x/sys v0.29.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

68
go.sum
View File

@@ -1,19 +1,19 @@
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/KimMachineGun/automemlimit v0.7.5 h1:RkbaC0MwhjL1ZuBKunGDjE/ggwAX43DwZrJqVwyveTk= github.com/KimMachineGun/automemlimit v0.7.0 h1:7G06p/dMSf7G8E6oq+f2uOPuVncFyIlDI/pBWK49u88=
github.com/KimMachineGun/automemlimit v0.7.5/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM= github.com/KimMachineGun/automemlimit v0.7.0/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw= github.com/goccy/go-yaml v1.17.1 h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY=
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= github.com/goccy/go-yaml v1.17.1/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
@@ -30,36 +30,40 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk= go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec= go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.yandex/hasql/v2 v2.1.0 h1:7CaFFWeHoK5TvA+QvZzlKHlIN5sqNpqM8NSrXskZD/k= golang.yandex/hasql/v2 v2.1.0 h1:7CaFFWeHoK5TvA+QvZzlKHlIN5sqNpqM8NSrXskZD/k=
golang.yandex/hasql/v2 v2.1.0/go.mod h1:3Au1AxuJDCTXmS117BpbI6e+70kGWeyLR1qJAH6HdtA= golang.yandex/hasql/v2 v2.1.0/go.mod h1:3Au1AxuJDCTXmS117BpbI6e+70kGWeyLR1qJAH6HdtA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0= google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -3,7 +3,6 @@ package sql
import ( import (
"context" "context"
"database/sql" "database/sql"
"sync"
"time" "time"
) )
@@ -12,84 +11,31 @@ type Statser interface {
} }
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
if db == nil {
return
}
options := NewOptions(opts...) options := NewOptions(opts...)
var ( go func() {
statsMu sync.Mutex ticker := time.NewTicker(options.MeterStatsInterval)
lastUpdated time.Time defer ticker.Stop()
maxOpenConnections, openConnections, inUse, idle, waitCount float64
maxIdleClosed, maxIdleTimeClosed, maxLifetimeClosed float64
waitDuration float64
)
updateFn := func() { for {
statsMu.Lock() select {
defer statsMu.Unlock() case <-ctx.Done():
return
if time.Since(lastUpdated) < options.MeterStatsInterval { case <-ticker.C:
return if db == nil {
return
}
stats := db.Stats()
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
} }
}()
stats := db.Stats()
maxOpenConnections = float64(stats.MaxOpenConnections)
openConnections = float64(stats.OpenConnections)
inUse = float64(stats.InUse)
idle = float64(stats.Idle)
waitCount = float64(stats.WaitCount)
maxIdleClosed = float64(stats.MaxIdleClosed)
maxIdleTimeClosed = float64(stats.MaxIdleTimeClosed)
maxLifetimeClosed = float64(stats.MaxLifetimeClosed)
waitDuration = float64(stats.WaitDuration.Seconds())
lastUpdated = time.Now()
}
options.Meter.Gauge(MaxOpenConnections, func() float64 {
updateFn()
return maxOpenConnections
})
options.Meter.Gauge(OpenConnections, func() float64 {
updateFn()
return openConnections
})
options.Meter.Gauge(InuseConnections, func() float64 {
updateFn()
return inUse
})
options.Meter.Gauge(IdleConnections, func() float64 {
updateFn()
return idle
})
options.Meter.Gauge(WaitConnections, func() float64 {
updateFn()
return waitCount
})
options.Meter.Gauge(BlockedSeconds, func() float64 {
updateFn()
return waitDuration
})
options.Meter.Gauge(MaxIdleClosed, func() float64 {
updateFn()
return maxIdleClosed
})
options.Meter.Gauge(MaxIdletimeClosed, func() float64 {
updateFn()
return maxIdleTimeClosed
})
options.Meter.Gauge(MaxLifetimeClosed, func() float64 {
updateFn()
return maxLifetimeClosed
})
} }

View File

@@ -8,7 +8,6 @@ import (
"slices" "slices"
"time" "time"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
) )
@@ -43,10 +42,8 @@ type Options struct {
Fields []interface{} Fields []interface{}
// ContextAttrFuncs contains funcs that executed before log func on context // ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc ContextAttrFuncs []ContextAttrFunc
// callerSkipCount number of frames to skip // callerSkipCount number of frmaes to skip
CallerSkipCount int CallerSkipCount int
// AddCaller enables to get caller
AddCaller bool
// The logging level the logger should log // The logging level the logger should log
Level Level Level Level
// AddSource enabled writing source file and position in log // AddSource enabled writing source file and position in log
@@ -55,12 +52,6 @@ type Options struct {
AddStacktrace bool AddStacktrace bool
// DedupKeys deduplicate keys in log output // DedupKeys deduplicate keys in log output
DedupKeys bool DedupKeys bool
// FatalFinalizers runs in order in [logger.Fatal] method
FatalFinalizers []func(context.Context)
}
var DefaultFatalFinalizer = func(ctx context.Context) {
os.Exit(1)
} }
// NewOptions creates new options struct // NewOptions creates new options struct
@@ -74,7 +65,6 @@ func NewOptions(opts ...Option) Options {
AddSource: true, AddSource: true,
TimeFunc: time.Now, TimeFunc: time.Now,
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
FatalFinalizers: []func(context.Context){DefaultFatalFinalizer},
} }
WithMicroKeys()(&options) WithMicroKeys()(&options)
@@ -86,19 +76,6 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
func WithCallerEnabled(b bool) logger.Option {
return func(o *Options) {
o.AddCaller = b
}
}
// WithFatalFinalizers set logger.Fatal finalizers
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
return func(o *Options) {
o.FatalFinalizers = fncs
}
}
// WithContextAttrFuncs appends default funcs for the context attrs filler // WithContextAttrFuncs appends default funcs for the context attrs filler
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option { func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
return func(o *Options) { return func(o *Options) {

View File

@@ -4,12 +4,14 @@ import (
"context" "context"
"io" "io"
"log/slog" "log/slog"
"os"
"reflect" "reflect"
"regexp" "regexp"
"runtime" "runtime"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/semconv"
@@ -37,11 +39,11 @@ var (
type wrapper struct { type wrapper struct {
h slog.Handler h slog.Handler
level int64 level atomic.Int64
} }
func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool { func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool {
return level >= slog.Level(atomic.LoadInt64(&h.level)) return level >= slog.Level(int(h.level.Load()))
} }
func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error { func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
@@ -49,17 +51,11 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
} }
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler { func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
return &wrapper{ return h.h.WithAttrs(attrs)
h: h.h.WithAttrs(attrs),
level: atomic.LoadInt64(&h.level),
}
} }
func (h *wrapper) WithGroup(name string) slog.Handler { func (h *wrapper) WithGroup(name string) slog.Handler {
return &wrapper{ return h.h.WithGroup(name)
h: h.h.WithGroup(name),
level: atomic.LoadInt64(&h.level),
}
} }
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
@@ -121,13 +117,10 @@ func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger {
attrs, _ := s.argsAttrs(options.Fields) attrs, _ := s.argsAttrs(options.Fields)
l := &slogLogger{ l := &slogLogger{
handler: &wrapper{ handler: &wrapper{h: s.handler.h.WithAttrs(attrs)},
h: s.handler.h.WithAttrs(attrs), opts: options,
level: atomic.LoadInt64(&s.handler.level),
},
opts: options,
} }
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(options.Level))) l.handler.level.Store(int64(loggerToSlogLevel(options.Level)))
return l return l
} }
@@ -140,9 +133,9 @@ func (s *slogLogger) V(level logger.Level) bool {
} }
func (s *slogLogger) Level(level logger.Level) { func (s *slogLogger) Level(level logger.Level) {
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(level)))
s.mu.Lock() s.mu.Lock()
s.opts.Level = level s.opts.Level = level
s.handler.level.Store(int64(loggerToSlogLevel(level)))
s.mu.Unlock() s.mu.Unlock()
} }
@@ -163,11 +156,8 @@ func (s *slogLogger) Fields(fields ...interface{}) logger.Logger {
} }
attrs, _ := s.argsAttrs(fields) attrs, _ := s.argsAttrs(fields)
l.handler = &wrapper{ l.handler = &wrapper{h: s.handler.h.WithAttrs(attrs)}
h: s.handler.h.WithAttrs(attrs), l.handler.level.Store(int64(loggerToSlogLevel(l.opts.Level)))
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(l.opts.Level)))
return l return l
} }
@@ -212,11 +202,8 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
h = slog.NewJSONHandler(s.opts.Out, handleOpt) h = slog.NewJSONHandler(s.opts.Out, handleOpt)
} }
s.handler = &wrapper{ s.handler = &wrapper{h: h.WithAttrs(attrs)}
h: h.WithAttrs(attrs), s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level)))
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(s.opts.Level)))
s.mu.Unlock() s.mu.Unlock()
return nil return nil
@@ -244,12 +231,11 @@ func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{}
func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) { func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) {
s.printLog(ctx, logger.FatalLevel, msg, attrs...) s.printLog(ctx, logger.FatalLevel, msg, attrs...)
for _, fn := range s.opts.FatalFinalizers {
fn(ctx)
}
if closer, ok := s.opts.Out.(io.Closer); ok { if closer, ok := s.opts.Out.(io.Closer); ok {
closer.Close() closer.Close()
} }
time.Sleep(1 * time.Second)
os.Exit(1)
} }
func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) { func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) {
@@ -305,17 +291,10 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
} }
} }
var pcs uintptr var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, printLog, LogLvlMethod]
if s.opts.AddCaller { r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0])
var caller [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, caller[:]) // skip [Callers, printLog, LogLvlMethod]
pcs = caller[0]
}
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs)
r.AddAttrs(attrs...) r.AddAttrs(attrs...)
_ = s.handler.Handle(ctx, r) _ = s.handler.Handle(ctx, r)
} }

View File

@@ -469,25 +469,3 @@ func Test_WithContextAttrFunc(t *testing.T) {
// t.Logf("xxx %s", buf.Bytes()) // t.Logf("xxx %s", buf.Bytes())
} }
func TestFatalFinalizers(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(
logger.WithLevel(logger.TraceLevel),
logger.WithOutput(buf),
)
if err := l.Init(
logger.WithFatalFinalizers(func(ctx context.Context) {
l.Info(ctx, "fatal finalizer")
})); err != nil {
t.Fatal(err)
}
l.Fatal(ctx, "info_msg1")
if !bytes.Contains(buf.Bytes(), []byte("fatal finalizer")) {
t.Fatalf("logger dont have fatal message, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte("info_msg1")) {
t.Fatalf("logger dont have info_msg1 message, buf %s", buf.Bytes())
}
}

View File

@@ -49,11 +49,9 @@ type Meter interface {
Set(opts ...Option) Meter Set(opts ...Option) Meter
// Histogram get or create histogram // Histogram get or create histogram
Histogram(name string, labels ...string) Histogram Histogram(name string, labels ...string) Histogram
// HistogramExt get or create histogram with specified quantiles
HistogramExt(name string, quantiles []float64, labels ...string) Histogram
// Summary get or create summary // Summary get or create summary
Summary(name string, labels ...string) Summary Summary(name string, labels ...string) Summary
// SummaryExt get or create summary with specified quantiles and window time // SummaryExt get or create summary with spcified quantiles and window time
SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary
// Write writes metrics to io.Writer // Write writes metrics to io.Writer
Write(w io.Writer, opts ...Option) error Write(w io.Writer, opts ...Option) error
@@ -61,8 +59,6 @@ type Meter interface {
Options() Options Options() Options
// String return meter type // String return meter type
String() string String() string
// Unregister metric name and drop all data
Unregister(name string, labels ...string) bool
} }
// Counter is a counter // Counter is a counter
@@ -84,11 +80,7 @@ type FloatCounter interface {
// Gauge is a float64 gauge // Gauge is a float64 gauge
type Gauge interface { type Gauge interface {
Add(float64)
Get() float64 Get() float64
Set(float64)
Dec()
Inc()
} }
// Histogram is a histogram for non-negative values with automatically created buckets // Histogram is a histogram for non-negative values with automatically created buckets

View File

@@ -28,10 +28,6 @@ func (r *noopMeter) Name() string {
return r.opts.Name return r.opts.Name
} }
func (r *noopMeter) Unregister(name string, labels ...string) bool {
return true
}
// Init initialize options // Init initialize options
func (r *noopMeter) Init(opts ...Option) error { func (r *noopMeter) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
@@ -70,11 +66,6 @@ func (r *noopMeter) Histogram(_ string, labels ...string) Histogram {
return &noopHistogram{labels: labels} return &noopHistogram{labels: labels}
} }
// HistogramExt implements the Meter interface
func (r *noopMeter) HistogramExt(_ string, quantiles []float64, labels ...string) Histogram {
return &noopHistogram{labels: labels}
}
// Set implements the Meter interface // Set implements the Meter interface
func (r *noopMeter) Set(opts ...Option) Meter { func (r *noopMeter) Set(opts ...Option) Meter {
m := &noopMeter{opts: r.opts} m := &noopMeter{opts: r.opts}
@@ -141,18 +132,6 @@ type noopGauge struct {
labels []string labels []string
} }
func (r *noopGauge) Add(float64) {
}
func (r *noopGauge) Set(float64) {
}
func (r *noopGauge) Inc() {
}
func (r *noopGauge) Dec() {
}
func (r *noopGauge) Get() float64 { func (r *noopGauge) Get() float64 {
return 0 return 0
} }

View File

@@ -4,8 +4,6 @@ import (
"context" "context"
) )
var DefaultQuantiles = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
// Option powers the configuration for metrics implementations: // Option powers the configuration for metrics implementations:
type Option func(*Options) type Option func(*Options)
@@ -25,8 +23,6 @@ type Options struct {
WriteProcessMetrics bool WriteProcessMetrics bool
// WriteFDMetrics flag to write fd metrics // WriteFDMetrics flag to write fd metrics
WriteFDMetrics bool WriteFDMetrics bool
// Quantiles specifies buckets for histogram
Quantiles []float64
} }
// NewOptions prepares a set of options: // NewOptions prepares a set of options:
@@ -65,12 +61,14 @@ func Address(value string) Option {
} }
} }
// Quantiles defines the desired spread of statistics for histogram metrics: /*
func Quantiles(quantiles []float64) Option { // TimingObjectives defines the desired spread of statistics for histogram / timing metrics:
func TimingObjectives(value map[float64]float64) Option {
return func(o *Options) { return func(o *Options) {
o.Quantiles = quantiles o.TimingObjectives = value
} }
} }
*/
// Labels add the meter labels // Labels add the meter labels
func Labels(ls ...string) Option { func Labels(ls ...string) Option {

View File

@@ -6,6 +6,7 @@ import (
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/register" "go.unistack.org/micro/v4/register"
maddr "go.unistack.org/micro/v4/util/addr" maddr "go.unistack.org/micro/v4/util/addr"
@@ -13,6 +14,11 @@ import (
"go.unistack.org/micro/v4/util/rand" "go.unistack.org/micro/v4/util/rand"
) )
// DefaultCodecs will be used to encode/decode
var DefaultCodecs = map[string]codec.Codec{
"application/octet-stream": codec.NewCodec(),
}
type rpcHandler struct { type rpcHandler struct {
opts HandlerOptions opts HandlerOptions
handler interface{} handler interface{}

View File

@@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/KimMachineGun/automemlimit/memlimit" "github.com/KimMachineGun/automemlimit/memlimit"
"go.uber.org/automaxprocs/maxprocs"
"go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/config" "go.unistack.org/micro/v4/config"
@@ -22,8 +23,8 @@ import (
) )
func init() { func init() {
_, _ = maxprocs.Set()
_, _ = memlimit.SetGoMemLimitWithOpts( _, _ = memlimit.SetGoMemLimitWithOpts(
memlimit.WithRefreshInterval(1*time.Minute),
memlimit.WithRatio(0.9), memlimit.WithRatio(0.9),
memlimit.WithProvider( memlimit.WithProvider(
memlimit.ApplyFallback( memlimit.ApplyFallback(

View File

@@ -6,18 +6,18 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/semconv"
) )
func unregisterMetrics(size int) { var (
meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size)) pools = make([]Statser, 0)
meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size)) poolsMu sync.Mutex
meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size)) )
meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size))
}
// Stats struct
type Stats struct { type Stats struct {
Get uint64 Get uint64
Put uint64 Put uint64
@@ -25,13 +25,41 @@ type Stats struct {
Ret uint64 Ret uint64
} }
// Statser provides buffer pool stats
type Statser interface {
Stats() Stats
Cap() int
}
func init() {
go newStatsMeter()
}
func newStatsMeter() {
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
defer ticker.Stop()
for range ticker.C {
poolsMu.Lock()
for _, st := range pools {
stats := st.Stats()
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
}
poolsMu.Unlock()
}
}
var (
_ Statser = (*BytePool)(nil)
_ Statser = (*BytesPool)(nil)
_ Statser = (*StringsPool)(nil)
)
type Pool[T any] struct { type Pool[T any] struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
c int
} }
func (p Pool[T]) Put(t T) { func (p Pool[T]) Put(t T) {
@@ -42,82 +70,37 @@ func (p Pool[T]) Get() T {
return p.p.Get().(T) return p.p.Get().(T)
} }
func NewPool[T any](fn func() T, size int) Pool[T] { func NewPool[T any](fn func() T) Pool[T] {
p := Pool[T]{ return Pool[T]{
c: size, p: &sync.Pool{
get: &atomic.Uint64{}, New: func() interface{} {
put: &atomic.Uint64{}, return fn()
mis: &atomic.Uint64{}, },
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{
New: func() interface{} {
p.mis.Add(1)
return fn()
}, },
} }
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p
} }
type BytePool struct { type BytePool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewBytePool(size int) *BytePool { func NewBytePool(size int) *BytePool {
p := &BytePool{ p := &BytePool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
b := make([]byte, 0, size) b := make([]byte, 0, size)
return &b return &b
}, },
} }
poolsMu.Lock()
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { pools = append(pools, p)
return float64(p.get.Load()) poolsMu.Unlock()
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p return p
} }
@@ -127,73 +110,49 @@ func (p *BytePool) Cap() int {
func (p *BytePool) Stats() Stats { func (p *BytePool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
func (p *BytePool) Get() *[]byte { func (p *BytePool) Get() *[]byte {
p.get.Add(1) atomic.AddUint64(&p.get, 1)
return p.p.Get().(*[]byte) return p.p.Get().(*[]byte)
} }
func (p *BytePool) Put(b *[]byte) { func (p *BytePool) Put(b *[]byte) {
p.put.Add(1) atomic.AddUint64(&p.put, 1)
if cap(*b) > p.c { if cap(*b) > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
*b = (*b)[:0] *b = (*b)[:0]
p.p.Put(b) p.p.Put(b)
} }
func (p *BytePool) Close() {
unregisterMetrics(p.c)
}
type BytesPool struct { type BytesPool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewBytesPool(size int) *BytesPool { func NewBytesPool(size int) *BytesPool {
p := &BytesPool{ p := &BytesPool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
b := bytes.NewBuffer(make([]byte, 0, size)) b := bytes.NewBuffer(make([]byte, 0, size))
return b return b
}, },
} }
poolsMu.Lock()
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { pools = append(pools, p)
return float64(p.get.Load()) poolsMu.Unlock()
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p return p
} }
@@ -203,10 +162,10 @@ func (p *BytesPool) Cap() int {
func (p *BytesPool) Stats() Stats { func (p *BytesPool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
@@ -215,43 +174,34 @@ func (p *BytesPool) Get() *bytes.Buffer {
} }
func (p *BytesPool) Put(b *bytes.Buffer) { func (p *BytesPool) Put(b *bytes.Buffer) {
p.put.Add(1)
if (*b).Cap() > p.c { if (*b).Cap() > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
b.Reset() b.Reset()
p.p.Put(b) p.p.Put(b)
} }
func (p *BytesPool) Close() {
unregisterMetrics(p.c)
}
type StringsPool struct { type StringsPool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewStringsPool(size int) *StringsPool { func NewStringsPool(size int) *StringsPool {
p := &StringsPool{ p := &StringsPool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
return &strings.Builder{} return &strings.Builder{}
}, },
} }
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p return p
} }
@@ -261,28 +211,24 @@ func (p *StringsPool) Cap() int {
func (p *StringsPool) Stats() Stats { func (p *StringsPool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
func (p *StringsPool) Get() *strings.Builder { func (p *StringsPool) Get() *strings.Builder {
p.get.Add(1) atomic.AddUint64(&p.get, 1)
return p.p.Get().(*strings.Builder) return p.p.Get().(*strings.Builder)
} }
func (p *StringsPool) Put(b *strings.Builder) { func (p *StringsPool) Put(b *strings.Builder) {
p.put.Add(1) atomic.AddUint64(&p.put, 1)
if b.Cap() > p.c { if b.Cap() > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
b.Reset() b.Reset()
p.p.Put(b) p.p.Put(b)
} }
func (p *StringsPool) Close() {
unregisterMetrics(p.c)
}