Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
@@ -5,10 +5,12 @@ import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/twmb/franz-go/pkg/kadm"
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"github.com/twmb/franz-go/pkg/kmsg"
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
@@ -22,20 +24,17 @@ type tp struct {
|
||||
}
|
||||
|
||||
type consumer struct {
|
||||
topic string
|
||||
|
||||
c *kgo.Client
|
||||
topic string
|
||||
c *kgo.Client
|
||||
htracer *hookTracer
|
||||
|
||||
handler broker.Handler
|
||||
quit chan struct{}
|
||||
done chan struct{}
|
||||
recs chan kgo.FetchTopicPartition
|
||||
|
||||
kopts broker.Options
|
||||
opts broker.SubscribeOptions
|
||||
|
||||
quit chan struct{}
|
||||
done chan struct{}
|
||||
recs chan kgo.FetchTopicPartition
|
||||
kopts broker.Options
|
||||
partition int32
|
||||
opts broker.SubscribeOptions
|
||||
handler broker.Handler
|
||||
connected *atomic.Uint32
|
||||
}
|
||||
|
||||
type Subscriber struct {
|
||||
@@ -49,6 +48,7 @@ type Subscriber struct {
|
||||
kopts broker.Options
|
||||
opts broker.SubscribeOptions
|
||||
|
||||
connected *atomic.Uint32
|
||||
sync.RWMutex
|
||||
closed bool
|
||||
}
|
||||
@@ -144,8 +144,8 @@ func (s *Subscriber) poll(ctx context.Context) {
|
||||
})
|
||||
|
||||
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
|
||||
nTp := tp{p.Topic, p.Partition}
|
||||
s.consumers[nTp].recs <- p
|
||||
tps := tp{p.Topic, p.Partition}
|
||||
s.consumers[tps].recs <- p
|
||||
})
|
||||
s.c.AllowRebalance()
|
||||
}
|
||||
@@ -158,9 +158,9 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
||||
|
||||
for topic, partitions := range lost {
|
||||
for _, partition := range partitions {
|
||||
nTp := tp{topic, partition}
|
||||
pc := s.consumers[nTp]
|
||||
delete(s.consumers, nTp)
|
||||
tps := tp{topic, partition}
|
||||
pc := s.consumers[tps]
|
||||
delete(s.consumers, tps)
|
||||
close(pc.quit)
|
||||
if s.kopts.Logger.V(logger.DebugLevel) {
|
||||
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))
|
||||
@@ -171,11 +171,18 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
|
||||
if err != nil {
|
||||
s.connected.Store(0)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
|
||||
if s.kopts.Logger.V(logger.DebugLevel) {
|
||||
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] lost %#+v", lost))
|
||||
if s.kopts.Logger.V(logger.ErrorLevel) {
|
||||
s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] lost %#+v", lost))
|
||||
}
|
||||
s.killConsumers(ctx, lost)
|
||||
// s.connected.Store(0)
|
||||
}
|
||||
|
||||
func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
|
||||
@@ -185,6 +192,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
|
||||
s.killConsumers(ctx, revoked)
|
||||
if err := c.CommitMarkedOffsets(ctx); err != nil {
|
||||
s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err)
|
||||
// s.connected.Store(0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -202,6 +210,7 @@ func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str
|
||||
handler: s.handler,
|
||||
kopts: s.kopts,
|
||||
opts: s.opts,
|
||||
connected: s.connected,
|
||||
}
|
||||
s.Lock()
|
||||
s.consumers[tp{topic, partition}] = pc
|
||||
@@ -263,6 +272,7 @@ func (pc *consumer) consume() {
|
||||
pc.c.MarkCommitRecords(record)
|
||||
} else {
|
||||
eventPool.Put(p)
|
||||
pc.connected.Store(0)
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
return
|
||||
}
|
||||
@@ -279,6 +289,7 @@ func (pc *consumer) consume() {
|
||||
pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
eventPool.Put(p)
|
||||
pc.connected.Store(0)
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
|
||||
sp.Finish()
|
||||
return
|
||||
@@ -316,6 +327,7 @@ func (pc *consumer) consume() {
|
||||
pc.c.MarkCommitRecords(record)
|
||||
} else {
|
||||
eventPool.Put(p)
|
||||
pc.connected.Store(0)
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage")
|
||||
sp.Finish()
|
||||
|
Reference in New Issue
Block a user