Compare commits

...

21 Commits

Author SHA1 Message Date
0b41a60112 store: add mock store
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-10 01:35:30 +03:00
d9afc9ce4f update all
Some checks failed
coverage / build (push) Failing after 3m9s
test / test (push) Failing after 18m22s
sync / sync (push) Successful in 9s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-31 21:52:22 +03:00
7a325e2c9e remove using global map for default codecs (#223)
Some checks failed
test / test (push) Failing after 15m6s
coverage / build (push) Failing after 15m16s
sync / sync (push) Failing after 8s
2025-10-15 21:32:52 +03:00
7daa927e70 add HistogramExt method with custom quantiles
Some checks failed
coverage / build (push) Successful in 4m4s
test / test (push) Failing after 18m1s
sync / sync (push) Successful in 26s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-12 15:55:00 +03:00
vtolstov
54bb7f7acb Apply Code Coverage Badge 2025-10-12 11:27:04 +00:00
9eaab95519 meter: improve Gauge
All checks were successful
sync / sync (push) Successful in 1m56s
coverage / build (push) Successful in 3m55s
test / test (push) Successful in 4m12s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-12 14:24:44 +03:00
vtolstov
9219dc6b2a Apply Code Coverage Badge 2025-10-11 15:49:04 +00:00
52607b38f1 logger: fixup Fatal finalizers
All checks were successful
coverage / build (push) Successful in 2m0s
test / test (push) Successful in 3m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-11 18:46:42 +03:00
vtolstov
886f046409 Apply Code Coverage Badge 2025-10-10 12:30:04 +00:00
4d6d469d40 logger: add Fatal finalizers
All checks were successful
coverage / build (push) Successful in 2m37s
test / test (push) Successful in 4m49s
* closes #222

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-10 15:28:10 +03:00
vtolstov
4a944274f4 Apply Code Coverage Badge 2025-10-07 20:56:10 +00:00
b0cbddcfdd meter: improve meter usage across micro framework (#409)
All checks were successful
sync / sync (push) Successful in 1m41s
coverage / build (push) Successful in 3m13s
test / test (push) Successful in 4m2s
Reviewed-on: #409
Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Co-committed-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-07 23:54:20 +03:00
vtolstov
d0534a7d05 Apply Code Coverage Badge 2025-09-20 19:59:32 +00:00
ab051405c5 initial hasql support (#407)
Some checks failed
coverage / build (push) Successful in 3m47s
test / test (push) Failing after 17m14s
closes #403

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Reviewed-on: #407
Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Co-committed-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:57:39 +03:00
vtolstov
268b3dbff4 Apply Code Coverage Badge 2025-07-12 21:20:05 +00:00
f9d2c14597 fixup tests
Some checks failed
sync / sync (push) Successful in 1m8s
coverage / build (push) Successful in 2m3s
test / test (push) Failing after 2m55s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-07-13 00:11:08 +03:00
e6bf914dd9 tracer: write log fields only if span exists and recording
Some checks failed
coverage / build (push) Failing after 1m14s
test / test (push) Has been cancelled
sync / sync (push) Successful in 1m37s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-07-13 00:08:30 +03:00
b59f4a16f0 meter: disable auto sorting labels
Some checks failed
coverage / build (push) Failing after 1m39s
test / test (push) Successful in 4m37s
sync / sync (push) Successful in 7s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-06-17 19:02:06 +03:00
3deb572f72 [v4] fix out-of-bounds behavior in seeker buffer and add tests (#219)
Some checks failed
coverage / build (push) Failing after 2m12s
test / test (push) Successful in 4m27s
sync / sync (push) Successful in 7s
* add check negative position to Read() and write tests

* add tests for Write() method

* add tests for Write() method

* add checks of whence and negative position to Seek() and write tests

* add tests for Rewind()

* add tests for Close()

* add tests for Reset()

* add tests for Len()

* add tests for Bytes()

* tests polishing

* tests polishing

* tests polishing

* tests polishing
2025-06-15 17:24:48 +03:00
0e668c0f0f fixup tests
Some checks failed
coverage / build (push) Failing after 2m13s
test / test (push) Failing after 19m18s
sync / sync (push) Successful in 19s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-06-09 17:36:11 +03:00
2bac878845 broker: fix message options
Some checks failed
coverage / build (push) Failing after 1m58s
test / test (push) Has started running
sync / sync (push) Successful in 7s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-06-09 17:23:30 +03:00
37 changed files with 3044 additions and 277 deletions

View File

@@ -25,7 +25,7 @@ jobs:
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
if [ "$src_hash" != "$dst_hash" -a "$src_hash" != "" -a "$dst_hash" != "" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT

View File

@@ -41,7 +41,7 @@ type Broker interface {
// Disconnect disconnect from broker
Disconnect(ctx context.Context) error
// NewMessage create new broker message to publish.
NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error)
NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error)
// Publish message to broker topic
Publish(ctx context.Context, topic string, messages ...Message) error
// Subscribe subscribes to topic message via handler

View File

@@ -42,9 +42,9 @@ func SetSubscribeOption(k, v interface{}) SubscribeOption {
}
}
// SetPublishOption returns a function to setup a context with given value
func SetPublishOption(k, v interface{}) PublishOption {
return func(o *PublishOptions) {
// SetMessageOption returns a function to setup a context with given value
func SetMessageOption(k, v interface{}) MessageOption {
return func(o *MessageOptions) {
if o.Context == nil {
o.Context = context.Background()
}

View File

@@ -32,7 +32,7 @@ type memoryMessage struct {
ctx context.Context
body []byte
hdr metadata.Metadata
opts broker.PublishOptions
opts broker.MessageOptions
}
func (m *memoryMessage) Ack() error {
@@ -157,8 +157,8 @@ func (b *Broker) Init(opts ...broker.Option) error {
return nil
}
func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.PublishOption) (broker.Message, error) {
options := broker.NewPublishOptions(opts...)
func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.MessageOption) (broker.Message, error) {
options := broker.NewMessageOptions(opts...)
if options.ContentType == "" {
options.ContentType = b.opts.ContentType
}

View File

@@ -49,7 +49,7 @@ func TestMemoryBroker(t *testing.T) {
"id", fmt.Sprintf("%d", i),
),
[]byte(`"hello world"`),
broker.PublishContentType("application/octet-stream"),
broker.MessageContentType("application/octet-stream"),
)
if err != nil {
t.Fatal(err)

View File

@@ -99,7 +99,7 @@ type noopMessage struct {
ctx context.Context
body []byte
hdr metadata.Metadata
opts PublishOptions
opts MessageOptions
}
func (m *noopMessage) Ack() error {
@@ -126,8 +126,8 @@ func (m *noopMessage) Unmarshal(dst interface{}, opts ...codec.Option) error {
return m.c.Unmarshal(m.body, dst)
}
func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error) {
options := NewPublishOptions(opts...)
func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error) {
options := NewMessageOptions(opts...)
if options.ContentType == "" {
options.ContentType = b.opts.ContentType
}

View File

@@ -87,8 +87,8 @@ func ContentType(ct string) Option {
}
}
// PublishOptions struct
type PublishOptions struct {
// MessageOptions struct
type MessageOptions struct {
// ContentType for message body
ContentType string
// BodyOnly flag says the message contains raw body bytes and don't need
@@ -98,9 +98,9 @@ type PublishOptions struct {
Context context.Context
}
// NewPublishOptions creates PublishOptions struct
func NewPublishOptions(opts ...PublishOption) PublishOptions {
options := PublishOptions{
// NewMessageOptions creates MessageOptions struct
func NewMessageOptions(opts ...MessageOption) MessageOptions {
options := MessageOptions{
Context: context.Background(),
}
for _, o := range opts {
@@ -128,19 +128,19 @@ type SubscribeOptions struct {
// Option func
type Option func(*Options)
// PublishOption func
type PublishOption func(*PublishOptions)
// MessageOption func
type MessageOption func(*MessageOptions)
// PublishContentType sets message content-type that used to Marshal
func PublishContentType(ct string) PublishOption {
return func(o *PublishOptions) {
// MessageContentType sets message content-type that used to Marshal
func MessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.ContentType = ct
}
}
// PublishBodyOnly publish only body of the message
func PublishBodyOnly(b bool) PublishOption {
return func(o *PublishOptions) {
// MessageBodyOnly publish only body of the message
func MessageBodyOnly(b bool) MessageOption {
return func(o *MessageOptions) {
o.BodyOnly = b
}
}

View File

@@ -15,11 +15,6 @@ import (
"go.unistack.org/micro/v4/tracer"
)
// DefaultCodecs will be used to encode/decode data
var DefaultCodecs = map[string]codec.Codec{
"application/octet-stream": codec.NewCodec(),
}
type noopClient struct {
funcCall FuncCall
funcStream FuncStream

View File

@@ -161,7 +161,7 @@ func NewOptions(opts ...Option) Options {
options := Options{
Context: context.Background(),
ContentType: DefaultContentType,
Codecs: DefaultCodecs,
Codecs: make(map[string]codec.Codec),
CallOptions: CallOptions{
Context: context.Background(),
Backoff: DefaultBackoff,

235
cluster/hasql/cluster.go Normal file
View File

@@ -0,0 +1,235 @@
package sql
import (
"context"
"database/sql"
"reflect"
"unsafe"
"golang.yandex/hasql/v2"
)
func newSQLRowError() *sql.Row {
row := &sql.Row{}
t := reflect.TypeOf(row).Elem()
field, _ := t.FieldByName("err")
rowPtr := unsafe.Pointer(row)
errFieldPtr := unsafe.Pointer(uintptr(rowPtr) + field.Offset)
errPtr := (*error)(errFieldPtr)
*errPtr = ErrorNoAliveNodes
return row
}
type ClusterQuerier interface {
Querier
WaitForNodes(ctx context.Context, criterion ...hasql.NodeStateCriterion) error
}
type Cluster struct {
hasql *hasql.Cluster[Querier]
options ClusterOptions
}
// NewCluster returns [Querier] that provides cluster of nodes
func NewCluster[T Querier](opts ...ClusterOption) (ClusterQuerier, error) {
options := ClusterOptions{Context: context.Background()}
for _, opt := range opts {
opt(&options)
}
if options.NodeChecker == nil {
return nil, ErrClusterChecker
}
if options.NodeDiscoverer == nil {
return nil, ErrClusterDiscoverer
}
if options.NodePicker == nil {
return nil, ErrClusterPicker
}
if options.Retries < 1 {
options.Retries = 1
}
if options.NodeStateCriterion == 0 {
options.NodeStateCriterion = hasql.Primary
}
options.Options = append(options.Options, hasql.WithNodePicker(options.NodePicker))
if p, ok := options.NodePicker.(*CustomPicker[Querier]); ok {
p.opts.Priority = options.NodePriority
}
c, err := hasql.NewCluster(
options.NodeDiscoverer,
options.NodeChecker,
options.Options...,
)
if err != nil {
return nil, err
}
return &Cluster{hasql: c, options: options}, nil
}
func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
var tx *sql.Tx
var err error
retries := 0
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
for ; retries < c.options.Retries; retries++ {
if tx, err = n.DB().BeginTx(ctx, opts); err != nil && retries >= c.options.Retries {
return true
}
}
return false
})
if tx == nil && err == nil {
err = ErrorNoAliveNodes
}
return tx, err
}
func (c *Cluster) Close() error {
return c.hasql.Close()
}
func (c *Cluster) Conn(ctx context.Context) (*sql.Conn, error) {
var conn *sql.Conn
var err error
retries := 0
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
for ; retries < c.options.Retries; retries++ {
if conn, err = n.DB().Conn(ctx); err != nil && retries >= c.options.Retries {
return true
}
}
return false
})
if conn == nil && err == nil {
err = ErrorNoAliveNodes
}
return conn, err
}
func (c *Cluster) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
var res sql.Result
var err error
retries := 0
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
for ; retries < c.options.Retries; retries++ {
if res, err = n.DB().ExecContext(ctx, query, args...); err != nil && retries >= c.options.Retries {
return true
}
}
return false
})
if res == nil && err == nil {
err = ErrorNoAliveNodes
}
return res, err
}
func (c *Cluster) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
var res *sql.Stmt
var err error
retries := 0
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
for ; retries < c.options.Retries; retries++ {
if res, err = n.DB().PrepareContext(ctx, query); err != nil && retries >= c.options.Retries {
return true
}
}
return false
})
if res == nil && err == nil {
err = ErrorNoAliveNodes
}
return res, err
}
func (c *Cluster) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
var res *sql.Rows
var err error
retries := 0
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
for ; retries < c.options.Retries; retries++ {
if res, err = n.DB().QueryContext(ctx, query); err != nil && err != sql.ErrNoRows && retries >= c.options.Retries {
return true
}
}
return false
})
if res == nil && err == nil {
err = ErrorNoAliveNodes
}
return res, err
}
func (c *Cluster) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
var res *sql.Row
retries := 0
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
for ; retries < c.options.Retries; retries++ {
res = n.DB().QueryRowContext(ctx, query, args...)
if res.Err() == nil {
return false
} else if res.Err() != nil && retries >= c.options.Retries {
return false
}
}
return true
})
if res == nil {
res = newSQLRowError()
}
return res
}
func (c *Cluster) PingContext(ctx context.Context) error {
var err error
var ok bool
retries := 0
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
ok = true
for ; retries < c.options.Retries; retries++ {
if err = n.DB().PingContext(ctx); err != nil && retries >= c.options.Retries {
return true
}
}
return false
})
if !ok {
err = ErrorNoAliveNodes
}
return err
}
func (c *Cluster) WaitForNodes(ctx context.Context, criterions ...hasql.NodeStateCriterion) error {
for _, criterion := range criterions {
if _, err := c.hasql.WaitForNode(ctx, criterion); err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,171 @@
package sql
import (
"context"
"fmt"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"golang.yandex/hasql/v2"
)
func TestNewCluster(t *testing.T) {
dbMaster, dbMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbMaster.Close()
dbMasterMock.MatchExpectationsInOrder(false)
dbMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(1, 0)).
RowsWillBeClosed().
WithoutArgs()
dbMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("master-dc1"))
dbDRMaster, dbDRMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbDRMaster.Close()
dbDRMasterMock.MatchExpectationsInOrder(false)
dbDRMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(2, 40)).
RowsWillBeClosed().
WithoutArgs()
dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("drmaster1-dc2"))
dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("drmaster"))
dbSlaveDC1, dbSlaveDC1Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbSlaveDC1.Close()
dbSlaveDC1Mock.MatchExpectationsInOrder(false)
dbSlaveDC1Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(2, 50)).
RowsWillBeClosed().
WithoutArgs()
dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("slave-dc1"))
dbSlaveDC2, dbSlaveDC2Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbSlaveDC2.Close()
dbSlaveDC1Mock.MatchExpectationsInOrder(false)
dbSlaveDC2Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(2, 50)).
RowsWillBeClosed().
WithoutArgs()
dbSlaveDC2Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("slave-dc1"))
tctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()
c, err := NewCluster[Querier](
WithClusterContext(tctx),
WithClusterNodeChecker(hasql.PostgreSQLChecker),
WithClusterNodePicker(NewCustomPicker[Querier](
CustomPickerMaxLag(100),
)),
WithClusterNodes(
ClusterNode{"slave-dc1", dbSlaveDC1, 1},
ClusterNode{"master-dc1", dbMaster, 1},
ClusterNode{"slave-dc2", dbSlaveDC2, 2},
ClusterNode{"drmaster1-dc2", dbDRMaster, 0},
),
WithClusterOptions(
hasql.WithUpdateInterval[Querier](2*time.Second),
hasql.WithUpdateTimeout[Querier](1*time.Second),
),
)
if err != nil {
t.Fatal(err)
}
defer c.Close()
if err = c.WaitForNodes(tctx, hasql.Primary, hasql.Standby); err != nil {
t.Fatal(err)
}
time.Sleep(500 * time.Millisecond)
node1Name := ""
fmt.Printf("check for Standby\n")
if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.Standby), "SELECT node_name as name"); row.Err() != nil {
t.Fatal(row.Err())
} else if err = row.Scan(&node1Name); err != nil {
t.Fatal(err)
} else if "slave-dc1" != node1Name {
t.Fatalf("invalid node name %s != %s", "slave-dc1", node1Name)
}
dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("slave-dc1"))
node2Name := ""
fmt.Printf("check for PreferStandby\n")
if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferStandby), "SELECT node_name as name"); row.Err() != nil {
t.Fatal(row.Err())
} else if err = row.Scan(&node2Name); err != nil {
t.Fatal(err)
} else if "slave-dc1" != node2Name {
t.Fatalf("invalid node name %s != %s", "slave-dc1", node2Name)
}
node3Name := ""
fmt.Printf("check for PreferPrimary\n")
if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferPrimary), "SELECT node_name as name"); row.Err() != nil {
t.Fatal(row.Err())
} else if err = row.Scan(&node3Name); err != nil {
t.Fatal(err)
} else if "master-dc1" != node3Name {
t.Fatalf("invalid node name %s != %s", "master-dc1", node3Name)
}
dbSlaveDC1Mock.ExpectQuery(`.*`).WillReturnRows(sqlmock.NewRows([]string{"role"}).RowError(1, fmt.Errorf("row error")))
time.Sleep(2 * time.Second)
fmt.Printf("check for PreferStandby\n")
if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferStandby), "SELECT node_name as name"); row.Err() == nil {
t.Fatal("must return error")
}
if dbMasterErr := dbMasterMock.ExpectationsWereMet(); dbMasterErr != nil {
t.Error(dbMasterErr)
}
}

25
cluster/hasql/db.go Normal file
View File

@@ -0,0 +1,25 @@
package sql
import (
"context"
"database/sql"
)
type Querier interface {
// Basic connection methods
PingContext(ctx context.Context) error
Close() error
// Query methods with context
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
// Prepared statements with context
PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
// Transaction management with context
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
Conn(ctx context.Context) (*sql.Conn, error)
}

295
cluster/hasql/driver.go Normal file
View File

@@ -0,0 +1,295 @@
package sql
import (
"context"
"database/sql"
"database/sql/driver"
"io"
"sync"
"time"
)
// OpenDBWithCluster creates a [*sql.DB] that uses the [ClusterQuerier]
func OpenDBWithCluster(db ClusterQuerier) (*sql.DB, error) {
driver := NewClusterDriver(db)
connector, err := driver.OpenConnector("")
if err != nil {
return nil, err
}
return sql.OpenDB(connector), nil
}
// ClusterDriver implements [driver.Driver] and driver.Connector for an existing [Querier]
type ClusterDriver struct {
db ClusterQuerier
}
// NewClusterDriver creates a new [driver.Driver] that uses an existing [ClusterQuerier]
func NewClusterDriver(db ClusterQuerier) *ClusterDriver {
return &ClusterDriver{db: db}
}
// Open implements [driver.Driver.Open]
func (d *ClusterDriver) Open(name string) (driver.Conn, error) {
return d.Connect(context.Background())
}
// OpenConnector implements [driver.DriverContext.OpenConnector]
func (d *ClusterDriver) OpenConnector(name string) (driver.Connector, error) {
return d, nil
}
// Connect implements [driver.Connector.Connect]
func (d *ClusterDriver) Connect(ctx context.Context) (driver.Conn, error) {
conn, err := d.db.Conn(ctx)
if err != nil {
return nil, err
}
return &dbConn{conn: conn}, nil
}
// Driver implements [driver.Connector.Driver]
func (d *ClusterDriver) Driver() driver.Driver {
return d
}
// dbConn implements driver.Conn with both context and legacy methods
type dbConn struct {
conn *sql.Conn
mu sync.Mutex
}
// Prepare implements [driver.Conn.Prepare] (legacy method)
func (c *dbConn) Prepare(query string) (driver.Stmt, error) {
return c.PrepareContext(context.Background(), query)
}
// PrepareContext implements [driver.ConnPrepareContext.PrepareContext]
func (c *dbConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
c.mu.Lock()
defer c.mu.Unlock()
stmt, err := c.conn.PrepareContext(ctx, query)
if err != nil {
return nil, err
}
return &dbStmt{stmt: stmt}, nil
}
// Exec implements [driver.Execer.Exec] (legacy method)
func (c *dbConn) Exec(query string, args []driver.Value) (driver.Result, error) {
namedArgs := make([]driver.NamedValue, len(args))
for i, value := range args {
namedArgs[i] = driver.NamedValue{Value: value}
}
return c.ExecContext(context.Background(), query, namedArgs)
}
// ExecContext implements [driver.ExecerContext.ExecContext]
func (c *dbConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
c.mu.Lock()
defer c.mu.Unlock()
// Convert driver.NamedValue to any
interfaceArgs := make([]any, len(args))
for i, arg := range args {
interfaceArgs[i] = arg.Value
}
return c.conn.ExecContext(ctx, query, interfaceArgs...)
}
// Query implements [driver.Queryer.Query] (legacy method)
func (c *dbConn) Query(query string, args []driver.Value) (driver.Rows, error) {
namedArgs := make([]driver.NamedValue, len(args))
for i, value := range args {
namedArgs[i] = driver.NamedValue{Value: value}
}
return c.QueryContext(context.Background(), query, namedArgs)
}
// QueryContext implements [driver.QueryerContext.QueryContext]
func (c *dbConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
c.mu.Lock()
defer c.mu.Unlock()
// Convert driver.NamedValue to any
interfaceArgs := make([]any, len(args))
for i, arg := range args {
interfaceArgs[i] = arg.Value
}
rows, err := c.conn.QueryContext(ctx, query, interfaceArgs...)
if err != nil {
return nil, err
}
return &dbRows{rows: rows}, nil
}
// Begin implements [driver.Conn.Begin] (legacy method)
func (c *dbConn) Begin() (driver.Tx, error) {
return c.BeginTx(context.Background(), driver.TxOptions{})
}
// BeginTx implements [driver.ConnBeginTx.BeginTx]
func (c *dbConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
c.mu.Lock()
defer c.mu.Unlock()
sqlOpts := &sql.TxOptions{
Isolation: sql.IsolationLevel(opts.Isolation),
ReadOnly: opts.ReadOnly,
}
tx, err := c.conn.BeginTx(ctx, sqlOpts)
if err != nil {
return nil, err
}
return &dbTx{tx: tx}, nil
}
// Ping implements [driver.Pinger.Ping]
func (c *dbConn) Ping(ctx context.Context) error {
return c.conn.PingContext(ctx)
}
// Close implements [driver.Conn.Close]
func (c *dbConn) Close() error {
return c.conn.Close()
}
// IsValid implements [driver.Validator.IsValid]
func (c *dbConn) IsValid() bool {
// Ping with a short timeout to check if the connection is still valid
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
return c.conn.PingContext(ctx) == nil
}
// dbStmt implements [driver.Stmt] with both context and legacy methods
type dbStmt struct {
stmt *sql.Stmt
mu sync.Mutex
}
// Close implements [driver.Stmt.Close]
func (s *dbStmt) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.stmt.Close()
}
// Close implements [driver.Stmt.NumInput]
func (s *dbStmt) NumInput() int {
return -1 // Number of parameters is unknown
}
// Exec implements [driver.Stmt.Exec] (legacy method)
func (s *dbStmt) Exec(args []driver.Value) (driver.Result, error) {
namedArgs := make([]driver.NamedValue, len(args))
for i, value := range args {
namedArgs[i] = driver.NamedValue{Value: value}
}
return s.ExecContext(context.Background(), namedArgs)
}
// ExecContext implements [driver.StmtExecContext.ExecContext]
func (s *dbStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
s.mu.Lock()
defer s.mu.Unlock()
interfaceArgs := make([]any, len(args))
for i, arg := range args {
interfaceArgs[i] = arg.Value
}
return s.stmt.ExecContext(ctx, interfaceArgs...)
}
// Query implements [driver.Stmt.Query] (legacy method)
func (s *dbStmt) Query(args []driver.Value) (driver.Rows, error) {
namedArgs := make([]driver.NamedValue, len(args))
for i, value := range args {
namedArgs[i] = driver.NamedValue{Value: value}
}
return s.QueryContext(context.Background(), namedArgs)
}
// QueryContext implements [driver.StmtQueryContext.QueryContext]
func (s *dbStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
s.mu.Lock()
defer s.mu.Unlock()
interfaceArgs := make([]any, len(args))
for i, arg := range args {
interfaceArgs[i] = arg.Value
}
rows, err := s.stmt.QueryContext(ctx, interfaceArgs...)
if err != nil {
return nil, err
}
return &dbRows{rows: rows}, nil
}
// dbRows implements [driver.Rows]
type dbRows struct {
rows *sql.Rows
}
// Columns implements [driver.Rows.Columns]
func (r *dbRows) Columns() []string {
cols, err := r.rows.Columns()
if err != nil {
// This shouldn't happen if the query was successful
return []string{}
}
return cols
}
// Close implements [driver.Rows.Close]
func (r *dbRows) Close() error {
return r.rows.Close()
}
// Next implements [driver.Rows.Next]
func (r *dbRows) Next(dest []driver.Value) error {
if !r.rows.Next() {
if err := r.rows.Err(); err != nil {
return err
}
return io.EOF
}
// Create a slice of interfaces to scan into
scanArgs := make([]any, len(dest))
for i := range scanArgs {
scanArgs[i] = &dest[i]
}
return r.rows.Scan(scanArgs...)
}
// dbTx implements [driver.Tx]
type dbTx struct {
tx *sql.Tx
mu sync.Mutex
}
// Commit implements [driver.Tx.Commit]
func (t *dbTx) Commit() error {
t.mu.Lock()
defer t.mu.Unlock()
return t.tx.Commit()
}
// Rollback implements [driver.Tx.Rollback]
func (t *dbTx) Rollback() error {
t.mu.Lock()
defer t.mu.Unlock()
return t.tx.Rollback()
}

View File

@@ -0,0 +1,141 @@
package sql
import (
"context"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"golang.yandex/hasql/v2"
)
func TestDriver(t *testing.T) {
dbMaster, dbMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbMaster.Close()
dbMasterMock.MatchExpectationsInOrder(false)
dbMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(1, 0)).
RowsWillBeClosed().
WithoutArgs()
dbMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("master-dc1"))
dbDRMaster, dbDRMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbDRMaster.Close()
dbDRMasterMock.MatchExpectationsInOrder(false)
dbDRMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(2, 40)).
RowsWillBeClosed().
WithoutArgs()
dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("drmaster1-dc2"))
dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("drmaster"))
dbSlaveDC1, dbSlaveDC1Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbSlaveDC1.Close()
dbSlaveDC1Mock.MatchExpectationsInOrder(false)
dbSlaveDC1Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(2, 50)).
RowsWillBeClosed().
WithoutArgs()
dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("slave-dc1"))
dbSlaveDC2, dbSlaveDC2Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbSlaveDC2.Close()
dbSlaveDC1Mock.MatchExpectationsInOrder(false)
dbSlaveDC2Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(2, 50)).
RowsWillBeClosed().
WithoutArgs()
dbSlaveDC2Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("slave-dc1"))
tctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()
c, err := NewCluster[Querier](
WithClusterContext(tctx),
WithClusterNodeChecker(hasql.PostgreSQLChecker),
WithClusterNodePicker(NewCustomPicker[Querier](
CustomPickerMaxLag(100),
)),
WithClusterNodes(
ClusterNode{"slave-dc1", dbSlaveDC1, 1},
ClusterNode{"master-dc1", dbMaster, 1},
ClusterNode{"slave-dc2", dbSlaveDC2, 2},
ClusterNode{"drmaster1-dc2", dbDRMaster, 0},
),
WithClusterOptions(
hasql.WithUpdateInterval[Querier](2*time.Second),
hasql.WithUpdateTimeout[Querier](1*time.Second),
),
)
if err != nil {
t.Fatal(err)
}
defer c.Close()
if err = c.WaitForNodes(tctx, hasql.Primary, hasql.Standby); err != nil {
t.Fatal(err)
}
db, err := OpenDBWithCluster(c)
if err != nil {
t.Fatal(err)
}
// Use context methods
row := db.QueryRowContext(NodeStateCriterion(t.Context(), hasql.Primary), "SELECT node_name as name")
if err = row.Err(); err != nil {
t.Fatal(err)
}
nodeName := ""
if err = row.Scan(&nodeName); err != nil {
t.Fatal(err)
}
if nodeName != "master-dc1" {
t.Fatalf("invalid node_name %s != %s", "master-dc1", nodeName)
}
}

10
cluster/hasql/error.go Normal file
View File

@@ -0,0 +1,10 @@
package sql
import "errors"
var (
ErrClusterChecker = errors.New("cluster node checker required")
ErrClusterDiscoverer = errors.New("cluster node discoverer required")
ErrClusterPicker = errors.New("cluster node picker required")
ErrorNoAliveNodes = errors.New("cluster no alive nodes")
)

110
cluster/hasql/options.go Normal file
View File

@@ -0,0 +1,110 @@
package sql
import (
"context"
"math"
"golang.yandex/hasql/v2"
)
// ClusterOptions contains cluster specific options
type ClusterOptions struct {
NodeChecker hasql.NodeChecker
NodePicker hasql.NodePicker[Querier]
NodeDiscoverer hasql.NodeDiscoverer[Querier]
Options []hasql.ClusterOpt[Querier]
Context context.Context
Retries int
NodePriority map[string]int32
NodeStateCriterion hasql.NodeStateCriterion
}
// ClusterOption apply cluster options to ClusterOptions
type ClusterOption func(*ClusterOptions)
// WithClusterNodeChecker pass hasql.NodeChecker to cluster options
func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption {
return func(o *ClusterOptions) {
o.NodeChecker = c
}
}
// WithClusterNodePicker pass hasql.NodePicker to cluster options
func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption {
return func(o *ClusterOptions) {
o.NodePicker = p
}
}
// WithClusterNodeDiscoverer pass hasql.NodeDiscoverer to cluster options
func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption {
return func(o *ClusterOptions) {
o.NodeDiscoverer = d
}
}
// WithRetries retry count on other nodes in case of error
func WithRetries(n int) ClusterOption {
return func(o *ClusterOptions) {
o.Retries = n
}
}
// WithClusterContext pass context.Context to cluster options and used for checks
func WithClusterContext(ctx context.Context) ClusterOption {
return func(o *ClusterOptions) {
o.Context = ctx
}
}
// WithClusterOptions pass hasql.ClusterOpt
func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption {
return func(o *ClusterOptions) {
o.Options = append(o.Options, opts...)
}
}
// WithClusterNodeStateCriterion pass default hasql.NodeStateCriterion
func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption {
return func(o *ClusterOptions) {
o.NodeStateCriterion = c
}
}
type ClusterNode struct {
Name string
DB Querier
Priority int32
}
// WithClusterNodes create cluster with static NodeDiscoverer
func WithClusterNodes(cns ...ClusterNode) ClusterOption {
return func(o *ClusterOptions) {
nodes := make([]*hasql.Node[Querier], 0, len(cns))
if o.NodePriority == nil {
o.NodePriority = make(map[string]int32, len(cns))
}
for _, cn := range cns {
nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB))
if cn.Priority == 0 {
cn.Priority = math.MaxInt32
}
o.NodePriority[cn.Name] = cn.Priority
}
o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...)
}
}
type nodeStateCriterionKey struct{}
// NodeStateCriterion inject hasql.NodeStateCriterion to context
func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context {
return context.WithValue(ctx, nodeStateCriterionKey{}, c)
}
func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
return v
}
return c.options.NodeStateCriterion
}

113
cluster/hasql/picker.go Normal file
View File

@@ -0,0 +1,113 @@
package sql
import (
"fmt"
"math"
"time"
"golang.yandex/hasql/v2"
)
// compile time guard
var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil)
// CustomPickerOptions holds options to pick nodes
type CustomPickerOptions struct {
MaxLag int
Priority map[string]int32
Retries int
}
// CustomPickerOption func apply option to CustomPickerOptions
type CustomPickerOption func(*CustomPickerOptions)
// CustomPickerMaxLag specifies max lag for which node can be used
func CustomPickerMaxLag(n int) CustomPickerOption {
return func(o *CustomPickerOptions) {
o.MaxLag = n
}
}
// NewCustomPicker creates new node picker
func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] {
options := CustomPickerOptions{}
for _, o := range opts {
o(&options)
}
return &CustomPicker[Querier]{opts: options}
}
// CustomPicker holds node picker options
type CustomPicker[T Querier] struct {
opts CustomPickerOptions
}
// PickNode used to return specific node
func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] {
for _, n := range cnodes {
fmt.Printf("node %s\n", n.Node.String())
}
return cnodes[0]
}
func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
if prio, ok := p.opts.Priority[nodeName]; ok {
return prio
}
return math.MaxInt32 // Default to lowest priority
}
// CompareNodes used to sort nodes
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
// Get replication lag values
aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()
// First check that lag lower then MaxLag
if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
return 0 // both are equal
}
// If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
if aLag > p.opts.MaxLag {
return 1 // b is better
}
if bLag > p.opts.MaxLag {
return -1 // a is better
}
// Get node priorities
aPrio := p.getPriority(a.Node.String())
bPrio := p.getPriority(b.Node.String())
// if both priority equals
if aPrio == bPrio {
// First compare by replication lag
if aLag < bLag {
return -1
}
if aLag > bLag {
return 1
}
// If replication lag is equal, compare by latency
aLatency := a.Info.(interface{ Latency() time.Duration }).Latency()
bLatency := b.Info.(interface{ Latency() time.Duration }).Latency()
if aLatency < bLatency {
return -1
}
if aLatency > bLatency {
return 1
}
// If lag and latency is equal
return 0
}
// If priorities are different, prefer the node with lower priority value
if aPrio < bPrio {
return -1
}
return 1
}

31
go.mod
View File

@@ -1,34 +1,33 @@
module go.unistack.org/micro/v4
go 1.22.0
go 1.25
require (
dario.cat/mergo v1.0.1
dario.cat/mergo v1.0.2
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/KimMachineGun/automemlimit v0.7.0
github.com/goccy/go-yaml v1.17.1
github.com/KimMachineGun/automemlimit v0.7.5
github.com/goccy/go-yaml v1.18.0
github.com/google/uuid v1.6.0
github.com/matoous/go-nanoid v1.5.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
github.com/spf13/cast v1.7.1
github.com/stretchr/testify v1.10.0
github.com/spf13/cast v1.10.0
github.com/stretchr/testify v1.11.1
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.6.0
go.unistack.org/micro-proto/v4 v4.1.0
golang.org/x/sync v0.10.0
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.3
golang.org/x/sync v0.17.0
golang.yandex/hasql/v2 v2.1.0
google.golang.org/grpc v1.76.0
google.golang.org/protobuf v1.36.10
)
require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
golang.org/x/sys v0.37.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

70
go.sum
View File

@@ -1,19 +1,19 @@
dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/KimMachineGun/automemlimit v0.7.0 h1:7G06p/dMSf7G8E6oq+f2uOPuVncFyIlDI/pBWK49u88=
github.com/KimMachineGun/automemlimit v0.7.0/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/KimMachineGun/automemlimit v0.7.5 h1:RkbaC0MwhjL1ZuBKunGDjE/ggwAX43DwZrJqVwyveTk=
github.com/KimMachineGun/automemlimit v0.7.5/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/goccy/go-yaml v1.17.1 h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY=
github.com/goccy/go-yaml v1.17.1/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
@@ -30,38 +30,36 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
golang.yandex/hasql/v2 v2.1.0 h1:7CaFFWeHoK5TvA+QvZzlKHlIN5sqNpqM8NSrXskZD/k=
golang.yandex/hasql/v2 v2.1.0/go.mod h1:3Au1AxuJDCTXmS117BpbI6e+70kGWeyLR1qJAH6HdtA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -3,6 +3,7 @@ package sql
import (
"context"
"database/sql"
"sync"
"time"
)
@@ -11,31 +12,84 @@ type Statser interface {
}
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
if db == nil {
return
}
options := NewOptions(opts...)
go func() {
ticker := time.NewTicker(options.MeterStatsInterval)
defer ticker.Stop()
var (
statsMu sync.Mutex
lastUpdated time.Time
maxOpenConnections, openConnections, inUse, idle, waitCount float64
maxIdleClosed, maxIdleTimeClosed, maxLifetimeClosed float64
waitDuration float64
)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if db == nil {
return
}
stats := db.Stats()
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
updateFn := func() {
statsMu.Lock()
defer statsMu.Unlock()
if time.Since(lastUpdated) < options.MeterStatsInterval {
return
}
}()
stats := db.Stats()
maxOpenConnections = float64(stats.MaxOpenConnections)
openConnections = float64(stats.OpenConnections)
inUse = float64(stats.InUse)
idle = float64(stats.Idle)
waitCount = float64(stats.WaitCount)
maxIdleClosed = float64(stats.MaxIdleClosed)
maxIdleTimeClosed = float64(stats.MaxIdleTimeClosed)
maxLifetimeClosed = float64(stats.MaxLifetimeClosed)
waitDuration = float64(stats.WaitDuration.Seconds())
lastUpdated = time.Now()
}
options.Meter.Gauge(MaxOpenConnections, func() float64 {
updateFn()
return maxOpenConnections
})
options.Meter.Gauge(OpenConnections, func() float64 {
updateFn()
return openConnections
})
options.Meter.Gauge(InuseConnections, func() float64 {
updateFn()
return inUse
})
options.Meter.Gauge(IdleConnections, func() float64 {
updateFn()
return idle
})
options.Meter.Gauge(WaitConnections, func() float64 {
updateFn()
return waitCount
})
options.Meter.Gauge(BlockedSeconds, func() float64 {
updateFn()
return waitDuration
})
options.Meter.Gauge(MaxIdleClosed, func() float64 {
updateFn()
return maxIdleClosed
})
options.Meter.Gauge(MaxIdletimeClosed, func() float64 {
updateFn()
return maxIdleTimeClosed
})
options.Meter.Gauge(MaxLifetimeClosed, func() float64 {
updateFn()
return maxLifetimeClosed
})
}

View File

@@ -52,6 +52,12 @@ type Options struct {
AddStacktrace bool
// DedupKeys deduplicate keys in log output
DedupKeys bool
// FatalFinalizers runs in order in [logger.Fatal] method
FatalFinalizers []func(context.Context)
}
var DefaultFatalFinalizer = func(ctx context.Context) {
os.Exit(1)
}
// NewOptions creates new options struct
@@ -65,6 +71,7 @@ func NewOptions(opts ...Option) Options {
AddSource: true,
TimeFunc: time.Now,
Meter: meter.DefaultMeter,
FatalFinalizers: []func(context.Context){DefaultFatalFinalizer},
}
WithMicroKeys()(&options)
@@ -76,6 +83,13 @@ func NewOptions(opts ...Option) Options {
return options
}
// WithFatalFinalizers set logger.Fatal finalizers
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
return func(o *Options) {
o.FatalFinalizers = fncs
}
}
// WithContextAttrFuncs appends default funcs for the context attrs filler
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
return func(o *Options) {

View File

@@ -4,14 +4,12 @@ import (
"context"
"io"
"log/slog"
"os"
"reflect"
"regexp"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/semconv"
@@ -231,11 +229,12 @@ func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{}
func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) {
s.printLog(ctx, logger.FatalLevel, msg, attrs...)
for _, fn := range s.opts.FatalFinalizers {
fn(ctx)
}
if closer, ok := s.opts.Out.(io.Closer); ok {
closer.Close()
}
time.Sleep(1 * time.Second)
os.Exit(1)
}
func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) {

View File

@@ -80,7 +80,7 @@ func TestTime(t *testing.T) {
WithHandlerFunc(slog.NewTextHandler),
logger.WithAddStacktrace(true),
logger.WithTimeFunc(func() time.Time {
return time.Unix(0, 0)
return time.Unix(0, 0).UTC()
}),
)
if err := l.Init(logger.WithFields("key1", "val1")); err != nil {
@@ -89,8 +89,7 @@ func TestTime(t *testing.T) {
l.Error(ctx, "msg1", errors.New("err"))
if !bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T03:00:00.000000000+03:00`)) &&
!bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T00:00:00.000000000Z`)) {
if !bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T00:00:00.000000000Z`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
}
}
@@ -470,3 +469,25 @@ func Test_WithContextAttrFunc(t *testing.T) {
// t.Logf("xxx %s", buf.Bytes())
}
func TestFatalFinalizers(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(
logger.WithLevel(logger.TraceLevel),
logger.WithOutput(buf),
)
if err := l.Init(
logger.WithFatalFinalizers(func(ctx context.Context) {
l.Info(ctx, "fatal finalizer")
})); err != nil {
t.Fatal(err)
}
l.Fatal(ctx, "info_msg1")
if !bytes.Contains(buf.Bytes(), []byte("fatal finalizer")) {
t.Fatalf("logger dont have fatal message, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte("info_msg1")) {
t.Fatalf("logger dont have info_msg1 message, buf %s", buf.Bytes())
}
}

View File

@@ -4,8 +4,8 @@ package meter
import (
"io"
"sort"
"strconv"
"strings"
"sync"
"time"
)
@@ -49,9 +49,11 @@ type Meter interface {
Set(opts ...Option) Meter
// Histogram get or create histogram
Histogram(name string, labels ...string) Histogram
// HistogramExt get or create histogram with specified quantiles
HistogramExt(name string, quantiles []float64, labels ...string) Histogram
// Summary get or create summary
Summary(name string, labels ...string) Summary
// SummaryExt get or create summary with spcified quantiles and window time
// SummaryExt get or create summary with specified quantiles and window time
SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary
// Write writes metrics to io.Writer
Write(w io.Writer, opts ...Option) error
@@ -59,6 +61,8 @@ type Meter interface {
Options() Options
// String return meter type
String() string
// Unregister metric name and drop all data
Unregister(name string, labels ...string) bool
}
// Counter is a counter
@@ -80,7 +84,11 @@ type FloatCounter interface {
// Gauge is a float64 gauge
type Gauge interface {
Add(float64)
Get() float64
Set(float64)
Dec()
Inc()
}
// Histogram is a histogram for non-negative values with automatically created buckets
@@ -117,6 +125,39 @@ func BuildLabels(labels ...string) []string {
return labels
}
var spool = newStringsPool(500)
type stringsPool struct {
p *sync.Pool
c int
}
func newStringsPool(size int) *stringsPool {
p := &stringsPool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
return &strings.Builder{}
},
}
return p
}
func (p *stringsPool) Cap() int {
return p.c
}
func (p *stringsPool) Get() *strings.Builder {
return p.p.Get().(*strings.Builder)
}
func (p *stringsPool) Put(b *strings.Builder) {
if b.Cap() > p.c {
return
}
b.Reset()
p.p.Put(b)
}
// BuildName used to combine metric with labels.
// If labels count is odd, drop last element
func BuildName(name string, labels ...string) string {
@@ -125,8 +166,6 @@ func BuildName(name string, labels ...string) string {
}
if len(labels) > 2 {
sort.Sort(byKey(labels))
idx := 0
for {
if labels[idx] == labels[idx+2] {
@@ -141,7 +180,9 @@ func BuildName(name string, labels ...string) string {
}
}
var b strings.Builder
b := spool.Get()
defer spool.Put(b)
_, _ = b.WriteString(name)
_, _ = b.WriteRune('{')
for idx := 0; idx < len(labels); idx += 2 {
@@ -149,8 +190,9 @@ func BuildName(name string, labels ...string) string {
_, _ = b.WriteRune(',')
}
_, _ = b.WriteString(labels[idx])
_, _ = b.WriteString(`=`)
_, _ = b.WriteString(strconv.Quote(labels[idx+1]))
_, _ = b.WriteString(`="`)
_, _ = b.WriteString(labels[idx+1])
_, _ = b.WriteRune('"')
}
_, _ = b.WriteRune('}')

View File

@@ -50,11 +50,12 @@ func TestBuildName(t *testing.T) {
data := map[string][]string{
`my_metric{firstlabel="value2",zerolabel="value3"}`: {
"my_metric",
"zerolabel", "value3", "firstlabel", "value2",
"firstlabel", "value2",
"zerolabel", "value3",
},
`my_metric{broker="broker2",register="mdns",server="tcp"}`: {
"my_metric",
"broker", "broker1", "broker", "broker2", "server", "http", "server", "tcp", "register", "mdns",
"broker", "broker1", "broker", "broker2", "register", "mdns", "server", "http", "server", "tcp",
},
`my_metric{aaa="aaa"}`: {
"my_metric",

View File

@@ -28,6 +28,10 @@ func (r *noopMeter) Name() string {
return r.opts.Name
}
func (r *noopMeter) Unregister(name string, labels ...string) bool {
return true
}
// Init initialize options
func (r *noopMeter) Init(opts ...Option) error {
for _, o := range opts {
@@ -66,6 +70,11 @@ func (r *noopMeter) Histogram(_ string, labels ...string) Histogram {
return &noopHistogram{labels: labels}
}
// HistogramExt implements the Meter interface
func (r *noopMeter) HistogramExt(_ string, quantiles []float64, labels ...string) Histogram {
return &noopHistogram{labels: labels}
}
// Set implements the Meter interface
func (r *noopMeter) Set(opts ...Option) Meter {
m := &noopMeter{opts: r.opts}
@@ -132,6 +141,18 @@ type noopGauge struct {
labels []string
}
func (r *noopGauge) Add(float64) {
}
func (r *noopGauge) Set(float64) {
}
func (r *noopGauge) Inc() {
}
func (r *noopGauge) Dec() {
}
func (r *noopGauge) Get() float64 {
return 0
}

View File

@@ -4,6 +4,8 @@ import (
"context"
)
var DefaultQuantiles = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
// Option powers the configuration for metrics implementations:
type Option func(*Options)
@@ -23,6 +25,8 @@ type Options struct {
WriteProcessMetrics bool
// WriteFDMetrics flag to write fd metrics
WriteFDMetrics bool
// Quantiles specifies buckets for histogram
Quantiles []float64
}
// NewOptions prepares a set of options:
@@ -61,14 +65,12 @@ func Address(value string) Option {
}
}
/*
// TimingObjectives defines the desired spread of statistics for histogram / timing metrics:
func TimingObjectives(value map[float64]float64) Option {
// Quantiles defines the desired spread of statistics for histogram metrics:
func Quantiles(quantiles []float64) Option {
return func(o *Options) {
o.TimingObjectives = value
o.Quantiles = quantiles
}
}
*/
// Labels add the meter labels
func Labels(ls ...string) Option {

View File

@@ -91,7 +91,7 @@ func (p *bro) Connect(_ context.Context) error { return nil }
func (p *bro) Disconnect(_ context.Context) error { return nil }
// NewMessage creates new message
func (p *bro) NewMessage(_ context.Context, _ metadata.Metadata, _ interface{}, _ ...broker.PublishOption) (broker.Message, error) {
func (p *bro) NewMessage(_ context.Context, _ metadata.Metadata, _ interface{}, _ ...broker.MessageOption) (broker.Message, error) {
return nil, nil
}

View File

@@ -6,7 +6,6 @@ import (
"sync"
"time"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/register"
maddr "go.unistack.org/micro/v4/util/addr"
@@ -14,11 +13,6 @@ import (
"go.unistack.org/micro/v4/util/rand"
)
// DefaultCodecs will be used to encode/decode
var DefaultCodecs = map[string]codec.Codec{
"application/octet-stream": codec.NewCodec(),
}
type rpcHandler struct {
opts HandlerOptions
handler interface{}

View File

@@ -8,7 +8,6 @@ import (
"time"
"github.com/KimMachineGun/automemlimit/memlimit"
"go.uber.org/automaxprocs/maxprocs"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/config"
@@ -23,8 +22,8 @@ import (
)
func init() {
_, _ = maxprocs.Set()
_, _ = memlimit.SetGoMemLimitWithOpts(
memlimit.WithRefreshInterval(1*time.Minute),
memlimit.WithRatio(0.9),
memlimit.WithProvider(
memlimit.ApplyFallback(

815
store/mock/mock.go Normal file
View File

@@ -0,0 +1,815 @@
package mock
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"
"go.unistack.org/micro/v4/store"
)
// ExpectedWrite represents an expected Write operation
type ExpectedWrite struct {
key string
value interface{}
ttl time.Duration
metadata map[string]string
namespace string
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedWrite) match(key string, val interface{}, opts ...store.WriteOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check value match
if e.value != nil && !reflect.DeepEqual(e.value, val) {
return false
}
// Check options
options := store.NewWriteOptions(opts...)
if e.ttl > 0 && e.ttl != options.TTL {
return false
}
if e.namespace != "" && e.namespace != options.Namespace {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedRead represents an expected Read operation
type ExpectedRead struct {
key string
value interface{}
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedRead) match(key string, opts ...store.ReadOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedDelete represents an expected Delete operation
type ExpectedDelete struct {
key string
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedDelete) match(key string, opts ...store.DeleteOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedExists represents an expected Exists operation
type ExpectedExists struct {
key string
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedExists) match(key string, opts ...store.ExistsOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedList represents an expected List operation
type ExpectedList struct {
times int
called int
mutex sync.Mutex
err error
keys []string
}
func (e *ExpectedList) match(opts ...store.ListOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// Store is a mock implementation of the Store interface for testing
type Store struct {
expectedWrites []*ExpectedWrite
expectedReads []*ExpectedRead
expectedDeletes []*ExpectedDelete
expectedExists []*ExpectedExists
expectedLists []*ExpectedList
data map[string]interface{}
exists map[string]bool
ttls map[string]time.Time // key -> expiration time
metadata map[string]map[string]string
err error
opts store.Options
mutex sync.RWMutex
}
// NewStore creates a new mock store
func NewStore(opts ...store.Option) *Store {
options := store.NewOptions(opts...)
return &Store{
data: make(map[string]interface{}),
exists: make(map[string]bool),
ttls: make(map[string]time.Time),
metadata: make(map[string]map[string]string),
opts: options,
}
}
// ExpectWrite creates an expectation for a Write operation
func (m *Store) ExpectWrite(key string) *ExpectedWrite {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedWrite{key: key}
m.expectedWrites = append(m.expectedWrites, exp)
return exp
}
// ExpectRead creates an expectation for a Read operation
func (m *Store) ExpectRead(key string) *ExpectedRead {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedRead{key: key}
m.expectedReads = append(m.expectedReads, exp)
return exp
}
// ExpectDelete creates an expectation for a Delete operation
func (m *Store) ExpectDelete(key string) *ExpectedDelete {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedDelete{key: key}
m.expectedDeletes = append(m.expectedDeletes, exp)
return exp
}
// ExpectExists creates an expectation for an Exists operation
func (m *Store) ExpectExists(key string) *ExpectedExists {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedExists{key: key}
m.expectedExists = append(m.expectedExists, exp)
return exp
}
// ExpectList creates an expectation for a List operation
func (m *Store) ExpectList() *ExpectedList {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedList{}
m.expectedLists = append(m.expectedLists, exp)
return exp
}
// WithValue sets the value to return for expected operations
func (e *ExpectedWrite) WithValue(val interface{}) *ExpectedWrite {
e.value = val
return e
}
// WithTTL sets the TTL for expected Write operations
func (e *ExpectedWrite) WithTTL(ttl time.Duration) *ExpectedWrite {
e.ttl = ttl
return e
}
// WithNamespace sets the namespace for expected operations
func (e *ExpectedWrite) WithNamespace(ns string) *ExpectedWrite {
e.namespace = ns
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedWrite) Times(n int) *ExpectedWrite {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedWrite) WillReturnError(err error) *ExpectedWrite {
e.err = err
return e
}
// WithValue sets the value to return for expected Read operations
func (e *ExpectedRead) WithValue(val interface{}) *ExpectedRead {
e.value = val
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedRead) Times(n int) *ExpectedRead {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedRead) WillReturnError(err error) *ExpectedRead {
e.err = err
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedDelete) Times(n int) *ExpectedDelete {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedDelete) WillReturnError(err error) *ExpectedDelete {
e.err = err
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedExists) Times(n int) *ExpectedExists {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedExists) WillReturnError(err error) *ExpectedExists {
e.err = err
return e
}
// WillReturn sets the keys to return for List operations
func (e *ExpectedList) WillReturn(keys ...string) *ExpectedList {
e.keys = keys
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedList) Times(n int) *ExpectedList {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedList) WillReturnError(err error) *ExpectedList {
e.err = err
return e
}
// checkTTL checks if a key has expired
func (m *Store) checkTTL(key string) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
if exp, ok := m.ttls[key]; ok {
if time.Now().After(exp) {
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
return false
}
}
return true
}
// FastForward decrements all TTLs by the given duration
func (m *Store) FastForward(d time.Duration) {
m.mutex.Lock()
defer m.mutex.Unlock()
now := time.Now()
for key, exp := range m.ttls {
// Calculate remaining time before fast forward
remaining := time.Until(exp)
if remaining <= 0 {
// Already expired, remove it
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
} else {
// Apply fast forward
newRemaining := remaining - d
if newRemaining <= 0 {
// Would expire after fast forward, remove it
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
} else {
// Update expiration time
m.ttls[key] = now.Add(newRemaining)
}
}
}
}
// Name returns store name
func (m *Store) Name() string {
return m.opts.Name
}
// Init initializes the mock store
func (m *Store) Init(opts ...store.Option) error {
if m.err != nil {
return m.err
}
for _, o := range opts {
o(&m.opts)
}
return nil
}
// Connect is used when store needs to be connected
func (m *Store) Connect(ctx context.Context) error {
if m.err != nil {
return m.err
}
return nil
}
// Options returns the current options
func (m *Store) Options() store.Options {
return m.opts
}
// Exists checks that key exists in store
func (m *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
if m.err != nil {
return m.err
}
// Check TTL first
if !m.checkTTL(key) {
return store.ErrNotFound
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedExists {
if exp.match(key, opts...) {
m.mutex.Unlock()
if exp.err != nil {
return exp.err
}
if !m.exists[key] {
return store.ErrNotFound
}
return nil
}
}
m.mutex.Unlock()
// If no expectation matched, use default behavior
if !m.exists[key] {
return store.ErrNotFound
}
return nil
}
// Read reads a single key name to provided value with optional ReadOptions
func (m *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
if m.err != nil {
return m.err
}
// Check TTL first
if !m.checkTTL(key) {
return store.ErrNotFound
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedReads {
if exp.match(key, opts...) {
m.mutex.Unlock()
if exp.err != nil {
return exp.err
}
if !m.exists[key] {
return store.ErrNotFound
}
// Copy the value from expected or actual data
data := exp.value
if data == nil {
data = m.data[key]
}
if data != nil {
// Simple type conversion for testing
if target, ok := val.(*interface{}); ok {
*target = data
} else if target, ok := val.(*string); ok {
if s, ok := data.(string); ok {
*target = s
} else {
*target = fmt.Sprintf("%v", data)
}
} else if target, ok := val.(*int); ok {
if i, ok := data.(int); ok {
*target = i
}
}
}
return nil
}
}
m.mutex.Unlock()
// If no expectation matched, use default behavior
if !m.exists[key] {
return store.ErrNotFound
}
if data, ok := m.data[key]; ok {
if target, ok := val.(*interface{}); ok {
*target = data
} else if target, ok := val.(*string); ok {
if s, ok := data.(string); ok {
*target = s
} else {
*target = fmt.Sprintf("%v", data)
}
} else if target, ok := val.(*int); ok {
if i, ok := data.(int); ok {
*target = i
}
}
}
return nil
}
// Write writes a value to key name to the store with optional WriteOption
func (m *Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
if m.err != nil {
return m.err
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedWrites {
if exp.match(key, val, opts...) {
m.mutex.Unlock()
if exp.err != nil {
return exp.err
}
// Apply the write operation
m.mutex.Lock()
m.data[key] = val
m.exists[key] = true
// Handle TTL
options := store.NewWriteOptions(opts...)
if options.TTL > 0 {
m.ttls[key] = time.Now().Add(options.TTL)
} else {
delete(m.ttls, key) // Remove TTL if not set
}
// Handle metadata
if options.Metadata != nil {
m.metadata[key] = make(map[string]string)
for k, v := range options.Metadata {
// Convert []string to string by joining with comma
if len(v) > 0 {
m.metadata[key][k] = strings.Join(v, ",")
} else {
m.metadata[key][k] = ""
}
}
}
m.mutex.Unlock()
return nil
}
}
m.mutex.Unlock()
// If no expectation matched, use default behavior
m.mutex.Lock()
m.data[key] = val
m.exists[key] = true
options := store.NewWriteOptions(opts...)
if options.TTL > 0 {
m.ttls[key] = time.Now().Add(options.TTL)
} else {
delete(m.ttls, key)
}
if options.Metadata != nil {
m.metadata[key] = make(map[string]string)
for k, v := range options.Metadata {
// Convert []string to string by joining with comma
if len(v) > 0 {
m.metadata[key][k] = strings.Join(v, ",")
} else {
m.metadata[key][k] = ""
}
}
}
m.mutex.Unlock()
return nil
}
// Delete removes the record with the corresponding key from the store
func (m *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
if m.err != nil {
return m.err
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedDeletes {
if exp.match(key, opts...) {
m.mutex.Unlock()
if exp.err != nil {
return exp.err
}
m.mutex.Lock()
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
m.mutex.Unlock()
return nil
}
}
m.mutex.Unlock()
// If no expectation matched, use default behavior
m.mutex.Lock()
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
m.mutex.Unlock()
return nil
}
// List returns any keys that match, or an empty list with no error if none matched
func (m *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
if m.err != nil {
return nil, m.err
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedLists {
if exp.match(opts...) {
m.mutex.Unlock()
if exp.err != nil {
return nil, exp.err
}
return exp.keys, nil
}
}
m.mutex.Unlock()
// If no expectation matched, return actual keys
m.mutex.RLock()
defer m.mutex.RUnlock()
var keys []string
for key := range m.data {
// Check TTL
if exp, ok := m.ttls[key]; ok {
if time.Now().After(exp) {
continue // Skip expired keys
}
}
keys = append(keys, key)
}
// Apply list options filtering
options := store.NewListOptions(opts...)
if options.Prefix != "" {
var filtered []string
for _, key := range keys {
if len(key) >= len(options.Prefix) && key[:len(options.Prefix)] == options.Prefix {
filtered = append(filtered, key)
}
}
keys = filtered
}
if options.Suffix != "" {
var filtered []string
for _, key := range keys {
if len(key) >= len(options.Suffix) && key[len(key)-len(options.Suffix):] == options.Suffix {
filtered = append(filtered, key)
}
}
keys = filtered
}
// Apply limit and offset
if options.Limit > 0 && int(options.Limit) < len(keys) {
end := int(options.Offset) + int(options.Limit)
if end > len(keys) {
end = len(keys)
}
if int(options.Offset) < len(keys) {
keys = keys[options.Offset:end]
} else {
keys = []string{}
}
} else if options.Offset > 0 && int(options.Offset) < len(keys) {
keys = keys[options.Offset:]
} else if options.Offset >= uint(len(keys)) {
keys = []string{}
}
return keys, nil
}
// Disconnect disconnects the mock store
func (m *Store) Disconnect(ctx context.Context) error {
if m.err != nil {
return m.err
}
return nil
}
// String returns the name of the implementation
func (m *Store) String() string {
return "mock"
}
// Watch returns events watcher
func (m *Store) Watch(ctx context.Context, opts ...store.WatchOption) (store.Watcher, error) {
if m.err != nil {
return nil, m.err
}
return NewWatcher(), nil
}
// Live returns store liveness
func (m *Store) Live() bool {
return true
}
// Ready returns store readiness
func (m *Store) Ready() bool {
return true
}
// Health returns store health
func (m *Store) Health() bool {
return true
}
// ExpectationsWereMet checks that all expected operations were called the expected number of times
func (m *Store) ExpectationsWereMet() error {
m.mutex.RLock()
defer m.mutex.RUnlock()
for _, exp := range m.expectedWrites {
if exp.times > 0 && exp.called != exp.times {
return fmt.Errorf("expected write for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
}
}
for _, exp := range m.expectedReads {
if exp.times > 0 && exp.called != exp.times {
return fmt.Errorf("expected read for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
}
}
for _, exp := range m.expectedDeletes {
if exp.times > 0 && exp.called != exp.times {
return fmt.Errorf("expected delete for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
}
}
for _, exp := range m.expectedExists {
if exp.times > 0 && exp.called != exp.times {
return fmt.Errorf("expected exists for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
}
}
for _, exp := range m.expectedLists {
if exp.times > 0 && exp.called != exp.times {
return fmt.Errorf("expected list to be called %d times, but was called %d times", exp.times, exp.called)
}
}
return nil
}
// Watcher is a mock implementation of the Watcher interface
type Watcher struct {
events chan store.Event
stop chan bool
}
// NewWatcher creates a new mock watcher
func NewWatcher() *Watcher {
return &Watcher{
events: make(chan store.Event, 1),
stop: make(chan bool, 1),
}
}
// Next is a blocking call that returns the next event
func (mw *Watcher) Next() (store.Event, error) {
select {
case event := <-mw.events:
return event, nil
case <-mw.stop:
return nil, store.ErrWatcherStopped
}
}
// Stop stops the watcher
func (mw *Watcher) Stop() {
select {
case mw.stop <- true:
default:
}
}
// SendEvent sends an event to the watcher (for testing purposes)
func (mw *Watcher) SendEvent(event store.Event) {
select {
case mw.events <- event:
default:
// If channel is full, drop the event
}
}

295
store/mock/mock_test.go Normal file
View File

@@ -0,0 +1,295 @@
package mock
import (
"context"
"testing"
"time"
"go.unistack.org/micro/v4/store"
)
func TestStore(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Test Write with expectation
s.ExpectWrite("test_key").WithValue("test_value")
err := s.Write(ctx, "test_key", "test_value")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
// Test Read with expectation
s.ExpectRead("test_key").WithValue("test_value")
var value interface{}
err = s.Read(ctx, "test_key", &value)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
if value != "test_value" {
t.Fatalf("Expected 'test_value', got %v", value)
}
// Test Read with string
s.ExpectRead("test_key")
var strValue string
err = s.Read(ctx, "test_key", &strValue)
if err != nil {
t.Fatalf("Read string failed: %v", err)
}
if strValue != "test_value" {
t.Fatalf("Expected 'test_value', got %s", strValue)
}
// Test Write and Read integer with TTL
s.ExpectWrite("int_key").WithValue(42).WithTTL(5 * time.Second)
err = s.Write(ctx, "int_key", 42, store.WriteTTL(5*time.Second))
if err != nil {
t.Fatalf("Write int failed: %v", err)
}
s.ExpectRead("int_key")
var intValue int
err = s.Read(ctx, "int_key", &intValue)
if err != nil {
t.Fatalf("Read int failed: %v", err)
}
if intValue != 42 {
t.Fatalf("Expected 42, got %d", intValue)
}
// Test Exists with expectation
s.ExpectExists("test_key")
err = s.Exists(ctx, "test_key")
if err != nil {
t.Fatalf("Exists failed: %v", err)
}
// Test List with expectation
s.ExpectList().WillReturn("test_key", "another_key")
keys, err := s.List(ctx)
if err != nil {
t.Fatalf("List failed: %v", err)
}
if len(keys) != 2 {
t.Fatalf("Expected 2 keys, got %d", len(keys))
}
// Test Delete with expectation
s.ExpectDelete("test_key")
err = s.Delete(ctx, "test_key")
if err != nil {
t.Fatalf("Delete failed: %v", err)
}
// Test that deleted key doesn't exist
s.ExpectExists("test_key").WillReturnError(store.ErrNotFound)
err = s.Exists(ctx, "test_key")
if err == nil {
t.Fatalf("Expected store.ErrNotFound after delete")
}
// Test error handling
s.ExpectExists("nonexistent").WillReturnError(store.ErrNotFound)
err = s.Exists(ctx, "nonexistent")
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound, got %v", err)
}
// Verify all expectations were met
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}
func TestStoreFastForward(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Write with TTL
s.ExpectWrite("ttl_key").WithValue("ttl_value").WithTTL(100 * time.Millisecond)
err := s.Write(ctx, "ttl_key", "ttl_value", store.WriteTTL(100*time.Millisecond))
if err != nil {
t.Fatalf("Write with TTL failed: %v", err)
}
// Check key exists before TTL expires
s.ExpectRead("ttl_key")
var value string
err = s.Read(ctx, "ttl_key", &value)
if err != nil {
t.Fatalf("Read before TTL failed: %v", err)
}
if value != "ttl_value" {
t.Fatalf("Expected 'ttl_value', got %s", value)
}
// Fast forward by 50ms - key should still exist
s.FastForward(50 * time.Millisecond)
s.ExpectRead("ttl_key")
err = s.Read(ctx, "ttl_key", &value)
if err != nil {
t.Fatalf("Read after 50ms fast forward failed: %v", err)
}
if value != "ttl_value" {
t.Fatalf("Expected 'ttl_value' after 50ms, got %s", value)
}
// Fast forward by another 60ms (total 110ms) - key should expire
s.FastForward(60 * time.Millisecond)
s.ExpectRead("ttl_key").WillReturnError(store.ErrNotFound)
err = s.Read(ctx, "ttl_key", &value)
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound after TTL, got %v", err)
}
// Test FastForward on already expired keys
s.ExpectWrite("ttl_key2").WithValue("ttl_value2").WithTTL(10 * time.Millisecond)
err = s.Write(ctx, "ttl_key2", "ttl_value2", store.WriteTTL(10*time.Millisecond))
if err != nil {
t.Fatalf("Write with TTL failed: %v", err)
}
// Fast forward by 20ms - key should expire immediately
s.FastForward(20 * time.Millisecond)
s.ExpectRead("ttl_key2").WillReturnError(store.ErrNotFound)
err = s.Read(ctx, "ttl_key2", &value)
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound after immediate expiration, got %v", err)
}
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}
func TestStoreWithOptions(t *testing.T) {
s := NewStore(store.Name("test_mock"), store.Namespace("test_ns"))
if s.Name() != "test_mock" {
t.Fatalf("Expected name 'test_mock', got %s", s.Name())
}
opts := s.Options()
if opts.Namespace != "test_ns" {
t.Fatalf("Expected namespace 'test_ns', got %s", opts.Namespace)
}
}
func TestWatcher(t *testing.T) {
watcher := NewWatcher()
// Test Stop
watcher.Stop()
// Test Next after stop
_, err := watcher.Next()
if err != store.ErrWatcherStopped {
t.Fatalf("Expected store.ErrWatcherStopped, got %v", err)
}
}
func TestStoreHealth(t *testing.T) {
s := NewStore()
if !s.Live() {
t.Fatal("Expected Live() to return true")
}
if !s.Ready() {
t.Fatal("Expected Ready() to return true")
}
if !s.Health() {
t.Fatal("Expected Health() to return true")
}
}
func TestStoreConnectDisconnect(t *testing.T) {
s := NewStore()
err := s.Connect(context.Background())
if err != nil {
t.Fatalf("Connect failed: %v", err)
}
err = s.Disconnect(context.Background())
if err != nil {
t.Fatalf("Disconnect failed: %v", err)
}
// Test error propagation
s.ExpectWrite("test_key").WillReturnError(store.ErrNotConnected)
err = s.Write(context.Background(), "test_key", "value")
if err != store.ErrNotConnected {
t.Fatalf("Expected store.ErrNotConnected, got %v", err)
}
}
func TestStoreTTL(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Test Write with TTL
s.ExpectWrite("ttl_key").WithValue("ttl_value").WithTTL(100 * time.Millisecond)
err := s.Write(ctx, "ttl_key", "ttl_value", store.WriteTTL(100*time.Millisecond))
if err != nil {
t.Fatalf("Write with TTL failed: %v", err)
}
// Read before TTL expires
s.ExpectRead("ttl_key")
var value string
err = s.Read(ctx, "ttl_key", &value)
if err != nil {
t.Fatalf("Read before TTL failed: %v", err)
}
if value != "ttl_value" {
t.Fatalf("Expected 'ttl_value', got %s", value)
}
// Wait for TTL to expire
time.Sleep(150 * time.Millisecond)
// Read after TTL expires should return ErrNotFound
s.ExpectRead("ttl_key").WillReturnError(store.ErrNotFound)
err = s.Read(ctx, "ttl_key", &value)
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound after TTL, got %v", err)
}
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}
func TestStoreExpectedOperations(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Test expected operations with Times
s.ExpectWrite("once_key").Times(1)
s.ExpectWrite("twice_key").Times(2)
err := s.Write(ctx, "once_key", "value1")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
err = s.Write(ctx, "twice_key", "value2")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
err = s.Write(ctx, "twice_key", "value3")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}

View File

@@ -2,6 +2,7 @@ package store
import (
"context"
"errors"
"testing"
)
@@ -25,7 +26,8 @@ func TestHook(t *testing.T) {
t.Fatal(err)
}
if err := s.Exists(context.TODO(), "test"); err != nil {
err := s.Exists(context.TODO(), "test")
if !errors.Is(err, ErrNotFound) {
t.Fatal(err)
}

View File

@@ -29,10 +29,10 @@ type ContextAttrFunc func(ctx context.Context) []interface{}
func init() {
logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs,
func(ctx context.Context) []interface{} {
if span, ok := SpanFromContext(ctx); ok {
if sp, ok := SpanFromContext(ctx); ok && sp != nil && sp.IsRecording() {
return []interface{}{
TraceIDKey, span.TraceID(),
SpanIDKey, span.SpanID(),
TraceIDKey, sp.TraceID(),
SpanIDKey, sp.SpanID(),
}
}
return nil

View File

@@ -1,13 +1,16 @@
package buffer
import "io"
import (
"fmt"
"io"
)
var _ interface {
io.ReadCloser
io.ReadSeeker
} = (*SeekerBuffer)(nil)
// Buffer is a ReadWriteCloser that supports seeking. It's intended to
// SeekerBuffer is a ReadWriteCloser that supports seeking. It's intended to
// replicate the functionality of bytes.Buffer that I use in my projects.
//
// Note that the seeking is limited to the read marker; all writes are
@@ -23,6 +26,7 @@ func NewSeekerBuffer(data []byte) *SeekerBuffer {
}
}
// Read reads up to len(p) bytes into p from the current read position.
func (b *SeekerBuffer) Read(p []byte) (int, error) {
if b.pos >= int64(len(b.data)) {
return 0, io.EOF
@@ -30,29 +34,51 @@ func (b *SeekerBuffer) Read(p []byte) (int, error) {
n := copy(p, b.data[b.pos:])
b.pos += int64(n)
return n, nil
}
// Write appends the contents of p to the end of the buffer. It does not affect the read position.
func (b *SeekerBuffer) Write(p []byte) (int, error) {
if len(p) == 0 {
return 0, nil
}
b.data = append(b.data, p...)
return len(p), nil
}
// Seek sets the read pointer to pos.
// Seek sets the offset for the next Read operation.
// The offset is interpreted according to whence:
// - io.SeekStart: relative to the beginning of the buffer
// - io.SeekCurrent: relative to the current position
// - io.SeekEnd: relative to the end of the buffer
//
// Returns an error if the resulting position is negative or if whence is invalid.
func (b *SeekerBuffer) Seek(offset int64, whence int) (int64, error) {
var newPos int64
switch whence {
case io.SeekStart:
b.pos = offset
newPos = offset
case io.SeekEnd:
b.pos = int64(len(b.data)) + offset
newPos = int64(len(b.data)) + offset
case io.SeekCurrent:
b.pos += offset
newPos = b.pos + offset
default:
return 0, fmt.Errorf("invalid whence: %d", whence)
}
if newPos < 0 {
return 0, fmt.Errorf("invalid seek: resulting position %d is negative", newPos)
}
b.pos = newPos
return b.pos, nil
}
// Rewind resets the read pointer to 0.
// Rewind resets the read position to 0.
func (b *SeekerBuffer) Rewind() error {
if _, err := b.Seek(0, io.SeekStart); err != nil {
return err
@@ -75,10 +101,16 @@ func (b *SeekerBuffer) Reset() {
// Len returns the length of data remaining to be read.
func (b *SeekerBuffer) Len() int {
if b.pos >= int64(len(b.data)) {
return 0
}
return len(b.data[b.pos:])
}
// Bytes returns the underlying bytes from the current position.
func (b *SeekerBuffer) Bytes() []byte {
if b.pos >= int64(len(b.data)) {
return []byte{}
}
return b.data[b.pos:]
}

View File

@@ -2,54 +2,384 @@ package buffer
import (
"fmt"
"strings"
"io"
"testing"
"github.com/stretchr/testify/require"
)
func noErrorT(t *testing.T, err error) {
if nil != err {
t.Fatalf("%s", err)
func TestNewSeekerBuffer(t *testing.T) {
input := []byte{'a', 'b', 'c', 'd', 'e'}
expected := &SeekerBuffer{data: []byte{'a', 'b', 'c', 'd', 'e'}, pos: 0}
require.Equal(t, expected, NewSeekerBuffer(input))
}
func TestSeekerBuffer_Read(t *testing.T) {
tests := []struct {
name string
data []byte
initPos int64
readBuf []byte
expectedN int
expectedData []byte
expectedErr error
expectedPos int64
}{
{
name: "read with empty buffer",
data: []byte("hello"),
initPos: 0,
readBuf: []byte{},
expectedN: 0,
expectedData: []byte{},
expectedErr: nil,
expectedPos: 0,
},
{
name: "read with nil buffer",
data: []byte("hello"),
initPos: 0,
readBuf: nil,
expectedN: 0,
expectedData: nil,
expectedErr: nil,
expectedPos: 0,
},
{
name: "read full buffer",
data: []byte("hello"),
initPos: 0,
readBuf: make([]byte, 5),
expectedN: 5,
expectedData: []byte("hello"),
expectedErr: nil,
expectedPos: 5,
},
{
name: "read partial buffer",
data: []byte("hello"),
initPos: 2,
readBuf: make([]byte, 2),
expectedN: 2,
expectedData: []byte("ll"),
expectedErr: nil,
expectedPos: 4,
},
{
name: "read after end",
data: []byte("hello"),
initPos: 5,
readBuf: make([]byte, 5),
expectedN: 0,
expectedData: make([]byte, 5),
expectedErr: io.EOF,
expectedPos: 5,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sb := NewSeekerBuffer(tt.data)
sb.pos = tt.initPos
n, err := sb.Read(tt.readBuf)
if tt.expectedErr != nil {
require.Equal(t, err, tt.expectedErr)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.expectedN, n)
require.Equal(t, tt.expectedData, tt.readBuf)
require.Equal(t, tt.expectedPos, sb.pos)
})
}
}
func boolT(t *testing.T, cond bool, s ...string) {
if !cond {
what := strings.Join(s, ", ")
if len(what) > 0 {
what = ": " + what
}
t.Fatalf("assert.Bool failed%s", what)
func TestSeekerBuffer_Write(t *testing.T) {
tests := []struct {
name string
initialData []byte
initialPos int64
writeData []byte
expectedData []byte
expectedN int
}{
{
name: "write empty slice",
initialData: []byte("data"),
initialPos: 0,
writeData: []byte{},
expectedData: []byte("data"),
expectedN: 0,
},
{
name: "write nil slice",
initialData: []byte("data"),
initialPos: 0,
writeData: nil,
expectedData: []byte("data"),
expectedN: 0,
},
{
name: "write to empty buffer",
initialData: nil,
initialPos: 0,
writeData: []byte("abc"),
expectedData: []byte("abc"),
expectedN: 3,
},
{
name: "write to existing buffer",
initialData: []byte("hello"),
initialPos: 0,
writeData: []byte(" world"),
expectedData: []byte("hello world"),
expectedN: 6,
},
{
name: "write after read",
initialData: []byte("abc"),
initialPos: 2,
writeData: []byte("XYZ"),
expectedData: []byte("abcXYZ"),
expectedN: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sb := NewSeekerBuffer(tt.initialData)
sb.pos = tt.initialPos
n, err := sb.Write(tt.writeData)
require.NoError(t, err)
require.Equal(t, tt.expectedN, n)
require.Equal(t, tt.expectedData, sb.data)
require.Equal(t, tt.initialPos, sb.pos)
})
}
}
func TestSeeking(t *testing.T) {
partA := []byte("hello, ")
partB := []byte("world!")
func TestSeekerBuffer_Seek(t *testing.T) {
tests := []struct {
name string
initialData []byte
initialPos int64
offset int64
whence int
expectedPos int64
expectedErr error
}{
{
name: "seek with invalid whence",
initialData: []byte("abcdef"),
initialPos: 0,
offset: 1,
whence: 12345,
expectedPos: 0,
expectedErr: fmt.Errorf("invalid whence: %d", 12345),
},
{
name: "seek negative from start",
initialData: []byte("abcdef"),
initialPos: 0,
offset: -1,
whence: io.SeekStart,
expectedPos: 0,
expectedErr: fmt.Errorf("invalid seek: resulting position %d is negative", -1),
},
{
name: "seek from start to 0",
initialData: []byte("abcdef"),
initialPos: 0,
offset: 0,
whence: io.SeekStart,
expectedPos: 0,
expectedErr: nil,
},
{
name: "seek from start to 3",
initialData: []byte("abcdef"),
initialPos: 0,
offset: 3,
whence: io.SeekStart,
expectedPos: 3,
expectedErr: nil,
},
{
name: "seek from end to -1 (last byte)",
initialData: []byte("abcdef"),
initialPos: 0,
offset: -1,
whence: io.SeekEnd,
expectedPos: 5,
expectedErr: nil,
},
{
name: "seek from current forward",
initialData: []byte("abcdef"),
initialPos: 2,
offset: 2,
whence: io.SeekCurrent,
expectedPos: 4,
expectedErr: nil,
},
{
name: "seek from current backward",
initialData: []byte("abcdef"),
initialPos: 4,
offset: -2,
whence: io.SeekCurrent,
expectedPos: 2,
expectedErr: nil,
},
{
name: "seek to end exactly",
initialData: []byte("abcdef"),
initialPos: 0,
offset: 0,
whence: io.SeekEnd,
expectedPos: 6,
expectedErr: nil,
},
{
name: "seek to out of range",
initialData: []byte("abcdef"),
initialPos: 0,
offset: 2,
whence: io.SeekEnd,
expectedPos: 8,
expectedErr: nil,
},
}
buf := NewSeekerBuffer(partA)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sb := NewSeekerBuffer(tt.initialData)
sb.pos = tt.initialPos
boolT(t, buf.Len() == len(partA), fmt.Sprintf("on init: have length %d, want length %d", buf.Len(), len(partA)))
newPos, err := sb.Seek(tt.offset, tt.whence)
b := make([]byte, 32)
n, err := buf.Read(b)
noErrorT(t, err)
boolT(t, buf.Len() == 0, fmt.Sprintf("after reading 1: have length %d, want length 0", buf.Len()))
boolT(t, n == len(partA), fmt.Sprintf("after reading 2: have length %d, want length %d", n, len(partA)))
n, err = buf.Write(partB)
noErrorT(t, err)
boolT(t, n == len(partB), fmt.Sprintf("after writing: have length %d, want length %d", n, len(partB)))
n, err = buf.Read(b)
noErrorT(t, err)
boolT(t, buf.Len() == 0, fmt.Sprintf("after rereading 1: have length %d, want length 0", buf.Len()))
boolT(t, n == len(partB), fmt.Sprintf("after rereading 2: have length %d, want length %d", n, len(partB)))
partsLen := len(partA) + len(partB)
_ = buf.Rewind()
boolT(t, buf.Len() == partsLen, fmt.Sprintf("after rewinding: have length %d, want length %d", buf.Len(), partsLen))
buf.Close()
boolT(t, buf.Len() == 0, fmt.Sprintf("after closing, have length %d, want length 0", buf.Len()))
if tt.expectedErr != nil {
require.Equal(t, tt.expectedErr, err)
} else {
require.NoError(t, err)
require.Equal(t, tt.expectedPos, newPos)
require.Equal(t, tt.expectedPos, sb.pos)
}
})
}
}
func TestSeekerBuffer_Rewind(t *testing.T) {
buf := NewSeekerBuffer([]byte("hello world"))
buf.pos = 4
require.NoError(t, buf.Rewind())
require.Equal(t, []byte("hello world"), buf.data)
require.Equal(t, int64(0), buf.pos)
}
func TestSeekerBuffer_Close(t *testing.T) {
buf := NewSeekerBuffer([]byte("hello world"))
buf.pos = 2
require.NoError(t, buf.Close())
require.Nil(t, buf.data)
require.Equal(t, int64(0), buf.pos)
}
func TestSeekerBuffer_Reset(t *testing.T) {
buf := NewSeekerBuffer([]byte("hello world"))
buf.pos = 2
buf.Reset()
require.Nil(t, buf.data)
require.Equal(t, int64(0), buf.pos)
}
func TestSeekerBuffer_Len(t *testing.T) {
tests := []struct {
name string
data []byte
pos int64
expected int
}{
{
name: "full buffer",
data: []byte("abcde"),
pos: 0,
expected: 5,
},
{
name: "partial read",
data: []byte("abcde"),
pos: 2,
expected: 3,
},
{
name: "fully read",
data: []byte("abcde"),
pos: 5,
expected: 0,
},
{
name: "pos > len",
data: []byte("abcde"),
pos: 10,
expected: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buf := NewSeekerBuffer(tt.data)
buf.pos = tt.pos
require.Equal(t, tt.expected, buf.Len())
})
}
}
func TestSeekerBuffer_Bytes(t *testing.T) {
tests := []struct {
name string
data []byte
pos int64
expected []byte
}{
{
name: "start of buffer",
data: []byte("abcde"),
pos: 0,
expected: []byte("abcde"),
},
{
name: "middle of buffer",
data: []byte("abcde"),
pos: 2,
expected: []byte("cde"),
},
{
name: "end of buffer",
data: []byte("abcde"),
pos: 5,
expected: []byte{},
},
{
name: "pos beyond end",
data: []byte("abcde"),
pos: 10,
expected: []byte{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buf := NewSeekerBuffer(tt.data)
buf.pos = tt.pos
require.Equal(t, tt.expected, buf.Bytes())
})
}
}

View File

@@ -6,18 +6,18 @@ import (
"strings"
"sync"
"sync/atomic"
"time"
"go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/semconv"
)
var (
pools = make([]Statser, 0)
poolsMu sync.Mutex
)
func unregisterMetrics(size int) {
meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size))
meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size))
meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size))
meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size))
}
// Stats struct
type Stats struct {
Get uint64
Put uint64
@@ -25,41 +25,13 @@ type Stats struct {
Ret uint64
}
// Statser provides buffer pool stats
type Statser interface {
Stats() Stats
Cap() int
}
func init() {
go newStatsMeter()
}
func newStatsMeter() {
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
defer ticker.Stop()
for range ticker.C {
poolsMu.Lock()
for _, st := range pools {
stats := st.Stats()
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
}
poolsMu.Unlock()
}
}
var (
_ Statser = (*BytePool)(nil)
_ Statser = (*BytesPool)(nil)
_ Statser = (*StringsPool)(nil)
)
type Pool[T any] struct {
p *sync.Pool
p *sync.Pool
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
c int
}
func (p Pool[T]) Put(t T) {
@@ -70,37 +42,82 @@ func (p Pool[T]) Get() T {
return p.p.Get().(T)
}
func NewPool[T any](fn func() T) Pool[T] {
return Pool[T]{
p: &sync.Pool{
New: func() interface{} {
return fn()
},
func NewPool[T any](fn func() T, size int) Pool[T] {
p := Pool[T]{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{
New: func() interface{} {
p.mis.Add(1)
return fn()
},
}
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p
}
type BytePool struct {
p *sync.Pool
get uint64
put uint64
mis uint64
ret uint64
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
c int
}
func NewBytePool(size int) *BytePool {
p := &BytePool{c: size}
p := &BytePool{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{
New: func() interface{} {
atomic.AddUint64(&p.mis, 1)
p.mis.Add(1)
b := make([]byte, 0, size)
return &b
},
}
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p
}
@@ -110,49 +127,73 @@ func (p *BytePool) Cap() int {
func (p *BytePool) Stats() Stats {
return Stats{
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
Put: p.put.Load(),
Get: p.get.Load(),
Mis: p.mis.Load(),
Ret: p.ret.Load(),
}
}
func (p *BytePool) Get() *[]byte {
atomic.AddUint64(&p.get, 1)
p.get.Add(1)
return p.p.Get().(*[]byte)
}
func (p *BytePool) Put(b *[]byte) {
atomic.AddUint64(&p.put, 1)
p.put.Add(1)
if cap(*b) > p.c {
atomic.AddUint64(&p.ret, 1)
p.ret.Add(1)
return
}
*b = (*b)[:0]
p.p.Put(b)
}
func (p *BytePool) Close() {
unregisterMetrics(p.c)
}
type BytesPool struct {
p *sync.Pool
get uint64
put uint64
mis uint64
ret uint64
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
c int
}
func NewBytesPool(size int) *BytesPool {
p := &BytesPool{c: size}
p := &BytesPool{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{
New: func() interface{} {
atomic.AddUint64(&p.mis, 1)
p.mis.Add(1)
b := bytes.NewBuffer(make([]byte, 0, size))
return b
},
}
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p
}
@@ -162,10 +203,10 @@ func (p *BytesPool) Cap() int {
func (p *BytesPool) Stats() Stats {
return Stats{
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
Put: p.put.Load(),
Get: p.get.Load(),
Mis: p.mis.Load(),
Ret: p.ret.Load(),
}
}
@@ -174,34 +215,43 @@ func (p *BytesPool) Get() *bytes.Buffer {
}
func (p *BytesPool) Put(b *bytes.Buffer) {
p.put.Add(1)
if (*b).Cap() > p.c {
atomic.AddUint64(&p.ret, 1)
p.ret.Add(1)
return
}
b.Reset()
p.p.Put(b)
}
func (p *BytesPool) Close() {
unregisterMetrics(p.c)
}
type StringsPool struct {
p *sync.Pool
get uint64
put uint64
mis uint64
ret uint64
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
c int
}
func NewStringsPool(size int) *StringsPool {
p := &StringsPool{c: size}
p := &StringsPool{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{
New: func() interface{} {
atomic.AddUint64(&p.mis, 1)
p.mis.Add(1)
return &strings.Builder{}
},
}
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p
}
@@ -211,24 +261,28 @@ func (p *StringsPool) Cap() int {
func (p *StringsPool) Stats() Stats {
return Stats{
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
Put: p.put.Load(),
Get: p.get.Load(),
Mis: p.mis.Load(),
Ret: p.ret.Load(),
}
}
func (p *StringsPool) Get() *strings.Builder {
atomic.AddUint64(&p.get, 1)
p.get.Add(1)
return p.p.Get().(*strings.Builder)
}
func (p *StringsPool) Put(b *strings.Builder) {
atomic.AddUint64(&p.put, 1)
p.put.Add(1)
if b.Cap() > p.c {
atomic.AddUint64(&p.ret, 1)
p.ret.Add(1)
return
}
b.Reset()
p.p.Put(b)
}
func (p *StringsPool) Close() {
unregisterMetrics(p.c)
}