try refactoring && decomposition subsribe && update deps
This commit is contained in:
362
subscriber.go
362
subscriber.go
@@ -14,6 +14,7 @@ import (
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/meter"
|
||||
"go.unistack.org/micro/v3/semconv"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
)
|
||||
@@ -58,10 +59,11 @@ type Subscriber struct {
|
||||
kopts broker.Options
|
||||
opts broker.SubscribeOptions
|
||||
|
||||
closed bool
|
||||
fatalOnError bool
|
||||
|
||||
closed atomic.Bool
|
||||
sync.RWMutex
|
||||
sync.WaitGroup
|
||||
}
|
||||
|
||||
func (s *Subscriber) Client() *kgo.Client {
|
||||
@@ -77,10 +79,12 @@ func (s *Subscriber) Topic() string {
|
||||
}
|
||||
|
||||
func (s *Subscriber) Unsubscribe(ctx context.Context) error {
|
||||
if s.closed {
|
||||
if s.closed.Load() {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.Wait()
|
||||
|
||||
s.c.PauseFetchTopics(s.topic)
|
||||
s.c.CloseAllowingRebalance()
|
||||
kc := make(map[string][]int32)
|
||||
@@ -89,8 +93,9 @@ func (s *Subscriber) Unsubscribe(ctx context.Context) error {
|
||||
}
|
||||
s.killConsumers(ctx, kc)
|
||||
close(s.done)
|
||||
s.closed = true
|
||||
s.closed.Store(true)
|
||||
s.c.ResumeFetchTopics(s.topic)
|
||||
s.c.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -103,39 +108,8 @@ func (s *Subscriber) poll(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
ac := kadm.NewClient(s.c)
|
||||
ticker := time.NewTicker(DefaultStatsInterval)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
dgls, err := ac.Lag(ctx, s.opts.Group)
|
||||
if err != nil || !dgls.Ok() {
|
||||
continue
|
||||
}
|
||||
|
||||
dgl, ok := dgls[s.opts.Group]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
lmap, ok := dgl.Lag[s.topic]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
for p, l := range lmap {
|
||||
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag))
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
s.Add(1)
|
||||
go s.pollLag(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -146,27 +120,93 @@ func (s *Subscriber) poll(ctx context.Context) {
|
||||
return
|
||||
default:
|
||||
fetches := s.c.PollRecords(ctx, maxInflight)
|
||||
if !s.closed && fetches.IsClientClosed() {
|
||||
s.closed = true
|
||||
if !s.closed.Load() && fetches.IsClientClosed() {
|
||||
s.closed.Store(true)
|
||||
return
|
||||
}
|
||||
|
||||
fetches.EachError(func(t string, p int32, err error) {
|
||||
s.kopts.Logger.Fatal(ctx, fmt.Sprintf("[kgo] fetch topic %s partition %d error", t, p), err)
|
||||
})
|
||||
|
||||
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
|
||||
tps := tp{p.Topic, p.Partition}
|
||||
s.consumers[tps].recs <- p
|
||||
if consumer, ok := s.consumers[tps]; ok {
|
||||
select {
|
||||
case consumer.recs <- p:
|
||||
default:
|
||||
if s.kopts.Logger.V(logger.WarnLevel) {
|
||||
s.kopts.Logger.Warn(ctx, fmt.Sprintf("[kgo] consumer channel full topic %s partition %d", p.Topic, p.Partition))
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
s.c.AllowRebalance()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) pollLag(ctx context.Context) {
|
||||
ac := kadm.NewClient(s.c)
|
||||
ticker := time.NewTicker(DefaultStatsInterval)
|
||||
defer func() {
|
||||
s.Done()
|
||||
ticker.Stop()
|
||||
}()
|
||||
|
||||
// кеш ключей метрик lag: map[partition]metricCounter
|
||||
type lagMetric struct {
|
||||
counter meter.Counter
|
||||
lastLag int64
|
||||
}
|
||||
lagCache := make(map[int32]*lagMetric)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
dgls, err := ac.Lag(ctx, s.opts.Group)
|
||||
if err != nil || !dgls.Ok() {
|
||||
continue
|
||||
}
|
||||
|
||||
dgl, ok := dgls[s.opts.Group]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
lmap, ok := dgl.Lag[s.topic]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
for p, l := range lmap {
|
||||
lagVal := l.Lag
|
||||
if metric, exists := lagCache[p]; exists {
|
||||
if metric.lastLag != lagVal {
|
||||
metric.counter.Set(uint64(lagVal))
|
||||
metric.lastLag = lagVal
|
||||
}
|
||||
} else {
|
||||
counter := s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)))
|
||||
counter.Set(uint64(lagVal))
|
||||
lagCache[p] = &lagMetric{
|
||||
counter: counter,
|
||||
lastLag: lagVal,
|
||||
}
|
||||
}
|
||||
}
|
||||
s.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
|
||||
var wg sync.WaitGroup
|
||||
defer wg.Wait()
|
||||
|
||||
s.Lock()
|
||||
for topic, partitions := range lost {
|
||||
for _, partition := range partitions {
|
||||
tps := tp{topic, partition}
|
||||
@@ -180,9 +220,16 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
||||
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() { <-pc.done; wg.Done() }()
|
||||
|
||||
go func(pc *consumer) {
|
||||
defer wg.Done()
|
||||
<-pc.done
|
||||
}(pc)
|
||||
}
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
|
||||
@@ -254,126 +301,119 @@ func (pc *consumer) consume() {
|
||||
case <-pc.quit:
|
||||
return
|
||||
case p := <-pc.recs:
|
||||
for _, record := range p.Records {
|
||||
ctx, sp := pc.htracer.WithProcessSpan(record)
|
||||
ts := time.Now()
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc()
|
||||
p := eventPool.Get().(*event)
|
||||
p.msg.Header = nil
|
||||
p.msg.Body = nil
|
||||
p.topic = record.Topic
|
||||
p.err = nil
|
||||
p.ack = false
|
||||
p.msg.Header = metadata.New(len(record.Headers))
|
||||
p.ctx = ctx
|
||||
for _, hdr := range record.Headers {
|
||||
p.msg.Header.Set(hdr.Key, string(hdr.Value))
|
||||
}
|
||||
if pc.kopts.Codec.String() == "noop" {
|
||||
p.msg.Body = record.Value
|
||||
} else if pc.opts.BodyOnly {
|
||||
p.msg.Body = record.Value
|
||||
} else {
|
||||
if sp != nil {
|
||||
sp.AddEvent("codec unmarshal start")
|
||||
}
|
||||
err := pc.kopts.Codec.Unmarshal(record.Value, p.msg)
|
||||
if sp != nil {
|
||||
sp.AddEvent("codec unmarshal stop")
|
||||
}
|
||||
if err != nil {
|
||||
if sp != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
|
||||
p.err = err
|
||||
p.msg.Body = record.Value
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
|
||||
if p.ack {
|
||||
pc.c.MarkCommitRecords(record)
|
||||
} else {
|
||||
eventPool.Put(p)
|
||||
// pc.connected.Store(0)
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
return
|
||||
}
|
||||
eventPool.Put(p)
|
||||
te := time.Since(ts)
|
||||
pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
continue
|
||||
} else {
|
||||
pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: unmarshal error", err)
|
||||
}
|
||||
te := time.Since(ts)
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
|
||||
pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
eventPool.Put(p)
|
||||
// pc.connected.Store(0)
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
|
||||
if sp != nil {
|
||||
sp.Finish()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
if sp != nil {
|
||||
sp.AddEvent("handler start")
|
||||
}
|
||||
err := pc.handler(p)
|
||||
if sp != nil {
|
||||
sp.AddEvent("handler stop")
|
||||
}
|
||||
if err == nil {
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc()
|
||||
} else {
|
||||
if sp != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc()
|
||||
}
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
|
||||
if err == nil && pc.opts.AutoAck {
|
||||
p.ack = true
|
||||
} else if err != nil {
|
||||
p.err = err
|
||||
if eh != nil {
|
||||
if sp != nil {
|
||||
sp.AddEvent("error handler start")
|
||||
}
|
||||
_ = eh(p)
|
||||
if sp != nil {
|
||||
sp.AddEvent("error handler stop")
|
||||
}
|
||||
} else {
|
||||
if pc.kopts.Logger.V(logger.ErrorLevel) {
|
||||
pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
te := time.Since(ts)
|
||||
pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
if p.ack {
|
||||
eventPool.Put(p)
|
||||
pc.c.MarkCommitRecords(record)
|
||||
} else {
|
||||
eventPool.Put(p)
|
||||
// pc.connected.Store(0)
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
if sp != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage")
|
||||
sp.Finish()
|
||||
}
|
||||
return
|
||||
}
|
||||
if sp != nil {
|
||||
sp.Finish()
|
||||
}
|
||||
}
|
||||
pc.processBatch(p, eh)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (pc *consumer) processBatch(p kgo.FetchTopicPartition, eh broker.Handler) {
|
||||
var successCount, failureCount int
|
||||
topic := pc.topic
|
||||
|
||||
for _, record := range p.Records {
|
||||
ts := time.Now()
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", topic, "topic", topic).Inc()
|
||||
err := pc.handleRecord(record, eh)
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", topic, "topic", topic).Dec()
|
||||
te := time.Since(ts)
|
||||
pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", topic, "topic", topic).Update(te.Seconds())
|
||||
pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", topic, "topic", topic).Update(te.Seconds())
|
||||
if err == nil {
|
||||
successCount++
|
||||
} else {
|
||||
failureCount++
|
||||
}
|
||||
}
|
||||
|
||||
if successCount > 0 {
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "status", "success", "topic", topic).Add(successCount)
|
||||
}
|
||||
if failureCount > 0 {
|
||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "status", "failure", "topic", topic).Add(failureCount)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (pc *consumer) handleRecord(record *kgo.Record, eh broker.Handler) error {
|
||||
ctx, sp := pc.htracer.WithProcessSpan(record)
|
||||
p := eventPool.Get().(*event)
|
||||
p.reset()
|
||||
|
||||
defer func() {
|
||||
eventPool.Put(p)
|
||||
if sp != nil {
|
||||
sp.Finish()
|
||||
}
|
||||
}()
|
||||
|
||||
p.topic = record.Topic
|
||||
p.ctx = ctx
|
||||
|
||||
p.msg.Header = metadata.New(len(record.Headers))
|
||||
for _, hdr := range record.Headers {
|
||||
p.msg.Header.Set(hdr.Key, string(hdr.Value))
|
||||
}
|
||||
|
||||
if pc.kopts.Codec.String() == "noop" || pc.opts.BodyOnly {
|
||||
p.msg.Body = record.Value
|
||||
} else {
|
||||
if sp != nil {
|
||||
sp.AddEvent("codec unmarshal start")
|
||||
}
|
||||
err := pc.kopts.Codec.Unmarshal(record.Value, p.msg)
|
||||
if sp != nil {
|
||||
sp.AddEvent("codec unmarshal stop")
|
||||
}
|
||||
if err != nil {
|
||||
if sp != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
p.err = err
|
||||
p.msg.Body = record.Value
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
if p.ack {
|
||||
pc.c.MarkCommitRecords(record)
|
||||
} else {
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
}
|
||||
return err
|
||||
}
|
||||
pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: unmarshal error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if sp != nil {
|
||||
sp.AddEvent("handler start")
|
||||
}
|
||||
err := pc.handler(p)
|
||||
if sp != nil {
|
||||
sp.AddEvent("handler stop")
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
if pc.opts.AutoAck {
|
||||
p.ack = true
|
||||
}
|
||||
} else {
|
||||
if sp != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
p.err = err
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
} else if pc.kopts.Logger.V(logger.ErrorLevel) {
|
||||
pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err)
|
||||
}
|
||||
}
|
||||
|
||||
if p.ack {
|
||||
pc.c.MarkCommitRecords(record)
|
||||
} else {
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user