Merge pull request #53 from unistack-org/improvements

WIP: rewrite wrapper
This commit is contained in:
Василий Толстов 2023-01-06 23:28:03 +03:00 committed by GitHub
commit 5a3db147bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 639 additions and 117 deletions

View File

@ -3,25 +3,12 @@ package wrapper
import ( import (
"database/sql/driver" "database/sql/driver"
"errors" "errors"
"fmt"
) )
// ErrUnsupported is an error returned when the underlying driver doesn't provide a given function. // ErrUnsupported is an error returned when the underlying driver doesn't provide a given function.
var ErrUnsupported = errors.New("operation unsupported by the underlying driver") var ErrUnsupported = errors.New("operation unsupported by the underlying driver")
/*
// newSpan creates a new opentracing.Span instance from the given context.
func (t *tracer) newSpan(ctx context.Context) opentracing.Span {
name := t.nameFunc(ctx)
var opts []opentracing.StartSpanOption
parent := opentracing.SpanFromContext(ctx)
if parent != nil {
opts = append(opts, opentracing.ChildOf(parent.Context()))
}
span := t.t.StartSpan(name, opts...)
return span
}
*/
// namedValueToValue converts driver arguments of NamedValue format to Value format. Implemented in the same way as in // namedValueToValue converts driver arguments of NamedValue format to Value format. Implemented in the same way as in
// database/sql ctxutil.go. // database/sql ctxutil.go.
func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) { func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
@ -34,3 +21,19 @@ func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
} }
return dargs, nil return dargs, nil
} }
// namedValueToLabels convert driver arguments to interface{} slice
func namedValueToLabels(named []driver.NamedValue) []interface{} {
largs := make([]interface{}, len(named)*2)
var name string
for _, param := range named {
if param.Name != "" {
name = param.Name
} else {
name = fmt.Sprintf("$%d", param.Ordinal)
}
largs = append(largs, name, param.Value)
}
return largs
}

330
conn.go
View File

@ -3,6 +3,8 @@ package wrapper
import ( import (
"context" "context"
"database/sql/driver" "database/sql/driver"
"fmt"
"time"
) )
// wrapperConn defines a wrapper for driver.Conn // wrapperConn defines a wrapper for driver.Conn
@ -13,56 +15,212 @@ type wrapperConn struct {
// Prepare implements driver.Conn Prepare // Prepare implements driver.Conn Prepare
func (w *wrapperConn) Prepare(query string) (driver.Stmt, error) { func (w *wrapperConn) Prepare(query string) (driver.Stmt, error) {
labels := []string{labelMethod, "Prepare", labelQuery, labelUnknown}
ts := time.Now()
stmt, err := w.conn.Prepare(query) stmt, err := w.conn.Prepare(query)
td := time.Since(ts)
te := td.Seconds()
if err != nil { if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Prepare", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return nil, err return nil, err
} }
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return &wrapperStmt{stmt: stmt, opts: w.opts}, nil return &wrapperStmt{stmt: stmt, opts: w.opts}, nil
} }
// Close implements driver.Conn Close // Close implements driver.Conn Close
func (w *wrapperConn) Close() error { func (w *wrapperConn) Close() error {
return w.conn.Close() labels := []string{labelMethod, "Close"}
ts := time.Now()
err := w.conn.Close()
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return err
} }
// Begin implements driver.Conn Begin // Begin implements driver.Conn Begin
func (w *wrapperConn) Begin() (driver.Tx, error) { func (w *wrapperConn) Begin() (driver.Tx, error) {
labels := []string{labelMethod, "Begin"}
ts := time.Now()
// nolint:staticcheck
tx, err := w.conn.Begin() tx, err := w.conn.Begin()
td := time.Since(ts)
te := td.Seconds()
if err != nil { if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Begin", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return nil, err return nil, err
} }
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Begin", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return &wrapperTx{tx: tx, opts: w.opts}, nil return &wrapperTx{tx: tx, opts: w.opts}, nil
} }
// BeginTx implements driver.ConnBeginTx BeginTx // BeginTx implements driver.ConnBeginTx BeginTx
func (w *wrapperConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { func (w *wrapperConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
nctx, span := w.opts.Tracer.Start(ctx, "BeginTx") nctx, span := w.opts.Tracer.Start(ctx, "BeginTx")
span.AddLabels("method", "BeginTx")
name := getQueryName(ctx)
if name != "" {
span.AddLabels("query", name)
} else {
name = labelUnknown
}
labels := []string{labelMethod, "BeginTx", labelQuery, name}
if connBeginTx, ok := w.conn.(driver.ConnBeginTx); ok { if connBeginTx, ok := w.conn.(driver.ConnBeginTx); ok {
ts := time.Now()
tx, err := connBeginTx.BeginTx(nctx, opts) tx, err := connBeginTx.BeginTx(nctx, opts)
td := time.Since(ts)
te := td.Seconds()
if err != nil { if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "BeginTx", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return nil, err return nil, err
} }
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "BeginTx", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return &wrapperTx{tx: tx, opts: w.opts, span: span}, nil return &wrapperTx{tx: tx, opts: w.opts, span: span}, nil
} }
return w.conn.Begin() ts := time.Now()
// nolint:staticcheck
tx, err := w.conn.Begin()
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
}
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "BeginTx", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return tx, nil
} }
// PrepareContext implements driver.ConnPrepareContext PrepareContext // PrepareContext implements driver.ConnPrepareContext PrepareContext
func (w *wrapperConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { func (w *wrapperConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
if connPrepareContext, ok := w.conn.(driver.ConnPrepareContext); ok { nctx, span := w.opts.Tracer.Start(ctx, "PrepareContext")
stmt, err := connPrepareContext.PrepareContext(ctx, query) span.AddLabels("method", "PrepareContext")
name := getQueryName(ctx)
if name != "" {
span.AddLabels("query", name)
} else {
name = labelUnknown
}
labels := []string{labelMethod, "PrepareContext", labelQuery, name}
if conn, ok := w.conn.(driver.ConnPrepareContext); ok {
ts := time.Now()
stmt, err := conn.PrepareContext(nctx, query)
td := time.Since(ts)
te := td.Seconds()
if err != nil { if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "PrepareContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return nil, err return nil, err
} }
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "PrepareContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return &wrapperStmt{stmt: stmt, opts: w.opts}, nil return &wrapperStmt{stmt: stmt, opts: w.opts}, nil
} }
return w.conn.Prepare(query) ts := time.Now()
stmt, err := w.conn.Prepare(query)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
}
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "PrepareContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return stmt, nil
} }
// Exec implements driver.Execer Exec // Exec implements driver.Execer Exec
func (w *wrapperConn) Exec(query string, args []driver.Value) (driver.Result, error) { func (w *wrapperConn) Exec(query string, args []driver.Value) (driver.Result, error) {
// nolint:staticcheck
labels := []string{labelMethod, "Exec", labelQuery, labelUnknown}
if execer, ok := w.conn.(driver.Execer); ok { if execer, ok := w.conn.(driver.Execer); ok {
return execer.Exec(query, args) ts := time.Now()
res, err := execer.Exec(query, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Exec", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return res, err
}
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Exec", labelUnknown, 0, ErrUnsupported)...).Log(context.TODO(), w.opts.LoggerLevel)
} }
return nil, ErrUnsupported return nil, ErrUnsupported
} }
@ -70,47 +228,183 @@ func (w *wrapperConn) Exec(query string, args []driver.Value) (driver.Result, er
// Exec implements driver.StmtExecContext ExecContext // Exec implements driver.StmtExecContext ExecContext
func (w *wrapperConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { func (w *wrapperConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
nctx, span := w.opts.Tracer.Start(ctx, "ExecContext") nctx, span := w.opts.Tracer.Start(ctx, "ExecContext")
span.AddLabels("method", "ExecContext")
name := getQueryName(ctx)
if name != "" {
span.AddLabels("query", name)
} else {
name = labelUnknown
}
defer span.Finish() defer span.Finish()
if execerContext, ok := w.conn.(driver.ExecerContext); ok { if len(args) > 0 {
r, err := execerContext.ExecContext(nctx, query, args) span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args)))
return r, err }
labels := []string{labelMethod, "ExecContext", labelQuery, name}
if conn, ok := w.conn.(driver.ExecerContext); ok {
ts := time.Now()
res, err := conn.ExecContext(nctx, query, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return res, err
} }
values, err := namedValueToValue(args) values, err := namedValueToValue(args)
if err != nil { if err != nil {
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", labelUnknown, 0, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return nil, err return nil, err
} }
return w.Exec(query, values) ts := time.Now()
// nolint:staticcheck
res, err := w.Exec(query, values)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return res, err
} }
// Ping implements driver.Pinger Ping // Ping implements driver.Pinger Ping
func (w *wrapperConn) Ping(ctx context.Context) error { func (w *wrapperConn) Ping(ctx context.Context) error {
if pinger, ok := w.conn.(driver.Pinger); ok { if conn, ok := w.conn.(driver.Pinger); ok {
nctx, span := w.opts.Tracer.Start(ctx, "Ping") nctx, span := w.opts.Tracer.Start(ctx, "Ping")
defer span.Finish() defer span.Finish()
return pinger.Ping(nctx) labels := []string{labelMethod, "Ping"}
ts := time.Now()
err := conn.Ping(nctx)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Ping", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return err
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
}
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Ping", labelUnknown, 0, ErrUnsupported)...).Log(context.TODO(), w.opts.LoggerLevel)
} }
return ErrUnsupported return ErrUnsupported
} }
// Query implements driver.Queryer Query // Query implements driver.Queryer Query
func (w *wrapperConn) Query(query string, args []driver.Value) (driver.Rows, error) { func (w *wrapperConn) Query(query string, args []driver.Value) (driver.Rows, error) {
if queryer, ok := w.conn.(driver.Queryer); ok { // nolint:staticcheck
return queryer.Query(query, args) if conn, ok := w.conn.(driver.Queryer); ok {
labels := []string{labelMethod, "Query", labelQuery, labelUnknown}
ts := time.Now()
rows, err := conn.Query(query, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Query", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return rows, err
}
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Query", labelUnknown, 0, ErrUnsupported)...).Log(context.TODO(), w.opts.LoggerLevel)
} }
return nil, ErrUnsupported return nil, ErrUnsupported
} }
// QueryContext implements Driver.QueryerContext QueryContext // QueryContext implements Driver.QueryerContext QueryContext
func (w *wrapperConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) { func (w *wrapperConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
nctx, span := w.opts.Tracer.Start(ctx, "QueryContext") nctx, span := w.opts.Tracer.Start(ctx, "QueryContext")
span.AddLabels("method", "QueryContext")
name := getQueryName(ctx)
if name != "" {
span.AddLabels("query", name)
} else {
name = labelUnknown
}
defer span.Finish() defer span.Finish()
if queryerContext, ok := w.conn.(driver.QueryerContext); ok { if len(args) > 0 {
rows, err := queryerContext.QueryContext(nctx, query, args) span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args)))
}
labels := []string{labelMethod, "QueryContext", labelQuery, name}
if conn, ok := w.conn.(driver.QueryerContext); ok {
ts := time.Now()
rows, err := conn.QueryContext(nctx, query, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return rows, err return rows, err
} }
values, err := namedValueToValue(args) values, err := namedValueToValue(args)
if err != nil { if err != nil {
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, 0, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return nil, err return nil, err
} }
return w.Query(query, values) ts := time.Now()
// nolint:staticcheck
rows, err := w.Query(query, values)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return rows, err
} }

2
doc.go
View File

@ -1,2 +1,2 @@
// package wrapper provides SQL driver wrapper with micro tracing, logging, metering capabilities // package wrapper provides SQL driver wrapper with micro tracing, logging, metering capabilities
package wrapper package wrapper // import go.unistack.org/micro-wrapper-sql/v3

View File

@ -1,7 +1,9 @@
package wrapper package wrapper
import ( import (
"context"
"database/sql/driver" "database/sql/driver"
"time"
) )
// wrapperDriver defines a wrapper for driver.Driver // wrapperDriver defines a wrapper for driver.Driver
@ -17,9 +19,17 @@ func NewWrapper(d driver.Driver, opts ...Option) driver.Driver {
// Open implements driver.Driver Open // Open implements driver.Driver Open
func (w *wrapperDriver) Open(name string) (driver.Conn, error) { func (w *wrapperDriver) Open(name string) (driver.Conn, error) {
ts := time.Now()
c, err := w.driver.Open(name) c, err := w.driver.Open(name)
td := time.Since(ts)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Open", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &wrapperConn{conn: c, opts: w.opts}, nil return &wrapperConn{conn: c, opts: w.opts}, nil
} }

3
go.mod
View File

@ -2,4 +2,5 @@ module go.unistack.org/micro-wrapper-sql/v3
go 1.16 go 1.16
require go.unistack.org/micro/v3 v3.9.18 require go.unistack.org/micro/v3 v3.9.19

4
go.sum
View File

@ -74,8 +74,8 @@ github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQ
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.unistack.org/micro-proto/v3 v3.3.1 h1:nQ0MtWvP2G3QrpOgawVOPhpZZYkq6umTGDqs8FxJYIo= go.unistack.org/micro-proto/v3 v3.3.1 h1:nQ0MtWvP2G3QrpOgawVOPhpZZYkq6umTGDqs8FxJYIo=
go.unistack.org/micro-proto/v3 v3.3.1/go.mod h1:cwRyv8uInM2I7EbU7O8Fx2Ls3N90Uw9UCCcq4olOdfE= go.unistack.org/micro-proto/v3 v3.3.1/go.mod h1:cwRyv8uInM2I7EbU7O8Fx2Ls3N90Uw9UCCcq4olOdfE=
go.unistack.org/micro/v3 v3.9.18 h1:FNsCCJJKyyP9gScc1K3HxGxFjub+c4ZQpL+QwVvIMro= go.unistack.org/micro/v3 v3.9.19 h1:HFeHr20hnrs60EnlKWwrvQLTXILu3qhmHqs98jlqmqk=
go.unistack.org/micro/v3 v3.9.18/go.mod h1:gI4RkJKHLPW7KV6h4+ZBOZD997MRvFRXMPQIHpozikI= go.unistack.org/micro/v3 v3.9.19/go.mod h1:gI4RkJKHLPW7KV6h4+ZBOZD997MRvFRXMPQIHpozikI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=

View File

@ -2,6 +2,7 @@ package wrapper
import ( import (
"context" "context"
"fmt"
"time" "time"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
@ -14,8 +15,17 @@ var (
DefaultMeterStatsInterval = 5 * time.Second DefaultMeterStatsInterval = 5 * time.Second
// DefaultMeterMetricPrefix holds default metric prefix // DefaultMeterMetricPrefix holds default metric prefix
DefaultMeterMetricPrefix = "micro_sql_" DefaultMeterMetricPrefix = "micro_sql_"
// DefaultMeterLabelPrefix holds default label prefix // DefaultLoggerObserver used to prepare labels for logger
DefaultMeterLabelPrefix = "micro_" DefaultLoggerObserver = func(ctx context.Context, method string, query string, td time.Duration, err error) []interface{} {
labels := []interface{}{"method", method, "took", fmt.Sprintf("%v", td)}
if err != nil {
labels = append(labels, "error", err.Error())
}
if query != labelUnknown {
labels = append(labels, "query", query)
}
return labels
}
) )
var ( var (
@ -25,13 +35,18 @@ var (
IdleConnections = "idle_connections" IdleConnections = "idle_connections"
WaitConnections = "wait_connections" WaitConnections = "wait_connections"
BlockedSeconds = "blocked_seconds" BlockedSeconds = "blocked_seconds"
MaxIdleClosed = "max_idletime_closed" MaxIdleClosed = "max_idle_closed"
MaxIdletimeClosed = "max_idletime_closed"
MaxLifetimeClosed = "max_lifetime_closed" MaxLifetimeClosed = "max_lifetime_closed"
// RequestTotal = "request_total" meterRequestTotal = "request_total"
// RequestLatencyMicroseconds = "request_latency_microseconds" meterRequestLatencyMicroseconds = "request_latency_microseconds"
// RequestDurationSeconds = "request_duration_seconds" meterRequestDurationSeconds = "request_duration_seconds"
labelUnknown = "unknown"
labelQuery = "query"
labelMethod = "method"
labelStatus = "status"
labelSuccess = "success" labelSuccess = "success"
labelFailure = "failure" labelFailure = "failure"
labelHost = "db_host" labelHost = "db_host"
@ -45,13 +60,11 @@ type Options struct {
Tracer tracer.Tracer Tracer tracer.Tracer
DatabaseHost string DatabaseHost string
DatabaseName string DatabaseName string
ServiceName string
ServiceVersion string
ServiceID string
MeterLabelPrefix string
MeterMetricPrefix string MeterMetricPrefix string
MeterStatsInterval time.Duration MeterStatsInterval time.Duration
LoggerLevel logger.Level LoggerLevel logger.Level
LoggerEnabled bool
LoggerObserver func(ctx context.Context, method string, name string, td time.Duration, err error) []interface{}
} }
// Option func signature // Option func signature
@ -65,12 +78,23 @@ func NewOptions(opts ...Option) Options {
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
MeterStatsInterval: DefaultMeterStatsInterval, MeterStatsInterval: DefaultMeterStatsInterval,
MeterMetricPrefix: DefaultMeterMetricPrefix, MeterMetricPrefix: DefaultMeterMetricPrefix,
MeterLabelPrefix: DefaultMeterLabelPrefix,
LoggerLevel: logger.ErrorLevel, LoggerLevel: logger.ErrorLevel,
LoggerObserver: DefaultLoggerObserver,
} }
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
options.Meter = options.Meter.Clone(
meter.MetricPrefix(options.MeterMetricPrefix),
meter.Labels(
labelHost, options.DatabaseHost,
labelDatabase, options.DatabaseName,
),
)
options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1))
return options return options
} }
@ -81,13 +105,6 @@ func MetricInterval(td time.Duration) Option {
} }
} }
// LabelPrefix specifies prefix for each label
func LabelPrefix(pref string) Option {
return func(o *Options) {
o.MeterLabelPrefix = pref
}
}
// MetricPrefix specifies prefix for each metric // MetricPrefix specifies prefix for each metric
func MetricPrefix(pref string) Option { func MetricPrefix(pref string) Option {
return func(o *Options) { return func(o *Options) {
@ -121,6 +138,27 @@ func Logger(l logger.Logger) Option {
} }
} }
// LoggerEnabled enable sql logging
func LoggerEnabled(b bool) Option {
return func(o *Options) {
o.LoggerEnabled = b
}
}
// LoggerLevel passes logger.Level option
func LoggerLevel(lvl logger.Level) Option {
return func(o *Options) {
o.LoggerLevel = lvl
}
}
// LoggerObserver passes observer to fill logger fields
func LoggerObserver(obs func(context.Context, string, string, time.Duration, error) []interface{}) Option {
return func(o *Options) {
o.LoggerObserver = obs
}
}
// Tracer passes tracer.Tracer to wrapper // Tracer passes tracer.Tracer to wrapper
func Tracer(t tracer.Tracer) Option { func Tracer(t tracer.Tracer) Option {
return func(o *Options) { return func(o *Options) {
@ -137,3 +175,10 @@ func QueryName(ctx context.Context, name string) context.Context {
} }
return context.WithValue(ctx, queryNameKey{}, name) return context.WithValue(ctx, queryNameKey{}, name)
} }
func getQueryName(ctx context.Context) string {
if v, ok := ctx.Value(queryNameKey{}).(string); ok {
return v
}
return ""
}

41
stats.go Normal file
View File

@ -0,0 +1,41 @@
package wrapper // import "go.unistack.org/micro-wrapper-sql/v3"
import (
"context"
"database/sql"
"time"
)
type Statser interface {
Stats() sql.DBStats
}
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
options := NewOptions(opts...)
go func() {
ticker := time.NewTicker(options.MeterStatsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if db == nil {
return
}
stats := db.Stats()
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
}
}()
}

181
stmt.go
View File

@ -3,6 +3,8 @@ package wrapper
import ( import (
"context" "context"
"database/sql/driver" "database/sql/driver"
"fmt"
"time"
) )
// wrapperStmt defines a wrapper for driver.Stmt // wrapperStmt defines a wrapper for driver.Stmt
@ -13,7 +15,23 @@ type wrapperStmt struct {
// Close implements driver.Stmt Close // Close implements driver.Stmt Close
func (w *wrapperStmt) Close() error { func (w *wrapperStmt) Close() error {
return w.stmt.Close() labels := []string{labelMethod, "Close"}
ts := time.Now()
err := w.stmt.Close()
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Close", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return err
} }
// NumInput implements driver.Stmt NumInput // NumInput implements driver.Stmt NumInput
@ -23,39 +41,182 @@ func (w *wrapperStmt) NumInput() int {
// Exec implements driver.Stmt Exec // Exec implements driver.Stmt Exec
func (w *wrapperStmt) Exec(args []driver.Value) (driver.Result, error) { func (w *wrapperStmt) Exec(args []driver.Value) (driver.Result, error) {
return w.stmt.Exec(args) labels := []string{labelMethod, "Exec"}
ts := time.Now()
// nolint:staticcheck
res, err := w.stmt.Exec(args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Exec", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return res, err
} }
// Query implements driver.Stmt Query // Query implements driver.Stmt Query
func (w *wrapperStmt) Query(args []driver.Value) (driver.Rows, error) { func (w *wrapperStmt) Query(args []driver.Value) (driver.Rows, error) {
return w.stmt.Query(args) labels := []string{labelMethod, "Query"}
ts := time.Now()
// nolint:staticcheck
rows, err := w.stmt.Query(args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Query", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return rows, err
} }
// ExecContext implements driver.ExecerContext ExecContext // ExecContext implements driver.ExecerContext ExecContext
func (w *wrapperStmt) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { func (w *wrapperStmt) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
nctx, span := w.opts.Tracer.Start(ctx, "ExecContext") nctx, span := w.opts.Tracer.Start(ctx, "ExecContext")
span.AddLabels("method", "ExecContext")
name := getQueryName(ctx)
if name != "" {
span.AddLabels("query", name)
} else {
name = labelUnknown
}
defer span.Finish() defer span.Finish()
if execerContext, ok := w.stmt.(driver.ExecerContext); ok { if len(args) > 0 {
return execerContext.ExecContext(nctx, query, args) span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args)))
}
labels := []string{labelMethod, "ExecContext", labelQuery, name}
if conn, ok := w.stmt.(driver.ExecerContext); ok {
ts := time.Now()
res, err := conn.ExecContext(nctx, query, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return res, err
} }
values, err := namedValueToValue(args) values, err := namedValueToValue(args)
if err != nil { if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", name, 0, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return nil, err return nil, err
} }
return w.Exec(values) ts := time.Now()
// nolint:staticcheck
res, err := w.Exec(values)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", name, td, err)).Log(context.TODO(), w.opts.LoggerLevel)
}
return res, err
} }
// QueryContext implements Driver.QueryerContext QueryContext // QueryContext implements Driver.QueryerContext QueryContext
func (w *wrapperStmt) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) { func (w *wrapperStmt) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
nctx, span := w.opts.Tracer.Start(ctx, "QueryContext") nctx, span := w.opts.Tracer.Start(ctx, "QueryContext")
span.AddLabels("method", "QueryContext")
name := getQueryName(ctx)
if name != "" {
span.AddLabels("query", name)
} else {
name = labelUnknown
}
defer span.Finish() defer span.Finish()
if queryerContext, ok := w.stmt.(driver.QueryerContext); ok { if len(args) > 0 {
rows, err := queryerContext.QueryContext(nctx, query, args) span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args)))
}
labels := []string{labelMethod, "QueryContext", labelQuery, name}
if conn, ok := w.stmt.(driver.QueryerContext); ok {
ts := time.Now()
rows, err := conn.QueryContext(nctx, query, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, td, err)).Log(context.TODO(), w.opts.LoggerLevel)
}
return rows, err return rows, err
} }
values, err := namedValueToValue(args) values, err := namedValueToValue(args)
if err != nil { if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, 0, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return nil, err return nil, err
} }
return w.Query(values) ts := time.Now()
// nolint:staticcheck
rows, err := w.Query(values)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.AddLabels("error", true)
span.AddLabels("err", err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return rows, err
} }

22
tx.go
View File

@ -1,7 +1,9 @@
package wrapper package wrapper
import ( import (
"context"
"database/sql/driver" "database/sql/driver"
"time"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v3/tracer"
) )
@ -18,7 +20,15 @@ func (w *wrapperTx) Commit() error {
if w.span != nil { if w.span != nil {
defer w.span.Finish() defer w.span.Finish()
} }
return w.tx.Commit() ts := time.Now()
err := w.tx.Commit()
td := time.Since(ts)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Commit", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return err
} }
// Rollback implements driver.Tx Rollback // Rollback implements driver.Tx Rollback
@ -26,5 +36,13 @@ func (w *wrapperTx) Rollback() error {
if w.span != nil { if w.span != nil {
defer w.span.Finish() defer w.span.Finish()
} }
return w.tx.Rollback() ts := time.Now()
err := w.tx.Rollback()
td := time.Since(ts)
if w.opts.LoggerEnabled {
w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Rollback", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel)
}
return err
} }

View File

@ -1,51 +0,0 @@
package wrapper // import "go.unistack.org/micro-wrapper-sql/v3"
import (
"context"
"database/sql"
"time"
"go.unistack.org/micro/v3/meter"
)
type Statser interface {
Stats() sql.DBStats
}
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
options := NewOptions(opts...)
m := options.Meter.Clone(
meter.LabelPrefix(options.MeterLabelPrefix),
meter.MetricPrefix(options.MeterMetricPrefix),
meter.Labels(
labelHost, options.DatabaseHost,
labelDatabase, options.DatabaseName,
),
)
go func() {
ticker := time.NewTicker(options.MeterStatsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if db == nil {
return
}
stats := db.Stats()
m.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
m.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
m.Counter(InuseConnections).Set(uint64(stats.InUse))
m.Counter(IdleConnections).Set(uint64(stats.Idle))
m.Counter(WaitConnections).Set(uint64(stats.WaitCount))
m.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
m.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
m.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
}
}()
}