diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index fa4081e..b50aad2 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -9,7 +9,7 @@ # the `language` matrix defined below to confirm you have the correct set of # supported CodeQL languages. # -name: "CodeQL" +name: "codeql" on: workflow_run: @@ -17,16 +17,16 @@ on: types: - completed push: - branches: [ master ] + branches: [ master, v3 ] pull_request: # The branches below must be a subset of the branches above - branches: [ master ] + branches: [ master, v3 ] schedule: - cron: '34 1 * * 0' jobs: analyze: - name: Analyze + name: analyze runs-on: ubuntu-latest permissions: actions: read @@ -42,11 +42,14 @@ jobs: # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed steps: - - name: Checkout repository + - name: checkout uses: actions/checkout@v2 - + - name: setup + uses: actions/setup-go@v2 + with: + go-version: 1.16 # Initializes the CodeQL tools for scanning. - - name: Initialize CodeQL + - name: init uses: github/codeql-action/init@v1 with: languages: ${{ matrix.language }} @@ -57,7 +60,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - - name: Autobuild + - name: autobuild uses: github/codeql-action/autobuild@v1 # ℹ️ Command-line programs to run using the OS shell. @@ -71,5 +74,5 @@ jobs: # make bootstrap # make release - - name: Perform CodeQL Analysis + - name: analyze uses: github/codeql-action/analyze@v1 diff --git a/.github/workflows/dependabot-automerge.yml b/.github/workflows/dependabot-automerge.yml index dd6a62e..69e4c39 100644 --- a/.github/workflows/dependabot-automerge.yml +++ b/.github/workflows/dependabot-automerge.yml @@ -1,66 +1,31 @@ name: "prautomerge" on: - workflow_run: - workflows: ["prbuild"] - types: - - completed + pull_request_target: + types: [assigned, opened, synchronize, reopened] permissions: - contents: write pull-requests: write + contents: write jobs: - Dependabot-Automerge: + dependabot: runs-on: ubuntu-latest - # Contains workaround to execute if dependabot updates the PR by checking for the base branch in the linked PR - # The the github.event.workflow_run.event value is 'push' and not 'pull_request' - # dont work with multiple workflows when last returns success - if: >- - github.event.workflow_run.conclusion == 'success' - && github.actor == 'dependabot[bot]' - && github.event.sender.login == 'dependabot[bot]' - && github.event.sender.type == 'Bot' - && (github.event.workflow_run.event == 'pull_request' - || (github.event.workflow_run.event == 'push' && github.event.workflow_run.pull_requests[0].base.ref == github.event.repository.default_branch )) + if: ${{ github.actor == 'dependabot[bot]' }} steps: - - name: Approve Changes and Merge changes if label 'dependencies' is set - uses: actions/github-script@v5 + - name: metadata + id: metadata + uses: dependabot/fetch-metadata@v1.1.1 with: - github-token: ${{ secrets.GITHUB_TOKEN }} - script: | - console.log(context.payload.workflow_run); - - var labelNames = await github.paginate( - github.issues.listLabelsOnIssue, - { - repo: context.repo.repo, - owner: context.repo.owner, - issue_number: context.payload.workflow_run.pull_requests[0].number, - }, - (response) => response.data.map( - (label) => label.name - ) - ); - - console.log(labelNames); - - if (labelNames.includes('dependencies')) { - console.log('Found label'); - - await github.pulls.createReview({ - repo: context.repo.repo, - owner: context.repo.owner, - pull_number: context.payload.workflow_run.pull_requests[0].number, - event: 'APPROVE' - }); - console.log('Approved PR'); - - await github.pulls.merge({ - repo: context.repo.repo, - owner: context.repo.owner, - pull_number: context.payload.workflow_run.pull_requests[0].number, - }); - - console.log('Merged PR'); - } + github-token: "${{ secrets.TOKEN }}" + - name: approve + run: gh pr review --approve "$PR_URL" + env: + PR_URL: ${{github.event.pull_request.html_url}} + GITHUB_TOKEN: ${{secrets.TOKEN}} + - name: merge + if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}} + run: gh pr merge --auto --merge "$PR_URL" + env: + PR_URL: ${{github.event.pull_request.html_url}} + GITHUB_TOKEN: ${{secrets.TOKEN}} diff --git a/README.md b/README.md index 50fdd0d..4ee3890 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ import ( func Connect(cfg *PostgresConfig) (*sqlx.DB, error) { // format connection string - dbstr := fmt.Sprintf( + cstr := fmt.Sprintf( "postgres://%s:%s@%s/%s?sslmode=disable&statement_cache_mode=describe", cfg.Login, url.QueryEscape(cfg.Passw), @@ -24,7 +24,7 @@ func Connect(cfg *PostgresConfig) (*sqlx.DB, error) { ) // parse connection string - dbConf, err := pgx.ParseConfig(dbstr) + dbConf, err := pgx.ParseConfig(cstr) if err != nil { return nil, err } @@ -37,16 +37,24 @@ func Connect(cfg *PostgresConfig) (*sqlx.DB, error) { // may be needed for pbbouncer, needs to check //dbConf.PreferSimpleProtocol = true // register pgx conn - connStr := stdlib.RegisterConnConfig(dbConf) + dsn := stdlib.RegisterConnConfig(dbConf) - db, err := sqlx.Connect("pgx", connStr) + + sql.Register("micro-wrapper-sql", wrapper.WrapDriver( + &stdlib.Driver{}, + wrapper.Tracer(some.NewTracer()), + )) + wdb, err := sql.Open("micro-wrapper-sql", dsn) if err != nil { return nil, err } + + db := sqlx.NewDb(wdb, "pgx") db.SetMaxOpenConns(int(cfg.ConnMax)) - db.SetMaxIdleConns(int(cfg.ConnMax / 2)) + db.SetMaxIdleConns(int(cfg.ConnMaxIdle)) db.SetConnMaxLifetime(time.Duration(cfg.ConnLifetime) * time.Second) db.SetConnMaxIdleTime(time.Duration(cfg.ConnMaxIdleTime) * time.Second) + return db, nil } ``` diff --git a/common.go b/common.go new file mode 100644 index 0000000..cfc80d7 --- /dev/null +++ b/common.go @@ -0,0 +1,36 @@ +package wrapper + +import ( + "database/sql/driver" + "errors" +) + +// ErrUnsupported is an error returned when the underlying driver doesn't provide a given function. +var ErrUnsupported = errors.New("operation unsupported by the underlying driver") + +/* +// newSpan creates a new opentracing.Span instance from the given context. +func (t *tracer) newSpan(ctx context.Context) opentracing.Span { + name := t.nameFunc(ctx) + var opts []opentracing.StartSpanOption + parent := opentracing.SpanFromContext(ctx) + if parent != nil { + opts = append(opts, opentracing.ChildOf(parent.Context())) + } + span := t.t.StartSpan(name, opts...) + return span +} +*/ + +// namedValueToValue converts driver arguments of NamedValue format to Value format. Implemented in the same way as in +// database/sql ctxutil.go. +func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) { + dargs := make([]driver.Value, len(named)) + for n, param := range named { + if len(param.Name) > 0 { + return nil, errors.New("sql: driver does not support the use of Named Parameters") + } + dargs[n] = param.Value + } + return dargs, nil +} diff --git a/conn.go b/conn.go new file mode 100644 index 0000000..ac1614d --- /dev/null +++ b/conn.go @@ -0,0 +1,116 @@ +package wrapper + +import ( + "context" + "database/sql/driver" +) + +// wrapperConn defines a wrapper for driver.Conn +type wrapperConn struct { + conn driver.Conn + opts Options +} + +// Prepare implements driver.Conn Prepare +func (w *wrapperConn) Prepare(query string) (driver.Stmt, error) { + stmt, err := w.conn.Prepare(query) + if err != nil { + return nil, err + } + return &wrapperStmt{stmt: stmt, opts: w.opts}, nil +} + +// Close implements driver.Conn Close +func (w *wrapperConn) Close() error { + return w.conn.Close() +} + +// Begin implements driver.Conn Begin +func (w *wrapperConn) Begin() (driver.Tx, error) { + tx, err := w.conn.Begin() + if err != nil { + return nil, err + } + return &wrapperTx{tx: tx, opts: w.opts}, nil +} + +// BeginTx implements driver.ConnBeginTx BeginTx +func (w *wrapperConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { + nctx, span := w.opts.Tracer.Start(ctx, "BeginTx") + if connBeginTx, ok := w.conn.(driver.ConnBeginTx); ok { + tx, err := connBeginTx.BeginTx(nctx, opts) + if err != nil { + return nil, err + } + return &wrapperTx{tx: tx, opts: w.opts, span: span}, nil + } + return w.conn.Begin() +} + +// PrepareContext implements driver.ConnPrepareContext PrepareContext +func (w *wrapperConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { + if connPrepareContext, ok := w.conn.(driver.ConnPrepareContext); ok { + stmt, err := connPrepareContext.PrepareContext(ctx, query) + if err != nil { + return nil, err + } + return &wrapperStmt{stmt: stmt, opts: w.opts}, nil + } + return w.conn.Prepare(query) +} + +// Exec implements driver.Execer Exec +func (w *wrapperConn) Exec(query string, args []driver.Value) (driver.Result, error) { + if execer, ok := w.conn.(driver.Execer); ok { + return execer.Exec(query, args) + } + return nil, ErrUnsupported +} + +// Exec implements driver.StmtExecContext ExecContext +func (w *wrapperConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { + nctx, span := w.opts.Tracer.Start(ctx, "ExecContext") + defer span.Finish() + if execerContext, ok := w.conn.(driver.ExecerContext); ok { + r, err := execerContext.ExecContext(nctx, query, args) + return r, err + } + values, err := namedValueToValue(args) + if err != nil { + return nil, err + } + return w.Exec(query, values) +} + +// Ping implements driver.Pinger Ping +func (w *wrapperConn) Ping(ctx context.Context) error { + if pinger, ok := w.conn.(driver.Pinger); ok { + nctx, span := w.opts.Tracer.Start(ctx, "Ping") + defer span.Finish() + return pinger.Ping(nctx) + } + return ErrUnsupported +} + +// Query implements driver.Queryer Query +func (w *wrapperConn) Query(query string, args []driver.Value) (driver.Rows, error) { + if queryer, ok := w.conn.(driver.Queryer); ok { + return queryer.Query(query, args) + } + return nil, ErrUnsupported +} + +// QueryContext implements Driver.QueryerContext QueryContext +func (w *wrapperConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) { + nctx, span := w.opts.Tracer.Start(ctx, "QueryContext") + defer span.Finish() + if queryerContext, ok := w.conn.(driver.QueryerContext); ok { + rows, err := queryerContext.QueryContext(nctx, query, args) + return rows, err + } + values, err := namedValueToValue(args) + if err != nil { + return nil, err + } + return w.Query(query, values) +} diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..fb88238 --- /dev/null +++ b/doc.go @@ -0,0 +1,2 @@ +// package wrapper provides SQL driver wrapper with micro tracing, logging, metering capabilities +package wrapper diff --git a/driver.go b/driver.go new file mode 100644 index 0000000..c242f37 --- /dev/null +++ b/driver.go @@ -0,0 +1,25 @@ +package wrapper + +import ( + "database/sql/driver" +) + +// wrapperDriver defines a wrapper for driver.Driver +type wrapperDriver struct { + driver driver.Driver + opts Options +} + +// NewWrapper creates and returns a new SQL driver with passed capabilities +func NewWrapper(d driver.Driver, opts ...Option) driver.Driver { + return &wrapperDriver{driver: d, opts: NewOptions(opts...)} +} + +// Open implements driver.Driver Open +func (w *wrapperDriver) Open(name string) (driver.Conn, error) { + c, err := w.driver.Open(name) + if err != nil { + return nil, err + } + return &wrapperConn{conn: c, opts: w.opts}, nil +} diff --git a/go.mod b/go.mod index 41e27be..c875dcf 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ -module github.com/unistack-org/micro-wrapper-sql/v3 +module go.unistack.org/micro-wrapper-sql/v3 go 1.16 -require github.com/unistack-org/micro/v3 v3.2.22 +require go.unistack.org/micro/v3 v3.8.5 diff --git a/go.sum b/go.sum index 1892420..f2d5c38 100644 --- a/go.sum +++ b/go.sum @@ -1,18 +1,26 @@ -github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= -github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= +github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= -github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= -github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= -github.com/unistack-org/micro/v3 v3.2.22 h1:AXyLtRpfcPGczhaA1f9KR0ctK+1Zpqvb+rBJrZtp3Oo= -github.com/unistack-org/micro/v3 v3.2.22/go.mod h1:oI8H/uGq1h4i5cvUycEoFKJQC7G8yChZQNIDNWGSLRU= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +github.com/silas/dag v0.0.0-20210626123444-3804bac2d6d4/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= +github.com/unistack-org/micro-proto v0.0.9 h1:KrWLS4FUX7UAWNAilQf70uad6ZPf/0EudeddCXllRVc= +github.com/unistack-org/micro-proto v0.0.9/go.mod h1:Cckwmzd89gvS7ThxzZp9kQR/EOdksFQcsTAtDDyKwrg= +go.unistack.org/micro/v3 v3.8.2 h1:q2j+J7PLRNnENUbsi9eIrPwe4GM+vrxY656NwkwEmew= +go.unistack.org/micro/v3 v3.8.2/go.mod h1:Tkteri0wiiybbH6aPqay26pZHFIAwL9LXJc2x1Jkakk= +golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/options.go b/options.go new file mode 100644 index 0000000..468f29c --- /dev/null +++ b/options.go @@ -0,0 +1,139 @@ +package wrapper + +import ( + "context" + "time" + + "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v3/tracer" +) + +var ( + // DefaultMeterStatsInterval holds default stats interval + DefaultMeterStatsInterval = 5 * time.Second + // DefaultMeterMetricPrefix holds default metric prefix + DefaultMeterMetricPrefix = "micro_sql_" + // DefaultMeterLabelPrefix holds default label prefix + DefaultMeterLabelPrefix = "micro_" +) + +var ( + MaxOpenConnections = "max_open_connections" + OpenConnections = "open_connections" + InuseConnections = "inuse_connections" + IdleConnections = "idle_connections" + WaitConnections = "wait_connections" + BlockedSeconds = "blocked_seconds" + MaxIdleClosed = "max_idletime_closed" + MaxLifetimeClosed = "max_lifetime_closed" + + // RequestTotal = "request_total" + // RequestLatencyMicroseconds = "request_latency_microseconds" + // RequestDurationSeconds = "request_duration_seconds" + + labelSuccess = "success" + labelFailure = "failure" + labelHost = "db_host" + labelDatabase = "db_name" +) + +// Options struct holds wrapper options +type Options struct { + Logger logger.Logger + Meter meter.Meter + Tracer tracer.Tracer + DatabaseHost string + DatabaseName string + ServiceName string + ServiceVersion string + ServiceID string + MeterLabelPrefix string + MeterMetricPrefix string + MeterStatsInterval time.Duration + LoggerLevel logger.Level +} + +// Option func signature +type Option func(*Options) + +// NewOptions create new Options struct from provided option slice +func NewOptions(opts ...Option) Options { + options := Options{ + Logger: logger.DefaultLogger, + Meter: meter.DefaultMeter, + Tracer: tracer.DefaultTracer, + MeterStatsInterval: DefaultMeterStatsInterval, + MeterMetricPrefix: DefaultMeterMetricPrefix, + MeterLabelPrefix: DefaultMeterLabelPrefix, + LoggerLevel: logger.ErrorLevel, + } + for _, o := range opts { + o(&options) + } + return options +} + +// MetricInterval specifies stats interval for *sql.DB +func MetricInterval(td time.Duration) Option { + return func(o *Options) { + o.MeterStatsInterval = td + } +} + +// LabelPrefix specifies prefix for each label +func LabelPrefix(pref string) Option { + return func(o *Options) { + o.MeterLabelPrefix = pref + } +} + +// MetricPrefix specifies prefix for each metric +func MetricPrefix(pref string) Option { + return func(o *Options) { + o.MeterMetricPrefix = pref + } +} + +func DatabaseHost(host string) Option { + return func(o *Options) { + o.DatabaseHost = host + } +} + +func DatabaseName(name string) Option { + return func(o *Options) { + o.DatabaseName = name + } +} + +// Meter passes meter.Meter to wrapper +func Meter(m meter.Meter) Option { + return func(o *Options) { + o.Meter = m + } +} + +// Logger passes logger.Logger to wrapper +func Logger(l logger.Logger) Option { + return func(o *Options) { + o.Logger = l + } +} + +// Tracer passes tracer.Tracer to wrapper +func Tracer(t tracer.Tracer) Option { + return func(o *Options) { + o.Tracer = t + } +} + +type queryNameKey struct{} + +// QueryName passes query name to wrapper func +func QueryName(ctx context.Context, name string) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, queryNameKey{}, name) +} diff --git a/stmt.go b/stmt.go new file mode 100644 index 0000000..d2f971c --- /dev/null +++ b/stmt.go @@ -0,0 +1,61 @@ +package wrapper + +import ( + "context" + "database/sql/driver" +) + +// wrapperStmt defines a wrapper for driver.Stmt +type wrapperStmt struct { + stmt driver.Stmt + opts Options +} + +// Close implements driver.Stmt Close +func (w *wrapperStmt) Close() error { + return w.stmt.Close() +} + +// NumInput implements driver.Stmt NumInput +func (w *wrapperStmt) NumInput() int { + return w.stmt.NumInput() +} + +// Exec implements driver.Stmt Exec +func (w *wrapperStmt) Exec(args []driver.Value) (driver.Result, error) { + return w.stmt.Exec(args) +} + +// Query implements driver.Stmt Query +func (w *wrapperStmt) Query(args []driver.Value) (driver.Rows, error) { + return w.stmt.Query(args) +} + +// ExecContext implements driver.ExecerContext ExecContext +func (w *wrapperStmt) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { + nctx, span := w.opts.Tracer.Start(ctx, "ExecContext") + defer span.Finish() + if execerContext, ok := w.stmt.(driver.ExecerContext); ok { + return execerContext.ExecContext(nctx, query, args) + } + values, err := namedValueToValue(args) + if err != nil { + return nil, err + } + return w.Exec(values) +} + +// QueryContext implements Driver.QueryerContext QueryContext +func (w *wrapperStmt) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) { + nctx, span := w.opts.Tracer.Start(ctx, "QueryContext") + defer span.Finish() + if queryerContext, ok := w.stmt.(driver.QueryerContext); ok { + rows, err := queryerContext.QueryContext(nctx, query, args) + return rows, err + } + values, err := namedValueToValue(args) + if err != nil { + return nil, err + } + return w.Query(values) +} diff --git a/tx.go b/tx.go new file mode 100644 index 0000000..f0fbd50 --- /dev/null +++ b/tx.go @@ -0,0 +1,30 @@ +package wrapper + +import ( + "database/sql/driver" + + "go.unistack.org/micro/v3/tracer" +) + +// wrapperTx defines a wrapper for driver.Tx +type wrapperTx struct { + tx driver.Tx + span tracer.Span + opts Options +} + +// Commit implements driver.Tx Commit +func (w *wrapperTx) Commit() error { + if w.span != nil { + defer w.span.Finish() + } + return w.tx.Commit() +} + +// Rollback implements driver.Tx Rollback +func (w *wrapperTx) Rollback() error { + if w.span != nil { + defer w.span.Finish() + } + return w.tx.Rollback() +} diff --git a/wrapper.go b/wrapper.go index 40d2ad8..cc40af9 100644 --- a/wrapper.go +++ b/wrapper.go @@ -1,159 +1,51 @@ -package wrapper +package wrapper // import "go.unistack.org/micro-wrapper-sql/v3" import ( "context" "database/sql" "time" - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/meter" - "github.com/unistack-org/micro/v3/tracer" + "go.unistack.org/micro/v3/meter" ) -var ( - DefaultStatsInterval = 5 * time.Second - // default metric prefix - DefaultMetricPrefix = "micro_sql_" - // default label prefix - DefaultLabelPrefix = "micro_" -) - -var ( - DatabaseHostLabel = "database_host" - DatabaseNameLabel = "database_name" - ServiceNameLabel = "service_name" - ServiceVersionLabel = "service_version" - ServiceIDLabel = "service_id" - - MaxOpenConnectionsLabel = "max_open_connections" - OpenConnectionsLabel = "open_connections" - InuseConnectionsLabel = "inuse_connections" - IdleConnectionsLabel = "idle_connections" - WaitConnectionsLabel = "wait_connections" - BlockedSecondsLabel = "blocked_seconds" - MaxIdleClosedLabel = "max_idle_closed" - MaxLifetimeClosedLabel = "max_lifetime_closed" - - //srequest_total // counter - //slatency_microseconds // summary - //srequest_duration_seconds // histogramm - -) - -type Options struct { - Logger logger.Logger - Meter meter.Meter - Tracer tracer.Tracer - DatabaseHost string - DatabaseName string - ServiceName string - ServiceVersion string - ServiceID string +type Statser interface { + Stats() sql.DBStats } -type Option func(*Options) +func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { + options := NewOptions(opts...) -func DatabaseHost(host string) Option { - return func(opts *Options) { - opts.DatabaseHost = host - } -} + m := options.Meter.Clone( + meter.LabelPrefix(options.MeterLabelPrefix), + meter.MetricPrefix(options.MeterMetricPrefix), + meter.Labels( + labelHost, options.DatabaseHost, + labelDatabase, options.DatabaseName, + ), + ) -func DatabaseName(name string) Option { - return func(opts *Options) { - opts.DatabaseName = name - } -} + go func() { + ticker := time.NewTicker(options.MeterStatsInterval) + defer ticker.Stop() -func ServiceName(name string) Option { - return func(opts *Options) { - opts.ServiceName = name - } -} - -func ServiceVersion(version string) Option { - return func(opts *Options) { - opts.ServiceVersion = version - } -} - -func ServiceID(id string) Option { - return func(opts *Options) { - opts.ServiceID = id - } -} - -func Meter(m meter.Meter) Option { - return func(opts *Options) { - opts.Meter = m - } -} -func Logger(l logger.Logger) Option { - return func(opts *Options) { - opts.Logger = l - } -} - -func Tracer(t tracer.Tracer) Option { - return func(opts *Options) { - opts.Tracer = t - } -} - -type queryKey struct{} - -func QueryName(ctx context.Context, name string) context.Context { - if ctx == nil { - ctx = context.Background() - } - - return context.WithValue(ctx, queryKey{}, name) -} - -func getName(ctx context.Context) string { - name := "Unknown" - - val, ok := ctx.Value(queryKey{}).(string) - if ok && len(val) > 0 { - name = val - } - - return name -} - -type Wrapper struct { - db *sql.DB - opts Options -} - -func (w *Wrapper) collect() { - labels := []string{ - DatabaseHostLabel, w.opts.DatabaseHost, - DatabaseNameLabel, w.opts.DatabaseName, - ServiceNameLabel, w.opts.ServiceName, - ServiceVersionLabel, w.opts.ServiceVersion, - ServiceIDLabel, w.opts.ServiceID, - } - - ticker := time.NewTicker(DefaultStatsInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if w.db == nil { - continue + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if db == nil { + return + } + stats := db.Stats() + m.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections)) + m.Counter(OpenConnections).Set(uint64(stats.OpenConnections)) + m.Counter(InuseConnections).Set(uint64(stats.InUse)) + m.Counter(IdleConnections).Set(uint64(stats.Idle)) + m.Counter(WaitConnections).Set(uint64(stats.WaitCount)) + m.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds()) + m.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed)) + m.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed)) } - - stats := w.db.Stats() - w.opts.Meter.FloatCounter(MaxOpenConnectionsLabel, meter.Labels(labels...)).Set(float64(stats.MaxOpenConnections)) - w.opts.Meter.FloatCounter(OpenConnectionsLabel, meter.Labels(labels...)).Set(float64(stats.OpenConnections)) - w.opts.Meter.FloatCounter(InuseConnectionsLabel, meter.Labels(labels...)).Set(float64(stats.InUse)) - w.opts.Meter.FloatCounter(IdleConnectionsLabel, meter.Labels(labels...)).Set(float64(stats.Idle)) - w.opts.Meter.FloatCounter(WaitConnectionsLabel, meter.Labels(labels...)).Set(float64(stats.WaitCount)) - w.opts.Meter.FloatCounter(BlockedSecondsLabel, meter.Labels(labels...)).Set(stats.WaitDuration.Seconds()) - w.opts.Meter.FloatCounter(MaxIdleClosedLabel, meter.Labels(labels...)).Set(float64(stats.MaxIdleClosed)) - w.opts.Meter.FloatCounter(MaxLifetimeClosedLabel, meter.Labels(labels...)).Set(float64(stats.MaxLifetimeClosed)) } - } + }() }