Compare commits

...

3 Commits

Author SHA1 Message Date
69fe6994ae dont modify connected state on errors
Some checks failed
test / test (push) Failing after 1m25s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-05-02 19:07:03 +03:00
6360feb351 fixup group lag metric
All checks were successful
test / test (push) Successful in 3m27s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-23 10:50:50 +03:00
0845f4873b try to recover kafka fatals
All checks were successful
test / test (push) Successful in 4m38s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-07 18:55:28 +03:00
3 changed files with 40 additions and 20 deletions

View File

@@ -28,18 +28,18 @@ var (
// OnGroupManageError reports a non-nil err through m.log under the
// "kgo.OnGroupManageError" message; a nil err is ignored.
//
// NOTE(review): this span looks like a rendered diff with old and new lines
// interleaved — presumably the fatalOnError/Fatal branch is the removed
// version and the unconditional Error call is its replacement. Confirm
// against the repository before relying on the combined text.
func (m *hookEvent) OnGroupManageError(err error) {
if err != nil {
// The connected flag is not reset here: the Store(0) call below is disabled.
// m.connected.Store(0)
if m.fatalOnError {
m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err)
}
// The fatalOnError guard is commented out around this call, so the error
// is logged at Error level regardless of that flag.
// if m.fatalOnError {
m.log.Error(context.TODO(), "kgo.OnGroupManageError", err)
//}
}
}
// OnBrokerConnect reports a non-nil err through m.log under the
// "kgo.OnBrokerConnect" message; the broker metadata, duration and
// connection arguments are ignored, and a nil err is a no-op.
//
// NOTE(review): this span looks like a rendered diff with old and new lines
// interleaved — presumably the fatalOnError/Fatal branch is the removed
// version and the unconditional Error call is its replacement. Confirm
// against the repository before relying on the combined text.
func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
if err != nil {
// The connected flag is not reset here: the Store(0) call below is disabled.
// m.connected.Store(0)
if m.fatalOnError {
m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err)
}
// The fatalOnError guard is commented out around this call, so the error
// is logged at Error level regardless of that flag.
// if m.fatalOnError {
m.log.Error(context.TODO(), "kgo.OnBrokerConnect", err)
//}
}
}
@@ -50,20 +50,22 @@ func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) {
// OnBrokerWrite reports a non-nil err through m.log under the
// "kgo.OnBrokerWrite" message; every other argument is ignored, and a nil
// err is a no-op.
//
// NOTE(review): this span looks like a rendered diff with old and new lines
// interleaved — presumably the fatalOnError/Fatal branch is the removed
// version and the unconditional Error call is its replacement. Confirm
// against the repository before relying on the combined text.
func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
if err != nil {
// The connected flag is not reset here: the Store(0) call below is disabled.
// m.connected.Store(0)
if m.fatalOnError {
m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err)
}
// The fatalOnError guard is commented out around this call, so the error
// is logged at Error level regardless of that flag.
// if m.fatalOnError {
m.log.Error(context.TODO(), "kgo.OnBrokerWrite", err)
//}
}
}
// OnBrokerRead logs a non-nil err at Error level under the
// "kgo.OnBrokerRead" message. All other arguments are ignored and a nil err
// returns immediately.
func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
	if err == nil {
		return
	}
	// The connected flag is left as-is on read errors; the reset stays disabled.
	// m.connected.Store(0)
	m.log.Error(context.TODO(), "kgo.OnBrokerRead", err)
}
// OnProduceRecordUnbuffered logs a non-nil err at Error level under the
// "kgo.OnProduceRecordUnbuffered" message. The record argument is ignored and
// a nil err returns immediately.
func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) {
	if err == nil {
		return
	}
	// The connected flag is left as-is on produce errors; the reset stays disabled.
	// m.connected.Store(0)
	m.log.Error(context.TODO(), "kgo.OnProduceRecordUnbuffered", err)
}

30
kgo.go
View File

@@ -19,6 +19,7 @@ import (
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/tracer"
mjitter "go.unistack.org/micro/v3/util/jitter"
mrand "go.unistack.org/micro/v3/util/rand"
)
@@ -66,6 +67,8 @@ type Broker struct {
sync.RWMutex
init bool
done chan struct{}
}
func (r *Broker) Live() bool {
@@ -143,6 +146,25 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho
return nil, nil, err
}
k.connected.Store(1)
if fatalOnError {
go func() {
c := 3
n := 0
tc := mjitter.NewTicker(500*time.Millisecond, 1*time.Second)
defer tc.Stop()
for range tc.C {
if k.connected.Load() == 0 {
if n > c {
k.opts.Logger.Fatal(context.Background(), "broker fatal error")
}
n++
} else {
n = 0
}
}
}()
}
return c, htracer, nil
}
}
@@ -204,6 +226,7 @@ func (k *Broker) Disconnect(ctx context.Context) error {
}
k.connected.Store(0)
close(k.done)
return nil
}
@@ -391,12 +414,6 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
}
}
if options.Context != nil {
if v, ok := options.Context.Value(fatalOnErrorKey{}).(bool); ok && v {
fatalOnError = v
}
}
sub := &Subscriber{
topic: topic,
opts: options,
@@ -492,5 +509,6 @@ func NewBroker(opts ...broker.Option) *Broker {
connected: &atomic.Uint32{},
opts: options,
kopts: kopts,
done: make(chan struct{}),
}
}

View File

@@ -119,7 +119,7 @@ func (s *Subscriber) poll(ctx context.Context) {
s.Lock()
for p, l := range lmap {
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)), "lag", strconv.Itoa(int(l.Lag)))
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag))
}
s.Unlock()
@@ -177,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
if err != nil {
s.connected.Store(0)
// s.connected.Store(0)
if s.fatalOnError {
s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err)
}
@@ -279,7 +279,7 @@ func (pc *consumer) consume() {
pc.c.MarkCommitRecords(record)
} else {
eventPool.Put(p)
pc.connected.Store(0)
// pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
return
}
@@ -296,7 +296,7 @@ func (pc *consumer) consume() {
pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
eventPool.Put(p)
pc.connected.Store(0)
// pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
sp.Finish()
return
@@ -334,7 +334,7 @@ func (pc *consumer) consume() {
pc.c.MarkCommitRecords(record)
} else {
eventPool.Put(p)
pc.connected.Store(0)
// pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage")
sp.Finish()