Compare commits: f92e18897a...614b740c56

2 Commits:
- 614b740c56
- bc011a2e7f

@@ -3,23 +3,12 @@ package sql

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "math"
    "reflect"
    "time"
    "unsafe"

    "golang.yandex/hasql/v2"
)

var (
    ErrClusterChecker    = errors.New("cluster node checker required")
    ErrClusterDiscoverer = errors.New("cluster node discoverer required")
    ErrClusterPicker     = errors.New("cluster node picker required")
    ErrorNoAliveNodes    = errors.New("cluster no alive nodes")
)

func newSQLRowError() *sql.Row {
    row := &sql.Row{}
    t := reflect.TypeOf(row).Elem()
@@ -36,25 +25,6 @@ type ClusterQuerier interface {
    WaitForNodes(ctx context.Context, criterion ...hasql.NodeStateCriterion) error
}

type Querier interface {
    // Basic connection methods
    PingContext(ctx context.Context) error
    Close() error

    // Query methods with context
    ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
    QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
    QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

    // Prepared statements with context
    PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

    // Transaction management with context
    BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)

    Conn(ctx context.Context) (*sql.Conn, error)
}

type Cluster struct {
    hasql   *hasql.Cluster[Querier]
    options ClusterOptions
@@ -101,205 +71,6 @@ func NewCluster[T Querier](opts ...ClusterOption) (ClusterQuerier, error) {
    return &Cluster{hasql: c, options: options}, nil
}

// compile time guard
var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil)

type nodeStateCriterionKey struct{}

// NodeStateCriterion injects a hasql.NodeStateCriterion into the context
func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context {
    return context.WithValue(ctx, nodeStateCriterionKey{}, c)
}

// CustomPickerOptions holds options to pick nodes
type CustomPickerOptions struct {
    MaxLag   int
    Priority map[string]int32
    Retries  int
}

// CustomPickerOption applies an option to CustomPickerOptions
type CustomPickerOption func(*CustomPickerOptions)

// CustomPickerMaxLag specifies the max lag for which a node can be used
func CustomPickerMaxLag(n int) CustomPickerOption {
    return func(o *CustomPickerOptions) {
        o.MaxLag = n
    }
}

// NewCustomPicker creates a new node picker
func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] {
    options := CustomPickerOptions{}
    for _, o := range opts {
        o(&options)
    }
    return &CustomPicker[Querier]{opts: options}
}
// CustomPicker holds node picker options
type CustomPicker[T Querier] struct {
    opts CustomPickerOptions
}

// PickNode is used to return a specific node
func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] {
    for _, n := range cnodes {
        fmt.Printf("node %s\n", n.Node.String())
    }
    return cnodes[0]
}

func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
    if prio, ok := p.opts.Priority[nodeName]; ok {
        return prio
    }
    return math.MaxInt32 // Default to lowest priority
}
// CompareNodes is used to sort nodes
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
    // Get replication lag values
    aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
    bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()

    // First check that lag is lower than MaxLag
    if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
        return 0 // both are equal
    }

    // If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
    if aLag > p.opts.MaxLag {
        return 1 // b is better
    }
    if bLag > p.opts.MaxLag {
        return -1 // a is better
    }

    // Get node priorities
    aPrio := p.getPriority(a.Node.String())
    bPrio := p.getPriority(b.Node.String())

    // if both priorities are equal
    if aPrio == bPrio {
        // First compare by replication lag
        if aLag < bLag {
            return -1
        }
        if aLag > bLag {
            return 1
        }
        // If replication lag is equal, compare by latency
        aLatency := a.Info.(interface{ Latency() time.Duration }).Latency()
        bLatency := b.Info.(interface{ Latency() time.Duration }).Latency()

        if aLatency < bLatency {
            return -1
        }
        if aLatency > bLatency {
            return 1
        }

        // If lag and latency are equal
        return 0
    }

    // If priorities are different, prefer the node with lower priority value
    if aPrio < bPrio {
        return -1
    }

    return 1
}
// ClusterOptions contains cluster specific options
type ClusterOptions struct {
    NodeChecker        hasql.NodeChecker
    NodePicker         hasql.NodePicker[Querier]
    NodeDiscoverer     hasql.NodeDiscoverer[Querier]
    Options            []hasql.ClusterOpt[Querier]
    Context            context.Context
    Retries            int
    NodePriority       map[string]int32
    NodeStateCriterion hasql.NodeStateCriterion
}

// ClusterOption applies an option to ClusterOptions
type ClusterOption func(*ClusterOptions)

// WithClusterNodeChecker passes a hasql.NodeChecker to cluster options
func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption {
    return func(o *ClusterOptions) {
        o.NodeChecker = c
    }
}

// WithClusterNodePicker passes a hasql.NodePicker to cluster options
func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption {
    return func(o *ClusterOptions) {
        o.NodePicker = p
    }
}

// WithClusterNodeDiscoverer passes a hasql.NodeDiscoverer to cluster options
func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption {
    return func(o *ClusterOptions) {
        o.NodeDiscoverer = d
    }
}

// WithRetries sets the retry count on other nodes in case of error
func WithRetries(n int) ClusterOption {
    return func(o *ClusterOptions) {
        o.Retries = n
    }
}

// WithClusterContext passes a context.Context to cluster options; it is used for checks
func WithClusterContext(ctx context.Context) ClusterOption {
    return func(o *ClusterOptions) {
        o.Context = ctx
    }
}

// WithClusterOptions passes hasql.ClusterOpt options
func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption {
    return func(o *ClusterOptions) {
        o.Options = append(o.Options, opts...)
    }
}

// WithClusterNodeStateCriterion sets the default hasql.NodeStateCriterion
func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption {
    return func(o *ClusterOptions) {
        o.NodeStateCriterion = c
    }
}
type ClusterNode struct {
    Name     string
    DB       Querier
    Priority int32
}

// WithClusterNodes creates a cluster with a static NodeDiscoverer
func WithClusterNodes(cns ...ClusterNode) ClusterOption {
    return func(o *ClusterOptions) {
        nodes := make([]*hasql.Node[Querier], 0, len(cns))
        if o.NodePriority == nil {
            o.NodePriority = make(map[string]int32, len(cns))
        }
        for _, cn := range cns {
            nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB))
            if cn.Priority == 0 {
                cn.Priority = math.MaxInt32
            }
            o.NodePriority[cn.Name] = cn.Priority
        }
        o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...)
    }
}
func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
    var tx *sql.Tx
    var err error
@@ -462,10 +233,3 @@ func (c *Cluster) WaitForNodes(ctx context.Context, criterions ...hasql.NodeStat
    }
    return nil
}

func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
    if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
        return v
    }
    return c.options.NodeStateCriterion
}

cluster/hasql/db.go (new file, 25 lines)
@@ -0,0 +1,25 @@
package sql

import (
    "context"
    "database/sql"
)

type Querier interface {
    // Basic connection methods
    PingContext(ctx context.Context) error
    Close() error

    // Query methods with context
    ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
    QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
    QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

    // Prepared statements with context
    PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

    // Transaction management with context
    BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)

    Conn(ctx context.Context) (*sql.Conn, error)
}
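
The Querier method set mirrors the context-aware subset of *sql.DB, so a plain *sql.DB can be used directly as a node value. A minimal compile-time check, added here only as an illustration (it is not part of the diff):

```go
package sql

import "database/sql"

// *sql.DB provides PingContext, Close, ExecContext, QueryContext, QueryRowContext,
// PrepareContext, BeginTx and Conn, so it satisfies Querier.
var _ Querier = (*sql.DB)(nil)
```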

@@ -9,8 +9,8 @@ import (
    "time"
)

-// OpenDBWithDriver creates a [*sql.DB] that uses the [ClusterDriver]
-func OpenDBWithDriver(db ClusterQuerier) (*sql.DB, error) {
+// OpenDBWithCluster creates a [*sql.DB] that uses the [ClusterQuerier]
+func OpenDBWithCluster(db ClusterQuerier) (*sql.DB, error) {
    driver := NewClusterDriver(db)
    connector, err := driver.OpenConnector("")
    if err != nil {
@@ -19,12 +19,12 @@ func OpenDBWithDriver(db ClusterQuerier) (*sql.DB, error) {
    return sql.OpenDB(connector), nil
}

-// ClusterDriver implements driver.Driver and driver.Connector for an existing [Querier]
+// ClusterDriver implements [driver.Driver] and driver.Connector for an existing [Querier]
type ClusterDriver struct {
    db ClusterQuerier
}

-// NewClusterDriver creates a new driver that uses an existing [ClusterQuerier]
+// NewClusterDriver creates a new [driver.Driver] that uses an existing [ClusterQuerier]
func NewClusterDriver(db ClusterQuerier) *ClusterDriver {
    return &ClusterDriver{db: db}
}

@@ -119,7 +119,7 @@ func TestDriver(t *testing.T) {
        t.Fatal(err)
    }

-   db, err := OpenDBWithDriver(c)
+   db, err := OpenDBWithCluster(c)
    if err != nil {
        t.Fatal(err)
    }
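
A hedged usage sketch of the renamed constructor: wrap a configured ClusterQuerier in a standard *sql.DB and steer individual statements via the context criterion. The helper name exampleStandbyQuery, the hasql.PreferStandby constant, and the assumption that the ClusterDriver consults the per-query context when picking a node are illustrative, not taken from this diff:

```go
package sql

import (
    "context"
    "log"

    "golang.yandex/hasql/v2"
)

// exampleStandbyQuery is a hypothetical helper showing the intended call pattern.
func exampleStandbyQuery(cluster ClusterQuerier) {
    db, err := OpenDBWithCluster(cluster)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Route this statement to a standby, assuming hasql/v2 exposes PreferStandby and
    // the driver applies the per-query criterion from the context.
    ctx := NodeStateCriterion(context.Background(), hasql.PreferStandby)
    rows, err := db.QueryContext(ctx, "SELECT 1")
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()
}
```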

cluster/hasql/error.go (new file, 10 lines)
@@ -0,0 +1,10 @@
package sql

import "errors"

var (
    ErrClusterChecker    = errors.New("cluster node checker required")
    ErrClusterDiscoverer = errors.New("cluster node discoverer required")
    ErrClusterPicker     = errors.New("cluster node picker required")
    ErrorNoAliveNodes    = errors.New("cluster no alive nodes")
)
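
These sentinel errors make configuration failures testable with errors.Is. A sketch under the assumption that NewCluster returns them for missing options, as their names suggest; exampleValidate is a hypothetical helper:

```go
package sql

import "errors"

// exampleValidate is a hypothetical helper illustrating the intended error handling.
func exampleValidate() error {
    _, err := NewCluster[Querier]() // no checker, discoverer or picker configured
    switch {
    case errors.Is(err, ErrClusterChecker):
        return errors.New("configure WithClusterNodeChecker")
    case errors.Is(err, ErrClusterPicker):
        return errors.New("configure WithClusterNodePicker")
    case errors.Is(err, ErrClusterDiscoverer):
        return errors.New("configure WithClusterNodeDiscoverer or WithClusterNodes")
    }
    return err
}
```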

cluster/hasql/options.go (new file, 110 lines)
@@ -0,0 +1,110 @@
package sql

import (
    "context"
    "math"

    "golang.yandex/hasql/v2"
)

// ClusterOptions contains cluster specific options
type ClusterOptions struct {
    NodeChecker        hasql.NodeChecker
    NodePicker         hasql.NodePicker[Querier]
    NodeDiscoverer     hasql.NodeDiscoverer[Querier]
    Options            []hasql.ClusterOpt[Querier]
    Context            context.Context
    Retries            int
    NodePriority       map[string]int32
    NodeStateCriterion hasql.NodeStateCriterion
}
// ClusterOption applies an option to ClusterOptions
type ClusterOption func(*ClusterOptions)

// WithClusterNodeChecker passes a hasql.NodeChecker to cluster options
func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption {
    return func(o *ClusterOptions) {
        o.NodeChecker = c
    }
}

// WithClusterNodePicker passes a hasql.NodePicker to cluster options
func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption {
    return func(o *ClusterOptions) {
        o.NodePicker = p
    }
}

// WithClusterNodeDiscoverer passes a hasql.NodeDiscoverer to cluster options
func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption {
    return func(o *ClusterOptions) {
        o.NodeDiscoverer = d
    }
}

// WithRetries sets the retry count on other nodes in case of error
func WithRetries(n int) ClusterOption {
    return func(o *ClusterOptions) {
        o.Retries = n
    }
}

// WithClusterContext passes a context.Context to cluster options; it is used for checks
func WithClusterContext(ctx context.Context) ClusterOption {
    return func(o *ClusterOptions) {
        o.Context = ctx
    }
}

// WithClusterOptions passes hasql.ClusterOpt options
func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption {
    return func(o *ClusterOptions) {
        o.Options = append(o.Options, opts...)
    }
}

// WithClusterNodeStateCriterion sets the default hasql.NodeStateCriterion
func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption {
    return func(o *ClusterOptions) {
        o.NodeStateCriterion = c
    }
}
type ClusterNode struct {
    Name     string
    DB       Querier
    Priority int32
}

// WithClusterNodes creates a cluster with a static NodeDiscoverer
func WithClusterNodes(cns ...ClusterNode) ClusterOption {
    return func(o *ClusterOptions) {
        nodes := make([]*hasql.Node[Querier], 0, len(cns))
        if o.NodePriority == nil {
            o.NodePriority = make(map[string]int32, len(cns))
        }
        for _, cn := range cns {
            nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB))
            if cn.Priority == 0 {
                cn.Priority = math.MaxInt32
            }
            o.NodePriority[cn.Name] = cn.Priority
        }
        o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...)
    }
}

type nodeStateCriterionKey struct{}

// NodeStateCriterion injects a hasql.NodeStateCriterion into the context
func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context {
    return context.WithValue(ctx, nodeStateCriterionKey{}, c)
}

func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion {
    if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok {
        return v
    }
    return c.options.NodeStateCriterion
}
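
A hedged wiring sketch of how these options compose. exampleNewCluster is a hypothetical helper; db1 and db2 are assumed *sql.DB handles, hasql.PostgreSQLChecker and hasql.Primary are assumed to exist in hasql/v2, and whether NodePriority reaches the picker is not shown in this diff:

```go
package sql

import (
    "database/sql"

    "golang.yandex/hasql/v2"
)

// exampleNewCluster is a hypothetical helper showing how the options compose.
func exampleNewCluster(db1, db2 *sql.DB) (ClusterQuerier, error) {
    return NewCluster[Querier](
        WithClusterNodes(
            ClusterNode{Name: "pg-1", DB: db1, Priority: 1}, // lower value is preferred by CompareNodes
            ClusterNode{Name: "pg-2", DB: db2, Priority: 2},
        ),
        WithClusterNodeChecker(hasql.PostgreSQLChecker), // assumption: checker provided by hasql/v2
        WithClusterNodePicker(NewCustomPicker[Querier](CustomPickerMaxLag(30))),
        WithRetries(2),
        WithClusterNodeStateCriterion(hasql.Primary), // assumption: criterion constant from hasql/v2
    )
}
```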

cluster/hasql/picker.go (new file, 113 lines)
@@ -0,0 +1,113 @@
package sql

import (
    "fmt"
    "math"
    "time"

    "golang.yandex/hasql/v2"
)

// compile time guard
var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil)

// CustomPickerOptions holds options to pick nodes
type CustomPickerOptions struct {
    MaxLag   int
    Priority map[string]int32
    Retries  int
}

// CustomPickerOption applies an option to CustomPickerOptions
type CustomPickerOption func(*CustomPickerOptions)

// CustomPickerMaxLag specifies the max lag for which a node can be used
func CustomPickerMaxLag(n int) CustomPickerOption {
    return func(o *CustomPickerOptions) {
        o.MaxLag = n
    }
}

// NewCustomPicker creates a new node picker
func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] {
    options := CustomPickerOptions{}
    for _, o := range opts {
        o(&options)
    }
    return &CustomPicker[Querier]{opts: options}
}

// CustomPicker holds node picker options
type CustomPicker[T Querier] struct {
    opts CustomPickerOptions
}

// PickNode is used to return a specific node
func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] {
    for _, n := range cnodes {
        fmt.Printf("node %s\n", n.Node.String())
    }
    return cnodes[0]
}

func (p *CustomPicker[T]) getPriority(nodeName string) int32 {
    if prio, ok := p.opts.Priority[nodeName]; ok {
        return prio
    }
    return math.MaxInt32 // Default to lowest priority
}
// CompareNodes is used to sort nodes
func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int {
    // Get replication lag values
    aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag()
    bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag()

    // First check that lag is lower than MaxLag
    if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag {
        return 0 // both are equal
    }

    // If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't
    if aLag > p.opts.MaxLag {
        return 1 // b is better
    }
    if bLag > p.opts.MaxLag {
        return -1 // a is better
    }

    // Get node priorities
    aPrio := p.getPriority(a.Node.String())
    bPrio := p.getPriority(b.Node.String())

    // if both priorities are equal
    if aPrio == bPrio {
        // First compare by replication lag
        if aLag < bLag {
            return -1
        }
        if aLag > bLag {
            return 1
        }
        // If replication lag is equal, compare by latency
        aLatency := a.Info.(interface{ Latency() time.Duration }).Latency()
        bLatency := b.Info.(interface{ Latency() time.Duration }).Latency()

        if aLatency < bLatency {
            return -1
        }
        if aLatency > bLatency {
            return 1
        }

        // If lag and latency are equal
        return 0
    }

    // If priorities are different, prefer the node with lower priority value
    if aPrio < bPrio {
        return -1
    }

    return 1
}
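
CompareNodes follows Go's three-way comparator contract (negative: a ranks first; zero: equal; positive: b ranks first), so it can drive a standard sort. A sketch only; exampleOrder is a hypothetical helper, and how hasql/v2 itself invokes CompareNodes is not shown here:

```go
package sql

import (
    "slices"

    "golang.yandex/hasql/v2"
)

// exampleOrder is a hypothetical helper: it sorts checked nodes with the picker's
// comparator, placing lower-lag, higher-priority, lower-latency nodes first.
func exampleOrder(p *CustomPicker[Querier], cnodes []hasql.CheckedNode[Querier]) {
    slices.SortFunc(cnodes, p.CompareNodes) // a negative result places a before b
}
```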