Compare commits
	
		
			2 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 747e35148f | |||
| 21f06fee6c | 
							
								
								
									
										12
									
								
								broker.go
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								broker.go
									
									
									
									
									
								
							| @@ -27,7 +27,7 @@ var ( | |||||||
|  |  | ||||||
| func (m *hookEvent) OnGroupManageError(err error) { | func (m *hookEvent) OnGroupManageError(err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
| 		if m.fatalOnError { | 		if m.fatalOnError { | ||||||
| 			m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) | 			m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) | ||||||
| 		} | 		} | ||||||
| @@ -36,7 +36,7 @@ func (m *hookEvent) OnGroupManageError(err error) { | |||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { | func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
| 		if m.fatalOnError { | 		if m.fatalOnError { | ||||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) | 			m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) | ||||||
| 		} | 		} | ||||||
| @@ -44,12 +44,12 @@ func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net | |||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { | func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { | ||||||
| 	m.connected.Store(0) | 	// m.connected.Store(0) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
| 		if m.fatalOnError { | 		if m.fatalOnError { | ||||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) | 			m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) | ||||||
| 		} | 		} | ||||||
| @@ -58,12 +58,12 @@ func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.D | |||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { | func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -160,7 +160,10 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | |||||||
| 	for topic, partitions := range lost { | 	for topic, partitions := range lost { | ||||||
| 		for _, partition := range partitions { | 		for _, partition := range partitions { | ||||||
| 			tps := tp{topic, partition} | 			tps := tp{topic, partition} | ||||||
| 			pc := s.consumers[tps] | 			pc, ok := s.consumers[tps] | ||||||
|  | 			if !ok { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
| 			delete(s.consumers, tps) | 			delete(s.consumers, tps) | ||||||
| 			close(pc.quit) | 			close(pc.quit) | ||||||
| 			if s.kopts.Logger.V(logger.DebugLevel) { | 			if s.kopts.Logger.V(logger.DebugLevel) { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user