Compare commits

..

2 Commits

Author SHA1 Message Date
46eb739dff broker: add ErrorHandler
Some checks failed
coverage / build (push) Failing after 4m49s
test / test (push) Failing after 16m1s
sync / sync (push) Failing after 20s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 08:34:55 +03:00
13b01f59ee logger: conditional caller field
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 08:34:55 +03:00
4 changed files with 61 additions and 27 deletions

View File

@@ -41,11 +41,11 @@ type Broker interface {
// Disconnect disconnect from broker // Disconnect disconnect from broker
Disconnect(ctx context.Context) error Disconnect(ctx context.Context) error
// NewMessage create new broker message to publish. // NewMessage create new broker message to publish.
NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error) NewMessage(ctx context.Context, hdr metadata.Metadata, body any, opts ...MessageOption) (Message, error)
// Publish message to broker topic // Publish message to broker topic
Publish(ctx context.Context, topic string, messages ...Message) error Publish(ctx context.Context, topic string, messages ...Message) error
// Subscribe subscribes to topic message via handler // Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) Subscribe(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
// String type of broker // String type of broker
String() string String() string
// Live returns broker liveness // Live returns broker liveness
@@ -59,7 +59,7 @@ type Broker interface {
type ( type (
FuncPublish func(ctx context.Context, topic string, messages ...Message) error FuncPublish func(ctx context.Context, topic string, messages ...Message) error
HookPublish func(next FuncPublish) FuncPublish HookPublish func(next FuncPublish) FuncPublish
FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) FuncSubscribe func(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
HookSubscribe func(next FuncSubscribe) FuncSubscribe HookSubscribe func(next FuncSubscribe) FuncSubscribe
) )
@@ -75,7 +75,7 @@ type Message interface {
Body() []byte Body() []byte
// Unmarshal try to decode message body to dst. // Unmarshal try to decode message body to dst.
// This is helper method that uses codec.Unmarshal. // This is helper method that uses codec.Unmarshal.
Unmarshal(dst interface{}, opts ...codec.Option) error Unmarshal(dst any, opts ...codec.Option) error
// Ack acknowledge message if supported. // Ack acknowledge message if supported.
Ack() error Ack() error
} }

View File

@@ -18,7 +18,6 @@ import (
type Options struct { type Options struct {
// Name holds the broker name // Name holds the broker name
Name string Name string
// Tracer used for tracing // Tracer used for tracing
Tracer tracer.Tracer Tracer tracer.Tracer
// Register can be used for clustering // Register can be used for clustering
@@ -31,23 +30,20 @@ type Options struct {
Meter meter.Meter Meter meter.Meter
// Context holds external options // Context holds external options
Context context.Context Context context.Context
// Wait waits for a collection of goroutines to finish // Wait waits for a collection of goroutines to finish
Wait *sync.WaitGroup Wait *sync.WaitGroup
// TLSConfig holds tls.TLSConfig options // TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config TLSConfig *tls.Config
// Addrs holds the broker address // Addrs holds the broker address
Addrs []string Addrs []string
// Hooks can be run before broker Publish/BatchPublish and // Hooks can be run before broker Publishing and message processing in Subscribe
// Subscribe/BatchSubscribe methods
Hooks options.Hooks Hooks options.Hooks
// GracefulTimeout contains time to wait to finish in flight requests // GracefulTimeout contains time to wait to finish in flight requests
GracefulTimeout time.Duration GracefulTimeout time.Duration
// ContentType will be used if no content-type set when creating message // ContentType will be used if no content-type set when creating message
ContentType string ContentType string
// ErrorHandler specifies handler for all broker errors handling subscriber
ErrorHandler any
} }
// NewOptions create new Options // NewOptions create new Options
@@ -93,6 +89,13 @@ func ContentType(ct string) Option {
} }
} }
// ErrorHandler handles errors in broker
func ErrorHandler(h any) Option {
return func(o *Options) {
o.ErrorHandler = h
}
}
// MessageOptions struct // MessageOptions struct
type MessageOptions struct { type MessageOptions struct {
// ContentType for message body // ContentType for message body

View File

@@ -8,6 +8,7 @@ import (
"slices" "slices"
"time" "time"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
) )
@@ -42,8 +43,10 @@ type Options struct {
Fields []interface{} Fields []interface{}
// ContextAttrFuncs contains funcs that executed before log func on context // ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc ContextAttrFuncs []ContextAttrFunc
// callerSkipCount number of frmaes to skip // callerSkipCount number of frames to skip
CallerSkipCount int CallerSkipCount int
// AddCaller enables to get caller
AddCaller bool
// The logging level the logger should log // The logging level the logger should log
Level Level Level Level
// AddSource enabled writing source file and position in log // AddSource enabled writing source file and position in log
@@ -83,6 +86,12 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
func WithCallerEnabled(b bool) logger.Option {
return func(o *Options) {
o.AddCaller = b
}
}
// WithFatalFinalizers set logger.Fatal finalizers // WithFatalFinalizers set logger.Fatal finalizers
func WithFatalFinalizers(fncs ...func(context.Context)) Option { func WithFatalFinalizers(fncs ...func(context.Context)) Option {
return func(o *Options) { return func(o *Options) {

View File

@@ -37,11 +37,11 @@ var (
type wrapper struct { type wrapper struct {
h slog.Handler h slog.Handler
level atomic.Int64 level int64
} }
func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool { func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool {
return level >= slog.Level(int(h.level.Load())) return level >= slog.Level(atomic.LoadInt64(&h.level))
} }
func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error { func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
@@ -49,11 +49,17 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
} }
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler { func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
return h.h.WithAttrs(attrs) return &wrapper{
h: h.h.WithAttrs(attrs),
level: atomic.LoadInt64(&h.level),
}
} }
func (h *wrapper) WithGroup(name string) slog.Handler { func (h *wrapper) WithGroup(name string) slog.Handler {
return h.h.WithGroup(name) return &wrapper{
h: h.h.WithGroup(name),
level: atomic.LoadInt64(&h.level),
}
} }
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
@@ -115,10 +121,13 @@ func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger {
attrs, _ := s.argsAttrs(options.Fields) attrs, _ := s.argsAttrs(options.Fields)
l := &slogLogger{ l := &slogLogger{
handler: &wrapper{h: s.handler.h.WithAttrs(attrs)}, handler: &wrapper{
h: s.handler.h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
},
opts: options, opts: options,
} }
l.handler.level.Store(int64(loggerToSlogLevel(options.Level))) atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(options.Level)))
return l return l
} }
@@ -131,9 +140,9 @@ func (s *slogLogger) V(level logger.Level) bool {
} }
func (s *slogLogger) Level(level logger.Level) { func (s *slogLogger) Level(level logger.Level) {
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(level)))
s.mu.Lock() s.mu.Lock()
s.opts.Level = level s.opts.Level = level
s.handler.level.Store(int64(loggerToSlogLevel(level)))
s.mu.Unlock() s.mu.Unlock()
} }
@@ -154,8 +163,11 @@ func (s *slogLogger) Fields(fields ...interface{}) logger.Logger {
} }
attrs, _ := s.argsAttrs(fields) attrs, _ := s.argsAttrs(fields)
l.handler = &wrapper{h: s.handler.h.WithAttrs(attrs)} l.handler = &wrapper{
l.handler.level.Store(int64(loggerToSlogLevel(l.opts.Level))) h: s.handler.h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(l.opts.Level)))
return l return l
} }
@@ -200,8 +212,11 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
h = slog.NewJSONHandler(s.opts.Out, handleOpt) h = slog.NewJSONHandler(s.opts.Out, handleOpt)
} }
s.handler = &wrapper{h: h.WithAttrs(attrs)} s.handler = &wrapper{
s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level))) h: h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(s.opts.Level)))
s.mu.Unlock() s.mu.Unlock()
return nil return nil
@@ -290,10 +305,17 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
} }
} }
var pcs [1]uintptr var pcs uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, printLog, LogLvlMethod]
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0]) if s.opts.AddCaller {
var caller [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, caller[:]) // skip [Callers, printLog, LogLvlMethod]
pcs = caller[0]
}
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs)
r.AddAttrs(attrs...) r.AddAttrs(attrs...)
_ = s.handler.Handle(ctx, r) _ = s.handler.Handle(ctx, r)
} }