Compare commits

...

24 Commits

Author SHA1 Message Date
bbc7512054 v3 (#404)
Some checks failed
coverage / build (push) Successful in 4m17s
test / test (push) Failing after 17m8s
## Pull Request template
Please, go through these steps before clicking submit on this PR.

1. Give a descriptive title to your PR.
2. Provide a description of your changes.
3. Make sure you have some relevant tests.
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).

**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**

Reviewed-on: #404
Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru>
Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
2025-05-20 13:24:52 +03:00
github-actions[bot]
dd810e4ae0 Merge remote-tracking branch 'upstream/v3' into v3 2025-05-01 16:18:41 +00:00
236ed47ab1 update ci (#214) 2025-05-01 19:15:11 +03:00
vtolstov
909bcf51a4 Apply Code Coverage Badge 2025-05-01 16:14:42 +00:00
9c24001f52 move wrapper.sql to hook/sql (#400)
All checks were successful
coverage / build (push) Successful in 4m25s
test / test (push) Successful in 7m28s
move micro-wrapper-sql to core micro

Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Reviewed-on: #400
Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru>
Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
2025-05-01 19:12:33 +03:00
680cd6f708 [v3] fix flatten map util function (#212)
All checks were successful
coverage / build (push) Successful in 2m59s
test / test (push) Successful in 4m15s
* Apply Code Coverage Badge
* add the fixed version of FlattenMap() and corresponding tests
* clenaup readme


---------

Co-authored-by: pugnack <pugnack@users.noreply.github.com>
2025-04-27 22:22:24 +03:00
vtolstov
3fcf3bef6d Apply Code Coverage Badge 2025-04-27 10:38:55 +00:00
vtolstov
0ecd6199d4 Apply Code Coverage Badge
All checks were successful
coverage / build (push) Successful in 2m21s
test / test (push) Successful in 3m8s
2025-04-27 10:37:37 +00:00
14a30fb6a7 fix panic on shutdown caused by double channel close (#208) 2025-04-27 13:37:00 +03:00
7c613072df [v3] rename .gitea to .github (#207)
Some checks failed
coverage / build (push) Failing after 50s
test / test (push) Has been cancelled
* rename .gitea to .github
* attempt to fix lint/test job
* attempt to fix coverage job
2025-04-27 13:32:44 +03:00
vtolstov
c55e212270 Apply Code Coverage Badge 2025-04-22 17:34:26 +00:00
100bc006bb Merge pull request 'move hooks' (#397) from devstigneev/micro:move_hooks_v3 into v3
All checks were successful
coverage / build (push) Successful in 1m26s
test / test (push) Successful in 2m21s
Reviewed-on: #397
2025-04-22 20:33:47 +03:00
5dbfe8a7a6 Delete SECURITY.md 2025-04-22 15:53:06 +03:00
6be077dbe8 Update README.md 2025-04-22 15:05:07 +03:00
b4878211ee Update README.md 2025-04-22 15:02:01 +03:00
ec9178c6d4 Update README.md 2025-04-22 14:27:14 +03:00
ae63d44866 move hooks
Some checks failed
lint / lint (pull_request) Failing after 1m52s
test / test (pull_request) Successful in 4m37s
coverage / build (pull_request) Failing after 6m59s
2025-04-22 09:41:22 +03:00
883e79216a Update README.md 2025-04-22 08:43:53 +03:00
fa636ef6a9 tracer: add IsRecording to span interface
All checks were successful
coverage / build (push) Successful in 3m33s
test / test (push) Successful in 5m17s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-14 00:03:04 +03:00
cdb81a9ba3 remove debug
Some checks failed
coverage / build (push) Failing after 3m6s
test / test (push) Successful in 4m55s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-03-06 22:19:10 +03:00
413c6cc2f0 logger: fixup WithAddFields
All checks were successful
coverage / build (push) Successful in 1m58s
test / test (push) Successful in 5m39s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-02-21 18:12:27 +03:00
vtolstov
f56bd70136 Apply Code Coverage Badge 2025-02-06 13:22:17 +00:00
b51b4107a8 logger/slog: fixup stacktrace
All checks were successful
coverage / build (push) Successful in 1m27s
test / test (push) Successful in 2m54s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-02-06 16:21:29 +03:00
2067c9de6b util/buffer: add new seeker implementation
Some checks failed
coverage / build (push) Failing after 1m35s
test / test (push) Successful in 3m11s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-02-06 15:19:09 +03:00
43 changed files with 23059 additions and 83 deletions

View File

@@ -3,14 +3,16 @@ name: coverage
on:
push:
branches: [ main, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
pull_request:
branches: [ main, v3, v4 ]
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
build:
if: github.server_url != 'https://github.com'
runs-on: ubuntu-latest
steps:
- name: checkout code
@@ -22,7 +24,7 @@ jobs:
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
go-version: 'stable'
- name: test coverage
run: |
@@ -39,8 +41,8 @@ jobs:
name: autocommit
with:
commit_message: Apply Code Coverage Badge
skip_fetch: true
skip_checkout: true
skip_fetch: false
skip_checkout: false
file_pattern: ./README.md
- name: push
@@ -48,4 +50,4 @@ jobs:
uses: ad-m/github-push-action@master
with:
github_token: ${{ github.token }}
branch: ${{ github.ref }}
branch: ${{ github.ref }}

View File

@@ -3,10 +3,10 @@ name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
lint:
@@ -20,10 +20,10 @@ jobs:
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run lint
uses: https://github.com/golangci/golangci-lint-action@v6
uses: golangci/golangci-lint-action@v6
with:
version: 'latest'

View File

@@ -3,15 +3,12 @@ name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
branches: [ master, v3, v4 ]
push:
branches:
- master
- v3
- v4
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
test:

View File

@@ -3,15 +3,12 @@ name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
branches: [ master, v3, v4 ]
push:
branches:
- master
- v3
- v4
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
test:
@@ -35,19 +32,19 @@ jobs:
go-version: 'stable'
- name: setup go work
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
GOWORK: ${{ github.workspace }}/go.work
run: |
go work init
go work use .
go work use micro-tests
- name: setup deps
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
GOWORK: ${{ github.workspace }}/go.work
run: go get -v ./...
- name: run tests
env:
INTEGRATION_TESTS: yes
GOWORK: /workspace/${{ github.repository_owner }}/go.work
GOWORK: ${{ github.workspace }}/go.work
run: |
cd micro-tests
go test -mod readonly -v ./... || true

View File

@@ -1,5 +1,5 @@
run:
concurrency: 8
deadline: 5m
timeout: 5m
issues-exit-code: 1
tests: true

View File

@@ -1,5 +1,5 @@
# Micro
![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow)
![Coverage](https://img.shields.io/badge/Coverage-33.6%25-yellow)
[![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v3?tab=overview)
[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v3)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush)
@@ -9,20 +9,20 @@ Micro is a standard library for microservices.
## Overview
Micro provides the core requirements for distributed systems development including RPC and Event driven communication.
Micro provides the core requirements for distributed systems development including SYNC and ASYNC communication.
## Features
Micro abstracts away the details of distributed systems. Here are the main features.
Micro abstracts away the details of distributed systems. Main features:
- **Dynamic Config** - Load and hot reload dynamic config from anywhere. The config interface provides a way to load application
level config from any source such as env vars, cmdline, file, consul, vault... You can merge the sources and even define fallbacks.
level config from any source such as env vars, cmdline, file, consul, vault, etc... You can merge the sources and even define fallbacks.
- **Data Storage** - A simple data store interface to read, write and delete records. It includes support for memory, file and
s3. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework.
- **Service Discovery** - Automatic service registration and name resolution. Service discovery is at the core of micro service
development. When service A needs to speak to service B it needs the location of that service.
development.
- **Message Encoding** - Dynamic message encoding based on content-type. The client and server will use codecs along with content-type
to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client

View File

@@ -1,15 +0,0 @@
# Security Policy
## Supported Versions
Use this section to tell people about which versions of your project are
currently being supported with security updates.
| Version | Supported |
| ------- | ------------------ |
| 3.7.x | :white_check_mark: |
| < 3.7.0 | :x: |
## Reporting a Vulnerability
If you find any issue, please create github issue in this repo

2
go.mod
View File

@@ -11,6 +11,7 @@ require (
github.com/matoous/go-nanoid v1.5.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
github.com/stretchr/testify v1.10.0
go.uber.org/automaxprocs v1.6.0
go.unistack.org/micro-proto/v3 v3.4.1
golang.org/x/sync v0.10.0
@@ -33,7 +34,6 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/testify v1.10.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/exp v0.0.0-20241210194714-1829a127f884 // indirect
golang.org/x/net v0.33.0 // indirect

View File

@@ -0,0 +1,76 @@
package metadata
import (
"context"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
)
var DefaultMetadataKeys = []string{"x-request-id"}
type hook struct {
keys []string
}
func NewHook(keys ...string) *hook {
return &hook{keys: keys}
}
func metadataCopy(ctx context.Context, keys []string) context.Context {
if keys == nil {
return ctx
}
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(len(keys))
}
for _, k := range keys {
if v, ok := imd.Get(k); ok && v != "" {
omd.Set(k, v)
}
}
if !ook {
ctx = metadata.NewOutgoingContext(ctx, omd)
}
}
return ctx
}
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
return next(metadataCopy(ctx, w.keys), req, rsp, opts...)
}
}
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
return next(metadataCopy(ctx, w.keys), req, opts...)
}
}
func (w *hook) ClientPublish(next client.FuncPublish) client.FuncPublish {
return func(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
return next(metadataCopy(ctx, w.keys), msg, opts...)
}
}
func (w *hook) ClientBatchPublish(next client.FuncBatchPublish) client.FuncBatchPublish {
return func(ctx context.Context, msgs []client.Message, opts ...client.PublishOption) error {
return next(metadataCopy(ctx, w.keys), msgs, opts...)
}
}
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
return next(metadataCopy(ctx, w.keys), req, rsp)
}
}
func (w *hook) ServerSubscriber(next server.FuncSubHandler) server.FuncSubHandler {
return func(ctx context.Context, msg server.Message) error {
return next(metadataCopy(ctx, w.keys), msg)
}
}

View File

@@ -0,0 +1,94 @@
package recovery
import (
"context"
"fmt"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/server"
)
func NewOptions(opts ...Option) Options {
options := Options{
ServerHandlerFn: DefaultServerHandlerFn,
ServerSubscriberFn: DefaultServerSubscriberFn,
}
for _, o := range opts {
o(&options)
}
return options
}
type Options struct {
ServerHandlerFn func(context.Context, server.Request, interface{}, error) error
ServerSubscriberFn func(context.Context, server.Message, error) error
}
type Option func(*Options)
func ServerHandlerFunc(fn func(context.Context, server.Request, interface{}, error) error) Option {
return func(o *Options) {
o.ServerHandlerFn = fn
}
}
func ServerSubscriberFunc(fn func(context.Context, server.Message, error) error) Option {
return func(o *Options) {
o.ServerSubscriberFn = fn
}
}
var (
DefaultServerHandlerFn = func(ctx context.Context, req server.Request, rsp interface{}, err error) error {
return errors.BadRequest("", "%v", err)
}
DefaultServerSubscriberFn = func(ctx context.Context, req server.Message, err error) error {
return errors.BadRequest("", "%v", err)
}
)
var Hook = NewHook()
type hook struct {
opts Options
}
func NewHook(opts ...Option) *hook {
return &hook{opts: NewOptions(opts...)}
}
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
return func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
defer func() {
r := recover()
switch verr := r.(type) {
case nil:
return
case error:
err = w.opts.ServerHandlerFn(ctx, req, rsp, verr)
default:
err = w.opts.ServerHandlerFn(ctx, req, rsp, fmt.Errorf("%v", r))
}
}()
err = next(ctx, req, rsp)
return err
}
}
func (w *hook) ServerSubscriber(next server.FuncSubHandler) server.FuncSubHandler {
return func(ctx context.Context, msg server.Message) (err error) {
defer func() {
r := recover()
switch verr := r.(type) {
case nil:
return
case error:
err = w.opts.ServerSubscriberFn(ctx, msg, verr)
default:
err = w.opts.ServerSubscriberFn(ctx, msg, fmt.Errorf("%v", r))
}
}()
err = next(ctx, msg)
return err
}
}

View File

@@ -0,0 +1,139 @@
package requestid
import (
"context"
"net/textproto"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v3/util/id"
)
type XRequestIDKey struct{}
// DefaultMetadataKey contains metadata key
var DefaultMetadataKey = textproto.CanonicalMIMEHeaderKey("x-request-id")
// DefaultMetadataFunc wil be used if user not provide own func to fill metadata
var DefaultMetadataFunc = func(ctx context.Context) (context.Context, error) {
var xid string
cid, cok := ctx.Value(XRequestIDKey{}).(string)
if cok && cid != "" {
xid = cid
}
imd, iok := metadata.FromIncomingContext(ctx)
if !iok || imd == nil {
imd = metadata.New(1)
ctx = metadata.NewIncomingContext(ctx, imd)
}
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(1)
ctx = metadata.NewOutgoingContext(ctx, omd)
}
if xid == "" {
var id string
if id, iok = imd.Get(DefaultMetadataKey); iok && id != "" {
xid = id
}
if id, ook = omd.Get(DefaultMetadataKey); ook && id != "" {
xid = id
}
}
if xid == "" {
var err error
xid, err = id.New()
if err != nil {
return ctx, err
}
}
if !cok {
ctx = context.WithValue(ctx, XRequestIDKey{}, xid)
}
if !iok {
imd.Set(DefaultMetadataKey, xid)
}
if !ook {
omd.Set(DefaultMetadataKey, xid)
}
return ctx, nil
}
type hook struct{}
func NewHook() *hook {
return &hook{}
}
func (w *hook) ServerSubscriber(next server.FuncSubHandler) server.FuncSubHandler {
return func(ctx context.Context, msg server.Message) error {
var err error
if xid, ok := msg.Header()[DefaultMetadataKey]; ok {
ctx = context.WithValue(ctx, XRequestIDKey{}, xid)
}
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return err
}
return next(ctx, msg)
}
}
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return err
}
return next(ctx, req, rsp)
}
}
func (w *hook) ClientBatchPublish(next client.FuncBatchPublish) client.FuncBatchPublish {
return func(ctx context.Context, msgs []client.Message, opts ...client.PublishOption) error {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return err
}
return next(ctx, msgs, opts...)
}
}
func (w *hook) ClientPublish(next client.FuncPublish) client.FuncPublish {
return func(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return err
}
return next(ctx, msg, opts...)
}
}
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return err
}
return next(ctx, req, rsp, opts...)
}
}
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return nil, err
}
return next(ctx, req, opts...)
}
}

View File

@@ -0,0 +1,33 @@
package requestid
import (
"context"
"testing"
"go.unistack.org/micro/v3/metadata"
)
func TestDefaultMetadataFunc(t *testing.T) {
ctx := context.TODO()
nctx, err := DefaultMetadataFunc(ctx)
if err != nil {
t.Fatalf("%v", err)
}
imd, ok := metadata.FromIncomingContext(nctx)
if !ok {
t.Fatalf("md missing in incoming context")
}
omd, ok := metadata.FromOutgoingContext(nctx)
if !ok {
t.Fatalf("md missing in outgoing context")
}
_, iok := imd.Get(DefaultMetadataKey)
_, ook := omd.Get(DefaultMetadataKey)
if !iok || !ook {
t.Fatalf("missing metadata key value")
}
}

51
hooks/sql/common.go Normal file
View File

@@ -0,0 +1,51 @@
package sql
import (
"database/sql/driver"
"errors"
"fmt"
"runtime"
)
//go:generate sh -c "go run gen.go > wrap_gen.go"
// namedValueToValue converts driver arguments of NamedValue format to Value format. Implemented in the same way as in
// database/sql ctxutil.go.
func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
dargs := make([]driver.Value, len(named))
for n, param := range named {
if len(param.Name) > 0 {
return nil, errors.New("sql: driver does not support the use of Named Parameters")
}
dargs[n] = param.Value
}
return dargs, nil
}
// namedValueToLabels convert driver arguments to interface{} slice
func namedValueToLabels(named []driver.NamedValue) []interface{} {
largs := make([]interface{}, 0, len(named)*2)
var name string
for _, param := range named {
if param.Name != "" {
name = param.Name
} else {
name = fmt.Sprintf("$%d", param.Ordinal)
}
largs = append(largs, fmt.Sprintf("%s=%v", name, param.Value))
}
return largs
}
// getCallerName get the name of the function A where A() -> B() -> GetFunctionCallerName()
func getCallerName() string {
pc, _, _, ok := runtime.Caller(3)
details := runtime.FuncForPC(pc)
var callerName string
if ok && details != nil {
callerName = details.Name()
} else {
callerName = labelUnknown
}
return callerName
}

467
hooks/sql/conn.go Normal file
View File

@@ -0,0 +1,467 @@
package sql
import (
"context"
"database/sql/driver"
"fmt"
"time"
"go.unistack.org/micro/v3/hooks/requestid"
"go.unistack.org/micro/v3/tracer"
)
var (
_ driver.Conn = (*wrapperConn)(nil)
_ driver.ConnBeginTx = (*wrapperConn)(nil)
_ driver.ConnPrepareContext = (*wrapperConn)(nil)
_ driver.Pinger = (*wrapperConn)(nil)
_ driver.Validator = (*wrapperConn)(nil)
_ driver.Queryer = (*wrapperConn)(nil) // nolint:staticcheck
_ driver.QueryerContext = (*wrapperConn)(nil)
_ driver.Execer = (*wrapperConn)(nil) // nolint:staticcheck
_ driver.ExecerContext = (*wrapperConn)(nil)
// _ driver.Connector
// _ driver.Driver
// _ driver.DriverContext
)
// wrapperConn defines a wrapper for driver.Conn
type wrapperConn struct {
d *wrapperDriver
dname string
conn driver.Conn
opts Options
ctx context.Context
//span tracer.Span
}
// Close implements driver.Conn Close
func (w *wrapperConn) Close() error {
var ctx context.Context
if w.ctx != nil {
ctx = w.ctx
} else {
ctx = context.Background()
}
_ = ctx
labels := []string{labelMethod, "Close"}
ts := time.Now()
err := w.conn.Close()
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Close", getCallerName(), td, err)...)
}
*/
return err
}
// Begin implements driver.Conn Begin
func (w *wrapperConn) Begin() (driver.Tx, error) {
var ctx context.Context
if w.ctx != nil {
ctx = w.ctx
} else {
ctx = context.Background()
}
labels := []string{labelMethod, "Begin"}
ts := time.Now()
tx, err := w.conn.Begin() // nolint:staticcheck
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Begin", getCallerName(), td, err)...)
}
*/
return nil, err
}
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Begin", getCallerName(), td, err)...)
}
*/
return &wrapperTx{tx: tx, opts: w.opts, ctx: ctx}, nil
}
// BeginTx implements driver.ConnBeginTx BeginTx
func (w *wrapperConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
name := getQueryName(ctx)
nctx, span := w.opts.Tracer.Start(ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
span.AddLabels("db.method", "BeginTx")
span.AddLabels("db.statement", name)
if id, ok := ctx.Value(requestid.XRequestIDKey{}).(string); ok {
span.AddLabels("x-request-id", id)
}
labels := []string{labelMethod, "BeginTx", labelQuery, name}
connBeginTx, ok := w.conn.(driver.ConnBeginTx)
if !ok {
return w.Begin()
}
ts := time.Now()
tx, err := connBeginTx.BeginTx(nctx, opts)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
span.SetStatus(tracer.SpanStatusError, err.Error())
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "BeginTx", getCallerName(), td, err)...)
}
*/
return nil, err
}
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "BeginTx", getCallerName(), td, err)...)
}
*/
return &wrapperTx{tx: tx, opts: w.opts, ctx: ctx, span: span}, nil
}
// Prepare implements driver.Conn Prepare
func (w *wrapperConn) Prepare(query string) (driver.Stmt, error) {
var ctx context.Context
if w.ctx != nil {
ctx = w.ctx
} else {
ctx = context.Background()
}
_ = ctx
labels := []string{labelMethod, "Prepare", labelQuery, getCallerName()}
ts := time.Now()
stmt, err := w.conn.Prepare(query)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Prepare", getCallerName(), td, err)...)
}
*/
return nil, err
}
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Prepare", getCallerName(), td, err)...)
}
*/
return wrapStmt(stmt, query, w.opts), nil
}
// PrepareContext implements driver.ConnPrepareContext PrepareContext
func (w *wrapperConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if w.ctx != nil {
nctx, span = w.opts.Tracer.Start(w.ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
} else {
nctx, span = w.opts.Tracer.Start(ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
}
span.AddLabels("db.method", "PrepareContext")
span.AddLabels("db.statement", name)
if id, ok := ctx.Value(requestid.XRequestIDKey{}).(string); ok {
span.AddLabels("x-request-id", id)
}
labels := []string{labelMethod, "PrepareContext", labelQuery, name}
conn, ok := w.conn.(driver.ConnPrepareContext)
if !ok {
return w.Prepare(query)
}
ts := time.Now()
stmt, err := conn.PrepareContext(nctx, query)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
span.SetStatus(tracer.SpanStatusError, err.Error())
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "PrepareContext", getCallerName(), td, err)...)
}
*/
return nil, err
}
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "PrepareContext", getCallerName(), td, err)...)
}
*/
return wrapStmt(stmt, query, w.opts), nil
}
// Exec implements driver.Execer Exec
func (w *wrapperConn) Exec(query string, args []driver.Value) (driver.Result, error) {
var ctx context.Context
if w.ctx != nil {
ctx = w.ctx
} else {
ctx = context.Background()
}
_ = ctx
labels := []string{labelMethod, "Exec", labelQuery, getCallerName()}
// nolint:staticcheck
conn, ok := w.conn.(driver.Execer)
if !ok {
return nil, driver.ErrSkip
}
ts := time.Now()
res, err := conn.Exec(query, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Exec", getCallerName(), td, err)...)
}
*/
return res, err
}
// Exec implements driver.StmtExecContext ExecContext
func (w *wrapperConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if w.ctx != nil {
nctx, span = w.opts.Tracer.Start(w.ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
} else {
nctx, span = w.opts.Tracer.Start(ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
}
span.AddLabels("db.method", "ExecContext")
span.AddLabels("db.statement", name)
if id, ok := ctx.Value(requestid.XRequestIDKey{}).(string); ok {
span.AddLabels("x-request-id", id)
}
defer span.Finish()
if len(args) > 0 {
span.AddLabels("db.args", fmt.Sprintf("%v", namedValueToLabels(args)))
}
labels := []string{labelMethod, "ExecContext", labelQuery, name}
conn, ok := w.conn.(driver.ExecerContext)
if !ok {
// nolint:staticcheck
return nil, driver.ErrSkip
}
ts := time.Now()
res, err := conn.ExecContext(nctx, query, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.SetStatus(tracer.SpanStatusError, err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "ExecContext", getCallerName(), td, err)...)
}
*/
return res, err
}
// Ping implements driver.Pinger Ping
func (w *wrapperConn) Ping(ctx context.Context) error {
conn, ok := w.conn.(driver.Pinger)
if !ok {
// fallback path to check db alive
pc, err := w.d.Open(w.dname)
if err != nil {
return err
}
return pc.Close()
}
var nctx context.Context //nolint: gosimple
nctx = ctx
/*
var span tracer.Span
if w.ctx != nil {
nctx, span = w.opts.Tracer.Start(w.ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
} else {
nctx, span = w.opts.Tracer.Start(ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
}
span.AddLabels("db.method", "Ping")
defer span.Finish()
*/
labels := []string{labelMethod, "Ping"}
ts := time.Now()
err := conn.Ping(nctx)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
// span.SetStatus(tracer.SpanStatusError, err.Error())
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Ping", getCallerName(), td, err)...)
}
*/
return err
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
return nil
}
// Query implements driver.Queryer Query
func (w *wrapperConn) Query(query string, args []driver.Value) (driver.Rows, error) {
var ctx context.Context
if w.ctx != nil {
ctx = w.ctx
} else {
ctx = context.Background()
}
_ = ctx
// nolint:staticcheck
conn, ok := w.conn.(driver.Queryer)
if !ok {
return nil, driver.ErrSkip
}
labels := []string{labelMethod, "Query", labelQuery, getCallerName()}
ts := time.Now()
rows, err := conn.Query(query, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Query", getCallerName(), td, err)...)
}
*/
return rows, err
}
// QueryContext implements Driver.QueryerContext QueryContext
func (w *wrapperConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if w.ctx != nil {
nctx, span = w.opts.Tracer.Start(w.ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
} else {
nctx, span = w.opts.Tracer.Start(ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
}
span.AddLabels("db.method", "QueryContext")
span.AddLabels("db.statement", name)
if id, ok := ctx.Value(requestid.XRequestIDKey{}).(string); ok {
span.AddLabels("x-request-id", id)
}
defer span.Finish()
if len(args) > 0 {
span.AddLabels("db.args", fmt.Sprintf("%v", namedValueToLabels(args)))
}
labels := []string{labelMethod, "QueryContext", labelQuery, name}
conn, ok := w.conn.(driver.QueryerContext)
if !ok {
return nil, driver.ErrSkip
}
ts := time.Now()
rows, err := conn.QueryContext(nctx, query, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.SetStatus(tracer.SpanStatusError, err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "QueryContext", getCallerName(), td, err)...)
}
*/
return rows, err
}
// CheckNamedValue implements driver.NamedValueChecker
func (w *wrapperConn) CheckNamedValue(v *driver.NamedValue) error {
s, ok := w.conn.(driver.NamedValueChecker)
if !ok {
return driver.ErrSkip
}
return s.CheckNamedValue(v)
}
// IsValid implements driver.Validator
func (w *wrapperConn) IsValid() bool {
v, ok := w.conn.(driver.Validator)
if !ok {
return w.conn != nil
}
return v.IsValid()
}
func (w *wrapperConn) ResetSession(ctx context.Context) error {
s, ok := w.conn.(driver.SessionResetter)
if !ok {
return driver.ErrSkip
}
return s.ResetSession(ctx)
}

94
hooks/sql/driver.go Normal file
View File

@@ -0,0 +1,94 @@
package sql
import (
"context"
"database/sql/driver"
"time"
)
var (
// _ driver.DriverContext = (*wrapperDriver)(nil)
// _ driver.Connector = (*wrapperDriver)(nil)
)
/*
type conn interface {
driver.Pinger
driver.Execer
driver.ExecerContext
driver.Queryer
driver.QueryerContext
driver.Conn
driver.ConnPrepareContext
driver.ConnBeginTx
}
*/
// wrapperDriver defines a wrapper for driver.Driver
type wrapperDriver struct {
driver driver.Driver
opts Options
ctx context.Context
}
// NewWrapper creates and returns a new SQL driver with passed capabilities
func NewWrapper(d driver.Driver, opts ...Option) driver.Driver {
return &wrapperDriver{driver: d, opts: NewOptions(opts...), ctx: context.Background()}
}
type wrappedConnector struct {
connector driver.Connector
// name string
opts Options
ctx context.Context
}
func NewWrapperConnector(c driver.Connector, opts ...Option) driver.Connector {
return &wrappedConnector{connector: c, opts: NewOptions(opts...), ctx: context.Background()}
}
// Connect implements driver.Driver Connect
func (w *wrappedConnector) Connect(ctx context.Context) (driver.Conn, error) {
return w.connector.Connect(ctx)
}
// Driver implements driver.Driver Driver
func (w *wrappedConnector) Driver() driver.Driver {
return w.connector.Driver()
}
/*
// Connect implements driver.Driver OpenConnector
func (w *wrapperDriver) OpenConnector(name string) (driver.Conn, error) {
return &wrapperConnector{driver: w.driver, name: name, opts: w.opts}, nil
}
*/
// Open implements driver.Driver Open
func (w *wrapperDriver) Open(name string) (driver.Conn, error) {
// ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // Ensure eventual timeout
// defer cancel()
/*
connector, err := w.OpenConnector(name)
if err != nil {
return nil, err
}
return connector.Connect(ctx)
*/
ts := time.Now()
c, err := w.driver.Open(name)
td := time.Since(ts)
/*
if w.opts.LoggerEnabled {
w.opts.Logger.Log(w.ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(w.ctx, "Open", getCallerName(), td, err)...)
}
*/
_ = td
if err != nil {
return nil, err
}
return wrapConn(c, w.opts), nil
}

165
hooks/sql/gen.go Normal file
View File

@@ -0,0 +1,165 @@
//go:build ignore
package main
import (
"bytes"
"crypto/md5"
"fmt"
"io"
"sort"
"strings"
)
var connIfaces = []string{
"driver.ConnBeginTx",
"driver.ConnPrepareContext",
"driver.Execer",
"driver.ExecerContext",
"driver.NamedValueChecker",
"driver.Pinger",
"driver.Queryer",
"driver.QueryerContext",
"driver.SessionResetter",
"driver.Validator",
}
var stmtIfaces = []string{
"driver.StmtExecContext",
"driver.StmtQueryContext",
"driver.ColumnConverter",
"driver.NamedValueChecker",
}
func getHash(s []string) string {
h := md5.New()
io.WriteString(h, strings.Join(s, "|"))
return fmt.Sprintf("%x", h.Sum(nil))
}
func main() {
comboConn := all(connIfaces)
sort.Slice(comboConn, func(i, j int) bool {
return len(comboConn[i]) < len(comboConn[j])
})
comboStmt := all(stmtIfaces)
sort.Slice(comboStmt, func(i, j int) bool {
return len(comboStmt[i]) < len(comboStmt[j])
})
b := bytes.NewBuffer(nil)
b.WriteString("// Code generated. DO NOT EDIT.\n\n")
b.WriteString("package sql\n\n")
b.WriteString(`import "database/sql/driver"`)
b.WriteString("\n\n")
b.WriteString("func wrapConn(dc driver.Conn, opts Options) driver.Conn {\n")
b.WriteString("\tc := &wrapperConn{conn: dc, opts: opts}\n")
for idx := len(comboConn) - 1; idx >= 0; idx-- {
ifaces := comboConn[idx]
n := len(ifaces)
if n == 0 {
continue
}
h := getHash(ifaces)
b.WriteString(fmt.Sprintf("\tif _, ok := dc.(wrapConn%04d_%s); ok {\n", n, h))
b.WriteString("\t\treturn struct {\n")
b.WriteString(strings.Join(append([]string{"\t\t\tdriver.Conn"}, ifaces...), "\n\t\t\t"))
b.WriteString("\n\t\t}{")
for idx := range ifaces {
if idx > 0 {
b.WriteString(", ")
b.WriteString("c")
} else if idx == 0 {
b.WriteString("c")
} else {
b.WriteString("c")
}
}
b.WriteString(", c}\n")
b.WriteString("\t}\n\n")
}
b.WriteString("\treturn c\n")
b.WriteString("}\n\n")
for idx := len(comboConn) - 1; idx >= 0; idx-- {
ifaces := comboConn[idx]
n := len(ifaces)
if n == 0 {
continue
}
h := getHash(ifaces)
b.WriteString(fmt.Sprintf("// %s\n", strings.Join(ifaces, "|")))
b.WriteString(fmt.Sprintf("type wrapConn%04d_%s interface {\n", n, h))
for _, iface := range ifaces {
b.WriteString(fmt.Sprintf("\t%s\n", iface))
}
b.WriteString("}\n\n")
}
b.WriteString("func wrapStmt(stmt driver.Stmt, query string, opts Options) driver.Stmt {\n")
b.WriteString("\tc := &wrapperStmt{stmt: stmt, query: query, opts: opts}\n")
for idx := len(comboStmt) - 1; idx >= 0; idx-- {
ifaces := comboStmt[idx]
n := len(ifaces)
if n == 0 {
continue
}
h := getHash(ifaces)
b.WriteString(fmt.Sprintf("\tif _, ok := stmt.(wrapStmt%04d_%s); ok {\n", n, h))
b.WriteString("\t\treturn struct {\n")
b.WriteString(strings.Join(append([]string{"\t\t\tdriver.Stmt"}, ifaces...), "\n\t\t\t"))
b.WriteString("\n\t\t}{")
for idx := range ifaces {
if idx > 0 {
b.WriteString(", ")
b.WriteString("c")
} else if idx == 0 {
b.WriteString("c")
} else {
b.WriteString("c")
}
}
b.WriteString(", c}\n")
b.WriteString("\t}\n\n")
}
b.WriteString("\treturn c\n")
b.WriteString("}\n")
for idx := len(comboStmt) - 1; idx >= 0; idx-- {
ifaces := comboStmt[idx]
n := len(ifaces)
if n == 0 {
continue
}
h := getHash(ifaces)
b.WriteString(fmt.Sprintf("\n// %s\n", strings.Join(ifaces, "|")))
b.WriteString(fmt.Sprintf("type wrapStmt%04d_%s interface {\n", n, h))
for _, iface := range ifaces {
b.WriteString(fmt.Sprintf("\t%s\n", iface))
}
b.WriteString("}\n")
}
fmt.Printf("%s", b.String())
}
// all returns all combinations for a given string array.
func all[T any](set []T) (subsets [][]T) {
length := uint(len(set))
for subsetBits := 1; subsetBits < (1 << length); subsetBits++ {
var subset []T
for object := uint(0); object < length; object++ {
if (subsetBits>>object)&1 == 1 {
subset = append(subset, set[object])
}
}
subsets = append(subsets, subset)
}
return subsets
}

172
hooks/sql/options.go Normal file
View File

@@ -0,0 +1,172 @@
package sql
import (
"context"
"fmt"
"time"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/tracer"
)
var (
// DefaultMeterStatsInterval holds default stats interval
DefaultMeterStatsInterval = 5 * time.Second
// DefaultLoggerObserver used to prepare labels for logger
DefaultLoggerObserver = func(ctx context.Context, method string, query string, td time.Duration, err error) []interface{} {
labels := []interface{}{"db.method", method, "took", fmt.Sprintf("%v", td)}
if err != nil {
labels = append(labels, "error", err.Error())
}
if query != labelUnknown {
labels = append(labels, "query", query)
}
return labels
}
)
var (
MaxOpenConnections = "micro_sql_max_open_conn"
OpenConnections = "micro_sql_open_conn"
InuseConnections = "micro_sql_inuse_conn"
IdleConnections = "micro_sql_idle_conn"
WaitConnections = "micro_sql_waited_conn"
BlockedSeconds = "micro_sql_blocked_seconds"
MaxIdleClosed = "micro_sql_max_idle_closed"
MaxIdletimeClosed = "micro_sql_closed_max_idle"
MaxLifetimeClosed = "micro_sql_closed_max_lifetime"
meterRequestTotal = "micro_sql_request_total"
meterRequestLatencyMicroseconds = "micro_sql_latency_microseconds"
meterRequestDurationSeconds = "micro_sql_request_duration_seconds"
labelUnknown = "unknown"
labelQuery = "db_statement"
labelMethod = "db_method"
labelStatus = "status"
labelSuccess = "success"
labelFailure = "failure"
labelHost = "db_host"
labelDatabase = "db_name"
)
// Options struct holds wrapper options
type Options struct {
Logger logger.Logger
Meter meter.Meter
Tracer tracer.Tracer
DatabaseHost string
DatabaseName string
MeterStatsInterval time.Duration
LoggerLevel logger.Level
LoggerEnabled bool
LoggerObserver func(ctx context.Context, method string, name string, td time.Duration, err error) []interface{}
}
// Option func signature
type Option func(*Options)
// NewOptions create new Options struct from provided option slice
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
MeterStatsInterval: DefaultMeterStatsInterval,
LoggerLevel: logger.ErrorLevel,
LoggerObserver: DefaultLoggerObserver,
}
for _, o := range opts {
o(&options)
}
options.Meter = options.Meter.Clone(
meter.Labels(
labelHost, options.DatabaseHost,
labelDatabase, options.DatabaseName,
),
)
options.Logger = options.Logger.Clone(logger.WithAddCallerSkipCount(1))
return options
}
// MetricInterval specifies stats interval for *sql.DB
func MetricInterval(td time.Duration) Option {
return func(o *Options) {
o.MeterStatsInterval = td
}
}
func DatabaseHost(host string) Option {
return func(o *Options) {
o.DatabaseHost = host
}
}
func DatabaseName(name string) Option {
return func(o *Options) {
o.DatabaseName = name
}
}
// Meter passes meter.Meter to wrapper
func Meter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
// Logger passes logger.Logger to wrapper
func Logger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// LoggerEnabled enable sql logging
func LoggerEnabled(b bool) Option {
return func(o *Options) {
o.LoggerEnabled = b
}
}
// LoggerLevel passes logger.Level option
func LoggerLevel(lvl logger.Level) Option {
return func(o *Options) {
o.LoggerLevel = lvl
}
}
// LoggerObserver passes observer to fill logger fields
func LoggerObserver(obs func(context.Context, string, string, time.Duration, error) []interface{}) Option {
return func(o *Options) {
o.LoggerObserver = obs
}
}
// Tracer passes tracer.Tracer to wrapper
func Tracer(t tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = t
}
}
type queryNameKey struct{}
// QueryName passes query name to wrapper func
func QueryName(ctx context.Context, name string) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, queryNameKey{}, name)
}
func getQueryName(ctx context.Context) string {
if v, ok := ctx.Value(queryNameKey{}).(string); ok && v != labelUnknown {
return v
}
return getCallerName()
}

41
hooks/sql/stats.go Normal file
View File

@@ -0,0 +1,41 @@
package sql
import (
"context"
"database/sql"
"time"
)
type Statser interface {
Stats() sql.DBStats
}
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
options := NewOptions(opts...)
go func() {
ticker := time.NewTicker(options.MeterStatsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if db == nil {
return
}
stats := db.Stats()
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
}
}()
}

287
hooks/sql/stmt.go Normal file
View File

@@ -0,0 +1,287 @@
package sql
import (
"context"
"database/sql/driver"
"fmt"
"time"
"go.unistack.org/micro/v3/hooks/requestid"
"go.unistack.org/micro/v3/tracer"
)
var (
_ driver.Stmt = (*wrapperStmt)(nil)
_ driver.StmtQueryContext = (*wrapperStmt)(nil)
_ driver.StmtExecContext = (*wrapperStmt)(nil)
_ driver.NamedValueChecker = (*wrapperStmt)(nil)
)
// wrapperStmt defines a wrapper for driver.Stmt
type wrapperStmt struct {
stmt driver.Stmt
opts Options
query string
ctx context.Context
}
// Close implements driver.Stmt Close
func (w *wrapperStmt) Close() error {
var ctx context.Context
if w.ctx != nil {
ctx = w.ctx
} else {
ctx = context.Background()
}
_ = ctx
labels := []string{labelMethod, "Close"}
ts := time.Now()
err := w.stmt.Close()
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Close", getCallerName(), td, err)...)
}
*/
return err
}
// NumInput implements driver.Stmt NumInput
func (w *wrapperStmt) NumInput() int {
return w.stmt.NumInput()
}
// CheckNamedValue implements driver.NamedValueChecker
func (w *wrapperStmt) CheckNamedValue(v *driver.NamedValue) error {
s, ok := w.stmt.(driver.NamedValueChecker)
if !ok {
return driver.ErrSkip
}
return s.CheckNamedValue(v)
}
// Exec implements driver.Stmt Exec
func (w *wrapperStmt) Exec(args []driver.Value) (driver.Result, error) {
var ctx context.Context
if w.ctx != nil {
ctx = w.ctx
} else {
ctx = context.Background()
}
_ = ctx
labels := []string{labelMethod, "Exec"}
ts := time.Now()
res, err := w.stmt.Exec(args) // nolint:staticcheck
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Exec", getCallerName(), td, err)...)
}
*/
return res, err
}
// Query implements driver.Stmt Query
func (w *wrapperStmt) Query(args []driver.Value) (driver.Rows, error) {
var ctx context.Context
if w.ctx != nil {
ctx = w.ctx
} else {
ctx = context.Background()
}
_ = ctx
labels := []string{labelMethod, "Query"}
ts := time.Now()
rows, err := w.stmt.Query(args) // nolint:staticcheck
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "Query", getCallerName(), td, err)...)
}
*/
return rows, err
}
// ColumnConverter implements driver.ColumnConverter
func (w *wrapperStmt) ColumnConverter(idx int) driver.ValueConverter {
s, ok := w.stmt.(driver.ColumnConverter) // nolint:staticcheck
if !ok {
return nil
}
return s.ColumnConverter(idx)
}
// ExecContext implements driver.StmtExecContext ExecContext
func (w *wrapperStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if w.ctx != nil {
nctx, span = w.opts.Tracer.Start(w.ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
} else {
nctx, span = w.opts.Tracer.Start(ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
}
span.AddLabels("db.method", "ExecContext")
span.AddLabels("db.statement", name)
defer span.Finish()
if len(args) > 0 {
span.AddLabels("db.args", fmt.Sprintf("%v", namedValueToLabels(args)))
}
if id, ok := ctx.Value(requestid.XRequestIDKey{}).(string); ok {
span.AddLabels("x-request-id", id)
}
labels := []string{labelMethod, "ExecContext", labelQuery, name}
if conn, ok := w.stmt.(driver.StmtExecContext); ok {
ts := time.Now()
res, err := conn.ExecContext(nctx, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.SetStatus(tracer.SpanStatusError, err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "ExecContext", name, td, err)...)
}
*/
return res, err
}
values, err := namedValueToValue(args)
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.SetStatus(tracer.SpanStatusError, err.Error())
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "ExecContext", name, 0, err)...)
}
*/
return nil, err
}
ts := time.Now()
res, err := w.Exec(values) // nolint:staticcheck
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.SetStatus(tracer.SpanStatusError, err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "ExecContext", name, td, err)...)
}
*/
return res, err
}
// QueryContext implements driver.StmtQueryContext StmtQueryContext
func (w *wrapperStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
var nctx context.Context
var span tracer.Span
name := getQueryName(ctx)
if w.ctx != nil {
nctx, span = w.opts.Tracer.Start(w.ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
} else {
nctx, span = w.opts.Tracer.Start(ctx, "sdk.database", tracer.WithSpanKind(tracer.SpanKindClient))
}
span.AddLabels("db.method", "QueryContext")
span.AddLabels("db.statement", name)
defer span.Finish()
if len(args) > 0 {
span.AddLabels("db.args", fmt.Sprintf("%v", namedValueToLabels(args)))
}
if id, ok := ctx.Value(requestid.XRequestIDKey{}).(string); ok {
span.AddLabels("x-request-id", id)
}
labels := []string{labelMethod, "QueryContext", labelQuery, name}
if conn, ok := w.stmt.(driver.StmtQueryContext); ok {
ts := time.Now()
rows, err := conn.QueryContext(nctx, args)
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.SetStatus(tracer.SpanStatusError, err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "QueryContext", name, td, err)...)
}
*/
return rows, err
}
values, err := namedValueToValue(args)
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.SetStatus(tracer.SpanStatusError, err.Error())
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "QueryContext", name, 0, err)...)
}
*/
return nil, err
}
ts := time.Now()
rows, err := w.Query(values) // nolint:staticcheck
td := time.Since(ts)
te := td.Seconds()
if err != nil {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelFailure)...).Inc()
span.SetStatus(tracer.SpanStatusError, err.Error())
} else {
w.opts.Meter.Counter(meterRequestTotal, append(labels, labelStatus, labelSuccess)...).Inc()
}
w.opts.Meter.Summary(meterRequestLatencyMicroseconds, labels...).Update(te)
w.opts.Meter.Histogram(meterRequestDurationSeconds, labels...).Update(te)
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(ctx, "QueryContext", name, td, err)...)
}
*/
return rows, err
}

63
hooks/sql/tx.go Normal file
View File

@@ -0,0 +1,63 @@
package sql
import (
"context"
"database/sql/driver"
"time"
"go.unistack.org/micro/v3/tracer"
)
var _ driver.Tx = (*wrapperTx)(nil)
// wrapperTx defines a wrapper for driver.Tx
type wrapperTx struct {
tx driver.Tx
span tracer.Span
opts Options
ctx context.Context
}
// Commit implements driver.Tx Commit
func (w *wrapperTx) Commit() error {
ts := time.Now()
err := w.tx.Commit()
td := time.Since(ts)
_ = td
if w.span != nil {
if err != nil {
w.span.SetStatus(tracer.SpanStatusError, err.Error())
}
w.span.Finish()
}
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(w.ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(w.ctx, "Commit", getCallerName(), td, err)...)
}
*/
w.ctx = nil
return err
}
// Rollback implements driver.Tx Rollback
func (w *wrapperTx) Rollback() error {
ts := time.Now()
err := w.tx.Rollback()
td := time.Since(ts)
_ = td
if w.span != nil {
if err != nil {
w.span.SetStatus(tracer.SpanStatusError, err.Error())
}
w.span.Finish()
}
/*
if w.opts.LoggerEnabled && w.opts.Logger.V(w.opts.LoggerLevel) {
w.opts.Logger.Log(w.ctx, w.opts.LoggerLevel, w.opts.LoggerObserver(w.ctx, "Rollback", getCallerName(), td, err)...)
}
*/
w.ctx = nil
return err
}

19
hooks/sql/wrap.go Normal file
View File

@@ -0,0 +1,19 @@
package sql
import (
"database/sql/driver"
)
/*
func wrapDriver(d driver.Driver, opts Options) driver.Driver {
if _, ok := d.(driver.DriverContext); ok {
return &wrapperDriver{driver: d, opts: opts}
}
return struct{ driver.Driver }{&wrapperDriver{driver: d, opts: opts}}
}
*/
// WrapConn allows an existing driver.Conn to be wrapped.
func WrapConn(c driver.Conn, opts ...Option) driver.Conn {
return wrapConn(c, NewOptions(opts...))
}

20699
hooks/sql/wrap_gen.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,194 @@
package validator
import (
"context"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/server"
)
var (
DefaultClientErrorFunc = func(req client.Request, rsp interface{}, err error) error {
if rsp != nil {
return errors.BadGateway(req.Service(), "%v", err)
}
return errors.BadRequest(req.Service(), "%v", err)
}
DefaultServerErrorFunc = func(req server.Request, rsp interface{}, err error) error {
if rsp != nil {
return errors.BadGateway(req.Service(), "%v", err)
}
return errors.BadRequest(req.Service(), "%v", err)
}
DefaultPublishErrorFunc = func(msg client.Message, err error) error {
return errors.BadRequest(msg.Topic(), "%v", err)
}
DefaultSubscribeErrorFunc = func(msg server.Message, err error) error {
return errors.BadRequest(msg.Topic(), "%v", err)
}
)
type (
ClientErrorFunc func(client.Request, interface{}, error) error
ServerErrorFunc func(server.Request, interface{}, error) error
PublishErrorFunc func(client.Message, error) error
SubscribeErrorFunc func(server.Message, error) error
)
// Options struct holds wrapper options
type Options struct {
ClientErrorFn ClientErrorFunc
ServerErrorFn ServerErrorFunc
PublishErrorFn PublishErrorFunc
SubscribeErrorFn SubscribeErrorFunc
ClientValidateResponse bool
ServerValidateResponse bool
}
// Option func signature
type Option func(*Options)
func ClientValidateResponse(b bool) Option {
return func(o *Options) {
o.ClientValidateResponse = b
}
}
func ServerValidateResponse(b bool) Option {
return func(o *Options) {
o.ClientValidateResponse = b
}
}
func ClientReqErrorFn(fn ClientErrorFunc) Option {
return func(o *Options) {
o.ClientErrorFn = fn
}
}
func ServerErrorFn(fn ServerErrorFunc) Option {
return func(o *Options) {
o.ServerErrorFn = fn
}
}
func PublishErrorFn(fn PublishErrorFunc) Option {
return func(o *Options) {
o.PublishErrorFn = fn
}
}
func SubscribeErrorFn(fn SubscribeErrorFunc) Option {
return func(o *Options) {
o.SubscribeErrorFn = fn
}
}
func NewOptions(opts ...Option) Options {
options := Options{
ClientErrorFn: DefaultClientErrorFunc,
ServerErrorFn: DefaultServerErrorFunc,
PublishErrorFn: DefaultPublishErrorFunc,
SubscribeErrorFn: DefaultSubscribeErrorFunc,
}
for _, o := range opts {
o(&options)
}
return options
}
func NewHook(opts ...Option) *hook {
return &hook{opts: NewOptions(opts...)}
}
type validator interface {
Validate() error
}
type hook struct {
opts Options
}
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
if v, ok := req.Body().(validator); ok {
if err := v.Validate(); err != nil {
return w.opts.ClientErrorFn(req, nil, err)
}
}
err := next(ctx, req, rsp, opts...)
if v, ok := rsp.(validator); ok && w.opts.ClientValidateResponse {
if verr := v.Validate(); verr != nil {
return w.opts.ClientErrorFn(req, rsp, verr)
}
}
return err
}
}
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
if v, ok := req.Body().(validator); ok {
if err := v.Validate(); err != nil {
return nil, w.opts.ClientErrorFn(req, nil, err)
}
}
return next(ctx, req, opts...)
}
}
func (w *hook) ClientPublish(next client.FuncPublish) client.FuncPublish {
return func(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
if v, ok := msg.Payload().(validator); ok {
if err := v.Validate(); err != nil {
return w.opts.PublishErrorFn(msg, err)
}
}
return next(ctx, msg, opts...)
}
}
func (w *hook) ClientBatchPublish(next client.FuncBatchPublish) client.FuncBatchPublish {
return func(ctx context.Context, msgs []client.Message, opts ...client.PublishOption) error {
for _, msg := range msgs {
if v, ok := msg.Payload().(validator); ok {
if err := v.Validate(); err != nil {
return w.opts.PublishErrorFn(msg, err)
}
}
}
return next(ctx, msgs, opts...)
}
}
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
if v, ok := req.Body().(validator); ok {
if err := v.Validate(); err != nil {
return w.opts.ServerErrorFn(req, nil, err)
}
}
err := next(ctx, req, rsp)
if v, ok := rsp.(validator); ok && w.opts.ServerValidateResponse {
if verr := v.Validate(); verr != nil {
return w.opts.ServerErrorFn(req, rsp, verr)
}
}
return err
}
}
func (w *hook) ServerSubscriber(next server.FuncSubHandler) server.FuncSubHandler {
return func(ctx context.Context, msg server.Message) error {
if v, ok := msg.Body().(validator); ok {
if err := v.Validate(); err != nil {
return w.opts.SubscribeErrorFn(msg, err)
}
}
return next(ctx, msg)
}
}

View File

@@ -99,6 +99,7 @@ func WithAddFields(fields ...interface{}) Option {
iv, iok := o.Fields[i].(string)
jv, jok := fields[j].(string)
if iok && jok && iv == jv {
o.Fields[i+1] = fields[j+1]
fields = slices.Delete(fields, j, j+2)
}
}

View File

@@ -278,7 +278,7 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
}
}
if (s.opts.AddStacktrace || lvl == logger.FatalLevel) || (s.opts.AddStacktrace && lvl == logger.ErrorLevel) {
if s.opts.AddStacktrace && (lvl == logger.FatalLevel || lvl == logger.ErrorLevel) {
stackInfo := make([]byte, 1024*1024)
if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 {
traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1)

View File

@@ -21,7 +21,7 @@ import (
func TestStacktrace(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf),
l := NewLogger(logger.WithLevel(logger.DebugLevel), logger.WithOutput(buf),
WithHandlerFunc(slog.NewTextHandler),
logger.WithAddStacktrace(true),
)
@@ -124,7 +124,7 @@ func TestWithDedupKeysWithAddFields(t *testing.T) {
l.Info(ctx, "msg3")
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg3 key1=val1 key2=val2`)) {
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg3 key1=val4 key2=val3`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
}
}

View File

@@ -104,6 +104,7 @@ type service struct {
done chan struct{}
opts Options
sync.RWMutex
stopped bool
}
// NewService creates and returns a new Service based on the packages within.
@@ -429,7 +430,7 @@ func (s *service) Stop() error {
}
}
close(s.done)
s.notifyShutdown()
return nil
}
@@ -453,10 +454,23 @@ func (s *service) Run() error {
return err
}
// wait on context cancel
<-s.done
return s.Stop()
return nil
}
// notifyShutdown marks the service as stopped and closes the done channel.
// It ensures the channel is closed only once, preventing multiple closures.
func (s *service) notifyShutdown() {
s.Lock()
if s.stopped {
s.Unlock()
return
}
s.stopped = true
s.Unlock()
close(s.done)
}
type Namer interface {

View File

@@ -4,7 +4,9 @@ import (
"context"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/config"
@@ -773,3 +775,41 @@ func Test_getNameIndex(t *testing.T) {
}
}
*/
func TestServiceShutdown(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("service shutdown failed: %v", r)
}
}()
s, ok := NewService().(*service)
require.NotNil(t, s)
require.True(t, ok)
require.NoError(t, s.Start())
require.False(t, s.stopped)
require.NoError(t, s.Stop())
require.True(t, s.stopped)
}
func TestServiceMultipleShutdowns(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("service shutdown failed: %v", r)
}
}()
s := NewService()
go func() {
time.Sleep(10 * time.Millisecond)
// first call
require.NoError(t, s.Stop())
// duplicate call
require.NoError(t, s.Stop())
}()
require.NoError(t, s.Run())
}

View File

@@ -46,6 +46,10 @@ func (s memoryStringer) String() string {
return s.s
}
func (t *Tracer) Enabled() bool {
return t.opts.Enabled
}
func (t *Tracer) Flush(_ context.Context) error {
return nil
}
@@ -89,6 +93,10 @@ func (s *Span) Tracer() tracer.Tracer {
return s.tracer
}
func (s *Span) IsRecording() bool {
return true
}
type Event struct {
name string
labels []interface{}

View File

@@ -18,6 +18,10 @@ func (t *noopTracer) Spans() []Span {
return t.spans
}
func (t *noopTracer) Enabled() bool {
return t.opts.Enabled
}
func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) {
options := NewSpanOptions(opts...)
span := &noopSpan{
@@ -120,6 +124,10 @@ func (s *noopSpan) SetStatus(st SpanStatus, msg string) {
s.statusMsg = msg
}
func (s *noopSpan) IsRecording() bool {
return false
}
// NewTracer returns new memory tracer
func NewTracer(opts ...Option) Tracer {
return &noopTracer{

View File

@@ -142,6 +142,8 @@ type Options struct {
Name string
// ContextAttrFuncs contains funcs that provides tracing
ContextAttrFuncs []ContextAttrFunc
// Enabled specify trace status
Enabled bool
}
// Option func signature
@@ -181,6 +183,7 @@ func NewOptions(opts ...Option) Options {
Logger: logger.DefaultLogger,
Context: context.Background(),
ContextAttrFuncs: DefaultContextAttrFuncs,
Enabled: true,
}
for _, o := range opts {
o(&options)
@@ -194,3 +197,10 @@ func Name(n string) Option {
o.Name = n
}
}
// Disabled disable tracer
func Disabled(b bool) Option {
return func(o *Options) {
o.Enabled = !b
}
}

View File

@@ -51,6 +51,8 @@ type Tracer interface {
// Extract(ctx context.Context)
// Flush flushes spans
Flush(ctx context.Context) error
// Enabled returns tracer status
Enabled() bool
}
type Span interface {
@@ -78,4 +80,6 @@ type Span interface {
TraceID() string
// SpanID returns span id
SpanID() string
// IsRecording returns the recording state of the Span.
IsRecording() bool
}

View File

@@ -0,0 +1,84 @@
package buffer
import "io"
var _ interface {
io.ReadCloser
io.ReadSeeker
} = (*SeekerBuffer)(nil)
// Buffer is a ReadWriteCloser that supports seeking. It's intended to
// replicate the functionality of bytes.Buffer that I use in my projects.
//
// Note that the seeking is limited to the read marker; all writes are
// append-only.
type SeekerBuffer struct {
data []byte
pos int64
}
func NewSeekerBuffer(data []byte) *SeekerBuffer {
return &SeekerBuffer{
data: data,
}
}
func (b *SeekerBuffer) Read(p []byte) (int, error) {
if b.pos >= int64(len(b.data)) {
return 0, io.EOF
}
n := copy(p, b.data[b.pos:])
b.pos += int64(n)
return n, nil
}
func (b *SeekerBuffer) Write(p []byte) (int, error) {
b.data = append(b.data, p...)
return len(p), nil
}
// Seek sets the read pointer to pos.
func (b *SeekerBuffer) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
b.pos = offset
case io.SeekEnd:
b.pos = int64(len(b.data)) + offset
case io.SeekCurrent:
b.pos += offset
}
return b.pos, nil
}
// Rewind resets the read pointer to 0.
func (b *SeekerBuffer) Rewind() error {
if _, err := b.Seek(0, io.SeekStart); err != nil {
return err
}
return nil
}
// Close clears all the data out of the buffer and sets the read position to 0.
func (b *SeekerBuffer) Close() error {
b.data = nil
b.pos = 0
return nil
}
// Reset clears all the data out of the buffer and sets the read position to 0.
func (b *SeekerBuffer) Reset() {
b.data = nil
b.pos = 0
}
// Len returns the length of data remaining to be read.
func (b *SeekerBuffer) Len() int {
return len(b.data[b.pos:])
}
// Bytes returns the underlying bytes from the current position.
func (b *SeekerBuffer) Bytes() []byte {
return b.data[b.pos:]
}

View File

@@ -0,0 +1,55 @@
package buffer
import (
"fmt"
"strings"
"testing"
)
func noErrorT(t *testing.T, err error) {
if nil != err {
t.Fatalf("%s", err)
}
}
func boolT(t *testing.T, cond bool, s ...string) {
if !cond {
what := strings.Join(s, ", ")
if len(what) > 0 {
what = ": " + what
}
t.Fatalf("assert.Bool failed%s", what)
}
}
func TestSeeking(t *testing.T) {
partA := []byte("hello, ")
partB := []byte("world!")
buf := NewSeekerBuffer(partA)
boolT(t, buf.Len() == len(partA), fmt.Sprintf("on init: have length %d, want length %d", buf.Len(), len(partA)))
b := make([]byte, 32)
n, err := buf.Read(b)
noErrorT(t, err)
boolT(t, buf.Len() == 0, fmt.Sprintf("after reading 1: have length %d, want length 0", buf.Len()))
boolT(t, n == len(partA), fmt.Sprintf("after reading 2: have length %d, want length %d", n, len(partA)))
n, err = buf.Write(partB)
noErrorT(t, err)
boolT(t, n == len(partB), fmt.Sprintf("after writing: have length %d, want length %d", n, len(partB)))
n, err = buf.Read(b)
noErrorT(t, err)
boolT(t, buf.Len() == 0, fmt.Sprintf("after rereading 1: have length %d, want length 0", buf.Len()))
boolT(t, n == len(partB), fmt.Sprintf("after rereading 2: have length %d, want length %d", n, len(partB)))
partsLen := len(partA) + len(partB)
_ = buf.Rewind()
boolT(t, buf.Len() == partsLen, fmt.Sprintf("after rewinding: have length %d, want length %d", buf.Len(), partsLen))
buf.Close()
boolT(t, buf.Len() == 0, fmt.Sprintf("after closing, have length %d, want length 0", buf.Len()))
}

View File

@@ -489,35 +489,74 @@ func URLMap(query string) (map[string]interface{}, error) {
return mp.(map[string]interface{}), nil
}
// FlattenMap expand key.subkey to nested map
func FlattenMap(a map[string]interface{}) map[string]interface{} {
// preprocess map
nb := make(map[string]interface{}, len(a))
for k, v := range a {
ps := strings.Split(k, ".")
if len(ps) == 1 {
nb[k] = v
// FlattenMap flattens a nested map into a single-level map using dot notation for nested keys.
// In case of key conflicts, all nested levels will be discarded in favor of the first-level key.
//
// Example #1:
//
// Input:
// {
// "user.name": "alex",
// "user.document.id": "document_id"
// "user.document.number": "document_number"
// }
// Output:
// {
// "user": {
// "name": "alex",
// "document": {
// "id": "document_id"
// "number": "document_number"
// }
// }
// }
//
// Example #2 (with conflicts):
//
// Input:
// {
// "user": "alex",
// "user.document.id": "document_id"
// "user.document.number": "document_number"
// }
// Output:
// {
// "user": "alex"
// }
func FlattenMap(input map[string]interface{}) map[string]interface{} {
result := make(map[string]interface{})
for k, v := range input {
parts := strings.Split(k, ".")
if len(parts) == 1 {
result[k] = v
continue
}
em := make(map[string]interface{})
em[ps[len(ps)-1]] = v
for i := len(ps) - 2; i > 0; i-- {
nm := make(map[string]interface{})
nm[ps[i]] = em
em = nm
}
if vm, ok := nb[ps[0]]; ok {
// nested map
nm := vm.(map[string]interface{})
for vk, vv := range em {
nm[vk] = vv
current := result
for i, part := range parts {
// last element in the path
if i == len(parts)-1 {
current[part] = v
break
}
// initialize map for current level if not exist
if _, ok := current[part]; !ok {
current[part] = make(map[string]interface{})
}
if nested, ok := current[part].(map[string]interface{}); ok {
current = nested // continue to the nested map
} else {
break // if current element is not a map, ignore it
}
nb[ps[0]] = nm
} else {
nb[ps[0]] = em
}
}
return nb
return result
}
/*

View File

@@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
rutil "go.unistack.org/micro/v3/util/reflect"
)
@@ -319,3 +320,140 @@ func TestIsZero(t *testing.T) {
// t.Logf("XX %#+v\n", ok)
}
func TestFlattenMap(t *testing.T) {
tests := []struct {
name string
input map[string]interface{}
expected map[string]interface{}
}{
{
name: "empty map",
input: map[string]interface{}{},
expected: map[string]interface{}{},
},
{
name: "nil map",
input: nil,
expected: map[string]interface{}{},
},
{
name: "single level",
input: map[string]interface{}{
"username": "username",
"password": "password",
},
expected: map[string]interface{}{
"username": "username",
"password": "password",
},
},
{
name: "two level",
input: map[string]interface{}{
"order_id": "order_id",
"user.name": "username",
"user.password": "password",
},
expected: map[string]interface{}{
"order_id": "order_id",
"user": map[string]interface{}{
"name": "username",
"password": "password",
},
},
},
{
name: "three level",
input: map[string]interface{}{
"order_id": "order_id",
"user.name": "username",
"user.password": "password",
"user.document.id": "document_id",
"user.document.number": "document_number",
},
expected: map[string]interface{}{
"order_id": "order_id",
"user": map[string]interface{}{
"name": "username",
"password": "password",
"document": map[string]interface{}{
"id": "document_id",
"number": "document_number",
},
},
},
},
{
name: "four level",
input: map[string]interface{}{
"order_id": "order_id",
"user.name": "username",
"user.password": "password",
"user.document.id": "document_id",
"user.document.number": "document_number",
"user.info.permissions.read": "available",
"user.info.permissions.write": "available",
},
expected: map[string]interface{}{
"order_id": "order_id",
"user": map[string]interface{}{
"name": "username",
"password": "password",
"document": map[string]interface{}{
"id": "document_id",
"number": "document_number",
},
"info": map[string]interface{}{
"permissions": map[string]interface{}{
"read": "available",
"write": "available",
},
},
},
},
},
{
name: "key conflicts",
input: map[string]interface{}{
"user": "user",
"user.name": "username",
"user.password": "password",
},
expected: map[string]interface{}{
"user": "user",
},
},
{
name: "overwriting conflicts",
input: map[string]interface{}{
"order_id": "order_id",
"user.document.id": "document_id",
"user.document.number": "document_number",
"user.info.address": "address",
"user.info.phone": "phone",
},
expected: map[string]interface{}{
"order_id": "order_id",
"user": map[string]interface{}{
"document": map[string]interface{}{
"id": "document_id",
"number": "document_number",
},
"info": map[string]interface{}{
"address": "address",
"phone": "phone",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for range 100 { // need to exclude the impact of key order in the map on the test.
require.Equal(t, tt.expected, rutil.FlattenMap(tt.input))
}
})
}
}