Compare commits

..

9 Commits

Author SHA1 Message Date
77d1582543 Merge branch 'v4' of https://github.com/unistack-org/micro into v4
All checks were successful
sync / sync (push) Successful in 10s
2025-12-16 09:17:32 +03:00
560afc5dd6 broker: fixup error handler func
Some checks failed
coverage / build (push) Failing after 1m54s
sync / sync (push) Failing after 16s
test / test (push) Failing after 18m21s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 09:06:06 +03:00
540bc415d5 broker: fixup error handler func
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 09:01:36 +03:00
46eb739dff broker: add ErrorHandler
Some checks failed
coverage / build (push) Failing after 4m49s
test / test (push) Failing after 16m1s
sync / sync (push) Failing after 20s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 08:34:55 +03:00
13b01f59ee logger: conditional caller field
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 08:34:55 +03:00
c32a17b69b broker: add ErrorHandler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 08:25:59 +03:00
9cb25acf63 logger: conditional caller field
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-13 19:50:03 +03:00
52c8f3da86 Merge pull request 'add opt gracefultimeout broker' (#410) from devstigneev/micro:v4_new_opts into v4
Some checks failed
test / test (push) Failing after 12m37s
coverage / build (push) Failing after 12m51s
sync / sync (push) Failing after 16s
Reviewed-on: #410
2025-12-10 15:22:35 +03:00
Evstigneev Denis
e7f9f638bd add opt gracefultimeout broker
Some checks failed
test / test (pull_request) Failing after 13m43s
lint / lint (pull_request) Failing after 14m9s
coverage / build (pull_request) Failing after 14m25s
2025-12-10 15:20:14 +03:00
6 changed files with 67 additions and 1137 deletions

View File

@@ -41,11 +41,11 @@ type Broker interface {
// Disconnect disconnect from broker // Disconnect disconnect from broker
Disconnect(ctx context.Context) error Disconnect(ctx context.Context) error
// NewMessage create new broker message to publish. // NewMessage create new broker message to publish.
NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error) NewMessage(ctx context.Context, hdr metadata.Metadata, body any, opts ...MessageOption) (Message, error)
// Publish message to broker topic // Publish message to broker topic
Publish(ctx context.Context, topic string, messages ...Message) error Publish(ctx context.Context, topic string, messages ...Message) error
// Subscribe subscribes to topic message via handler // Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) Subscribe(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
// String type of broker // String type of broker
String() string String() string
// Live returns broker liveness // Live returns broker liveness
@@ -59,7 +59,7 @@ type Broker interface {
type ( type (
FuncPublish func(ctx context.Context, topic string, messages ...Message) error FuncPublish func(ctx context.Context, topic string, messages ...Message) error
HookPublish func(next FuncPublish) FuncPublish HookPublish func(next FuncPublish) FuncPublish
FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) FuncSubscribe func(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
HookSubscribe func(next FuncSubscribe) FuncSubscribe HookSubscribe func(next FuncSubscribe) FuncSubscribe
) )
@@ -75,7 +75,7 @@ type Message interface {
Body() []byte Body() []byte
// Unmarshal try to decode message body to dst. // Unmarshal try to decode message body to dst.
// This is helper method that uses codec.Unmarshal. // This is helper method that uses codec.Unmarshal.
Unmarshal(dst interface{}, opts ...codec.Option) error Unmarshal(dst any, opts ...codec.Option) error
// Ack acknowledge message if supported. // Ack acknowledge message if supported.
Ack() error Ack() error
} }

View File

@@ -18,7 +18,6 @@ import (
type Options struct { type Options struct {
// Name holds the broker name // Name holds the broker name
Name string Name string
// Tracer used for tracing // Tracer used for tracing
Tracer tracer.Tracer Tracer tracer.Tracer
// Register can be used for clustering // Register can be used for clustering
@@ -31,23 +30,20 @@ type Options struct {
Meter meter.Meter Meter meter.Meter
// Context holds external options // Context holds external options
Context context.Context Context context.Context
// Wait waits for a collection of goroutines to finish // Wait waits for a collection of goroutines to finish
Wait *sync.WaitGroup Wait *sync.WaitGroup
// TLSConfig holds tls.TLSConfig options // TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config TLSConfig *tls.Config
// Addrs holds the broker address // Addrs holds the broker address
Addrs []string Addrs []string
// Hooks can be run before broker Publish/BatchPublish and // Hooks can be run before broker Publishing and message processing in Subscribe
// Subscribe/BatchSubscribe methods
Hooks options.Hooks Hooks options.Hooks
// GracefulTimeout contains time to wait to finish in flight requests // GracefulTimeout contains time to wait to finish in flight requests
GracefulTimeout time.Duration GracefulTimeout time.Duration
// ContentType will be used if no content-type set when creating message // ContentType will be used if no content-type set when creating message
ContentType string ContentType string
// ErrorHandler specifies handler for all broker errors handling subscriber
ErrorHandler func(Message) error
} }
// NewOptions create new Options // NewOptions create new Options
@@ -80,6 +76,12 @@ func Context(ctx context.Context) Option {
} }
} }
func GracefulTimeout(t time.Duration) Option {
return func(o *Options) {
o.GracefulTimeout = t
}
}
// ContentType used by default if not specified // ContentType used by default if not specified
func ContentType(ct string) Option { func ContentType(ct string) Option {
return func(o *Options) { return func(o *Options) {
@@ -87,6 +89,13 @@ func ContentType(ct string) Option {
} }
} }
// ErrorHandler handles errors in broker
func ErrorHandler(h func(Message) error) Option {
return func(o *Options) {
o.ErrorHandler = h
}
}
// MessageOptions struct // MessageOptions struct
type MessageOptions struct { type MessageOptions struct {
// ContentType for message body // ContentType for message body

View File

@@ -8,6 +8,7 @@ import (
"slices" "slices"
"time" "time"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
) )
@@ -42,8 +43,10 @@ type Options struct {
Fields []interface{} Fields []interface{}
// ContextAttrFuncs contains funcs that executed before log func on context // ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc ContextAttrFuncs []ContextAttrFunc
// callerSkipCount number of frmaes to skip // callerSkipCount number of frames to skip
CallerSkipCount int CallerSkipCount int
// AddCaller enables to get caller
AddCaller bool
// The logging level the logger should log // The logging level the logger should log
Level Level Level Level
// AddSource enabled writing source file and position in log // AddSource enabled writing source file and position in log
@@ -83,6 +86,12 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
func WithCallerEnabled(b bool) logger.Option {
return func(o *Options) {
o.AddCaller = b
}
}
// WithFatalFinalizers set logger.Fatal finalizers // WithFatalFinalizers set logger.Fatal finalizers
func WithFatalFinalizers(fncs ...func(context.Context)) Option { func WithFatalFinalizers(fncs ...func(context.Context)) Option {
return func(o *Options) { return func(o *Options) {

View File

@@ -37,11 +37,11 @@ var (
type wrapper struct { type wrapper struct {
h slog.Handler h slog.Handler
level atomic.Int64 level int64
} }
func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool { func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool {
return level >= slog.Level(int(h.level.Load())) return level >= slog.Level(atomic.LoadInt64(&h.level))
} }
func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error { func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
@@ -49,11 +49,17 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
} }
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler { func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
return h.h.WithAttrs(attrs) return &wrapper{
h: h.h.WithAttrs(attrs),
level: atomic.LoadInt64(&h.level),
}
} }
func (h *wrapper) WithGroup(name string) slog.Handler { func (h *wrapper) WithGroup(name string) slog.Handler {
return h.h.WithGroup(name) return &wrapper{
h: h.h.WithGroup(name),
level: atomic.LoadInt64(&h.level),
}
} }
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
@@ -115,10 +121,13 @@ func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger {
attrs, _ := s.argsAttrs(options.Fields) attrs, _ := s.argsAttrs(options.Fields)
l := &slogLogger{ l := &slogLogger{
handler: &wrapper{h: s.handler.h.WithAttrs(attrs)}, handler: &wrapper{
h: s.handler.h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
},
opts: options, opts: options,
} }
l.handler.level.Store(int64(loggerToSlogLevel(options.Level))) atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(options.Level)))
return l return l
} }
@@ -131,9 +140,9 @@ func (s *slogLogger) V(level logger.Level) bool {
} }
func (s *slogLogger) Level(level logger.Level) { func (s *slogLogger) Level(level logger.Level) {
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(level)))
s.mu.Lock() s.mu.Lock()
s.opts.Level = level s.opts.Level = level
s.handler.level.Store(int64(loggerToSlogLevel(level)))
s.mu.Unlock() s.mu.Unlock()
} }
@@ -154,8 +163,11 @@ func (s *slogLogger) Fields(fields ...interface{}) logger.Logger {
} }
attrs, _ := s.argsAttrs(fields) attrs, _ := s.argsAttrs(fields)
l.handler = &wrapper{h: s.handler.h.WithAttrs(attrs)} l.handler = &wrapper{
l.handler.level.Store(int64(loggerToSlogLevel(l.opts.Level))) h: s.handler.h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(l.opts.Level)))
return l return l
} }
@@ -200,8 +212,11 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
h = slog.NewJSONHandler(s.opts.Out, handleOpt) h = slog.NewJSONHandler(s.opts.Out, handleOpt)
} }
s.handler = &wrapper{h: h.WithAttrs(attrs)} s.handler = &wrapper{
s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level))) h: h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(s.opts.Level)))
s.mu.Unlock() s.mu.Unlock()
return nil return nil
@@ -290,10 +305,17 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
} }
} }
var pcs [1]uintptr var pcs uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, printLog, LogLvlMethod]
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0]) if s.opts.AddCaller {
var caller [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, caller[:]) // skip [Callers, printLog, LogLvlMethod]
pcs = caller[0]
}
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs)
r.AddAttrs(attrs...) r.AddAttrs(attrs...)
_ = s.handler.Handle(ctx, r) _ = s.handler.Handle(ctx, r)
} }

View File

@@ -1,815 +0,0 @@
package mock
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"
"go.unistack.org/micro/v4/store"
)
// ExpectedWrite represents an expected Write operation
type ExpectedWrite struct {
key string
value interface{}
ttl time.Duration
metadata map[string]string
namespace string
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedWrite) match(key string, val interface{}, opts ...store.WriteOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check value match
if e.value != nil && !reflect.DeepEqual(e.value, val) {
return false
}
// Check options
options := store.NewWriteOptions(opts...)
if e.ttl > 0 && e.ttl != options.TTL {
return false
}
if e.namespace != "" && e.namespace != options.Namespace {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedRead represents an expected Read operation
type ExpectedRead struct {
key string
value interface{}
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedRead) match(key string, opts ...store.ReadOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedDelete represents an expected Delete operation
type ExpectedDelete struct {
key string
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedDelete) match(key string, opts ...store.DeleteOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedExists represents an expected Exists operation
type ExpectedExists struct {
key string
times int
called int
mutex sync.Mutex
err error
}
func (e *ExpectedExists) match(key string, opts ...store.ExistsOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check key match
if e.key != "" && e.key != key {
return false
}
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// ExpectedList represents an expected List operation
type ExpectedList struct {
times int
called int
mutex sync.Mutex
err error
keys []string
}
func (e *ExpectedList) match(opts ...store.ListOption) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
// Check if we've exceeded the expected times
if e.times > 0 && e.called >= e.times {
return false
}
e.called++
return true
}
// Store is a mock implementation of the Store interface for testing
type Store struct {
expectedWrites []*ExpectedWrite
expectedReads []*ExpectedRead
expectedDeletes []*ExpectedDelete
expectedExists []*ExpectedExists
expectedLists []*ExpectedList
data map[string]interface{}
exists map[string]bool
ttls map[string]time.Time // key -> expiration time
metadata map[string]map[string]string
err error
opts store.Options
mutex sync.RWMutex
}
// NewStore creates a new mock store
func NewStore(opts ...store.Option) *Store {
options := store.NewOptions(opts...)
return &Store{
data: make(map[string]interface{}),
exists: make(map[string]bool),
ttls: make(map[string]time.Time),
metadata: make(map[string]map[string]string),
opts: options,
}
}
// ExpectWrite creates an expectation for a Write operation
func (m *Store) ExpectWrite(key string) *ExpectedWrite {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedWrite{key: key}
m.expectedWrites = append(m.expectedWrites, exp)
return exp
}
// ExpectRead creates an expectation for a Read operation
func (m *Store) ExpectRead(key string) *ExpectedRead {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedRead{key: key}
m.expectedReads = append(m.expectedReads, exp)
return exp
}
// ExpectDelete creates an expectation for a Delete operation
func (m *Store) ExpectDelete(key string) *ExpectedDelete {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedDelete{key: key}
m.expectedDeletes = append(m.expectedDeletes, exp)
return exp
}
// ExpectExists creates an expectation for an Exists operation
func (m *Store) ExpectExists(key string) *ExpectedExists {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedExists{key: key}
m.expectedExists = append(m.expectedExists, exp)
return exp
}
// ExpectList creates an expectation for a List operation
func (m *Store) ExpectList() *ExpectedList {
m.mutex.Lock()
defer m.mutex.Unlock()
exp := &ExpectedList{}
m.expectedLists = append(m.expectedLists, exp)
return exp
}
// WithValue sets the value to return for expected operations
func (e *ExpectedWrite) WithValue(val interface{}) *ExpectedWrite {
e.value = val
return e
}
// WithTTL sets the TTL for expected Write operations
func (e *ExpectedWrite) WithTTL(ttl time.Duration) *ExpectedWrite {
e.ttl = ttl
return e
}
// WithNamespace sets the namespace for expected operations
func (e *ExpectedWrite) WithNamespace(ns string) *ExpectedWrite {
e.namespace = ns
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedWrite) Times(n int) *ExpectedWrite {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedWrite) WillReturnError(err error) *ExpectedWrite {
e.err = err
return e
}
// WithValue sets the value to return for expected Read operations
func (e *ExpectedRead) WithValue(val interface{}) *ExpectedRead {
e.value = val
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedRead) Times(n int) *ExpectedRead {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedRead) WillReturnError(err error) *ExpectedRead {
e.err = err
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedDelete) Times(n int) *ExpectedDelete {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedDelete) WillReturnError(err error) *ExpectedDelete {
e.err = err
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedExists) Times(n int) *ExpectedExists {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedExists) WillReturnError(err error) *ExpectedExists {
e.err = err
return e
}
// WillReturn sets the keys to return for List operations
func (e *ExpectedList) WillReturn(keys ...string) *ExpectedList {
e.keys = keys
return e
}
// Times sets how many times the expectation should be called
func (e *ExpectedList) Times(n int) *ExpectedList {
e.times = n
return e
}
// WillReturnError sets an error to return for the expected operation
func (e *ExpectedList) WillReturnError(err error) *ExpectedList {
e.err = err
return e
}
// checkTTL checks if a key has expired
func (m *Store) checkTTL(key string) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
if exp, ok := m.ttls[key]; ok {
if time.Now().After(exp) {
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
return false
}
}
return true
}
// FastForward decrements all TTLs by the given duration
func (m *Store) FastForward(d time.Duration) {
m.mutex.Lock()
defer m.mutex.Unlock()
now := time.Now()
for key, exp := range m.ttls {
// Calculate remaining time before fast forward
remaining := time.Until(exp)
if remaining <= 0 {
// Already expired, remove it
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
} else {
// Apply fast forward
newRemaining := remaining - d
if newRemaining <= 0 {
// Would expire after fast forward, remove it
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
} else {
// Update expiration time
m.ttls[key] = now.Add(newRemaining)
}
}
}
}
// Name returns store name
func (m *Store) Name() string {
return m.opts.Name
}
// Init initializes the mock store
func (m *Store) Init(opts ...store.Option) error {
if m.err != nil {
return m.err
}
for _, o := range opts {
o(&m.opts)
}
return nil
}
// Connect is used when store needs to be connected
func (m *Store) Connect(ctx context.Context) error {
if m.err != nil {
return m.err
}
return nil
}
// Options returns the current options
func (m *Store) Options() store.Options {
return m.opts
}
// Exists checks that key exists in store
func (m *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
if m.err != nil {
return m.err
}
// Check TTL first
if !m.checkTTL(key) {
return store.ErrNotFound
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedExists {
if exp.match(key, opts...) {
m.mutex.Unlock()
if exp.err != nil {
return exp.err
}
if !m.exists[key] {
return store.ErrNotFound
}
return nil
}
}
m.mutex.Unlock()
// If no expectation matched, use default behavior
if !m.exists[key] {
return store.ErrNotFound
}
return nil
}
// Read reads a single key name to provided value with optional ReadOptions
func (m *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
if m.err != nil {
return m.err
}
// Check TTL first
if !m.checkTTL(key) {
return store.ErrNotFound
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedReads {
if exp.match(key, opts...) {
m.mutex.Unlock()
if exp.err != nil {
return exp.err
}
if !m.exists[key] {
return store.ErrNotFound
}
// Copy the value from expected or actual data
data := exp.value
if data == nil {
data = m.data[key]
}
if data != nil {
// Simple type conversion for testing
if target, ok := val.(*interface{}); ok {
*target = data
} else if target, ok := val.(*string); ok {
if s, ok := data.(string); ok {
*target = s
} else {
*target = fmt.Sprintf("%v", data)
}
} else if target, ok := val.(*int); ok {
if i, ok := data.(int); ok {
*target = i
}
}
}
return nil
}
}
m.mutex.Unlock()
// If no expectation matched, use default behavior
if !m.exists[key] {
return store.ErrNotFound
}
if data, ok := m.data[key]; ok {
if target, ok := val.(*interface{}); ok {
*target = data
} else if target, ok := val.(*string); ok {
if s, ok := data.(string); ok {
*target = s
} else {
*target = fmt.Sprintf("%v", data)
}
} else if target, ok := val.(*int); ok {
if i, ok := data.(int); ok {
*target = i
}
}
}
return nil
}
// Write writes a value to key name to the store with optional WriteOption
func (m *Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
if m.err != nil {
return m.err
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedWrites {
if exp.match(key, val, opts...) {
m.mutex.Unlock()
if exp.err != nil {
return exp.err
}
// Apply the write operation
m.mutex.Lock()
m.data[key] = val
m.exists[key] = true
// Handle TTL
options := store.NewWriteOptions(opts...)
if options.TTL > 0 {
m.ttls[key] = time.Now().Add(options.TTL)
} else {
delete(m.ttls, key) // Remove TTL if not set
}
// Handle metadata
if options.Metadata != nil {
m.metadata[key] = make(map[string]string)
for k, v := range options.Metadata {
// Convert []string to string by joining with comma
if len(v) > 0 {
m.metadata[key][k] = strings.Join(v, ",")
} else {
m.metadata[key][k] = ""
}
}
}
m.mutex.Unlock()
return nil
}
}
m.mutex.Unlock()
// If no expectation matched, use default behavior
m.mutex.Lock()
m.data[key] = val
m.exists[key] = true
options := store.NewWriteOptions(opts...)
if options.TTL > 0 {
m.ttls[key] = time.Now().Add(options.TTL)
} else {
delete(m.ttls, key)
}
if options.Metadata != nil {
m.metadata[key] = make(map[string]string)
for k, v := range options.Metadata {
// Convert []string to string by joining with comma
if len(v) > 0 {
m.metadata[key][k] = strings.Join(v, ",")
} else {
m.metadata[key][k] = ""
}
}
}
m.mutex.Unlock()
return nil
}
// Delete removes the record with the corresponding key from the store
func (m *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
if m.err != nil {
return m.err
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedDeletes {
if exp.match(key, opts...) {
m.mutex.Unlock()
if exp.err != nil {
return exp.err
}
m.mutex.Lock()
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
m.mutex.Unlock()
return nil
}
}
m.mutex.Unlock()
// If no expectation matched, use default behavior
m.mutex.Lock()
delete(m.data, key)
delete(m.exists, key)
delete(m.ttls, key)
delete(m.metadata, key)
m.mutex.Unlock()
return nil
}
// List returns any keys that match, or an empty list with no error if none matched
func (m *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
if m.err != nil {
return nil, m.err
}
// Find matching expectation
m.mutex.Lock()
for _, exp := range m.expectedLists {
if exp.match(opts...) {
m.mutex.Unlock()
if exp.err != nil {
return nil, exp.err
}
return exp.keys, nil
}
}
m.mutex.Unlock()
// If no expectation matched, return actual keys
m.mutex.RLock()
defer m.mutex.RUnlock()
var keys []string
for key := range m.data {
// Check TTL
if exp, ok := m.ttls[key]; ok {
if time.Now().After(exp) {
continue // Skip expired keys
}
}
keys = append(keys, key)
}
// Apply list options filtering
options := store.NewListOptions(opts...)
if options.Prefix != "" {
var filtered []string
for _, key := range keys {
if len(key) >= len(options.Prefix) && key[:len(options.Prefix)] == options.Prefix {
filtered = append(filtered, key)
}
}
keys = filtered
}
if options.Suffix != "" {
var filtered []string
for _, key := range keys {
if len(key) >= len(options.Suffix) && key[len(key)-len(options.Suffix):] == options.Suffix {
filtered = append(filtered, key)
}
}
keys = filtered
}
// Apply limit and offset
if options.Limit > 0 && int(options.Limit) < len(keys) {
end := int(options.Offset) + int(options.Limit)
if end > len(keys) {
end = len(keys)
}
if int(options.Offset) < len(keys) {
keys = keys[options.Offset:end]
} else {
keys = []string{}
}
} else if options.Offset > 0 && int(options.Offset) < len(keys) {
keys = keys[options.Offset:]
} else if options.Offset >= uint(len(keys)) {
keys = []string{}
}
return keys, nil
}
// Disconnect disconnects the mock store
func (m *Store) Disconnect(ctx context.Context) error {
if m.err != nil {
return m.err
}
return nil
}
// String returns the name of the implementation
func (m *Store) String() string {
return "mock"
}
// Watch returns events watcher
func (m *Store) Watch(ctx context.Context, opts ...store.WatchOption) (store.Watcher, error) {
if m.err != nil {
return nil, m.err
}
return NewWatcher(), nil
}
// Live returns store liveness
func (m *Store) Live() bool {
return true
}
// Ready returns store readiness
func (m *Store) Ready() bool {
return true
}
// Health returns store health
func (m *Store) Health() bool {
return true
}
// ExpectationsWereMet checks that all expected operations were called the expected number of times
func (m *Store) ExpectationsWereMet() error {
m.mutex.RLock()
defer m.mutex.RUnlock()
for _, exp := range m.expectedWrites {
if exp.times > 0 && exp.called != exp.times {
return fmt.Errorf("expected write for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
}
}
for _, exp := range m.expectedReads {
if exp.times > 0 && exp.called != exp.times {
return fmt.Errorf("expected read for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
}
}
for _, exp := range m.expectedDeletes {
if exp.times > 0 && exp.called != exp.times {
return fmt.Errorf("expected delete for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
}
}
for _, exp := range m.expectedExists {
if exp.times > 0 && exp.called != exp.times {
return fmt.Errorf("expected exists for key %s to be called %d times, but was called %d times", exp.key, exp.times, exp.called)
}
}
for _, exp := range m.expectedLists {
if exp.times > 0 && exp.called != exp.times {
return fmt.Errorf("expected list to be called %d times, but was called %d times", exp.times, exp.called)
}
}
return nil
}
// Watcher is a mock implementation of the Watcher interface
type Watcher struct {
events chan store.Event
stop chan bool
}
// NewWatcher creates a new mock watcher
func NewWatcher() *Watcher {
return &Watcher{
events: make(chan store.Event, 1),
stop: make(chan bool, 1),
}
}
// Next is a blocking call that returns the next event
func (mw *Watcher) Next() (store.Event, error) {
select {
case event := <-mw.events:
return event, nil
case <-mw.stop:
return nil, store.ErrWatcherStopped
}
}
// Stop stops the watcher
func (mw *Watcher) Stop() {
select {
case mw.stop <- true:
default:
}
}
// SendEvent sends an event to the watcher (for testing purposes)
func (mw *Watcher) SendEvent(event store.Event) {
select {
case mw.events <- event:
default:
// If channel is full, drop the event
}
}

View File

@@ -1,295 +0,0 @@
package mock
import (
"context"
"testing"
"time"
"go.unistack.org/micro/v4/store"
)
func TestStore(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Test Write with expectation
s.ExpectWrite("test_key").WithValue("test_value")
err := s.Write(ctx, "test_key", "test_value")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
// Test Read with expectation
s.ExpectRead("test_key").WithValue("test_value")
var value interface{}
err = s.Read(ctx, "test_key", &value)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
if value != "test_value" {
t.Fatalf("Expected 'test_value', got %v", value)
}
// Test Read with string
s.ExpectRead("test_key")
var strValue string
err = s.Read(ctx, "test_key", &strValue)
if err != nil {
t.Fatalf("Read string failed: %v", err)
}
if strValue != "test_value" {
t.Fatalf("Expected 'test_value', got %s", strValue)
}
// Test Write and Read integer with TTL
s.ExpectWrite("int_key").WithValue(42).WithTTL(5 * time.Second)
err = s.Write(ctx, "int_key", 42, store.WriteTTL(5*time.Second))
if err != nil {
t.Fatalf("Write int failed: %v", err)
}
s.ExpectRead("int_key")
var intValue int
err = s.Read(ctx, "int_key", &intValue)
if err != nil {
t.Fatalf("Read int failed: %v", err)
}
if intValue != 42 {
t.Fatalf("Expected 42, got %d", intValue)
}
// Test Exists with expectation
s.ExpectExists("test_key")
err = s.Exists(ctx, "test_key")
if err != nil {
t.Fatalf("Exists failed: %v", err)
}
// Test List with expectation
s.ExpectList().WillReturn("test_key", "another_key")
keys, err := s.List(ctx)
if err != nil {
t.Fatalf("List failed: %v", err)
}
if len(keys) != 2 {
t.Fatalf("Expected 2 keys, got %d", len(keys))
}
// Test Delete with expectation
s.ExpectDelete("test_key")
err = s.Delete(ctx, "test_key")
if err != nil {
t.Fatalf("Delete failed: %v", err)
}
// Test that deleted key doesn't exist
s.ExpectExists("test_key").WillReturnError(store.ErrNotFound)
err = s.Exists(ctx, "test_key")
if err == nil {
t.Fatalf("Expected store.ErrNotFound after delete")
}
// Test error handling
s.ExpectExists("nonexistent").WillReturnError(store.ErrNotFound)
err = s.Exists(ctx, "nonexistent")
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound, got %v", err)
}
// Verify all expectations were met
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}
func TestStoreFastForward(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Write with TTL
s.ExpectWrite("ttl_key").WithValue("ttl_value").WithTTL(100 * time.Millisecond)
err := s.Write(ctx, "ttl_key", "ttl_value", store.WriteTTL(100*time.Millisecond))
if err != nil {
t.Fatalf("Write with TTL failed: %v", err)
}
// Check key exists before TTL expires
s.ExpectRead("ttl_key")
var value string
err = s.Read(ctx, "ttl_key", &value)
if err != nil {
t.Fatalf("Read before TTL failed: %v", err)
}
if value != "ttl_value" {
t.Fatalf("Expected 'ttl_value', got %s", value)
}
// Fast forward by 50ms - key should still exist
s.FastForward(50 * time.Millisecond)
s.ExpectRead("ttl_key")
err = s.Read(ctx, "ttl_key", &value)
if err != nil {
t.Fatalf("Read after 50ms fast forward failed: %v", err)
}
if value != "ttl_value" {
t.Fatalf("Expected 'ttl_value' after 50ms, got %s", value)
}
// Fast forward by another 60ms (total 110ms) - key should expire
s.FastForward(60 * time.Millisecond)
s.ExpectRead("ttl_key").WillReturnError(store.ErrNotFound)
err = s.Read(ctx, "ttl_key", &value)
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound after TTL, got %v", err)
}
// Test FastForward on already expired keys
s.ExpectWrite("ttl_key2").WithValue("ttl_value2").WithTTL(10 * time.Millisecond)
err = s.Write(ctx, "ttl_key2", "ttl_value2", store.WriteTTL(10*time.Millisecond))
if err != nil {
t.Fatalf("Write with TTL failed: %v", err)
}
// Fast forward by 20ms - key should expire immediately
s.FastForward(20 * time.Millisecond)
s.ExpectRead("ttl_key2").WillReturnError(store.ErrNotFound)
err = s.Read(ctx, "ttl_key2", &value)
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound after immediate expiration, got %v", err)
}
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}
func TestStoreWithOptions(t *testing.T) {
s := NewStore(store.Name("test_mock"), store.Namespace("test_ns"))
if s.Name() != "test_mock" {
t.Fatalf("Expected name 'test_mock', got %s", s.Name())
}
opts := s.Options()
if opts.Namespace != "test_ns" {
t.Fatalf("Expected namespace 'test_ns', got %s", opts.Namespace)
}
}
func TestWatcher(t *testing.T) {
watcher := NewWatcher()
// Test Stop
watcher.Stop()
// Test Next after stop
_, err := watcher.Next()
if err != store.ErrWatcherStopped {
t.Fatalf("Expected store.ErrWatcherStopped, got %v", err)
}
}
func TestStoreHealth(t *testing.T) {
s := NewStore()
if !s.Live() {
t.Fatal("Expected Live() to return true")
}
if !s.Ready() {
t.Fatal("Expected Ready() to return true")
}
if !s.Health() {
t.Fatal("Expected Health() to return true")
}
}
func TestStoreConnectDisconnect(t *testing.T) {
s := NewStore()
err := s.Connect(context.Background())
if err != nil {
t.Fatalf("Connect failed: %v", err)
}
err = s.Disconnect(context.Background())
if err != nil {
t.Fatalf("Disconnect failed: %v", err)
}
// Test error propagation
s.ExpectWrite("test_key").WillReturnError(store.ErrNotConnected)
err = s.Write(context.Background(), "test_key", "value")
if err != store.ErrNotConnected {
t.Fatalf("Expected store.ErrNotConnected, got %v", err)
}
}
func TestStoreTTL(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Test Write with TTL
s.ExpectWrite("ttl_key").WithValue("ttl_value").WithTTL(100 * time.Millisecond)
err := s.Write(ctx, "ttl_key", "ttl_value", store.WriteTTL(100*time.Millisecond))
if err != nil {
t.Fatalf("Write with TTL failed: %v", err)
}
// Read before TTL expires
s.ExpectRead("ttl_key")
var value string
err = s.Read(ctx, "ttl_key", &value)
if err != nil {
t.Fatalf("Read before TTL failed: %v", err)
}
if value != "ttl_value" {
t.Fatalf("Expected 'ttl_value', got %s", value)
}
// Wait for TTL to expire
time.Sleep(150 * time.Millisecond)
// Read after TTL expires should return ErrNotFound
s.ExpectRead("ttl_key").WillReturnError(store.ErrNotFound)
err = s.Read(ctx, "ttl_key", &value)
if err != store.ErrNotFound {
t.Fatalf("Expected store.ErrNotFound after TTL, got %v", err)
}
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}
func TestStoreExpectedOperations(t *testing.T) {
ctx := context.Background()
s := NewStore()
// Test expected operations with Times
s.ExpectWrite("once_key").Times(1)
s.ExpectWrite("twice_key").Times(2)
err := s.Write(ctx, "once_key", "value1")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
err = s.Write(ctx, "twice_key", "value2")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
err = s.Write(ctx, "twice_key", "value3")
if err != nil {
t.Fatalf("Write failed: %v", err)
}
if err := s.ExpectationsWereMet(); err != nil {
t.Fatalf("Expectations not met: %v", err)
}
}