Compare commits
6 Commits
v4.1.23
...
f92e18897a
| Author | SHA1 | Date | |
|---|---|---|---|
| f92e18897a | |||
| 022326ddc4 | |||
| e053eeac74 | |||
| 6c6916a050 | |||
| ea84ac094f | |||
| 2886a7fe8a |
@@ -1,5 +1,5 @@
|
||||
# Micro
|
||||

|
||||

|
||||
[](https://opensource.org/licenses/Apache-2.0)
|
||||
[](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
|
||||
[](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)
|
||||
|
||||
@@ -3,12 +3,23 @@ package sql
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"golang.yandex/hasql/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrClusterChecker = errors.New("cluster node checker required")
|
||||
ErrClusterDiscoverer = errors.New("cluster node discoverer required")
|
||||
ErrClusterPicker = errors.New("cluster node picker required")
|
||||
ErrorNoAliveNodes = errors.New("cluster no alive nodes")
|
||||
)
|
||||
|
||||
func newSQLRowError() *sql.Row {
|
||||
row := &sql.Row{}
|
||||
t := reflect.TypeOf(row).Elem()
|
||||
@@ -25,6 +36,25 @@ type ClusterQuerier interface {
|
||||
WaitForNodes(ctx context.Context, criterion ...hasql.NodeStateCriterion) error
|
||||
}
|
||||
|
||||
type Querier interface {
|
||||
// Basic connection methods
|
||||
PingContext(ctx context.Context) error
|
||||
Close() error
|
||||
|
||||
// Query methods with context
|
||||
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
|
||||
|
||||
// Prepared statements with context
|
||||
PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
|
||||
|
||||
// Transaction management with context
|
||||
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
|
||||
|
||||
Conn(ctx context.Context) (*sql.Conn, error)
|
||||
}
|
||||
|
||||
type Cluster struct {
|
||||
hasql *hasql.Cluster[Querier]
|
||||
options ClusterOptions
|
||||
@@ -71,6 +101,205 @@ func NewCluster[T Querier](opts ...ClusterOption) (ClusterQuerier, error) {
|
||||
return &Cluster{hasql: c, options: options}, nil
|
||||
}
|
||||
|
||||
// compile time guard
|
||||
var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil)
|
||||
|
||||
type nodeStateCriterionKey struct{}
|
||||
|
||||
// NodeStateCriterion inject hasql.NodeStateCriterion to context
|
||||
func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context {
|
||||
return context.WithValue(ctx, nodeStateCriterionKey{}, c)
|
||||
}
|
||||
|
||||
// CustomPickerOptions holds options to pick nodes
|
||||
type CustomPickerOptions struct {
|
||||
MaxLag int
|
||||
Priority map[string]int32
|
||||
Retries int
|
||||
}
|
||||
|
||||
// CustomPickerOption func apply option to CustomPickerOptions
|
||||
type CustomPickerOption func(*CustomPickerOptions)
|
||||
|
||||
// CustomPickerMaxLag specifies max lag for which node can be used
|
||||
func CustomPickerMaxLag(n int) CustomPickerOption {
|
||||
return func(o *CustomPickerOptions) {
|
||||
o.MaxLag = n
|
||||
}
|
||||
}
|
||||
|
||||
// NewCustomPicker creates new node picker
|
||||
func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] {
|
||||
options := CustomPickerOptions{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &CustomPicker[Querier]{opts: options}
|
||||
}
|
||||
|
||||
// CustomPicker holds node picker options
|
||||
type CustomPicker[T Querier] struct {
|
||||
opts CustomPickerOptions
|
||||
}
|
||||
|
||||
// PickNode used to return specific node
|
||||
func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] {
|
||||
for _, n := range cnodes {
|
||||
fmt.Printf("node %s\n", n.Node.String())
|
||||
}
|
||||
return cnodes[0]
|
||||
}
|
||||
|
||||
func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
|
||||
if prio, ok := p.opts.Priority[nodeName]; ok {
|
||||
return prio
|
||||
}
|
||||
return math.MaxInt32 // Default to lowest priority
|
||||
}
|
||||
|
||||
// CompareNodes used to sort nodes
|
||||
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
|
||||
// Get replication lag values
|
||||
aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
|
||||
bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()
|
||||
|
||||
// First check that lag lower then MaxLag
|
||||
if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
|
||||
return 0 // both are equal
|
||||
}
|
||||
|
||||
// If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
|
||||
if aLag > p.opts.MaxLag {
|
||||
return 1 // b is better
|
||||
}
|
||||
if bLag > p.opts.MaxLag {
|
||||
return -1 // a is better
|
||||
}
|
||||
|
||||
// Get node priorities
|
||||
aPrio := p.getPriority(a.Node.String())
|
||||
bPrio := p.getPriority(b.Node.String())
|
||||
|
||||
// if both priority equals
|
||||
if aPrio == bPrio {
|
||||
// First compare by replication lag
|
||||
if aLag < bLag {
|
||||
return -1
|
||||
}
|
||||
if aLag > bLag {
|
||||
return 1
|
||||
}
|
||||
// If replication lag is equal, compare by latency
|
||||
aLatency := a.Info.(interface{ Latency() time.Duration }).Latency()
|
||||
bLatency := b.Info.(interface{ Latency() time.Duration }).Latency()
|
||||
|
||||
if aLatency < bLatency {
|
||||
return -1
|
||||
}
|
||||
if aLatency > bLatency {
|
||||
return 1
|
||||
}
|
||||
|
||||
// If lag and latency is equal
|
||||
return 0
|
||||
}
|
||||
|
||||
// If priorities are different, prefer the node with lower priority value
|
||||
if aPrio < bPrio {
|
||||
return -1
|
||||
}
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
// ClusterOptions contains cluster specific options
|
||||
type ClusterOptions struct {
|
||||
NodeChecker hasql.NodeChecker
|
||||
NodePicker hasql.NodePicker[Querier]
|
||||
NodeDiscoverer hasql.NodeDiscoverer[Querier]
|
||||
Options []hasql.ClusterOpt[Querier]
|
||||
Context context.Context
|
||||
Retries int
|
||||
NodePriority map[string]int32
|
||||
NodeStateCriterion hasql.NodeStateCriterion
|
||||
}
|
||||
|
||||
// ClusterOption apply cluster options to ClusterOptions
|
||||
type ClusterOption func(*ClusterOptions)
|
||||
|
||||
// WithClusterNodeChecker pass hasql.NodeChecker to cluster options
|
||||
func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.NodeChecker = c
|
||||
}
|
||||
}
|
||||
|
||||
// WithClusterNodePicker pass hasql.NodePicker to cluster options
|
||||
func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.NodePicker = p
|
||||
}
|
||||
}
|
||||
|
||||
// WithClusterNodeDiscoverer pass hasql.NodeDiscoverer to cluster options
|
||||
func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.NodeDiscoverer = d
|
||||
}
|
||||
}
|
||||
|
||||
// WithRetries retry count on other nodes in case of error
|
||||
func WithRetries(n int) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.Retries = n
|
||||
}
|
||||
}
|
||||
|
||||
// WithClusterContext pass context.Context to cluster options and used for checks
|
||||
func WithClusterContext(ctx context.Context) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// WithClusterOptions pass hasql.ClusterOpt
|
||||
func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.Options = append(o.Options, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
// WithClusterNodeStateCriterion pass default hasql.NodeStateCriterion
|
||||
func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.NodeStateCriterion = c
|
||||
}
|
||||
}
|
||||
|
||||
type ClusterNode struct {
|
||||
Name string
|
||||
DB Querier
|
||||
Priority int32
|
||||
}
|
||||
|
||||
// WithClusterNodes create cluster with static NodeDiscoverer
|
||||
func WithClusterNodes(cns ...ClusterNode) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
nodes := make([]*hasql.Node[Querier], 0, len(cns))
|
||||
if o.NodePriority == nil {
|
||||
o.NodePriority = make(map[string]int32, len(cns))
|
||||
}
|
||||
for _, cn := range cns {
|
||||
nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB))
|
||||
if cn.Priority == 0 {
|
||||
cn.Priority = math.MaxInt32
|
||||
}
|
||||
o.NodePriority[cn.Name] = cn.Priority
|
||||
}
|
||||
o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
|
||||
var tx *sql.Tx
|
||||
var err error
|
||||
@@ -233,3 +462,10 @@ func (c *Cluster) WaitForNodes(ctx context.Context, criterions ...hasql.NodeStat
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
|
||||
if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
|
||||
return v
|
||||
}
|
||||
return c.options.NodeStateCriterion
|
||||
}
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
type Querier interface {
|
||||
// Basic connection methods
|
||||
PingContext(ctx context.Context) error
|
||||
Close() error
|
||||
|
||||
// Query methods with context
|
||||
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
|
||||
|
||||
// Prepared statements with context
|
||||
PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
|
||||
|
||||
// Transaction management with context
|
||||
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
|
||||
|
||||
Conn(ctx context.Context) (*sql.Conn, error)
|
||||
}
|
||||
@@ -9,8 +9,8 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// OpenDBWithCluster creates a [*sql.DB] that uses the [ClusterQuerier]
|
||||
func OpenDBWithCluster(db ClusterQuerier) (*sql.DB, error) {
|
||||
// OpenDBWithDriver creates a [*sql.DB] that uses the [ClusterDriver]
|
||||
func OpenDBWithDriver(db ClusterQuerier) (*sql.DB, error) {
|
||||
driver := NewClusterDriver(db)
|
||||
connector, err := driver.OpenConnector("")
|
||||
if err != nil {
|
||||
@@ -19,12 +19,12 @@ func OpenDBWithCluster(db ClusterQuerier) (*sql.DB, error) {
|
||||
return sql.OpenDB(connector), nil
|
||||
}
|
||||
|
||||
// ClusterDriver implements [driver.Driver] and driver.Connector for an existing [Querier]
|
||||
// ClusterDriver implements driver.Driver and driver.Connector for an existing [Querier]
|
||||
type ClusterDriver struct {
|
||||
db ClusterQuerier
|
||||
}
|
||||
|
||||
// NewClusterDriver creates a new [driver.Driver] that uses an existing [ClusterQuerier]
|
||||
// NewClusterDriver creates a new driver that uses an existing [ClusterQuerier]
|
||||
func NewClusterDriver(db ClusterQuerier) *ClusterDriver {
|
||||
return &ClusterDriver{db: db}
|
||||
}
|
||||
|
||||
@@ -119,7 +119,7 @@ func TestDriver(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db, err := OpenDBWithCluster(c)
|
||||
db, err := OpenDBWithDriver(c)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
package sql
|
||||
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
ErrClusterChecker = errors.New("cluster node checker required")
|
||||
ErrClusterDiscoverer = errors.New("cluster node discoverer required")
|
||||
ErrClusterPicker = errors.New("cluster node picker required")
|
||||
ErrorNoAliveNodes = errors.New("cluster no alive nodes")
|
||||
)
|
||||
@@ -1,110 +0,0 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"golang.yandex/hasql/v2"
|
||||
)
|
||||
|
||||
// ClusterOptions contains cluster specific options
|
||||
type ClusterOptions struct {
|
||||
NodeChecker hasql.NodeChecker
|
||||
NodePicker hasql.NodePicker[Querier]
|
||||
NodeDiscoverer hasql.NodeDiscoverer[Querier]
|
||||
Options []hasql.ClusterOpt[Querier]
|
||||
Context context.Context
|
||||
Retries int
|
||||
NodePriority map[string]int32
|
||||
NodeStateCriterion hasql.NodeStateCriterion
|
||||
}
|
||||
|
||||
// ClusterOption apply cluster options to ClusterOptions
|
||||
type ClusterOption func(*ClusterOptions)
|
||||
|
||||
// WithClusterNodeChecker pass hasql.NodeChecker to cluster options
|
||||
func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.NodeChecker = c
|
||||
}
|
||||
}
|
||||
|
||||
// WithClusterNodePicker pass hasql.NodePicker to cluster options
|
||||
func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.NodePicker = p
|
||||
}
|
||||
}
|
||||
|
||||
// WithClusterNodeDiscoverer pass hasql.NodeDiscoverer to cluster options
|
||||
func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.NodeDiscoverer = d
|
||||
}
|
||||
}
|
||||
|
||||
// WithRetries retry count on other nodes in case of error
|
||||
func WithRetries(n int) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.Retries = n
|
||||
}
|
||||
}
|
||||
|
||||
// WithClusterContext pass context.Context to cluster options and used for checks
|
||||
func WithClusterContext(ctx context.Context) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// WithClusterOptions pass hasql.ClusterOpt
|
||||
func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.Options = append(o.Options, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
// WithClusterNodeStateCriterion pass default hasql.NodeStateCriterion
|
||||
func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
o.NodeStateCriterion = c
|
||||
}
|
||||
}
|
||||
|
||||
type ClusterNode struct {
|
||||
Name string
|
||||
DB Querier
|
||||
Priority int32
|
||||
}
|
||||
|
||||
// WithClusterNodes create cluster with static NodeDiscoverer
|
||||
func WithClusterNodes(cns ...ClusterNode) ClusterOption {
|
||||
return func(o *ClusterOptions) {
|
||||
nodes := make([]*hasql.Node[Querier], 0, len(cns))
|
||||
if o.NodePriority == nil {
|
||||
o.NodePriority = make(map[string]int32, len(cns))
|
||||
}
|
||||
for _, cn := range cns {
|
||||
nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB))
|
||||
if cn.Priority == 0 {
|
||||
cn.Priority = math.MaxInt32
|
||||
}
|
||||
o.NodePriority[cn.Name] = cn.Priority
|
||||
}
|
||||
o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...)
|
||||
}
|
||||
}
|
||||
|
||||
type nodeStateCriterionKey struct{}
|
||||
|
||||
// NodeStateCriterion inject hasql.NodeStateCriterion to context
|
||||
func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context {
|
||||
return context.WithValue(ctx, nodeStateCriterionKey{}, c)
|
||||
}
|
||||
|
||||
func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
|
||||
if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
|
||||
return v
|
||||
}
|
||||
return c.options.NodeStateCriterion
|
||||
}
|
||||
@@ -1,113 +0,0 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"golang.yandex/hasql/v2"
|
||||
)
|
||||
|
||||
// compile time guard
|
||||
var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil)
|
||||
|
||||
// CustomPickerOptions holds options to pick nodes
|
||||
type CustomPickerOptions struct {
|
||||
MaxLag int
|
||||
Priority map[string]int32
|
||||
Retries int
|
||||
}
|
||||
|
||||
// CustomPickerOption func apply option to CustomPickerOptions
|
||||
type CustomPickerOption func(*CustomPickerOptions)
|
||||
|
||||
// CustomPickerMaxLag specifies max lag for which node can be used
|
||||
func CustomPickerMaxLag(n int) CustomPickerOption {
|
||||
return func(o *CustomPickerOptions) {
|
||||
o.MaxLag = n
|
||||
}
|
||||
}
|
||||
|
||||
// NewCustomPicker creates new node picker
|
||||
func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] {
|
||||
options := CustomPickerOptions{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &CustomPicker[Querier]{opts: options}
|
||||
}
|
||||
|
||||
// CustomPicker holds node picker options
|
||||
type CustomPicker[T Querier] struct {
|
||||
opts CustomPickerOptions
|
||||
}
|
||||
|
||||
// PickNode used to return specific node
|
||||
func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] {
|
||||
for _, n := range cnodes {
|
||||
fmt.Printf("node %s\n", n.Node.String())
|
||||
}
|
||||
return cnodes[0]
|
||||
}
|
||||
|
||||
func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
|
||||
if prio, ok := p.opts.Priority[nodeName]; ok {
|
||||
return prio
|
||||
}
|
||||
return math.MaxInt32 // Default to lowest priority
|
||||
}
|
||||
|
||||
// CompareNodes used to sort nodes
|
||||
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
|
||||
// Get replication lag values
|
||||
aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
|
||||
bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()
|
||||
|
||||
// First check that lag lower then MaxLag
|
||||
if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
|
||||
return 0 // both are equal
|
||||
}
|
||||
|
||||
// If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
|
||||
if aLag > p.opts.MaxLag {
|
||||
return 1 // b is better
|
||||
}
|
||||
if bLag > p.opts.MaxLag {
|
||||
return -1 // a is better
|
||||
}
|
||||
|
||||
// Get node priorities
|
||||
aPrio := p.getPriority(a.Node.String())
|
||||
bPrio := p.getPriority(b.Node.String())
|
||||
|
||||
// if both priority equals
|
||||
if aPrio == bPrio {
|
||||
// First compare by replication lag
|
||||
if aLag < bLag {
|
||||
return -1
|
||||
}
|
||||
if aLag > bLag {
|
||||
return 1
|
||||
}
|
||||
// If replication lag is equal, compare by latency
|
||||
aLatency := a.Info.(interface{ Latency() time.Duration }).Latency()
|
||||
bLatency := b.Info.(interface{ Latency() time.Duration }).Latency()
|
||||
|
||||
if aLatency < bLatency {
|
||||
return -1
|
||||
}
|
||||
if aLatency > bLatency {
|
||||
return 1
|
||||
}
|
||||
|
||||
// If lag and latency is equal
|
||||
return 0
|
||||
}
|
||||
|
||||
// If priorities are different, prefer the node with lower priority value
|
||||
if aPrio < bPrio {
|
||||
return -1
|
||||
}
|
||||
|
||||
return 1
|
||||
}
|
||||
@@ -3,7 +3,6 @@ package sql
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -12,84 +11,31 @@ type Statser interface {
|
||||
}
|
||||
|
||||
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
|
||||
if db == nil {
|
||||
return
|
||||
}
|
||||
|
||||
options := NewOptions(opts...)
|
||||
|
||||
var (
|
||||
statsMu sync.Mutex
|
||||
lastUpdated time.Time
|
||||
maxOpenConnections, openConnections, inUse, idle, waitCount float64
|
||||
maxIdleClosed, maxIdleTimeClosed, maxLifetimeClosed float64
|
||||
waitDuration float64
|
||||
)
|
||||
go func() {
|
||||
ticker := time.NewTicker(options.MeterStatsInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
updateFn := func() {
|
||||
statsMu.Lock()
|
||||
defer statsMu.Unlock()
|
||||
|
||||
if time.Since(lastUpdated) < options.MeterStatsInterval {
|
||||
return
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if db == nil {
|
||||
return
|
||||
}
|
||||
stats := db.Stats()
|
||||
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
|
||||
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
|
||||
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
|
||||
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
|
||||
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
|
||||
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
|
||||
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
|
||||
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
|
||||
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
|
||||
}
|
||||
}
|
||||
|
||||
stats := db.Stats()
|
||||
maxOpenConnections = float64(stats.MaxOpenConnections)
|
||||
openConnections = float64(stats.OpenConnections)
|
||||
inUse = float64(stats.InUse)
|
||||
idle = float64(stats.Idle)
|
||||
waitCount = float64(stats.WaitCount)
|
||||
maxIdleClosed = float64(stats.MaxIdleClosed)
|
||||
maxIdleTimeClosed = float64(stats.MaxIdleTimeClosed)
|
||||
maxLifetimeClosed = float64(stats.MaxLifetimeClosed)
|
||||
waitDuration = float64(stats.WaitDuration.Seconds())
|
||||
|
||||
lastUpdated = time.Now()
|
||||
}
|
||||
|
||||
options.Meter.Gauge(MaxOpenConnections, func() float64 {
|
||||
updateFn()
|
||||
return maxOpenConnections
|
||||
})
|
||||
|
||||
options.Meter.Gauge(OpenConnections, func() float64 {
|
||||
updateFn()
|
||||
return openConnections
|
||||
})
|
||||
|
||||
options.Meter.Gauge(InuseConnections, func() float64 {
|
||||
updateFn()
|
||||
return inUse
|
||||
})
|
||||
|
||||
options.Meter.Gauge(IdleConnections, func() float64 {
|
||||
updateFn()
|
||||
return idle
|
||||
})
|
||||
|
||||
options.Meter.Gauge(WaitConnections, func() float64 {
|
||||
updateFn()
|
||||
return waitCount
|
||||
})
|
||||
|
||||
options.Meter.Gauge(BlockedSeconds, func() float64 {
|
||||
updateFn()
|
||||
return waitDuration
|
||||
})
|
||||
|
||||
options.Meter.Gauge(MaxIdleClosed, func() float64 {
|
||||
updateFn()
|
||||
return maxIdleClosed
|
||||
})
|
||||
|
||||
options.Meter.Gauge(MaxIdletimeClosed, func() float64 {
|
||||
updateFn()
|
||||
return maxIdleTimeClosed
|
||||
})
|
||||
|
||||
options.Meter.Gauge(MaxLifetimeClosed, func() float64 {
|
||||
updateFn()
|
||||
return maxLifetimeClosed
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -52,12 +52,6 @@ type Options struct {
|
||||
AddStacktrace bool
|
||||
// DedupKeys deduplicate keys in log output
|
||||
DedupKeys bool
|
||||
// FatalFinalizers runs in order in [logger.Fatal] method
|
||||
FatalFinalizers []func(context.Context)
|
||||
}
|
||||
|
||||
var DefaultFatalFinalizer = func(ctx context.Context) {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// NewOptions creates new options struct
|
||||
@@ -71,7 +65,6 @@ func NewOptions(opts ...Option) Options {
|
||||
AddSource: true,
|
||||
TimeFunc: time.Now,
|
||||
Meter: meter.DefaultMeter,
|
||||
FatalFinalizers: []func(context.Context){DefaultFatalFinalizer},
|
||||
}
|
||||
|
||||
WithMicroKeys()(&options)
|
||||
@@ -83,13 +76,6 @@ func NewOptions(opts ...Option) Options {
|
||||
return options
|
||||
}
|
||||
|
||||
// WithFatalFinalizers set logger.Fatal finalizers
|
||||
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
|
||||
return func(o *Options) {
|
||||
o.FatalFinalizers = fncs
|
||||
}
|
||||
}
|
||||
|
||||
// WithContextAttrFuncs appends default funcs for the context attrs filler
|
||||
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
|
||||
return func(o *Options) {
|
||||
|
||||
@@ -4,12 +4,14 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/semconv"
|
||||
@@ -229,12 +231,11 @@ func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{}
|
||||
|
||||
func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) {
|
||||
s.printLog(ctx, logger.FatalLevel, msg, attrs...)
|
||||
for _, fn := range s.opts.FatalFinalizers {
|
||||
fn(ctx)
|
||||
}
|
||||
if closer, ok := s.opts.Out.(io.Closer); ok {
|
||||
closer.Close()
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) {
|
||||
|
||||
@@ -469,25 +469,3 @@ func Test_WithContextAttrFunc(t *testing.T) {
|
||||
|
||||
// t.Logf("xxx %s", buf.Bytes())
|
||||
}
|
||||
|
||||
func TestFatalFinalizers(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
buf := bytes.NewBuffer(nil)
|
||||
l := NewLogger(
|
||||
logger.WithLevel(logger.TraceLevel),
|
||||
logger.WithOutput(buf),
|
||||
)
|
||||
if err := l.Init(
|
||||
logger.WithFatalFinalizers(func(ctx context.Context) {
|
||||
l.Info(ctx, "fatal finalizer")
|
||||
})); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
l.Fatal(ctx, "info_msg1")
|
||||
if !bytes.Contains(buf.Bytes(), []byte("fatal finalizer")) {
|
||||
t.Fatalf("logger dont have fatal message, buf %s", buf.Bytes())
|
||||
}
|
||||
if !bytes.Contains(buf.Bytes(), []byte("info_msg1")) {
|
||||
t.Fatalf("logger dont have info_msg1 message, buf %s", buf.Bytes())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,11 +49,9 @@ type Meter interface {
|
||||
Set(opts ...Option) Meter
|
||||
// Histogram get or create histogram
|
||||
Histogram(name string, labels ...string) Histogram
|
||||
// HistogramExt get or create histogram with specified quantiles
|
||||
HistogramExt(name string, quantiles []float64, labels ...string) Histogram
|
||||
// Summary get or create summary
|
||||
Summary(name string, labels ...string) Summary
|
||||
// SummaryExt get or create summary with specified quantiles and window time
|
||||
// SummaryExt get or create summary with spcified quantiles and window time
|
||||
SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary
|
||||
// Write writes metrics to io.Writer
|
||||
Write(w io.Writer, opts ...Option) error
|
||||
@@ -61,8 +59,6 @@ type Meter interface {
|
||||
Options() Options
|
||||
// String return meter type
|
||||
String() string
|
||||
// Unregister metric name and drop all data
|
||||
Unregister(name string, labels ...string) bool
|
||||
}
|
||||
|
||||
// Counter is a counter
|
||||
@@ -84,11 +80,7 @@ type FloatCounter interface {
|
||||
|
||||
// Gauge is a float64 gauge
|
||||
type Gauge interface {
|
||||
Add(float64)
|
||||
Get() float64
|
||||
Set(float64)
|
||||
Dec()
|
||||
Inc()
|
||||
}
|
||||
|
||||
// Histogram is a histogram for non-negative values with automatically created buckets
|
||||
|
||||
@@ -28,10 +28,6 @@ func (r *noopMeter) Name() string {
|
||||
return r.opts.Name
|
||||
}
|
||||
|
||||
func (r *noopMeter) Unregister(name string, labels ...string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Init initialize options
|
||||
func (r *noopMeter) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
@@ -70,11 +66,6 @@ func (r *noopMeter) Histogram(_ string, labels ...string) Histogram {
|
||||
return &noopHistogram{labels: labels}
|
||||
}
|
||||
|
||||
// HistogramExt implements the Meter interface
|
||||
func (r *noopMeter) HistogramExt(_ string, quantiles []float64, labels ...string) Histogram {
|
||||
return &noopHistogram{labels: labels}
|
||||
}
|
||||
|
||||
// Set implements the Meter interface
|
||||
func (r *noopMeter) Set(opts ...Option) Meter {
|
||||
m := &noopMeter{opts: r.opts}
|
||||
@@ -141,18 +132,6 @@ type noopGauge struct {
|
||||
labels []string
|
||||
}
|
||||
|
||||
func (r *noopGauge) Add(float64) {
|
||||
}
|
||||
|
||||
func (r *noopGauge) Set(float64) {
|
||||
}
|
||||
|
||||
func (r *noopGauge) Inc() {
|
||||
}
|
||||
|
||||
func (r *noopGauge) Dec() {
|
||||
}
|
||||
|
||||
func (r *noopGauge) Get() float64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -4,8 +4,6 @@ import (
|
||||
"context"
|
||||
)
|
||||
|
||||
var DefaultQuantiles = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
|
||||
|
||||
// Option powers the configuration for metrics implementations:
|
||||
type Option func(*Options)
|
||||
|
||||
@@ -25,8 +23,6 @@ type Options struct {
|
||||
WriteProcessMetrics bool
|
||||
// WriteFDMetrics flag to write fd metrics
|
||||
WriteFDMetrics bool
|
||||
// Quantiles specifies buckets for histogram
|
||||
Quantiles []float64
|
||||
}
|
||||
|
||||
// NewOptions prepares a set of options:
|
||||
@@ -65,12 +61,14 @@ func Address(value string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Quantiles defines the desired spread of statistics for histogram metrics:
|
||||
func Quantiles(quantiles []float64) Option {
|
||||
/*
|
||||
// TimingObjectives defines the desired spread of statistics for histogram / timing metrics:
|
||||
func TimingObjectives(value map[float64]float64) Option {
|
||||
return func(o *Options) {
|
||||
o.Quantiles = quantiles
|
||||
o.TimingObjectives = value
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// Labels add the meter labels
|
||||
func Labels(ls ...string) Option {
|
||||
|
||||
@@ -6,18 +6,18 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v4/meter"
|
||||
"go.unistack.org/micro/v4/semconv"
|
||||
)
|
||||
|
||||
func unregisterMetrics(size int) {
|
||||
meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size))
|
||||
meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size))
|
||||
meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size))
|
||||
meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size))
|
||||
}
|
||||
var (
|
||||
pools = make([]Statser, 0)
|
||||
poolsMu sync.Mutex
|
||||
)
|
||||
|
||||
// Stats struct
|
||||
type Stats struct {
|
||||
Get uint64
|
||||
Put uint64
|
||||
@@ -25,13 +25,41 @@ type Stats struct {
|
||||
Ret uint64
|
||||
}
|
||||
|
||||
// Statser provides buffer pool stats
|
||||
type Statser interface {
|
||||
Stats() Stats
|
||||
Cap() int
|
||||
}
|
||||
|
||||
func init() {
|
||||
go newStatsMeter()
|
||||
}
|
||||
|
||||
func newStatsMeter() {
|
||||
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
poolsMu.Lock()
|
||||
for _, st := range pools {
|
||||
stats := st.Stats()
|
||||
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
|
||||
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
|
||||
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
|
||||
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
|
||||
}
|
||||
poolsMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
_ Statser = (*BytePool)(nil)
|
||||
_ Statser = (*BytesPool)(nil)
|
||||
_ Statser = (*StringsPool)(nil)
|
||||
)
|
||||
|
||||
type Pool[T any] struct {
|
||||
p *sync.Pool
|
||||
get *atomic.Uint64
|
||||
put *atomic.Uint64
|
||||
mis *atomic.Uint64
|
||||
ret *atomic.Uint64
|
||||
c int
|
||||
p *sync.Pool
|
||||
}
|
||||
|
||||
func (p Pool[T]) Put(t T) {
|
||||
@@ -42,82 +70,37 @@ func (p Pool[T]) Get() T {
|
||||
return p.p.Get().(T)
|
||||
}
|
||||
|
||||
func NewPool[T any](fn func() T, size int) Pool[T] {
|
||||
p := Pool[T]{
|
||||
c: size,
|
||||
get: &atomic.Uint64{},
|
||||
put: &atomic.Uint64{},
|
||||
mis: &atomic.Uint64{},
|
||||
ret: &atomic.Uint64{},
|
||||
}
|
||||
|
||||
p.p = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
p.mis.Add(1)
|
||||
return fn()
|
||||
func NewPool[T any](fn func() T) Pool[T] {
|
||||
return Pool[T]{
|
||||
p: &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return fn()
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
|
||||
return float64(p.get.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
|
||||
return float64(p.put.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
|
||||
return float64(p.mis.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
|
||||
return float64(p.ret.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
type BytePool struct {
|
||||
p *sync.Pool
|
||||
get *atomic.Uint64
|
||||
put *atomic.Uint64
|
||||
mis *atomic.Uint64
|
||||
ret *atomic.Uint64
|
||||
get uint64
|
||||
put uint64
|
||||
mis uint64
|
||||
ret uint64
|
||||
c int
|
||||
}
|
||||
|
||||
func NewBytePool(size int) *BytePool {
|
||||
p := &BytePool{
|
||||
c: size,
|
||||
get: &atomic.Uint64{},
|
||||
put: &atomic.Uint64{},
|
||||
mis: &atomic.Uint64{},
|
||||
ret: &atomic.Uint64{},
|
||||
}
|
||||
p := &BytePool{c: size}
|
||||
p.p = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
p.mis.Add(1)
|
||||
atomic.AddUint64(&p.mis, 1)
|
||||
b := make([]byte, 0, size)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
|
||||
return float64(p.get.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
|
||||
return float64(p.put.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
|
||||
return float64(p.mis.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
|
||||
return float64(p.ret.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
poolsMu.Lock()
|
||||
pools = append(pools, p)
|
||||
poolsMu.Unlock()
|
||||
return p
|
||||
}
|
||||
|
||||
@@ -127,73 +110,49 @@ func (p *BytePool) Cap() int {
|
||||
|
||||
func (p *BytePool) Stats() Stats {
|
||||
return Stats{
|
||||
Put: p.put.Load(),
|
||||
Get: p.get.Load(),
|
||||
Mis: p.mis.Load(),
|
||||
Ret: p.ret.Load(),
|
||||
Put: atomic.LoadUint64(&p.put),
|
||||
Get: atomic.LoadUint64(&p.get),
|
||||
Mis: atomic.LoadUint64(&p.mis),
|
||||
Ret: atomic.LoadUint64(&p.ret),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *BytePool) Get() *[]byte {
|
||||
p.get.Add(1)
|
||||
atomic.AddUint64(&p.get, 1)
|
||||
return p.p.Get().(*[]byte)
|
||||
}
|
||||
|
||||
func (p *BytePool) Put(b *[]byte) {
|
||||
p.put.Add(1)
|
||||
atomic.AddUint64(&p.put, 1)
|
||||
if cap(*b) > p.c {
|
||||
p.ret.Add(1)
|
||||
atomic.AddUint64(&p.ret, 1)
|
||||
return
|
||||
}
|
||||
*b = (*b)[:0]
|
||||
p.p.Put(b)
|
||||
}
|
||||
|
||||
func (p *BytePool) Close() {
|
||||
unregisterMetrics(p.c)
|
||||
}
|
||||
|
||||
type BytesPool struct {
|
||||
p *sync.Pool
|
||||
get *atomic.Uint64
|
||||
put *atomic.Uint64
|
||||
mis *atomic.Uint64
|
||||
ret *atomic.Uint64
|
||||
get uint64
|
||||
put uint64
|
||||
mis uint64
|
||||
ret uint64
|
||||
c int
|
||||
}
|
||||
|
||||
func NewBytesPool(size int) *BytesPool {
|
||||
p := &BytesPool{
|
||||
c: size,
|
||||
get: &atomic.Uint64{},
|
||||
put: &atomic.Uint64{},
|
||||
mis: &atomic.Uint64{},
|
||||
ret: &atomic.Uint64{},
|
||||
}
|
||||
p := &BytesPool{c: size}
|
||||
p.p = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
p.mis.Add(1)
|
||||
atomic.AddUint64(&p.mis, 1)
|
||||
b := bytes.NewBuffer(make([]byte, 0, size))
|
||||
return b
|
||||
},
|
||||
}
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
|
||||
return float64(p.get.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
|
||||
return float64(p.put.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
|
||||
return float64(p.mis.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
|
||||
return float64(p.ret.Load())
|
||||
}, "capacity", strconv.Itoa(p.c))
|
||||
|
||||
poolsMu.Lock()
|
||||
pools = append(pools, p)
|
||||
poolsMu.Unlock()
|
||||
return p
|
||||
}
|
||||
|
||||
@@ -203,10 +162,10 @@ func (p *BytesPool) Cap() int {
|
||||
|
||||
func (p *BytesPool) Stats() Stats {
|
||||
return Stats{
|
||||
Put: p.put.Load(),
|
||||
Get: p.get.Load(),
|
||||
Mis: p.mis.Load(),
|
||||
Ret: p.ret.Load(),
|
||||
Put: atomic.LoadUint64(&p.put),
|
||||
Get: atomic.LoadUint64(&p.get),
|
||||
Mis: atomic.LoadUint64(&p.mis),
|
||||
Ret: atomic.LoadUint64(&p.ret),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,43 +174,34 @@ func (p *BytesPool) Get() *bytes.Buffer {
|
||||
}
|
||||
|
||||
func (p *BytesPool) Put(b *bytes.Buffer) {
|
||||
p.put.Add(1)
|
||||
if (*b).Cap() > p.c {
|
||||
p.ret.Add(1)
|
||||
atomic.AddUint64(&p.ret, 1)
|
||||
return
|
||||
}
|
||||
b.Reset()
|
||||
p.p.Put(b)
|
||||
}
|
||||
|
||||
func (p *BytesPool) Close() {
|
||||
unregisterMetrics(p.c)
|
||||
}
|
||||
|
||||
type StringsPool struct {
|
||||
p *sync.Pool
|
||||
get *atomic.Uint64
|
||||
put *atomic.Uint64
|
||||
mis *atomic.Uint64
|
||||
ret *atomic.Uint64
|
||||
get uint64
|
||||
put uint64
|
||||
mis uint64
|
||||
ret uint64
|
||||
c int
|
||||
}
|
||||
|
||||
func NewStringsPool(size int) *StringsPool {
|
||||
p := &StringsPool{
|
||||
c: size,
|
||||
get: &atomic.Uint64{},
|
||||
put: &atomic.Uint64{},
|
||||
mis: &atomic.Uint64{},
|
||||
ret: &atomic.Uint64{},
|
||||
}
|
||||
p := &StringsPool{c: size}
|
||||
p.p = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
p.mis.Add(1)
|
||||
atomic.AddUint64(&p.mis, 1)
|
||||
return &strings.Builder{}
|
||||
},
|
||||
}
|
||||
|
||||
poolsMu.Lock()
|
||||
pools = append(pools, p)
|
||||
poolsMu.Unlock()
|
||||
return p
|
||||
}
|
||||
|
||||
@@ -261,28 +211,24 @@ func (p *StringsPool) Cap() int {
|
||||
|
||||
func (p *StringsPool) Stats() Stats {
|
||||
return Stats{
|
||||
Put: p.put.Load(),
|
||||
Get: p.get.Load(),
|
||||
Mis: p.mis.Load(),
|
||||
Ret: p.ret.Load(),
|
||||
Put: atomic.LoadUint64(&p.put),
|
||||
Get: atomic.LoadUint64(&p.get),
|
||||
Mis: atomic.LoadUint64(&p.mis),
|
||||
Ret: atomic.LoadUint64(&p.ret),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *StringsPool) Get() *strings.Builder {
|
||||
p.get.Add(1)
|
||||
atomic.AddUint64(&p.get, 1)
|
||||
return p.p.Get().(*strings.Builder)
|
||||
}
|
||||
|
||||
func (p *StringsPool) Put(b *strings.Builder) {
|
||||
p.put.Add(1)
|
||||
atomic.AddUint64(&p.put, 1)
|
||||
if b.Cap() > p.c {
|
||||
p.ret.Add(1)
|
||||
atomic.AddUint64(&p.ret, 1)
|
||||
return
|
||||
}
|
||||
b.Reset()
|
||||
p.p.Put(b)
|
||||
}
|
||||
|
||||
func (p *StringsPool) Close() {
|
||||
unregisterMetrics(p.c)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user