diff --git a/cluster/hasql/cluster.go b/cluster/hasql/cluster.go index a7018c26..dba2e865 100644 --- a/cluster/hasql/cluster.go +++ b/cluster/hasql/cluster.go @@ -4,10 +4,7 @@ import ( "context" "database/sql" "errors" - "fmt" - "math" "reflect" - "time" "unsafe" "golang.yandex/hasql/v2" @@ -101,205 +98,6 @@ func NewCluster[T Querier](opts ...ClusterOption) (ClusterQuerier, error) { return &Cluster{hasql: c, options: options}, nil } -// compile time guard -var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil) - -type nodeStateCriterionKey struct{} - -// NodeStateCriterion inject hasql.NodeStateCriterion to context -func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context { - return context.WithValue(ctx, nodeStateCriterionKey{}, c) -} - -// CustomPickerOptions holds options to pick nodes -type CustomPickerOptions struct { - MaxLag int - Priority map[string]int32 - Retries int -} - -// CustomPickerOption func apply option to CustomPickerOptions -type CustomPickerOption func(*CustomPickerOptions) - -// CustomPickerMaxLag specifies max lag for which node can be used -func CustomPickerMaxLag(n int) CustomPickerOption { - return func(o *CustomPickerOptions) { - o.MaxLag = n - } -} - -// NewCustomPicker creates new node picker -func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] { - options := CustomPickerOptions{} - for _, o := range opts { - o(&options) - } - return &CustomPicker[Querier]{opts: options} -} - -// CustomPicker holds node picker options -type CustomPicker[T Querier] struct { - opts CustomPickerOptions -} - -// PickNode used to return specific node -func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] { - for _, n := range cnodes { - fmt.Printf("node %s\n", n.Node.String()) - } - return cnodes[0] -} - -func (p *CustomPicker[T]) getPriority(nodeName string) int32 { - if prio, ok := p.opts.Priority[nodeName]; ok { - return prio - } - return math.MaxInt32 // Default to lowest priority -} - -// CompareNodes used to sort nodes -func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int { - // Get replication lag values - aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag() - bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag() - - // First check that lag lower then MaxLag - if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag { - return 0 // both are equal - } - - // If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't - if aLag > p.opts.MaxLag { - return 1 // b is better - } - if bLag > p.opts.MaxLag { - return -1 // a is better - } - - // Get node priorities - aPrio := p.getPriority(a.Node.String()) - bPrio := p.getPriority(b.Node.String()) - - // if both priority equals - if aPrio == bPrio { - // First compare by replication lag - if aLag < bLag { - return -1 - } - if aLag > bLag { - return 1 - } - // If replication lag is equal, compare by latency - aLatency := a.Info.(interface{ Latency() time.Duration }).Latency() - bLatency := b.Info.(interface{ Latency() time.Duration }).Latency() - - if aLatency < bLatency { - return -1 - } - if aLatency > bLatency { - return 1 - } - - // If lag and latency is equal - return 0 - } - - // If priorities are different, prefer the node with lower priority value - if aPrio < bPrio { - return -1 - } - - return 1 -} - -// ClusterOptions contains cluster specific options -type ClusterOptions struct { - NodeChecker hasql.NodeChecker - NodePicker hasql.NodePicker[Querier] - NodeDiscoverer hasql.NodeDiscoverer[Querier] - Options []hasql.ClusterOpt[Querier] - Context context.Context - Retries int - NodePriority map[string]int32 - NodeStateCriterion hasql.NodeStateCriterion -} - -// ClusterOption apply cluster options to ClusterOptions -type ClusterOption func(*ClusterOptions) - -// WithClusterNodeChecker pass hasql.NodeChecker to cluster options -func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption { - return func(o *ClusterOptions) { - o.NodeChecker = c - } -} - -// WithClusterNodePicker pass hasql.NodePicker to cluster options -func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption { - return func(o *ClusterOptions) { - o.NodePicker = p - } -} - -// WithClusterNodeDiscoverer pass hasql.NodeDiscoverer to cluster options -func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption { - return func(o *ClusterOptions) { - o.NodeDiscoverer = d - } -} - -// WithRetries retry count on other nodes in case of error -func WithRetries(n int) ClusterOption { - return func(o *ClusterOptions) { - o.Retries = n - } -} - -// WithClusterContext pass context.Context to cluster options and used for checks -func WithClusterContext(ctx context.Context) ClusterOption { - return func(o *ClusterOptions) { - o.Context = ctx - } -} - -// WithClusterOptions pass hasql.ClusterOpt -func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption { - return func(o *ClusterOptions) { - o.Options = append(o.Options, opts...) - } -} - -// WithClusterNodeStateCriterion pass default hasql.NodeStateCriterion -func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption { - return func(o *ClusterOptions) { - o.NodeStateCriterion = c - } -} - -type ClusterNode struct { - Name string - DB Querier - Priority int32 -} - -// WithClusterNodes create cluster with static NodeDiscoverer -func WithClusterNodes(cns ...ClusterNode) ClusterOption { - return func(o *ClusterOptions) { - nodes := make([]*hasql.Node[Querier], 0, len(cns)) - if o.NodePriority == nil { - o.NodePriority = make(map[string]int32, len(cns)) - } - for _, cn := range cns { - nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB)) - if cn.Priority == 0 { - cn.Priority = math.MaxInt32 - } - o.NodePriority[cn.Name] = cn.Priority - } - o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...) - } -} - func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) { var tx *sql.Tx var err error @@ -462,10 +260,3 @@ func (c *Cluster) WaitForNodes(ctx context.Context, criterions ...hasql.NodeStat } return nil } - -func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion { - if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok { - return v - } - return c.options.NodeStateCriterion -} diff --git a/cluster/hasql/driver.go b/cluster/hasql/driver.go index 098e01e0..e04def6c 100644 --- a/cluster/hasql/driver.go +++ b/cluster/hasql/driver.go @@ -9,8 +9,8 @@ import ( "time" ) -// OpenDBWithDriver creates a [*sql.DB] that uses the [ClusterDriver] -func OpenDBWithDriver(db ClusterQuerier) (*sql.DB, error) { +// OpenDBWithCluster creates a [*sql.DB] that uses the [ClusterQuerier] +func OpenDBWithCluster(db ClusterQuerier) (*sql.DB, error) { driver := NewClusterDriver(db) connector, err := driver.OpenConnector("") if err != nil { @@ -19,12 +19,12 @@ func OpenDBWithDriver(db ClusterQuerier) (*sql.DB, error) { return sql.OpenDB(connector), nil } -// ClusterDriver implements driver.Driver and driver.Connector for an existing [Querier] +// ClusterDriver implements [driver.Driver] and driver.Connector for an existing [Querier] type ClusterDriver struct { db ClusterQuerier } -// NewClusterDriver creates a new driver that uses an existing [ClusterQuerier] +// NewClusterDriver creates a new [driver.Driver] that uses an existing [ClusterQuerier] func NewClusterDriver(db ClusterQuerier) *ClusterDriver { return &ClusterDriver{db: db} } diff --git a/cluster/hasql/options.go b/cluster/hasql/options.go new file mode 100644 index 00000000..6226a38a --- /dev/null +++ b/cluster/hasql/options.go @@ -0,0 +1,110 @@ +package sql + +import ( + "context" + "math" + + "golang.yandex/hasql/v2" +) + +// ClusterOptions contains cluster specific options +type ClusterOptions struct { + NodeChecker hasql.NodeChecker + NodePicker hasql.NodePicker[Querier] + NodeDiscoverer hasql.NodeDiscoverer[Querier] + Options []hasql.ClusterOpt[Querier] + Context context.Context + Retries int + NodePriority map[string]int32 + NodeStateCriterion hasql.NodeStateCriterion +} + +// ClusterOption apply cluster options to ClusterOptions +type ClusterOption func(*ClusterOptions) + +// WithClusterNodeChecker pass hasql.NodeChecker to cluster options +func WithClusterNodeChecker(c hasql.NodeChecker) ClusterOption { + return func(o *ClusterOptions) { + o.NodeChecker = c + } +} + +// WithClusterNodePicker pass hasql.NodePicker to cluster options +func WithClusterNodePicker(p hasql.NodePicker[Querier]) ClusterOption { + return func(o *ClusterOptions) { + o.NodePicker = p + } +} + +// WithClusterNodeDiscoverer pass hasql.NodeDiscoverer to cluster options +func WithClusterNodeDiscoverer(d hasql.NodeDiscoverer[Querier]) ClusterOption { + return func(o *ClusterOptions) { + o.NodeDiscoverer = d + } +} + +// WithRetries retry count on other nodes in case of error +func WithRetries(n int) ClusterOption { + return func(o *ClusterOptions) { + o.Retries = n + } +} + +// WithClusterContext pass context.Context to cluster options and used for checks +func WithClusterContext(ctx context.Context) ClusterOption { + return func(o *ClusterOptions) { + o.Context = ctx + } +} + +// WithClusterOptions pass hasql.ClusterOpt +func WithClusterOptions(opts ...hasql.ClusterOpt[Querier]) ClusterOption { + return func(o *ClusterOptions) { + o.Options = append(o.Options, opts...) + } +} + +// WithClusterNodeStateCriterion pass default hasql.NodeStateCriterion +func WithClusterNodeStateCriterion(c hasql.NodeStateCriterion) ClusterOption { + return func(o *ClusterOptions) { + o.NodeStateCriterion = c + } +} + +type ClusterNode struct { + Name string + DB Querier + Priority int32 +} + +// WithClusterNodes create cluster with static NodeDiscoverer +func WithClusterNodes(cns ...ClusterNode) ClusterOption { + return func(o *ClusterOptions) { + nodes := make([]*hasql.Node[Querier], 0, len(cns)) + if o.NodePriority == nil { + o.NodePriority = make(map[string]int32, len(cns)) + } + for _, cn := range cns { + nodes = append(nodes, hasql.NewNode(cn.Name, cn.DB)) + if cn.Priority == 0 { + cn.Priority = math.MaxInt32 + } + o.NodePriority[cn.Name] = cn.Priority + } + o.NodeDiscoverer = hasql.NewStaticNodeDiscoverer(nodes...) + } +} + +type nodeStateCriterionKey struct{} + +// NodeStateCriterion inject hasql.NodeStateCriterion to context +func NodeStateCriterion(ctx context.Context, c hasql.NodeStateCriterion) context.Context { + return context.WithValue(ctx, nodeStateCriterionKey{}, c) +} + +func (c *Cluster) getNodeStateCriterion(ctx context.Context) hasql.NodeStateCriterion { + if v, ok := ctx.Value(nodeStateCriterionKey{}).(hasql.NodeStateCriterion); ok { + return v + } + return c.options.NodeStateCriterion +} diff --git a/cluster/hasql/picker.go b/cluster/hasql/picker.go new file mode 100644 index 00000000..5e131fca --- /dev/null +++ b/cluster/hasql/picker.go @@ -0,0 +1,113 @@ +package sql + +import ( + "fmt" + "math" + "time" + + "golang.yandex/hasql/v2" +) + +// compile time guard +var _ hasql.NodePicker[Querier] = (*CustomPicker[Querier])(nil) + +// CustomPickerOptions holds options to pick nodes +type CustomPickerOptions struct { + MaxLag int + Priority map[string]int32 + Retries int +} + +// CustomPickerOption func apply option to CustomPickerOptions +type CustomPickerOption func(*CustomPickerOptions) + +// CustomPickerMaxLag specifies max lag for which node can be used +func CustomPickerMaxLag(n int) CustomPickerOption { + return func(o *CustomPickerOptions) { + o.MaxLag = n + } +} + +// NewCustomPicker creates new node picker +func NewCustomPicker[T Querier](opts ...CustomPickerOption) *CustomPicker[Querier] { + options := CustomPickerOptions{} + for _, o := range opts { + o(&options) + } + return &CustomPicker[Querier]{opts: options} +} + +// CustomPicker holds node picker options +type CustomPicker[T Querier] struct { + opts CustomPickerOptions +} + +// PickNode used to return specific node +func (p *CustomPicker[T]) PickNode(cnodes []hasql.CheckedNode[T]) hasql.CheckedNode[T] { + for _, n := range cnodes { + fmt.Printf("node %s\n", n.Node.String()) + } + return cnodes[0] +} + +func (p *CustomPicker[T]) getPriority(nodeName string) int32 { + if prio, ok := p.opts.Priority[nodeName]; ok { + return prio + } + return math.MaxInt32 // Default to lowest priority +} + +// CompareNodes used to sort nodes +func (p *CustomPicker[T]) CompareNodes(a, b hasql.CheckedNode[T]) int { + // Get replication lag values + aLag := a.Info.(interface{ ReplicationLag() int }).ReplicationLag() + bLag := b.Info.(interface{ ReplicationLag() int }).ReplicationLag() + + // First check that lag lower then MaxLag + if aLag > p.opts.MaxLag && bLag > p.opts.MaxLag { + return 0 // both are equal + } + + // If one node exceeds MaxLag and the other doesn't, prefer the one that doesn't + if aLag > p.opts.MaxLag { + return 1 // b is better + } + if bLag > p.opts.MaxLag { + return -1 // a is better + } + + // Get node priorities + aPrio := p.getPriority(a.Node.String()) + bPrio := p.getPriority(b.Node.String()) + + // if both priority equals + if aPrio == bPrio { + // First compare by replication lag + if aLag < bLag { + return -1 + } + if aLag > bLag { + return 1 + } + // If replication lag is equal, compare by latency + aLatency := a.Info.(interface{ Latency() time.Duration }).Latency() + bLatency := b.Info.(interface{ Latency() time.Duration }).Latency() + + if aLatency < bLatency { + return -1 + } + if aLatency > bLatency { + return 1 + } + + // If lag and latency is equal + return 0 + } + + // If priorities are different, prefer the node with lower priority value + if aPrio < bPrio { + return -1 + } + + return 1 +}