dont modify connected state on errors
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -27,7 +27,7 @@ var ( | ||||
|  | ||||
| func (m *hookEvent) OnGroupManageError(err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		// m.connected.Store(0) | ||||
| 		// if m.fatalOnError { | ||||
| 		m.log.Error(context.TODO(), "kgo.OnGroupManageError", err) | ||||
| 		//} | ||||
|   | ||||
| @@ -177,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | ||||
|  | ||||
| func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { | ||||
| 	if err != nil { | ||||
| 		s.connected.Store(0) | ||||
| 		//		s.connected.Store(0) | ||||
| 		if s.fatalOnError { | ||||
| 			s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) | ||||
| 		} | ||||
| @@ -199,7 +199,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str | ||||
| 	s.killConsumers(ctx, revoked) | ||||
| 	if err := c.CommitMarkedOffsets(ctx); err != nil { | ||||
| 		s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) | ||||
| 		s.connected.Store(0) | ||||
| 		// s.connected.Store(0) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -279,7 +279,7 @@ func (pc *consumer) consume() { | ||||
| 								pc.c.MarkCommitRecords(record) | ||||
| 							} else { | ||||
| 								eventPool.Put(p) | ||||
| 								pc.connected.Store(0) | ||||
| 								//								pc.connected.Store(0) | ||||
| 								pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||
| 								return | ||||
| 							} | ||||
| @@ -296,7 +296,7 @@ func (pc *consumer) consume() { | ||||
| 						pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | ||||
| 						pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | ||||
| 						eventPool.Put(p) | ||||
| 						pc.connected.Store(0) | ||||
| 						//						pc.connected.Store(0) | ||||
| 						pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") | ||||
| 						sp.Finish() | ||||
| 						return | ||||
| @@ -334,7 +334,7 @@ func (pc *consumer) consume() { | ||||
| 					pc.c.MarkCommitRecords(record) | ||||
| 				} else { | ||||
| 					eventPool.Put(p) | ||||
| 					pc.connected.Store(0) | ||||
| 					//					pc.connected.Store(0) | ||||
| 					pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||
| 					sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") | ||||
| 					sp.Finish() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user