Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
2a6ce6d4da | |||
ad19fe2b90 | |||
49055a28ea | |||
d1c6e121c1 | |||
7cd7fb0c0a | |||
77eb5b5264 | |||
929e46c087 | |||
1fb5673d27 |
@ -8,6 +8,7 @@ import (
|
|||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/semconv"
|
"go.unistack.org/micro/v3/semconv"
|
||||||
@ -31,6 +32,27 @@ var (
|
|||||||
fatalValue = slog.StringValue("fatal")
|
fatalValue = slog.StringValue("fatal")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type wrapper struct {
|
||||||
|
h slog.Handler
|
||||||
|
level atomic.Int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool {
|
||||||
|
return level >= slog.Level(int(h.level.Load()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
|
||||||
|
return h.h.Handle(ctx, rec)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
|
||||||
|
return h.WithAttrs(attrs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *wrapper) WithGroup(name string) slog.Handler {
|
||||||
|
return h.WithGroup(name)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
||||||
switch a.Key {
|
switch a.Key {
|
||||||
case slog.SourceKey:
|
case slog.SourceKey:
|
||||||
@ -68,7 +90,7 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
|||||||
|
|
||||||
type slogLogger struct {
|
type slogLogger struct {
|
||||||
leveler *slog.LevelVar
|
leveler *slog.LevelVar
|
||||||
handler slog.Handler
|
handler *wrapper
|
||||||
opts logger.Options
|
opts logger.Options
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
@ -82,51 +104,52 @@ func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger {
|
|||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
|
||||||
l := &slogLogger{
|
if len(options.ContextAttrFuncs) == 0 {
|
||||||
opts: options,
|
options.ContextAttrFuncs = logger.DefaultContextAttrFuncs
|
||||||
}
|
}
|
||||||
|
|
||||||
l.leveler = new(slog.LevelVar)
|
attrs, _ := s.argsAttrs(options.Fields)
|
||||||
handleOpt := &slog.HandlerOptions{
|
l := &slogLogger{
|
||||||
ReplaceAttr: l.renameAttr,
|
handler: &wrapper{h: s.handler.h.WithAttrs(attrs)},
|
||||||
Level: l.leveler,
|
opts: options,
|
||||||
AddSource: l.opts.AddSource,
|
|
||||||
}
|
}
|
||||||
l.leveler.Set(loggerToSlogLevel(l.opts.Level))
|
l.handler.level.Store(int64(loggerToSlogLevel(options.Level)))
|
||||||
l.handler = slog.New(slog.NewJSONHandler(options.Out, handleOpt)).With(options.Fields...).Handler()
|
|
||||||
|
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) V(level logger.Level) bool {
|
func (s *slogLogger) V(level logger.Level) bool {
|
||||||
return s.opts.Level.Enabled(level)
|
s.mu.Lock()
|
||||||
|
v := s.opts.Level.Enabled(level)
|
||||||
|
s.mu.Unlock()
|
||||||
|
return v
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Level(level logger.Level) {
|
func (s *slogLogger) Level(level logger.Level) {
|
||||||
s.leveler.Set(loggerToSlogLevel(level))
|
s.mu.Lock()
|
||||||
|
s.opts.Level = level
|
||||||
|
s.handler.level.Store(int64(loggerToSlogLevel(level)))
|
||||||
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Options() logger.Options {
|
func (s *slogLogger) Options() logger.Options {
|
||||||
return s.opts
|
return s.opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Fields(attrs ...interface{}) logger.Logger {
|
func (s *slogLogger) Fields(fields ...interface{}) logger.Logger {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
level := s.leveler.Level()
|
|
||||||
options := s.opts
|
options := s.opts
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
|
|
||||||
l := &slogLogger{opts: options}
|
l := &slogLogger{opts: options}
|
||||||
l.leveler = new(slog.LevelVar)
|
|
||||||
l.leveler.Set(level)
|
|
||||||
|
|
||||||
handleOpt := &slog.HandlerOptions{
|
if len(options.ContextAttrFuncs) == 0 {
|
||||||
ReplaceAttr: l.renameAttr,
|
options.ContextAttrFuncs = logger.DefaultContextAttrFuncs
|
||||||
Level: l.leveler,
|
|
||||||
AddSource: l.opts.AddSource,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
l.handler = slog.New(slog.NewJSONHandler(l.opts.Out, handleOpt)).With(attrs...).Handler()
|
attrs, _ := s.argsAttrs(fields)
|
||||||
|
l.handler = &wrapper{h: s.handler.h.WithAttrs(attrs)}
|
||||||
|
l.handler.level.Store(int64(loggerToSlogLevel(l.opts.Level)))
|
||||||
|
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
@ -134,22 +157,23 @@ func (s *slogLogger) Fields(attrs ...interface{}) logger.Logger {
|
|||||||
func (s *slogLogger) Init(opts ...logger.Option) error {
|
func (s *slogLogger) Init(opts ...logger.Option) error {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
|
||||||
if len(s.opts.ContextAttrFuncs) == 0 {
|
|
||||||
s.opts.ContextAttrFuncs = logger.DefaultContextAttrFuncs
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&s.opts)
|
o(&s.opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.leveler = new(slog.LevelVar)
|
if len(s.opts.ContextAttrFuncs) == 0 {
|
||||||
|
s.opts.ContextAttrFuncs = logger.DefaultContextAttrFuncs
|
||||||
|
}
|
||||||
|
|
||||||
handleOpt := &slog.HandlerOptions{
|
handleOpt := &slog.HandlerOptions{
|
||||||
ReplaceAttr: s.renameAttr,
|
ReplaceAttr: s.renameAttr,
|
||||||
Level: s.leveler,
|
Level: loggerToSlogLevel(logger.TraceLevel),
|
||||||
AddSource: s.opts.AddSource,
|
AddSource: s.opts.AddSource,
|
||||||
}
|
}
|
||||||
s.leveler.Set(loggerToSlogLevel(s.opts.Level))
|
|
||||||
s.handler = slog.New(slog.NewJSONHandler(s.opts.Out, handleOpt)).With(s.opts.Fields...).Handler()
|
attrs, _ := s.argsAttrs(s.opts.Fields)
|
||||||
|
s.handler = &wrapper{h: slog.NewJSONHandler(s.opts.Out, handleOpt).WithAttrs(attrs)}
|
||||||
|
s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level)))
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -192,27 +216,34 @@ func (s *slogLogger) String() string {
|
|||||||
return "slog"
|
return "slog"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string, attrs ...interface{}) {
|
func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string, args ...interface{}) {
|
||||||
if !s.V(lvl) {
|
if !s.V(lvl) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
var argError error
|
||||||
|
|
||||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", lvl.String()).Inc()
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", lvl.String()).Inc()
|
||||||
|
|
||||||
attrs = prepareAttributes(attrs)
|
attrs, err := s.argsAttrs(args)
|
||||||
|
if err != nil {
|
||||||
for _, fn := range s.opts.ContextAttrFuncs {
|
argError = err
|
||||||
a := prepareAttributes(fn(ctx))
|
}
|
||||||
attrs = append(attrs, a...)
|
if argError != nil {
|
||||||
|
if span, ok := tracer.SpanFromContext(ctx); ok {
|
||||||
|
span.SetStatus(tracer.SpanStatusError, argError.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, attr := range attrs {
|
for _, fn := range s.opts.ContextAttrFuncs {
|
||||||
if ve, hasErr := attr.(error); hasErr && ve != nil {
|
ctxAttrs, err := s.argsAttrs(fn(ctx))
|
||||||
attrs = append(attrs, slog.String(s.opts.ErrorKey, ve.Error()))
|
if err != nil {
|
||||||
if span, ok := tracer.SpanFromContext(ctx); ok {
|
argError = err
|
||||||
span.SetStatus(tracer.SpanStatusError, ve.Error())
|
}
|
||||||
}
|
attrs = append(attrs, ctxAttrs...)
|
||||||
break
|
}
|
||||||
|
if argError != nil {
|
||||||
|
if span, ok := tracer.SpanFromContext(ctx); ok {
|
||||||
|
span.SetStatus(tracer.SpanStatusError, argError.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,7 +260,7 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
|
|||||||
var pcs [1]uintptr
|
var pcs [1]uintptr
|
||||||
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, printLog, LogLvlMethod]
|
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, printLog, LogLvlMethod]
|
||||||
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0])
|
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0])
|
||||||
r.Add(attrs...)
|
r.AddAttrs(attrs...)
|
||||||
_ = s.handler.Handle(ctx, r)
|
_ = s.handler.Handle(ctx, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -276,11 +307,26 @@ func slogToLoggerLevel(level slog.Level) logger.Level {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareAttributes(attrs []interface{}) []interface{} {
|
func (s *slogLogger) argsAttrs(args []interface{}) ([]slog.Attr, error) {
|
||||||
if len(attrs)%2 == 1 {
|
attrs := make([]slog.Attr, 0, len(args))
|
||||||
attrs = append(attrs, badKey)
|
var err error
|
||||||
attrs[len(attrs)-1], attrs[len(attrs)-2] = attrs[len(attrs)-2], attrs[len(attrs)-1]
|
|
||||||
|
for idx := 0; idx < len(args); idx++ {
|
||||||
|
switch arg := args[idx].(type) {
|
||||||
|
case slog.Attr:
|
||||||
|
attrs = append(attrs, arg)
|
||||||
|
case string:
|
||||||
|
if idx+1 < len(args) {
|
||||||
|
attrs = append(attrs, slog.Any(arg, args[idx+1]))
|
||||||
|
idx += 1
|
||||||
|
} else {
|
||||||
|
attrs = append(attrs, slog.String(badKey, arg))
|
||||||
|
}
|
||||||
|
case error:
|
||||||
|
attrs = append(attrs, slog.String(s.opts.ErrorKey, arg.Error()))
|
||||||
|
err = arg
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return attrs
|
return attrs, err
|
||||||
}
|
}
|
||||||
|
@ -5,15 +5,66 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/google/uuid"
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"go.unistack.org/micro/v3/metadata"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestMultipleFieldsWithLevel(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
l := NewLogger(logger.WithLevel(logger.InfoLevel), logger.WithOutput(buf))
|
||||||
|
if err := l.Init(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
l = l.Fields("key", "val")
|
||||||
|
|
||||||
|
l.Info(ctx, "msg1")
|
||||||
|
nl := l.Clone(logger.WithLevel(logger.DebugLevel))
|
||||||
|
nl.Debug(ctx, "msg2")
|
||||||
|
l.Debug(ctx, "msg3")
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"key":"val"`)) {
|
||||||
|
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"msg1"`)) {
|
||||||
|
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"msg2"`)) {
|
||||||
|
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
if bytes.Contains(buf.Bytes(), []byte(`"msg3"`)) {
|
||||||
|
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultipleFields(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
l := NewLogger(logger.WithLevel(logger.InfoLevel), logger.WithOutput(buf))
|
||||||
|
if err := l.Init(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
l = l.Fields("key", "val")
|
||||||
|
|
||||||
|
l = l.Fields("key1", "val1")
|
||||||
|
|
||||||
|
l.Info(ctx, "msg")
|
||||||
|
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"key":"val"`)) {
|
||||||
|
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"key1":"val1"`)) {
|
||||||
|
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestError(t *testing.T) {
|
func TestError(t *testing.T) {
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
buf := bytes.NewBuffer(nil)
|
buf := bytes.NewBuffer(nil)
|
||||||
@ -43,9 +94,6 @@ func TestErrorf(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
l.Log(ctx, logger.ErrorLevel, "message", errors.New("error msg"))
|
l.Log(ctx, logger.ErrorLevel, "message", errors.New("error msg"))
|
||||||
if !bytes.Contains(buf.Bytes(), []byte(`"!BADKEY":"`)) {
|
|
||||||
t.Fatalf("logger BADKEY not works, buf contains: %s", buf.Bytes())
|
|
||||||
}
|
|
||||||
|
|
||||||
l.Log(ctx, logger.ErrorLevel, "", errors.New("error msg"))
|
l.Log(ctx, logger.ErrorLevel, "", errors.New("error msg"))
|
||||||
if !bytes.Contains(buf.Bytes(), []byte(`"error":"error msg"`)) {
|
if !bytes.Contains(buf.Bytes(), []byte(`"error":"error msg"`)) {
|
||||||
@ -236,5 +284,11 @@ func Test_WithContextAttrFunc(t *testing.T) {
|
|||||||
if !(bytes.Contains(buf.Bytes(), []byte(`"source-service":"Test-System"`))) {
|
if !(bytes.Contains(buf.Bytes(), []byte(`"source-service":"Test-System"`))) {
|
||||||
t.Fatalf("logger info, buf %s", buf.Bytes())
|
t.Fatalf("logger info, buf %s", buf.Bytes())
|
||||||
}
|
}
|
||||||
|
buf.Reset()
|
||||||
|
imd, _ := metadata.FromIncomingContext(ctx)
|
||||||
|
l.Info(ctx, "test message1")
|
||||||
|
imd.Set("Source-Service", "Test-System2")
|
||||||
|
l.Info(ctx, "test message2")
|
||||||
|
|
||||||
|
// t.Logf("xxx %s", buf.Bytes())
|
||||||
}
|
}
|
||||||
|
10
service.go
10
service.go
@ -6,7 +6,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/KimMachineGun/automemlimit/memlimit"
|
"github.com/KimMachineGun/automemlimit/memlimit"
|
||||||
_ "go.uber.org/automaxprocs"
|
"go.uber.org/automaxprocs/maxprocs"
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v3/broker"
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v3/client"
|
||||||
"go.unistack.org/micro/v3/config"
|
"go.unistack.org/micro/v3/config"
|
||||||
@ -20,6 +20,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
maxprocs.Set()
|
||||||
memlimit.SetGoMemLimitWithOpts(
|
memlimit.SetGoMemLimitWithOpts(
|
||||||
memlimit.WithRatio(0.9),
|
memlimit.WithRatio(0.9),
|
||||||
memlimit.WithProvider(
|
memlimit.WithProvider(
|
||||||
@ -86,13 +87,14 @@ func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...se
|
|||||||
}
|
}
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
|
done chan struct{}
|
||||||
opts Options
|
opts Options
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService creates and returns a new Service based on the packages within.
|
// NewService creates and returns a new Service based on the packages within.
|
||||||
func NewService(opts ...Option) Service {
|
func NewService(opts ...Option) Service {
|
||||||
return &service{opts: NewOptions(opts...)}
|
return &service{opts: NewOptions(opts...), done: make(chan struct{})}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Name() string {
|
func (s *service) Name() string {
|
||||||
@ -362,6 +364,8 @@ func (s *service) Stop() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
close(s.done)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -385,7 +389,7 @@ func (s *service) Run() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// wait on context cancel
|
// wait on context cancel
|
||||||
<-s.opts.Context.Done()
|
<-s.done
|
||||||
|
|
||||||
return s.Stop()
|
return s.Stop()
|
||||||
}
|
}
|
||||||
|
@ -134,7 +134,7 @@ func TestNewService(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
if got := NewService(tt.args.opts...); !reflect.DeepEqual(got, tt.want) {
|
if got := NewService(tt.args.opts...); got.Name() != tt.want.Name() {
|
||||||
t.Errorf("NewService() = %v, want %v", got.Options().Name, tt.want.Options().Name)
|
t.Errorf("NewService() = %v, want %v", got.Options().Name, tt.want.Options().Name)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cache "github.com/patrickmn/go-cache"
|
cache "github.com/patrickmn/go-cache"
|
||||||
@ -20,7 +21,10 @@ func NewStore(opts ...store.Option) store.Store {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Connect(ctx context.Context) error {
|
func (m *memoryStore) Connect(ctx context.Context) error {
|
||||||
return nil
|
if m.opts.LazyConnect {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return m.connect(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Disconnect(ctx context.Context) error {
|
func (m *memoryStore) Disconnect(ctx context.Context) error {
|
||||||
@ -29,13 +33,14 @@ func (m *memoryStore) Disconnect(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type memoryStore struct {
|
type memoryStore struct {
|
||||||
funcRead store.FuncRead
|
funcRead store.FuncRead
|
||||||
funcWrite store.FuncWrite
|
funcWrite store.FuncWrite
|
||||||
funcExists store.FuncExists
|
funcExists store.FuncExists
|
||||||
funcList store.FuncList
|
funcList store.FuncList
|
||||||
funcDelete store.FuncDelete
|
funcDelete store.FuncDelete
|
||||||
store *cache.Cache
|
store *cache.Cache
|
||||||
opts store.Options
|
opts store.Options
|
||||||
|
isConnected atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) key(prefix, key string) string {
|
func (m *memoryStore) key(prefix, key string) string {
|
||||||
@ -145,6 +150,11 @@ func (m *memoryStore) Name() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
|
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
|
||||||
|
if m.opts.LazyConnect {
|
||||||
|
if err := m.connect(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return m.funcExists(ctx, key, opts...)
|
return m.funcExists(ctx, key, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -157,6 +167,11 @@ func (m *memoryStore) fnExists(ctx context.Context, key string, opts ...store.Ex
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
|
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
|
||||||
|
if m.opts.LazyConnect {
|
||||||
|
if err := m.connect(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return m.funcRead(ctx, key, val, opts...)
|
return m.funcRead(ctx, key, val, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,6 +184,11 @@ func (m *memoryStore) fnRead(ctx context.Context, key string, val interface{}, o
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
|
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
|
||||||
|
if m.opts.LazyConnect {
|
||||||
|
if err := m.connect(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return m.funcWrite(ctx, key, val, opts...)
|
return m.funcWrite(ctx, key, val, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -193,6 +213,11 @@ func (m *memoryStore) fnWrite(ctx context.Context, key string, val interface{},
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
|
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
|
||||||
|
if m.opts.LazyConnect {
|
||||||
|
if err := m.connect(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return m.funcDelete(ctx, key, opts...)
|
return m.funcDelete(ctx, key, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -211,6 +236,11 @@ func (m *memoryStore) Options() store.Options {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
|
func (m *memoryStore) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
|
||||||
|
if m.opts.LazyConnect {
|
||||||
|
if err := m.connect(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
return m.funcList(ctx, opts...)
|
return m.funcList(ctx, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,3 +274,8 @@ func (m *memoryStore) fnList(ctx context.Context, opts ...store.ListOption) ([]s
|
|||||||
|
|
||||||
return keys, nil
|
return keys, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *memoryStore) connect(ctx context.Context) error {
|
||||||
|
m.isConnected.CompareAndSwap(0, 1)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -2,6 +2,7 @@ package store
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/options"
|
"go.unistack.org/micro/v3/options"
|
||||||
)
|
)
|
||||||
@ -9,12 +10,13 @@ import (
|
|||||||
var _ Store = (*noopStore)(nil)
|
var _ Store = (*noopStore)(nil)
|
||||||
|
|
||||||
type noopStore struct {
|
type noopStore struct {
|
||||||
funcRead FuncRead
|
funcRead FuncRead
|
||||||
funcWrite FuncWrite
|
funcWrite FuncWrite
|
||||||
funcExists FuncExists
|
funcExists FuncExists
|
||||||
funcList FuncList
|
funcList FuncList
|
||||||
funcDelete FuncDelete
|
funcDelete FuncDelete
|
||||||
opts Options
|
opts Options
|
||||||
|
isConnected atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStore(opts ...Option) *noopStore {
|
func NewStore(opts ...Option) *noopStore {
|
||||||
@ -52,12 +54,10 @@ func (n *noopStore) Init(opts ...Option) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStore) Connect(ctx context.Context) error {
|
func (n *noopStore) Connect(ctx context.Context) error {
|
||||||
select {
|
if n.opts.LazyConnect {
|
||||||
case <-ctx.Done():
|
return nil
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
}
|
||||||
return nil
|
return n.connect(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStore) Disconnect(ctx context.Context) error {
|
func (n *noopStore) Disconnect(ctx context.Context) error {
|
||||||
@ -70,6 +70,11 @@ func (n *noopStore) Disconnect(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
|
func (n *noopStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
|
||||||
|
if n.opts.LazyConnect {
|
||||||
|
if err := n.connect(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return n.funcRead(ctx, key, val, opts...)
|
return n.funcRead(ctx, key, val, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,6 +88,11 @@ func (n *noopStore) fnRead(ctx context.Context, key string, val interface{}, opt
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
|
func (n *noopStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
|
||||||
|
if n.opts.LazyConnect {
|
||||||
|
if err := n.connect(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return n.funcDelete(ctx, key, opts...)
|
return n.funcDelete(ctx, key, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,6 +106,11 @@ func (n *noopStore) fnDelete(ctx context.Context, key string, opts ...DeleteOpti
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
|
func (n *noopStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
|
||||||
|
if n.opts.LazyConnect {
|
||||||
|
if err := n.connect(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return n.funcExists(ctx, key, opts...)
|
return n.funcExists(ctx, key, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,6 +124,11 @@ func (n *noopStore) fnExists(ctx context.Context, key string, opts ...ExistsOpti
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
|
func (n *noopStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
|
||||||
|
if n.opts.LazyConnect {
|
||||||
|
if err := n.connect(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return n.funcWrite(ctx, key, val, opts...)
|
return n.funcWrite(ctx, key, val, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,6 +142,11 @@ func (n *noopStore) fnWrite(ctx context.Context, key string, val interface{}, op
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
|
func (n *noopStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
|
||||||
|
if n.opts.LazyConnect {
|
||||||
|
if err := n.connect(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
return n.funcList(ctx, opts...)
|
return n.funcList(ctx, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,3 +170,15 @@ func (n *noopStore) String() string {
|
|||||||
func (n *noopStore) Options() Options {
|
func (n *noopStore) Options() Options {
|
||||||
return n.opts
|
return n.opts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) connect(ctx context.Context) error {
|
||||||
|
if n.isConnected.CompareAndSwap(0, 1) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -41,6 +41,8 @@ type Options struct {
|
|||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
// Hooks can be run before/after store Read/List/Write/Exists/Delete
|
// Hooks can be run before/after store Read/List/Write/Exists/Delete
|
||||||
Hooks options.Hooks
|
Hooks options.Hooks
|
||||||
|
// LazyConnect creates a connection when using store
|
||||||
|
LazyConnect bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOptions creates options struct
|
// NewOptions creates options struct
|
||||||
|
@ -6,6 +6,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Duration int64
|
type Duration int64
|
||||||
@ -53,6 +55,31 @@ loop:
|
|||||||
return time.ParseDuration(fmt.Sprintf("%dh%s", hours, s[p:]))
|
return time.ParseDuration(fmt.Sprintf("%dh%s", hours, s[p:]))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d Duration) MarshalYAML() (interface{}, error) {
|
||||||
|
return time.Duration(d).String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Duration) UnmarshalYAML(n *yaml.Node) error {
|
||||||
|
var v interface{}
|
||||||
|
if err := yaml.Unmarshal([]byte(n.Value), &v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
switch value := v.(type) {
|
||||||
|
case float64:
|
||||||
|
*d = Duration(time.Duration(value))
|
||||||
|
return nil
|
||||||
|
case string:
|
||||||
|
dv, err := ParseDuration(value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*d = Duration(dv)
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("invalid duration")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (d Duration) MarshalJSON() ([]byte, error) {
|
func (d Duration) MarshalJSON() ([]byte, error) {
|
||||||
return json.Marshal(time.Duration(d).String())
|
return json.Marshal(time.Duration(d).String())
|
||||||
}
|
}
|
||||||
|
@ -5,8 +5,44 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestMarshalYAML(t *testing.T) {
|
||||||
|
d := Duration(10000000)
|
||||||
|
buf, err := yaml.Marshal(d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(buf, []byte(`10ms
|
||||||
|
`)) {
|
||||||
|
t.Fatalf("invalid duration: %s != %s", buf, `10ms`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnmarshalYAML(t *testing.T) {
|
||||||
|
type str struct {
|
||||||
|
TTL Duration `yaml:"ttl"`
|
||||||
|
}
|
||||||
|
v := &str{}
|
||||||
|
var err error
|
||||||
|
|
||||||
|
err = yaml.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if v.TTL != 10000000 {
|
||||||
|
t.Fatalf("invalid duration %v != 10000000", v.TTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = yaml.Unmarshal([]byte(`{"ttl":"1y"}`), v)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if v.TTL != 31622400000000000 {
|
||||||
|
t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMarshalJSON(t *testing.T) {
|
func TestMarshalJSON(t *testing.T) {
|
||||||
d := Duration(10000000)
|
d := Duration(10000000)
|
||||||
buf, err := json.Marshal(d)
|
buf, err := json.Marshal(d)
|
||||||
|
Loading…
Reference in New Issue
Block a user