diff --git a/broker.go b/broker.go index 0af77fa..f3cd752 100644 --- a/broker.go +++ b/broker.go @@ -27,19 +27,19 @@ var ( func (m *hookEvent) OnGroupManageError(err error) { if err != nil { - // m.connected.Store(0) - if m.fatalOnError { - m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) - } + m.connected.Store(0) + // if m.fatalOnError { + m.log.Error(context.TODO(), "kgo.OnGroupManageError", err) + //} } } func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { if err != nil { // m.connected.Store(0) - if m.fatalOnError { - m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) - } + // if m.fatalOnError { + m.log.Error(context.TODO(), "kgo.OnBrokerConnect", err) + //} } } @@ -50,20 +50,22 @@ func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { if err != nil { // m.connected.Store(0) - if m.fatalOnError { - m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) - } + // if m.fatalOnError { + m.log.Error(context.TODO(), "kgo.OnBrokerWrite", err) + //} } } func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { if err != nil { // m.connected.Store(0) + m.log.Error(context.TODO(), "kgo.OnBrokerRead", err) } } func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { if err != nil { // m.connected.Store(0) + m.log.Error(context.TODO(), "kgo.OnProduceRecordUnbuffered", err) } } diff --git a/kgo.go b/kgo.go index 1cd57df..c99cfdc 100644 --- a/kgo.go +++ b/kgo.go @@ -19,6 +19,7 @@ import ( "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/tracer" + mjitter "go.unistack.org/micro/v3/util/jitter" mrand "go.unistack.org/micro/v3/util/rand" ) @@ -66,6 +67,8 @@ type Broker struct { sync.RWMutex init bool + + done chan struct{} } func (r *Broker) Live() bool { @@ -143,6 +146,25 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho return nil, nil, err } k.connected.Store(1) + + if fatalOnError { + go func() { + c := 3 + n := 0 + tc := mjitter.NewTicker(500*time.Millisecond, 1*time.Second) + defer tc.Stop() + for range tc.C { + if k.connected.Load() == 0 { + if n > c { + k.opts.Logger.Fatal(context.Background(), "broker fatal error") + } + n++ + } else { + n = 0 + } + } + }() + } return c, htracer, nil } } @@ -204,6 +226,7 @@ func (k *Broker) Disconnect(ctx context.Context) error { } k.connected.Store(0) + close(k.done) return nil } @@ -391,12 +414,6 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han } } - if options.Context != nil { - if v, ok := options.Context.Value(fatalOnErrorKey{}).(bool); ok && v { - fatalOnError = v - } - } - sub := &Subscriber{ topic: topic, opts: options, @@ -492,5 +509,6 @@ func NewBroker(opts ...broker.Option) *Broker { connected: &atomic.Uint32{}, opts: options, kopts: kopts, + done: make(chan struct{}), } } diff --git a/subscriber.go b/subscriber.go index b8bd4e2..c276383 100644 --- a/subscriber.go +++ b/subscriber.go @@ -199,7 +199,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str s.killConsumers(ctx, revoked) if err := c.CommitMarkedOffsets(ctx); err != nil { s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) - // s.connected.Store(0) + s.connected.Store(0) } }