Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 46eb739dff | |||
| 13b01f59ee |
@@ -41,11 +41,11 @@ type Broker interface {
|
|||||||
// Disconnect disconnect from broker
|
// Disconnect disconnect from broker
|
||||||
Disconnect(ctx context.Context) error
|
Disconnect(ctx context.Context) error
|
||||||
// NewMessage create new broker message to publish.
|
// NewMessage create new broker message to publish.
|
||||||
NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error)
|
NewMessage(ctx context.Context, hdr metadata.Metadata, body any, opts ...MessageOption) (Message, error)
|
||||||
// Publish message to broker topic
|
// Publish message to broker topic
|
||||||
Publish(ctx context.Context, topic string, messages ...Message) error
|
Publish(ctx context.Context, topic string, messages ...Message) error
|
||||||
// Subscribe subscribes to topic message via handler
|
// Subscribe subscribes to topic message via handler
|
||||||
Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
|
Subscribe(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
|
||||||
// String type of broker
|
// String type of broker
|
||||||
String() string
|
String() string
|
||||||
// Live returns broker liveness
|
// Live returns broker liveness
|
||||||
@@ -59,7 +59,7 @@ type Broker interface {
|
|||||||
type (
|
type (
|
||||||
FuncPublish func(ctx context.Context, topic string, messages ...Message) error
|
FuncPublish func(ctx context.Context, topic string, messages ...Message) error
|
||||||
HookPublish func(next FuncPublish) FuncPublish
|
HookPublish func(next FuncPublish) FuncPublish
|
||||||
FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
|
FuncSubscribe func(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
|
||||||
HookSubscribe func(next FuncSubscribe) FuncSubscribe
|
HookSubscribe func(next FuncSubscribe) FuncSubscribe
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -75,7 +75,7 @@ type Message interface {
|
|||||||
Body() []byte
|
Body() []byte
|
||||||
// Unmarshal try to decode message body to dst.
|
// Unmarshal try to decode message body to dst.
|
||||||
// This is helper method that uses codec.Unmarshal.
|
// This is helper method that uses codec.Unmarshal.
|
||||||
Unmarshal(dst interface{}, opts ...codec.Option) error
|
Unmarshal(dst any, opts ...codec.Option) error
|
||||||
// Ack acknowledge message if supported.
|
// Ack acknowledge message if supported.
|
||||||
Ack() error
|
Ack() error
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,6 @@ import (
|
|||||||
type Options struct {
|
type Options struct {
|
||||||
// Name holds the broker name
|
// Name holds the broker name
|
||||||
Name string
|
Name string
|
||||||
|
|
||||||
// Tracer used for tracing
|
// Tracer used for tracing
|
||||||
Tracer tracer.Tracer
|
Tracer tracer.Tracer
|
||||||
// Register can be used for clustering
|
// Register can be used for clustering
|
||||||
@@ -31,23 +30,20 @@ type Options struct {
|
|||||||
Meter meter.Meter
|
Meter meter.Meter
|
||||||
// Context holds external options
|
// Context holds external options
|
||||||
Context context.Context
|
Context context.Context
|
||||||
|
|
||||||
// Wait waits for a collection of goroutines to finish
|
// Wait waits for a collection of goroutines to finish
|
||||||
Wait *sync.WaitGroup
|
Wait *sync.WaitGroup
|
||||||
// TLSConfig holds tls.TLSConfig options
|
// TLSConfig holds tls.TLSConfig options
|
||||||
TLSConfig *tls.Config
|
TLSConfig *tls.Config
|
||||||
|
|
||||||
// Addrs holds the broker address
|
// Addrs holds the broker address
|
||||||
Addrs []string
|
Addrs []string
|
||||||
// Hooks can be run before broker Publish/BatchPublish and
|
// Hooks can be run before broker Publishing and message processing in Subscribe
|
||||||
// Subscribe/BatchSubscribe methods
|
|
||||||
Hooks options.Hooks
|
Hooks options.Hooks
|
||||||
|
|
||||||
// GracefulTimeout contains time to wait to finish in flight requests
|
// GracefulTimeout contains time to wait to finish in flight requests
|
||||||
GracefulTimeout time.Duration
|
GracefulTimeout time.Duration
|
||||||
|
|
||||||
// ContentType will be used if no content-type set when creating message
|
// ContentType will be used if no content-type set when creating message
|
||||||
ContentType string
|
ContentType string
|
||||||
|
// ErrorHandler specifies handler for all broker errors handling subscriber
|
||||||
|
ErrorHandler any
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOptions create new Options
|
// NewOptions create new Options
|
||||||
@@ -93,6 +89,13 @@ func ContentType(ct string) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrorHandler handles errors in broker
|
||||||
|
func ErrorHandler(h any) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ErrorHandler = h
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// MessageOptions struct
|
// MessageOptions struct
|
||||||
type MessageOptions struct {
|
type MessageOptions struct {
|
||||||
// ContentType for message body
|
// ContentType for message body
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"slices"
|
"slices"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v4/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -42,8 +43,10 @@ type Options struct {
|
|||||||
Fields []interface{}
|
Fields []interface{}
|
||||||
// ContextAttrFuncs contains funcs that executed before log func on context
|
// ContextAttrFuncs contains funcs that executed before log func on context
|
||||||
ContextAttrFuncs []ContextAttrFunc
|
ContextAttrFuncs []ContextAttrFunc
|
||||||
// callerSkipCount number of frmaes to skip
|
// callerSkipCount number of frames to skip
|
||||||
CallerSkipCount int
|
CallerSkipCount int
|
||||||
|
// AddCaller enables to get caller
|
||||||
|
AddCaller bool
|
||||||
// The logging level the logger should log
|
// The logging level the logger should log
|
||||||
Level Level
|
Level Level
|
||||||
// AddSource enabled writing source file and position in log
|
// AddSource enabled writing source file and position in log
|
||||||
@@ -83,6 +86,12 @@ func NewOptions(opts ...Option) Options {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithCallerEnabled(b bool) logger.Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.AddCaller = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithFatalFinalizers set logger.Fatal finalizers
|
// WithFatalFinalizers set logger.Fatal finalizers
|
||||||
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
|
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
|||||||
@@ -37,11 +37,11 @@ var (
|
|||||||
|
|
||||||
type wrapper struct {
|
type wrapper struct {
|
||||||
h slog.Handler
|
h slog.Handler
|
||||||
level atomic.Int64
|
level int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool {
|
func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool {
|
||||||
return level >= slog.Level(int(h.level.Load()))
|
return level >= slog.Level(atomic.LoadInt64(&h.level))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
|
func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
|
||||||
@@ -49,11 +49,17 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
|
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
|
||||||
return h.h.WithAttrs(attrs)
|
return &wrapper{
|
||||||
|
h: h.h.WithAttrs(attrs),
|
||||||
|
level: atomic.LoadInt64(&h.level),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *wrapper) WithGroup(name string) slog.Handler {
|
func (h *wrapper) WithGroup(name string) slog.Handler {
|
||||||
return h.h.WithGroup(name)
|
return &wrapper{
|
||||||
|
h: h.h.WithGroup(name),
|
||||||
|
level: atomic.LoadInt64(&h.level),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
||||||
@@ -115,10 +121,13 @@ func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger {
|
|||||||
|
|
||||||
attrs, _ := s.argsAttrs(options.Fields)
|
attrs, _ := s.argsAttrs(options.Fields)
|
||||||
l := &slogLogger{
|
l := &slogLogger{
|
||||||
handler: &wrapper{h: s.handler.h.WithAttrs(attrs)},
|
handler: &wrapper{
|
||||||
opts: options,
|
h: s.handler.h.WithAttrs(attrs),
|
||||||
|
level: atomic.LoadInt64(&s.handler.level),
|
||||||
|
},
|
||||||
|
opts: options,
|
||||||
}
|
}
|
||||||
l.handler.level.Store(int64(loggerToSlogLevel(options.Level)))
|
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(options.Level)))
|
||||||
|
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
@@ -131,9 +140,9 @@ func (s *slogLogger) V(level logger.Level) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Level(level logger.Level) {
|
func (s *slogLogger) Level(level logger.Level) {
|
||||||
|
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(level)))
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.opts.Level = level
|
s.opts.Level = level
|
||||||
s.handler.level.Store(int64(loggerToSlogLevel(level)))
|
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -154,8 +163,11 @@ func (s *slogLogger) Fields(fields ...interface{}) logger.Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
attrs, _ := s.argsAttrs(fields)
|
attrs, _ := s.argsAttrs(fields)
|
||||||
l.handler = &wrapper{h: s.handler.h.WithAttrs(attrs)}
|
l.handler = &wrapper{
|
||||||
l.handler.level.Store(int64(loggerToSlogLevel(l.opts.Level)))
|
h: s.handler.h.WithAttrs(attrs),
|
||||||
|
level: atomic.LoadInt64(&s.handler.level),
|
||||||
|
}
|
||||||
|
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(l.opts.Level)))
|
||||||
|
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
@@ -200,8 +212,11 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
|
|||||||
h = slog.NewJSONHandler(s.opts.Out, handleOpt)
|
h = slog.NewJSONHandler(s.opts.Out, handleOpt)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.handler = &wrapper{h: h.WithAttrs(attrs)}
|
s.handler = &wrapper{
|
||||||
s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level)))
|
h: h.WithAttrs(attrs),
|
||||||
|
level: atomic.LoadInt64(&s.handler.level),
|
||||||
|
}
|
||||||
|
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(s.opts.Level)))
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -290,10 +305,17 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var pcs [1]uintptr
|
var pcs uintptr
|
||||||
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, printLog, LogLvlMethod]
|
|
||||||
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0])
|
if s.opts.AddCaller {
|
||||||
|
var caller [1]uintptr
|
||||||
|
runtime.Callers(s.opts.CallerSkipCount, caller[:]) // skip [Callers, printLog, LogLvlMethod]
|
||||||
|
pcs = caller[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs)
|
||||||
r.AddAttrs(attrs...)
|
r.AddAttrs(attrs...)
|
||||||
|
|
||||||
_ = s.handler.Handle(ctx, r)
|
_ = s.handler.Handle(ctx, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user