update deps && structs && hooks #147

Merged
vtolstov merged 10 commits from devstigneev/micro-broker-kgo:v3 into v3 2024-12-20 00:02:25 +03:00
6 changed files with 71 additions and 43 deletions
Showing only changes of commit 7c5da60556

View File

@@ -3,3 +3,20 @@ run:
  deadline: 5m
  issues-exit-code: 1
  tests: true
+linters:
+  enable:
+    - staticcheck
+    - unused
+    - gosimple
+    - govet
+    - goimports
+    - prealloc
+    - unconvert
+    - nakedret
+linters-settings:
+  govet:
+    check-all: true
+    enable:
+      - fieldalignment
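This new linter set is what drives the struct reshuffles in the commits below: govet's fieldalignment check wants struct fields ordered so padding is minimized (larger, pointer-aligned fields first, bools and small integers last). A self-contained sketch of the effect, with illustrative types that are not from this repo:

```go
package main

import (
	"fmt"
	"unsafe"
)

// badLayout is the shape fieldalignment flags: on 64-bit targets the
// leading bool forces 7 bytes of padding before the 8-byte pointer.
type badLayout struct {
	b bool  // 1 byte + 7 bytes padding
	p *int  // 8 bytes
	n int32 // 4 bytes + 4 bytes trailing padding
}

// goodLayout orders fields largest-first, the shape this PR moves to.
type goodLayout struct {
	p *int  // 8 bytes
	n int32 // 4 bytes
	b bool  // 1 byte + 3 bytes trailing padding
}

func main() {
	fmt.Println(unsafe.Sizeof(badLayout{}), unsafe.Sizeof(goodLayout{})) // 24 16
}
```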

View File

@@ -8,11 +8,14 @@ import (
)

type event struct {
-    ctx context.Context
-    topic string
-    err error
-    sync.RWMutex
    msg *broker.Message
+    err error
+    topic string
+    ctx context.Context
+    sync.RWMutex
    ack bool
}

kgo.go
View File

@@ -5,7 +5,7 @@ import (
    "context"
    "errors"
    "fmt"
-    "math/rand"
+    "math/rand/v2"
    "net/http"
    "strings"
    "sync"
@@ -29,7 +29,6 @@ var ErrLostMessage = errors.New("message not marked for offsets commit and will

var DefaultRetryBackoffFn = func() func(int) time.Duration {
    var rngMu sync.Mutex
-    rng := rand.New(rand.NewSource(time.Now().UnixNano()))
    return func(fails int) time.Duration {
        const (
            min = 100 * time.Millisecond
@@ -45,7 +44,7 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
        backoff := min * time.Duration(1<<(fails-1))
        rngMu.Lock()
-        jitter := 0.8 + 0.4*rng.Float64()
+        jitter := 0.8 + 0.4*rand.Float64()
        rngMu.Unlock()
        backoff = time.Duration(float64(backoff) * jitter)
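The move to math/rand/v2 is why the hand-seeded *rand.Rand above could be dropped: v2's top-level functions are auto-seeded and safe for concurrent use (which arguably makes the surviving rngMu lock redundant). A standalone sketch of the backoff-with-jitter scheme in this function; the 10s cap and the overflow guard are assumptions, since the actual max lies outside this hunk:

```go
package main

import (
	"fmt"
	"math/rand/v2" // top-level functions are seeded and goroutine-safe
	"time"
)

// backoff grows exponentially from min and applies +/-20% jitter,
// mirroring the DefaultRetryBackoffFn hunks above.
func backoff(fails int) time.Duration {
	const (
		min = 100 * time.Millisecond
		max = 10 * time.Second // assumed cap, not visible in the diff
	)
	if fails <= 0 {
		return min
	}
	if fails > 32 { // assumed guard against shift overflow
		return max
	}
	d := min * time.Duration(1<<(fails-1))
	jitter := 0.8 + 0.4*rand.Float64() // scales d into [0.8d, 1.2d]
	if d = time.Duration(float64(d) * jitter); d > max {
		d = max
	}
	return d
}

func main() {
	for f := 1; f <= 6; f++ {
		fmt.Println(f, backoff(f))
	}
}
```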
@@ -58,13 +57,16 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
}()

type Broker struct {
-    init bool
-    c *kgo.Client
-    kopts []kgo.Opt
+    c *kgo.Client
    connected *atomic.Uint32
-    sync.RWMutex
+    kopts []kgo.Opt
-    subs []*subscriber
    opts broker.Options
+    subs []*Subscriber
+    sync.RWMutex
+    init bool
}
func (r *Broker) Live() bool {
@@ -302,11 +304,11 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
        k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
        k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
        k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds())
-        if err != nil {
+        // if err != nil {
        k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc()
-        } else {
-            k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc()
-        }
+        // } else {
+        k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc()
+        // }
        promise(r, err)
    })
}
@@ -322,9 +324,9 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
    if result.Err != nil {
        k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc()
        errs = append(errs, result.Err.Error())
-    } else {
+    } // else {
    k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc()
-    }
+    // }
}

if len(errs) > 0 {
@@ -350,7 +352,7 @@ func (k *Broker) TopicExists(ctx context.Context, topic string) error {
    return nil
}

-func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
+func (k *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error) {
    return nil, nil
}
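Blanking the unused parameters keeps the method signature, and therefore interface compliance, intact while satisfying the unused-parameter style checks enabled above. A self-contained illustration of the idiom with hypothetical names:

```go
package main

import "fmt"

// Greeter stands in for an interface like broker.Broker.
type Greeter interface {
	Greet(name string) string
}

// Stub satisfies Greeter even though it ignores its parameter; the
// blank identifier documents that the value is intentionally unused.
type Stub struct{}

func (Stub) Greet(_ string) string { return "hello" }

// Compile-time assertion that Stub keeps implementing Greeter as its
// stub methods change shape.
var _ Greeter = Stub{}

func main() {
	fmt.Println(Stub{}.Greet("ignored"))
}
```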

View File

@@ -61,7 +61,7 @@ const (
    labelTopic = "topic"
)

-func (m *hookMeter) OnGroupManageError(err error) {
+func (m *hookMeter) OnGroupManageError(_ error) {
    m.meter.Counter(metricBrokerGroupErrors).Inc()
}
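OnGroupManageError is the single method of franz-go's kgo.HookGroupManageError hook interface, so only the signature matters here and the parameter can be blanked. A self-contained sketch with a toy hook; countingHook and its field are hypothetical, while the interface and kgo.WithHooks are franz-go API:

```go
package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

// countingHook is a toy stand-in for hookMeter: it only counts group
// management errors, so the error value itself can be ignored.
type countingHook struct{ groupErrors int }

func (h *countingHook) OnGroupManageError(_ error) { h.groupErrors++ }

// Compile-time check that the hook satisfies the kgo interface.
var _ kgo.HookGroupManageError = (*countingHook)(nil)

func main() {
	h := &countingHook{}
	// In real code the hook is registered at client construction:
	// cl, err := kgo.NewClient(kgo.WithHooks(h))
	h.OnGroupManageError(nil)
	fmt.Println(h.groupErrors) // 1
}
```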

View File

@@ -22,29 +22,35 @@ type tp struct {
}

type consumer struct {
-    c *kgo.Client
-    topic string
-    partition int32
+    topic string
+    c *kgo.Client
    htracer *hookTracer
-    opts broker.SubscribeOptions
-    kopts broker.Options
-    handler broker.Handler
-    quit chan struct{}
-    done chan struct{}
-    recs chan kgo.FetchTopicPartition
+    handler broker.Handler
+    quit chan struct{}
+    done chan struct{}
+    recs chan kgo.FetchTopicPartition
+    kopts broker.Options
+    opts broker.SubscribeOptions
+    partition int32
}

type Subscriber struct {
-    c *kgo.Client
-    topic string
-    htracer *hookTracer
-    opts broker.SubscribeOptions
-    kopts broker.Options
-    handler broker.Handler
-    closed bool
-    done chan struct{}
    consumers map[tp]*consumer
+    c *kgo.Client
+    htracer *hookTracer
+    topic string
+    handler broker.Handler
+    done chan struct{}
+    kopts broker.Options
+    opts broker.SubscribeOptions
    sync.RWMutex
+    closed bool
}
func (s *Subscriber) Client() *kgo.Client {
@@ -138,8 +144,8 @@ func (s *Subscriber) poll(ctx context.Context) {
    })

    fetches.EachPartition(func(p kgo.FetchTopicPartition) {
-        tp := tp{p.Topic, p.Partition}
-        s.consumers[tp].recs <- p
+        nTp := tp{p.Topic, p.Partition}
+        s.consumers[nTp].recs <- p
    })
    s.c.AllowRebalance()
}
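The tp-to-nTp rename stops a local variable from shadowing the tp type, which shadow-style checks flag and which would make the type name unusable for the rest of the scope. A minimal illustration with the same field shape:

```go
package main

import "fmt"

type tp struct {
	topic     string
	partition int32
}

func main() {
	// Writing `tp := tp{...}` is legal but shadows the type tp for the
	// rest of this scope, so a second composite literal of the type
	// could not be written. A distinct variable name keeps the type usable:
	nTp := tp{topic: "events", partition: 0}
	other := tp{topic: "logs", partition: 1}
	fmt.Println(nTp, other)
}
```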
@@ -152,9 +158,9 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
    for topic, partitions := range lost {
        for _, partition := range partitions {
-            tp := tp{topic, partition}
-            pc := s.consumers[tp]
-            delete(s.consumers, tp)
+            nTp := tp{topic, partition}
+            pc := s.consumers[nTp]
+            delete(s.consumers, nTp)
            close(pc.quit)
            if s.kopts.Logger.V(logger.DebugLevel) {
                s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))

View File

@@ -11,9 +11,9 @@ import (
)

type hookTracer struct {
-    tracer tracer.Tracer
    clientID string
    group string
+    tracer tracer.Tracer
}
var messagingSystem = semconv.MessagingSystemKey.String("kafka")