Compare commits

...

8 Commits

Author SHA1 Message Date
69fe6994ae dont modify connected state on errors
Some checks failed
test / test (push) Failing after 1m25s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-05-02 19:07:03 +03:00
6360feb351 fixup group lag metric
All checks were successful
test / test (push) Successful in 3m27s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-23 10:50:50 +03:00
0845f4873b try to recover kafka fatals
All checks were successful
test / test (push) Successful in 4m38s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-07 18:55:28 +03:00
747e35148f disable connected state changes now
All checks were successful
test / test (push) Successful in 2m4s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-03-12 20:40:30 +03:00
21f06fee6c fixup panic
All checks were successful
test / test (push) Successful in 2m21s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-03-12 16:59:43 +03:00
f4f8793686 fixup connected status
All checks were successful
test / test (push) Successful in 2m25s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-03-07 11:45:36 +03:00
ec4922ad8b fixup connected status
Some checks failed
test / test (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-03-07 11:44:56 +03:00
eaea14e5a8 fixup connected status
All checks were successful
test / test (push) Successful in 3m3s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-03-07 11:26:34 +03:00
3 changed files with 52 additions and 28 deletions

View File

@@ -27,43 +27,45 @@ var (
func (m *hookEvent) OnGroupManageError(err error) { func (m *hookEvent) OnGroupManageError(err error) {
if err != nil { if err != nil {
m.connected.Store(0) // m.connected.Store(0)
if m.fatalOnError { // if m.fatalOnError {
m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) m.log.Error(context.TODO(), "kgo.OnGroupManageError", err)
} //}
} }
} }
func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
if err != nil { if err != nil {
m.connected.Store(0) // m.connected.Store(0)
if m.fatalOnError { // if m.fatalOnError {
m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) m.log.Error(context.TODO(), "kgo.OnBrokerConnect", err)
} //}
} }
} }
func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) {
m.connected.Store(0) // m.connected.Store(0)
} }
func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
if err != nil { if err != nil {
m.connected.Store(0) // m.connected.Store(0)
if m.fatalOnError { // if m.fatalOnError {
m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) m.log.Error(context.TODO(), "kgo.OnBrokerWrite", err)
} //}
} }
} }
func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
if err != nil { if err != nil {
m.connected.Store(0) // m.connected.Store(0)
m.log.Error(context.TODO(), "kgo.OnBrokerRead", err)
} }
} }
func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) {
if err != nil { if err != nil {
m.connected.Store(0) // m.connected.Store(0)
m.log.Error(context.TODO(), "kgo.OnProduceRecordUnbuffered", err)
} }
} }

33
kgo.go
View File

@@ -19,6 +19,7 @@ import (
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v3/tracer"
mjitter "go.unistack.org/micro/v3/util/jitter"
mrand "go.unistack.org/micro/v3/util/rand" mrand "go.unistack.org/micro/v3/util/rand"
) )
@@ -66,6 +67,8 @@ type Broker struct {
sync.RWMutex sync.RWMutex
init bool init bool
done chan struct{}
} }
func (r *Broker) Live() bool { func (r *Broker) Live() bool {
@@ -142,8 +145,28 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho
} }
return nil, nil, err return nil, nil, err
} }
k.connected.Store(1)
if fatalOnError {
go func() {
c := 3
n := 0
tc := mjitter.NewTicker(500*time.Millisecond, 1*time.Second)
defer tc.Stop()
for range tc.C {
if k.connected.Load() == 0 {
if n > c {
k.opts.Logger.Fatal(context.Background(), "broker fatal error")
}
n++
} else {
n = 0
}
}
}()
}
return c, htracer, nil
} }
return c, htracer, nil
} }
func (k *Broker) Connect(ctx context.Context) error { func (k *Broker) Connect(ctx context.Context) error {
@@ -203,6 +226,7 @@ func (k *Broker) Disconnect(ctx context.Context) error {
} }
k.connected.Store(0) k.connected.Store(0)
close(k.done)
return nil return nil
} }
@@ -390,12 +414,6 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
} }
} }
if options.Context != nil {
if v, ok := options.Context.Value(fatalOnErrorKey{}).(bool); ok && v {
fatalOnError = v
}
}
sub := &Subscriber{ sub := &Subscriber{
topic: topic, topic: topic,
opts: options, opts: options,
@@ -491,5 +509,6 @@ func NewBroker(opts ...broker.Option) *Broker {
connected: &atomic.Uint32{}, connected: &atomic.Uint32{},
opts: options, opts: options,
kopts: kopts, kopts: kopts,
done: make(chan struct{}),
} }
} }

View File

@@ -119,7 +119,7 @@ func (s *Subscriber) poll(ctx context.Context) {
s.Lock() s.Lock()
for p, l := range lmap { for p, l := range lmap {
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)), "lag", strconv.Itoa(int(l.Lag))) s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag))
} }
s.Unlock() s.Unlock()
@@ -160,7 +160,10 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
for topic, partitions := range lost { for topic, partitions := range lost {
for _, partition := range partitions { for _, partition := range partitions {
tps := tp{topic, partition} tps := tp{topic, partition}
pc := s.consumers[tps] pc, ok := s.consumers[tps]
if !ok {
continue
}
delete(s.consumers, tps) delete(s.consumers, tps)
close(pc.quit) close(pc.quit)
if s.kopts.Logger.V(logger.DebugLevel) { if s.kopts.Logger.V(logger.DebugLevel) {
@@ -174,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
if err != nil { if err != nil {
s.connected.Store(0) // s.connected.Store(0)
if s.fatalOnError { if s.fatalOnError {
s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err)
} }
@@ -276,7 +279,7 @@ func (pc *consumer) consume() {
pc.c.MarkCommitRecords(record) pc.c.MarkCommitRecords(record)
} else { } else {
eventPool.Put(p) eventPool.Put(p)
pc.connected.Store(0) // pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
return return
} }
@@ -293,7 +296,7 @@ func (pc *consumer) consume() {
pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
eventPool.Put(p) eventPool.Put(p)
pc.connected.Store(0) // pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
sp.Finish() sp.Finish()
return return
@@ -331,7 +334,7 @@ func (pc *consumer) consume() {
pc.c.MarkCommitRecords(record) pc.c.MarkCommitRecords(record)
} else { } else {
eventPool.Put(p) eventPool.Put(p)
pc.connected.Store(0) // pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage")
sp.Finish() sp.Finish()