diff --git a/common.go b/common.go index cfc80d7..3b59ff8 100644 --- a/common.go +++ b/common.go @@ -3,25 +3,12 @@ package wrapper import ( "database/sql/driver" "errors" + "fmt" ) // ErrUnsupported is an error returned when the underlying driver doesn't provide a given function. var ErrUnsupported = errors.New("operation unsupported by the underlying driver") -/* -// newSpan creates a new opentracing.Span instance from the given context. -func (t *tracer) newSpan(ctx context.Context) opentracing.Span { - name := t.nameFunc(ctx) - var opts []opentracing.StartSpanOption - parent := opentracing.SpanFromContext(ctx) - if parent != nil { - opts = append(opts, opentracing.ChildOf(parent.Context())) - } - span := t.t.StartSpan(name, opts...) - return span -} -*/ - // namedValueToValue converts driver arguments of NamedValue format to Value format. Implemented in the same way as in // database/sql ctxutil.go. func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) { @@ -34,3 +21,19 @@ func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) { } return dargs, nil } + +// namedValueToLabels convert driver arguments to interface{} slice +func namedValueToLabels(named []driver.NamedValue) []interface{} { + largs := make([]interface{}, len(named)*2) + var name string + for _, param := range named { + if param.Name != "" { + name = param.Name + } else { + name = fmt.Sprintf("$%d", param.Ordinal) + } + + largs = append(largs, name, param.Value) + } + return largs +} diff --git a/conn.go b/conn.go index ac1614d..f9612db 100644 --- a/conn.go +++ b/conn.go @@ -3,6 +3,8 @@ package wrapper import ( "context" "database/sql/driver" + "fmt" + "time" ) // wrapperConn defines a wrapper for driver.Conn @@ -13,56 +15,212 @@ type wrapperConn struct { // Prepare implements driver.Conn Prepare func (w *wrapperConn) Prepare(query string) (driver.Stmt, error) { + labels := []string{labelMethod, "Prepare", labelQuery, labelUnknown} + ts := time.Now() stmt, err := w.conn.Prepare(query) + td := time.Since(ts) + te := td.Seconds() if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Prepare", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return nil, err } + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return &wrapperStmt{stmt: stmt, opts: w.opts}, nil } // Close implements driver.Conn Close func (w *wrapperConn) Close() error { - return w.conn.Close() + labels := []string{labelMethod, "Close"} + ts := time.Now() + err := w.conn.Close() + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + + return err } // Begin implements driver.Conn Begin func (w *wrapperConn) Begin() (driver.Tx, error) { + labels := []string{labelMethod, "Begin"} + ts := time.Now() + // nolint:staticcheck tx, err := w.conn.Begin() + td := time.Since(ts) + te := td.Seconds() if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Begin", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return nil, err } + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Begin", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return &wrapperTx{tx: tx, opts: w.opts}, nil } // BeginTx implements driver.ConnBeginTx BeginTx func (w *wrapperConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { nctx, span := w.opts.Tracer.Start(ctx, "BeginTx") + span.AddLabels("method", "BeginTx") + name := getQueryName(ctx) + if name != "" { + span.AddLabels("query", name) + } else { + name = labelUnknown + } + labels := []string{labelMethod, "BeginTx", labelQuery, name} if connBeginTx, ok := w.conn.(driver.ConnBeginTx); ok { + ts := time.Now() tx, err := connBeginTx.BeginTx(nctx, opts) + td := time.Since(ts) + te := td.Seconds() if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "BeginTx", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return nil, err } + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "BeginTx", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return &wrapperTx{tx: tx, opts: w.opts, span: span}, nil } - return w.conn.Begin() + ts := time.Now() + // nolint:staticcheck + tx, err := w.conn.Begin() + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + } + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "BeginTx", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return tx, nil } // PrepareContext implements driver.ConnPrepareContext PrepareContext func (w *wrapperConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { - if connPrepareContext, ok := w.conn.(driver.ConnPrepareContext); ok { - stmt, err := connPrepareContext.PrepareContext(ctx, query) + nctx, span := w.opts.Tracer.Start(ctx, "PrepareContext") + span.AddLabels("method", "PrepareContext") + name := getQueryName(ctx) + if name != "" { + span.AddLabels("query", name) + } else { + name = labelUnknown + } + labels := []string{labelMethod, "PrepareContext", labelQuery, name} + if conn, ok := w.conn.(driver.ConnPrepareContext); ok { + ts := time.Now() + stmt, err := conn.PrepareContext(nctx, query) + td := time.Since(ts) + te := td.Seconds() if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "PrepareContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return nil, err } + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "PrepareContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return &wrapperStmt{stmt: stmt, opts: w.opts}, nil } - return w.conn.Prepare(query) + ts := time.Now() + stmt, err := w.conn.Prepare(query) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + } + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "PrepareContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return stmt, nil } // Exec implements driver.Execer Exec func (w *wrapperConn) Exec(query string, args []driver.Value) (driver.Result, error) { + // nolint:staticcheck + labels := []string{labelMethod, "Exec", labelQuery, labelUnknown} if execer, ok := w.conn.(driver.Execer); ok { - return execer.Exec(query, args) + ts := time.Now() + res, err := execer.Exec(query, args) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Exec", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return res, err + } + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Exec", labelUnknown, 0, ErrUnsupported)...).Log(context.TODO(), w.opts.LoggerLevel) } return nil, ErrUnsupported } @@ -70,47 +228,183 @@ func (w *wrapperConn) Exec(query string, args []driver.Value) (driver.Result, er // Exec implements driver.StmtExecContext ExecContext func (w *wrapperConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { nctx, span := w.opts.Tracer.Start(ctx, "ExecContext") + span.AddLabels("method", "ExecContext") + name := getQueryName(ctx) + if name != "" { + span.AddLabels("query", name) + } else { + name = labelUnknown + } defer span.Finish() - if execerContext, ok := w.conn.(driver.ExecerContext); ok { - r, err := execerContext.ExecContext(nctx, query, args) - return r, err + if len(args) > 0 { + span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args))) + } + labels := []string{labelMethod, "ExecContext", labelQuery, name} + if conn, ok := w.conn.(driver.ExecerContext); ok { + ts := time.Now() + res, err := conn.ExecContext(nctx, query, args) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return res, err } values, err := namedValueToValue(args) if err != nil { + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", labelUnknown, 0, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return nil, err } - return w.Exec(query, values) + ts := time.Now() + // nolint:staticcheck + res, err := w.Exec(query, values) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return res, err } // Ping implements driver.Pinger Ping func (w *wrapperConn) Ping(ctx context.Context) error { - if pinger, ok := w.conn.(driver.Pinger); ok { + if conn, ok := w.conn.(driver.Pinger); ok { nctx, span := w.opts.Tracer.Start(ctx, "Ping") defer span.Finish() - return pinger.Ping(nctx) + labels := []string{labelMethod, "Ping"} + ts := time.Now() + err := conn.Ping(nctx) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Ping", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return err + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + } + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Ping", labelUnknown, 0, ErrUnsupported)...).Log(context.TODO(), w.opts.LoggerLevel) } return ErrUnsupported } // Query implements driver.Queryer Query func (w *wrapperConn) Query(query string, args []driver.Value) (driver.Rows, error) { - if queryer, ok := w.conn.(driver.Queryer); ok { - return queryer.Query(query, args) + // nolint:staticcheck + if conn, ok := w.conn.(driver.Queryer); ok { + labels := []string{labelMethod, "Query", labelQuery, labelUnknown} + ts := time.Now() + rows, err := conn.Query(query, args) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Query", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return rows, err + } + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Query", labelUnknown, 0, ErrUnsupported)...).Log(context.TODO(), w.opts.LoggerLevel) } return nil, ErrUnsupported } // QueryContext implements Driver.QueryerContext QueryContext -func (w *wrapperConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) { +func (w *wrapperConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { nctx, span := w.opts.Tracer.Start(ctx, "QueryContext") + span.AddLabels("method", "QueryContext") + name := getQueryName(ctx) + if name != "" { + span.AddLabels("query", name) + } else { + name = labelUnknown + } defer span.Finish() - if queryerContext, ok := w.conn.(driver.QueryerContext); ok { - rows, err := queryerContext.QueryContext(nctx, query, args) + if len(args) > 0 { + span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args))) + } + labels := []string{labelMethod, "QueryContext", labelQuery, name} + if conn, ok := w.conn.(driver.QueryerContext); ok { + ts := time.Now() + rows, err := conn.QueryContext(nctx, query, args) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return rows, err } values, err := namedValueToValue(args) if err != nil { + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, 0, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return nil, err } - return w.Query(query, values) + ts := time.Now() + // nolint:staticcheck + rows, err := w.Query(query, values) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return rows, err } diff --git a/doc.go b/doc.go index fb88238..37dc5bd 100644 --- a/doc.go +++ b/doc.go @@ -1,2 +1,2 @@ // package wrapper provides SQL driver wrapper with micro tracing, logging, metering capabilities -package wrapper +package wrapper // import go.unistack.org/micro-wrapper-sql/v3 diff --git a/driver.go b/driver.go index c242f37..eb44e14 100644 --- a/driver.go +++ b/driver.go @@ -1,7 +1,9 @@ package wrapper import ( + "context" "database/sql/driver" + "time" ) // wrapperDriver defines a wrapper for driver.Driver @@ -17,9 +19,17 @@ func NewWrapper(d driver.Driver, opts ...Option) driver.Driver { // Open implements driver.Driver Open func (w *wrapperDriver) Open(name string) (driver.Conn, error) { + ts := time.Now() c, err := w.driver.Open(name) + td := time.Since(ts) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Open", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + if err != nil { return nil, err } + return &wrapperConn{conn: c, opts: w.opts}, nil } diff --git a/go.mod b/go.mod index eeef0a2..8f1cd42 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,5 @@ module go.unistack.org/micro-wrapper-sql/v3 go 1.16 -require go.unistack.org/micro/v3 v3.9.18 +require go.unistack.org/micro/v3 v3.9.19 + diff --git a/go.sum b/go.sum index c122bd3..c652651 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.unistack.org/micro-proto/v3 v3.3.1 h1:nQ0MtWvP2G3QrpOgawVOPhpZZYkq6umTGDqs8FxJYIo= go.unistack.org/micro-proto/v3 v3.3.1/go.mod h1:cwRyv8uInM2I7EbU7O8Fx2Ls3N90Uw9UCCcq4olOdfE= -go.unistack.org/micro/v3 v3.9.18 h1:FNsCCJJKyyP9gScc1K3HxGxFjub+c4ZQpL+QwVvIMro= -go.unistack.org/micro/v3 v3.9.18/go.mod h1:gI4RkJKHLPW7KV6h4+ZBOZD997MRvFRXMPQIHpozikI= +go.unistack.org/micro/v3 v3.9.19 h1:HFeHr20hnrs60EnlKWwrvQLTXILu3qhmHqs98jlqmqk= +go.unistack.org/micro/v3 v3.9.19/go.mod h1:gI4RkJKHLPW7KV6h4+ZBOZD997MRvFRXMPQIHpozikI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/options.go b/options.go index 468f29c..c253b3e 100644 --- a/options.go +++ b/options.go @@ -2,6 +2,7 @@ package wrapper import ( "context" + "fmt" "time" "go.unistack.org/micro/v3/logger" @@ -14,8 +15,17 @@ var ( DefaultMeterStatsInterval = 5 * time.Second // DefaultMeterMetricPrefix holds default metric prefix DefaultMeterMetricPrefix = "micro_sql_" - // DefaultMeterLabelPrefix holds default label prefix - DefaultMeterLabelPrefix = "micro_" + // DefaultLoggerObserver used to prepare labels for logger + DefaultLoggerObserver = func(ctx context.Context, method string, query string, td time.Duration, err error) []interface{} { + labels := []interface{}{"method", method, "took", fmt.Sprintf("%v", td)} + if err != nil { + labels = append(labels, "error", err.Error()) + } + if query != labelUnknown { + labels = append(labels, "query", query) + } + return labels + } ) var ( @@ -25,13 +35,18 @@ var ( IdleConnections = "idle_connections" WaitConnections = "wait_connections" BlockedSeconds = "blocked_seconds" - MaxIdleClosed = "max_idletime_closed" + MaxIdleClosed = "max_idle_closed" + MaxIdletimeClosed = "max_idletime_closed" MaxLifetimeClosed = "max_lifetime_closed" - // RequestTotal = "request_total" - // RequestLatencyMicroseconds = "request_latency_microseconds" - // RequestDurationSeconds = "request_duration_seconds" + meterRequestTotal = "request_total" + meterRequestLatencyMicroseconds = "request_latency_microseconds" + meterRequestDurationSeconds = "request_duration_seconds" + labelUnknown = "unknown" + labelQuery = "query" + labelMethod = "method" + labelStatus = "status" labelSuccess = "success" labelFailure = "failure" labelHost = "db_host" @@ -45,13 +60,11 @@ type Options struct { Tracer tracer.Tracer DatabaseHost string DatabaseName string - ServiceName string - ServiceVersion string - ServiceID string - MeterLabelPrefix string MeterMetricPrefix string MeterStatsInterval time.Duration LoggerLevel logger.Level + LoggerEnabled bool + LoggerObserver func(ctx context.Context, method string, name string, td time.Duration, err error) []interface{} } // Option func signature @@ -65,12 +78,23 @@ func NewOptions(opts ...Option) Options { Tracer: tracer.DefaultTracer, MeterStatsInterval: DefaultMeterStatsInterval, MeterMetricPrefix: DefaultMeterMetricPrefix, - MeterLabelPrefix: DefaultMeterLabelPrefix, LoggerLevel: logger.ErrorLevel, + LoggerObserver: DefaultLoggerObserver, } for _, o := range opts { o(&options) } + + options.Meter = options.Meter.Clone( + meter.MetricPrefix(options.MeterMetricPrefix), + meter.Labels( + labelHost, options.DatabaseHost, + labelDatabase, options.DatabaseName, + ), + ) + + options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1)) + return options } @@ -81,13 +105,6 @@ func MetricInterval(td time.Duration) Option { } } -// LabelPrefix specifies prefix for each label -func LabelPrefix(pref string) Option { - return func(o *Options) { - o.MeterLabelPrefix = pref - } -} - // MetricPrefix specifies prefix for each metric func MetricPrefix(pref string) Option { return func(o *Options) { @@ -121,6 +138,27 @@ func Logger(l logger.Logger) Option { } } +// LoggerEnabled enable sql logging +func LoggerEnabled(b bool) Option { + return func(o *Options) { + o.LoggerEnabled = b + } +} + +// LoggerLevel passes logger.Level option +func LoggerLevel(lvl logger.Level) Option { + return func(o *Options) { + o.LoggerLevel = lvl + } +} + +// LoggerObserver passes observer to fill logger fields +func LoggerObserver(obs func(context.Context, string, string, time.Duration, error) []interface{}) Option { + return func(o *Options) { + o.LoggerObserver = obs + } +} + // Tracer passes tracer.Tracer to wrapper func Tracer(t tracer.Tracer) Option { return func(o *Options) { @@ -137,3 +175,10 @@ func QueryName(ctx context.Context, name string) context.Context { } return context.WithValue(ctx, queryNameKey{}, name) } + +func getQueryName(ctx context.Context) string { + if v, ok := ctx.Value(queryNameKey{}).(string); ok { + return v + } + return "" +} diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..c184581 --- /dev/null +++ b/stats.go @@ -0,0 +1,41 @@ +package wrapper // import "go.unistack.org/micro-wrapper-sql/v3" + +import ( + "context" + "database/sql" + "time" +) + +type Statser interface { + Stats() sql.DBStats +} + +func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { + options := NewOptions(opts...) + + go func() { + ticker := time.NewTicker(options.MeterStatsInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if db == nil { + return + } + stats := db.Stats() + options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections)) + options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections)) + options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse)) + options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle)) + options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount)) + options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds()) + options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed)) + options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed)) + options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed)) + } + } + }() +} diff --git a/stmt.go b/stmt.go index d2f971c..f5705ee 100644 --- a/stmt.go +++ b/stmt.go @@ -3,6 +3,8 @@ package wrapper import ( "context" "database/sql/driver" + "fmt" + "time" ) // wrapperStmt defines a wrapper for driver.Stmt @@ -13,7 +15,23 @@ type wrapperStmt struct { // Close implements driver.Stmt Close func (w *wrapperStmt) Close() error { - return w.stmt.Close() + labels := []string{labelMethod, "Close"} + ts := time.Now() + err := w.stmt.Close() + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Close", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return err } // NumInput implements driver.Stmt NumInput @@ -23,39 +41,182 @@ func (w *wrapperStmt) NumInput() int { // Exec implements driver.Stmt Exec func (w *wrapperStmt) Exec(args []driver.Value) (driver.Result, error) { - return w.stmt.Exec(args) + labels := []string{labelMethod, "Exec"} + ts := time.Now() + // nolint:staticcheck + res, err := w.stmt.Exec(args) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Exec", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return res, err } // Query implements driver.Stmt Query func (w *wrapperStmt) Query(args []driver.Value) (driver.Rows, error) { - return w.stmt.Query(args) + labels := []string{labelMethod, "Query"} + ts := time.Now() + // nolint:staticcheck + rows, err := w.stmt.Query(args) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Query", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return rows, err } // ExecContext implements driver.ExecerContext ExecContext func (w *wrapperStmt) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { nctx, span := w.opts.Tracer.Start(ctx, "ExecContext") + span.AddLabels("method", "ExecContext") + name := getQueryName(ctx) + if name != "" { + span.AddLabels("query", name) + } else { + name = labelUnknown + } defer span.Finish() - if execerContext, ok := w.stmt.(driver.ExecerContext); ok { - return execerContext.ExecContext(nctx, query, args) + if len(args) > 0 { + span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args))) + } + labels := []string{labelMethod, "ExecContext", labelQuery, name} + if conn, ok := w.stmt.(driver.ExecerContext); ok { + ts := time.Now() + res, err := conn.ExecContext(nctx, query, args) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return res, err } values, err := namedValueToValue(args) if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", name, 0, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return nil, err } - return w.Exec(values) + ts := time.Now() + // nolint:staticcheck + res, err := w.Exec(values) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "ExecContext", name, td, err)).Log(context.TODO(), w.opts.LoggerLevel) + } + return res, err } // QueryContext implements Driver.QueryerContext QueryContext -func (w *wrapperStmt) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) { +func (w *wrapperStmt) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { nctx, span := w.opts.Tracer.Start(ctx, "QueryContext") + span.AddLabels("method", "QueryContext") + name := getQueryName(ctx) + if name != "" { + span.AddLabels("query", name) + } else { + name = labelUnknown + } defer span.Finish() - if queryerContext, ok := w.stmt.(driver.QueryerContext); ok { - rows, err := queryerContext.QueryContext(nctx, query, args) + if len(args) > 0 { + span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args))) + } + labels := []string{labelMethod, "QueryContext", labelQuery, name} + if conn, ok := w.stmt.(driver.QueryerContext); ok { + ts := time.Now() + rows, err := conn.QueryContext(nctx, query, args) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, td, err)).Log(context.TODO(), w.opts.LoggerLevel) + } return rows, err } values, err := namedValueToValue(args) if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, 0, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } return nil, err } - return w.Query(values) + ts := time.Now() + // nolint:staticcheck + rows, err := w.Query(values) + td := time.Since(ts) + te := td.Seconds() + if err != nil { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc() + span.AddLabels("error", true) + span.AddLabels("err", err.Error()) + } else { + w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc() + } + + w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te) + w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "QueryContext", name, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + return rows, err } diff --git a/tx.go b/tx.go index f0fbd50..7aae403 100644 --- a/tx.go +++ b/tx.go @@ -1,7 +1,9 @@ package wrapper import ( + "context" "database/sql/driver" + "time" "go.unistack.org/micro/v3/tracer" ) @@ -18,7 +20,15 @@ func (w *wrapperTx) Commit() error { if w.span != nil { defer w.span.Finish() } - return w.tx.Commit() + ts := time.Now() + err := w.tx.Commit() + td := time.Since(ts) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Commit", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + + return err } // Rollback implements driver.Tx Rollback @@ -26,5 +36,13 @@ func (w *wrapperTx) Rollback() error { if w.span != nil { defer w.span.Finish() } - return w.tx.Rollback() + ts := time.Now() + err := w.tx.Rollback() + td := time.Since(ts) + + if w.opts.LoggerEnabled { + w.opts.Logger.Fields(w.opts.LoggerObserver(context.TODO(), "Rollback", labelUnknown, td, err)...).Log(context.TODO(), w.opts.LoggerLevel) + } + + return err } diff --git a/wrapper.go b/wrapper.go deleted file mode 100644 index cc40af9..0000000 --- a/wrapper.go +++ /dev/null @@ -1,51 +0,0 @@ -package wrapper // import "go.unistack.org/micro-wrapper-sql/v3" - -import ( - "context" - "database/sql" - "time" - - "go.unistack.org/micro/v3/meter" -) - -type Statser interface { - Stats() sql.DBStats -} - -func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { - options := NewOptions(opts...) - - m := options.Meter.Clone( - meter.LabelPrefix(options.MeterLabelPrefix), - meter.MetricPrefix(options.MeterMetricPrefix), - meter.Labels( - labelHost, options.DatabaseHost, - labelDatabase, options.DatabaseName, - ), - ) - - go func() { - ticker := time.NewTicker(options.MeterStatsInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if db == nil { - return - } - stats := db.Stats() - m.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections)) - m.Counter(OpenConnections).Set(uint64(stats.OpenConnections)) - m.Counter(InuseConnections).Set(uint64(stats.InUse)) - m.Counter(IdleConnections).Set(uint64(stats.Idle)) - m.Counter(WaitConnections).Set(uint64(stats.WaitCount)) - m.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds()) - m.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed)) - m.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed)) - } - } - }() -}