Compare commits

..

2 Commits

Author SHA1 Message Date
46eb739dff broker: add ErrorHandler
Some checks failed
coverage / build (push) Failing after 4m49s
test / test (push) Failing after 16m1s
sync / sync (push) Failing after 20s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 08:34:55 +03:00
13b01f59ee logger: conditional caller field
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-16 08:34:55 +03:00
4 changed files with 61 additions and 27 deletions

View File

@@ -41,11 +41,11 @@ type Broker interface {
// Disconnect disconnect from broker
Disconnect(ctx context.Context) error
// NewMessage create new broker message to publish.
NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error)
NewMessage(ctx context.Context, hdr metadata.Metadata, body any, opts ...MessageOption) (Message, error)
// Publish message to broker topic
Publish(ctx context.Context, topic string, messages ...Message) error
// Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
Subscribe(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
// String type of broker
String() string
// Live returns broker liveness
@@ -59,7 +59,7 @@ type Broker interface {
type (
FuncPublish func(ctx context.Context, topic string, messages ...Message) error
HookPublish func(next FuncPublish) FuncPublish
FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
FuncSubscribe func(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
HookSubscribe func(next FuncSubscribe) FuncSubscribe
)
@@ -75,7 +75,7 @@ type Message interface {
Body() []byte
// Unmarshal try to decode message body to dst.
// This is helper method that uses codec.Unmarshal.
Unmarshal(dst interface{}, opts ...codec.Option) error
Unmarshal(dst any, opts ...codec.Option) error
// Ack acknowledge message if supported.
Ack() error
}

View File

@@ -18,7 +18,6 @@ import (
type Options struct {
// Name holds the broker name
Name string
// Tracer used for tracing
Tracer tracer.Tracer
// Register can be used for clustering
@@ -31,23 +30,20 @@ type Options struct {
Meter meter.Meter
// Context holds external options
Context context.Context
// Wait waits for a collection of goroutines to finish
Wait *sync.WaitGroup
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// Addrs holds the broker address
Addrs []string
// Hooks can be run before broker Publish/BatchPublish and
// Subscribe/BatchSubscribe methods
// Hooks can be run before broker Publishing and message processing in Subscribe
Hooks options.Hooks
// GracefulTimeout contains time to wait to finish in flight requests
GracefulTimeout time.Duration
// ContentType will be used if no content-type set when creating message
ContentType string
// ErrorHandler specifies handler for all broker errors handling subscriber
ErrorHandler any
}
// NewOptions create new Options
@@ -93,6 +89,13 @@ func ContentType(ct string) Option {
}
}
// ErrorHandler handles errors in broker
func ErrorHandler(h any) Option {
return func(o *Options) {
o.ErrorHandler = h
}
}
// MessageOptions struct
type MessageOptions struct {
// ContentType for message body

View File

@@ -8,6 +8,7 @@ import (
"slices"
"time"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/meter"
)
@@ -42,8 +43,10 @@ type Options struct {
Fields []interface{}
// ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc
// callerSkipCount number of frmaes to skip
// callerSkipCount number of frames to skip
CallerSkipCount int
// AddCaller enables to get caller
AddCaller bool
// The logging level the logger should log
Level Level
// AddSource enabled writing source file and position in log
@@ -83,6 +86,12 @@ func NewOptions(opts ...Option) Options {
return options
}
func WithCallerEnabled(b bool) logger.Option {
return func(o *Options) {
o.AddCaller = b
}
}
// WithFatalFinalizers set logger.Fatal finalizers
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
return func(o *Options) {

View File

@@ -37,11 +37,11 @@ var (
type wrapper struct {
h slog.Handler
level atomic.Int64
level int64
}
func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool {
return level >= slog.Level(int(h.level.Load()))
return level >= slog.Level(atomic.LoadInt64(&h.level))
}
func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
@@ -49,11 +49,17 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
}
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
return h.h.WithAttrs(attrs)
return &wrapper{
h: h.h.WithAttrs(attrs),
level: atomic.LoadInt64(&h.level),
}
}
func (h *wrapper) WithGroup(name string) slog.Handler {
return h.h.WithGroup(name)
return &wrapper{
h: h.h.WithGroup(name),
level: atomic.LoadInt64(&h.level),
}
}
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
@@ -115,10 +121,13 @@ func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger {
attrs, _ := s.argsAttrs(options.Fields)
l := &slogLogger{
handler: &wrapper{h: s.handler.h.WithAttrs(attrs)},
opts: options,
handler: &wrapper{
h: s.handler.h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
},
opts: options,
}
l.handler.level.Store(int64(loggerToSlogLevel(options.Level)))
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(options.Level)))
return l
}
@@ -131,9 +140,9 @@ func (s *slogLogger) V(level logger.Level) bool {
}
func (s *slogLogger) Level(level logger.Level) {
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(level)))
s.mu.Lock()
s.opts.Level = level
s.handler.level.Store(int64(loggerToSlogLevel(level)))
s.mu.Unlock()
}
@@ -154,8 +163,11 @@ func (s *slogLogger) Fields(fields ...interface{}) logger.Logger {
}
attrs, _ := s.argsAttrs(fields)
l.handler = &wrapper{h: s.handler.h.WithAttrs(attrs)}
l.handler.level.Store(int64(loggerToSlogLevel(l.opts.Level)))
l.handler = &wrapper{
h: s.handler.h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(l.opts.Level)))
return l
}
@@ -200,8 +212,11 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
h = slog.NewJSONHandler(s.opts.Out, handleOpt)
}
s.handler = &wrapper{h: h.WithAttrs(attrs)}
s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level)))
s.handler = &wrapper{
h: h.WithAttrs(attrs),
level: atomic.LoadInt64(&s.handler.level),
}
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(s.opts.Level)))
s.mu.Unlock()
return nil
@@ -290,10 +305,17 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
}
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, printLog, LogLvlMethod]
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0])
var pcs uintptr
if s.opts.AddCaller {
var caller [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, caller[:]) // skip [Callers, printLog, LogLvlMethod]
pcs = caller[0]
}
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs)
r.AddAttrs(attrs...)
_ = s.handler.Handle(ctx, r)
}