Compare commits

..

6 Commits

Author SHA1 Message Date
f92e18897a implement driver
Some checks failed
lint / lint (pull_request) Successful in 2m43s
test / test (pull_request) Successful in 4m53s
coverage / build (pull_request) Failing after 17m18s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:27:42 +03:00
022326ddc4 move
Some checks failed
lint / lint (pull_request) Successful in 3m27s
test / test (pull_request) Successful in 4m49s
coverage / build (pull_request) Failing after 17m28s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-19 11:31:11 +03:00
e053eeac74 add default node state criterion
Some checks failed
lint / lint (pull_request) Successful in 2m36s
test / test (pull_request) Failing after 17m14s
coverage / build (pull_request) Failing after 17m30s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-19 11:26:16 +03:00
6c6916a050 initial hasql support
Some checks failed
lint / lint (pull_request) Successful in 4m23s
test / test (pull_request) Failing after 17m14s
coverage / build (pull_request) Failing after 17m29s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-18 17:16:20 +03:00
ea84ac094f Merge branch 'v4' into hasql
Some checks failed
test / test (pull_request) Failing after 17m58s
coverage / build (pull_request) Failing after 18m40s
lint / lint (pull_request) Failing after 1m41s
2025-09-18 14:35:10 +03:00
2886a7fe8a initial hasql support
Some checks failed
test / test (pull_request) Failing after 19m47s
lint / lint (pull_request) Failing after 19m59s
coverage / build (pull_request) Failing after 20m4s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-18 14:34:48 +03:00
25 changed files with 461 additions and 679 deletions

View File

@@ -25,7 +25,7 @@ jobs:
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1) dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
echo "src_hash=$src_hash" echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash" echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" -a "$src_hash" != "" -a "$dst_hash" != "" ]; then if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT echo "sync_needed=true" >> $GITHUB_OUTPUT
else else
echo "sync_needed=false" >> $GITHUB_OUTPUT echo "sync_needed=false" >> $GITHUB_OUTPUT

View File

@@ -1,5 +1,5 @@
# Micro # Micro
![Coverage](https://img.shields.io/badge/Coverage-33.6%25-yellow) ![Coverage](https://img.shields.io/badge/Coverage-33.8%25-yellow)
[![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview) [![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v4)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush) [![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v4)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)

View File

@@ -41,11 +41,11 @@ type Broker interface {
// Disconnect disconnect from broker // Disconnect disconnect from broker
Disconnect(ctx context.Context) error Disconnect(ctx context.Context) error
// NewMessage create new broker message to publish. // NewMessage create new broker message to publish.
NewMessage(ctx context.Context, hdr metadata.Metadata, body any, opts ...MessageOption) (Message, error) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error)
// Publish message to broker topic // Publish message to broker topic
Publish(ctx context.Context, topic string, messages ...Message) error Publish(ctx context.Context, topic string, messages ...Message) error
// Subscribe subscribes to topic message via handler // Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
// String type of broker // String type of broker
String() string String() string
// Live returns broker liveness // Live returns broker liveness
@@ -59,7 +59,7 @@ type Broker interface {
type ( type (
FuncPublish func(ctx context.Context, topic string, messages ...Message) error FuncPublish func(ctx context.Context, topic string, messages ...Message) error
HookPublish func(next FuncPublish) FuncPublish HookPublish func(next FuncPublish) FuncPublish
FuncSubscribe func(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error) FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
HookSubscribe func(next FuncSubscribe) FuncSubscribe HookSubscribe func(next FuncSubscribe) FuncSubscribe
) )
@@ -75,7 +75,7 @@ type Message interface {
Body() []byte Body() []byte
// Unmarshal try to decode message body to dst. // Unmarshal try to decode message body to dst.
// This is helper method that uses codec.Unmarshal. // This is helper method that uses codec.Unmarshal.
Unmarshal(dst any, opts ...codec.Option) error Unmarshal(dst interface{}, opts ...codec.Option) error
// Ack acknowledge message if supported. // Ack acknowledge message if supported.
Ack() error Ack() error
} }

View File

@@ -18,6 +18,7 @@ import (
type Options struct { type Options struct {
// Name holds the broker name // Name holds the broker name
Name string Name string
// Tracer used for tracing // Tracer used for tracing
Tracer tracer.Tracer Tracer tracer.Tracer
// Register can be used for clustering // Register can be used for clustering
@@ -30,20 +31,23 @@ type Options struct {
Meter meter.Meter Meter meter.Meter
// Context holds external options // Context holds external options
Context context.Context Context context.Context
// Wait waits for a collection of goroutines to finish // Wait waits for a collection of goroutines to finish
Wait *sync.WaitGroup Wait *sync.WaitGroup
// TLSConfig holds tls.TLSConfig options // TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config TLSConfig *tls.Config
// Addrs holds the broker address // Addrs holds the broker address
Addrs []string Addrs []string
// Hooks can be run before broker Publishing and message processing in Subscribe // Hooks can be run before broker Publish/BatchPublish and
// Subscribe/BatchSubscribe methods
Hooks options.Hooks Hooks options.Hooks
// GracefulTimeout contains time to wait to finish in flight requests // GracefulTimeout contains time to wait to finish in flight requests
GracefulTimeout time.Duration GracefulTimeout time.Duration
// ContentType will be used if no content-type set when creating message // ContentType will be used if no content-type set when creating message
ContentType string ContentType string
// ErrorHandler specifies handler for all broker errors handling subscriber
ErrorHandler any
} }
// NewOptions create new Options // NewOptions create new Options
@@ -76,12 +80,6 @@ func Context(ctx context.Context) Option {
} }
} }
func GracefulTimeout(t time.Duration) Option {
return func(o *Options) {
o.GracefulTimeout = t
}
}
// ContentType used by default if not specified // ContentType used by default if not specified
func ContentType(ct string) Option { func ContentType(ct string) Option {
return func(o *Options) { return func(o *Options) {
@@ -89,13 +87,6 @@ func ContentType(ct string) Option {
} }
} }
// ErrorHandler handles errors in broker
func ErrorHandler(h any) Option {
return func(o *Options) {
o.ErrorHandler = h
}
}
// MessageOptions struct // MessageOptions struct
type MessageOptions struct { type MessageOptions struct {
// ContentType for message body // ContentType for message body

View File

@@ -15,6 +15,11 @@ import (
"go.unistack.org/micro/v4/tracer" "go.unistack.org/micro/v4/tracer"
) )
// DefaultCodecs will be used to encode/decode data
var DefaultCodecs = map[string]codec.Codec{
"application/octet-stream": codec.NewCodec(),
}
type noopClient struct { type noopClient struct {
funcCall FuncCall funcCall FuncCall
funcStream FuncStream funcStream FuncStream

View File

@@ -161,7 +161,7 @@ func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
Context: context.Background(), Context: context.Background(),
ContentType: DefaultContentType, ContentType: DefaultContentType,
Codecs: make(map[string]codec.Codec), Codecs: DefaultCodecs,
CallOptions: CallOptions{ CallOptions: CallOptions{
Context: context.Background(), Context: context.Background(),
Backoff: DefaultBackoff, Backoff: DefaultBackoff,

View File

@@ -3,12 +3,23 @@ package sql
import ( import (
"context" "context"
"database/sql" "database/sql"
"errors"
"fmt"
"math"
"reflect" "reflect"
"time"
"unsafe" "unsafe"
"golang.yandex/hasql/v2" "golang.yandex/hasql/v2"
) )
var (
ErrClusterChecker = errors.New("cluster node checker required")
ErrClusterDiscoverer = errors.New("cluster node discoverer required")
ErrClusterPicker = errors.New("cluster node picker required")
ErrorNoAliveNodes = errors.New("cluster no alive nodes")
)
func newSQLRowError() *sql.Row { func newSQLRowError() *sql.Row {
row := &sql.Row{} row := &sql.Row{}
t := reflect.TypeOf(row).Elem() t := reflect.TypeOf(row).Elem()
@@ -25,6 +36,25 @@ type ClusterQuerier interface {
WaitForNodes(ctx context.Context, criterion ...hasql.NodeStateCriterion) error WaitForNodes(ctx context.Context, criterion ...hasql.NodeStateCriterion) error
} }
type Querier interface {
// Basic connection methods
PingContext(ctx context.Context) error
Close() error
// Query methods with context
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
// Prepared statements with context
PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
// Transaction management with context
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
Conn(ctx context.Context) (*sql.Conn, error)
}
type Cluster struct { type Cluster struct {
hasql *hasql.Cluster[Querier] hasql *hasql.Cluster[Querier]
options ClusterOptions options ClusterOptions
@@ -71,6 +101,205 @@ func NewCluster[T Querier](opts ...ClusterOption) (ClusterQuerier, error) {
return &Cluster{hasql: c, options: options}, nil return &Cluster{hasql: c, options: options}, nil
} }
// compile time guard
var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil)
type nodeStateCriterionKey struct{}
// NodeStateCriterion inject hasql.NodeStateCriterion to context
func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context {
return context.WithValue(ctx, nodeStateCriterionKey{}, c)
}
// CustomPickerOptions holds options to pick nodes
type CustomPickerOptions struct {
MaxLag int
Priority map[string]int32
Retries int
}
// CustomPickerOption func apply option to CustomPickerOptions
type CustomPickerOption func(*CustomPickerOptions)
// CustomPickerMaxLag specifies max lag for which node can be used
func CustomPickerMaxLag(n int) CustomPickerOption {
return func(o *CustomPickerOptions) {
o.MaxLag = n
}
}
// NewCustomPicker creates new node picker
func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] {
options := CustomPickerOptions{}
for _, o := range opts {
o(&options)
}
return &CustomPicker[Querier]{opts: options}
}
// CustomPicker holds node picker options
type CustomPicker[T Querier] struct {
opts CustomPickerOptions
}
// PickNode used to return specific node
func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] {
for _, n := range cnodes {
fmt.Printf("node %s\n", n.Node.String())
}
return cnodes[0]
}
func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
if prio, ok := p.opts.Priority[nodeName]; ok {
return prio
}
return math.MaxInt32 // Default to lowest priority
}
// CompareNodes used to sort nodes
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
// Get replication lag values
aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()
// First check that lag lower then MaxLag
if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
return 0 // both are equal
}
// If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
if aLag > p.opts.MaxLag {
return 1 // b is better
}
if bLag > p.opts.MaxLag {
return -1 // a is better
}
// Get node priorities
aPrio := p.getPriority(a.Node.String())
bPrio := p.getPriority(b.Node.String())
// if both priority equals
if aPrio == bPrio {
// First compare by replication lag
if aLag < bLag {
return -1
}
if aLag > bLag {
return 1
}
// If replication lag is equal, compare by latency
aLatency := a.Info.(interface{ Latency() time.Duration }).Latency()
bLatency := b.Info.(interface{ Latency() time.Duration }).Latency()
if aLatency < bLatency {
return -1
}
if aLatency > bLatency {
return 1
}
// If lag and latency is equal
return 0
}
// If priorities are different, prefer the node with lower priority value
if aPrio < bPrio {
return -1
}
return 1
}
// ClusterOptions contains cluster specific options
type ClusterOptions struct {
NodeChecker hasql.NodeChecker
NodePicker hasql.NodePicker[Querier]
NodeDiscoverer hasql.NodeDiscoverer[Querier]
Options []hasql.ClusterOpt[Querier]
Context context.Context
Retries int
NodePriority map[string]int32
NodeStateCriterion hasql.NodeStateCriterion
}
// ClusterOption apply cluster options to ClusterOptions
type ClusterOption func(*ClusterOptions)
// WithClusterNodeChecker pass hasql.NodeChecker to cluster options
func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption {
return func(o *ClusterOptions) {
o.NodeChecker = c
}
}
// WithClusterNodePicker pass hasql.NodePicker to cluster options
func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption {
return func(o *ClusterOptions) {
o.NodePicker = p
}
}
// WithClusterNodeDiscoverer pass hasql.NodeDiscoverer to cluster options
func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption {
return func(o *ClusterOptions) {
o.NodeDiscoverer = d
}
}
// WithRetries retry count on other nodes in case of error
func WithRetries(n int) ClusterOption {
return func(o *ClusterOptions) {
o.Retries = n
}
}
// WithClusterContext pass context.Context to cluster options and used for checks
func WithClusterContext(ctx context.Context) ClusterOption {
return func(o *ClusterOptions) {
o.Context = ctx
}
}
// WithClusterOptions pass hasql.ClusterOpt
func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption {
return func(o *ClusterOptions) {
o.Options = append(o.Options, opts...)
}
}
// WithClusterNodeStateCriterion pass default hasql.NodeStateCriterion
func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption {
return func(o *ClusterOptions) {
o.NodeStateCriterion = c
}
}
type ClusterNode struct {
Name string
DB Querier
Priority int32
}
// WithClusterNodes create cluster with static NodeDiscoverer
func WithClusterNodes(cns ...ClusterNode) ClusterOption {
return func(o *ClusterOptions) {
nodes := make([]*hasql.Node[Querier], 0, len(cns))
if o.NodePriority == nil {
o.NodePriority = make(map[string]int32, len(cns))
}
for _, cn := range cns {
nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB))
if cn.Priority == 0 {
cn.Priority = math.MaxInt32
}
o.NodePriority[cn.Name] = cn.Priority
}
o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...)
}
}
func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) { func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
var tx *sql.Tx var tx *sql.Tx
var err error var err error
@@ -233,3 +462,10 @@ func (c *Cluster) WaitForNodes(ctx context.Context, criterions ...hasql.NodeStat
} }
return nil return nil
} }
func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
return v
}
return c.options.NodeStateCriterion
}

View File

@@ -1,25 +0,0 @@
package sql
import (
"context"
"database/sql"
)
type Querier interface {
// Basic connection methods
PingContext(ctx context.Context) error
Close() error
// Query methods with context
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
// Prepared statements with context
PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
// Transaction management with context
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
Conn(ctx context.Context) (*sql.Conn, error)
}

View File

@@ -9,8 +9,8 @@ import (
"time" "time"
) )
// OpenDBWithCluster creates a [*sql.DB] that uses the [ClusterQuerier] // OpenDBWithDriver creates a [*sql.DB] that uses the [ClusterDriver]
func OpenDBWithCluster(db ClusterQuerier) (*sql.DB, error) { func OpenDBWithDriver(db ClusterQuerier) (*sql.DB, error) {
driver := NewClusterDriver(db) driver := NewClusterDriver(db)
connector, err := driver.OpenConnector("") connector, err := driver.OpenConnector("")
if err != nil { if err != nil {
@@ -19,12 +19,12 @@ func OpenDBWithCluster(db ClusterQuerier) (*sql.DB, error) {
return sql.OpenDB(connector), nil return sql.OpenDB(connector), nil
} }
// ClusterDriver implements [driver.Driver] and driver.Connector for an existing [Querier] // ClusterDriver implements driver.Driver and driver.Connector for an existing [Querier]
type ClusterDriver struct { type ClusterDriver struct {
db ClusterQuerier db ClusterQuerier
} }
// NewClusterDriver creates a new [driver.Driver] that uses an existing [ClusterQuerier] // NewClusterDriver creates a new driver that uses an existing [ClusterQuerier]
func NewClusterDriver(db ClusterQuerier) *ClusterDriver { func NewClusterDriver(db ClusterQuerier) *ClusterDriver {
return &ClusterDriver{db: db} return &ClusterDriver{db: db}
} }

View File

@@ -119,7 +119,7 @@ func TestDriver(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
db, err := OpenDBWithCluster(c) db, err := OpenDBWithDriver(c)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -1,10 +0,0 @@
package sql
import "errors"
var (
ErrClusterChecker = errors.New("cluster node checker required")
ErrClusterDiscoverer = errors.New("cluster node discoverer required")
ErrClusterPicker = errors.New("cluster node picker required")
ErrorNoAliveNodes = errors.New("cluster no alive nodes")
)

View File

@@ -1,110 +0,0 @@
package sql
import (
"context"
"math"
"golang.yandex/hasql/v2"
)
// ClusterOptions contains cluster specific options
type ClusterOptions struct {
NodeChecker hasql.NodeChecker
NodePicker hasql.NodePicker[Querier]
NodeDiscoverer hasql.NodeDiscoverer[Querier]
Options []hasql.ClusterOpt[Querier]
Context context.Context
Retries int
NodePriority map[string]int32
NodeStateCriterion hasql.NodeStateCriterion
}
// ClusterOption apply cluster options to ClusterOptions
type ClusterOption func(*ClusterOptions)
// WithClusterNodeChecker pass hasql.NodeChecker to cluster options
func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption {
return func(o *ClusterOptions) {
o.NodeChecker = c
}
}
// WithClusterNodePicker pass hasql.NodePicker to cluster options
func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption {
return func(o *ClusterOptions) {
o.NodePicker = p
}
}
// WithClusterNodeDiscoverer pass hasql.NodeDiscoverer to cluster options
func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption {
return func(o *ClusterOptions) {
o.NodeDiscoverer = d
}
}
// WithRetries retry count on other nodes in case of error
func WithRetries(n int) ClusterOption {
return func(o *ClusterOptions) {
o.Retries = n
}
}
// WithClusterContext pass context.Context to cluster options and used for checks
func WithClusterContext(ctx context.Context) ClusterOption {
return func(o *ClusterOptions) {
o.Context = ctx
}
}
// WithClusterOptions pass hasql.ClusterOpt
func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption {
return func(o *ClusterOptions) {
o.Options = append(o.Options, opts...)
}
}
// WithClusterNodeStateCriterion pass default hasql.NodeStateCriterion
func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption {
return func(o *ClusterOptions) {
o.NodeStateCriterion = c
}
}
type ClusterNode struct {
Name string
DB Querier
Priority int32
}
// WithClusterNodes create cluster with static NodeDiscoverer
func WithClusterNodes(cns ...ClusterNode) ClusterOption {
return func(o *ClusterOptions) {
nodes := make([]*hasql.Node[Querier], 0, len(cns))
if o.NodePriority == nil {
o.NodePriority = make(map[string]int32, len(cns))
}
for _, cn := range cns {
nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB))
if cn.Priority == 0 {
cn.Priority = math.MaxInt32
}
o.NodePriority[cn.Name] = cn.Priority
}
o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...)
}
}
type nodeStateCriterionKey struct{}
// NodeStateCriterion inject hasql.NodeStateCriterion to context
func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context {
return context.WithValue(ctx, nodeStateCriterionKey{}, c)
}
func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
return v
}
return c.options.NodeStateCriterion
}

View File

@@ -1,113 +0,0 @@
package sql
import (
"fmt"
"math"
"time"
"golang.yandex/hasql/v2"
)
// compile time guard
var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil)
// CustomPickerOptions holds options to pick nodes
type CustomPickerOptions struct {
MaxLag int
Priority map[string]int32
Retries int
}
// CustomPickerOption func apply option to CustomPickerOptions
type CustomPickerOption func(*CustomPickerOptions)
// CustomPickerMaxLag specifies max lag for which node can be used
func CustomPickerMaxLag(n int) CustomPickerOption {
return func(o *CustomPickerOptions) {
o.MaxLag = n
}
}
// NewCustomPicker creates new node picker
func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] {
options := CustomPickerOptions{}
for _, o := range opts {
o(&options)
}
return &CustomPicker[Querier]{opts: options}
}
// CustomPicker holds node picker options
type CustomPicker[T Querier] struct {
opts CustomPickerOptions
}
// PickNode used to return specific node
func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] {
for _, n := range cnodes {
fmt.Printf("node %s\n", n.Node.String())
}
return cnodes[0]
}
func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
if prio, ok := p.opts.Priority[nodeName]; ok {
return prio
}
return math.MaxInt32 // Default to lowest priority
}
// CompareNodes used to sort nodes
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
// Get replication lag values
aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()
// First check that lag lower then MaxLag
if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
return 0 // both are equal
}
// If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
if aLag > p.opts.MaxLag {
return 1 // b is better
}
if bLag > p.opts.MaxLag {
return -1 // a is better
}
// Get node priorities
aPrio := p.getPriority(a.Node.String())
bPrio := p.getPriority(b.Node.String())
// if both priority equals
if aPrio == bPrio {
// First compare by replication lag
if aLag < bLag {
return -1
}
if aLag > bLag {
return 1
}
// If replication lag is equal, compare by latency
aLatency := a.Info.(interface{ Latency() time.Duration }).Latency()
bLatency := b.Info.(interface{ Latency() time.Duration }).Latency()
if aLatency < bLatency {
return -1
}
if aLatency > bLatency {
return 1
}
// If lag and latency is equal
return 0
}
// If priorities are different, prefer the node with lower priority value
if aPrio < bPrio {
return -1
}
return 1
}

30
go.mod
View File

@@ -1,33 +1,35 @@
module go.unistack.org/micro/v4 module go.unistack.org/micro/v4
go 1.25 go 1.24
require ( require (
dario.cat/mergo v1.0.2 dario.cat/mergo v1.0.1
github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/KimMachineGun/automemlimit v0.7.5 github.com/KimMachineGun/automemlimit v0.7.0
github.com/goccy/go-yaml v1.18.0 github.com/goccy/go-yaml v1.17.1
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/matoous/go-nanoid v1.5.1 github.com/matoous/go-nanoid v1.5.1
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
github.com/spf13/cast v1.10.0 github.com/spf13/cast v1.7.1
github.com/stretchr/testify v1.11.1 github.com/stretchr/testify v1.10.0
go.uber.org/atomic v1.11.0 go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.6.0
go.unistack.org/micro-proto/v4 v4.1.0 go.unistack.org/micro-proto/v4 v4.1.0
golang.org/x/sync v0.17.0 golang.org/x/sync v0.10.0
golang.yandex/hasql/v2 v2.1.0 golang.yandex/hasql/v2 v2.1.0
google.golang.org/grpc v1.76.0 google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.10 google.golang.org/protobuf v1.36.3
) )
require ( require (
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect
golang.org/x/sys v0.37.0 // indirect golang.org/x/net v0.34.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect golang.org/x/sys v0.29.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

68
go.sum
View File

@@ -1,19 +1,19 @@
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/KimMachineGun/automemlimit v0.7.5 h1:RkbaC0MwhjL1ZuBKunGDjE/ggwAX43DwZrJqVwyveTk= github.com/KimMachineGun/automemlimit v0.7.0 h1:7G06p/dMSf7G8E6oq+f2uOPuVncFyIlDI/pBWK49u88=
github.com/KimMachineGun/automemlimit v0.7.5/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM= github.com/KimMachineGun/automemlimit v0.7.0/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw= github.com/goccy/go-yaml v1.17.1 h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY=
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= github.com/goccy/go-yaml v1.17.1/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
@@ -30,36 +30,40 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk= go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec= go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.yandex/hasql/v2 v2.1.0 h1:7CaFFWeHoK5TvA+QvZzlKHlIN5sqNpqM8NSrXskZD/k= golang.yandex/hasql/v2 v2.1.0 h1:7CaFFWeHoK5TvA+QvZzlKHlIN5sqNpqM8NSrXskZD/k=
golang.yandex/hasql/v2 v2.1.0/go.mod h1:3Au1AxuJDCTXmS117BpbI6e+70kGWeyLR1qJAH6HdtA= golang.yandex/hasql/v2 v2.1.0/go.mod h1:3Au1AxuJDCTXmS117BpbI6e+70kGWeyLR1qJAH6HdtA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0= google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -3,7 +3,6 @@ package sql
import ( import (
"context" "context"
"database/sql" "database/sql"
"sync"
"time" "time"
) )
@@ -12,84 +11,31 @@ type Statser interface {
} }
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
if db == nil {
return
}
options := NewOptions(opts...) options := NewOptions(opts...)
var ( go func() {
statsMu sync.Mutex ticker := time.NewTicker(options.MeterStatsInterval)
lastUpdated time.Time defer ticker.Stop()
maxOpenConnections, openConnections, inUse, idle, waitCount float64
maxIdleClosed, maxIdleTimeClosed, maxLifetimeClosed float64
waitDuration float64
)
updateFn := func() { for {
statsMu.Lock() select {
defer statsMu.Unlock() case <-ctx.Done():
return
if time.Since(lastUpdated) < options.MeterStatsInterval { case <-ticker.C:
return if db == nil {
return
}
stats := db.Stats()
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
} }
}()
stats := db.Stats()
maxOpenConnections = float64(stats.MaxOpenConnections)
openConnections = float64(stats.OpenConnections)
inUse = float64(stats.InUse)
idle = float64(stats.Idle)
waitCount = float64(stats.WaitCount)
maxIdleClosed = float64(stats.MaxIdleClosed)
maxIdleTimeClosed = float64(stats.MaxIdleTimeClosed)
maxLifetimeClosed = float64(stats.MaxLifetimeClosed)
waitDuration = float64(stats.WaitDuration.Seconds())
lastUpdated = time.Now()
}
options.Meter.Gauge(MaxOpenConnections, func() float64 {
updateFn()
return maxOpenConnections
})
options.Meter.Gauge(OpenConnections, func() float64 {
updateFn()
return openConnections
})
options.Meter.Gauge(InuseConnections, func() float64 {
updateFn()
return inUse
})
options.Meter.Gauge(IdleConnections, func() float64 {
updateFn()
return idle
})
options.Meter.Gauge(WaitConnections, func() float64 {
updateFn()
return waitCount
})
options.Meter.Gauge(BlockedSeconds, func() float64 {
updateFn()
return waitDuration
})
options.Meter.Gauge(MaxIdleClosed, func() float64 {
updateFn()
return maxIdleClosed
})
options.Meter.Gauge(MaxIdletimeClosed, func() float64 {
updateFn()
return maxIdleTimeClosed
})
options.Meter.Gauge(MaxLifetimeClosed, func() float64 {
updateFn()
return maxLifetimeClosed
})
} }

View File

@@ -8,7 +8,6 @@ import (
"slices" "slices"
"time" "time"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
) )
@@ -43,10 +42,8 @@ type Options struct {
Fields []interface{} Fields []interface{}
// ContextAttrFuncs contains funcs that executed before log func on context // ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc ContextAttrFuncs []ContextAttrFunc
// callerSkipCount number of frames to skip // callerSkipCount number of frmaes to skip
CallerSkipCount int CallerSkipCount int
// AddCaller enables to get caller
AddCaller bool
// The logging level the logger should log // The logging level the logger should log
Level Level Level Level
// AddSource enabled writing source file and position in log // AddSource enabled writing source file and position in log
@@ -55,12 +52,6 @@ type Options struct {
AddStacktrace bool AddStacktrace bool
// DedupKeys deduplicate keys in log output // DedupKeys deduplicate keys in log output
DedupKeys bool DedupKeys bool
// FatalFinalizers runs in order in [logger.Fatal] method
FatalFinalizers []func(context.Context)
}
var DefaultFatalFinalizer = func(ctx context.Context) {
os.Exit(1)
} }
// NewOptions creates new options struct // NewOptions creates new options struct
@@ -74,7 +65,6 @@ func NewOptions(opts ...Option) Options {
AddSource: true, AddSource: true,
TimeFunc: time.Now, TimeFunc: time.Now,
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
FatalFinalizers: []func(context.Context){DefaultFatalFinalizer},
} }
WithMicroKeys()(&options) WithMicroKeys()(&options)
@@ -86,19 +76,6 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
func WithCallerEnabled(b bool) logger.Option {
return func(o *Options) {
o.AddCaller = b
}
}
// WithFatalFinalizers set logger.Fatal finalizers
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
return func(o *Options) {
o.FatalFinalizers = fncs
}
}
// WithContextAttrFuncs appends default funcs for the context attrs filler // WithContextAttrFuncs appends default funcs for the context attrs filler
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option { func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
return func(o *Options) { return func(o *Options) {

View File

@@ -4,12 +4,14 @@ import (
"context" "context"
"io" "io"
"log/slog" "log/slog"
"os"
"reflect" "reflect"
"regexp" "regexp"
"runtime" "runtime"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/semconv"
@@ -37,11 +39,11 @@ var (
type wrapper struct { type wrapper struct {
h slog.Handler h slog.Handler
level int64 level atomic.Int64
} }
func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool { func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool {
return level >= slog.Level(atomic.LoadInt64(&h.level)) return level >= slog.Level(int(h.level.Load()))
} }
func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error { func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
@@ -49,17 +51,11 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
} }
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler { func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
return &wrapper{ return h.h.WithAttrs(attrs)
h: h.h.WithAttrs(attrs),
level: atomic.LoadInt64(&h.level),
}
} }
func (h *wrapper) WithGroup(name string) slog.Handler { func (h *wrapper) WithGroup(name string) slog.Handler {
return &wrapper{ return h.h.WithGroup(name)
h: h.h.WithGroup(name),
level: atomic.LoadInt64(&h.level),
}
} }
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
@@ -121,13 +117,10 @@ func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger {
attrs, _ := s.argsAttrs(options.Fields) attrs, _ := s.argsAttrs(options.Fields)
l := &slogLogger{ l := &slogLogger{
handler: &wrapper{ handler: &wrapper{h: s.handler.h.WithAttrs(attrs)},
h: s.handler.h.WithAttrs(attrs), opts: options,
level: atomic.LoadInt64(&s.handler.level),
},
opts: options,
} }
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(options.Level))) l.handler.level.Store(int64(loggerToSlogLevel(options.Level)))
return l return l
} }
@@ -140,9 +133,9 @@ func (s *slogLogger) V(level logger.Level) bool {
} }
func (s *slogLogger) Level(level logger.Level) { func (s *slogLogger) Level(level logger.Level) {
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(level)))
s.mu.Lock() s.mu.Lock()
s.opts.Level = level s.opts.Level = level
s.handler.level.Store(int64(loggerToSlogLevel(level)))
s.mu.Unlock() s.mu.Unlock()
} }
@@ -163,11 +156,8 @@ func (s *slogLogger) Fields(fields ...interface{}) logger.Logger {
} }
attrs, _ := s.argsAttrs(fields) attrs, _ := s.argsAttrs(fields)
l.handler = &wrapper{ l.handler = &wrapper{h: s.handler.h.WithAttrs(attrs)}
h: s.handler.h.WithAttrs(attrs), l.handler.level.Store(int64(loggerToSlogLevel(l.opts.Level)))
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(l.opts.Level)))
return l return l
} }
@@ -212,11 +202,8 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
h = slog.NewJSONHandler(s.opts.Out, handleOpt) h = slog.NewJSONHandler(s.opts.Out, handleOpt)
} }
s.handler = &wrapper{ s.handler = &wrapper{h: h.WithAttrs(attrs)}
h: h.WithAttrs(attrs), s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level)))
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(s.opts.Level)))
s.mu.Unlock() s.mu.Unlock()
return nil return nil
@@ -244,12 +231,11 @@ func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{}
func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) { func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) {
s.printLog(ctx, logger.FatalLevel, msg, attrs...) s.printLog(ctx, logger.FatalLevel, msg, attrs...)
for _, fn := range s.opts.FatalFinalizers {
fn(ctx)
}
if closer, ok := s.opts.Out.(io.Closer); ok { if closer, ok := s.opts.Out.(io.Closer); ok {
closer.Close() closer.Close()
} }
time.Sleep(1 * time.Second)
os.Exit(1)
} }
func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) { func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) {
@@ -305,17 +291,10 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
} }
} }
var pcs uintptr var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, printLog, LogLvlMethod]
if s.opts.AddCaller { r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0])
var caller [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, caller[:]) // skip [Callers, printLog, LogLvlMethod]
pcs = caller[0]
}
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs)
r.AddAttrs(attrs...) r.AddAttrs(attrs...)
_ = s.handler.Handle(ctx, r) _ = s.handler.Handle(ctx, r)
} }

View File

@@ -469,25 +469,3 @@ func Test_WithContextAttrFunc(t *testing.T) {
// t.Logf("xxx %s", buf.Bytes()) // t.Logf("xxx %s", buf.Bytes())
} }
func TestFatalFinalizers(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(
logger.WithLevel(logger.TraceLevel),
logger.WithOutput(buf),
)
if err := l.Init(
logger.WithFatalFinalizers(func(ctx context.Context) {
l.Info(ctx, "fatal finalizer")
})); err != nil {
t.Fatal(err)
}
l.Fatal(ctx, "info_msg1")
if !bytes.Contains(buf.Bytes(), []byte("fatal finalizer")) {
t.Fatalf("logger dont have fatal message, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte("info_msg1")) {
t.Fatalf("logger dont have info_msg1 message, buf %s", buf.Bytes())
}
}

View File

@@ -49,11 +49,9 @@ type Meter interface {
Set(opts ...Option) Meter Set(opts ...Option) Meter
// Histogram get or create histogram // Histogram get or create histogram
Histogram(name string, labels ...string) Histogram Histogram(name string, labels ...string) Histogram
// HistogramExt get or create histogram with specified quantiles
HistogramExt(name string, quantiles []float64, labels ...string) Histogram
// Summary get or create summary // Summary get or create summary
Summary(name string, labels ...string) Summary Summary(name string, labels ...string) Summary
// SummaryExt get or create summary with specified quantiles and window time // SummaryExt get or create summary with spcified quantiles and window time
SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary
// Write writes metrics to io.Writer // Write writes metrics to io.Writer
Write(w io.Writer, opts ...Option) error Write(w io.Writer, opts ...Option) error
@@ -61,8 +59,6 @@ type Meter interface {
Options() Options Options() Options
// String return meter type // String return meter type
String() string String() string
// Unregister metric name and drop all data
Unregister(name string, labels ...string) bool
} }
// Counter is a counter // Counter is a counter
@@ -84,11 +80,7 @@ type FloatCounter interface {
// Gauge is a float64 gauge // Gauge is a float64 gauge
type Gauge interface { type Gauge interface {
Add(float64)
Get() float64 Get() float64
Set(float64)
Dec()
Inc()
} }
// Histogram is a histogram for non-negative values with automatically created buckets // Histogram is a histogram for non-negative values with automatically created buckets

View File

@@ -28,10 +28,6 @@ func (r *noopMeter) Name() string {
return r.opts.Name return r.opts.Name
} }
func (r *noopMeter) Unregister(name string, labels ...string) bool {
return true
}
// Init initialize options // Init initialize options
func (r *noopMeter) Init(opts ...Option) error { func (r *noopMeter) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
@@ -70,11 +66,6 @@ func (r *noopMeter) Histogram(_ string, labels ...string) Histogram {
return &noopHistogram{labels: labels} return &noopHistogram{labels: labels}
} }
// HistogramExt implements the Meter interface
func (r *noopMeter) HistogramExt(_ string, quantiles []float64, labels ...string) Histogram {
return &noopHistogram{labels: labels}
}
// Set implements the Meter interface // Set implements the Meter interface
func (r *noopMeter) Set(opts ...Option) Meter { func (r *noopMeter) Set(opts ...Option) Meter {
m := &noopMeter{opts: r.opts} m := &noopMeter{opts: r.opts}
@@ -141,18 +132,6 @@ type noopGauge struct {
labels []string labels []string
} }
func (r *noopGauge) Add(float64) {
}
func (r *noopGauge) Set(float64) {
}
func (r *noopGauge) Inc() {
}
func (r *noopGauge) Dec() {
}
func (r *noopGauge) Get() float64 { func (r *noopGauge) Get() float64 {
return 0 return 0
} }

View File

@@ -4,8 +4,6 @@ import (
"context" "context"
) )
var DefaultQuantiles = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
// Option powers the configuration for metrics implementations: // Option powers the configuration for metrics implementations:
type Option func(*Options) type Option func(*Options)
@@ -25,8 +23,6 @@ type Options struct {
WriteProcessMetrics bool WriteProcessMetrics bool
// WriteFDMetrics flag to write fd metrics // WriteFDMetrics flag to write fd metrics
WriteFDMetrics bool WriteFDMetrics bool
// Quantiles specifies buckets for histogram
Quantiles []float64
} }
// NewOptions prepares a set of options: // NewOptions prepares a set of options:
@@ -65,12 +61,14 @@ func Address(value string) Option {
} }
} }
// Quantiles defines the desired spread of statistics for histogram metrics: /*
func Quantiles(quantiles []float64) Option { // TimingObjectives defines the desired spread of statistics for histogram / timing metrics:
func TimingObjectives(value map[float64]float64) Option {
return func(o *Options) { return func(o *Options) {
o.Quantiles = quantiles o.TimingObjectives = value
} }
} }
*/
// Labels add the meter labels // Labels add the meter labels
func Labels(ls ...string) Option { func Labels(ls ...string) Option {

View File

@@ -6,6 +6,7 @@ import (
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/register" "go.unistack.org/micro/v4/register"
maddr "go.unistack.org/micro/v4/util/addr" maddr "go.unistack.org/micro/v4/util/addr"
@@ -13,6 +14,11 @@ import (
"go.unistack.org/micro/v4/util/rand" "go.unistack.org/micro/v4/util/rand"
) )
// DefaultCodecs will be used to encode/decode
var DefaultCodecs = map[string]codec.Codec{
"application/octet-stream": codec.NewCodec(),
}
type rpcHandler struct { type rpcHandler struct {
opts HandlerOptions opts HandlerOptions
handler interface{} handler interface{}

View File

@@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/KimMachineGun/automemlimit/memlimit" "github.com/KimMachineGun/automemlimit/memlimit"
"go.uber.org/automaxprocs/maxprocs"
"go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/config" "go.unistack.org/micro/v4/config"
@@ -22,8 +23,8 @@ import (
) )
func init() { func init() {
_, _ = maxprocs.Set()
_, _ = memlimit.SetGoMemLimitWithOpts( _, _ = memlimit.SetGoMemLimitWithOpts(
memlimit.WithRefreshInterval(1*time.Minute),
memlimit.WithRatio(0.9), memlimit.WithRatio(0.9),
memlimit.WithProvider( memlimit.WithProvider(
memlimit.ApplyFallback( memlimit.ApplyFallback(

View File

@@ -6,18 +6,18 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/semconv"
) )
func unregisterMetrics(size int) { var (
meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size)) pools = make([]Statser, 0)
meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size)) poolsMu sync.Mutex
meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size)) )
meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size))
}
// Stats struct
type Stats struct { type Stats struct {
Get uint64 Get uint64
Put uint64 Put uint64
@@ -25,13 +25,41 @@ type Stats struct {
Ret uint64 Ret uint64
} }
// Statser provides buffer pool stats
type Statser interface {
Stats() Stats
Cap() int
}
func init() {
go newStatsMeter()
}
func newStatsMeter() {
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
defer ticker.Stop()
for range ticker.C {
poolsMu.Lock()
for _, st := range pools {
stats := st.Stats()
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
}
poolsMu.Unlock()
}
}
var (
_ Statser = (*BytePool)(nil)
_ Statser = (*BytesPool)(nil)
_ Statser = (*StringsPool)(nil)
)
type Pool[T any] struct { type Pool[T any] struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
c int
} }
func (p Pool[T]) Put(t T) { func (p Pool[T]) Put(t T) {
@@ -42,82 +70,37 @@ func (p Pool[T]) Get() T {
return p.p.Get().(T) return p.p.Get().(T)
} }
func NewPool[T any](fn func() T, size int) Pool[T] { func NewPool[T any](fn func() T) Pool[T] {
p := Pool[T]{ return Pool[T]{
c: size, p: &sync.Pool{
get: &atomic.Uint64{}, New: func() interface{} {
put: &atomic.Uint64{}, return fn()
mis: &atomic.Uint64{}, },
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{
New: func() interface{} {
p.mis.Add(1)
return fn()
}, },
} }
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p
} }
type BytePool struct { type BytePool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewBytePool(size int) *BytePool { func NewBytePool(size int) *BytePool {
p := &BytePool{ p := &BytePool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
b := make([]byte, 0, size) b := make([]byte, 0, size)
return &b return &b
}, },
} }
poolsMu.Lock()
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { pools = append(pools, p)
return float64(p.get.Load()) poolsMu.Unlock()
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p return p
} }
@@ -127,73 +110,49 @@ func (p *BytePool) Cap() int {
func (p *BytePool) Stats() Stats { func (p *BytePool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
func (p *BytePool) Get() *[]byte { func (p *BytePool) Get() *[]byte {
p.get.Add(1) atomic.AddUint64(&p.get, 1)
return p.p.Get().(*[]byte) return p.p.Get().(*[]byte)
} }
func (p *BytePool) Put(b *[]byte) { func (p *BytePool) Put(b *[]byte) {
p.put.Add(1) atomic.AddUint64(&p.put, 1)
if cap(*b) > p.c { if cap(*b) > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
*b = (*b)[:0] *b = (*b)[:0]
p.p.Put(b) p.p.Put(b)
} }
func (p *BytePool) Close() {
unregisterMetrics(p.c)
}
type BytesPool struct { type BytesPool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewBytesPool(size int) *BytesPool { func NewBytesPool(size int) *BytesPool {
p := &BytesPool{ p := &BytesPool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
b := bytes.NewBuffer(make([]byte, 0, size)) b := bytes.NewBuffer(make([]byte, 0, size))
return b return b
}, },
} }
poolsMu.Lock()
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { pools = append(pools, p)
return float64(p.get.Load()) poolsMu.Unlock()
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p return p
} }
@@ -203,10 +162,10 @@ func (p *BytesPool) Cap() int {
func (p *BytesPool) Stats() Stats { func (p *BytesPool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
@@ -215,43 +174,34 @@ func (p *BytesPool) Get() *bytes.Buffer {
} }
func (p *BytesPool) Put(b *bytes.Buffer) { func (p *BytesPool) Put(b *bytes.Buffer) {
p.put.Add(1)
if (*b).Cap() > p.c { if (*b).Cap() > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
b.Reset() b.Reset()
p.p.Put(b) p.p.Put(b)
} }
func (p *BytesPool) Close() {
unregisterMetrics(p.c)
}
type StringsPool struct { type StringsPool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewStringsPool(size int) *StringsPool { func NewStringsPool(size int) *StringsPool {
p := &StringsPool{ p := &StringsPool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
return &strings.Builder{} return &strings.Builder{}
}, },
} }
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p return p
} }
@@ -261,28 +211,24 @@ func (p *StringsPool) Cap() int {
func (p *StringsPool) Stats() Stats { func (p *StringsPool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
func (p *StringsPool) Get() *strings.Builder { func (p *StringsPool) Get() *strings.Builder {
p.get.Add(1) atomic.AddUint64(&p.get, 1)
return p.p.Get().(*strings.Builder) return p.p.Get().(*strings.Builder)
} }
func (p *StringsPool) Put(b *strings.Builder) { func (p *StringsPool) Put(b *strings.Builder) {
p.put.Add(1) atomic.AddUint64(&p.put, 1)
if b.Cap() > p.c { if b.Cap() > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
b.Reset() b.Reset()
p.p.Put(b) p.p.Put(b)
} }
func (p *StringsPool) Close() {
unregisterMetrics(p.c)
}