Compare commits

..

8 Commits

Author SHA1 Message Date
614b740c56 split files
Some checks failed
lint / lint (pull_request) Successful in 2m27s
test / test (pull_request) Failing after 17m13s
coverage / build (pull_request) Failing after 17m30s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:46:47 +03:00
bc011a2e7f split files
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:44:29 +03:00
f92e18897a implement driver
Some checks failed
lint / lint (pull_request) Successful in 2m43s
test / test (pull_request) Successful in 4m53s
coverage / build (pull_request) Failing after 17m18s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:27:42 +03:00
022326ddc4 move
Some checks failed
lint / lint (pull_request) Successful in 3m27s
test / test (pull_request) Successful in 4m49s
coverage / build (pull_request) Failing after 17m28s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-19 11:31:11 +03:00
e053eeac74 add default node state criterion
Some checks failed
lint / lint (pull_request) Successful in 2m36s
test / test (pull_request) Failing after 17m14s
coverage / build (pull_request) Failing after 17m30s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-19 11:26:16 +03:00
6c6916a050 initial hasql support
Some checks failed
lint / lint (pull_request) Successful in 4m23s
test / test (pull_request) Failing after 17m14s
coverage / build (pull_request) Failing after 17m29s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-18 17:16:20 +03:00
ea84ac094f Merge branch 'v4' into hasql
Some checks failed
test / test (pull_request) Failing after 17m58s
coverage / build (pull_request) Failing after 18m40s
lint / lint (pull_request) Failing after 1m41s
2025-09-18 14:35:10 +03:00
2886a7fe8a initial hasql support
Some checks failed
test / test (pull_request) Failing after 19m47s
lint / lint (pull_request) Failing after 19m59s
coverage / build (pull_request) Failing after 20m4s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-18 14:34:48 +03:00
18 changed files with 193 additions and 1459 deletions

View File

@@ -25,7 +25,7 @@ jobs:
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" -a "$src_hash" != "" -a "$dst_hash" != "" ]; then
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT

View File

@@ -1,5 +1,5 @@
# Micro
![Coverage](https://img.shields.io/badge/Coverage-33.6%25-yellow)
![Coverage](https://img.shields.io/badge/Coverage-33.8%25-yellow)
[![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v4)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)

View File

@@ -15,6 +15,11 @@ import (
"go.unistack.org/micro/v4/tracer"
)
// DefaultCodecs will be used to encode/decode data
var DefaultCodecs = map[string]codec.Codec{
"application/octet-stream": codec.NewCodec(),
}
type noopClient struct {
funcCall FuncCall
funcStream FuncStream

View File

@@ -161,7 +161,7 @@ func NewOptions(opts ...Option) Options {
options := Options{
Context: context.Background(),
ContentType: DefaultContentType,
Codecs: make(map[string]codec.Codec),
Codecs: DefaultCodecs,
CallOptions: CallOptions{
Context: context.Background(),
Backoff: DefaultBackoff,

30
go.mod
View File

@@ -1,33 +1,35 @@
module go.unistack.org/micro/v4
go 1.25
go 1.24
require (
dario.cat/mergo v1.0.2
dario.cat/mergo v1.0.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/KimMachineGun/automemlimit v0.7.5
github.com/goccy/go-yaml v1.18.0
github.com/KimMachineGun/automemlimit v0.7.0
github.com/goccy/go-yaml v1.17.1
github.com/google/uuid v1.6.0
github.com/matoous/go-nanoid v1.5.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
github.com/spf13/cast v1.10.0
github.com/stretchr/testify v1.11.1
github.com/spf13/cast v1.7.1
github.com/stretchr/testify v1.10.0
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.6.0
go.unistack.org/micro-proto/v4 v4.1.0
golang.org/x/sync v0.17.0
golang.org/x/sync v0.10.0
golang.yandex/hasql/v2 v2.1.0
google.golang.org/grpc v1.76.0
google.golang.org/protobuf v1.36.10
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.3
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
golang.org/x/sys v0.37.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

68
go.sum
View File

@@ -1,19 +1,19 @@
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/KimMachineGun/automemlimit v0.7.5 h1:RkbaC0MwhjL1ZuBKunGDjE/ggwAX43DwZrJqVwyveTk=
github.com/KimMachineGun/automemlimit v0.7.5/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/KimMachineGun/automemlimit v0.7.0 h1:7G06p/dMSf7G8E6oq+f2uOPuVncFyIlDI/pBWK49u88=
github.com/KimMachineGun/automemlimit v0.7.0/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/goccy/go-yaml v1.17.1 h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY=
github.com/goccy/go-yaml v1.17.1/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
@@ -30,36 +30,40 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.yandex/hasql/v2 v2.1.0 h1:7CaFFWeHoK5TvA+QvZzlKHlIN5sqNpqM8NSrXskZD/k=
golang.yandex/hasql/v2 v2.1.0/go.mod h1:3Au1AxuJDCTXmS117BpbI6e+70kGWeyLR1qJAH6HdtA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -3,7 +3,6 @@ package sql
import (
"context"
"database/sql"
"sync"
"time"
)
@@ -12,84 +11,31 @@ type Statser interface {
}
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
options := NewOptions(opts...)
go func() {
ticker := time.NewTicker(options.MeterStatsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if db == nil {
return
}
options := NewOptions(opts...)
var (
statsMu sync.Mutex
lastUpdated time.Time
maxOpenConnections, openConnections, inUse, idle, waitCount float64
maxIdleClosed, maxIdleTimeClosed, maxLifetimeClosed float64
waitDuration float64
)
updateFn := func() {
statsMu.Lock()
defer statsMu.Unlock()
if time.Since(lastUpdated) < options.MeterStatsInterval {
return
}
stats := db.Stats()
maxOpenConnections = float64(stats.MaxOpenConnections)
openConnections = float64(stats.OpenConnections)
inUse = float64(stats.InUse)
idle = float64(stats.Idle)
waitCount = float64(stats.WaitCount)
maxIdleClosed = float64(stats.MaxIdleClosed)
maxIdleTimeClosed = float64(stats.MaxIdleTimeClosed)
maxLifetimeClosed = float64(stats.MaxLifetimeClosed)
waitDuration = float64(stats.WaitDuration.Seconds())
lastUpdated = time.Now()
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
options.Meter.Gauge(MaxOpenConnections, func() float64 {
updateFn()
return maxOpenConnections
})
options.Meter.Gauge(OpenConnections, func() float64 {
updateFn()
return openConnections
})
options.Meter.Gauge(InuseConnections, func() float64 {
updateFn()
return inUse
})
options.Meter.Gauge(IdleConnections, func() float64 {
updateFn()
return idle
})
options.Meter.Gauge(WaitConnections, func() float64 {
updateFn()
return waitCount
})
options.Meter.Gauge(BlockedSeconds, func() float64 {
updateFn()
return waitDuration
})
options.Meter.Gauge(MaxIdleClosed, func() float64 {
updateFn()
return maxIdleClosed
})
options.Meter.Gauge(MaxIdletimeClosed, func() float64 {
updateFn()
return maxIdleTimeClosed
})
options.Meter.Gauge(MaxLifetimeClosed, func() float64 {
updateFn()
return maxLifetimeClosed
})
}
}()
}

View File

@@ -52,12 +52,6 @@ type Options struct {
AddStacktrace bool
// DedupKeys deduplicate keys in log output
DedupKeys bool
// FatalFinalizers runs in order in [logger.Fatal] method
FatalFinalizers []func(context.Context)
}
var DefaultFatalFinalizer = func(ctx context.Context) {
os.Exit(1)
}
// NewOptions creates new options struct
@@ -71,7 +65,6 @@ func NewOptions(opts ...Option) Options {
AddSource: true,
TimeFunc: time.Now,
Meter: meter.DefaultMeter,
FatalFinalizers: []func(context.Context){DefaultFatalFinalizer},
}
WithMicroKeys()(&options)
@@ -83,13 +76,6 @@ func NewOptions(opts ...Option) Options {
return options
}
// WithFatalFinalizers set logger.Fatal finalizers
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
return func(o *Options) {
o.FatalFinalizers = fncs
}
}
// WithContextAttrFuncs appends default funcs for the context attrs filler
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
return func(o *Options) {

View File

@@ -4,12 +4,14 @@ import (
"context"
"io"
"log/slog"
"os"
"reflect"
"regexp"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/semconv"
@@ -229,12 +231,11 @@ func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{}
func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) {
s.printLog(ctx, logger.FatalLevel, msg, attrs...)
for _, fn := range s.opts.FatalFinalizers {
fn(ctx)
}
if closer, ok := s.opts.Out.(io.Closer); ok {
closer.Close()
}
time.Sleep(1 * time.Second)
os.Exit(1)
}
func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) {

View File

@@ -469,25 +469,3 @@ func Test_WithContextAttrFunc(t *testing.T) {
// t.Logf("xxx %s", buf.Bytes())
}
func TestFatalFinalizers(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(
logger.WithLevel(logger.TraceLevel),
logger.WithOutput(buf),
)
if err := l.Init(
logger.WithFatalFinalizers(func(ctx context.Context) {
l.Info(ctx, "fatal finalizer")
})); err != nil {
t.Fatal(err)
}
l.Fatal(ctx, "info_msg1")
if !bytes.Contains(buf.Bytes(), []byte("fatal finalizer")) {
t.Fatalf("logger dont have fatal message, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte("info_msg1")) {
t.Fatalf("logger dont have info_msg1 message, buf %s", buf.Bytes())
}
}

View File

@@ -49,11 +49,9 @@ type Meter interface {
Set(opts ...Option) Meter
// Histogram get or create histogram
Histogram(name string, labels ...string) Histogram
// HistogramExt get or create histogram with specified quantiles
HistogramExt(name string, quantiles []float64, labels ...string) Histogram
// Summary get or create summary
Summary(name string, labels ...string) Summary
// SummaryExt get or create summary with specified quantiles and window time
// SummaryExt get or create summary with specified quantiles and window time
SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary
// Write writes metrics to io.Writer
Write(w io.Writer, opts ...Option) error
@@ -61,8 +59,6 @@ type Meter interface {
Options() Options
// String return meter type
String() string
// Unregister metric name and drop all data
Unregister(name string, labels ...string) bool
}
// Counter is a counter
@@ -84,11 +80,7 @@ type FloatCounter interface {
// Gauge is a float64 gauge
type Gauge interface {
Add(float64)
Get() float64
Set(float64)
Dec()
Inc()
}
// Histogram is a histogram for non-negative values with automatically created buckets

View File

@@ -28,10 +28,6 @@ func (r *noopMeter) Name() string {
return r.opts.Name
}
func (r *noopMeter) Unregister(name string, labels ...string) bool {
return true
}
// Init initialize options
func (r *noopMeter) Init(opts ...Option) error {
for _, o := range opts {
@@ -70,11 +66,6 @@ func (r *noopMeter) Histogram(_ string, labels ...string) Histogram {
return &noopHistogram{labels: labels}
}
// HistogramExt implements the Meter interface
func (r *noopMeter) HistogramExt(_ string, quantiles []float64, labels ...string) Histogram {
return &noopHistogram{labels: labels}
}
// Set implements the Meter interface
func (r *noopMeter) Set(opts ...Option) Meter {
m := &noopMeter{opts: r.opts}
@@ -141,18 +132,6 @@ type noopGauge struct {
labels []string
}
func (r *noopGauge) Add(float64) {
}
func (r *noopGauge) Set(float64) {
}
func (r *noopGauge) Inc() {
}
func (r *noopGauge) Dec() {
}
func (r *noopGauge) Get() float64 {
return 0
}

View File

@@ -4,8 +4,6 @@ import (
"context"
)
var DefaultQuantiles = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
// Option powers the configuration for metrics implementations:
type Option func(*Options)
@@ -25,8 +23,6 @@ type Options struct {
WriteProcessMetrics bool
// WriteFDMetrics flag to write fd metrics
WriteFDMetrics bool
// Quantiles specifies buckets for histogram
Quantiles []float64
}
// NewOptions prepares a set of options:
@@ -65,12 +61,14 @@ func Address(value string) Option {
}
}
// Quantiles defines the desired spread of statistics for histogram metrics:
func Quantiles(quantiles []float64) Option {
/*
// TimingObjectives defines the desired spread of statistics for histogram / timing metrics:
func TimingObjectives(value map[float64]float64) Option {
return func(o *Options) {
o.Quantiles = quantiles
o.TimingObjectives = value
}
}
*/
// Labels add the meter labels
func Labels(ls ...string) Option {

View File

@@ -6,6 +6,7 @@ import (
"sync"
"time"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/register"
maddr "go.unistack.org/micro/v4/util/addr"
@@ -13,6 +14,11 @@ import (
"go.unistack.org/micro/v4/util/rand"
)
// DefaultCodecs will be used to encode/decode
var DefaultCodecs = map[string]codec.Codec{
"application/octet-stream": codec.NewCodec(),
}
type rpcHandler struct {
opts HandlerOptions
handler interface{}

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/KimMachineGun/automemlimit/memlimit"
"go.uber.org/automaxprocs/maxprocs"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/config"
@@ -22,8 +23,8 @@ import (
)
func init() {
_, _ = maxprocs.Set()
_, _ = memlimit.SetGoMemLimitWithOpts(
memlimit.WithRefreshInterval(1*time.Minute),
memlimit.WithRatio(0.9),
memlimit.WithProvider(
memlimit.ApplyFallback(

View File

@@ -1,815 +0,0 @@
package mock
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"
"go.unistack.org/micro/v4/store"
)
// ExpectedWrite represents an expected Write operation
type ExpectedWrite struct {
key string
value interface{}
ttl time.Duration
metadata map[string]string
namespace string
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedWrite) match(key string, val interface{}, opts ...store.WriteOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check value match
if e.value != nil && !reflect.DeepEqual(e.value, val) {
return false
}
// Check options
options := store.NewWriteOptions(opts...)
if e.ttl > 0 && e.ttl != options.TTL {
return false
}
if e.namespace != "" && e.namespace != options.Namespace {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedRead represents an expected Read operation
type ExpectedRead struct {
key string
value interface{}
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedRead) match(key string, opts ...store.ReadOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedDelete represents an expected Delete operation
type ExpectedDelete struct {
key string
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedDelete) match(key string, opts ...store.DeleteOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedExists represents an expected Exists operation
type ExpectedExists struct {
key string
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedExists) match(key string, opts ...store.ExistsOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedList represents an expected List operation
type ExpectedList struct {
times int
called int
mutex sync.Mutex
err error
keys []string
}
func (e *ExpectedList) match(opts ...store.ListOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// Store is a mock implementation of the Store interface for testing
type Store struct {
expectedWrites []*ExpectedWrite
expectedReads []*ExpectedRead
expectedDeletes []*ExpectedDelete
expectedExists []*ExpectedExists
expectedLists []*ExpectedList
data map[string]interface{}
exists map[string]bool
ttls map[string]time.Time // key -> expiration time
metadata map[string]map[string]string
err error
opts store.Options
mutex sync.RWMutex
}
// NewStore creates a new mock store
func NewStore(opts ...store.Option) *Store {
options := store.NewOptions(opts...)
return &Store{
data: make(map[string]interface{}),
exists: make(map[string]bool),
ttls: make(map[string]time.Time),
metadata: make(map[string]map[string]string),
opts: options,
}
}
// ExpectWrite creates an expectation for a Write operation
func (m *Store) ExpectWrite(key string) *ExpectedWrite {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedWrite{key: key}
m.expectedWrites = append(m.expectedWrites, exp)
return exp
}
// ExpectRead creates an expectation for a Read operation
func (m *Store) ExpectRead(key string) *ExpectedRead {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedRead{key: key}
m.expectedReads = append(m.expectedReads, exp)
return exp
}
// ExpectDelete creates an expectation for a Delete operation
func (m *Store) ExpectDelete(key string) *ExpectedDelete {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedDelete{key: key}
m.expectedDeletes = append(m.expectedDeletes, exp)
return exp
}
// ExpectExists creates an expectation for an Exists operation
func (m *Store) ExpectExists(key string) *ExpectedExists {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedExists{key: key}
m.expectedExists = append(m.expectedExists, exp)
return exp
}
// ExpectList creates an expectation for a List operation
func (m *Store) ExpectList() *ExpectedList {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedList{}
m.expectedLists = append(m.expectedLists, exp)
return exp
}
// WithValue sets the value to return for expected operations
func (e *ExpectedWrite) WithValue(val interface{}) *ExpectedWrite {
e.value = val
return e
}
// WithTTL sets the TTL for expected Write operations
func (e *ExpectedWrite) WithTTL(ttl time.Duration) *ExpectedWrite {
e.ttl = ttl
return e
}
// WithNamespace sets the namespace for expected operations
func (e *ExpectedWrite) WithNamespace(ns string) *ExpectedWrite {
e.namespace = ns
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedWrite) Times(n int) *ExpectedWrite {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedWrite) WillReturnError(err error) *ExpectedWrite {
e.err = err
return e
}
// WithValue sets the value to return for expected Read operations
func (e *ExpectedRead) WithValue(val interface{}) *ExpectedRead {
e.value = val
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedRead) Times(n int) *ExpectedRead {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedRead) WillReturnError(err error) *ExpectedRead {
e.err = err
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedDelete) Times(n int) *ExpectedDelete {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedDelete) WillReturnError(err error) *ExpectedDelete {
e.err = err
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedExists) Times(n int) *ExpectedExists {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedExists) WillReturnError(err error) *ExpectedExists {
e.err = err
return e
}
// WillReturn sets the keys to return for List operations
func (e *ExpectedList) WillReturn(keys ...string) *ExpectedList {
e.keys = keys
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedList) Times(n int) *ExpectedList {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedList) WillReturnError(err error) *ExpectedList {
e.err = err
return e
}
// checkTTL checks if a key has expired
func (m *Store) checkTTL(key string) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
if exp, ok := m.ttls[key]; ok {
if time.Now().After(exp) {
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
return false
}
}
return true
}
// FastForward decrements all TTLs by the given duration
func (m *Store) FastForward(d time.Duration) {
m.mutex.Lock()
defer m.mutex.Unlock()
now := time.Now()
for key, exp := range m.ttls {
// Calculate remaining time before fast forward
remaining := time.Until(exp)
if remaining <= 0 {
// Already expired, remove it
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
} else {
// Apply fast forward
newRemaining := remaining - d
if newRemaining <= 0 {
// Would expire after fast forward, remove it
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
} else {
// Update expiration time
m.ttls[key] = now.Add(newRemaining)
}
}
}
}
// Name returns store name
func (m *Store) Name() string {
return m.opts.Name
}
// Init initializes the mock store
func (m *Store) Init(opts ...store.Option) error {
if m.err != nil {
return m.err
}
for _, o := range opts {
o(&m.opts)
}
return nil
}
// Connect is used when store needs to be connected
func (m *Store) Connect(ctx context.Context) error {
if m.err != nil {
return m.err
}
return nil
}
// Options returns the current options
func (m *Store) Options() store.Options {
return m.opts
}
// Exists checks that key exists in store
func (m *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
if m.err != nil {
return m.err
}
// Check TTL first
if !m.checkTTL(key) {
return store.ErrNotFound
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedExists {
if exp.match(key, opts...) {
m.mutex.Unlock()
if exp.err != nil {
return exp.err
}
if !m.exists[key] {
return store.ErrNotFound
}
return nil
}
}
m.mutex.Unlock()
// If no expectation matched, use default behavior
if !m.exists[key] {
return store.ErrNotFound
}
return nil
}
// assignValue copies data into the caller-provided destination pointer.
// Only *interface{}, *string and *int destinations are supported (the cases
// the mock's tests exercise); any other destination type is left untouched.
func assignValue(val interface{}, data interface{}) {
	switch target := val.(type) {
	case *interface{}:
		*target = data
	case *string:
		if s, ok := data.(string); ok {
			*target = s
		} else {
			// Fall back to default formatting of the stored value.
			*target = fmt.Sprintf("%v", data)
		}
	case *int:
		if i, ok := data.(int); ok {
			*target = i
		}
	}
}

// Read loads the value stored under key into val, honoring TTL expiry and
// any matching read-expectation (whose configured error or value, if set,
// takes precedence over the stored data). Absent keys yield
// store.ErrNotFound.
//
// Fixes: (1) the presence flag and stored value are now captured while
// m.mutex is held — the previous code read m.exists/m.data after releasing
// the lock, racing with concurrent writers; (2) the duplicated destination
// type-switch is factored into assignValue.
func (m *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
	if m.err != nil {
		return m.err
	}
	// Check TTL first; expired keys are treated as absent.
	if !m.checkTTL(key) {
		return store.ErrNotFound
	}
	m.mutex.Lock()
	exists := m.exists[key]
	stored, hasData := m.data[key]
	for _, exp := range m.expectedReads {
		if exp.match(key, opts...) {
			m.mutex.Unlock()
			if exp.err != nil {
				return exp.err
			}
			if !exists {
				return store.ErrNotFound
			}
			// Prefer the expectation's canned value, else the stored one.
			data := exp.value
			if data == nil {
				data = stored
			}
			if data != nil {
				assignValue(val, data)
			}
			return nil
		}
	}
	m.mutex.Unlock()
	// No expectation matched: fall back to the default behavior.
	if !exists {
		return store.ErrNotFound
	}
	if hasData {
		assignValue(val, stored)
	}
	return nil
}
// applyWriteLocked records val under key and updates the TTL and metadata
// bookkeeping. The caller must hold m.mutex.
func (m *Store) applyWriteLocked(key string, val interface{}, opts ...store.WriteOption) {
	m.data[key] = val
	m.exists[key] = true
	options := store.NewWriteOptions(opts...)
	if options.TTL > 0 {
		m.ttls[key] = time.Now().Add(options.TTL)
	} else {
		// A write without TTL clears any previous expiration.
		delete(m.ttls, key)
	}
	if options.Metadata != nil {
		md := make(map[string]string, len(options.Metadata))
		for k, v := range options.Metadata {
			// Metadata values arrive as []string; flatten by joining with
			// commas. strings.Join on an empty slice already yields "".
			md[k] = strings.Join(v, ",")
		}
		m.metadata[key] = md
	}
}

// Write stores val under key with optional TTL/metadata write options. If a
// registered write-expectation matches the call, its configured error (if
// any) is returned instead of performing the write.
//
// Fixes: (1) the previous implementation released and re-acquired the mutex
// between matching an expectation and applying the write, letting concurrent
// calls interleave in that window — the whole operation now runs under a
// single lock; (2) the write logic, duplicated between the expectation and
// default paths, is shared via applyWriteLocked.
func (m *Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
	if m.err != nil {
		return m.err
	}
	m.mutex.Lock()
	defer m.mutex.Unlock()
	for _, exp := range m.expectedWrites {
		if exp.match(key, val, opts...) {
			if exp.err != nil {
				return exp.err
			}
			m.applyWriteLocked(key, val, opts...)
			return nil
		}
	}
	// No expectation matched: perform the write with default behavior.
	m.applyWriteLocked(key, val, opts...)
	return nil
}
// removeKeyLocked erases every trace of key from the bookkeeping maps.
// The caller must hold m.mutex.
func (m *Store) removeKeyLocked(key string) {
	delete(m.data, key)
	delete(m.exists, key)
	delete(m.ttls, key)
	delete(m.metadata, key)
}

// Delete removes the record with the corresponding key from the store. If a
// registered delete-expectation matches the call, its configured error (if
// any) is returned instead of performing the deletion. Deleting an absent
// key is not an error.
//
// Fixes: (1) the previous implementation released and re-acquired the mutex
// between matching an expectation and deleting, letting concurrent calls
// interleave in that window — the whole operation now runs under a single
// lock; (2) the duplicated four-map delete sequence is shared via
// removeKeyLocked.
func (m *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
	if m.err != nil {
		return m.err
	}
	m.mutex.Lock()
	defer m.mutex.Unlock()
	for _, exp := range m.expectedDeletes {
		if exp.match(key, opts...) {
			if exp.err != nil {
				return exp.err
			}
			m.removeKeyLocked(key)
			return nil
		}
	}
	// No expectation matched: delete with default behavior.
	m.removeKeyLocked(key)
	return nil
}
// filterKeys returns the subset of keys for which keep reports true.
func filterKeys(keys []string, keep func(string) bool) []string {
	var out []string
	for _, k := range keys {
		if keep(k) {
			out = append(out, k)
		}
	}
	return out
}

// List returns any keys that match the list options, or an empty list with
// no error if none matched. A matching list-expectation short-circuits with
// its canned keys or error. Keys whose TTL has passed are skipped.
//
// Improvements over the previous version: prefix/suffix matching uses
// strings.HasPrefix/HasSuffix instead of manual slicing; time.Now() is
// hoisted out of the scan loop; and the limit/offset branching is collapsed
// into an equivalent, simpler form (any offset at or past the end yields an
// empty result; otherwise a positive limit caps the window and a bare
// positive offset trims the front).
func (m *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
	if m.err != nil {
		return nil, m.err
	}
	// Check expectations first (full lock: match may mutate call counters).
	m.mutex.Lock()
	for _, exp := range m.expectedLists {
		if exp.match(opts...) {
			m.mutex.Unlock()
			if exp.err != nil {
				return nil, exp.err
			}
			return exp.keys, nil
		}
	}
	m.mutex.Unlock()
	// No expectation matched: return the actual, non-expired keys.
	m.mutex.RLock()
	defer m.mutex.RUnlock()
	now := time.Now()
	var keys []string
	for key := range m.data {
		if deadline, ok := m.ttls[key]; ok && now.After(deadline) {
			continue // skip expired keys
		}
		keys = append(keys, key)
	}
	options := store.NewListOptions(opts...)
	if options.Prefix != "" {
		keys = filterKeys(keys, func(k string) bool { return strings.HasPrefix(k, options.Prefix) })
	}
	if options.Suffix != "" {
		keys = filterKeys(keys, func(k string) bool { return strings.HasSuffix(k, options.Suffix) })
	}
	// Apply offset and limit.
	offset := int(options.Offset)
	if offset >= len(keys) {
		return []string{}, nil
	}
	if options.Limit > 0 {
		end := offset + int(options.Limit)
		if end > len(keys) {
			end = len(keys)
		}
		keys = keys[offset:end]
	} else if offset > 0 {
		keys = keys[offset:]
	}
	return keys, nil
}
// Disconnect simulates tearing down a connection. It performs no real work
// and only reports the forced error, if one was configured (nil otherwise).
func (m *Store) Disconnect(ctx context.Context) error {
	return m.err
}
// String identifies this implementation; it always reports "mock".
func (m *Store) String() string {
	return "mock"
}
// Watch returns a fresh mock events watcher, or the forced error if one was
// configured. The supplied options are accepted but not interpreted.
func (m *Store) Watch(ctx context.Context, opts ...store.WatchOption) (store.Watcher, error) {
	if m.err != nil {
		return nil, m.err
	}
	return NewWatcher(), nil
}
// Live reports store liveness; the mock is always live.
func (m *Store) Live() bool {
	return true
}
// Ready reports store readiness; the mock is always ready.
func (m *Store) Ready() bool {
	return true
}
// Health reports store health; the mock is always healthy.
func (m *Store) Health() bool {
	return true
}
// ExpectationsWereMet verifies that every expectation registered with an
// explicit call count (times > 0) was invoked exactly that many times,
// returning a descriptive error for the first mismatch found. Expectations
// without a call-count requirement are ignored.
func (m *Store) ExpectationsWereMet() error {
	m.mutex.RLock()
	defer m.mutex.RUnlock()
	for _, exp := range m.expectedWrites {
		if exp.times <= 0 || exp.called == exp.times {
			continue
		}
		return fmt.Errorf("expected write for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
	}
	for _, exp := range m.expectedReads {
		if exp.times <= 0 || exp.called == exp.times {
			continue
		}
		return fmt.Errorf("expected read for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
	}
	for _, exp := range m.expectedDeletes {
		if exp.times <= 0 || exp.called == exp.times {
			continue
		}
		return fmt.Errorf("expected delete for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
	}
	for _, exp := range m.expectedExists {
		if exp.times <= 0 || exp.called == exp.times {
			continue
		}
		return fmt.Errorf("expected exists for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
	}
	for _, exp := range m.expectedLists {
		if exp.times <= 0 || exp.called == exp.times {
			continue
		}
		return fmt.Errorf("expected list to be called %d times, but was called %d times", exp.times, exp.called)
	}
	return nil
}
// Watcher is a mock implementation of the Watcher interface
type Watcher struct {
	events chan store.Event // buffered (size 1) queue of events delivered by Next
	stop chan bool // buffered (size 1) signal channel; Stop sends, Next receives
}
// NewWatcher constructs a mock watcher with single-slot event and stop
// channels, so both SendEvent and Stop can proceed without a waiting reader.
func NewWatcher() *Watcher {
	w := &Watcher{}
	w.events = make(chan store.Event, 1)
	w.stop = make(chan bool, 1)
	return w
}
// Next blocks until an event arrives or the watcher is stopped. After Stop
// has been called it reports store.ErrWatcherStopped.
func (mw *Watcher) Next() (store.Event, error) {
	select {
	case <-mw.stop:
		return nil, store.ErrWatcherStopped
	case ev := <-mw.events:
		return ev, nil
	}
}
// Stop signals the watcher to shut down. The send is non-blocking, so
// repeated calls are safe even though the stop channel holds one token.
func (mw *Watcher) Stop() {
	select {
	case mw.stop <- true:
	default:
		// A stop signal is already pending; nothing more to do.
	}
}
// SendEvent queues an event for delivery via Next (for testing purposes).
// The send is non-blocking: if the one-slot channel is already full, the
// event is silently dropped.
func (mw *Watcher) SendEvent(event store.Event) {
	select {
	case mw.events <- event:
	default:
		// Channel full: drop the event rather than block the test.
	}
}

View File

@@ -1,295 +0,0 @@
package mock
import (
"context"
"testing"
"time"
"go.unistack.org/micro/v4/store"
)
func TestStore(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Test Write with expectation
s.ExpectWrite("test_key").WithValue("test_value")
err := s.Write(ctx, "test_key", "test_value")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
// Test Read with expectation
s.ExpectRead("test_key").WithValue("test_value")
var value interface{}
err = s.Read(ctx, "test_key", &value)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
if value != "test_value" {
t.Fatalf("Expected 'test_value', got %v", value)
}
// Test Read with string
s.ExpectRead("test_key")
var strValue string
err = s.Read(ctx, "test_key", &strValue)
if err != nil {
t.Fatalf("Read string failed: %v", err)
}
if strValue != "test_value" {
t.Fatalf("Expected 'test_value', got %s", strValue)
}
// Test Write and Read integer with TTL
s.ExpectWrite("int_key").WithValue(42).WithTTL(5 * time.Second)
err = s.Write(ctx, "int_key", 42, store.WriteTTL(5*time.Second))
if err != nil {
t.Fatalf("Write int failed: %v", err)
}
s.ExpectRead("int_key")
var intValue int
err = s.Read(ctx, "int_key", &intValue)
if err != nil {
t.Fatalf("Read int failed: %v", err)
}
if intValue != 42 {
t.Fatalf("Expected 42, got %d", intValue)
}
// Test Exists with expectation
s.ExpectExists("test_key")
err = s.Exists(ctx, "test_key")
if err != nil {
t.Fatalf("Exists failed: %v", err)
}
// Test List with expectation
s.ExpectList().WillReturn("test_key", "another_key")
keys, err := s.List(ctx)
if err != nil {
t.Fatalf("List failed: %v", err)
}
if len(keys) != 2 {
t.Fatalf("Expected 2 keys, got %d", len(keys))
}
// Test Delete with expectation
s.ExpectDelete("test_key")
err = s.Delete(ctx, "test_key")
if err != nil {
t.Fatalf("Delete failed: %v", err)
}
// Test that deleted key doesn't exist
s.ExpectExists("test_key").WillReturnError(store.ErrNotFound)
err = s.Exists(ctx, "test_key")
if err == nil {
t.Fatalf("Expected store.ErrNotFound after delete")
}
// Test error handling
s.ExpectExists("nonexistent").WillReturnError(store.ErrNotFound)
err = s.Exists(ctx, "nonexistent")
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound, got %v", err)
}
// Verify all expectations were met
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}
func TestStoreFastForward(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Write with TTL
s.ExpectWrite("ttl_key").WithValue("ttl_value").WithTTL(100 * time.Millisecond)
err := s.Write(ctx, "ttl_key", "ttl_value", store.WriteTTL(100*time.Millisecond))
if err != nil {
t.Fatalf("Write with TTL failed: %v", err)
}
// Check key exists before TTL expires
s.ExpectRead("ttl_key")
var value string
err = s.Read(ctx, "ttl_key", &value)
if err != nil {
t.Fatalf("Read before TTL failed: %v", err)
}
if value != "ttl_value" {
t.Fatalf("Expected 'ttl_value', got %s", value)
}
// Fast forward by 50ms - key should still exist
s.FastForward(50 * time.Millisecond)
s.ExpectRead("ttl_key")
err = s.Read(ctx, "ttl_key", &value)
if err != nil {
t.Fatalf("Read after 50ms fast forward failed: %v", err)
}
if value != "ttl_value" {
t.Fatalf("Expected 'ttl_value' after 50ms, got %s", value)
}
// Fast forward by another 60ms (total 110ms) - key should expire
s.FastForward(60 * time.Millisecond)
s.ExpectRead("ttl_key").WillReturnError(store.ErrNotFound)
err = s.Read(ctx, "ttl_key", &value)
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound after TTL, got %v", err)
}
// Test FastForward on already expired keys
s.ExpectWrite("ttl_key2").WithValue("ttl_value2").WithTTL(10 * time.Millisecond)
err = s.Write(ctx, "ttl_key2", "ttl_value2", store.WriteTTL(10*time.Millisecond))
if err != nil {
t.Fatalf("Write with TTL failed: %v", err)
}
// Fast forward by 20ms - key should expire immediately
s.FastForward(20 * time.Millisecond)
s.ExpectRead("ttl_key2").WillReturnError(store.ErrNotFound)
err = s.Read(ctx, "ttl_key2", &value)
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound after immediate expiration, got %v", err)
}
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}
func TestStoreWithOptions(t *testing.T) {
s := NewStore(store.Name("test_mock"), store.Namespace("test_ns"))
if s.Name() != "test_mock" {
t.Fatalf("Expected name 'test_mock', got %s", s.Name())
}
opts := s.Options()
if opts.Namespace != "test_ns" {
t.Fatalf("Expected namespace 'test_ns', got %s", opts.Namespace)
}
}
func TestWatcher(t *testing.T) {
watcher := NewWatcher()
// Test Stop
watcher.Stop()
// Test Next after stop
_, err := watcher.Next()
if err != store.ErrWatcherStopped {
t.Fatalf("Expected store.ErrWatcherStopped, got %v", err)
}
}
func TestStoreHealth(t *testing.T) {
s := NewStore()
if !s.Live() {
t.Fatal("Expected Live() to return true")
}
if !s.Ready() {
t.Fatal("Expected Ready() to return true")
}
if !s.Health() {
t.Fatal("Expected Health() to return true")
}
}
func TestStoreConnectDisconnect(t *testing.T) {
s := NewStore()
err := s.Connect(context.Background())
if err != nil {
t.Fatalf("Connect failed: %v", err)
}
err = s.Disconnect(context.Background())
if err != nil {
t.Fatalf("Disconnect failed: %v", err)
}
// Test error propagation
s.ExpectWrite("test_key").WillReturnError(store.ErrNotConnected)
err = s.Write(context.Background(), "test_key", "value")
if err != store.ErrNotConnected {
t.Fatalf("Expected store.ErrNotConnected, got %v", err)
}
}
func TestStoreTTL(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Test Write with TTL
s.ExpectWrite("ttl_key").WithValue("ttl_value").WithTTL(100 * time.Millisecond)
err := s.Write(ctx, "ttl_key", "ttl_value", store.WriteTTL(100*time.Millisecond))
if err != nil {
t.Fatalf("Write with TTL failed: %v", err)
}
// Read before TTL expires
s.ExpectRead("ttl_key")
var value string
err = s.Read(ctx, "ttl_key", &value)
if err != nil {
t.Fatalf("Read before TTL failed: %v", err)
}
if value != "ttl_value" {
t.Fatalf("Expected 'ttl_value', got %s", value)
}
// Wait for TTL to expire
time.Sleep(150 * time.Millisecond)
// Read after TTL expires should return ErrNotFound
s.ExpectRead("ttl_key").WillReturnError(store.ErrNotFound)
err = s.Read(ctx, "ttl_key", &value)
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound after TTL, got %v", err)
}
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}
func TestStoreExpectedOperations(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Test expected operations with Times
s.ExpectWrite("once_key").Times(1)
s.ExpectWrite("twice_key").Times(2)
err := s.Write(ctx, "once_key", "value1")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
err = s.Write(ctx, "twice_key", "value2")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
err = s.Write(ctx, "twice_key", "value3")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}

View File

@@ -6,18 +6,18 @@ import (
"strings"
"sync"
"sync/atomic"
"time"
"go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/semconv"
)
func unregisterMetrics(size int) {
meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size))
meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size))
meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size))
meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size))
}
var (
pools = make([]Statser, 0)
poolsMu sync.Mutex
)
// Stats struct
type Stats struct {
Get uint64
Put uint64
@@ -25,13 +25,41 @@ type Stats struct {
Ret uint64
}
// Statser provides buffer pool stats
type Statser interface {
Stats() Stats
Cap() int
}
func init() {
go newStatsMeter()
}
func newStatsMeter() {
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
defer ticker.Stop()
for range ticker.C {
poolsMu.Lock()
for _, st := range pools {
stats := st.Stats()
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
}
poolsMu.Unlock()
}
}
var (
_ Statser = (*BytePool)(nil)
_ Statser = (*BytesPool)(nil)
_ Statser = (*StringsPool)(nil)
)
type Pool[T any] struct {
p *sync.Pool
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
c int
}
func (p Pool[T]) Put(t T) {
@@ -42,82 +70,37 @@ func (p Pool[T]) Get() T {
return p.p.Get().(T)
}
func NewPool[T any](fn func() T, size int) Pool[T] {
p := Pool[T]{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{
func NewPool[T any](fn func() T) Pool[T] {
return Pool[T]{
p: &sync.Pool{
New: func() interface{} {
p.mis.Add(1)
return fn()
},
},
}
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p
}
type BytePool struct {
p *sync.Pool
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
get uint64
put uint64
mis uint64
ret uint64
c int
}
func NewBytePool(size int) *BytePool {
p := &BytePool{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p := &BytePool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
p.mis.Add(1)
atomic.AddUint64(&p.mis, 1)
b := make([]byte, 0, size)
return &b
},
}
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p
}
@@ -127,73 +110,49 @@ func (p *BytePool) Cap() int {
func (p *BytePool) Stats() Stats {
return Stats{
Put: p.put.Load(),
Get: p.get.Load(),
Mis: p.mis.Load(),
Ret: p.ret.Load(),
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
}
}
func (p *BytePool) Get() *[]byte {
p.get.Add(1)
atomic.AddUint64(&p.get, 1)
return p.p.Get().(*[]byte)
}
func (p *BytePool) Put(b *[]byte) {
p.put.Add(1)
atomic.AddUint64(&p.put, 1)
if cap(*b) > p.c {
p.ret.Add(1)
atomic.AddUint64(&p.ret, 1)
return
}
*b = (*b)[:0]
p.p.Put(b)
}
func (p *BytePool) Close() {
unregisterMetrics(p.c)
}
type BytesPool struct {
p *sync.Pool
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
get uint64
put uint64
mis uint64
ret uint64
c int
}
func NewBytesPool(size int) *BytesPool {
p := &BytesPool{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p := &BytesPool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
p.mis.Add(1)
atomic.AddUint64(&p.mis, 1)
b := bytes.NewBuffer(make([]byte, 0, size))
return b
},
}
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p
}
@@ -203,10 +162,10 @@ func (p *BytesPool) Cap() int {
func (p *BytesPool) Stats() Stats {
return Stats{
Put: p.put.Load(),
Get: p.get.Load(),
Mis: p.mis.Load(),
Ret: p.ret.Load(),
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
}
}
@@ -215,43 +174,34 @@ func (p *BytesPool) Get() *bytes.Buffer {
}
func (p *BytesPool) Put(b *bytes.Buffer) {
p.put.Add(1)
if (*b).Cap() > p.c {
p.ret.Add(1)
atomic.AddUint64(&p.ret, 1)
return
}
b.Reset()
p.p.Put(b)
}
func (p *BytesPool) Close() {
unregisterMetrics(p.c)
}
type StringsPool struct {
p *sync.Pool
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
get uint64
put uint64
mis uint64
ret uint64
c int
}
func NewStringsPool(size int) *StringsPool {
p := &StringsPool{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p := &StringsPool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
p.mis.Add(1)
atomic.AddUint64(&p.mis, 1)
return &strings.Builder{}
},
}
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p
}
@@ -261,28 +211,24 @@ func (p *StringsPool) Cap() int {
func (p *StringsPool) Stats() Stats {
return Stats{
Put: p.put.Load(),
Get: p.get.Load(),
Mis: p.mis.Load(),
Ret: p.ret.Load(),
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
}
}
func (p *StringsPool) Get() *strings.Builder {
p.get.Add(1)
atomic.AddUint64(&p.get, 1)
return p.p.Get().(*strings.Builder)
}
func (p *StringsPool) Put(b *strings.Builder) {
p.put.Add(1)
atomic.AddUint64(&p.put, 1)
if b.Cap() > p.c {
p.ret.Add(1)
atomic.AddUint64(&p.ret, 1)
return
}
b.Reset()
p.p.Put(b)
}
func (p *StringsPool) Close() {
unregisterMetrics(p.c)
}