Compare commits
	
		
			2 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 747e35148f | |||
| 21f06fee6c | 
							
								
								
									
										12
									
								
								broker.go
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								broker.go
									
									
									
									
									
								
							| @@ -27,7 +27,7 @@ var ( | ||||
|  | ||||
| func (m *hookEvent) OnGroupManageError(err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		// m.connected.Store(0) | ||||
| 		if m.fatalOnError { | ||||
| 			m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) | ||||
| 		} | ||||
| @@ -36,7 +36,7 @@ func (m *hookEvent) OnGroupManageError(err error) { | ||||
|  | ||||
| func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		// m.connected.Store(0) | ||||
| 		if m.fatalOnError { | ||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) | ||||
| 		} | ||||
| @@ -44,12 +44,12 @@ func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { | ||||
| 	m.connected.Store(0) | ||||
| 	// m.connected.Store(0) | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		// m.connected.Store(0) | ||||
| 		if m.fatalOnError { | ||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) | ||||
| 		} | ||||
| @@ -58,12 +58,12 @@ func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.D | ||||
|  | ||||
| func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		// m.connected.Store(0) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		// m.connected.Store(0) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -160,7 +160,10 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | ||||
| 	for topic, partitions := range lost { | ||||
| 		for _, partition := range partitions { | ||||
| 			tps := tp{topic, partition} | ||||
| 			pc := s.consumers[tps] | ||||
| 			pc, ok := s.consumers[tps] | ||||
| 			if !ok { | ||||
| 				continue | ||||
| 			} | ||||
| 			delete(s.consumers, tps) | ||||
| 			close(pc.quit) | ||||
| 			if s.kopts.Logger.V(logger.DebugLevel) { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user