Compare commits
10 Commits
v4.1.25
...
3a3024c05f
| Author | SHA1 | Date | |
|---|---|---|---|
| 3a3024c05f | |||
| 1511423ff2 | |||
| b55375fd89 | |||
| 77d1582543 | |||
| 560afc5dd6 | |||
| 540bc415d5 | |||
| 46eb739dff | |||
| 13b01f59ee | |||
| c32a17b69b | |||
| 9cb25acf63 |
@@ -41,11 +41,11 @@ type Broker interface {
|
||||
// Disconnect disconnect from broker
|
||||
Disconnect(ctx context.Context) error
|
||||
// NewMessage create new broker message to publish.
|
||||
NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error)
|
||||
NewMessage(ctx context.Context, hdr metadata.Metadata, body any, opts ...MessageOption) (Message, error)
|
||||
// Publish message to broker topic
|
||||
Publish(ctx context.Context, topic string, messages ...Message) error
|
||||
// Subscribe subscribes to topic message via handler
|
||||
Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
|
||||
Subscribe(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
|
||||
// String type of broker
|
||||
String() string
|
||||
// Live returns broker liveness
|
||||
@@ -59,7 +59,7 @@ type Broker interface {
|
||||
type (
|
||||
FuncPublish func(ctx context.Context, topic string, messages ...Message) error
|
||||
HookPublish func(next FuncPublish) FuncPublish
|
||||
FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
|
||||
FuncSubscribe func(ctx context.Context, topic string, handler any, opts ...SubscribeOption) (Subscriber, error)
|
||||
HookSubscribe func(next FuncSubscribe) FuncSubscribe
|
||||
)
|
||||
|
||||
@@ -75,7 +75,7 @@ type Message interface {
|
||||
Body() []byte
|
||||
// Unmarshal try to decode message body to dst.
|
||||
// This is helper method that uses codec.Unmarshal.
|
||||
Unmarshal(dst interface{}, opts ...codec.Option) error
|
||||
Unmarshal(dst any, opts ...codec.Option) error
|
||||
// Ack acknowledge message if supported.
|
||||
Ack() error
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ import (
|
||||
type Options struct {
|
||||
// Name holds the broker name
|
||||
Name string
|
||||
|
||||
// Tracer used for tracing
|
||||
Tracer tracer.Tracer
|
||||
// Register can be used for clustering
|
||||
@@ -31,23 +30,20 @@ type Options struct {
|
||||
Meter meter.Meter
|
||||
// Context holds external options
|
||||
Context context.Context
|
||||
|
||||
// Wait waits for a collection of goroutines to finish
|
||||
Wait *sync.WaitGroup
|
||||
// TLSConfig holds tls.TLSConfig options
|
||||
TLSConfig *tls.Config
|
||||
|
||||
// Addrs holds the broker address
|
||||
Addrs []string
|
||||
// Hooks can be run before broker Publish/BatchPublish and
|
||||
// Subscribe/BatchSubscribe methods
|
||||
// Hooks can be run before broker Publishing and message processing in Subscribe
|
||||
Hooks options.Hooks
|
||||
|
||||
// GracefulTimeout contains time to wait to finish in flight requests
|
||||
GracefulTimeout time.Duration
|
||||
|
||||
// ContentType will be used if no content-type set when creating message
|
||||
ContentType string
|
||||
// ErrorHandler specifies handler for all broker errors handling subscriber
|
||||
ErrorHandler func(Message) error
|
||||
}
|
||||
|
||||
// NewOptions create new Options
|
||||
@@ -93,6 +89,13 @@ func ContentType(ct string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// ErrorHandler handles errors in broker
|
||||
func ErrorHandler(h func(Message) error) Option {
|
||||
return func(o *Options) {
|
||||
o.ErrorHandler = h
|
||||
}
|
||||
}
|
||||
|
||||
// MessageOptions struct
|
||||
type MessageOptions struct {
|
||||
// ContentType for message body
|
||||
|
||||
4
go.mod
4
go.mod
@@ -15,10 +15,10 @@ require (
|
||||
github.com/stretchr/testify v1.11.1
|
||||
go.uber.org/atomic v1.11.0
|
||||
go.unistack.org/micro-proto/v4 v4.1.0
|
||||
golang.org/x/sync v0.17.0
|
||||
golang.org/x/sync v0.19.0
|
||||
golang.yandex/hasql/v2 v2.1.0
|
||||
google.golang.org/grpc v1.76.0
|
||||
google.golang.org/protobuf v1.36.10
|
||||
google.golang.org/protobuf v1.36.11
|
||||
)
|
||||
|
||||
require (
|
||||
|
||||
8
go.sum
8
go.sum
@@ -46,8 +46,8 @@ go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9
|
||||
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
|
||||
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
|
||||
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
|
||||
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
|
||||
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
|
||||
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
|
||||
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
|
||||
@@ -58,8 +58,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
|
||||
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
|
||||
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
|
||||
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
|
||||
@@ -42,8 +42,10 @@ type Options struct {
|
||||
Fields []interface{}
|
||||
// ContextAttrFuncs contains funcs that executed before log func on context
|
||||
ContextAttrFuncs []ContextAttrFunc
|
||||
// callerSkipCount number of frmaes to skip
|
||||
// callerSkipCount number of frames to skip
|
||||
CallerSkipCount int
|
||||
// AddCaller enables to get caller
|
||||
AddCaller bool
|
||||
// The logging level the logger should log
|
||||
Level Level
|
||||
// AddSource enabled writing source file and position in log
|
||||
@@ -83,6 +85,18 @@ func NewOptions(opts ...Option) Options {
|
||||
return options
|
||||
}
|
||||
|
||||
func WithCallerEnabled(b bool) Option {
|
||||
return func(o *Options) {
|
||||
o.AddCaller = b
|
||||
}
|
||||
}
|
||||
|
||||
func WithAddCaller(b bool) Option {
|
||||
return func(o *Options) {
|
||||
o.AddCaller = b
|
||||
}
|
||||
}
|
||||
|
||||
// WithFatalFinalizers set logger.Fatal finalizers
|
||||
func WithFatalFinalizers(fncs ...func(context.Context)) Option {
|
||||
return func(o *Options) {
|
||||
|
||||
@@ -37,11 +37,11 @@ var (
|
||||
|
||||
type wrapper struct {
|
||||
h slog.Handler
|
||||
level atomic.Int64
|
||||
level int64
|
||||
}
|
||||
|
||||
func (h *wrapper) Enabled(ctx context.Context, level slog.Level) bool {
|
||||
return level >= slog.Level(int(h.level.Load()))
|
||||
return level >= slog.Level(atomic.LoadInt64(&h.level))
|
||||
}
|
||||
|
||||
func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
|
||||
@@ -49,11 +49,17 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
|
||||
}
|
||||
|
||||
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
|
||||
return h.h.WithAttrs(attrs)
|
||||
return &wrapper{
|
||||
h: h.h.WithAttrs(attrs),
|
||||
level: atomic.LoadInt64(&h.level),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *wrapper) WithGroup(name string) slog.Handler {
|
||||
return h.h.WithGroup(name)
|
||||
return &wrapper{
|
||||
h: h.h.WithGroup(name),
|
||||
level: atomic.LoadInt64(&h.level),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
|
||||
@@ -115,10 +121,13 @@ func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger {
|
||||
|
||||
attrs, _ := s.argsAttrs(options.Fields)
|
||||
l := &slogLogger{
|
||||
handler: &wrapper{h: s.handler.h.WithAttrs(attrs)},
|
||||
opts: options,
|
||||
handler: &wrapper{
|
||||
h: s.handler.h.WithAttrs(attrs),
|
||||
level: atomic.LoadInt64(&s.handler.level),
|
||||
},
|
||||
opts: options,
|
||||
}
|
||||
l.handler.level.Store(int64(loggerToSlogLevel(options.Level)))
|
||||
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(options.Level)))
|
||||
|
||||
return l
|
||||
}
|
||||
@@ -131,9 +140,9 @@ func (s *slogLogger) V(level logger.Level) bool {
|
||||
}
|
||||
|
||||
func (s *slogLogger) Level(level logger.Level) {
|
||||
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(level)))
|
||||
s.mu.Lock()
|
||||
s.opts.Level = level
|
||||
s.handler.level.Store(int64(loggerToSlogLevel(level)))
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -154,8 +163,11 @@ func (s *slogLogger) Fields(fields ...interface{}) logger.Logger {
|
||||
}
|
||||
|
||||
attrs, _ := s.argsAttrs(fields)
|
||||
l.handler = &wrapper{h: s.handler.h.WithAttrs(attrs)}
|
||||
l.handler.level.Store(int64(loggerToSlogLevel(l.opts.Level)))
|
||||
l.handler = &wrapper{
|
||||
h: s.handler.h.WithAttrs(attrs),
|
||||
level: atomic.LoadInt64(&s.handler.level),
|
||||
}
|
||||
atomic.StoreInt64(&l.handler.level, int64(loggerToSlogLevel(l.opts.Level)))
|
||||
|
||||
return l
|
||||
}
|
||||
@@ -200,8 +212,11 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
|
||||
h = slog.NewJSONHandler(s.opts.Out, handleOpt)
|
||||
}
|
||||
|
||||
s.handler = &wrapper{h: h.WithAttrs(attrs)}
|
||||
s.handler.level.Store(int64(loggerToSlogLevel(s.opts.Level)))
|
||||
s.handler = &wrapper{
|
||||
h: h.WithAttrs(attrs),
|
||||
}
|
||||
|
||||
atomic.StoreInt64(&s.handler.level, int64(loggerToSlogLevel(s.opts.Level)))
|
||||
s.mu.Unlock()
|
||||
|
||||
return nil
|
||||
@@ -290,10 +305,17 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
|
||||
}
|
||||
}
|
||||
|
||||
var pcs [1]uintptr
|
||||
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, printLog, LogLvlMethod]
|
||||
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0])
|
||||
var pcs uintptr
|
||||
|
||||
if s.opts.AddCaller {
|
||||
var caller [1]uintptr
|
||||
runtime.Callers(s.opts.CallerSkipCount, caller[:]) // skip [Callers, printLog, LogLvlMethod]
|
||||
pcs = caller[0]
|
||||
}
|
||||
|
||||
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs)
|
||||
r.AddAttrs(attrs...)
|
||||
|
||||
_ = s.handler.Handle(ctx, r)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,8 +6,10 @@ import (
|
||||
)
|
||||
|
||||
var _ interface {
|
||||
io.ReadCloser
|
||||
io.ReadSeeker
|
||||
io.Reader
|
||||
io.Seeker
|
||||
io.Writer
|
||||
io.Closer
|
||||
} = (*SeekerBuffer)(nil)
|
||||
|
||||
// SeekerBuffer is a ReadWriteCloser that supports seeking. It's intended to
|
||||
@@ -95,7 +97,7 @@ func (b *SeekerBuffer) Close() error {
|
||||
|
||||
// Reset clears all the data out of the buffer and sets the read position to 0.
|
||||
func (b *SeekerBuffer) Reset() {
|
||||
b.data = nil
|
||||
b.data = b.data[:0]
|
||||
b.pos = 0
|
||||
}
|
||||
|
||||
|
||||
@@ -296,7 +296,7 @@ func TestSeekerBuffer_Reset(t *testing.T) {
|
||||
buf.pos = 2
|
||||
|
||||
buf.Reset()
|
||||
require.Nil(t, buf.data)
|
||||
require.Equal(t, []byte{}, buf.data)
|
||||
require.Equal(t, int64(0), buf.pos)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user