Compare commits


24 Commits

Author SHA1 Message Date
46eb739dff broker: add ErrorHandler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 08:34:55 +03:00

13b01f59ee logger: conditional caller field
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 08:34:55 +03:00

52c8f3da86 Merge pull request 'add opt gracefultimeout broker' (#410) from devstigneev/micro:v4_new_opts into v4
Reviewed-on: #410
2025-12-10 15:22:35 +03:00

Evstigneev Denis
e7f9f638bd add opt gracefultimeout broker
2025-12-10 15:20:14 +03:00

d9afc9ce4f update all
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-31 21:52:22 +03:00

7a325e2c9e remove using global map for default codecs (#223)
2025-10-15 21:32:52 +03:00

7daa927e70 add HistogramExt method with custom quantiles
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-12 15:55:00 +03:00

vtolstov
54bb7f7acb Apply Code Coverage Badge
2025-10-12 11:27:04 +00:00

9eaab95519 meter: improve Gauge
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-12 14:24:44 +03:00

vtolstov
9219dc6b2a Apply Code Coverage Badge
2025-10-11 15:49:04 +00:00

52607b38f1 logger: fixup Fatal finalizers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-11 18:46:42 +03:00

vtolstov
886f046409 Apply Code Coverage Badge
2025-10-10 12:30:04 +00:00

4d6d469d40 logger: add Fatal finalizers
* closes #222
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-10 15:28:10 +03:00

vtolstov
4a944274f4 Apply Code Coverage Badge
2025-10-07 20:56:10 +00:00

b0cbddcfdd meter: improve meter usage across micro framework (#409)
Reviewed-on: #409
Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Co-committed-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-07 23:54:20 +03:00

vtolstov
d0534a7d05 Apply Code Coverage Badge
2025-09-20 19:59:32 +00:00

ab051405c5 initial hasql support (#407)
closes #403
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Reviewed-on: #407
Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Co-committed-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:57:39 +03:00

vtolstov
268b3dbff4 Apply Code Coverage Badge
2025-07-12 21:20:05 +00:00

f9d2c14597 fixup tests
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-07-13 00:11:08 +03:00

e6bf914dd9 tracer: write log fields only if span exists and recording
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-07-13 00:08:30 +03:00

b59f4a16f0 meter: disable auto sorting labels
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-06-17 19:02:06 +03:00

3deb572f72 [v4] fix out-of-bounds behavior in seeker buffer and add tests (#219)
* add check negative position to Read() and write tests
* add tests for Write() method
* add checks of whence and negative position to Seek() and write tests
* add tests for Rewind()
* add tests for Close()
* add tests for Reset()
* add tests for Len()
* add tests for Bytes()
* tests polishing
2025-06-15 17:24:48 +03:00

0e668c0f0f fixup tests
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-06-09 17:36:11 +03:00

2bac878845 broker: fix message options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-06-09 17:23:30 +03:00
35 changed files with 2000 additions and 303 deletions

View File

@@ -25,7 +25,7 @@ jobs:
       dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
       echo "src_hash=$src_hash"
       echo "dst_hash=$dst_hash"
-      if [ "$src_hash" != "$dst_hash" ]; then
+      if [ "$src_hash" != "$dst_hash" -a "$src_hash" != "" -a "$dst_hash" != "" ]; then
        echo "sync_needed=true" >> $GITHUB_OUTPUT
       else
        echo "sync_needed=false" >> $GITHUB_OUTPUT

View File

@@ -41,11 +41,11 @@ type Broker interface {
 	// Disconnect disconnect from broker
 	Disconnect(ctx context.Context) error
 	// NewMessage create new broker message to publish.
-	NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error)
+	NewMessage(ctx context.Context, hdr metadata.Metadata, body any, opts ...MessageOption) (Message, error)
 	// Publish message to broker topic
 	Publish(ctx context.Context, topic string, messages ...Message) error
 	// Subscribe subscribes to topic message via handler
-	Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
+	Subscribe(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
 	// String type of broker
 	String() string
 	// Live returns broker liveness
@@ -59,7 +59,7 @@ type Broker interface {
 type (
 	FuncPublish   func(ctx context.Context, topic string, messages ...Message) error
 	HookPublish   func(next FuncPublish) FuncPublish
-	FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
+	FuncSubscribe func(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
 	HookSubscribe func(next FuncSubscribe) FuncSubscribe
 )
@@ -75,7 +75,7 @@ type Message interface {
 	Body() []byte
 	// Unmarshal try to decode message body to dst.
 	// This is helper method that uses codec.Unmarshal.
-	Unmarshal(dst interface{}, opts ...codec.Option) error
+	Unmarshal(dst any, opts ...codec.Option) error
 	// Ack acknowledge message if supported.
 	Ack() error
 }
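For reference, a minimal sketch of publishing through the renamed API; only NewMessage, MessageContentType and Publish come from this diff, while the connected broker b and the metadata.New/Set helpers are assumptions:

	func publishHello(ctx context.Context, b broker.Broker) error {
		md := metadata.New(1) // assumed metadata constructor
		md.Set("id", "1")
		// options are now broker.MessageOption (was PublishOption),
		// and the body parameter is typed any
		msg, err := b.NewMessage(ctx, md, []byte(`"hello world"`),
			broker.MessageContentType("application/octet-stream"),
		)
		if err != nil {
			return err
		}
		return b.Publish(ctx, "events", msg)
	}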

View File

@@ -42,9 +42,9 @@ func SetSubscribeOption(k, v interface{}) SubscribeOption {
 	}
 }
 
-// SetPublishOption returns a function to setup a context with given value
-func SetPublishOption(k, v interface{}) PublishOption {
-	return func(o *PublishOptions) {
+// SetMessageOption returns a function to setup a context with given value
+func SetMessageOption(k, v interface{}) MessageOption {
+	return func(o *MessageOptions) {
 		if o.Context == nil {
 			o.Context = context.Background()
 		}

View File

@@ -32,7 +32,7 @@ type memoryMessage struct {
 	ctx  context.Context
 	body []byte
 	hdr  metadata.Metadata
-	opts broker.PublishOptions
+	opts broker.MessageOptions
 }
 
 func (m *memoryMessage) Ack() error {
@@ -157,8 +157,8 @@ func (b *Broker) Init(opts ...broker.Option) error {
 	return nil
 }
 
-func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.PublishOption) (broker.Message, error) {
-	options := broker.NewPublishOptions(opts...)
+func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.MessageOption) (broker.Message, error) {
+	options := broker.NewMessageOptions(opts...)
 	if options.ContentType == "" {
 		options.ContentType = b.opts.ContentType
 	}

View File

@@ -49,7 +49,7 @@ func TestMemoryBroker(t *testing.T) {
 				"id", fmt.Sprintf("%d", i),
 			),
 			[]byte(`"hello world"`),
-			broker.PublishContentType("application/octet-stream"),
+			broker.MessageContentType("application/octet-stream"),
 		)
 		if err != nil {
 			t.Fatal(err)

View File

@@ -99,7 +99,7 @@ type noopMessage struct {
 	ctx  context.Context
 	body []byte
 	hdr  metadata.Metadata
-	opts PublishOptions
+	opts MessageOptions
 }
 
 func (m *noopMessage) Ack() error {
@@ -126,8 +126,8 @@ func (m *noopMessage) Unmarshal(dst interface{}, opts ...codec.Option) error {
 	return m.c.Unmarshal(m.body, dst)
 }
 
-func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error) {
-	options := NewPublishOptions(opts...)
+func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error) {
+	options := NewMessageOptions(opts...)
 	if options.ContentType == "" {
 		options.ContentType = b.opts.ContentType
 	}

View File

@@ -18,7 +18,6 @@ import (
 type Options struct {
 	// Name holds the broker name
 	Name string
-
 	// Tracer used for tracing
 	Tracer tracer.Tracer
 	// Register can be used for clustering
@@ -31,23 +30,20 @@ type Options struct {
 	Meter meter.Meter
 	// Context holds external options
 	Context context.Context
-
 	// Wait waits for a collection of goroutines to finish
 	Wait *sync.WaitGroup
-
 	// TLSConfig holds tls.TLSConfig options
 	TLSConfig *tls.Config
-
 	// Addrs holds the broker address
 	Addrs []string
-
-	// Hooks can be run before broker Publish/BatchPublish and
-	// Subscribe/BatchSubscribe methods
+	// Hooks can be run before broker Publishing and message processing in Subscribe
 	Hooks options.Hooks
 	// GracefulTimeout contains time to wait to finish in flight requests
 	GracefulTimeout time.Duration
 	// ContentType will be used if no content-type set when creating message
 	ContentType string
+	// ErrorHandler specifies handler for all broker errors handling subscriber
+	ErrorHandler any
 }

// NewOptions create new Options
@@ -80,6 +76,12 @@ func Context(ctx context.Context) Option {
 	}
 }
 
+func GracefulTimeout(t time.Duration) Option {
+	return func(o *Options) {
+		o.GracefulTimeout = t
+	}
+}
+
 // ContentType used by default if not specified
 func ContentType(ct string) Option {
 	return func(o *Options) {
@@ -87,8 +89,15 @@ func ContentType(ct string) Option {
 	}
 }
 
-// PublishOptions struct
-type PublishOptions struct {
+// ErrorHandler handles errors in broker
+func ErrorHandler(h any) Option {
+	return func(o *Options) {
+		o.ErrorHandler = h
+	}
+}
+
+// MessageOptions struct
+type MessageOptions struct {
 	// ContentType for message body
 	ContentType string
 	// BodyOnly flag says the message contains raw body bytes and don't need
@@ -98,9 +107,9 @@ type PublishOptions struct {
 	Context context.Context
 }
 
-// NewPublishOptions creates PublishOptions struct
-func NewPublishOptions(opts ...PublishOption) PublishOptions {
-	options := PublishOptions{
+// NewMessageOptions creates MessageOptions struct
+func NewMessageOptions(opts ...MessageOption) MessageOptions {
+	options := MessageOptions{
 		Context: context.Background(),
 	}
 	for _, o := range opts {
@@ -128,19 +137,19 @@ type SubscribeOptions struct {
 // Option func
 type Option func(*Options)
 
-// PublishOption func
-type PublishOption func(*PublishOptions)
+// MessageOption func
+type MessageOption func(*MessageOptions)
 
-// PublishContentType sets message content-type that used to Marshal
-func PublishContentType(ct string) PublishOption {
-	return func(o *PublishOptions) {
+// MessageContentType sets message content-type that used to Marshal
+func MessageContentType(ct string) MessageOption {
+	return func(o *MessageOptions) {
 		o.ContentType = ct
 	}
 }
 
-// PublishBodyOnly publish only body of the message
-func PublishBodyOnly(b bool) PublishOption {
-	return func(o *PublishOptions) {
+// MessageBodyOnly publish only body of the message
+func MessageBodyOnly(b bool) MessageOption {
+	return func(o *MessageOptions) {
 		o.BodyOnly = b
 	}
 }
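A sketch of wiring the new options together; ErrorHandler is typed any in this diff, so the handler signature shown here is an assumption:

	opts := broker.NewOptions(
		broker.ContentType("application/json"),
		broker.GracefulTimeout(30*time.Second),
		broker.ErrorHandler(func(err error) { // assumed handler shape; the option accepts any
			log.Printf("subscriber error: %v", err)
		}),
	)
	_ = opts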

View File

@@ -15,11 +15,6 @@ import (
 	"go.unistack.org/micro/v4/tracer"
 )
 
-// DefaultCodecs will be used to encode/decode data
-var DefaultCodecs = map[string]codec.Codec{
-	"application/octet-stream": codec.NewCodec(),
-}
-
 type noopClient struct {
 	funcCall   FuncCall
 	funcStream FuncStream

View File

@@ -161,7 +161,7 @@ func NewOptions(opts ...Option) Options {
 	options := Options{
 		Context:     context.Background(),
 		ContentType: DefaultContentType,
-		Codecs:      DefaultCodecs,
+		Codecs:      make(map[string]codec.Codec),
 		CallOptions: CallOptions{
 			Context: context.Background(),
 			Backoff: DefaultBackoff,
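Since Codecs now starts empty instead of aliasing a shared global map, codecs have to be registered explicitly per client; a sketch using only identifiers visible in this changeset:

	opts := client.NewOptions()
	opts.Codecs["application/octet-stream"] = codec.NewCodec()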

cluster/hasql/cluster.go (new file, 235 lines)
View File

@@ -0,0 +1,235 @@
package sql

import (
	"context"
	"database/sql"
	"reflect"
	"unsafe"

	"golang.yandex/hasql/v2"
)

// newSQLRowError builds a *sql.Row that carries ErrorNoAliveNodes: database/sql
// exposes no constructor for an error-carrying Row, so the unexported err field
// is set through reflect and unsafe.
func newSQLRowError() *sql.Row {
	row := &sql.Row{}
	t := reflect.TypeOf(row).Elem()
	field, _ := t.FieldByName("err")
	rowPtr := unsafe.Pointer(row)
	errFieldPtr := unsafe.Pointer(uintptr(rowPtr) + field.Offset)
	errPtr := (*error)(errFieldPtr)
	*errPtr = ErrorNoAliveNodes
	return row
}

type ClusterQuerier interface {
	Querier
	WaitForNodes(ctx context.Context, criterion ...hasql.NodeStateCriterion) error
}

type Cluster struct {
	hasql   *hasql.Cluster[Querier]
	options ClusterOptions
}

// NewCluster returns [Querier] that provides cluster of nodes
func NewCluster[T Querier](opts ...ClusterOption) (ClusterQuerier, error) {
	options := ClusterOptions{Context: context.Background()}
	for _, opt := range opts {
		opt(&options)
	}

	if options.NodeChecker == nil {
		return nil, ErrClusterChecker
	}
	if options.NodeDiscoverer == nil {
		return nil, ErrClusterDiscoverer
	}
	if options.NodePicker == nil {
		return nil, ErrClusterPicker
	}

	if options.Retries < 1 {
		options.Retries = 1
	}
	if options.NodeStateCriterion == 0 {
		options.NodeStateCriterion = hasql.Primary
	}

	options.Options = append(options.Options, hasql.WithNodePicker(options.NodePicker))
	if p, ok := options.NodePicker.(*CustomPicker[Querier]); ok {
		p.opts.Priority = options.NodePriority
	}

	c, err := hasql.NewCluster(
		options.NodeDiscoverer,
		options.NodeChecker,
		options.Options...,
	)
	if err != nil {
		return nil, err
	}

	return &Cluster{hasql: c, options: options}, nil
}

func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
	var tx *sql.Tx
	var err error
	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if tx, err = n.DB().BeginTx(ctx, opts); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})
	if tx == nil && err == nil {
		err = ErrorNoAliveNodes
	}
	return tx, err
}

func (c *Cluster) Close() error {
	return c.hasql.Close()
}

func (c *Cluster) Conn(ctx context.Context) (*sql.Conn, error) {
	var conn *sql.Conn
	var err error
	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if conn, err = n.DB().Conn(ctx); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})
	if conn == nil && err == nil {
		err = ErrorNoAliveNodes
	}
	return conn, err
}

func (c *Cluster) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
	var res sql.Result
	var err error
	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if res, err = n.DB().ExecContext(ctx, query, args...); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})
	if res == nil && err == nil {
		err = ErrorNoAliveNodes
	}
	return res, err
}

func (c *Cluster) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
	var res *sql.Stmt
	var err error
	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if res, err = n.DB().PrepareContext(ctx, query); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})
	if res == nil && err == nil {
		err = ErrorNoAliveNodes
	}
	return res, err
}

func (c *Cluster) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
	var res *sql.Rows
	var err error
	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if res, err = n.DB().QueryContext(ctx, query); err != nil && err != sql.ErrNoRows && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})
	if res == nil && err == nil {
		err = ErrorNoAliveNodes
	}
	return res, err
}

func (c *Cluster) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
	var res *sql.Row
	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			res = n.DB().QueryRowContext(ctx, query, args...)
			if res.Err() == nil {
				return false
			} else if res.Err() != nil && retries >= c.options.Retries {
				return false
			}
		}
		return true
	})
	if res == nil {
		res = newSQLRowError()
	}
	return res
}

func (c *Cluster) PingContext(ctx context.Context) error {
	var err error
	var ok bool
	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		ok = true
		for ; retries < c.options.Retries; retries++ {
			if err = n.DB().PingContext(ctx); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})
	if !ok {
		err = ErrorNoAliveNodes
	}
	return err
}

func (c *Cluster) WaitForNodes(ctx context.Context, criterions ...hasql.NodeStateCriterion) error {
	for _, criterion := range criterions {
		if _, err := c.hasql.WaitForNode(ctx, criterion); err != nil {
			return err
		}
	}
	return nil
}

View File

@@ -0,0 +1,171 @@
package sql

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/DATA-DOG/go-sqlmock"
	"golang.yandex/hasql/v2"
)

func TestNewCluster(t *testing.T) {
	dbMaster, dbMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbMaster.Close()

	dbMasterMock.MatchExpectationsInOrder(false)
	dbMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(1, 0)).
		RowsWillBeClosed().
		WithoutArgs()
	dbMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("master-dc1"))

	dbDRMaster, dbDRMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbDRMaster.Close()

	dbDRMasterMock.MatchExpectationsInOrder(false)
	dbDRMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 40)).
		RowsWillBeClosed().
		WithoutArgs()
	dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("drmaster1-dc2"))
	dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("drmaster"))

	dbSlaveDC1, dbSlaveDC1Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbSlaveDC1.Close()

	dbSlaveDC1Mock.MatchExpectationsInOrder(false)
	dbSlaveDC1Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 50)).
		RowsWillBeClosed().
		WithoutArgs()
	dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	dbSlaveDC2, dbSlaveDC2Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbSlaveDC2.Close()

	dbSlaveDC1Mock.MatchExpectationsInOrder(false)
	dbSlaveDC2Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 50)).
		RowsWillBeClosed().
		WithoutArgs()
	dbSlaveDC2Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	tctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
	defer cancel()

	c, err := NewCluster[Querier](
		WithClusterContext(tctx),
		WithClusterNodeChecker(hasql.PostgreSQLChecker),
		WithClusterNodePicker(NewCustomPicker[Querier](
			CustomPickerMaxLag(100),
		)),
		WithClusterNodes(
			ClusterNode{"slave-dc1", dbSlaveDC1, 1},
			ClusterNode{"master-dc1", dbMaster, 1},
			ClusterNode{"slave-dc2", dbSlaveDC2, 2},
			ClusterNode{"drmaster1-dc2", dbDRMaster, 0},
		),
		WithClusterOptions(
			hasql.WithUpdateInterval[Querier](2*time.Second),
			hasql.WithUpdateTimeout[Querier](1*time.Second),
		),
	)
	if err != nil {
		t.Fatal(err)
	}
	defer c.Close()

	if err = c.WaitForNodes(tctx, hasql.Primary, hasql.Standby); err != nil {
		t.Fatal(err)
	}

	time.Sleep(500 * time.Millisecond)

	node1Name := ""
	fmt.Printf("check for Standby\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.Standby), "SELECT node_name as name"); row.Err() != nil {
		t.Fatal(row.Err())
	} else if err = row.Scan(&node1Name); err != nil {
		t.Fatal(err)
	} else if "slave-dc1" != node1Name {
		t.Fatalf("invalid node name %s != %s", "slave-dc1", node1Name)
	}

	dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	node2Name := ""
	fmt.Printf("check for PreferStandby\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferStandby), "SELECT node_name as name"); row.Err() != nil {
		t.Fatal(row.Err())
	} else if err = row.Scan(&node2Name); err != nil {
		t.Fatal(err)
	} else if "slave-dc1" != node2Name {
		t.Fatalf("invalid node name %s != %s", "slave-dc1", node2Name)
	}

	node3Name := ""
	fmt.Printf("check for PreferPrimary\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferPrimary), "SELECT node_name as name"); row.Err() != nil {
		t.Fatal(row.Err())
	} else if err = row.Scan(&node3Name); err != nil {
		t.Fatal(err)
	} else if "master-dc1" != node3Name {
		t.Fatalf("invalid node name %s != %s", "master-dc1", node3Name)
	}

	dbSlaveDC1Mock.ExpectQuery(`.*`).WillReturnRows(sqlmock.NewRows([]string{"role"}).RowError(1, fmt.Errorf("row error")))

	time.Sleep(2 * time.Second)

	fmt.Printf("check for PreferStandby\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferStandby), "SELECT node_name as name"); row.Err() == nil {
		t.Fatal("must return error")
	}

	if dbMasterErr := dbMasterMock.ExpectationsWereMet(); dbMasterErr != nil {
		t.Error(dbMasterErr)
	}
}

cluster/hasql/db.go (new file, 25 lines)
View File

@@ -0,0 +1,25 @@
package sql

import (
	"context"
	"database/sql"
)

type Querier interface {
	// Basic connection methods
	PingContext(ctx context.Context) error
	Close() error

	// Query methods with context
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

	// Prepared statements with context
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

	// Transaction management with context
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
	Conn(ctx context.Context) (*sql.Conn, error)
}

cluster/hasql/driver.go (new file, 295 lines)
View File

@@ -0,0 +1,295 @@
package sql

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"io"
	"sync"
	"time"
)

// OpenDBWithCluster creates a [*sql.DB] that uses the [ClusterQuerier]
func OpenDBWithCluster(db ClusterQuerier) (*sql.DB, error) {
	driver := NewClusterDriver(db)
	connector, err := driver.OpenConnector("")
	if err != nil {
		return nil, err
	}
	return sql.OpenDB(connector), nil
}

// ClusterDriver implements [driver.Driver] and driver.Connector for an existing [Querier]
type ClusterDriver struct {
	db ClusterQuerier
}

// NewClusterDriver creates a new [driver.Driver] that uses an existing [ClusterQuerier]
func NewClusterDriver(db ClusterQuerier) *ClusterDriver {
	return &ClusterDriver{db: db}
}

// Open implements [driver.Driver.Open]
func (d *ClusterDriver) Open(name string) (driver.Conn, error) {
	return d.Connect(context.Background())
}

// OpenConnector implements [driver.DriverContext.OpenConnector]
func (d *ClusterDriver) OpenConnector(name string) (driver.Connector, error) {
	return d, nil
}

// Connect implements [driver.Connector.Connect]
func (d *ClusterDriver) Connect(ctx context.Context) (driver.Conn, error) {
	conn, err := d.db.Conn(ctx)
	if err != nil {
		return nil, err
	}
	return &dbConn{conn: conn}, nil
}

// Driver implements [driver.Connector.Driver]
func (d *ClusterDriver) Driver() driver.Driver {
	return d
}

// dbConn implements driver.Conn with both context and legacy methods
type dbConn struct {
	conn *sql.Conn
	mu   sync.Mutex
}

// Prepare implements [driver.Conn.Prepare] (legacy method)
func (c *dbConn) Prepare(query string) (driver.Stmt, error) {
	return c.PrepareContext(context.Background(), query)
}

// PrepareContext implements [driver.ConnPrepareContext.PrepareContext]
func (c *dbConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	stmt, err := c.conn.PrepareContext(ctx, query)
	if err != nil {
		return nil, err
	}
	return &dbStmt{stmt: stmt}, nil
}

// Exec implements [driver.Execer.Exec] (legacy method)
func (c *dbConn) Exec(query string, args []driver.Value) (driver.Result, error) {
	namedArgs := make([]driver.NamedValue, len(args))
	for i, value := range args {
		namedArgs[i] = driver.NamedValue{Value: value}
	}
	return c.ExecContext(context.Background(), query, namedArgs)
}

// ExecContext implements [driver.ExecerContext.ExecContext]
func (c *dbConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Convert driver.NamedValue to any
	interfaceArgs := make([]any, len(args))
	for i, arg := range args {
		interfaceArgs[i] = arg.Value
	}
	return c.conn.ExecContext(ctx, query, interfaceArgs...)
}

// Query implements [driver.Queryer.Query] (legacy method)
func (c *dbConn) Query(query string, args []driver.Value) (driver.Rows, error) {
	namedArgs := make([]driver.NamedValue, len(args))
	for i, value := range args {
		namedArgs[i] = driver.NamedValue{Value: value}
	}
	return c.QueryContext(context.Background(), query, namedArgs)
}

// QueryContext implements [driver.QueryerContext.QueryContext]
func (c *dbConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Convert driver.NamedValue to any
	interfaceArgs := make([]any, len(args))
	for i, arg := range args {
		interfaceArgs[i] = arg.Value
	}
	rows, err := c.conn.QueryContext(ctx, query, interfaceArgs...)
	if err != nil {
		return nil, err
	}
	return &dbRows{rows: rows}, nil
}

// Begin implements [driver.Conn.Begin] (legacy method)
func (c *dbConn) Begin() (driver.Tx, error) {
	return c.BeginTx(context.Background(), driver.TxOptions{})
}

// BeginTx implements [driver.ConnBeginTx.BeginTx]
func (c *dbConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	sqlOpts := &sql.TxOptions{
		Isolation: sql.IsolationLevel(opts.Isolation),
		ReadOnly:  opts.ReadOnly,
	}
	tx, err := c.conn.BeginTx(ctx, sqlOpts)
	if err != nil {
		return nil, err
	}
	return &dbTx{tx: tx}, nil
}

// Ping implements [driver.Pinger.Ping]
func (c *dbConn) Ping(ctx context.Context) error {
	return c.conn.PingContext(ctx)
}

// Close implements [driver.Conn.Close]
func (c *dbConn) Close() error {
	return c.conn.Close()
}

// IsValid implements [driver.Validator.IsValid]
func (c *dbConn) IsValid() bool {
	// Ping with a short timeout to check if the connection is still valid
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	return c.conn.PingContext(ctx) == nil
}

// dbStmt implements [driver.Stmt] with both context and legacy methods
type dbStmt struct {
	stmt *sql.Stmt
	mu   sync.Mutex
}

// Close implements [driver.Stmt.Close]
func (s *dbStmt) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.stmt.Close()
}

// NumInput implements [driver.Stmt.NumInput]
func (s *dbStmt) NumInput() int {
	return -1 // Number of parameters is unknown
}

// Exec implements [driver.Stmt.Exec] (legacy method)
func (s *dbStmt) Exec(args []driver.Value) (driver.Result, error) {
	namedArgs := make([]driver.NamedValue, len(args))
	for i, value := range args {
		namedArgs[i] = driver.NamedValue{Value: value}
	}
	return s.ExecContext(context.Background(), namedArgs)
}

// ExecContext implements [driver.StmtExecContext.ExecContext]
func (s *dbStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	interfaceArgs := make([]any, len(args))
	for i, arg := range args {
		interfaceArgs[i] = arg.Value
	}
	return s.stmt.ExecContext(ctx, interfaceArgs...)
}

// Query implements [driver.Stmt.Query] (legacy method)
func (s *dbStmt) Query(args []driver.Value) (driver.Rows, error) {
	namedArgs := make([]driver.NamedValue, len(args))
	for i, value := range args {
		namedArgs[i] = driver.NamedValue{Value: value}
	}
	return s.QueryContext(context.Background(), namedArgs)
}

// QueryContext implements [driver.StmtQueryContext.QueryContext]
func (s *dbStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	interfaceArgs := make([]any, len(args))
	for i, arg := range args {
		interfaceArgs[i] = arg.Value
	}
	rows, err := s.stmt.QueryContext(ctx, interfaceArgs...)
	if err != nil {
		return nil, err
	}
	return &dbRows{rows: rows}, nil
}

// dbRows implements [driver.Rows]
type dbRows struct {
	rows *sql.Rows
}

// Columns implements [driver.Rows.Columns]
func (r *dbRows) Columns() []string {
	cols, err := r.rows.Columns()
	if err != nil {
		// This shouldn't happen if the query was successful
		return []string{}
	}
	return cols
}

// Close implements [driver.Rows.Close]
func (r *dbRows) Close() error {
	return r.rows.Close()
}

// Next implements [driver.Rows.Next]
func (r *dbRows) Next(dest []driver.Value) error {
	if !r.rows.Next() {
		if err := r.rows.Err(); err != nil {
			return err
		}
		return io.EOF
	}
	// Create a slice of interfaces to scan into
	scanArgs := make([]any, len(dest))
	for i := range scanArgs {
		scanArgs[i] = &dest[i]
	}
	return r.rows.Scan(scanArgs...)
}

// dbTx implements [driver.Tx]
type dbTx struct {
	tx *sql.Tx
	mu sync.Mutex
}

// Commit implements [driver.Tx.Commit]
func (t *dbTx) Commit() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.tx.Commit()
}

// Rollback implements [driver.Tx.Rollback]
func (t *dbTx) Rollback() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.tx.Rollback()
}
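A sketch tying the driver to the cluster: the standard *sql.DB returned by OpenDBWithCluster picks a node connection per call, and the node-state criterion can be routed through the context helper defined in options.go below. All names here come from this changeset:

	stdDB, err := OpenDBWithCluster(c) // c comes from NewCluster
	if err != nil {
		return err
	}
	row := stdDB.QueryRowContext(NodeStateCriterion(ctx, hasql.Standby), "SELECT now()")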

View File

@@ -0,0 +1,141 @@
package sql

import (
	"context"
	"testing"
	"time"

	"github.com/DATA-DOG/go-sqlmock"
	"golang.yandex/hasql/v2"
)

func TestDriver(t *testing.T) {
	dbMaster, dbMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbMaster.Close()

	dbMasterMock.MatchExpectationsInOrder(false)
	dbMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(1, 0)).
		RowsWillBeClosed().
		WithoutArgs()
	dbMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("master-dc1"))

	dbDRMaster, dbDRMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbDRMaster.Close()

	dbDRMasterMock.MatchExpectationsInOrder(false)
	dbDRMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 40)).
		RowsWillBeClosed().
		WithoutArgs()
	dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("drmaster1-dc2"))
	dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("drmaster"))

	dbSlaveDC1, dbSlaveDC1Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbSlaveDC1.Close()

	dbSlaveDC1Mock.MatchExpectationsInOrder(false)
	dbSlaveDC1Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 50)).
		RowsWillBeClosed().
		WithoutArgs()
	dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	dbSlaveDC2, dbSlaveDC2Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbSlaveDC2.Close()

	dbSlaveDC1Mock.MatchExpectationsInOrder(false)
	dbSlaveDC2Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 50)).
		RowsWillBeClosed().
		WithoutArgs()
	dbSlaveDC2Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	tctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
	defer cancel()

	c, err := NewCluster[Querier](
		WithClusterContext(tctx),
		WithClusterNodeChecker(hasql.PostgreSQLChecker),
		WithClusterNodePicker(NewCustomPicker[Querier](
			CustomPickerMaxLag(100),
		)),
		WithClusterNodes(
			ClusterNode{"slave-dc1", dbSlaveDC1, 1},
			ClusterNode{"master-dc1", dbMaster, 1},
			ClusterNode{"slave-dc2", dbSlaveDC2, 2},
			ClusterNode{"drmaster1-dc2", dbDRMaster, 0},
		),
		WithClusterOptions(
			hasql.WithUpdateInterval[Querier](2*time.Second),
			hasql.WithUpdateTimeout[Querier](1*time.Second),
		),
	)
	if err != nil {
		t.Fatal(err)
	}
	defer c.Close()

	if err = c.WaitForNodes(tctx, hasql.Primary, hasql.Standby); err != nil {
		t.Fatal(err)
	}

	db, err := OpenDBWithCluster(c)
	if err != nil {
		t.Fatal(err)
	}

	// Use context methods
	row := db.QueryRowContext(NodeStateCriterion(t.Context(), hasql.Primary), "SELECT node_name as name")
	if err = row.Err(); err != nil {
		t.Fatal(err)
	}
	nodeName := ""
	if err = row.Scan(&nodeName); err != nil {
		t.Fatal(err)
	}
	if nodeName != "master-dc1" {
		t.Fatalf("invalid node_name %s != %s", "master-dc1", nodeName)
	}
}

cluster/hasql/error.go (new file, 10 lines)
View File

@@ -0,0 +1,10 @@
package sql

import "errors"

var (
	ErrClusterChecker    = errors.New("cluster node checker required")
	ErrClusterDiscoverer = errors.New("cluster node discoverer required")
	ErrClusterPicker     = errors.New("cluster node picker required")
	ErrorNoAliveNodes    = errors.New("cluster no alive nodes")
)

cluster/hasql/options.go (new file, 110 lines)
View File

@@ -0,0 +1,110 @@
package sql

import (
	"context"
	"math"

	"golang.yandex/hasql/v2"
)

// ClusterOptions contains cluster specific options
type ClusterOptions struct {
	NodeChecker        hasql.NodeChecker
	NodePicker         hasql.NodePicker[Querier]
	NodeDiscoverer     hasql.NodeDiscoverer[Querier]
	Options            []hasql.ClusterOpt[Querier]
	Context            context.Context
	Retries            int
	NodePriority       map[string]int32
	NodeStateCriterion hasql.NodeStateCriterion
}

// ClusterOption apply cluster options to ClusterOptions
type ClusterOption func(*ClusterOptions)

// WithClusterNodeChecker pass hasql.NodeChecker to cluster options
func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodeChecker = c
	}
}

// WithClusterNodePicker pass hasql.NodePicker to cluster options
func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodePicker = p
	}
}

// WithClusterNodeDiscoverer pass hasql.NodeDiscoverer to cluster options
func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodeDiscoverer = d
	}
}

// WithRetries retry count on other nodes in case of error
func WithRetries(n int) ClusterOption {
	return func(o *ClusterOptions) {
		o.Retries = n
	}
}

// WithClusterContext pass context.Context to cluster options and used for checks
func WithClusterContext(ctx context.Context) ClusterOption {
	return func(o *ClusterOptions) {
		o.Context = ctx
	}
}

// WithClusterOptions pass hasql.ClusterOpt
func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption {
	return func(o *ClusterOptions) {
		o.Options = append(o.Options, opts...)
	}
}

// WithClusterNodeStateCriterion pass default hasql.NodeStateCriterion
func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodeStateCriterion = c
	}
}

type ClusterNode struct {
	Name     string
	DB       Querier
	Priority int32
}

// WithClusterNodes create cluster with static NodeDiscoverer
func WithClusterNodes(cns ...ClusterNode) ClusterOption {
	return func(o *ClusterOptions) {
		nodes := make([]*hasql.Node[Querier], 0, len(cns))
		if o.NodePriority == nil {
			o.NodePriority = make(map[string]int32, len(cns))
		}
		for _, cn := range cns {
			nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB))
			if cn.Priority == 0 {
				cn.Priority = math.MaxInt32
			}
			o.NodePriority[cn.Name] = cn.Priority
		}
		o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...)
	}
}

type nodeStateCriterionKey struct{}

// NodeStateCriterion inject hasql.NodeStateCriterion to context
func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context {
	return context.WithValue(ctx, nodeStateCriterionKey{}, c)
}

func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
	if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
		return v
	}
	return c.options.NodeStateCriterion
}
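A condensed construction sketch using the options above; the driver name and DSNs are hypothetical, everything else is defined in this changeset:

	db1, _ := sql.Open("pgx", "postgres://node1/app")
	db2, _ := sql.Open("pgx", "postgres://node2/app")

	c, err := NewCluster[Querier](
		WithClusterNodeChecker(hasql.PostgreSQLChecker),
		WithClusterNodePicker(NewCustomPicker[Querier](CustomPickerMaxLag(100))),
		WithClusterNodes(
			ClusterNode{Name: "node1", DB: db1, Priority: 1},
			ClusterNode{Name: "node2", DB: db2, Priority: 2},
		),
		WithClusterNodeStateCriterion(hasql.Primary),
	)
	if err != nil {
		return err // missing checker/picker/discoverer surface as ErrCluster* errors
	}
	defer c.Close()

	// a single call can override the default criterion through the context:
	rows, err := c.QueryContext(NodeStateCriterion(ctx, hasql.PreferStandby), "SELECT 1")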

cluster/hasql/picker.go (new file, 113 lines)
View File

@@ -0,0 +1,113 @@
package sql

import (
	"fmt"
	"math"
	"time"

	"golang.yandex/hasql/v2"
)

// compile time guard
var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil)

// CustomPickerOptions holds options to pick nodes
type CustomPickerOptions struct {
	MaxLag   int
	Priority map[string]int32
	Retries  int
}

// CustomPickerOption func apply option to CustomPickerOptions
type CustomPickerOption func(*CustomPickerOptions)

// CustomPickerMaxLag specifies max lag for which node can be used
func CustomPickerMaxLag(n int) CustomPickerOption {
	return func(o *CustomPickerOptions) {
		o.MaxLag = n
	}
}

// NewCustomPicker creates new node picker
func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] {
	options := CustomPickerOptions{}
	for _, o := range opts {
		o(&options)
	}
	return &CustomPicker[Querier]{opts: options}
}

// CustomPicker holds node picker options
type CustomPicker[T Querier] struct {
	opts CustomPickerOptions
}

// PickNode used to return specific node
func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] {
	for _, n := range cnodes {
		fmt.Printf("node %s\n", n.Node.String())
	}
	return cnodes[0]
}

func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
	if prio, ok := p.opts.Priority[nodeName]; ok {
		return prio
	}
	return math.MaxInt32 // Default to lowest priority
}

// CompareNodes used to sort nodes
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
	// Get replication lag values
	aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
	bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()

	// First check that lag is lower than MaxLag
	if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
		return 0 // both are equal
	}

	// If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
	if aLag > p.opts.MaxLag {
		return 1 // b is better
	}
	if bLag > p.opts.MaxLag {
		return -1 // a is better
	}

	// Get node priorities
	aPrio := p.getPriority(a.Node.String())
	bPrio := p.getPriority(b.Node.String())

	// if both priorities are equal
	if aPrio == bPrio {
		// First compare by replication lag
		if aLag < bLag {
			return -1
		}
		if aLag > bLag {
			return 1
		}
		// If replication lag is equal, compare by latency
		aLatency := a.Info.(interface{ Latency() time.Duration }).Latency()
		bLatency := b.Info.(interface{ Latency() time.Duration }).Latency()
		if aLatency < bLatency {
			return -1
		}
		if aLatency > bLatency {
			return 1
		}
		// If lag and latency are equal
		return 0
	}

	// If priorities are different, prefer the node with lower priority value
	if aPrio < bPrio {
		return -1
	}
	return 1
}
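A worked example of the resulting order with MaxLag set to 100: a node with lag 120 sorts behind any node inside the lag budget regardless of priority; between a lag-30/priority-1 node and a lag-10/priority-2 node, the priority-1 node wins, because differing priorities are compared before lag; two nodes with equal priority are ordered by lag and, when lag ties, by measured latency. Nodes given Priority 0 in WithClusterNodes get math.MaxInt32, i.e. the lowest preference.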

go.mod (31 lines changed)
View File

@@ -1,34 +1,33 @@
 module go.unistack.org/micro/v4
 
-go 1.22.0
+go 1.25
 
 require (
-	dario.cat/mergo v1.0.1
+	dario.cat/mergo v1.0.2
 	github.com/DATA-DOG/go-sqlmock v1.5.2
-	github.com/KimMachineGun/automemlimit v0.7.0
-	github.com/goccy/go-yaml v1.17.1
+	github.com/KimMachineGun/automemlimit v0.7.5
+	github.com/goccy/go-yaml v1.18.0
 	github.com/google/uuid v1.6.0
 	github.com/matoous/go-nanoid v1.5.1
 	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
-	github.com/spf13/cast v1.7.1
-	github.com/stretchr/testify v1.10.0
+	github.com/spf13/cast v1.10.0
+	github.com/stretchr/testify v1.11.1
 	go.uber.org/atomic v1.11.0
-	go.uber.org/automaxprocs v1.6.0
 	go.unistack.org/micro-proto/v4 v4.1.0
-	golang.org/x/sync v0.10.0
-	google.golang.org/grpc v1.69.4
-	google.golang.org/protobuf v1.36.3
+	golang.org/x/sync v0.17.0
+	golang.yandex/hasql/v2 v2.1.0
+	google.golang.org/grpc v1.76.0
+	google.golang.org/protobuf v1.36.10
 )
 
 require (
-	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
+	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
-	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
-	github.com/rogpeppe/go-internal v1.13.1 // indirect
-	golang.org/x/net v0.34.0 // indirect
-	golang.org/x/sys v0.29.0 // indirect
-	google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	github.com/rogpeppe/go-internal v1.14.1 // indirect
+	golang.org/x/sys v0.37.0 // indirect
+	google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
 	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )

go.sum (70 lines changed)
View File

@@ -1,19 +1,19 @@
-dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
-dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
+dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
+dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
 github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
 github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
-github.com/KimMachineGun/automemlimit v0.7.0 h1:7G06p/dMSf7G8E6oq+f2uOPuVncFyIlDI/pBWK49u88=
-github.com/KimMachineGun/automemlimit v0.7.0/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
+github.com/KimMachineGun/automemlimit v0.7.5 h1:RkbaC0MwhjL1ZuBKunGDjE/ggwAX43DwZrJqVwyveTk=
+github.com/KimMachineGun/automemlimit v0.7.5/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
-github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
-github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
 github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
-github.com/goccy/go-yaml v1.17.1 h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY=
-github.com/goccy/go-yaml v1.17.1/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
+github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
+github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
 github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
 github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
-github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
-github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
@@ -30,38 +30,36 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
 github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
-github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
-github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
-github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
-github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
-github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
+github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
 github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
 github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
-github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
-github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
-github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
-github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
+github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
-go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
-go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
 go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
 go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
-golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
-golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
+golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
+golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
-golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
-golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
+golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
-golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
-golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
+golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
-golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
-golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
+golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
+golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
+golang.yandex/hasql/v2 v2.1.0 h1:7CaFFWeHoK5TvA+QvZzlKHlIN5sqNpqM8NSrXskZD/k=
+golang.yandex/hasql/v2 v2.1.0/go.mod h1:3Au1AxuJDCTXmS117BpbI6e+70kGWeyLR1qJAH6HdtA=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
-google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
-google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
+google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
+google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
-google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
-google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
+google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
+google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -3,6 +3,7 @@ package sql
import ( import (
"context" "context"
"database/sql" "database/sql"
"sync"
"time" "time"
) )
@@ -11,31 +12,84 @@ type Statser interface {
} }
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
if db == nil {
return
}
options := NewOptions(opts...) options := NewOptions(opts...)
go func() { var (
ticker := time.NewTicker(options.MeterStatsInterval) statsMu sync.Mutex
defer ticker.Stop() lastUpdated time.Time
maxOpenConnections, openConnections, inUse, idle, waitCount float64
maxIdleClosed, maxIdleTimeClosed, maxLifetimeClosed float64
waitDuration float64
)
for { updateFn := func() {
select { statsMu.Lock()
case <-ctx.Done(): defer statsMu.Unlock()
return
case <-ticker.C: if time.Since(lastUpdated) < options.MeterStatsInterval {
if db == nil { return
return
}
stats := db.Stats()
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
} }
}()
stats := db.Stats()
maxOpenConnections = float64(stats.MaxOpenConnections)
openConnections = float64(stats.OpenConnections)
inUse = float64(stats.InUse)
idle = float64(stats.Idle)
waitCount = float64(stats.WaitCount)
maxIdleClosed = float64(stats.MaxIdleClosed)
maxIdleTimeClosed = float64(stats.MaxIdleTimeClosed)
maxLifetimeClosed = float64(stats.MaxLifetimeClosed)
waitDuration = float64(stats.WaitDuration.Seconds())
lastUpdated = time.Now()
}
options.Meter.Gauge(MaxOpenConnections, func() float64 {
updateFn()
return maxOpenConnections
})
options.Meter.Gauge(OpenConnections, func() float64 {
updateFn()
return openConnections
})
options.Meter.Gauge(InuseConnections, func() float64 {
updateFn()
return inUse
})
options.Meter.Gauge(IdleConnections, func() float64 {
updateFn()
return idle
})
options.Meter.Gauge(WaitConnections, func() float64 {
updateFn()
return waitCount
})
options.Meter.Gauge(BlockedSeconds, func() float64 {
updateFn()
return waitDuration
})
options.Meter.Gauge(MaxIdleClosed, func() float64 {
updateFn()
return maxIdleClosed
})
options.Meter.Gauge(MaxIdletimeClosed, func() float64 {
updateFn()
return maxIdleTimeClosed
})
options.Meter.Gauge(MaxLifetimeClosed, func() float64 {
updateFn()
return maxLifetimeClosed
})
} }
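For reference, a minimal standalone sketch of the throttled-snapshot pattern the reworked NewStatsMeter uses: all gauge callbacks share one cached sql.DBStats snapshot that is refreshed at most once per interval, so one scrape of nine gauges costs a single Stats() call. Names below are illustrative, not the micro API.

package main

import (
	"database/sql"
	"fmt"
	"sync"
	"time"
)

// statser matches the shape of *sql.DB for stats collection.
type statser interface{ Stats() sql.DBStats }

// fakeDB stands in for *sql.DB so the sketch runs without a driver.
type fakeDB struct{}

func (fakeDB) Stats() sql.DBStats { return sql.DBStats{OpenConnections: 3} }

// cachedStats refreshes a snapshot at most once per interval; gauge
// callbacks read the cached copy instead of calling Stats() per scrape.
type cachedStats struct {
	mu       sync.Mutex
	db       statser
	interval time.Duration
	last     time.Time
	stats    sql.DBStats
}

func (c *cachedStats) snapshot() sql.DBStats {
	c.mu.Lock()
	defer c.mu.Unlock()
	if time.Since(c.last) >= c.interval {
		c.stats = c.db.Stats()
		c.last = time.Now()
	}
	return c.stats
}

func main() {
	c := &cachedStats{db: fakeDB{}, interval: 5 * time.Second}
	// A gauge callback, as the meter would invoke it on each scrape.
	openConnections := func() float64 { return float64(c.snapshot().OpenConnections) }
	fmt.Println(openConnections()) // 3
}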

View File

@@ -8,6 +8,7 @@ import (
"slices" "slices"
"time" "time"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
) )
@@ -42,8 +43,10 @@ type Options struct {
Fields []interface{} Fields []interface{}
// ContextAttrFuncs contains funcs that executed before log func on context // ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc ContextAttrFuncs []ContextAttrFunc
// callerSkipCount number of frmaes to skip // callerSkipCount number of frames to skip
CallerSkipCount int CallerSkipCount int
// AddCaller enables capturing the caller in log records
AddCaller bool
// The logging level the logger should log // The logging level the logger should log
Level Level Level Level
// AddSource enabled writing source file and position in log // AddSource enabled writing source file and position in log
@@ -52,6 +55,12 @@ type Options struct {
AddStacktrace bool AddStacktrace bool
// DedupKeys deduplicate keys in log output // DedupKeys deduplicate keys in log output
DedupKeys bool DedupKeys bool
// FatalFinalizers run in order in the [logger.Fatal] method
FatalFinalizers []func(context.Context)
}
var DefaultFatalFinalizer = func(ctx context.Context) {
os.Exit(1)
} }
// NewOptions creates new options struct // NewOptions creates new options struct
@@ -65,6 +74,7 @@ func NewOptions(opts ...Option) Options {
AddSource: true, AddSource: true,
TimeFunc: time.Now, TimeFunc: time.Now,
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
FatalFinalizers: []func(context.Context){DefaultFatalFinalizer},
} }
WithMicroKeys()(&options) WithMicroKeys()(&options)
@@ -76,6 +86,19 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
func WithCallerEnabled(b bool) logger.Option {
return func(o *Options) {
o.AddCaller = b
}
}
// WithFatalFinalizers set logger.Fatal finalizers
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
return func(o *Options) {
o.FatalFinalizers = fncs
}
}
// WithContextAttrFuncs appends default funcs for the context attrs filler // WithContextAttrFuncs appends default funcs for the context attrs filler
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option { func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
return func(o *Options) { return func(o *Options) {
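As a usage sketch of the new option: finalizers run in registration order inside Fatal, so a cleanup hook should come before an exiting finalizer. This standalone example mirrors that loop; the cleanup itself is illustrative.

package main

import (
	"context"
	"fmt"
	"os"
)

// runFinalizers mirrors the Fatal path in the diff: each finalizer runs
// in registration order, so an os.Exit finalizer belongs last.
func runFinalizers(ctx context.Context, fns []func(context.Context)) {
	for _, fn := range fns {
		fn(ctx)
	}
}

func main() {
	fns := []func(context.Context){
		func(_ context.Context) { fmt.Println("flush buffers") }, // illustrative cleanup
		func(_ context.Context) { os.Exit(1) },                   // like DefaultFatalFinalizer
	}
	runFinalizers(context.TODO(), fns)
}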

View File

@@ -4,14 +4,12 @@ import (
"context" "context"
"io" "io"
"log/slog" "log/slog"
"os"
"reflect" "reflect"
"regexp" "regexp"
"runtime" "runtime"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/semconv"
@@ -39,11 +37,11 @@ var (
type wrapper struct { type wrapper struct {
h slog.Handler h slog.Handler
level atomic.Int64 level int64
} }
func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool { func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool {
return level >= slog.Level(int(h.level.Load())) return level >= slog.Level(atomic.LoadInt64(&h.level))
} }
func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error { func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
@@ -51,11 +49,17 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
} }
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler { func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
return h.h.WithAttrs(attrs) return &wrapper{
h: h.h.WithAttrs(attrs),
level: atomic.LoadInt64(&h.level),
}
} }
func (h *wrapper) WithGroup(name string) slog.Handler { func (h *wrapper) WithGroup(name string) slog.Handler {
return h.h.WithGroup(name) return &wrapper{
h: h.h.WithGroup(name),
level: atomic.LoadInt64(&h.level),
}
} }
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
@@ -117,10 +121,13 @@ func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger {
attrs, _ := s.argsAttrs(options.Fields) attrs, _ := s.argsAttrs(options.Fields)
l := &slogLogger{ l := &slogLogger{
handler: &wrapper{h: s.handler.h.WithAttrs(attrs)}, handler: &wrapper{
opts: options, h: s.handler.h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
},
opts: options,
} }
l.handler.level.Store(int64(loggerToSlogLevel(options.Level))) atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(options.Level)))
return l return l
} }
@@ -133,9 +140,9 @@ func (s *slogLogger) V(level logger.Level) bool {
} }
func (s *slogLogger) Level(level logger.Level) { func (s *slogLogger) Level(level logger.Level) {
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(level)))
s.mu.Lock() s.mu.Lock()
s.opts.Level = level s.opts.Level = level
s.handler.level.Store(int64(loggerToSlogLevel(level)))
s.mu.Unlock() s.mu.Unlock()
} }
@@ -156,8 +163,11 @@ func (s *slogLogger) Fields(fields ...interface{}) logger.Logger {
} }
attrs, _ := s.argsAttrs(fields) attrs, _ := s.argsAttrs(fields)
l.handler = &wrapper{h: s.handler.h.WithAttrs(attrs)} l.handler = &wrapper{
l.handler.level.Store(int64(loggerToSlogLevel(l.opts.Level))) h: s.handler.h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(l.opts.Level)))
return l return l
} }
@@ -202,8 +212,11 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
h = slog.NewJSONHandler(s.opts.Out, handleOpt) h = slog.NewJSONHandler(s.opts.Out, handleOpt)
} }
s.handler = &wrapper{h: h.WithAttrs(attrs)} s.handler = &wrapper{
s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level))) h: h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(s.opts.Level)))
s.mu.Unlock() s.mu.Unlock()
return nil return nil
@@ -231,11 +244,12 @@ func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{}
func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) { func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) {
s.printLog(ctx, logger.FatalLevel, msg, attrs...) s.printLog(ctx, logger.FatalLevel, msg, attrs...)
for _, fn := range s.opts.FatalFinalizers {
fn(ctx)
}
if closer, ok := s.opts.Out.(io.Closer); ok { if closer, ok := s.opts.Out.(io.Closer); ok {
closer.Close() closer.Close()
} }
time.Sleep(1 * time.Second)
os.Exit(1)
} }
func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) { func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) {
@@ -291,10 +305,17 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
} }
} }
var pcs [1]uintptr var pcs uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, printLog, LogLvlMethod]
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0]) if s.opts.AddCaller {
var caller [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, caller[:]) // skip [Callers, printLog, LogLvlMethod]
pcs = caller[0]
}
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs)
r.AddAttrs(attrs...) r.AddAttrs(attrs...)
_ = s.handler.Handle(ctx, r) _ = s.handler.Handle(ctx, r)
} }

View File

@@ -80,7 +80,7 @@ func TestTime(t *testing.T) {
WithHandlerFunc(slog.NewTextHandler), WithHandlerFunc(slog.NewTextHandler),
logger.WithAddStacktrace(true), logger.WithAddStacktrace(true),
logger.WithTimeFunc(func() time.Time { logger.WithTimeFunc(func() time.Time {
return time.Unix(0, 0) return time.Unix(0, 0).UTC()
}), }),
) )
if err := l.Init(logger.WithFields("key1", "val1")); err != nil { if err := l.Init(logger.WithFields("key1", "val1")); err != nil {
@@ -89,8 +89,7 @@ func TestTime(t *testing.T) {
l.Error(ctx, "msg1", errors.New("err")) l.Error(ctx, "msg1", errors.New("err"))
if !bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T03:00:00.000000000+03:00`)) && if !bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T00:00:00.000000000Z`)) {
!bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T00:00:00.000000000Z`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes()) t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
} }
} }
@@ -470,3 +469,25 @@ func Test_WithContextAttrFunc(t *testing.T) {
// t.Logf("xxx %s", buf.Bytes()) // t.Logf("xxx %s", buf.Bytes())
} }
func TestFatalFinalizers(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(
logger.WithLevel(logger.TraceLevel),
logger.WithOutput(buf),
)
if err := l.Init(
logger.WithFatalFinalizers(func(ctx context.Context) {
l.Info(ctx, "fatal finalizer")
})); err != nil {
t.Fatal(err)
}
l.Fatal(ctx, "info_msg1")
if !bytes.Contains(buf.Bytes(), []byte("fatal finalizer")) {
t.Fatalf("logger dont have fatal message, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte("info_msg1")) {
t.Fatalf("logger dont have info_msg1 message, buf %s", buf.Bytes())
}
}

View File

@@ -4,8 +4,8 @@ package meter
import ( import (
"io" "io"
"sort" "sort"
"strconv"
"strings" "strings"
"sync"
"time" "time"
) )
@@ -49,9 +49,11 @@ type Meter interface {
Set(opts ...Option) Meter Set(opts ...Option) Meter
// Histogram get or create histogram // Histogram get or create histogram
Histogram(name string, labels ...string) Histogram Histogram(name string, labels ...string) Histogram
// HistogramExt get or create histogram with specified quantiles
HistogramExt(name string, quantiles []float64, labels ...string) Histogram
// Summary get or create summary // Summary get or create summary
Summary(name string, labels ...string) Summary Summary(name string, labels ...string) Summary
// SummaryExt get or create summary with spcified quantiles and window time // SummaryExt get or create summary with specified quantiles and window time
SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary
// Write writes metrics to io.Writer // Write writes metrics to io.Writer
Write(w io.Writer, opts ...Option) error Write(w io.Writer, opts ...Option) error
@@ -59,6 +61,8 @@ type Meter interface {
Options() Options Options() Options
// String return meter type // String return meter type
String() string String() string
// Unregister removes the named metric and drops all its data
Unregister(name string, labels ...string) bool
} }
// Counter is a counter // Counter is a counter
@@ -80,7 +84,11 @@ type FloatCounter interface {
// Gauge is a float64 gauge // Gauge is a float64 gauge
type Gauge interface { type Gauge interface {
Add(float64)
Get() float64 Get() float64
Set(float64)
Dec()
Inc()
} }
// Histogram is a histogram for non-negative values with automatically created buckets // Histogram is a histogram for non-negative values with automatically created buckets
@@ -117,6 +125,39 @@ func BuildLabels(labels ...string) []string {
return labels return labels
} }
var spool = newStringsPool(500)
type stringsPool struct {
p *sync.Pool
c int
}
func newStringsPool(size int) *stringsPool {
p := &stringsPool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
return &strings.Builder{}
},
}
return p
}
func (p *stringsPool) Cap() int {
return p.c
}
func (p *stringsPool) Get() *strings.Builder {
return p.p.Get().(*strings.Builder)
}
func (p *stringsPool) Put(b *strings.Builder) {
if b.Cap() > p.c {
return
}
b.Reset()
p.p.Put(b)
}
// BuildName used to combine metric with labels. // BuildName used to combine metric with labels.
// If labels count is odd, drop last element // If labels count is odd, drop last element
func BuildName(name string, labels ...string) string { func BuildName(name string, labels ...string) string {
@@ -125,8 +166,6 @@ func BuildName(name string, labels ...string) string {
} }
if len(labels) > 2 { if len(labels) > 2 {
sort.Sort(byKey(labels))
idx := 0 idx := 0
for { for {
if labels[idx] == labels[idx+2] { if labels[idx] == labels[idx+2] {
@@ -141,7 +180,9 @@ func BuildName(name string, labels ...string) string {
} }
} }
var b strings.Builder b := spool.Get()
defer spool.Put(b)
_, _ = b.WriteString(name) _, _ = b.WriteString(name)
_, _ = b.WriteRune('{') _, _ = b.WriteRune('{')
for idx := 0; idx < len(labels); idx += 2 { for idx := 0; idx < len(labels); idx += 2 {
@@ -149,8 +190,9 @@ func BuildName(name string, labels ...string) string {
_, _ = b.WriteRune(',') _, _ = b.WriteRune(',')
} }
_, _ = b.WriteString(labels[idx]) _, _ = b.WriteString(labels[idx])
_, _ = b.WriteString(`=`) _, _ = b.WriteString(`="`)
_, _ = b.WriteString(strconv.Quote(labels[idx+1])) _, _ = b.WriteString(labels[idx+1])
_, _ = b.WriteRune('"')
} }
_, _ = b.WriteRune('}') _, _ = b.WriteRune('}')
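A hedged usage sketch of the reworked BuildName, matching the updated test below: callers now pass labels pre-sorted by key (the sort moved out of BuildName), duplicate keys resolve to the later value, and label values are wrapped in quotes without escaping.

// Illustrative call against the function shown in the diff above.
name := meter.BuildName("my_metric",
	"broker", "broker1", "broker", "broker2", // duplicate key: later value wins
	"register", "mdns",
	"server", "http", "server", "tcp",
)
// name == `my_metric{broker="broker2",register="mdns",server="tcp"}`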

View File

@@ -50,11 +50,12 @@ func TestBuildName(t *testing.T) {
data := map[string][]string{ data := map[string][]string{
`my_metric{firstlabel="value2",zerolabel="value3"}`: { `my_metric{firstlabel="value2",zerolabel="value3"}`: {
"my_metric", "my_metric",
"zerolabel", "value3", "firstlabel", "value2", "firstlabel", "value2",
"zerolabel", "value3",
}, },
`my_metric{broker="broker2",register="mdns",server="tcp"}`: { `my_metric{broker="broker2",register="mdns",server="tcp"}`: {
"my_metric", "my_metric",
"broker", "broker1", "broker", "broker2", "server", "http", "server", "tcp", "register", "mdns", "broker", "broker1", "broker", "broker2", "register", "mdns", "server", "http", "server", "tcp",
}, },
`my_metric{aaa="aaa"}`: { `my_metric{aaa="aaa"}`: {
"my_metric", "my_metric",

View File

@@ -28,6 +28,10 @@ func (r *noopMeter) Name() string {
return r.opts.Name return r.opts.Name
} }
func (r *noopMeter) Unregister(name string, labels ...string) bool {
return true
}
// Init initialize options // Init initialize options
func (r *noopMeter) Init(opts ...Option) error { func (r *noopMeter) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
@@ -66,6 +70,11 @@ func (r *noopMeter) Histogram(_ string, labels ...string) Histogram {
return &noopHistogram{labels: labels} return &noopHistogram{labels: labels}
} }
// HistogramExt implements the Meter interface
func (r *noopMeter) HistogramExt(_ string, quantiles []float64, labels ...string) Histogram {
return &noopHistogram{labels: labels}
}
// Set implements the Meter interface // Set implements the Meter interface
func (r *noopMeter) Set(opts ...Option) Meter { func (r *noopMeter) Set(opts ...Option) Meter {
m := &noopMeter{opts: r.opts} m := &noopMeter{opts: r.opts}
@@ -132,6 +141,18 @@ type noopGauge struct {
labels []string labels []string
} }
func (r *noopGauge) Add(float64) {
}
func (r *noopGauge) Set(float64) {
}
func (r *noopGauge) Inc() {
}
func (r *noopGauge) Dec() {
}
func (r *noopGauge) Get() float64 { func (r *noopGauge) Get() float64 {
return 0 return 0
} }

View File

@@ -4,6 +4,8 @@ import (
"context" "context"
) )
var DefaultQuantiles = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
// Option powers the configuration for metrics implementations: // Option powers the configuration for metrics implementations:
type Option func(*Options) type Option func(*Options)
@@ -23,6 +25,8 @@ type Options struct {
WriteProcessMetrics bool WriteProcessMetrics bool
// WriteFDMetrics flag to write fd metrics // WriteFDMetrics flag to write fd metrics
WriteFDMetrics bool WriteFDMetrics bool
// Quantiles specifies buckets for histogram
Quantiles []float64
} }
// NewOptions prepares a set of options: // NewOptions prepares a set of options:
@@ -61,14 +65,12 @@ func Address(value string) Option {
} }
} }
/* // Quantiles defines the desired spread of statistics for histogram metrics:
// TimingObjectives defines the desired spread of statistics for histogram / timing metrics: func Quantiles(quantiles []float64) Option {
func TimingObjectives(value map[float64]float64) Option {
return func(o *Options) { return func(o *Options) {
o.TimingObjectives = value o.Quantiles = quantiles
} }
} }
*/
// Labels add the meter labels // Labels add the meter labels
func Labels(ls ...string) Option { func Labels(ls ...string) Option {
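A short usage sketch for the new option; the quantile values are illustrative, with DefaultQuantiles above as the fallback.

// Illustrative: configure custom histogram quantiles at meter init.
opts := meter.NewOptions(meter.Quantiles([]float64{0.5, 0.9, 0.99}))
_ = opts

// Or per metric, via the new interface method (m is any meter.Meter):
// h := m.HistogramExt("request_duration_seconds", []float64{0.5, 0.9, 0.99}, "handler", "index")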

View File

@@ -91,7 +91,7 @@ func (p *bro) Connect(_ context.Context) error { return nil }
func (p *bro) Disconnect(_ context.Context) error { return nil } func (p *bro) Disconnect(_ context.Context) error { return nil }
// NewMessage creates new message // NewMessage creates new message
func (p *bro) NewMessage(_ context.Context, _ metadata.Metadata, _ interface{}, _ ...broker.PublishOption) (broker.Message, error) { func (p *bro) NewMessage(_ context.Context, _ metadata.Metadata, _ interface{}, _ ...broker.MessageOption) (broker.Message, error) {
return nil, nil return nil, nil
} }

View File

@@ -6,7 +6,6 @@ import (
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/register" "go.unistack.org/micro/v4/register"
maddr "go.unistack.org/micro/v4/util/addr" maddr "go.unistack.org/micro/v4/util/addr"
@@ -14,11 +13,6 @@ import (
"go.unistack.org/micro/v4/util/rand" "go.unistack.org/micro/v4/util/rand"
) )
// DefaultCodecs will be used to encode/decode
var DefaultCodecs = map[string]codec.Codec{
"application/octet-stream": codec.NewCodec(),
}
type rpcHandler struct { type rpcHandler struct {
opts HandlerOptions opts HandlerOptions
handler interface{} handler interface{}

View File

@@ -8,7 +8,6 @@ import (
"time" "time"
"github.com/KimMachineGun/automemlimit/memlimit" "github.com/KimMachineGun/automemlimit/memlimit"
"go.uber.org/automaxprocs/maxprocs"
"go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/config" "go.unistack.org/micro/v4/config"
@@ -23,8 +22,8 @@ import (
) )
func init() { func init() {
_, _ = maxprocs.Set()
_, _ = memlimit.SetGoMemLimitWithOpts( _, _ = memlimit.SetGoMemLimitWithOpts(
memlimit.WithRefreshInterval(1*time.Minute),
memlimit.WithRatio(0.9), memlimit.WithRatio(0.9),
memlimit.WithProvider( memlimit.WithProvider(
memlimit.ApplyFallback( memlimit.ApplyFallback(

View File

@@ -2,6 +2,7 @@ package store
import ( import (
"context" "context"
"errors"
"testing" "testing"
) )
@@ -25,7 +26,8 @@ func TestHook(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if err := s.Exists(context.TODO(), "test"); err != nil { err := s.Exists(context.TODO(), "test")
if !errors.Is(err, ErrNotFound) {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -29,10 +29,10 @@ type ContextAttrFunc func(ctx context.Context) []interface{}
func init() { func init() {
logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs, logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs,
func(ctx context.Context) []interface{} { func(ctx context.Context) []interface{} {
if span, ok := SpanFromContext(ctx); ok { if sp, ok := SpanFromContext(ctx); ok && sp != nil && sp.IsRecording() {
return []interface{}{ return []interface{}{
TraceIDKey, span.TraceID(), TraceIDKey, sp.TraceID(),
SpanIDKey, span.SpanID(), SpanIDKey, sp.SpanID(),
} }
} }
return nil return nil
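For context, a hedged sketch of a custom ContextAttrFunc in the same shape as the tracer's func above; the request-ID key and the registration line are hypothetical.

// Illustrative: attach a request ID from the context to every log record.
type requestIDKey struct{}

var requestIDAttr = func(ctx context.Context) []interface{} {
	if id, ok := ctx.Value(requestIDKey{}).(string); ok && id != "" {
		return []interface{}{"request_id", id}
	}
	return nil
}

// Registered the same way as the tracer's func:
// logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs, requestIDAttr)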

View File

@@ -1,13 +1,16 @@
package buffer package buffer
import "io" import (
"fmt"
"io"
)
var _ interface { var _ interface {
io.ReadCloser io.ReadCloser
io.ReadSeeker io.ReadSeeker
} = (*SeekerBuffer)(nil) } = (*SeekerBuffer)(nil)
// Buffer is a ReadWriteCloser that supports seeking. It's intended to // SeekerBuffer is a ReadWriteCloser that supports seeking. It's intended to
// replicate the functionality of bytes.Buffer that I use in my projects. // replicate the functionality of bytes.Buffer that I use in my projects.
// //
// Note that the seeking is limited to the read marker; all writes are // Note that the seeking is limited to the read marker; all writes are
@@ -23,6 +26,7 @@ func NewSeekerBuffer(data []byte) *SeekerBuffer {
} }
} }
// Read reads up to len(p) bytes into p from the current read position.
func (b *SeekerBuffer) Read(p []byte) (int, error) { func (b *SeekerBuffer) Read(p []byte) (int, error) {
if b.pos >= int64(len(b.data)) { if b.pos >= int64(len(b.data)) {
return 0, io.EOF return 0, io.EOF
@@ -30,29 +34,51 @@ func (b *SeekerBuffer) Read(p []byte) (int, error) {
n := copy(p, b.data[b.pos:]) n := copy(p, b.data[b.pos:])
b.pos += int64(n) b.pos += int64(n)
return n, nil return n, nil
} }
// Write appends the contents of p to the end of the buffer. It does not affect the read position.
func (b *SeekerBuffer) Write(p []byte) (int, error) { func (b *SeekerBuffer) Write(p []byte) (int, error) {
if len(p) == 0 {
return 0, nil
}
b.data = append(b.data, p...) b.data = append(b.data, p...)
return len(p), nil return len(p), nil
} }
// Seek sets the read pointer to pos. // Seek sets the offset for the next Read operation.
// The offset is interpreted according to whence:
// - io.SeekStart: relative to the beginning of the buffer
// - io.SeekCurrent: relative to the current position
// - io.SeekEnd: relative to the end of the buffer
//
// Returns an error if the resulting position is negative or if whence is invalid.
func (b *SeekerBuffer) Seek(offset int64, whence int) (int64, error) { func (b *SeekerBuffer) Seek(offset int64, whence int) (int64, error) {
var newPos int64
switch whence { switch whence {
case io.SeekStart: case io.SeekStart:
b.pos = offset newPos = offset
case io.SeekEnd: case io.SeekEnd:
b.pos = int64(len(b.data)) + offset newPos = int64(len(b.data)) + offset
case io.SeekCurrent: case io.SeekCurrent:
b.pos += offset newPos = b.pos + offset
default:
return 0, fmt.Errorf("invalid whence: %d", whence)
} }
if newPos < 0 {
return 0, fmt.Errorf("invalid seek: resulting position %d is negative", newPos)
}
b.pos = newPos
return b.pos, nil return b.pos, nil
} }
// Rewind resets the read pointer to 0. // Rewind resets the read position to 0.
func (b *SeekerBuffer) Rewind() error { func (b *SeekerBuffer) Rewind() error {
if _, err := b.Seek(0, io.SeekStart); err != nil { if _, err := b.Seek(0, io.SeekStart); err != nil {
return err return err
@@ -75,10 +101,16 @@ func (b *SeekerBuffer) Reset() {
// Len returns the length of data remaining to be read. // Len returns the length of data remaining to be read.
func (b *SeekerBuffer) Len() int { func (b *SeekerBuffer) Len() int {
if b.pos >= int64(len(b.data)) {
return 0
}
return len(b.data[b.pos:]) return len(b.data[b.pos:])
} }
// Bytes returns the underlying bytes from the current position. // Bytes returns the underlying bytes from the current position.
func (b *SeekerBuffer) Bytes() []byte { func (b *SeekerBuffer) Bytes() []byte {
if b.pos >= int64(len(b.data)) {
return []byte{}
}
return b.data[b.pos:] return b.data[b.pos:]
} }

View File

@@ -2,54 +2,384 @@ package buffer
import ( import (
"fmt" "fmt"
"strings" "io"
"testing" "testing"
"github.com/stretchr/testify/require"
) )
func noErrorT(t *testing.T, err error) { func TestNewSeekerBuffer(t *testing.T) {
if nil != err { input := []byte{'a', 'b', 'c', 'd', 'e'}
t.Fatalf("%s", err) expected := &SeekerBuffer{data: []byte{'a', 'b', 'c', 'd', 'e'}, pos: 0}
require.Equal(t, expected, NewSeekerBuffer(input))
}
func TestSeekerBuffer_Read(t *testing.T) {
tests := []struct {
name string
data []byte
initPos int64
readBuf []byte
expectedN int
expectedData []byte
expectedErr error
expectedPos int64
}{
{
name: "read with empty buffer",
data: []byte("hello"),
initPos: 0,
readBuf: []byte{},
expectedN: 0,
expectedData: []byte{},
expectedErr: nil,
expectedPos: 0,
},
{
name: "read with nil buffer",
data: []byte("hello"),
initPos: 0,
readBuf: nil,
expectedN: 0,
expectedData: nil,
expectedErr: nil,
expectedPos: 0,
},
{
name: "read full buffer",
data: []byte("hello"),
initPos: 0,
readBuf: make([]byte, 5),
expectedN: 5,
expectedData: []byte("hello"),
expectedErr: nil,
expectedPos: 5,
},
{
name: "read partial buffer",
data: []byte("hello"),
initPos: 2,
readBuf: make([]byte, 2),
expectedN: 2,
expectedData: []byte("ll"),
expectedErr: nil,
expectedPos: 4,
},
{
name: "read after end",
data: []byte("hello"),
initPos: 5,
readBuf: make([]byte, 5),
expectedN: 0,
expectedData: make([]byte, 5),
expectedErr: io.EOF,
expectedPos: 5,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sb := NewSeekerBuffer(tt.data)
sb.pos = tt.initPos
n, err := sb.Read(tt.readBuf)
if tt.expectedErr != nil {
require.Equal(t, err, tt.expectedErr)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.expectedN, n)
require.Equal(t, tt.expectedData, tt.readBuf)
require.Equal(t, tt.expectedPos, sb.pos)
})
} }
} }
func boolT(t *testing.T, cond bool, s ...string) { func TestSeekerBuffer_Write(t *testing.T) {
if !cond { tests := []struct {
what := strings.Join(s, ", ") name string
if len(what) > 0 { initialData []byte
what = ": " + what initialPos int64
} writeData []byte
t.Fatalf("assert.Bool failed%s", what) expectedData []byte
expectedN int
}{
{
name: "write empty slice",
initialData: []byte("data"),
initialPos: 0,
writeData: []byte{},
expectedData: []byte("data"),
expectedN: 0,
},
{
name: "write nil slice",
initialData: []byte("data"),
initialPos: 0,
writeData: nil,
expectedData: []byte("data"),
expectedN: 0,
},
{
name: "write to empty buffer",
initialData: nil,
initialPos: 0,
writeData: []byte("abc"),
expectedData: []byte("abc"),
expectedN: 3,
},
{
name: "write to existing buffer",
initialData: []byte("hello"),
initialPos: 0,
writeData: []byte(" world"),
expectedData: []byte("hello world"),
expectedN: 6,
},
{
name: "write after read",
initialData: []byte("abc"),
initialPos: 2,
writeData: []byte("XYZ"),
expectedData: []byte("abcXYZ"),
expectedN: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sb := NewSeekerBuffer(tt.initialData)
sb.pos = tt.initialPos
n, err := sb.Write(tt.writeData)
require.NoError(t, err)
require.Equal(t, tt.expectedN, n)
require.Equal(t, tt.expectedData, sb.data)
require.Equal(t, tt.initialPos, sb.pos)
})
} }
} }
func TestSeeking(t *testing.T) { func TestSeekerBuffer_Seek(t *testing.T) {
partA := []byte("hello, ") tests := []struct {
partB := []byte("world!") name string
initialData []byte
initialPos int64
offset int64
whence int
expectedPos int64
expectedErr error
}{
{
name: "seek with invalid whence",
initialData: []byte("abcdef"),
initialPos: 0,
offset: 1,
whence: 12345,
expectedPos: 0,
expectedErr: fmt.Errorf("invalid whence: %d", 12345),
},
{
name: "seek negative from start",
initialData: []byte("abcdef"),
initialPos: 0,
offset: -1,
whence: io.SeekStart,
expectedPos: 0,
expectedErr: fmt.Errorf("invalid seek: resulting position %d is negative", -1),
},
{
name: "seek from start to 0",
initialData: []byte("abcdef"),
initialPos: 0,
offset: 0,
whence: io.SeekStart,
expectedPos: 0,
expectedErr: nil,
},
{
name: "seek from start to 3",
initialData: []byte("abcdef"),
initialPos: 0,
offset: 3,
whence: io.SeekStart,
expectedPos: 3,
expectedErr: nil,
},
{
name: "seek from end to -1 (last byte)",
initialData: []byte("abcdef"),
initialPos: 0,
offset: -1,
whence: io.SeekEnd,
expectedPos: 5,
expectedErr: nil,
},
{
name: "seek from current forward",
initialData: []byte("abcdef"),
initialPos: 2,
offset: 2,
whence: io.SeekCurrent,
expectedPos: 4,
expectedErr: nil,
},
{
name: "seek from current backward",
initialData: []byte("abcdef"),
initialPos: 4,
offset: -2,
whence: io.SeekCurrent,
expectedPos: 2,
expectedErr: nil,
},
{
name: "seek to end exactly",
initialData: []byte("abcdef"),
initialPos: 0,
offset: 0,
whence: io.SeekEnd,
expectedPos: 6,
expectedErr: nil,
},
{
name: "seek to out of range",
initialData: []byte("abcdef"),
initialPos: 0,
offset: 2,
whence: io.SeekEnd,
expectedPos: 8,
expectedErr: nil,
},
}
buf := NewSeekerBuffer(partA) for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sb := NewSeekerBuffer(tt.initialData)
sb.pos = tt.initialPos
boolT(t, buf.Len() == len(partA), fmt.Sprintf("on init: have length %d, want length %d", buf.Len(), len(partA))) newPos, err := sb.Seek(tt.offset, tt.whence)
b := make([]byte, 32) if tt.expectedErr != nil {
require.Equal(t, tt.expectedErr, err)
n, err := buf.Read(b) } else {
noErrorT(t, err) require.NoError(t, err)
boolT(t, buf.Len() == 0, fmt.Sprintf("after reading 1: have length %d, want length 0", buf.Len())) require.Equal(t, tt.expectedPos, newPos)
boolT(t, n == len(partA), fmt.Sprintf("after reading 2: have length %d, want length %d", n, len(partA))) require.Equal(t, tt.expectedPos, sb.pos)
}
n, err = buf.Write(partB) })
noErrorT(t, err) }
boolT(t, n == len(partB), fmt.Sprintf("after writing: have length %d, want length %d", n, len(partB))) }
n, err = buf.Read(b) func TestSeekerBuffer_Rewind(t *testing.T) {
noErrorT(t, err) buf := NewSeekerBuffer([]byte("hello world"))
boolT(t, buf.Len() == 0, fmt.Sprintf("after rereading 1: have length %d, want length 0", buf.Len())) buf.pos = 4
boolT(t, n == len(partB), fmt.Sprintf("after rereading 2: have length %d, want length %d", n, len(partB)))
require.NoError(t, buf.Rewind())
partsLen := len(partA) + len(partB) require.Equal(t, []byte("hello world"), buf.data)
_ = buf.Rewind() require.Equal(t, int64(0), buf.pos)
boolT(t, buf.Len() == partsLen, fmt.Sprintf("after rewinding: have length %d, want length %d", buf.Len(), partsLen)) }
buf.Close() func TestSeekerBuffer_Close(t *testing.T) {
boolT(t, buf.Len() == 0, fmt.Sprintf("after closing, have length %d, want length 0", buf.Len())) buf := NewSeekerBuffer([]byte("hello world"))
buf.pos = 2
require.NoError(t, buf.Close())
require.Nil(t, buf.data)
require.Equal(t, int64(0), buf.pos)
}
func TestSeekerBuffer_Reset(t *testing.T) {
buf := NewSeekerBuffer([]byte("hello world"))
buf.pos = 2
buf.Reset()
require.Nil(t, buf.data)
require.Equal(t, int64(0), buf.pos)
}
func TestSeekerBuffer_Len(t *testing.T) {
tests := []struct {
name string
data []byte
pos int64
expected int
}{
{
name: "full buffer",
data: []byte("abcde"),
pos: 0,
expected: 5,
},
{
name: "partial read",
data: []byte("abcde"),
pos: 2,
expected: 3,
},
{
name: "fully read",
data: []byte("abcde"),
pos: 5,
expected: 0,
},
{
name: "pos > len",
data: []byte("abcde"),
pos: 10,
expected: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buf := NewSeekerBuffer(tt.data)
buf.pos = tt.pos
require.Equal(t, tt.expected, buf.Len())
})
}
}
func TestSeekerBuffer_Bytes(t *testing.T) {
tests := []struct {
name string
data []byte
pos int64
expected []byte
}{
{
name: "start of buffer",
data: []byte("abcde"),
pos: 0,
expected: []byte("abcde"),
},
{
name: "middle of buffer",
data: []byte("abcde"),
pos: 2,
expected: []byte("cde"),
},
{
name: "end of buffer",
data: []byte("abcde"),
pos: 5,
expected: []byte{},
},
{
name: "pos beyond end",
data: []byte("abcde"),
pos: 10,
expected: []byte{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buf := NewSeekerBuffer(tt.data)
buf.pos = tt.pos
require.Equal(t, tt.expected, buf.Bytes())
})
}
} }

View File

@@ -6,18 +6,18 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/semconv"
) )
var ( func unregisterMetrics(size int) {
pools = make([]Statser, 0) meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size))
poolsMu sync.Mutex meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size))
) meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size))
meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size))
}
// Stats holds pool usage counters
type Stats struct { type Stats struct {
Get uint64 Get uint64
Put uint64 Put uint64
@@ -25,41 +25,13 @@ type Stats struct {
Ret uint64 Ret uint64
} }
// Statser provides buffer pool stats
type Statser interface {
Stats() Stats
Cap() int
}
func init() {
go newStatsMeter()
}
func newStatsMeter() {
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
defer ticker.Stop()
for range ticker.C {
poolsMu.Lock()
for _, st := range pools {
stats := st.Stats()
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
}
poolsMu.Unlock()
}
}
var (
_ Statser = (*BytePool)(nil)
_ Statser = (*BytesPool)(nil)
_ Statser = (*StringsPool)(nil)
)
type Pool[T any] struct { type Pool[T any] struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
c int
} }
func (p Pool[T]) Put(t T) { func (p Pool[T]) Put(t T) {
@@ -70,37 +42,82 @@ func (p Pool[T]) Get() T {
return p.p.Get().(T) return p.p.Get().(T)
} }
func NewPool[T any](fn func() T) Pool[T] { func NewPool[T any](fn func() T, size int) Pool[T] {
return Pool[T]{ p := Pool[T]{
p: &sync.Pool{ c: size,
New: func() interface{} { get: &atomic.Uint64{},
return fn() put: &atomic.Uint64{},
}, mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{
New: func() interface{} {
p.mis.Add(1)
return fn()
}, },
} }
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p
} }
type BytePool struct { type BytePool struct {
p *sync.Pool p *sync.Pool
get uint64 get *atomic.Uint64
put uint64 put *atomic.Uint64
mis uint64 mis *atomic.Uint64
ret uint64 ret *atomic.Uint64
c int c int
} }
func NewBytePool(size int) *BytePool { func NewBytePool(size int) *BytePool {
p := &BytePool{c: size} p := &BytePool{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
atomic.AddUint64(&p.mis, 1) p.mis.Add(1)
b := make([]byte, 0, size) b := make([]byte, 0, size)
return &b return &b
}, },
} }
poolsMu.Lock()
pools = append(pools, p) meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
poolsMu.Unlock() return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p return p
} }
@@ -110,49 +127,73 @@ func (p *BytePool) Cap() int {
func (p *BytePool) Stats() Stats { func (p *BytePool) Stats() Stats {
return Stats{ return Stats{
Put: atomic.LoadUint64(&p.put), Put: p.put.Load(),
Get: atomic.LoadUint64(&p.get), Get: p.get.Load(),
Mis: atomic.LoadUint64(&p.mis), Mis: p.mis.Load(),
Ret: atomic.LoadUint64(&p.ret), Ret: p.ret.Load(),
} }
} }
func (p *BytePool) Get() *[]byte { func (p *BytePool) Get() *[]byte {
atomic.AddUint64(&p.get, 1) p.get.Add(1)
return p.p.Get().(*[]byte) return p.p.Get().(*[]byte)
} }
func (p *BytePool) Put(b *[]byte) { func (p *BytePool) Put(b *[]byte) {
atomic.AddUint64(&p.put, 1) p.put.Add(1)
if cap(*b) > p.c { if cap(*b) > p.c {
atomic.AddUint64(&p.ret, 1) p.ret.Add(1)
return return
} }
*b = (*b)[:0] *b = (*b)[:0]
p.p.Put(b) p.p.Put(b)
} }
func (p *BytePool) Close() {
unregisterMetrics(p.c)
}
type BytesPool struct { type BytesPool struct {
p *sync.Pool p *sync.Pool
get uint64 get *atomic.Uint64
put uint64 put *atomic.Uint64
mis uint64 mis *atomic.Uint64
ret uint64 ret *atomic.Uint64
c int c int
} }
func NewBytesPool(size int) *BytesPool { func NewBytesPool(size int) *BytesPool {
p := &BytesPool{c: size} p := &BytesPool{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
atomic.AddUint64(&p.mis, 1) p.mis.Add(1)
b := bytes.NewBuffer(make([]byte, 0, size)) b := bytes.NewBuffer(make([]byte, 0, size))
return b return b
}, },
} }
poolsMu.Lock()
pools = append(pools, p) meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
poolsMu.Unlock() return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p return p
} }
@@ -162,10 +203,10 @@ func (p *BytesPool) Cap() int {
func (p *BytesPool) Stats() Stats { func (p *BytesPool) Stats() Stats {
return Stats{ return Stats{
Put: atomic.LoadUint64(&p.put), Put: p.put.Load(),
Get: atomic.LoadUint64(&p.get), Get: p.get.Load(),
Mis: atomic.LoadUint64(&p.mis), Mis: p.mis.Load(),
Ret: atomic.LoadUint64(&p.ret), Ret: p.ret.Load(),
} }
} }
@@ -174,34 +215,43 @@ func (p *BytesPool) Get() *bytes.Buffer {
} }
func (p *BytesPool) Put(b *bytes.Buffer) { func (p *BytesPool) Put(b *bytes.Buffer) {
p.put.Add(1)
if (*b).Cap() > p.c { if (*b).Cap() > p.c {
atomic.AddUint64(&p.ret, 1) p.ret.Add(1)
return return
} }
b.Reset() b.Reset()
p.p.Put(b) p.p.Put(b)
} }
func (p *BytesPool) Close() {
unregisterMetrics(p.c)
}
type StringsPool struct { type StringsPool struct {
p *sync.Pool p *sync.Pool
get uint64 get *atomic.Uint64
put uint64 put *atomic.Uint64
mis uint64 mis *atomic.Uint64
ret uint64 ret *atomic.Uint64
c int c int
} }
func NewStringsPool(size int) *StringsPool { func NewStringsPool(size int) *StringsPool {
p := &StringsPool{c: size} p := &StringsPool{
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
atomic.AddUint64(&p.mis, 1) p.mis.Add(1)
return &strings.Builder{} return &strings.Builder{}
}, },
} }
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p return p
} }
@@ -211,24 +261,28 @@ func (p *StringsPool) Cap() int {
func (p *StringsPool) Stats() Stats { func (p *StringsPool) Stats() Stats {
return Stats{ return Stats{
Put: atomic.LoadUint64(&p.put), Put: p.put.Load(),
Get: atomic.LoadUint64(&p.get), Get: p.get.Load(),
Mis: atomic.LoadUint64(&p.mis), Mis: p.mis.Load(),
Ret: atomic.LoadUint64(&p.ret), Ret: p.ret.Load(),
} }
} }
func (p *StringsPool) Get() *strings.Builder { func (p *StringsPool) Get() *strings.Builder {
atomic.AddUint64(&p.get, 1) p.get.Add(1)
return p.p.Get().(*strings.Builder) return p.p.Get().(*strings.Builder)
} }
func (p *StringsPool) Put(b *strings.Builder) { func (p *StringsPool) Put(b *strings.Builder) {
atomic.AddUint64(&p.put, 1) p.put.Add(1)
if b.Cap() > p.c { if b.Cap() > p.c {
atomic.AddUint64(&p.ret, 1) p.ret.Add(1)
return return
} }
b.Reset() b.Reset()
p.p.Put(b) p.p.Put(b)
} }
func (p *StringsPool) Close() {
unregisterMetrics(p.c)
}
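A hedged lifecycle sketch for the reworked pools: gauges are now registered at construction with a capacity label, so Close should be called when a pool is discarded to drop its metrics. The package name is assumed; the size is illustrative.

p := pool.NewBytePool(4096) // registers the PoolGetTotal/PoolPutTotal/... gauges
b := p.Get()
*b = append(*b, "payload"...)
p.Put(b)  // reused, or counted as Ret and dropped if it grew past 4096
p.Close() // unregisters the capacity="4096" gauges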