update deps && structs && hooks #147

Merged
vtolstov merged 10 commits from devstigneev/micro-broker-kgo:v3 into v3 2024-12-20 00:02:25 +03:00
6 changed files with 83 additions and 55 deletions
Showing only changes of commit ff6a272594 - Show all commits

View File

@ -3,3 +3,20 @@ run:
deadline: 5m deadline: 5m
issues-exit-code: 1 issues-exit-code: 1
tests: true tests: true
linters:
enable:
- staticcheck
- unused
- gosimple
- govet
- goimports
- prealloc
- unconvert
- nakedret
linters-settings:
govet:
check-all: true
enable:
- fieldalignment

View File

@ -8,11 +8,14 @@ import (
) )
type event struct { type event struct {
ctx context.Context
topic string
err error
sync.RWMutex
msg *broker.Message msg *broker.Message
err error
topic string
ctx context.Context
sync.RWMutex
ack bool ack bool
} }

32
kgo.go
View File

@ -5,7 +5,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand/v2"
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
@ -29,7 +29,6 @@ var ErrLostMessage = errors.New("message not marked for offsets commit and will
var DefaultRetryBackoffFn = func() func(int) time.Duration { var DefaultRetryBackoffFn = func() func(int) time.Duration {
var rngMu sync.Mutex var rngMu sync.Mutex
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
return func(fails int) time.Duration { return func(fails int) time.Duration {
const ( const (
min = 100 * time.Millisecond min = 100 * time.Millisecond
@ -45,7 +44,7 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
backoff := min * time.Duration(1<<(fails-1)) backoff := min * time.Duration(1<<(fails-1))
rngMu.Lock() rngMu.Lock()
jitter := 0.8 + 0.4*rng.Float64() jitter := 0.8 + 0.4*rand.Float64()
rngMu.Unlock() rngMu.Unlock()
backoff = time.Duration(float64(backoff) * jitter) backoff = time.Duration(float64(backoff) * jitter)
@ -58,13 +57,16 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
}() }()
type Broker struct { type Broker struct {
init bool c *kgo.Client
c *kgo.Client
kopts []kgo.Opt
connected *atomic.Uint32 connected *atomic.Uint32
sync.RWMutex
kopts []kgo.Opt
subs []*subscriber
opts broker.Options opts broker.Options
subs []*Subscriber
sync.RWMutex
init bool
} }
func (r *Broker) Live() bool { func (r *Broker) Live() bool {
@ -302,11 +304,11 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec() k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
if err != nil { // if err != nil {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc()
} else { // } else {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc()
} // }
promise(r, err) promise(r, err)
}) })
} }
@ -322,9 +324,9 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
if result.Err != nil { if result.Err != nil {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc()
errs = append(errs, result.Err.Error()) errs = append(errs, result.Err.Error())
} else { } // else {
k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc()
} // }
} }
if len(errs) > 0 { if len(errs) > 0 {
@ -350,7 +352,7 @@ func (k *Broker) TopicExists(ctx context.Context, topic string) error {
return nil return nil
} }
func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (k *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil return nil, nil
} }

View File

@ -61,7 +61,7 @@ const (
labelTopic = "topic" labelTopic = "topic"
) )
func (m *hookMeter) OnGroupManageError(err error) { func (m *hookMeter) OnGroupManageError(_ error) {
m.meter.Counter(metricBrokerGroupErrors).Inc() m.meter.Counter(metricBrokerGroupErrors).Inc()
} }

View File

@ -22,29 +22,35 @@ type tp struct {
} }
type consumer struct { type consumer struct {
c *kgo.Client topic string
topic string
partition int32 c *kgo.Client
htracer *hookTracer htracer *hookTracer
opts broker.SubscribeOptions
kopts broker.Options handler broker.Handler
handler broker.Handler quit chan struct{}
quit chan struct{} done chan struct{}
done chan struct{} recs chan kgo.FetchTopicPartition
recs chan kgo.FetchTopicPartition
kopts broker.Options
opts broker.SubscribeOptions
partition int32
} }
type Subscriber struct { type Subscriber struct {
c *kgo.Client
topic string
htracer *hookTracer
opts broker.SubscribeOptions
kopts broker.Options
handler broker.Handler
closed bool
done chan struct{}
consumers map[tp]*consumer consumers map[tp]*consumer
c *kgo.Client
htracer *hookTracer
topic string
handler broker.Handler
done chan struct{}
kopts broker.Options
opts broker.SubscribeOptions
sync.RWMutex sync.RWMutex
closed bool
} }
func (s *Subscriber) Client() *kgo.Client { func (s *Subscriber) Client() *kgo.Client {
@ -63,21 +69,21 @@ func (s *Subscriber) Unsubscribe(ctx context.Context) error {
if s.closed { if s.closed {
return nil return nil
} }
select { // select {
// case <-ctx.Done(): // case <-ctx.Done():
// return ctx.Err() // return ctx.Err()
default: // default:
s.c.PauseFetchTopics(s.topic) s.c.PauseFetchTopics(s.topic)
s.c.CloseAllowingRebalance() s.c.CloseAllowingRebalance()
kc := make(map[string][]int32) kc := make(map[string][]int32)
for ctp := range s.consumers { for ctp := range s.consumers {
kc[ctp.t] = append(kc[ctp.t], ctp.p) kc[ctp.t] = append(kc[ctp.t], ctp.p)
}
s.killConsumers(ctx, kc)
close(s.done)
s.closed = true
s.c.ResumeFetchTopics(s.topic)
} }
s.killConsumers(ctx, kc)
close(s.done)
s.closed = true
s.c.ResumeFetchTopics(s.topic)
// }
return nil return nil
} }
@ -141,8 +147,8 @@ func (s *Subscriber) poll(ctx context.Context) {
}) })
fetches.EachPartition(func(p kgo.FetchTopicPartition) { fetches.EachPartition(func(p kgo.FetchTopicPartition) {
tp := tp{p.Topic, p.Partition} nTp := tp{p.Topic, p.Partition}
s.consumers[tp].recs <- p s.consumers[nTp].recs <- p
}) })
s.c.AllowRebalance() s.c.AllowRebalance()
} }
@ -155,9 +161,9 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
for topic, partitions := range lost { for topic, partitions := range lost {
for _, partition := range partitions { for _, partition := range partitions {
tp := tp{topic, partition} nTp := tp{topic, partition}
pc := s.consumers[tp] pc := s.consumers[nTp]
delete(s.consumers, tp) delete(s.consumers, nTp)
close(pc.quit) close(pc.quit)
if s.kopts.Logger.V(logger.DebugLevel) { if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))

View File

@ -11,9 +11,9 @@ import (
) )
type hookTracer struct { type hookTracer struct {
tracer tracer.Tracer
clientID string clientID string
group string group string
tracer tracer.Tracer
} }
var messagingSystem = semconv.MessagingSystemKey.String("kafka") var messagingSystem = semconv.MessagingSystemKey.String("kafka")