Compare commits
	
		
			5 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 69fe6994ae | |||
| 6360feb351 | |||
| 0845f4873b | |||
| 747e35148f | |||
| 21f06fee6c | 
							
								
								
									
										32
									
								
								broker.go
									
									
									
									
									
								
							
							
						
						
									
										32
									
								
								broker.go
									
									
									
									
									
								
							| @@ -27,43 +27,45 @@ var ( | ||||
|  | ||||
| func (m *hookEvent) OnGroupManageError(err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		if m.fatalOnError { | ||||
| 			m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) | ||||
| 		} | ||||
| 		// m.connected.Store(0) | ||||
| 		// if m.fatalOnError { | ||||
| 		m.log.Error(context.TODO(), "kgo.OnGroupManageError", err) | ||||
| 		//} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		if m.fatalOnError { | ||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) | ||||
| 		} | ||||
| 		// m.connected.Store(0) | ||||
| 		// if m.fatalOnError { | ||||
| 		m.log.Error(context.TODO(), "kgo.OnBrokerConnect", err) | ||||
| 		//} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { | ||||
| 	m.connected.Store(0) | ||||
| 	// m.connected.Store(0) | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		if m.fatalOnError { | ||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) | ||||
| 		} | ||||
| 		// m.connected.Store(0) | ||||
| 		// if m.fatalOnError { | ||||
| 		m.log.Error(context.TODO(), "kgo.OnBrokerWrite", err) | ||||
| 		//} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		// m.connected.Store(0) | ||||
| 		m.log.Error(context.TODO(), "kgo.OnBrokerRead", err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		// m.connected.Store(0) | ||||
| 		m.log.Error(context.TODO(), "kgo.OnProduceRecordUnbuffered", err) | ||||
| 	} | ||||
| } | ||||
|   | ||||
							
								
								
									
										30
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										30
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -19,6 +19,7 @@ import ( | ||||
| 	"go.unistack.org/micro/v3/metadata" | ||||
| 	"go.unistack.org/micro/v3/semconv" | ||||
| 	"go.unistack.org/micro/v3/tracer" | ||||
| 	mjitter "go.unistack.org/micro/v3/util/jitter" | ||||
| 	mrand "go.unistack.org/micro/v3/util/rand" | ||||
| ) | ||||
|  | ||||
| @@ -66,6 +67,8 @@ type Broker struct { | ||||
|  | ||||
| 	sync.RWMutex | ||||
| 	init bool | ||||
|  | ||||
| 	done chan struct{} | ||||
| } | ||||
|  | ||||
| func (r *Broker) Live() bool { | ||||
| @@ -143,6 +146,25 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho | ||||
| 			return nil, nil, err | ||||
| 		} | ||||
| 		k.connected.Store(1) | ||||
|  | ||||
| 		if fatalOnError { | ||||
| 			go func() { | ||||
| 				c := 3 | ||||
| 				n := 0 | ||||
| 				tc := mjitter.NewTicker(500*time.Millisecond, 1*time.Second) | ||||
| 				defer tc.Stop() | ||||
| 				for range tc.C { | ||||
| 					if k.connected.Load() == 0 { | ||||
| 						if n > c { | ||||
| 							k.opts.Logger.Fatal(context.Background(), "broker fatal error") | ||||
| 						} | ||||
| 						n++ | ||||
| 					} else { | ||||
| 						n = 0 | ||||
| 					} | ||||
| 				} | ||||
| 			}() | ||||
| 		} | ||||
| 		return c, htracer, nil | ||||
| 	} | ||||
| } | ||||
| @@ -204,6 +226,7 @@ func (k *Broker) Disconnect(ctx context.Context) error { | ||||
| 	} | ||||
|  | ||||
| 	k.connected.Store(0) | ||||
| 	close(k.done) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -391,12 +414,6 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if options.Context != nil { | ||||
| 		if v, ok := options.Context.Value(fatalOnErrorKey{}).(bool); ok && v { | ||||
| 			fatalOnError = v | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	sub := &Subscriber{ | ||||
| 		topic:        topic, | ||||
| 		opts:         options, | ||||
| @@ -492,5 +509,6 @@ func NewBroker(opts ...broker.Option) *Broker { | ||||
| 		connected: &atomic.Uint32{}, | ||||
| 		opts:      options, | ||||
| 		kopts:     kopts, | ||||
| 		done:      make(chan struct{}), | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -119,7 +119,7 @@ func (s *Subscriber) poll(ctx context.Context) { | ||||
|  | ||||
| 				s.Lock() | ||||
| 				for p, l := range lmap { | ||||
| 					s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)), "lag", strconv.Itoa(int(l.Lag))) | ||||
| 					s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag)) | ||||
| 				} | ||||
| 				s.Unlock() | ||||
|  | ||||
| @@ -160,7 +160,10 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | ||||
| 	for topic, partitions := range lost { | ||||
| 		for _, partition := range partitions { | ||||
| 			tps := tp{topic, partition} | ||||
| 			pc := s.consumers[tps] | ||||
| 			pc, ok := s.consumers[tps] | ||||
| 			if !ok { | ||||
| 				continue | ||||
| 			} | ||||
| 			delete(s.consumers, tps) | ||||
| 			close(pc.quit) | ||||
| 			if s.kopts.Logger.V(logger.DebugLevel) { | ||||
| @@ -174,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | ||||
|  | ||||
| func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { | ||||
| 	if err != nil { | ||||
| 		s.connected.Store(0) | ||||
| 		//		s.connected.Store(0) | ||||
| 		if s.fatalOnError { | ||||
| 			s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) | ||||
| 		} | ||||
| @@ -276,7 +279,7 @@ func (pc *consumer) consume() { | ||||
| 								pc.c.MarkCommitRecords(record) | ||||
| 							} else { | ||||
| 								eventPool.Put(p) | ||||
| 								pc.connected.Store(0) | ||||
| 								//								pc.connected.Store(0) | ||||
| 								pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||
| 								return | ||||
| 							} | ||||
| @@ -293,7 +296,7 @@ func (pc *consumer) consume() { | ||||
| 						pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | ||||
| 						pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | ||||
| 						eventPool.Put(p) | ||||
| 						pc.connected.Store(0) | ||||
| 						//						pc.connected.Store(0) | ||||
| 						pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") | ||||
| 						sp.Finish() | ||||
| 						return | ||||
| @@ -331,7 +334,7 @@ func (pc *consumer) consume() { | ||||
| 					pc.c.MarkCommitRecords(record) | ||||
| 				} else { | ||||
| 					eventPool.Put(p) | ||||
| 					pc.connected.Store(0) | ||||
| 					//					pc.connected.Store(0) | ||||
| 					pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||
| 					sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") | ||||
| 					sp.Finish() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user