Compare commits

7 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 44ec3b663b | |
| | 4bb73514e9 | |
| | 3e86864ce7 | |
| | a68d3b24b8 | |
| | 9c22ae5384 | |
| | 16bad9a0cd | |
| | 3c779b248f | |
@@ -1,5 +1,5 @@
# Micro

[](https://opensource.org/licenses/Apache-2.0)
[](https://pkg.go.dev/go.unistack.org/micro/v3?tab=overview)
[](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush)
@@ -17,11 +17,6 @@ import (
 	"go.unistack.org/micro/v3/tracer"
 )
 
-// DefaultCodecs will be used to encode/decode data
-var DefaultCodecs = map[string]codec.Codec{
-	"application/octet-stream": codec.NewCodec(),
-}
-
 type noopClient struct {
 	funcPublish      FuncPublish
 	funcBatchPublish FuncBatchPublish
@@ -178,9 +173,6 @@ func (n *noopClient) newCodec(contentType string) (codec.Codec, error) {
 	if cf, ok := n.opts.Codecs[contentType]; ok {
 		return cf, nil
 	}
-	if cf, ok := DefaultCodecs[contentType]; ok {
-		return cf, nil
-	}
 	return nil, codec.ErrUnknownContentType
 }
@@ -3,6 +3,8 @@ package client
 import (
 	"context"
 	"testing"
+
+	"go.unistack.org/micro/v3/codec"
 )
 
 type testHook struct {
@@ -19,7 +21,7 @@ func (t *testHook) Publish(fn FuncPublish) FuncPublish {
 func TestNoopHook(t *testing.T) {
 	h := &testHook{}
 
-	c := NewClient(Hooks(HookPublish(h.Publish)))
+	c := NewClient(Codec("application/octet-stream", codec.NewCodec()), Hooks(HookPublish(h.Publish)))
 
 	if err := c.Init(); err != nil {
 		t.Fatal(err)
@@ -198,7 +198,7 @@ func NewOptions(opts ...Option) Options {
 	options := Options{
 		Context:     context.Background(),
 		ContentType: DefaultContentType,
-		Codecs:      DefaultCodecs,
+		Codecs:      make(map[string]codec.Codec),
 		CallOptions: CallOptions{
 			Context: context.Background(),
 			Backoff: DefaultBackoff,
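The net effect of these three hunks: the package-level DefaultCodecs fallback is gone and NewOptions starts from an empty codec map, so callers must register any codec they publish with. A minimal sketch of the calling side, mirroring the updated test (import paths, `NewClient`, and the `Codec` option are the ones shown in the hunks):

```go
package main

import (
	"go.unistack.org/micro/v3/client"
	"go.unistack.org/micro/v3/codec"
)

func main() {
	// NewOptions now starts with an empty codec map, so the codec
	// for each content type must be registered explicitly.
	c := client.NewClient(
		client.Codec("application/octet-stream", codec.NewCodec()),
	)
	if err := c.Init(); err != nil {
		panic(err)
	}
}
```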
cluster/hasql/cluster.go (new file, 235 lines)
@@ -0,0 +1,235 @@
package sql

import (
	"context"
	"database/sql"
	"reflect"
	"unsafe"

	"golang.yandex/hasql/v2"
)

func newSQLRowError() *sql.Row {
	row := &sql.Row{}
	t := reflect.TypeOf(row).Elem()
	field, _ := t.FieldByName("err")
	rowPtr := unsafe.Pointer(row)
	errFieldPtr := unsafe.Pointer(uintptr(rowPtr) + field.Offset)
	errPtr := (*error)(errFieldPtr)
	*errPtr = ErrorNoAliveNodes
	return row
}

type ClusterQuerier interface {
	Querier
	WaitForNodes(ctx context.Context, criterion ...hasql.NodeStateCriterion) error
}

type Cluster struct {
	hasql   *hasql.Cluster[Querier]
	options ClusterOptions
}

// NewCluster returns a [ClusterQuerier] backed by a cluster of nodes
func NewCluster[T Querier](opts ...ClusterOption) (ClusterQuerier, error) {
	options := ClusterOptions{Context: context.Background()}
	for _, opt := range opts {
		opt(&options)
	}
	if options.NodeChecker == nil {
		return nil, ErrClusterChecker
	}
	if options.NodeDiscoverer == nil {
		return nil, ErrClusterDiscoverer
	}
	if options.NodePicker == nil {
		return nil, ErrClusterPicker
	}

	if options.Retries < 1 {
		options.Retries = 1
	}

	if options.NodeStateCriterion == 0 {
		options.NodeStateCriterion = hasql.Primary
	}

	options.Options = append(options.Options, hasql.WithNodePicker(options.NodePicker))
	if p, ok := options.NodePicker.(*CustomPicker[Querier]); ok {
		p.opts.Priority = options.NodePriority
	}

	c, err := hasql.NewCluster(
		options.NodeDiscoverer,
		options.NodeChecker,
		options.Options...,
	)
	if err != nil {
		return nil, err
	}

	return &Cluster{hasql: c, options: options}, nil
}

func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
	var tx *sql.Tx
	var err error

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if tx, err = n.DB().BeginTx(ctx, opts); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if tx == nil && err == nil {
		err = ErrorNoAliveNodes
	}

	return tx, err
}

func (c *Cluster) Close() error {
	return c.hasql.Close()
}

func (c *Cluster) Conn(ctx context.Context) (*sql.Conn, error) {
	var conn *sql.Conn
	var err error

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if conn, err = n.DB().Conn(ctx); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if conn == nil && err == nil {
		err = ErrorNoAliveNodes
	}

	return conn, err
}

func (c *Cluster) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
	var res sql.Result
	var err error

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if res, err = n.DB().ExecContext(ctx, query, args...); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if res == nil && err == nil {
		err = ErrorNoAliveNodes
	}

	return res, err
}

func (c *Cluster) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
	var res *sql.Stmt
	var err error

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if res, err = n.DB().PrepareContext(ctx, query); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if res == nil && err == nil {
		err = ErrorNoAliveNodes
	}

	return res, err
}

func (c *Cluster) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
	var res *sql.Rows
	var err error

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if res, err = n.DB().QueryContext(ctx, query, args...); err != nil && err != sql.ErrNoRows && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if res == nil && err == nil {
		err = ErrorNoAliveNodes
	}

	return res, err
}

func (c *Cluster) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
	var res *sql.Row

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			res = n.DB().QueryRowContext(ctx, query, args...)
			if res.Err() == nil {
				return false
			} else if res.Err() != nil && retries >= c.options.Retries {
				return false
			}
		}
		return true
	})

	if res == nil {
		res = newSQLRowError()
	}

	return res
}

func (c *Cluster) PingContext(ctx context.Context) error {
	var err error
	var ok bool

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		ok = true
		for ; retries < c.options.Retries; retries++ {
			if err = n.DB().PingContext(ctx); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if !ok {
		err = ErrorNoAliveNodes
	}

	return err
}

func (c *Cluster) WaitForNodes(ctx context.Context, criterions ...hasql.NodeStateCriterion) error {
	for _, criterion := range criterions {
		if _, err := c.hasql.WaitForNode(ctx, criterion); err != nil {
			return err
		}
	}
	return nil
}
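For orientation, end-to-end construction and use of this cluster looks roughly like the following. This is a minimal sketch distilled from the test file below; `master`, `replica`, and `ctx` are assumed to exist (open *sql.DB handles and a context.Context):

```go
// Wire up a two-node cluster and route one read to a standby.
c, err := NewCluster[Querier](
	WithClusterNodeChecker(hasql.PostgreSQLChecker),
	WithClusterNodePicker(NewCustomPicker[Querier](CustomPickerMaxLag(100))),
	WithClusterNodes(
		ClusterNode{Name: "master", DB: master, Priority: 1},
		ClusterNode{Name: "replica", DB: replica, Priority: 2},
	),
)
if err != nil {
	// handle construction error
}
defer c.Close()

// Block until both a primary and a standby have been discovered.
if err := c.WaitForNodes(ctx, hasql.Primary, hasql.Standby); err != nil {
	// handle error
}

// Per-call routing: this query goes to a standby; without the context
// value the cluster default (hasql.Primary) would apply.
row := c.QueryRowContext(NodeStateCriterion(ctx, hasql.Standby), "SELECT 1")
_ = row
```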
cluster/hasql/cluster_test.go (new file, 171 lines)
@@ -0,0 +1,171 @@
package sql

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/DATA-DOG/go-sqlmock"
	"golang.yandex/hasql/v2"
)

func TestNewCluster(t *testing.T) {
	dbMaster, dbMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbMaster.Close()
	dbMasterMock.MatchExpectationsInOrder(false)

	dbMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(1, 0)).
		RowsWillBeClosed().
		WithoutArgs()

	dbMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("master-dc1"))

	dbDRMaster, dbDRMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbDRMaster.Close()
	dbDRMasterMock.MatchExpectationsInOrder(false)

	dbDRMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 40)).
		RowsWillBeClosed().
		WithoutArgs()

	dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("drmaster1-dc2"))

	dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("drmaster"))

	dbSlaveDC1, dbSlaveDC1Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbSlaveDC1.Close()
	dbSlaveDC1Mock.MatchExpectationsInOrder(false)

	dbSlaveDC1Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 50)).
		RowsWillBeClosed().
		WithoutArgs()

	dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	dbSlaveDC2, dbSlaveDC2Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbSlaveDC2.Close()
	dbSlaveDC2Mock.MatchExpectationsInOrder(false)

	dbSlaveDC2Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 50)).
		RowsWillBeClosed().
		WithoutArgs()

	dbSlaveDC2Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	tctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
	defer cancel()

	c, err := NewCluster[Querier](
		WithClusterContext(tctx),
		WithClusterNodeChecker(hasql.PostgreSQLChecker),
		WithClusterNodePicker(NewCustomPicker[Querier](
			CustomPickerMaxLag(100),
		)),
		WithClusterNodes(
			ClusterNode{"slave-dc1", dbSlaveDC1, 1},
			ClusterNode{"master-dc1", dbMaster, 1},
			ClusterNode{"slave-dc2", dbSlaveDC2, 2},
			ClusterNode{"drmaster1-dc2", dbDRMaster, 0},
		),
		WithClusterOptions(
			hasql.WithUpdateInterval[Querier](2*time.Second),
			hasql.WithUpdateTimeout[Querier](1*time.Second),
		),
	)
	if err != nil {
		t.Fatal(err)
	}
	defer c.Close()

	if err = c.WaitForNodes(tctx, hasql.Primary, hasql.Standby); err != nil {
		t.Fatal(err)
	}

	time.Sleep(500 * time.Millisecond)

	node1Name := ""
	fmt.Printf("check for Standby\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.Standby), "SELECT node_name as name"); row.Err() != nil {
		t.Fatal(row.Err())
	} else if err = row.Scan(&node1Name); err != nil {
		t.Fatal(err)
	} else if "slave-dc1" != node1Name {
		t.Fatalf("invalid node name %s != %s", "slave-dc1", node1Name)
	}

	dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	node2Name := ""
	fmt.Printf("check for PreferStandby\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferStandby), "SELECT node_name as name"); row.Err() != nil {
		t.Fatal(row.Err())
	} else if err = row.Scan(&node2Name); err != nil {
		t.Fatal(err)
	} else if "slave-dc1" != node2Name {
		t.Fatalf("invalid node name %s != %s", "slave-dc1", node2Name)
	}

	node3Name := ""
	fmt.Printf("check for PreferPrimary\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferPrimary), "SELECT node_name as name"); row.Err() != nil {
		t.Fatal(row.Err())
	} else if err = row.Scan(&node3Name); err != nil {
		t.Fatal(err)
	} else if "master-dc1" != node3Name {
		t.Fatalf("invalid node name %s != %s", "master-dc1", node3Name)
	}

	dbSlaveDC1Mock.ExpectQuery(`.*`).WillReturnRows(sqlmock.NewRows([]string{"role"}).RowError(1, fmt.Errorf("row error")))

	time.Sleep(2 * time.Second)

	fmt.Printf("check for PreferStandby\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferStandby), "SELECT node_name as name"); row.Err() == nil {
		t.Fatal("must return error")
	}

	if dbMasterErr := dbMasterMock.ExpectationsWereMet(); dbMasterErr != nil {
		t.Error(dbMasterErr)
	}
}
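One detail worth calling out: every mock above is created with sqlmock.MonitorPingsOption(true). go-sqlmock treats Ping as a no-op unless ping monitoring is enabled, and the hasql health checker exercises the connection the same way. A minimal illustration of the option in isolation:

```go
db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
	// handle error
}
mock.ExpectPing() // only matchable when ping monitoring is enabled
_ = db.PingContext(context.Background())
```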
cluster/hasql/db.go (new file, 25 lines)
@@ -0,0 +1,25 @@
package sql

import (
	"context"
	"database/sql"
)

type Querier interface {
	// Basic connection methods
	PingContext(ctx context.Context) error
	Close() error

	// Query methods with context
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

	// Prepared statements with context
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

	// Transaction management with context
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)

	Conn(ctx context.Context) (*sql.Conn, error)
}
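Querier deliberately mirrors a subset of the *sql.DB method set, so a plain *sql.DB can serve as a node without an adapter. A compile-time assertion (a sketch, not part of the diff) makes that contract visible:

```go
package sql

import "database/sql"

// Compile-time check: *sql.DB satisfies Querier.
var _ Querier = (*sql.DB)(nil)
```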
cluster/hasql/driver.go (new file, 295 lines)
@@ -0,0 +1,295 @@
package sql

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"io"
	"sync"
	"time"
)

// OpenDBWithCluster creates a [*sql.DB] that uses the [ClusterQuerier]
func OpenDBWithCluster(db ClusterQuerier) (*sql.DB, error) {
	driver := NewClusterDriver(db)
	connector, err := driver.OpenConnector("")
	if err != nil {
		return nil, err
	}
	return sql.OpenDB(connector), nil
}

// ClusterDriver implements [driver.Driver] and [driver.Connector] for an existing [ClusterQuerier]
type ClusterDriver struct {
	db ClusterQuerier
}

// NewClusterDriver creates a new [driver.Driver] that uses an existing [ClusterQuerier]
func NewClusterDriver(db ClusterQuerier) *ClusterDriver {
	return &ClusterDriver{db: db}
}

// Open implements [driver.Driver.Open]
func (d *ClusterDriver) Open(name string) (driver.Conn, error) {
	return d.Connect(context.Background())
}

// OpenConnector implements [driver.DriverContext.OpenConnector]
func (d *ClusterDriver) OpenConnector(name string) (driver.Connector, error) {
	return d, nil
}

// Connect implements [driver.Connector.Connect]
func (d *ClusterDriver) Connect(ctx context.Context) (driver.Conn, error) {
	conn, err := d.db.Conn(ctx)
	if err != nil {
		return nil, err
	}
	return &dbConn{conn: conn}, nil
}

// Driver implements [driver.Connector.Driver]
func (d *ClusterDriver) Driver() driver.Driver {
	return d
}

// dbConn implements [driver.Conn] with both context and legacy methods
type dbConn struct {
	conn *sql.Conn
	mu   sync.Mutex
}

// Prepare implements [driver.Conn.Prepare] (legacy method)
func (c *dbConn) Prepare(query string) (driver.Stmt, error) {
	return c.PrepareContext(context.Background(), query)
}

// PrepareContext implements [driver.ConnPrepareContext.PrepareContext]
func (c *dbConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	stmt, err := c.conn.PrepareContext(ctx, query)
	if err != nil {
		return nil, err
	}

	return &dbStmt{stmt: stmt}, nil
}

// Exec implements [driver.Execer.Exec] (legacy method)
func (c *dbConn) Exec(query string, args []driver.Value) (driver.Result, error) {
	namedArgs := make([]driver.NamedValue, len(args))
	for i, value := range args {
		namedArgs[i] = driver.NamedValue{Value: value}
	}
	return c.ExecContext(context.Background(), query, namedArgs)
}

// ExecContext implements [driver.ExecerContext.ExecContext]
func (c *dbConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Convert driver.NamedValue to any
	interfaceArgs := make([]any, len(args))
	for i, arg := range args {
		interfaceArgs[i] = arg.Value
	}

	return c.conn.ExecContext(ctx, query, interfaceArgs...)
}

// Query implements [driver.Queryer.Query] (legacy method)
func (c *dbConn) Query(query string, args []driver.Value) (driver.Rows, error) {
	namedArgs := make([]driver.NamedValue, len(args))
	for i, value := range args {
		namedArgs[i] = driver.NamedValue{Value: value}
	}
	return c.QueryContext(context.Background(), query, namedArgs)
}

// QueryContext implements [driver.QueryerContext.QueryContext]
func (c *dbConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Convert driver.NamedValue to any
	interfaceArgs := make([]any, len(args))
	for i, arg := range args {
		interfaceArgs[i] = arg.Value
	}

	rows, err := c.conn.QueryContext(ctx, query, interfaceArgs...)
	if err != nil {
		return nil, err
	}

	return &dbRows{rows: rows}, nil
}

// Begin implements [driver.Conn.Begin] (legacy method)
func (c *dbConn) Begin() (driver.Tx, error) {
	return c.BeginTx(context.Background(), driver.TxOptions{})
}

// BeginTx implements [driver.ConnBeginTx.BeginTx]
func (c *dbConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	sqlOpts := &sql.TxOptions{
		Isolation: sql.IsolationLevel(opts.Isolation),
		ReadOnly:  opts.ReadOnly,
	}

	tx, err := c.conn.BeginTx(ctx, sqlOpts)
	if err != nil {
		return nil, err
	}

	return &dbTx{tx: tx}, nil
}

// Ping implements [driver.Pinger.Ping]
func (c *dbConn) Ping(ctx context.Context) error {
	return c.conn.PingContext(ctx)
}

// Close implements [driver.Conn.Close]
func (c *dbConn) Close() error {
	return c.conn.Close()
}

// IsValid implements [driver.Validator.IsValid]
func (c *dbConn) IsValid() bool {
	// Ping with a short timeout to check if the connection is still valid
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	return c.conn.PingContext(ctx) == nil
}

// dbStmt implements [driver.Stmt] with both context and legacy methods
type dbStmt struct {
	stmt *sql.Stmt
	mu   sync.Mutex
}

// Close implements [driver.Stmt.Close]
func (s *dbStmt) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.stmt.Close()
}

// NumInput implements [driver.Stmt.NumInput]
func (s *dbStmt) NumInput() int {
	return -1 // Number of parameters is unknown
}

// Exec implements [driver.Stmt.Exec] (legacy method)
func (s *dbStmt) Exec(args []driver.Value) (driver.Result, error) {
	namedArgs := make([]driver.NamedValue, len(args))
	for i, value := range args {
		namedArgs[i] = driver.NamedValue{Value: value}
	}
	return s.ExecContext(context.Background(), namedArgs)
}

// ExecContext implements [driver.StmtExecContext.ExecContext]
func (s *dbStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	interfaceArgs := make([]any, len(args))
	for i, arg := range args {
		interfaceArgs[i] = arg.Value
	}
	return s.stmt.ExecContext(ctx, interfaceArgs...)
}

// Query implements [driver.Stmt.Query] (legacy method)
func (s *dbStmt) Query(args []driver.Value) (driver.Rows, error) {
	namedArgs := make([]driver.NamedValue, len(args))
	for i, value := range args {
		namedArgs[i] = driver.NamedValue{Value: value}
	}
	return s.QueryContext(context.Background(), namedArgs)
}

// QueryContext implements [driver.StmtQueryContext.QueryContext]
func (s *dbStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	interfaceArgs := make([]any, len(args))
	for i, arg := range args {
		interfaceArgs[i] = arg.Value
	}

	rows, err := s.stmt.QueryContext(ctx, interfaceArgs...)
	if err != nil {
		return nil, err
	}

	return &dbRows{rows: rows}, nil
}

// dbRows implements [driver.Rows]
type dbRows struct {
	rows *sql.Rows
}

// Columns implements [driver.Rows.Columns]
func (r *dbRows) Columns() []string {
	cols, err := r.rows.Columns()
	if err != nil {
		// This shouldn't happen if the query was successful
		return []string{}
	}
	return cols
}

// Close implements [driver.Rows.Close]
func (r *dbRows) Close() error {
	return r.rows.Close()
}

// Next implements [driver.Rows.Next]
func (r *dbRows) Next(dest []driver.Value) error {
	if !r.rows.Next() {
		if err := r.rows.Err(); err != nil {
			return err
		}
		return io.EOF
	}

	// Create a slice of interfaces to scan into
	scanArgs := make([]any, len(dest))
	for i := range scanArgs {
		scanArgs[i] = &dest[i]
	}

	return r.rows.Scan(scanArgs...)
}

// dbTx implements [driver.Tx]
type dbTx struct {
	tx *sql.Tx
	mu sync.Mutex
}

// Commit implements [driver.Tx.Commit]
func (t *dbTx) Commit() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.tx.Commit()
}

// Rollback implements [driver.Tx.Rollback]
func (t *dbTx) Rollback() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.tx.Rollback()
}
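The point of this driver shim is to hide cluster routing behind an ordinary *sql.DB. A short usage sketch, assuming a ClusterQuerier c built as in cluster.go and a context ctx:

```go
db, err := OpenDBWithCluster(c)
if err != nil {
	// handle error
}
defer db.Close()

// Standard database/sql calls now go through the cluster; the node
// state criterion can still be injected per call via the context.
row := db.QueryRowContext(NodeStateCriterion(ctx, hasql.Primary), "SELECT 1")
_ = row
```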
cluster/hasql/driver_test.go (new file, 141 lines)
@@ -0,0 +1,141 @@
package sql

import (
	"context"
	"testing"
	"time"

	"github.com/DATA-DOG/go-sqlmock"
	"golang.yandex/hasql/v2"
)

func TestDriver(t *testing.T) {
	dbMaster, dbMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbMaster.Close()
	dbMasterMock.MatchExpectationsInOrder(false)

	dbMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(1, 0)).
		RowsWillBeClosed().
		WithoutArgs()

	dbMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("master-dc1"))

	dbDRMaster, dbDRMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbDRMaster.Close()
	dbDRMasterMock.MatchExpectationsInOrder(false)

	dbDRMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 40)).
		RowsWillBeClosed().
		WithoutArgs()

	dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("drmaster1-dc2"))

	dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("drmaster"))

	dbSlaveDC1, dbSlaveDC1Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbSlaveDC1.Close()
	dbSlaveDC1Mock.MatchExpectationsInOrder(false)

	dbSlaveDC1Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 50)).
		RowsWillBeClosed().
		WithoutArgs()

	dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	dbSlaveDC2, dbSlaveDC2Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbSlaveDC2.Close()
	dbSlaveDC2Mock.MatchExpectationsInOrder(false)

	dbSlaveDC2Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 50)).
		RowsWillBeClosed().
		WithoutArgs()

	dbSlaveDC2Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	tctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
	defer cancel()

	c, err := NewCluster[Querier](
		WithClusterContext(tctx),
		WithClusterNodeChecker(hasql.PostgreSQLChecker),
		WithClusterNodePicker(NewCustomPicker[Querier](
			CustomPickerMaxLag(100),
		)),
		WithClusterNodes(
			ClusterNode{"slave-dc1", dbSlaveDC1, 1},
			ClusterNode{"master-dc1", dbMaster, 1},
			ClusterNode{"slave-dc2", dbSlaveDC2, 2},
			ClusterNode{"drmaster1-dc2", dbDRMaster, 0},
		),
		WithClusterOptions(
			hasql.WithUpdateInterval[Querier](2*time.Second),
			hasql.WithUpdateTimeout[Querier](1*time.Second),
		),
	)
	if err != nil {
		t.Fatal(err)
	}
	defer c.Close()

	if err = c.WaitForNodes(tctx, hasql.Primary, hasql.Standby); err != nil {
		t.Fatal(err)
	}

	db, err := OpenDBWithCluster(c)
	if err != nil {
		t.Fatal(err)
	}

	// Use context methods
	row := db.QueryRowContext(NodeStateCriterion(t.Context(), hasql.Primary), "SELECT node_name as name")
	if err = row.Err(); err != nil {
		t.Fatal(err)
	}

	nodeName := ""
	if err = row.Scan(&nodeName); err != nil {
		t.Fatal(err)
	}

	if nodeName != "master-dc1" {
		t.Fatalf("invalid node_name %s != %s", "master-dc1", nodeName)
	}
}
cluster/hasql/error.go (new file, 10 lines)
@@ -0,0 +1,10 @@
package sql

import "errors"

var (
	ErrClusterChecker    = errors.New("cluster node checker required")
	ErrClusterDiscoverer = errors.New("cluster node discoverer required")
	ErrClusterPicker     = errors.New("cluster node picker required")
	ErrorNoAliveNodes    = errors.New("cluster no alive nodes")
)
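Callers can branch on these sentinels with errors.Is; for example (a sketch, assuming a cluster c and context ctx):

```go
if _, err := c.QueryContext(ctx, "SELECT 1"); errors.Is(err, ErrorNoAliveNodes) {
	// every candidate node was down or exhausted its retries
}
```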
cluster/hasql/options.go (new file, 110 lines)
@@ -0,0 +1,110 @@
package sql

import (
	"context"
	"math"

	"golang.yandex/hasql/v2"
)

// ClusterOptions contains cluster-specific options
type ClusterOptions struct {
	NodeChecker        hasql.NodeChecker
	NodePicker         hasql.NodePicker[Querier]
	NodeDiscoverer     hasql.NodeDiscoverer[Querier]
	Options            []hasql.ClusterOpt[Querier]
	Context            context.Context
	Retries            int
	NodePriority       map[string]int32
	NodeStateCriterion hasql.NodeStateCriterion
}

// ClusterOption applies an option to ClusterOptions
type ClusterOption func(*ClusterOptions)

// WithClusterNodeChecker passes a hasql.NodeChecker to the cluster options
func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodeChecker = c
	}
}

// WithClusterNodePicker passes a hasql.NodePicker to the cluster options
func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodePicker = p
	}
}

// WithClusterNodeDiscoverer passes a hasql.NodeDiscoverer to the cluster options
func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodeDiscoverer = d
	}
}

// WithRetries sets the retry count used across nodes in case of error
func WithRetries(n int) ClusterOption {
	return func(o *ClusterOptions) {
		o.Retries = n
	}
}

// WithClusterContext passes a context.Context used for node checks
func WithClusterContext(ctx context.Context) ClusterOption {
	return func(o *ClusterOptions) {
		o.Context = ctx
	}
}

// WithClusterOptions passes hasql.ClusterOpt options through
func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption {
	return func(o *ClusterOptions) {
		o.Options = append(o.Options, opts...)
	}
}

// WithClusterNodeStateCriterion sets the default hasql.NodeStateCriterion
func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodeStateCriterion = c
	}
}

type ClusterNode struct {
	Name     string
	DB       Querier
	Priority int32
}

// WithClusterNodes configures the cluster with a static NodeDiscoverer
func WithClusterNodes(cns ...ClusterNode) ClusterOption {
	return func(o *ClusterOptions) {
		nodes := make([]*hasql.Node[Querier], 0, len(cns))
		if o.NodePriority == nil {
			o.NodePriority = make(map[string]int32, len(cns))
		}
		for _, cn := range cns {
			nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB))
			if cn.Priority == 0 {
				cn.Priority = math.MaxInt32
			}
			o.NodePriority[cn.Name] = cn.Priority
		}
		o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...)
	}
}

type nodeStateCriterionKey struct{}

// NodeStateCriterion injects a hasql.NodeStateCriterion into the context
func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context {
	return context.WithValue(ctx, nodeStateCriterionKey{}, c)
}

func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
	if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
		return v
	}
	return c.options.NodeStateCriterion
}
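getNodeStateCriterion resolves the routing criterion in two steps: a per-call value injected via NodeStateCriterion wins, otherwise the cluster-wide default applies. A small sketch, assuming a cluster c and a context ctx:

```go
// No value in the context: the cluster-wide default applies
// (hasql.Primary unless WithClusterNodeStateCriterion was set).
_ = c.PingContext(ctx)

// With a value: this single call is routed using hasql.PreferStandby.
_ = c.PingContext(NodeStateCriterion(ctx, hasql.PreferStandby))
```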
cluster/hasql/picker.go (new file, 113 lines)
@@ -0,0 +1,113 @@
package sql

import (
	"fmt"
	"math"
	"time"

	"golang.yandex/hasql/v2"
)

// compile time guard
var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil)

// CustomPickerOptions holds options to pick nodes
type CustomPickerOptions struct {
	MaxLag   int
	Priority map[string]int32
	Retries  int
}

// CustomPickerOption applies an option to CustomPickerOptions
type CustomPickerOption func(*CustomPickerOptions)

// CustomPickerMaxLag specifies the max replication lag at which a node can still be used
func CustomPickerMaxLag(n int) CustomPickerOption {
	return func(o *CustomPickerOptions) {
		o.MaxLag = n
	}
}

// NewCustomPicker creates a new node picker
func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] {
	options := CustomPickerOptions{}
	for _, o := range opts {
		o(&options)
	}
	return &CustomPicker[Querier]{opts: options}
}

// CustomPicker holds node picker options
type CustomPicker[T Querier] struct {
	opts CustomPickerOptions
}

// PickNode returns the node to use
func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] {
	for _, n := range cnodes {
		fmt.Printf("node %s\n", n.Node.String())
	}
	return cnodes[0]
}

func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
	if prio, ok := p.opts.Priority[nodeName]; ok {
		return prio
	}
	return math.MaxInt32 // Default to lowest priority
}

// CompareNodes is used to sort nodes
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
	// Get replication lag values
	aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
	bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()

	// First check that lag is lower than MaxLag
	if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
		return 0 // both are equal
	}

	// If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
	if aLag > p.opts.MaxLag {
		return 1 // b is better
	}
	if bLag > p.opts.MaxLag {
		return -1 // a is better
	}

	// Get node priorities
	aPrio := p.getPriority(a.Node.String())
	bPrio := p.getPriority(b.Node.String())

	// if both priorities are equal
	if aPrio == bPrio {
		// First compare by replication lag
		if aLag < bLag {
			return -1
		}
		if aLag > bLag {
			return 1
		}
		// If replication lag is equal, compare by latency
		aLatency := a.Info.(interface{ Latency() time.Duration }).Latency()
		bLatency := b.Info.(interface{ Latency() time.Duration }).Latency()

		if aLatency < bLatency {
			return -1
		}
		if aLatency > bLatency {
			return 1
		}

		// If lag and latency are equal
		return 0
	}

	// If priorities are different, prefer the node with the lower priority value
	if aPrio < bPrio {
		return -1
	}

	return 1
}
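CompareNodes follows the three-way convention of cmp.Compare: a negative result sorts a first, a positive one sorts b first. The ordering rules (lag cap first, then priority, then lag; latency omitted for brevity) can be exercised standalone; the structs below are hypothetical stand-ins for hasql.CheckedNode and its check info, not part of the diff:

```go
package main

import (
	"fmt"
	"slices"
)

// node is a hypothetical stand-in for a checked node and its health info.
type node struct {
	name     string
	lag      int
	priority int32
}

func main() {
	const maxLag = 100
	nodes := []node{
		{"slave-dc2", 50, 2},
		{"laggy", 500, 1}, // over the lag cap: always sorts last
		{"master-dc1", 0, 1},
	}
	slices.SortFunc(nodes, func(a, b node) int {
		switch {
		case a.lag > maxLag && b.lag > maxLag:
			return 0 // both over the cap: equal
		case a.lag > maxLag:
			return 1 // b is better
		case b.lag > maxLag:
			return -1 // a is better
		case a.priority != b.priority:
			if a.priority < b.priority {
				return -1 // lower priority value wins
			}
			return 1
		case a.lag != b.lag:
			if a.lag < b.lag {
				return -1 // equal priority: lower lag wins
			}
			return 1
		}
		return 0
	})
	fmt.Println(nodes) // [{master-dc1 0 1} {slave-dc2 50 2} {laggy 500 1}]
}
```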
cluster/sql/cluster.go (new file, 531 lines)
@@ -0,0 +1,531 @@
package sql

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"math"
	"reflect"
	"time"
	"unsafe"

	"golang.yandex/hasql/v2"
)

var errNoAliveNodes = errors.New("no alive nodes")

func newSQLRowError() *sql.Row {
	row := &sql.Row{}
	t := reflect.TypeOf(row).Elem()
	field, _ := t.FieldByName("err")
	rowPtr := unsafe.Pointer(row)
	errFieldPtr := unsafe.Pointer(uintptr(rowPtr) + field.Offset)
	errPtr := (*error)(errFieldPtr)
	*errPtr = errNoAliveNodes
	return row
}

type ClusterQuerier interface {
	Querier
	WaitForNodes(ctx context.Context, criterion ...hasql.NodeStateCriterion) error
}

type Querier interface {
	// Basic connection methods
	PingContext(ctx context.Context) error
	Close() error

	// Query methods with context
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

	// Prepared statements with context
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

	// Transaction management with context
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)

	// Connection pool management
	SetConnMaxLifetime(d time.Duration)
	SetConnMaxIdleTime(d time.Duration)
	SetMaxOpenConns(n int)
	SetMaxIdleConns(n int)
	Stats() sql.DBStats

	Conn(ctx context.Context) (*sql.Conn, error)
}

var (
	ErrClusterChecker    = errors.New("cluster node checker required")
	ErrClusterDiscoverer = errors.New("cluster node discoverer required")
	ErrClusterPicker     = errors.New("cluster node picker required")
)

type Cluster struct {
	hasql   *hasql.Cluster[Querier]
	options ClusterOptions
}

// NewCluster returns a ClusterQuerier backed by a cluster of nodes
func NewCluster[T Querier](opts ...ClusterOption) (ClusterQuerier, error) {
	options := ClusterOptions{Context: context.Background()}
	for _, opt := range opts {
		opt(&options)
	}
	if options.NodeChecker == nil {
		return nil, ErrClusterChecker
	}
	if options.NodeDiscoverer == nil {
		return nil, ErrClusterDiscoverer
	}
	if options.NodePicker == nil {
		return nil, ErrClusterPicker
	}

	if options.Retries < 1 {
		options.Retries = 1
	}

	if options.NodeStateCriterion == 0 {
		options.NodeStateCriterion = hasql.Primary
	}

	options.Options = append(options.Options, hasql.WithNodePicker(options.NodePicker))
	if p, ok := options.NodePicker.(*CustomPicker[Querier]); ok {
		p.opts.Priority = options.NodePriority
	}

	c, err := hasql.NewCluster(
		options.NodeDiscoverer,
		options.NodeChecker,
		options.Options...,
	)
	if err != nil {
		return nil, err
	}

	return &Cluster{hasql: c, options: options}, nil
}

// compile time guard
var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil)

type nodeStateCriterionKey struct{}

// NodeStateCriterion injects a hasql.NodeStateCriterion into the context
func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context {
	return context.WithValue(ctx, nodeStateCriterionKey{}, c)
}

// CustomPickerOptions holds options to pick nodes
type CustomPickerOptions struct {
	MaxLag   int
	Priority map[string]int32
	Retries  int
}

// CustomPickerOption applies an option to CustomPickerOptions
type CustomPickerOption func(*CustomPickerOptions)

// CustomPickerMaxLag specifies the max replication lag at which a node can still be used
func CustomPickerMaxLag(n int) CustomPickerOption {
	return func(o *CustomPickerOptions) {
		o.MaxLag = n
	}
}

// NewCustomPicker creates a new node picker
func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] {
	options := CustomPickerOptions{}
	for _, o := range opts {
		o(&options)
	}
	return &CustomPicker[Querier]{opts: options}
}

// CustomPicker holds node picker options
type CustomPicker[T Querier] struct {
	opts CustomPickerOptions
}

// PickNode returns the node to use
func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] {
	for _, n := range cnodes {
		fmt.Printf("node %s\n", n.Node.String())
	}
	return cnodes[0]
}

func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
	if prio, ok := p.opts.Priority[nodeName]; ok {
		return prio
	}
	return math.MaxInt32 // Default to lowest priority
}

// CompareNodes is used to sort nodes
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
	fmt.Printf("CompareNodes %s %s\n", a.Node.String(), b.Node.String())
	// Get replication lag values
	aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
	bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()

	// First check that lag is lower than MaxLag
	if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
		fmt.Printf("CompareNodes aLag > p.opts.MaxLag && bLag > p.opts.MaxLag\n")
		return 0 // both are equal
	}

	// If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
	if aLag > p.opts.MaxLag {
		fmt.Printf("CompareNodes aLag > p.opts.MaxLag\n")
		return 1 // b is better
	}
	if bLag > p.opts.MaxLag {
		fmt.Printf("CompareNodes bLag > p.opts.MaxLag\n")
		return -1 // a is better
	}

	// Get node priorities
	aPrio := p.getPriority(a.Node.String())
	bPrio := p.getPriority(b.Node.String())

	// if both priorities are equal
	if aPrio == bPrio {
		fmt.Printf("CompareNodes aPrio == bPrio\n")
		// First compare by replication lag
		if aLag < bLag {
			fmt.Printf("CompareNodes aLag < bLag\n")
			return -1
		}
		if aLag > bLag {
			fmt.Printf("CompareNodes aLag > bLag\n")
			return 1
		}
		// If replication lag is equal, compare by latency
		aLatency := a.Info.(interface{ Latency() time.Duration }).Latency()
		bLatency := b.Info.(interface{ Latency() time.Duration }).Latency()

		if aLatency < bLatency {
			return -1
		}
		if aLatency > bLatency {
			return 1
		}

		// If lag and latency are equal
		return 0
	}

	// If priorities are different, prefer the node with the lower priority value
	if aPrio < bPrio {
		return -1
	}

	return 1
}

// ClusterOptions contains cluster-specific options
type ClusterOptions struct {
	NodeChecker        hasql.NodeChecker
	NodePicker         hasql.NodePicker[Querier]
	NodeDiscoverer     hasql.NodeDiscoverer[Querier]
	Options            []hasql.ClusterOpt[Querier]
	Context            context.Context
	Retries            int
	NodePriority       map[string]int32
	NodeStateCriterion hasql.NodeStateCriterion
}

// ClusterOption applies an option to ClusterOptions
type ClusterOption func(*ClusterOptions)

// WithClusterNodeChecker passes a hasql.NodeChecker to the cluster options
func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodeChecker = c
	}
}

// WithClusterNodePicker passes a hasql.NodePicker to the cluster options
func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodePicker = p
	}
}

// WithClusterNodeDiscoverer passes a hasql.NodeDiscoverer to the cluster options
func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodeDiscoverer = d
	}
}

// WithRetries sets the retry count used across nodes in case of error
func WithRetries(n int) ClusterOption {
	return func(o *ClusterOptions) {
		o.Retries = n
	}
}

// WithClusterContext passes a context.Context used for node checks
func WithClusterContext(ctx context.Context) ClusterOption {
	return func(o *ClusterOptions) {
		o.Context = ctx
	}
}

// WithClusterOptions passes hasql.ClusterOpt options through
func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption {
	return func(o *ClusterOptions) {
		o.Options = append(o.Options, opts...)
	}
}

// WithClusterNodeStateCriterion sets the default hasql.NodeStateCriterion
func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption {
	return func(o *ClusterOptions) {
		o.NodeStateCriterion = c
	}
}

type ClusterNode struct {
	Name     string
	DB       Querier
	Priority int32
}

// WithClusterNodes configures the cluster with a static NodeDiscoverer
func WithClusterNodes(cns ...ClusterNode) ClusterOption {
	return func(o *ClusterOptions) {
		nodes := make([]*hasql.Node[Querier], 0, len(cns))
		if o.NodePriority == nil {
			o.NodePriority = make(map[string]int32, len(cns))
		}
		for _, cn := range cns {
			nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB))
			if cn.Priority == 0 {
				cn.Priority = math.MaxInt32
			}
			o.NodePriority[cn.Name] = cn.Priority
		}
		o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...)
	}
}

func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
	var tx *sql.Tx
	var err error

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if tx, err = n.DB().BeginTx(ctx, opts); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if tx == nil && err == nil {
		err = errNoAliveNodes
	}

	return tx, err
}

func (c *Cluster) Close() error {
	return c.hasql.Close()
}

func (c *Cluster) Conn(ctx context.Context) (*sql.Conn, error) {
	var conn *sql.Conn
	var err error

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if conn, err = n.DB().Conn(ctx); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if conn == nil && err == nil {
		err = errNoAliveNodes
	}

	return conn, err
}

func (c *Cluster) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
	var res sql.Result
	var err error

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if res, err = n.DB().ExecContext(ctx, query, args...); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if res == nil && err == nil {
		err = errNoAliveNodes
	}

	return res, err
}

func (c *Cluster) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
	var res *sql.Stmt
	var err error

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if res, err = n.DB().PrepareContext(ctx, query); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if res == nil && err == nil {
		err = errNoAliveNodes
	}

	return res, err
}

func (c *Cluster) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
	var res *sql.Rows
	var err error

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			if res, err = n.DB().QueryContext(ctx, query, args...); err != nil && err != sql.ErrNoRows && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if res == nil && err == nil {
		err = errNoAliveNodes
	}

	return res, err
}

func (c *Cluster) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
	var res *sql.Row

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		for ; retries < c.options.Retries; retries++ {
			res = n.DB().QueryRowContext(ctx, query, args...)
			if res.Err() == nil {
				return false
			} else if res.Err() != nil && retries >= c.options.Retries {
				return false
			}
		}
		return true
	})

	if res == nil {
		res = newSQLRowError()
	}

	return res
}

func (c *Cluster) PingContext(ctx context.Context) error {
	var err error
	var ok bool

	retries := 0
	c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
		ok = true
		for ; retries < c.options.Retries; retries++ {
			if err = n.DB().PingContext(ctx); err != nil && retries >= c.options.Retries {
				return true
			}
		}
		return false
	})

	if !ok {
		err = errNoAliveNodes
	}

	return err
}

func (c *Cluster) WaitForNodes(ctx context.Context, criterions ...hasql.NodeStateCriterion) error {
	for _, criterion := range criterions {
		if _, err := c.hasql.WaitForNode(ctx, criterion); err != nil {
			return err
		}
	}
	return nil
}

func (c *Cluster) SetConnMaxLifetime(td time.Duration) {
	c.hasql.NodesIter(hasql.NodeStateCriterion(hasql.Alive))(func(n *hasql.Node[Querier]) bool {
		n.DB().SetConnMaxLifetime(td)
		return false
	})
}

func (c *Cluster) SetConnMaxIdleTime(td time.Duration) {
	c.hasql.NodesIter(hasql.NodeStateCriterion(hasql.Alive))(func(n *hasql.Node[Querier]) bool {
		n.DB().SetConnMaxIdleTime(td)
		return false
	})
}

func (c *Cluster) SetMaxOpenConns(nc int) {
	c.hasql.NodesIter(hasql.NodeStateCriterion(hasql.Alive))(func(n *hasql.Node[Querier]) bool {
		n.DB().SetMaxOpenConns(nc)
		return false
	})
}

func (c *Cluster) SetMaxIdleConns(nc int) {
	c.hasql.NodesIter(hasql.NodeStateCriterion(hasql.Alive))(func(n *hasql.Node[Querier]) bool {
		n.DB().SetMaxIdleConns(nc)
		return false
	})
}

func (c *Cluster) Stats() sql.DBStats {
	s := sql.DBStats{}
	c.hasql.NodesIter(hasql.NodeStateCriterion(hasql.Alive))(func(n *hasql.Node[Querier]) bool {
		st := n.DB().Stats()
		s.Idle += st.Idle
		s.InUse += st.InUse
		s.MaxIdleClosed += st.MaxIdleClosed
		s.MaxIdleTimeClosed += st.MaxIdleTimeClosed
		s.MaxOpenConnections += st.MaxOpenConnections
		s.OpenConnections += st.OpenConnections
		s.WaitCount += st.WaitCount
		s.WaitDuration += st.WaitDuration
		return false
	})
	return s
}

func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
	if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
		return v
	}
	return c.options.NodeStateCriterion
}
171
cluster/sql/cluster_test.go
Normal file
171
cluster/sql/cluster_test.go
Normal file
@@ -0,0 +1,171 @@
package sql

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/DATA-DOG/go-sqlmock"
	"golang.yandex/hasql/v2"
)

func TestNewCluster(t *testing.T) {
	dbMaster, dbMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbMaster.Close()
	dbMasterMock.MatchExpectationsInOrder(false)

	dbMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(1, 0)).
		RowsWillBeClosed().
		WithoutArgs()

	dbMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("master-dc1"))

	dbDRMaster, dbDRMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbDRMaster.Close()
	dbDRMasterMock.MatchExpectationsInOrder(false)

	dbDRMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 40)).
		RowsWillBeClosed().
		WithoutArgs()

	dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("drmaster1-dc2"))

	dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("drmaster"))

	dbSlaveDC1, dbSlaveDC1Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbSlaveDC1.Close()
	dbSlaveDC1Mock.MatchExpectationsInOrder(false)

	dbSlaveDC1Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 50)).
		RowsWillBeClosed().
		WithoutArgs()

	dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	dbSlaveDC2, dbSlaveDC2Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
	if err != nil {
		t.Fatal(err)
	}
	defer dbSlaveDC2.Close()
	dbSlaveDC2Mock.MatchExpectationsInOrder(false)

	dbSlaveDC2Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
		sqlmock.NewRowsWithColumnDefinition(
			sqlmock.NewColumn("role").OfType("int8", 0),
			sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
			AddRow(2, 50)).
		RowsWillBeClosed().
		WithoutArgs()

	dbSlaveDC2Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc2"))

	tctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
	defer cancel()

	c, err := NewCluster[Querier](
		WithClusterContext(tctx),
		WithClusterNodeChecker(hasql.PostgreSQLChecker),
		WithClusterNodePicker(NewCustomPicker[Querier](
			CustomPickerMaxLag(100),
		)),
		WithClusterNodes(
			ClusterNode{"slave-dc1", dbSlaveDC1, 1},
			ClusterNode{"master-dc1", dbMaster, 1},
			ClusterNode{"slave-dc2", dbSlaveDC2, 2},
			ClusterNode{"drmaster1-dc2", dbDRMaster, 0},
		),
		WithClusterOptions(
			hasql.WithUpdateInterval[Querier](2*time.Second),
			hasql.WithUpdateTimeout[Querier](1*time.Second),
		),
	)
	if err != nil {
		t.Fatal(err)
	}
	defer c.Close()

	if err = c.WaitForNodes(tctx, hasql.Primary, hasql.Standby); err != nil {
		t.Fatal(err)
	}

	time.Sleep(500 * time.Millisecond)

	node1Name := ""
	fmt.Printf("check for Standby\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.Standby), "SELECT node_name as name"); row.Err() != nil {
		t.Fatal(row.Err())
	} else if err = row.Scan(&node1Name); err != nil {
		t.Fatal(err)
	} else if node1Name != "slave-dc1" {
		t.Fatalf("invalid node name %s != %s", "slave-dc1", node1Name)
	}

	dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
		sqlmock.NewRows([]string{"name"}).
			AddRow("slave-dc1"))

	node2Name := ""
	fmt.Printf("check for PreferStandby\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferStandby), "SELECT node_name as name"); row.Err() != nil {
		t.Fatal(row.Err())
	} else if err = row.Scan(&node2Name); err != nil {
		t.Fatal(err)
	} else if node2Name != "slave-dc1" {
		t.Fatalf("invalid node name %s != %s", "slave-dc1", node2Name)
	}

	node3Name := ""
	fmt.Printf("check for PreferPrimary\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferPrimary), "SELECT node_name as name"); row.Err() != nil {
		t.Fatal(row.Err())
	} else if err = row.Scan(&node3Name); err != nil {
		t.Fatal(err)
	} else if node3Name != "master-dc1" {
		t.Fatalf("invalid node name %s != %s", "master-dc1", node3Name)
	}

	dbSlaveDC1Mock.ExpectQuery(`.*`).WillReturnRows(sqlmock.NewRows([]string{"role"}).RowError(1, fmt.Errorf("row error")))

	time.Sleep(2 * time.Second)

	fmt.Printf("check for PreferStandby\n")
	if row := c.QueryRowContext(NodeStateCriterion(tctx, hasql.PreferStandby), "SELECT node_name as name"); row.Err() == nil {
		t.Fatal("must return error")
	}

	if dbMasterErr := dbMasterMock.ExpectationsWereMet(); dbMasterErr != nil {
		t.Error(dbMasterErr)
	}
}
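Outside of tests, the same context helper routes individual queries. A minimal sketch, assuming this package is imported as clustersql (the import path is illustrative):

```go
package main

import (
	"context"

	clustersql "go.unistack.org/micro/v3/cluster/sql" // assumed import path
	"golang.yandex/hasql/v2"
)

func countOrders(ctx context.Context, c clustersql.ClusterQuerier) (int64, error) {
	// Prefer a standby for the read; the cluster falls back to any
	// alive node that satisfies the criterion resolution rules.
	row := c.QueryRowContext(clustersql.NodeStateCriterion(ctx, hasql.PreferStandby),
		`SELECT count(*) FROM orders`)
	var n int64
	err := row.Scan(&n)
	return n, err
}

func main() {}
```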
3
go.mod
@@ -1,6 +1,6 @@
module go.unistack.org/micro/v3

go 1.22.0
go 1.24.0

require (
	dario.cat/mergo v1.0.1
@@ -15,6 +15,7 @@ require (
	go.uber.org/automaxprocs v1.6.0
	go.unistack.org/micro-proto/v3 v3.4.1
	golang.org/x/sync v0.10.0
	golang.yandex/hasql/v2 v2.1.0
	google.golang.org/grpc v1.69.2
	google.golang.org/protobuf v1.36.1
	gopkg.in/yaml.v3 v3.0.1
2
go.sum
@@ -89,6 +89,8 @@ golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.yandex/hasql/v2 v2.1.0 h1:7CaFFWeHoK5TvA+QvZzlKHlIN5sqNpqM8NSrXskZD/k=
golang.yandex/hasql/v2 v2.1.0/go.mod h1:3Au1AxuJDCTXmS117BpbI6e+70kGWeyLR1qJAH6HdtA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
@@ -4,18 +4,20 @@ package logger
type Level int8

const (
	// TraceLevel level usually used to find bugs, very verbose
	// TraceLevel usually used to find bugs, very verbose
	TraceLevel Level = iota - 2
	// DebugLevel level used only when enabled debugging
	// DebugLevel used only when debugging is enabled
	DebugLevel
	// InfoLevel level used for general info about what's going on inside the application
	// InfoLevel used for general info about what's going on inside the application
	InfoLevel
	// WarnLevel level used for non-critical entries
	// WarnLevel used for non-critical entries
	WarnLevel
	// ErrorLevel level used for errors that should definitely be noted
	// ErrorLevel used for errors that should definitely be noted
	ErrorLevel
	// FatalLevel level used for critical errors and then calls `os.Exit(1)`
	// FatalLevel used for critical errors and then calls `os.Exit(1)`
	FatalLevel
	// NoneLevel used to disable logging
	NoneLevel
)

// String returns logger level string representation
@@ -33,6 +35,8 @@ func (l Level) String() string {
		return "error"
	case FatalLevel:
		return "fatal"
	case NoneLevel:
		return "none"
	}
	return "info"
}
@@ -58,6 +62,8 @@ func ParseLevel(lvl string) Level {
		return ErrorLevel
	case FatalLevel.String():
		return FatalLevel
	case NoneLevel.String():
		return NoneLevel
	}
	return InfoLevel
}
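For reference, a quick sketch of the level round-trip with the new NoneLevel (usage example, not part of the change):

```go
package main

import (
	"fmt"

	"go.unistack.org/micro/v3/logger"
)

func main() {
	fmt.Println(logger.ParseLevel("none"))  // prints "none": NoneLevel now round-trips
	fmt.Println(logger.ParseLevel("bogus")) // prints "info": unknown strings fall back to InfoLevel
}
```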
@@ -52,6 +52,12 @@ type Options struct {
	AddStacktrace bool
	// DedupKeys deduplicate keys in log output
	DedupKeys bool
	// FatalFinalizers run in order inside the [logger.Fatal] method
	FatalFinalizers []func(context.Context)
}

var DefaultFatalFinalizer = func(ctx context.Context) {
	os.Exit(1)
}

// NewOptions creates new options struct
@@ -65,6 +71,7 @@ func NewOptions(opts ...Option) Options {
		AddSource: true,
		TimeFunc: time.Now,
		Meter: meter.DefaultMeter,
		FatalFinalizers: []func(context.Context){DefaultFatalFinalizer},
	}

	WithMicroKeys()(&options)
@@ -76,6 +83,13 @@ func NewOptions(opts ...Option) Options {
	return options
}

// WithFatalFinalizers sets the logger.Fatal finalizers
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
	return func(o *Options) {
		o.FatalFinalizers = fncs
	}
}

// WithContextAttrFuncs appends default funcs for the context attrs filler
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
	return func(o *Options) {
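A minimal sketch of the new hook, modeled on TestFatalFinalizers further down; note that WithFatalFinalizers replaces the default finalizer list, and the slog backend import path is assumed:

```go
package main

import (
	"context"

	"go.unistack.org/micro/v3/logger"
	slogger "go.unistack.org/micro/v3/logger/slog" // assumed import path of the slog backend
)

func main() {
	l := slogger.NewLogger(logger.WithLevel(logger.InfoLevel))
	// Finalizers run in order inside Fatal, before the process exits.
	if err := l.Init(logger.WithFatalFinalizers(func(ctx context.Context) {
		l.Info(ctx, "flushing state before exit")
	})); err != nil {
		panic(err)
	}
	l.Fatal(context.TODO(), "unrecoverable error")
}
```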
@@ -4,14 +4,12 @@ import (
	"context"
	"io"
	"log/slog"
	"os"
	"reflect"
	"regexp"
	"runtime"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"go.unistack.org/micro/v3/logger"
	"go.unistack.org/micro/v3/semconv"
@@ -34,6 +32,7 @@ var (
	warnValue = slog.StringValue("warn")
	errorValue = slog.StringValue("error")
	fatalValue = slog.StringValue("fatal")
	noneValue = slog.StringValue("none")
)

type wrapper struct {
@@ -85,6 +84,8 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
		a.Value = errorValue
	case lvl >= logger.FatalLevel:
		a.Value = fatalValue
	case lvl >= logger.NoneLevel:
		a.Value = noneValue
	default:
		a.Value = infoValue
	}
@@ -228,11 +229,12 @@ func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{})

func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) {
	s.printLog(ctx, logger.FatalLevel, msg, attrs...)
	for _, fn := range s.opts.FatalFinalizers {
		fn(ctx)
	}
	if closer, ok := s.opts.Out.(io.Closer); ok {
		closer.Close()
	}
	time.Sleep(1 * time.Second)
	os.Exit(1)
}

func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) {
@@ -316,6 +318,8 @@ func loggerToSlogLevel(level logger.Level) slog.Level {
		return slog.LevelDebug - 1
	case logger.FatalLevel:
		return slog.LevelError + 1
	case logger.NoneLevel:
		return slog.LevelError + 2
	default:
		return slog.LevelInfo
	}
@@ -333,6 +337,8 @@ func slogToLoggerLevel(level slog.Level) logger.Level {
		return logger.TraceLevel
	case slog.LevelError + 1:
		return logger.FatalLevel
	case slog.LevelError + 2:
		return logger.NoneLevel
	default:
		return logger.InfoLevel
	}
@@ -36,6 +36,24 @@ func TestStacktrace(t *testing.T) {
	}
}

func TestNoneLevel(t *testing.T) {
	ctx := context.TODO()
	buf := bytes.NewBuffer(nil)
	l := NewLogger(logger.WithLevel(logger.NoneLevel), logger.WithOutput(buf),
		WithHandlerFunc(slog.NewTextHandler),
		logger.WithAddStacktrace(true),
	)
	if err := l.Init(logger.WithFields("key1", "val1")); err != nil {
		t.Fatal(err)
	}

	l.Error(ctx, "msg1", errors.New("err"))

	if buf.Len() != 0 {
		t.Fatalf("logger none level does not work, buf contains: %s", buf.Bytes())
	}
}

func TestDelayedBuffer(t *testing.T) {
	ctx := context.TODO()
	buf := bytes.NewBuffer(nil)
@@ -62,7 +80,7 @@ func TestTime(t *testing.T) {
		WithHandlerFunc(slog.NewTextHandler),
		logger.WithAddStacktrace(true),
		logger.WithTimeFunc(func() time.Time {
			return time.Unix(0, 0)
			return time.Unix(0, 0).UTC()
		}),
	)
	if err := l.Init(logger.WithFields("key1", "val1")); err != nil {
@@ -71,8 +89,7 @@ func TestTime(t *testing.T) {

	l.Error(ctx, "msg1", errors.New("err"))

	if !bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T03:00:00.000000000+03:00`)) &&
		!bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T00:00:00.000000000Z`)) {
	if !bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T00:00:00.000000000Z`)) {
		t.Fatalf("logger error does not work, buf contains: %s", buf.Bytes())
	}
}
@@ -406,15 +423,16 @@ func TestLogger(t *testing.T) {
func Test_WithContextAttrFunc(t *testing.T) {
	loggerContextAttrFuncs := []logger.ContextAttrFunc{
		func(ctx context.Context) []interface{} {
			md, ok := metadata.FromIncomingContext(ctx)
			md, ok := metadata.FromOutgoingContext(ctx)
			if !ok {
				return nil
			}
			attrs := make([]interface{}, 0, 10)
			for k, v := range md {
				switch k {
				case "X-Request-Id", "Phone", "External-Id", "Source-Service", "X-App-Install-Id", "Client-Id", "Client-Ip":
					attrs = append(attrs, strings.ToLower(k), v)
				key := strings.ToLower(k)
				switch key {
				case "x-request-id", "phone", "external-id", "source-service", "x-app-install-id", "client-id", "client-ip":
					attrs = append(attrs, key, v)
				}
			}
			return attrs
@@ -424,7 +442,7 @@ func Test_WithContextAttrFunc(t *testing.T) {
	logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs, loggerContextAttrFuncs...)

	ctx := context.TODO()
	ctx = metadata.AppendIncomingContext(ctx, "X-Request-Id", uuid.New().String(),
	ctx = metadata.AppendOutgoingContext(ctx, "X-Request-Id", uuid.New().String(),
		"Source-Service", "Test-System")

	buf := bytes.NewBuffer(nil)
@@ -437,17 +455,39 @@ func Test_WithContextAttrFunc(t *testing.T) {
	if !(bytes.Contains(buf.Bytes(), []byte(`"level":"info"`)) && bytes.Contains(buf.Bytes(), []byte(`"msg":"test message"`))) {
		t.Fatalf("logger info, buf %s", buf.Bytes())
	}
	if !(bytes.Contains(buf.Bytes(), []byte(`"x-request-id":"`))) {
	if !(bytes.Contains(buf.Bytes(), []byte(`"x-request-id":`))) {
		t.Fatalf("logger info, buf %s", buf.Bytes())
	}
	if !(bytes.Contains(buf.Bytes(), []byte(`"source-service":"Test-System"`))) {
		t.Fatalf("logger info, buf %s", buf.Bytes())
	}
	buf.Reset()
	imd, _ := metadata.FromIncomingContext(ctx)
	omd, _ := metadata.FromOutgoingContext(ctx)
	l.Info(ctx, "test message1")
	imd.Set("Source-Service", "Test-System2")
	omd.Set("Source-Service", "Test-System2")
	l.Info(ctx, "test message2")

	// t.Logf("xxx %s", buf.Bytes())
}

func TestFatalFinalizers(t *testing.T) {
	ctx := context.TODO()
	buf := bytes.NewBuffer(nil)
	l := NewLogger(
		logger.WithLevel(logger.TraceLevel),
		logger.WithOutput(buf),
	)
	if err := l.Init(
		logger.WithFatalFinalizers(func(ctx context.Context) {
			l.Info(ctx, "fatal finalizer")
		})); err != nil {
		t.Fatal(err)
	}
	l.Fatal(ctx, "info_msg1")
	if !bytes.Contains(buf.Bytes(), []byte("fatal finalizer")) {
		t.Fatalf("logger does not have fatal message, buf %s", buf.Bytes())
	}
	if !bytes.Contains(buf.Bytes(), []byte("info_msg1")) {
		t.Fatalf("logger does not have info_msg1 message, buf %s", buf.Bytes())
	}
}
@@ -4,8 +4,8 @@ package meter
import (
	"io"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
)

@@ -49,9 +49,11 @@ type Meter interface {
	Set(opts ...Option) Meter
	// Histogram get or create histogram
	Histogram(name string, labels ...string) Histogram
	// HistogramExt get or create histogram with specified quantiles
	HistogramExt(name string, quantiles []float64, labels ...string) Histogram
	// Summary get or create summary
	Summary(name string, labels ...string) Summary
	// SummaryExt get or create summary with spcified quantiles and window time
	// SummaryExt get or create summary with specified quantiles and window time
	SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary
	// Write writes metrics to io.Writer
	Write(w io.Writer, opts ...Option) error
@@ -59,6 +61,8 @@ type Meter interface {
	Options() Options
	// String return meter type
	String() string
	// Unregister metric name and drop all data
	Unregister(name string, labels ...string) bool
}

// Counter is a counter
@@ -80,7 +84,11 @@ type FloatCounter interface {

// Gauge is a float64 gauge
type Gauge interface {
	Add(float64)
	Get() float64
	Set(float64)
	Dec()
	Inc()
}

// Histogram is a histogram for non-negative values with automatically created buckets
@@ -117,6 +125,39 @@ func BuildLabels(labels ...string) []string {
	return labels
}

var spool = newStringsPool(500)

type stringsPool struct {
	p *sync.Pool
	c int
}

func newStringsPool(size int) *stringsPool {
	p := &stringsPool{c: size}
	p.p = &sync.Pool{
		New: func() interface{} {
			return &strings.Builder{}
		},
	}
	return p
}

func (p *stringsPool) Cap() int {
	return p.c
}

func (p *stringsPool) Get() *strings.Builder {
	return p.p.Get().(*strings.Builder)
}

func (p *stringsPool) Put(b *strings.Builder) {
	if b.Cap() > p.c {
		return
	}
	b.Reset()
	p.p.Put(b)
}

// BuildName used to combine metric with labels.
// If labels count is odd, drop last element
func BuildName(name string, labels ...string) string {
@@ -125,8 +166,6 @@ func BuildName(name string, labels ...string) string {
	}

	if len(labels) > 2 {
		sort.Sort(byKey(labels))

		idx := 0
		for {
			if labels[idx] == labels[idx+2] {
@@ -141,7 +180,9 @@ func BuildName(name string, labels ...string) string {
		}
	}

	var b strings.Builder
	b := spool.Get()
	defer spool.Put(b)

	_, _ = b.WriteString(name)
	_, _ = b.WriteRune('{')
	for idx := 0; idx < len(labels); idx += 2 {
@@ -149,8 +190,9 @@ func BuildName(name string, labels ...string) string {
			_, _ = b.WriteRune(',')
		}
		_, _ = b.WriteString(labels[idx])
		_, _ = b.WriteString(`=`)
		_, _ = b.WriteString(strconv.Quote(labels[idx+1]))
		_, _ = b.WriteString(`="`)
		_, _ = b.WriteString(labels[idx+1])
		_, _ = b.WriteRune('"')
	}
	_, _ = b.WriteRune('}')
@@ -50,11 +50,12 @@ func TestBuildName(t *testing.T) {
	data := map[string][]string{
		`my_metric{firstlabel="value2",zerolabel="value3"}`: {
			"my_metric",
			"zerolabel", "value3", "firstlabel", "value2",
			"firstlabel", "value2",
			"zerolabel", "value3",
		},
		`my_metric{broker="broker2",register="mdns",server="tcp"}`: {
			"my_metric",
			"broker", "broker1", "broker", "broker2", "server", "http", "server", "tcp", "register", "mdns",
			"broker", "broker1", "broker", "broker2", "register", "mdns", "server", "http", "server", "tcp",
		},
		`my_metric{aaa="aaa"}`: {
			"my_metric",
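From the fixtures above, BuildName sorts labels by key and collapses repeated keys (the later value survives in these fixtures); values are now written between quotes verbatim rather than via strconv.Quote, so they are assumed to contain no double quotes. An illustrative sketch:

```go
package main

import (
	"fmt"

	"go.unistack.org/micro/v3/meter"
)

func main() {
	// Keys are sorted and duplicate keys collapsed, as in TestBuildName.
	fmt.Println(meter.BuildName("my_metric",
		"server", "http", "server", "tcp", "broker", "b1"))
	// expected output: my_metric{broker="b1",server="tcp"}
}
```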
@@ -28,6 +28,10 @@ func (r *noopMeter) Name() string {
	return r.opts.Name
}

func (r *noopMeter) Unregister(name string, labels ...string) bool {
	return true
}

// Init initialize options
func (r *noopMeter) Init(opts ...Option) error {
	for _, o := range opts {
@@ -66,6 +70,11 @@ func (r *noopMeter) Histogram(_ string, labels ...string) Histogram {
	return &noopHistogram{labels: labels}
}

// HistogramExt implements the Meter interface
func (r *noopMeter) HistogramExt(_ string, quantiles []float64, labels ...string) Histogram {
	return &noopHistogram{labels: labels}
}

// Set implements the Meter interface
func (r *noopMeter) Set(opts ...Option) Meter {
	m := &noopMeter{opts: r.opts}
@@ -132,6 +141,18 @@ type noopGauge struct {
	labels []string
}

func (r *noopGauge) Add(float64) {
}

func (r *noopGauge) Set(float64) {
}

func (r *noopGauge) Inc() {
}

func (r *noopGauge) Dec() {
}

func (r *noopGauge) Get() float64 {
	return 0
}
@@ -4,6 +4,8 @@ import (
	"context"
)

var DefaultQuantiles = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}

// Option powers the configuration for metrics implementations:
type Option func(*Options)

@@ -23,6 +25,8 @@ type Options struct {
	WriteProcessMetrics bool
	// WriteFDMetrics flag to write fd metrics
	WriteFDMetrics bool
	// Quantiles specifies buckets for histogram
	Quantiles []float64
}

// NewOptions prepares a set of options:
@@ -61,14 +65,12 @@ func Address(value string) Option {
	}
}

/*
// TimingObjectives defines the desired spread of statistics for histogram / timing metrics:
func TimingObjectives(value map[float64]float64) Option {
// Quantiles defines the desired spread of statistics for histogram metrics:
func Quantiles(quantiles []float64) Option {
	return func(o *Options) {
		o.TimingObjectives = value
		o.Quantiles = quantiles
	}
}
*/

// Labels add the meter labels
func Labels(ls ...string) Option {
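A hedged sketch of the new quantile knobs: a global default via the Quantiles option and a per-metric override via the HistogramExt signature shown above. That meter.NewMeter constructs the package's noop meter, and that it honors Options.Quantiles, are assumptions here:

```go
package main

import (
	"go.unistack.org/micro/v3/meter"
)

func main() {
	// Assumed constructor; Quantiles sets the default spread for histograms.
	m := meter.NewMeter(meter.Quantiles(meter.DefaultQuantiles))
	// Per-metric override using the new HistogramExt signature.
	_ = m.HistogramExt("request_duration_seconds",
		[]float64{0.5, 0.9, 0.99}, "handler", "index")
}
```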
@@ -21,11 +21,6 @@ import (
	"go.unistack.org/micro/v3/util/rand"
)

// DefaultCodecs will be used to encode/decode
var DefaultCodecs = map[string]codec.Codec{
	"application/octet-stream": codec.NewCodec(),
}

const (
	defaultContentType = "application/json"
)
@@ -93,9 +88,6 @@ func (n *noopServer) newCodec(contentType string) (codec.Codec, error) {
	if cf, ok := n.opts.Codecs[contentType]; ok {
		return cf, nil
	}
	if cf, ok := DefaultCodecs[contentType]; ok {
		return cf, nil
	}
	return nil, codec.ErrUnknownContentType
}
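With the DefaultCodecs fallback removed on both the client and this noop server, the octet-stream codec must be registered explicitly, as the updated client test does. A sketch, assuming the server package exposes a Codec option mirroring the client's:

```go
package main

import (
	"go.unistack.org/micro/v3/codec"
	"go.unistack.org/micro/v3/server" // assumed import path of this package
)

func main() {
	// No implicit "application/octet-stream" codec anymore: register it explicitly.
	s := server.NewServer(server.Codec("application/octet-stream", codec.NewCodec()))
	if err := s.Init(); err != nil {
		panic(err)
	}
}
```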