From 30b5ed125301ef78f7d8d6847a65fb574b068de3 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 25 Dec 2022 15:35:05 +0300 Subject: [PATCH] rewrite wrapper Signed-off-by: Vasiliy Tolstov --- common.go | 30 ++++++++-------- conn.go | 100 +++++++++++++++++++++++++++++++++++++++++++++++------ doc.go | 2 +- go.mod | 2 +- go.sum | 4 +-- options.go | 30 +++++++++------- stats.go | 41 ++++++++++++++++++++++ stmt.go | 58 +++++++++++++++++++++++++++---- wrapper.go | 51 --------------------------- 9 files changed, 220 insertions(+), 98 deletions(-) create mode 100644 stats.go delete mode 100644 wrapper.go diff --git a/common.go b/common.go index cfc80d7..a7c5d91 100644 --- a/common.go +++ b/common.go @@ -3,25 +3,12 @@ package wrapper import ( "database/sql/driver" "errors" + "fmt" ) // ErrUnsupported is an error returned when the underlying driver doesn't provide a given function. var ErrUnsupported = errors.New("operation unsupported by the underlying driver") -/* -// newSpan creates a new opentracing.Span instance from the given context. -func (t *tracer) newSpan(ctx context.Context) opentracing.Span { - name := t.nameFunc(ctx) - var opts []opentracing.StartSpanOption - parent := opentracing.SpanFromContext(ctx) - if parent != nil { - opts = append(opts, opentracing.ChildOf(parent.Context())) - } - span := t.t.StartSpan(name, opts...) - return span -} -*/ - // namedValueToValue converts driver arguments of NamedValue format to Value format. Implemented in the same way as in // database/sql ctxutil.go. func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) { @@ -34,3 +21,18 @@ func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) { } return dargs, nil } + +func namedValueToLabels(named []driver.NamedValue) []interface{} { + largs := make([]interface{}, len(named)*2) + var name string + for _, param := range named { + if param.Name != "" { + name = param.Name + } else { + name = fmt.Sprintf("$%d", param.Ordinal) + } + + largs = append(largs, name, param.Value) + } + return largs +} diff --git a/conn.go b/conn.go index ac1614d..4c54a8a 100644 --- a/conn.go +++ b/conn.go @@ -3,6 +3,9 @@ package wrapper import ( "context" "database/sql/driver" + "fmt" + + "go.unistack.org/micro/v3/tracer" ) // wrapperConn defines a wrapper for driver.Conn @@ -36,27 +39,62 @@ func (w *wrapperConn) Begin() (driver.Tx, error) { // BeginTx implements driver.ConnBeginTx BeginTx func (w *wrapperConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { - nctx, span := w.opts.Tracer.Start(ctx, "BeginTx") + var nctx context.Context + var span tracer.Span + + name := getQueryName(ctx) + if name != "" { + nctx, span = w.opts.Tracer.Start(ctx, "BeginTx "+name) + } else { + nctx, span = w.opts.Tracer.Start(ctx, "BeginTx") + } + if name == "" { + name = "unknown" + } + span.AddLabels("query", name) if connBeginTx, ok := w.conn.(driver.ConnBeginTx); ok { tx, err := connBeginTx.BeginTx(nctx, opts) if err != nil { + span.AddLabels("error", true) return nil, err } return &wrapperTx{tx: tx, opts: w.opts, span: span}, nil } - return w.conn.Begin() + tx, err := w.conn.Begin() + if err != nil { + span.AddLabels("error", true) + } + return tx, err } // PrepareContext implements driver.ConnPrepareContext PrepareContext func (w *wrapperConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { + var nctx context.Context + var span tracer.Span + + name := getQueryName(ctx) + if name != "" { + nctx, span = w.opts.Tracer.Start(ctx, "BeginTx "+name) + } else { + nctx, span = w.opts.Tracer.Start(ctx, "BeginTx") + } + if name == "" { + name = "unknown" + } + span.AddLabels("query", name) if connPrepareContext, ok := w.conn.(driver.ConnPrepareContext); ok { - stmt, err := connPrepareContext.PrepareContext(ctx, query) + stmt, err := connPrepareContext.PrepareContext(nctx, query) if err != nil { + span.AddLabels("error", true) return nil, err } return &wrapperStmt{stmt: stmt, opts: w.opts}, nil } - return w.conn.Prepare(query) + stmt, err := w.conn.Prepare(query) + if err != nil { + span.AddLabels("error", true) + } + return stmt, err } // Exec implements driver.Execer Exec @@ -69,17 +107,37 @@ func (w *wrapperConn) Exec(query string, args []driver.Value) (driver.Result, er // Exec implements driver.StmtExecContext ExecContext func (w *wrapperConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { - nctx, span := w.opts.Tracer.Start(ctx, "ExecContext") + var nctx context.Context + var span tracer.Span + name := getQueryName(ctx) + if name != "" { + nctx, span = w.opts.Tracer.Start(ctx, "ExecContext "+name) + } else { + nctx, span = w.opts.Tracer.Start(ctx, "ExecContext") + } defer span.Finish() + if name == "" { + name = "unknown" + } + span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args))) + span.AddLabels("query", name) if execerContext, ok := w.conn.(driver.ExecerContext); ok { - r, err := execerContext.ExecContext(nctx, query, args) - return r, err + res, err := execerContext.ExecContext(nctx, query, args) + if err != nil { + span.AddLabels("error", true) + } + return res, err } values, err := namedValueToValue(args) if err != nil { + span.AddLabels("error", true) return nil, err } - return w.Exec(query, values) + res, err := w.Exec(query, values) + if err != nil { + span.AddLabels("error", true) + } + return res, err } // Ping implements driver.Pinger Ping @@ -101,16 +159,36 @@ func (w *wrapperConn) Query(query string, args []driver.Value) (driver.Rows, err } // QueryContext implements Driver.QueryerContext QueryContext -func (w *wrapperConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) { - nctx, span := w.opts.Tracer.Start(ctx, "QueryContext") +func (w *wrapperConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { + var nctx context.Context + var span tracer.Span + name := getQueryName(ctx) + if name != "" { + nctx, span = w.opts.Tracer.Start(ctx, "QueryContext "+name) + } else { + nctx, span = w.opts.Tracer.Start(ctx, "QueryContext") + } defer span.Finish() + if name == "" { + name = "unknown" + } + span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args))) + span.AddLabels("query", name) if queryerContext, ok := w.conn.(driver.QueryerContext); ok { rows, err := queryerContext.QueryContext(nctx, query, args) + if err != nil { + span.AddLabels("error", true) + } return rows, err } values, err := namedValueToValue(args) if err != nil { + span.AddLabels("error", true) return nil, err } - return w.Query(query, values) + rows, err := w.Query(query, values) + if err != nil { + span.AddLabels("error", true) + } + return rows, err } diff --git a/doc.go b/doc.go index fb88238..37dc5bd 100644 --- a/doc.go +++ b/doc.go @@ -1,2 +1,2 @@ // package wrapper provides SQL driver wrapper with micro tracing, logging, metering capabilities -package wrapper +package wrapper // import go.unistack.org/micro-wrapper-sql/v3 diff --git a/go.mod b/go.mod index bc0fb22..e4c4e9b 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module go.unistack.org/micro-wrapper-sql/v3 go 1.16 -require go.unistack.org/micro/v3 v3.9.15 +require go.unistack.org/micro/v3 v3.9.17 diff --git a/go.sum b/go.sum index 68036f9..206e177 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.unistack.org/micro-proto/v3 v3.3.1 h1:nQ0MtWvP2G3QrpOgawVOPhpZZYkq6umTGDqs8FxJYIo= go.unistack.org/micro-proto/v3 v3.3.1/go.mod h1:cwRyv8uInM2I7EbU7O8Fx2Ls3N90Uw9UCCcq4olOdfE= -go.unistack.org/micro/v3 v3.9.15 h1:Mv1/0jsySwIvm9+IvXTYj/X4bJCNChbwtf2ZV2ZDbk8= -go.unistack.org/micro/v3 v3.9.15/go.mod h1:gI4RkJKHLPW7KV6h4+ZBOZD997MRvFRXMPQIHpozikI= +go.unistack.org/micro/v3 v3.9.17 h1:EJ9/XR9OTo/up/3aqWjaKS2YsWMA66b0dx+pc/0vIl8= +go.unistack.org/micro/v3 v3.9.17/go.mod h1:gI4RkJKHLPW7KV6h4+ZBOZD997MRvFRXMPQIHpozikI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/options.go b/options.go index 468f29c..014f682 100644 --- a/options.go +++ b/options.go @@ -14,8 +14,6 @@ var ( DefaultMeterStatsInterval = 5 * time.Second // DefaultMeterMetricPrefix holds default metric prefix DefaultMeterMetricPrefix = "micro_sql_" - // DefaultMeterLabelPrefix holds default label prefix - DefaultMeterLabelPrefix = "micro_" ) var ( @@ -25,7 +23,8 @@ var ( IdleConnections = "idle_connections" WaitConnections = "wait_connections" BlockedSeconds = "blocked_seconds" - MaxIdleClosed = "max_idletime_closed" + MaxIdleClosed = "max_idle_closed" + MaxIdletimeClosed = "max_idletime_closed" MaxLifetimeClosed = "max_lifetime_closed" // RequestTotal = "request_total" @@ -48,7 +47,6 @@ type Options struct { ServiceName string ServiceVersion string ServiceID string - MeterLabelPrefix string MeterMetricPrefix string MeterStatsInterval time.Duration LoggerLevel logger.Level @@ -65,12 +63,20 @@ func NewOptions(opts ...Option) Options { Tracer: tracer.DefaultTracer, MeterStatsInterval: DefaultMeterStatsInterval, MeterMetricPrefix: DefaultMeterMetricPrefix, - MeterLabelPrefix: DefaultMeterLabelPrefix, LoggerLevel: logger.ErrorLevel, } for _, o := range opts { o(&options) } + + options.Meter = options.Meter.Clone( + meter.MetricPrefix(options.MeterMetricPrefix), + meter.Labels( + labelHost, options.DatabaseHost, + labelDatabase, options.DatabaseName, + ), + ) + return options } @@ -81,13 +87,6 @@ func MetricInterval(td time.Duration) Option { } } -// LabelPrefix specifies prefix for each label -func LabelPrefix(pref string) Option { - return func(o *Options) { - o.MeterLabelPrefix = pref - } -} - // MetricPrefix specifies prefix for each metric func MetricPrefix(pref string) Option { return func(o *Options) { @@ -137,3 +136,10 @@ func QueryName(ctx context.Context, name string) context.Context { } return context.WithValue(ctx, queryNameKey{}, name) } + +func getQueryName(ctx context.Context) string { + if v, ok := ctx.Value(queryNameKey{}).(string); ok { + return v + } + return "" +} diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..c184581 --- /dev/null +++ b/stats.go @@ -0,0 +1,41 @@ +package wrapper // import "go.unistack.org/micro-wrapper-sql/v3" + +import ( + "context" + "database/sql" + "time" +) + +type Statser interface { + Stats() sql.DBStats +} + +func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { + options := NewOptions(opts...) + + go func() { + ticker := time.NewTicker(options.MeterStatsInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if db == nil { + return + } + stats := db.Stats() + options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections)) + options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections)) + options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse)) + options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle)) + options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount)) + options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds()) + options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed)) + options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed)) + options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed)) + } + } + }() +} diff --git a/stmt.go b/stmt.go index d2f971c..cd7a290 100644 --- a/stmt.go +++ b/stmt.go @@ -3,6 +3,9 @@ package wrapper import ( "context" "database/sql/driver" + "fmt" + + "go.unistack.org/micro/v3/tracer" ) // wrapperStmt defines a wrapper for driver.Stmt @@ -33,29 +36,72 @@ func (w *wrapperStmt) Query(args []driver.Value) (driver.Rows, error) { // ExecContext implements driver.ExecerContext ExecContext func (w *wrapperStmt) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { - nctx, span := w.opts.Tracer.Start(ctx, "ExecContext") + var nctx context.Context + var span tracer.Span + name := getQueryName(ctx) + if name != "" { + nctx, span = w.opts.Tracer.Start(ctx, "ExecContext "+name) + } else { + nctx, span = w.opts.Tracer.Start(ctx, "ExecContext") + } defer span.Finish() + if name == "" { + name = "unknown" + } + span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args))) + span.AddLabels("query", name) if execerContext, ok := w.stmt.(driver.ExecerContext); ok { - return execerContext.ExecContext(nctx, query, args) + res, err := execerContext.ExecContext(nctx, query, args) + if err != nil { + span.AddLabels("error", true) + } + return res, err } values, err := namedValueToValue(args) if err != nil { + span.AddLabels("error", true) return nil, err } - return w.Exec(values) + res, err := w.Exec(values) + if err != nil { + span.AddLabels("error", true) + } + return res, err } // QueryContext implements Driver.QueryerContext QueryContext -func (w *wrapperStmt) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) { - nctx, span := w.opts.Tracer.Start(ctx, "QueryContext") +func (w *wrapperStmt) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { + var nctx context.Context + var span tracer.Span + name := getQueryName(ctx) + if name != "" { + nctx, span = w.opts.Tracer.Start(ctx, "QueryContext "+name) + } else { + nctx, span = w.opts.Tracer.Start(ctx, "QueryContext") + } defer span.Finish() + if name == "" { + name = "unknown" + } + span.AddLabels("args", fmt.Sprintf("%v", namedValueToLabels(args))) + span.AddLabels("query", name) if queryerContext, ok := w.stmt.(driver.QueryerContext); ok { rows, err := queryerContext.QueryContext(nctx, query, args) + if err != nil { + span.AddLabels("error", true) + } return rows, err } values, err := namedValueToValue(args) if err != nil { + if err != nil { + span.AddLabels("error", true) + } return nil, err } - return w.Query(values) + rows, err := w.Query(values) + if err != nil { + span.AddLabels("error", true) + } + return rows, err } diff --git a/wrapper.go b/wrapper.go deleted file mode 100644 index cc40af9..0000000 --- a/wrapper.go +++ /dev/null @@ -1,51 +0,0 @@ -package wrapper // import "go.unistack.org/micro-wrapper-sql/v3" - -import ( - "context" - "database/sql" - "time" - - "go.unistack.org/micro/v3/meter" -) - -type Statser interface { - Stats() sql.DBStats -} - -func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { - options := NewOptions(opts...) - - m := options.Meter.Clone( - meter.LabelPrefix(options.MeterLabelPrefix), - meter.MetricPrefix(options.MeterMetricPrefix), - meter.Labels( - labelHost, options.DatabaseHost, - labelDatabase, options.DatabaseName, - ), - ) - - go func() { - ticker := time.NewTicker(options.MeterStatsInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if db == nil { - return - } - stats := db.Stats() - m.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections)) - m.Counter(OpenConnections).Set(uint64(stats.OpenConnections)) - m.Counter(InuseConnections).Set(uint64(stats.InUse)) - m.Counter(IdleConnections).Set(uint64(stats.Idle)) - m.Counter(WaitConnections).Set(uint64(stats.WaitCount)) - m.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds()) - m.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed)) - m.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed)) - } - } - }() -}