Compare commits

..

8 Commits

Author SHA1 Message Date
614b740c56 split files
Some checks failed
lint / lint (pull_request) Successful in 2m27s
test / test (pull_request) Failing after 17m13s
coverage / build (pull_request) Failing after 17m30s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:46:47 +03:00
bc011a2e7f split files
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:44:29 +03:00
f92e18897a implement driver
Some checks failed
lint / lint (pull_request) Successful in 2m43s
test / test (pull_request) Successful in 4m53s
coverage / build (pull_request) Failing after 17m18s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-20 22:27:42 +03:00
022326ddc4 move
Some checks failed
lint / lint (pull_request) Successful in 3m27s
test / test (pull_request) Successful in 4m49s
coverage / build (pull_request) Failing after 17m28s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-19 11:31:11 +03:00
e053eeac74 add default node state criterion
Some checks failed
lint / lint (pull_request) Successful in 2m36s
test / test (pull_request) Failing after 17m14s
coverage / build (pull_request) Failing after 17m30s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-19 11:26:16 +03:00
6c6916a050 initial hasql support
Some checks failed
lint / lint (pull_request) Successful in 4m23s
test / test (pull_request) Failing after 17m14s
coverage / build (pull_request) Failing after 17m29s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-18 17:16:20 +03:00
ea84ac094f Merge branch 'v4' into hasql
Some checks failed
test / test (pull_request) Failing after 17m58s
coverage / build (pull_request) Failing after 18m40s
lint / lint (pull_request) Failing after 1m41s
2025-09-18 14:35:10 +03:00
2886a7fe8a initial hasql support
Some checks failed
test / test (pull_request) Failing after 19m47s
lint / lint (pull_request) Failing after 19m59s
coverage / build (pull_request) Failing after 20m4s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-18 14:34:48 +03:00
7 changed files with 119 additions and 284 deletions

View File

@@ -3,7 +3,6 @@ package sql
import ( import (
"context" "context"
"database/sql" "database/sql"
"sync"
"time" "time"
) )
@@ -12,84 +11,31 @@ type Statser interface {
} }
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
if db == nil {
return
}
options := NewOptions(opts...) options := NewOptions(opts...)
var ( go func() {
statsMu sync.Mutex ticker := time.NewTicker(options.MeterStatsInterval)
lastUpdated time.Time defer ticker.Stop()
maxOpenConnections, openConnections, inUse, idle, waitCount float64
maxIdleClosed, maxIdleTimeClosed, maxLifetimeClosed float64
waitDuration float64
)
updateFn := func() { for {
statsMu.Lock() select {
defer statsMu.Unlock() case <-ctx.Done():
return
if time.Since(lastUpdated) < options.MeterStatsInterval { case <-ticker.C:
return if db == nil {
return
}
stats := db.Stats()
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
}
} }
}()
stats := db.Stats()
maxOpenConnections = float64(stats.MaxOpenConnections)
openConnections = float64(stats.OpenConnections)
inUse = float64(stats.InUse)
idle = float64(stats.Idle)
waitCount = float64(stats.WaitCount)
maxIdleClosed = float64(stats.MaxIdleClosed)
maxIdleTimeClosed = float64(stats.MaxIdleTimeClosed)
maxLifetimeClosed = float64(stats.MaxLifetimeClosed)
waitDuration = float64(stats.WaitDuration.Seconds())
lastUpdated = time.Now()
}
options.Meter.Gauge(MaxOpenConnections, func() float64 {
updateFn()
return maxOpenConnections
})
options.Meter.Gauge(OpenConnections, func() float64 {
updateFn()
return openConnections
})
options.Meter.Gauge(InuseConnections, func() float64 {
updateFn()
return inUse
})
options.Meter.Gauge(IdleConnections, func() float64 {
updateFn()
return idle
})
options.Meter.Gauge(WaitConnections, func() float64 {
updateFn()
return waitCount
})
options.Meter.Gauge(BlockedSeconds, func() float64 {
updateFn()
return waitDuration
})
options.Meter.Gauge(MaxIdleClosed, func() float64 {
updateFn()
return maxIdleClosed
})
options.Meter.Gauge(MaxIdletimeClosed, func() float64 {
updateFn()
return maxIdleTimeClosed
})
options.Meter.Gauge(MaxLifetimeClosed, func() float64 {
updateFn()
return maxLifetimeClosed
})
} }

View File

@@ -52,12 +52,6 @@ type Options struct {
AddStacktrace bool AddStacktrace bool
// DedupKeys deduplicate keys in log output // DedupKeys deduplicate keys in log output
DedupKeys bool DedupKeys bool
// FatalFinalizers runs in order in [logger.Fatal] method
FatalFinalizers []func(context.Context)
}
var DefaultFatalFinalizer = func(ctx context.Context) {
os.Exit(1)
} }
// NewOptions creates new options struct // NewOptions creates new options struct
@@ -71,7 +65,6 @@ func NewOptions(opts ...Option) Options {
AddSource: true, AddSource: true,
TimeFunc: time.Now, TimeFunc: time.Now,
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
FatalFinalizers: []func(context.Context){DefaultFatalFinalizer},
} }
WithMicroKeys()(&options) WithMicroKeys()(&options)
@@ -83,13 +76,6 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
// WithFatalFinalizers set logger.Fatal finalizers
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
return func(o *Options) {
o.FatalFinalizers = fncs
}
}
// WithContextAttrFuncs appends default funcs for the context attrs filler // WithContextAttrFuncs appends default funcs for the context attrs filler
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option { func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
return func(o *Options) { return func(o *Options) {

View File

@@ -4,12 +4,14 @@ import (
"context" "context"
"io" "io"
"log/slog" "log/slog"
"os"
"reflect" "reflect"
"regexp" "regexp"
"runtime" "runtime"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/semconv"
@@ -229,12 +231,11 @@ func (s *slogLogger) Error(ctx context.Context, msg string, attrs ...interface{}
func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) { func (s *slogLogger) Fatal(ctx context.Context, msg string, attrs ...interface{}) {
s.printLog(ctx, logger.FatalLevel, msg, attrs...) s.printLog(ctx, logger.FatalLevel, msg, attrs...)
for _, fn := range s.opts.FatalFinalizers {
fn(ctx)
}
if closer, ok := s.opts.Out.(io.Closer); ok { if closer, ok := s.opts.Out.(io.Closer); ok {
closer.Close() closer.Close()
} }
time.Sleep(1 * time.Second)
os.Exit(1)
} }
func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) { func (s *slogLogger) Warn(ctx context.Context, msg string, attrs ...interface{}) {

View File

@@ -469,25 +469,3 @@ func Test_WithContextAttrFunc(t *testing.T) {
// t.Logf("xxx %s", buf.Bytes()) // t.Logf("xxx %s", buf.Bytes())
} }
func TestFatalFinalizers(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(
logger.WithLevel(logger.TraceLevel),
logger.WithOutput(buf),
)
if err := l.Init(
logger.WithFatalFinalizers(func(ctx context.Context) {
l.Info(ctx, "fatal finalizer")
})); err != nil {
t.Fatal(err)
}
l.Fatal(ctx, "info_msg1")
if !bytes.Contains(buf.Bytes(), []byte("fatal finalizer")) {
t.Fatalf("logger dont have fatal message, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte("info_msg1")) {
t.Fatalf("logger dont have info_msg1 message, buf %s", buf.Bytes())
}
}

View File

@@ -59,8 +59,6 @@ type Meter interface {
Options() Options Options() Options
// String return meter type // String return meter type
String() string String() string
// Unregister metric name and drop all data
Unregister(name string, labels ...string) bool
} }
// Counter is a counter // Counter is a counter
@@ -82,11 +80,7 @@ type FloatCounter interface {
// Gauge is a float64 gauge // Gauge is a float64 gauge
type Gauge interface { type Gauge interface {
Add(float64)
Get() float64 Get() float64
Set(float64)
Dec()
Inc()
} }
// Histogram is a histogram for non-negative values with automatically created buckets // Histogram is a histogram for non-negative values with automatically created buckets

View File

@@ -28,10 +28,6 @@ func (r *noopMeter) Name() string {
return r.opts.Name return r.opts.Name
} }
func (r *noopMeter) Unregister(name string, labels ...string) bool {
return true
}
// Init initialize options // Init initialize options
func (r *noopMeter) Init(opts ...Option) error { func (r *noopMeter) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
@@ -136,18 +132,6 @@ type noopGauge struct {
labels []string labels []string
} }
func (r *noopGauge) Add(float64) {
}
func (r *noopGauge) Set(float64) {
}
func (r *noopGauge) Inc() {
}
func (r *noopGauge) Dec() {
}
func (r *noopGauge) Get() float64 { func (r *noopGauge) Get() float64 {
return 0 return 0
} }

View File

@@ -6,18 +6,18 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/semconv"
) )
func unregisterMetrics(size int) { var (
meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size)) pools = make([]Statser, 0)
meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size)) poolsMu sync.Mutex
meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size)) )
meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size))
}
// Stats struct
type Stats struct { type Stats struct {
Get uint64 Get uint64
Put uint64 Put uint64
@@ -25,13 +25,41 @@ type Stats struct {
Ret uint64 Ret uint64
} }
// Statser provides buffer pool stats
type Statser interface {
Stats() Stats
Cap() int
}
func init() {
go newStatsMeter()
}
func newStatsMeter() {
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
defer ticker.Stop()
for range ticker.C {
poolsMu.Lock()
for _, st := range pools {
stats := st.Stats()
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
}
poolsMu.Unlock()
}
}
var (
_ Statser = (*BytePool)(nil)
_ Statser = (*BytesPool)(nil)
_ Statser = (*StringsPool)(nil)
)
type Pool[T any] struct { type Pool[T any] struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
c int
} }
func (p Pool[T]) Put(t T) { func (p Pool[T]) Put(t T) {
@@ -42,82 +70,37 @@ func (p Pool[T]) Get() T {
return p.p.Get().(T) return p.p.Get().(T)
} }
func NewPool[T any](fn func() T, size int) Pool[T] { func NewPool[T any](fn func() T) Pool[T] {
p := Pool[T]{ return Pool[T]{
c: size, p: &sync.Pool{
get: &atomic.Uint64{}, New: func() interface{} {
put: &atomic.Uint64{}, return fn()
mis: &atomic.Uint64{}, },
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{
New: func() interface{} {
p.mis.Add(1)
return fn()
}, },
} }
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p
} }
type BytePool struct { type BytePool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewBytePool(size int) *BytePool { func NewBytePool(size int) *BytePool {
p := &BytePool{ p := &BytePool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
b := make([]byte, 0, size) b := make([]byte, 0, size)
return &b return &b
}, },
} }
poolsMu.Lock()
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { pools = append(pools, p)
return float64(p.get.Load()) poolsMu.Unlock()
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p return p
} }
@@ -127,73 +110,49 @@ func (p *BytePool) Cap() int {
func (p *BytePool) Stats() Stats { func (p *BytePool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
func (p *BytePool) Get() *[]byte { func (p *BytePool) Get() *[]byte {
p.get.Add(1) atomic.AddUint64(&p.get, 1)
return p.p.Get().(*[]byte) return p.p.Get().(*[]byte)
} }
func (p *BytePool) Put(b *[]byte) { func (p *BytePool) Put(b *[]byte) {
p.put.Add(1) atomic.AddUint64(&p.put, 1)
if cap(*b) > p.c { if cap(*b) > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
*b = (*b)[:0] *b = (*b)[:0]
p.p.Put(b) p.p.Put(b)
} }
func (p *BytePool) Close() {
unregisterMetrics(p.c)
}
type BytesPool struct { type BytesPool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewBytesPool(size int) *BytesPool { func NewBytesPool(size int) *BytesPool {
p := &BytesPool{ p := &BytesPool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
b := bytes.NewBuffer(make([]byte, 0, size)) b := bytes.NewBuffer(make([]byte, 0, size))
return b return b
}, },
} }
poolsMu.Lock()
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { pools = append(pools, p)
return float64(p.get.Load()) poolsMu.Unlock()
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p return p
} }
@@ -203,10 +162,10 @@ func (p *BytesPool) Cap() int {
func (p *BytesPool) Stats() Stats { func (p *BytesPool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
@@ -215,43 +174,34 @@ func (p *BytesPool) Get() *bytes.Buffer {
} }
func (p *BytesPool) Put(b *bytes.Buffer) { func (p *BytesPool) Put(b *bytes.Buffer) {
p.put.Add(1)
if (*b).Cap() > p.c { if (*b).Cap() > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
b.Reset() b.Reset()
p.p.Put(b) p.p.Put(b)
} }
func (p *BytesPool) Close() {
unregisterMetrics(p.c)
}
type StringsPool struct { type StringsPool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewStringsPool(size int) *StringsPool { func NewStringsPool(size int) *StringsPool {
p := &StringsPool{ p := &StringsPool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
return &strings.Builder{} return &strings.Builder{}
}, },
} }
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p return p
} }
@@ -261,28 +211,24 @@ func (p *StringsPool) Cap() int {
func (p *StringsPool) Stats() Stats { func (p *StringsPool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
func (p *StringsPool) Get() *strings.Builder { func (p *StringsPool) Get() *strings.Builder {
p.get.Add(1) atomic.AddUint64(&p.get, 1)
return p.p.Get().(*strings.Builder) return p.p.Get().(*strings.Builder)
} }
func (p *StringsPool) Put(b *strings.Builder) { func (p *StringsPool) Put(b *strings.Builder) {
p.put.Add(1) atomic.AddUint64(&p.put, 1)
if b.Cap() > p.c { if b.Cap() > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
b.Reset() b.Reset()
p.p.Put(b) p.p.Put(b)
} }
func (p *StringsPool) Close() {
unregisterMetrics(p.c)
}