implement driver
Some checks failed
lint / lint (pull_request) Successful in 2m43s
test / test (pull_request) Successful in 4m53s
coverage / build (pull_request) Failing after 17m18s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-09-20 22:27:42 +03:00
parent 022326ddc4
commit f92e18897a
4 changed files with 450 additions and 74 deletions

View File

@@ -13,7 +13,12 @@ import (
"golang.yandex/hasql/v2"
)
var errNoAliveNodes = errors.New("no alive nodes")
var (
ErrClusterChecker = errors.New("cluster node checker required")
ErrClusterDiscoverer = errors.New("cluster node discoverer required")
ErrClusterPicker = errors.New("cluster node picker required")
ErrorNoAliveNodes = errors.New("cluster no alive nodes")
)
func newSQLRowError() *sql.Row {
row := &sql.Row{}
@@ -22,7 +27,7 @@ func newSQLRowError() *sql.Row {
rowPtr := unsafe.Pointer(row)
errFieldPtr := unsafe.Pointer(uintptr(rowPtr) + field.Offset)
errPtr := (*error)(errFieldPtr)
*errPtr = errNoAliveNodes
*errPtr = ErrorNoAliveNodes
return row
}
@@ -47,28 +52,15 @@ type Querier interface {
// Transaction management with context
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
// Connection pool management
SetConnMaxLifetime(d time.Duration)
SetConnMaxIdleTime(d time.Duration)
SetMaxOpenConns(n int)
SetMaxIdleConns(n int)
Stats() sql.DBStats
Conn(ctx context.Context) (*sql.Conn, error)
}
var (
ErrClusterChecker = errors.New("cluster node checker required")
ErrClusterDiscoverer = errors.New("cluster node discoverer required")
ErrClusterPicker = errors.New("cluster node picker required")
)
type Cluster struct {
hasql *hasql.Cluster[Querier]
options ClusterOptions
}
// NewCluster returns Querier that provides cluster of nodes
// NewCluster returns [Querier] that provides cluster of nodes
func NewCluster[T Querier](opts ...ClusterOption) (ClusterQuerier, error) {
options := ClusterOptions{Context: context.Background()}
for _, opt := range opts {
@@ -167,24 +159,20 @@ func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
// CompareNodes used to sort nodes
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
fmt.Printf("CompareNodes %s %s\n", a.Node.String(), b.Node.String())
// Get replication lag values
aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()
// First check that lag lower then MaxLag
if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
fmt.Printf("CompareNodes aLag > p.opts.MaxLag && bLag > p.opts.MaxLag\n")
return 0 // both are equal
}
// If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
if aLag > p.opts.MaxLag {
fmt.Printf("CompareNodes aLag > p.opts.MaxLag\n")
return 1 // b is better
}
if bLag > p.opts.MaxLag {
fmt.Printf("CompareNodes bLag > p.opts.MaxLag\n")
return -1 // a is better
}
@@ -194,14 +182,11 @@ func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
// if both priority equals
if aPrio == bPrio {
fmt.Printf("CompareNodes aPrio == bPrio\n")
// First compare by replication lag
if aLag < bLag {
fmt.Printf("CompareNodes aLag < bLag\n")
return -1
}
if aLag > bLag {
fmt.Printf("CompareNodes aLag > bLag\n")
return 1
}
// If replication lag is equal, compare by latency
@@ -330,7 +315,7 @@ func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, er
})
if tx == nil && err == nil {
err = errNoAliveNodes
err = ErrorNoAliveNodes
}
return tx, err
@@ -355,7 +340,7 @@ func (c *Cluster) Conn(ctx context.Context) (*sql.Conn, error) {
})
if conn == nil && err == nil {
err = errNoAliveNodes
err = ErrorNoAliveNodes
}
return conn, err
@@ -376,7 +361,7 @@ func (c *Cluster) ExecContext(ctx context.Context, query string, args ...interfa
})
if res == nil && err == nil {
err = errNoAliveNodes
err = ErrorNoAliveNodes
}
return res, err
@@ -397,7 +382,7 @@ func (c *Cluster) PrepareContext(ctx context.Context, query string) (*sql.Stmt,
})
if res == nil && err == nil {
err = errNoAliveNodes
err = ErrorNoAliveNodes
}
return res, err
@@ -418,7 +403,7 @@ func (c *Cluster) QueryContext(ctx context.Context, query string, args ...interf
})
if res == nil && err == nil {
err = errNoAliveNodes
err = ErrorNoAliveNodes
}
return res, err
@@ -463,7 +448,7 @@ func (c *Cluster) PingContext(ctx context.Context) error {
})
if !ok {
err = errNoAliveNodes
err = ErrorNoAliveNodes
}
return err
@@ -478,51 +463,6 @@ func (c *Cluster) WaitForNodes(ctx context.Context, criterions ...hasql.NodeStat
return nil
}
func (c *Cluster) SetConnMaxLifetime(td time.Duration) {
c.hasql.NodesIter(hasql.NodeStateCriterion(hasql.Alive))(func(n *hasql.Node[Querier]) bool {
n.DB().SetConnMaxIdleTime(td)
return false
})
}
func (c *Cluster) SetConnMaxIdleTime(td time.Duration) {
c.hasql.NodesIter(hasql.NodeStateCriterion(hasql.Alive))(func(n *hasql.Node[Querier]) bool {
n.DB().SetConnMaxIdleTime(td)
return false
})
}
func (c *Cluster) SetMaxOpenConns(nc int) {
c.hasql.NodesIter(hasql.NodeStateCriterion(hasql.Alive))(func(n *hasql.Node[Querier]) bool {
n.DB().SetMaxOpenConns(nc)
return false
})
}
func (c *Cluster) SetMaxIdleConns(nc int) {
c.hasql.NodesIter(hasql.NodeStateCriterion(hasql.Alive))(func(n *hasql.Node[Querier]) bool {
n.DB().SetMaxIdleConns(nc)
return false
})
}
func (c *Cluster) Stats() sql.DBStats {
s := sql.DBStats{}
c.hasql.NodesIter(hasql.NodeStateCriterion(hasql.Alive))(func(n *hasql.Node[Querier]) bool {
st := n.DB().Stats()
s.Idle += st.Idle
s.InUse += st.InUse
s.MaxIdleClosed += st.MaxIdleClosed
s.MaxIdleTimeClosed += st.MaxIdleTimeClosed
s.MaxOpenConnections += st.MaxOpenConnections
s.OpenConnections += st.OpenConnections
s.WaitCount += st.WaitCount
s.WaitDuration += st.WaitDuration
return false
})
return s
}
func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
return v

295
cluster/hasql/driver.go Normal file
View File

@@ -0,0 +1,295 @@
package sql
import (
"context"
"database/sql"
"database/sql/driver"
"io"
"sync"
"time"
)
// OpenDBWithDriver creates a [*sql.DB] that uses the [ClusterDriver]
func OpenDBWithDriver(db ClusterQuerier) (*sql.DB, error) {
driver := NewClusterDriver(db)
connector, err := driver.OpenConnector("")
if err != nil {
return nil, err
}
return sql.OpenDB(connector), nil
}
// ClusterDriver implements driver.Driver and driver.Connector for an existing [Querier]
type ClusterDriver struct {
db ClusterQuerier
}
// NewClusterDriver creates a new driver that uses an existing [ClusterQuerier]
func NewClusterDriver(db ClusterQuerier) *ClusterDriver {
return &ClusterDriver{db: db}
}
// Open implements [driver.Driver.Open]
func (d *ClusterDriver) Open(name string) (driver.Conn, error) {
return d.Connect(context.Background())
}
// OpenConnector implements [driver.DriverContext.OpenConnector]
func (d *ClusterDriver) OpenConnector(name string) (driver.Connector, error) {
return d, nil
}
// Connect implements [driver.Connector.Connect]
func (d *ClusterDriver) Connect(ctx context.Context) (driver.Conn, error) {
conn, err := d.db.Conn(ctx)
if err != nil {
return nil, err
}
return &dbConn{conn: conn}, nil
}
// Driver implements [driver.Connector.Driver]
func (d *ClusterDriver) Driver() driver.Driver {
return d
}
// dbConn implements driver.Conn with both context and legacy methods
type dbConn struct {
conn *sql.Conn
mu sync.Mutex
}
// Prepare implements [driver.Conn.Prepare] (legacy method)
func (c *dbConn) Prepare(query string) (driver.Stmt, error) {
return c.PrepareContext(context.Background(), query)
}
// PrepareContext implements [driver.ConnPrepareContext.PrepareContext]
func (c *dbConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
c.mu.Lock()
defer c.mu.Unlock()
stmt, err := c.conn.PrepareContext(ctx, query)
if err != nil {
return nil, err
}
return &dbStmt{stmt: stmt}, nil
}
// Exec implements [driver.Execer.Exec] (legacy method)
func (c *dbConn) Exec(query string, args []driver.Value) (driver.Result, error) {
namedArgs := make([]driver.NamedValue, len(args))
for i, value := range args {
namedArgs[i] = driver.NamedValue{Value: value}
}
return c.ExecContext(context.Background(), query, namedArgs)
}
// ExecContext implements [driver.ExecerContext.ExecContext]
func (c *dbConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
c.mu.Lock()
defer c.mu.Unlock()
// Convert driver.NamedValue to any
interfaceArgs := make([]any, len(args))
for i, arg := range args {
interfaceArgs[i] = arg.Value
}
return c.conn.ExecContext(ctx, query, interfaceArgs...)
}
// Query implements [driver.Queryer.Query] (legacy method)
func (c *dbConn) Query(query string, args []driver.Value) (driver.Rows, error) {
namedArgs := make([]driver.NamedValue, len(args))
for i, value := range args {
namedArgs[i] = driver.NamedValue{Value: value}
}
return c.QueryContext(context.Background(), query, namedArgs)
}
// QueryContext implements [driver.QueryerContext.QueryContext]
func (c *dbConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
c.mu.Lock()
defer c.mu.Unlock()
// Convert driver.NamedValue to any
interfaceArgs := make([]any, len(args))
for i, arg := range args {
interfaceArgs[i] = arg.Value
}
rows, err := c.conn.QueryContext(ctx, query, interfaceArgs...)
if err != nil {
return nil, err
}
return &dbRows{rows: rows}, nil
}
// Begin implements [driver.Conn.Begin] (legacy method)
func (c *dbConn) Begin() (driver.Tx, error) {
return c.BeginTx(context.Background(), driver.TxOptions{})
}
// BeginTx implements [driver.ConnBeginTx.BeginTx]
func (c *dbConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
c.mu.Lock()
defer c.mu.Unlock()
sqlOpts := &sql.TxOptions{
Isolation: sql.IsolationLevel(opts.Isolation),
ReadOnly: opts.ReadOnly,
}
tx, err := c.conn.BeginTx(ctx, sqlOpts)
if err != nil {
return nil, err
}
return &dbTx{tx: tx}, nil
}
// Ping implements [driver.Pinger.Ping]
func (c *dbConn) Ping(ctx context.Context) error {
return c.conn.PingContext(ctx)
}
// Close implements [driver.Conn.Close]
func (c *dbConn) Close() error {
return c.conn.Close()
}
// IsValid implements [driver.Validator.IsValid]
func (c *dbConn) IsValid() bool {
// Ping with a short timeout to check if the connection is still valid
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
return c.conn.PingContext(ctx) == nil
}
// dbStmt implements [driver.Stmt] with both context and legacy methods
type dbStmt struct {
stmt *sql.Stmt
mu sync.Mutex
}
// Close implements [driver.Stmt.Close]
func (s *dbStmt) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.stmt.Close()
}
// Close implements [driver.Stmt.NumInput]
func (s *dbStmt) NumInput() int {
return -1 // Number of parameters is unknown
}
// Exec implements [driver.Stmt.Exec] (legacy method)
func (s *dbStmt) Exec(args []driver.Value) (driver.Result, error) {
namedArgs := make([]driver.NamedValue, len(args))
for i, value := range args {
namedArgs[i] = driver.NamedValue{Value: value}
}
return s.ExecContext(context.Background(), namedArgs)
}
// ExecContext implements [driver.StmtExecContext.ExecContext]
func (s *dbStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
s.mu.Lock()
defer s.mu.Unlock()
interfaceArgs := make([]any, len(args))
for i, arg := range args {
interfaceArgs[i] = arg.Value
}
return s.stmt.ExecContext(ctx, interfaceArgs...)
}
// Query implements [driver.Stmt.Query] (legacy method)
func (s *dbStmt) Query(args []driver.Value) (driver.Rows, error) {
namedArgs := make([]driver.NamedValue, len(args))
for i, value := range args {
namedArgs[i] = driver.NamedValue{Value: value}
}
return s.QueryContext(context.Background(), namedArgs)
}
// QueryContext implements [driver.StmtQueryContext.QueryContext]
func (s *dbStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
s.mu.Lock()
defer s.mu.Unlock()
interfaceArgs := make([]any, len(args))
for i, arg := range args {
interfaceArgs[i] = arg.Value
}
rows, err := s.stmt.QueryContext(ctx, interfaceArgs...)
if err != nil {
return nil, err
}
return &dbRows{rows: rows}, nil
}
// dbRows implements [driver.Rows]
type dbRows struct {
rows *sql.Rows
}
// Columns implements [driver.Rows.Columns]
func (r *dbRows) Columns() []string {
cols, err := r.rows.Columns()
if err != nil {
// This shouldn't happen if the query was successful
return []string{}
}
return cols
}
// Close implements [driver.Rows.Close]
func (r *dbRows) Close() error {
return r.rows.Close()
}
// Next implements [driver.Rows.Next]
func (r *dbRows) Next(dest []driver.Value) error {
if !r.rows.Next() {
if err := r.rows.Err(); err != nil {
return err
}
return io.EOF
}
// Create a slice of interfaces to scan into
scanArgs := make([]any, len(dest))
for i := range scanArgs {
scanArgs[i] = &dest[i]
}
return r.rows.Scan(scanArgs...)
}
// dbTx implements [driver.Tx]
type dbTx struct {
tx *sql.Tx
mu sync.Mutex
}
// Commit implements [driver.Tx.Commit]
func (t *dbTx) Commit() error {
t.mu.Lock()
defer t.mu.Unlock()
return t.tx.Commit()
}
// Rollback implements [driver.Tx.Rollback]
func (t *dbTx) Rollback() error {
t.mu.Lock()
defer t.mu.Unlock()
return t.tx.Rollback()
}

View File

@@ -0,0 +1,141 @@
package sql
import (
"context"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"golang.yandex/hasql/v2"
)
func TestDriver(t *testing.T) {
dbMaster, dbMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbMaster.Close()
dbMasterMock.MatchExpectationsInOrder(false)
dbMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(1, 0)).
RowsWillBeClosed().
WithoutArgs()
dbMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("master-dc1"))
dbDRMaster, dbDRMasterMock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbDRMaster.Close()
dbDRMasterMock.MatchExpectationsInOrder(false)
dbDRMasterMock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(2, 40)).
RowsWillBeClosed().
WithoutArgs()
dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("drmaster1-dc2"))
dbDRMasterMock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("drmaster"))
dbSlaveDC1, dbSlaveDC1Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbSlaveDC1.Close()
dbSlaveDC1Mock.MatchExpectationsInOrder(false)
dbSlaveDC1Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(2, 50)).
RowsWillBeClosed().
WithoutArgs()
dbSlaveDC1Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("slave-dc1"))
dbSlaveDC2, dbSlaveDC2Mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
if err != nil {
t.Fatal(err)
}
defer dbSlaveDC2.Close()
dbSlaveDC1Mock.MatchExpectationsInOrder(false)
dbSlaveDC2Mock.ExpectQuery(`.*pg_is_in_recovery.*`).WillReturnRows(
sqlmock.NewRowsWithColumnDefinition(
sqlmock.NewColumn("role").OfType("int8", 0),
sqlmock.NewColumn("replication_lag").OfType("int8", 0)).
AddRow(2, 50)).
RowsWillBeClosed().
WithoutArgs()
dbSlaveDC2Mock.ExpectQuery(`SELECT node_name as name`).WillReturnRows(
sqlmock.NewRows([]string{"name"}).
AddRow("slave-dc1"))
tctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()
c, err := NewCluster[Querier](
WithClusterContext(tctx),
WithClusterNodeChecker(hasql.PostgreSQLChecker),
WithClusterNodePicker(NewCustomPicker[Querier](
CustomPickerMaxLag(100),
)),
WithClusterNodes(
ClusterNode{"slave-dc1", dbSlaveDC1, 1},
ClusterNode{"master-dc1", dbMaster, 1},
ClusterNode{"slave-dc2", dbSlaveDC2, 2},
ClusterNode{"drmaster1-dc2", dbDRMaster, 0},
),
WithClusterOptions(
hasql.WithUpdateInterval[Querier](2*time.Second),
hasql.WithUpdateTimeout[Querier](1*time.Second),
),
)
if err != nil {
t.Fatal(err)
}
defer c.Close()
if err = c.WaitForNodes(tctx, hasql.Primary, hasql.Standby); err != nil {
t.Fatal(err)
}
db, err := OpenDBWithDriver(c)
if err != nil {
t.Fatal(err)
}
// Use context methods
row := db.QueryRowContext(NodeStateCriterion(t.Context(), hasql.Primary), "SELECT node_name as name")
if err = row.Err(); err != nil {
t.Fatal(err)
}
nodeName := ""
if err = row.Scan(&nodeName); err != nil {
t.Fatal(err)
}
if nodeName != "master-dc1" {
t.Fatalf("invalid node_name %s != %s", "master-dc1", nodeName)
}
}