rewrite wrapper

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2022-12-25 15:35:05 +03:00
parent 9ea887afd8
commit 30b5ed1253
9 changed files with 220 additions and 98 deletions

View File

@ -3,25 +3,12 @@ package wrapper
import (
"database/sql/driver"
"errors"
"fmt"
)
// ErrUnsupported is an error returned when the underlying driver doesn't provide a given function.
var ErrUnsupported = errors.New("operation unsupported by the underlying driver")
/*
// newSpan creates a new opentracing.Span instance from the given context.
func (t *tracer) newSpan(ctx context.Context) opentracing.Span {
name := t.nameFunc(ctx)
var opts []opentracing.StartSpanOption
parent := opentracing.SpanFromContext(ctx)
if parent != nil {
opts = append(opts, opentracing.ChildOf(parent.Context()))
}
span := t.t.StartSpan(name, opts...)
return span
}
*/
// namedValueToValue converts driver arguments of NamedValue format to Value format. Implemented in the same way as in
// database/sql ctxutil.go.
func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
@ -34,3 +21,18 @@ func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
}
return dargs, nil
}
func namedValueToLabels(named []driver.NamedValue) []interface{} {
largs := make([]interface{}, len(named)*2)
var name string
for _, param := range named {
if param.Name != "" {
name = param.Name
} else {
name = fmt.Sprintf("$%d", param.Ordinal)
}
largs = append(largs, name, param.Value)
}
return largs
}

100
conn.go
View File

@ -3,6 +3,9 @@ package wrapper
import (
"context"
"database/sql/driver"
"fmt"
"go.unistack.org/micro/v3/tracer"
)
// wrapperConn defines a wrapper for driver.Conn
@ -36,27 +39,62 @@ func (w *wrapperConn) Begin() (driver.Tx, error) {
// BeginTx implements driver.ConnBeginTx BeginTx
func (w *wrapperConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
nctx, span := w.opts.Tracer.Start(ctx, "BeginTx")
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if name != "" {
nctx, span = w.opts.Tracer.Start(ctx, "BeginTx "+name)
} else {
nctx, span = w.opts.Tracer.Start(ctx, "BeginTx")
}
if name == "" {
name = "unknown"
}
span.AddLabels("query", name)
if connBeginTx, ok := w.conn.(driver.ConnBeginTx); ok {
tx, err := connBeginTx.BeginTx(nctx, opts)
if err != nil {
span.AddLabels("error", true)
return nil, err
}
return &wrapperTx{tx: tx, opts: w.opts, span: span}, nil
}
return w.conn.Begin()
tx, err := w.conn.Begin()
if err != nil {
span.AddLabels("error", true)
}
return tx, err
}
// PrepareContext implements driver.ConnPrepareContext PrepareContext
func (w *wrapperConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if name != "" {
nctx, span = w.opts.Tracer.Start(ctx, "BeginTx "+name)
} else {
nctx, span = w.opts.Tracer.Start(ctx, "BeginTx")
}
if name == "" {
name = "unknown"
}
span.AddLabels("query", name)
if connPrepareContext, ok := w.conn.(driver.ConnPrepareContext); ok {
stmt, err := connPrepareContext.PrepareContext(ctx, query)
stmt, err := connPrepareContext.PrepareContext(nctx, query)
if err != nil {
span.AddLabels("error", true)
return nil, err
}
return &wrapperStmt{stmt: stmt, opts: w.opts}, nil
}
return w.conn.Prepare(query)
stmt, err := w.conn.Prepare(query)
if err != nil {
span.AddLabels("error", true)
}
return stmt, err
}
// Exec implements driver.Execer Exec
@ -69,17 +107,37 @@ func (w *wrapperConn) Exec(query string, args []driver.Value) (driver.Result, er
// Exec implements driver.StmtExecContext ExecContext
func (w *wrapperConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
nctx, span := w.opts.Tracer.Start(ctx, "ExecContext")
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if name != "" {
nctx, span = w.opts.Tracer.Start(ctx, "ExecContext "+name)
} else {
nctx, span = w.opts.Tracer.Start(ctx, "ExecContext")
}
defer span.Finish()
if name == "" {
name = "unknown"
}
span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args)))
span.AddLabels("query", name)
if execerContext, ok := w.conn.(driver.ExecerContext); ok {
r, err := execerContext.ExecContext(nctx, query, args)
return r, err
res, err := execerContext.ExecContext(nctx, query, args)
if err != nil {
span.AddLabels("error", true)
}
return res, err
}
values, err := namedValueToValue(args)
if err != nil {
span.AddLabels("error", true)
return nil, err
}
return w.Exec(query, values)
res, err := w.Exec(query, values)
if err != nil {
span.AddLabels("error", true)
}
return res, err
}
// Ping implements driver.Pinger Ping
@ -101,16 +159,36 @@ func (w *wrapperConn) Query(query string, args []driver.Value) (driver.Rows, err
}
// QueryContext implements Driver.QueryerContext QueryContext
func (w *wrapperConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) {
nctx, span := w.opts.Tracer.Start(ctx, "QueryContext")
func (w *wrapperConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if name != "" {
nctx, span = w.opts.Tracer.Start(ctx, "QueryContext "+name)
} else {
nctx, span = w.opts.Tracer.Start(ctx, "QueryContext")
}
defer span.Finish()
if name == "" {
name = "unknown"
}
span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args)))
span.AddLabels("query", name)
if queryerContext, ok := w.conn.(driver.QueryerContext); ok {
rows, err := queryerContext.QueryContext(nctx, query, args)
if err != nil {
span.AddLabels("error", true)
}
return rows, err
}
values, err := namedValueToValue(args)
if err != nil {
span.AddLabels("error", true)
return nil, err
}
return w.Query(query, values)
rows, err := w.Query(query, values)
if err != nil {
span.AddLabels("error", true)
}
return rows, err
}

2
doc.go
View File

@ -1,2 +1,2 @@
// package wrapper provides SQL driver wrapper with micro tracing, logging, metering capabilities
package wrapper
package wrapper // import go.unistack.org/micro-wrapper-sql/v3

2
go.mod
View File

@ -2,4 +2,4 @@ module go.unistack.org/micro-wrapper-sql/v3
go 1.16
require go.unistack.org/micro/v3 v3.9.15
require go.unistack.org/micro/v3 v3.9.17

4
go.sum
View File

@ -74,8 +74,8 @@ github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQ
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.unistack.org/micro-proto/v3 v3.3.1 h1:nQ0MtWvP2G3QrpOgawVOPhpZZYkq6umTGDqs8FxJYIo=
go.unistack.org/micro-proto/v3 v3.3.1/go.mod h1:cwRyv8uInM2I7EbU7O8Fx2Ls3N90Uw9UCCcq4olOdfE=
go.unistack.org/micro/v3 v3.9.15 h1:Mv1/0jsySwIvm9+IvXTYj/X4bJCNChbwtf2ZV2ZDbk8=
go.unistack.org/micro/v3 v3.9.15/go.mod h1:gI4RkJKHLPW7KV6h4+ZBOZD997MRvFRXMPQIHpozikI=
go.unistack.org/micro/v3 v3.9.17 h1:EJ9/XR9OTo/up/3aqWjaKS2YsWMA66b0dx+pc/0vIl8=
go.unistack.org/micro/v3 v3.9.17/go.mod h1:gI4RkJKHLPW7KV6h4+ZBOZD997MRvFRXMPQIHpozikI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=

View File

@ -14,8 +14,6 @@ var (
DefaultMeterStatsInterval = 5 * time.Second
// DefaultMeterMetricPrefix holds default metric prefix
DefaultMeterMetricPrefix = "micro_sql_"
// DefaultMeterLabelPrefix holds default label prefix
DefaultMeterLabelPrefix = "micro_"
)
var (
@ -25,7 +23,8 @@ var (
IdleConnections = "idle_connections"
WaitConnections = "wait_connections"
BlockedSeconds = "blocked_seconds"
MaxIdleClosed = "max_idletime_closed"
MaxIdleClosed = "max_idle_closed"
MaxIdletimeClosed = "max_idletime_closed"
MaxLifetimeClosed = "max_lifetime_closed"
// RequestTotal = "request_total"
@ -48,7 +47,6 @@ type Options struct {
ServiceName string
ServiceVersion string
ServiceID string
MeterLabelPrefix string
MeterMetricPrefix string
MeterStatsInterval time.Duration
LoggerLevel logger.Level
@ -65,12 +63,20 @@ func NewOptions(opts ...Option) Options {
Tracer: tracer.DefaultTracer,
MeterStatsInterval: DefaultMeterStatsInterval,
MeterMetricPrefix: DefaultMeterMetricPrefix,
MeterLabelPrefix: DefaultMeterLabelPrefix,
LoggerLevel: logger.ErrorLevel,
}
for _, o := range opts {
o(&options)
}
options.Meter = options.Meter.Clone(
meter.MetricPrefix(options.MeterMetricPrefix),
meter.Labels(
labelHost, options.DatabaseHost,
labelDatabase, options.DatabaseName,
),
)
return options
}
@ -81,13 +87,6 @@ func MetricInterval(td time.Duration) Option {
}
}
// LabelPrefix specifies prefix for each label
func LabelPrefix(pref string) Option {
return func(o *Options) {
o.MeterLabelPrefix = pref
}
}
// MetricPrefix specifies prefix for each metric
func MetricPrefix(pref string) Option {
return func(o *Options) {
@ -137,3 +136,10 @@ func QueryName(ctx context.Context, name string) context.Context {
}
return context.WithValue(ctx, queryNameKey{}, name)
}
func getQueryName(ctx context.Context) string {
if v, ok := ctx.Value(queryNameKey{}).(string); ok {
return v
}
return ""
}

41
stats.go Normal file
View File

@ -0,0 +1,41 @@
package wrapper // import "go.unistack.org/micro-wrapper-sql/v3"
import (
"context"
"database/sql"
"time"
)
type Statser interface {
Stats() sql.DBStats
}
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
options := NewOptions(opts...)
go func() {
ticker := time.NewTicker(options.MeterStatsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if db == nil {
return
}
stats := db.Stats()
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
}
}()
}

58
stmt.go
View File

@ -3,6 +3,9 @@ package wrapper
import (
"context"
"database/sql/driver"
"fmt"
"go.unistack.org/micro/v3/tracer"
)
// wrapperStmt defines a wrapper for driver.Stmt
@ -33,29 +36,72 @@ func (w *wrapperStmt) Query(args []driver.Value) (driver.Rows, error) {
// ExecContext implements driver.ExecerContext ExecContext
func (w *wrapperStmt) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
nctx, span := w.opts.Tracer.Start(ctx, "ExecContext")
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if name != "" {
nctx, span = w.opts.Tracer.Start(ctx, "ExecContext "+name)
} else {
nctx, span = w.opts.Tracer.Start(ctx, "ExecContext")
}
defer span.Finish()
if name == "" {
name = "unknown"
}
span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args)))
span.AddLabels("query", name)
if execerContext, ok := w.stmt.(driver.ExecerContext); ok {
return execerContext.ExecContext(nctx, query, args)
res, err := execerContext.ExecContext(nctx, query, args)
if err != nil {
span.AddLabels("error", true)
}
return res, err
}
values, err := namedValueToValue(args)
if err != nil {
span.AddLabels("error", true)
return nil, err
}
return w.Exec(values)
res, err := w.Exec(values)
if err != nil {
span.AddLabels("error", true)
}
return res, err
}
// QueryContext implements Driver.QueryerContext QueryContext
func (w *wrapperStmt) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) {
nctx, span := w.opts.Tracer.Start(ctx, "QueryContext")
func (w *wrapperStmt) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if name != "" {
nctx, span = w.opts.Tracer.Start(ctx, "QueryContext "+name)
} else {
nctx, span = w.opts.Tracer.Start(ctx, "QueryContext")
}
defer span.Finish()
if name == "" {
name = "unknown"
}
span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args)))
span.AddLabels("query", name)
if queryerContext, ok := w.stmt.(driver.QueryerContext); ok {
rows, err := queryerContext.QueryContext(nctx, query, args)
if err != nil {
span.AddLabels("error", true)
}
return rows, err
}
values, err := namedValueToValue(args)
if err != nil {
if err != nil {
span.AddLabels("error", true)
}
return nil, err
}
return w.Query(values)
rows, err := w.Query(values)
if err != nil {
span.AddLabels("error", true)
}
return rows, err
}

View File

@ -1,51 +0,0 @@
package wrapper // import "go.unistack.org/micro-wrapper-sql/v3"
import (
"context"
"database/sql"
"time"
"go.unistack.org/micro/v3/meter"
)
type Statser interface {
Stats() sql.DBStats
}
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
options := NewOptions(opts...)
m := options.Meter.Clone(
meter.LabelPrefix(options.MeterLabelPrefix),
meter.MetricPrefix(options.MeterMetricPrefix),
meter.Labels(
labelHost, options.DatabaseHost,
labelDatabase, options.DatabaseName,
),
)
go func() {
ticker := time.NewTicker(options.MeterStatsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if db == nil {
return
}
stats := db.Stats()
m.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
m.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
m.Counter(InuseConnections).Set(uint64(stats.InUse))
m.Counter(IdleConnections).Set(uint64(stats.Idle))
m.Counter(WaitConnections).Set(uint64(stats.WaitCount))
m.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
m.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
m.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
}
}()
}