closes #403 Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> Reviewed-on: #407 Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org> Co-committed-by: Vasiliy Tolstov <v.tolstov@unistack.org>
236 lines
5.2 KiB
Go
236 lines
5.2 KiB
Go
package sql
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"reflect"
|
|
"unsafe"
|
|
|
|
"golang.yandex/hasql/v2"
|
|
)
|
|
|
|
func newSQLRowError() *sql.Row {
|
|
row := &sql.Row{}
|
|
t := reflect.TypeOf(row).Elem()
|
|
field, _ := t.FieldByName("err")
|
|
rowPtr := unsafe.Pointer(row)
|
|
errFieldPtr := unsafe.Pointer(uintptr(rowPtr) + field.Offset)
|
|
errPtr := (*error)(errFieldPtr)
|
|
*errPtr = ErrorNoAliveNodes
|
|
return row
|
|
}
|
|
|
|
type ClusterQuerier interface {
|
|
Querier
|
|
WaitForNodes(ctx context.Context, criterion ...hasql.NodeStateCriterion) error
|
|
}
|
|
|
|
type Cluster struct {
|
|
hasql *hasql.Cluster[Querier]
|
|
options ClusterOptions
|
|
}
|
|
|
|
// NewCluster returns [Querier] that provides cluster of nodes
|
|
func NewCluster[T Querier](opts ...ClusterOption) (ClusterQuerier, error) {
|
|
options := ClusterOptions{Context: context.Background()}
|
|
for _, opt := range opts {
|
|
opt(&options)
|
|
}
|
|
if options.NodeChecker == nil {
|
|
return nil, ErrClusterChecker
|
|
}
|
|
if options.NodeDiscoverer == nil {
|
|
return nil, ErrClusterDiscoverer
|
|
}
|
|
if options.NodePicker == nil {
|
|
return nil, ErrClusterPicker
|
|
}
|
|
|
|
if options.Retries < 1 {
|
|
options.Retries = 1
|
|
}
|
|
|
|
if options.NodeStateCriterion == 0 {
|
|
options.NodeStateCriterion = hasql.Primary
|
|
}
|
|
|
|
options.Options = append(options.Options, hasql.WithNodePicker(options.NodePicker))
|
|
if p, ok := options.NodePicker.(*CustomPicker[Querier]); ok {
|
|
p.opts.Priority = options.NodePriority
|
|
}
|
|
|
|
c, err := hasql.NewCluster(
|
|
options.NodeDiscoverer,
|
|
options.NodeChecker,
|
|
options.Options...,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Cluster{hasql: c, options: options}, nil
|
|
}
|
|
|
|
func (c *Cluster) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
|
|
var tx *sql.Tx
|
|
var err error
|
|
|
|
retries := 0
|
|
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
|
|
for ; retries < c.options.Retries; retries++ {
|
|
if tx, err = n.DB().BeginTx(ctx, opts); err != nil && retries >= c.options.Retries {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
})
|
|
|
|
if tx == nil && err == nil {
|
|
err = ErrorNoAliveNodes
|
|
}
|
|
|
|
return tx, err
|
|
}
|
|
|
|
func (c *Cluster) Close() error {
|
|
return c.hasql.Close()
|
|
}
|
|
|
|
func (c *Cluster) Conn(ctx context.Context) (*sql.Conn, error) {
|
|
var conn *sql.Conn
|
|
var err error
|
|
|
|
retries := 0
|
|
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
|
|
for ; retries < c.options.Retries; retries++ {
|
|
if conn, err = n.DB().Conn(ctx); err != nil && retries >= c.options.Retries {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
})
|
|
|
|
if conn == nil && err == nil {
|
|
err = ErrorNoAliveNodes
|
|
}
|
|
|
|
return conn, err
|
|
}
|
|
|
|
func (c *Cluster) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
|
|
var res sql.Result
|
|
var err error
|
|
|
|
retries := 0
|
|
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
|
|
for ; retries < c.options.Retries; retries++ {
|
|
if res, err = n.DB().ExecContext(ctx, query, args...); err != nil && retries >= c.options.Retries {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
})
|
|
|
|
if res == nil && err == nil {
|
|
err = ErrorNoAliveNodes
|
|
}
|
|
|
|
return res, err
|
|
}
|
|
|
|
func (c *Cluster) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
|
|
var res *sql.Stmt
|
|
var err error
|
|
|
|
retries := 0
|
|
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
|
|
for ; retries < c.options.Retries; retries++ {
|
|
if res, err = n.DB().PrepareContext(ctx, query); err != nil && retries >= c.options.Retries {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
})
|
|
|
|
if res == nil && err == nil {
|
|
err = ErrorNoAliveNodes
|
|
}
|
|
|
|
return res, err
|
|
}
|
|
|
|
func (c *Cluster) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
|
|
var res *sql.Rows
|
|
var err error
|
|
|
|
retries := 0
|
|
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
|
|
for ; retries < c.options.Retries; retries++ {
|
|
if res, err = n.DB().QueryContext(ctx, query); err != nil && err != sql.ErrNoRows && retries >= c.options.Retries {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
})
|
|
|
|
if res == nil && err == nil {
|
|
err = ErrorNoAliveNodes
|
|
}
|
|
|
|
return res, err
|
|
}
|
|
|
|
func (c *Cluster) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
|
|
var res *sql.Row
|
|
|
|
retries := 0
|
|
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
|
|
for ; retries < c.options.Retries; retries++ {
|
|
res = n.DB().QueryRowContext(ctx, query, args...)
|
|
if res.Err() == nil {
|
|
return false
|
|
} else if res.Err() != nil && retries >= c.options.Retries {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
|
|
if res == nil {
|
|
res = newSQLRowError()
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func (c *Cluster) PingContext(ctx context.Context) error {
|
|
var err error
|
|
var ok bool
|
|
|
|
retries := 0
|
|
c.hasql.NodesIter(c.getNodeStateCriterion(ctx))(func(n *hasql.Node[Querier]) bool {
|
|
ok = true
|
|
for ; retries < c.options.Retries; retries++ {
|
|
if err = n.DB().PingContext(ctx); err != nil && retries >= c.options.Retries {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
})
|
|
|
|
if !ok {
|
|
err = ErrorNoAliveNodes
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (c *Cluster) WaitForNodes(ctx context.Context, criterions ...hasql.NodeStateCriterion) error {
|
|
for _, criterion := range criterions {
|
|
if _, err := c.hasql.WaitForNode(ctx, criterion); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|