Compare commits
	
		
			5 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 69fe6994ae | |||
| 6360feb351 | |||
| 0845f4873b | |||
| 747e35148f | |||
| 21f06fee6c | 
							
								
								
									
										32
									
								
								broker.go
									
									
									
									
									
								
							
							
						
						
									
										32
									
								
								broker.go
									
									
									
									
									
								
							| @@ -27,43 +27,45 @@ var ( | |||||||
|  |  | ||||||
| func (m *hookEvent) OnGroupManageError(err error) { | func (m *hookEvent) OnGroupManageError(err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
| 		if m.fatalOnError { | 		// if m.fatalOnError { | ||||||
| 			m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) | 		m.log.Error(context.TODO(), "kgo.OnGroupManageError", err) | ||||||
| 		} | 		//} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { | func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
| 		if m.fatalOnError { | 		// if m.fatalOnError { | ||||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) | 		m.log.Error(context.TODO(), "kgo.OnBrokerConnect", err) | ||||||
| 		} | 		//} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { | func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { | ||||||
| 	m.connected.Store(0) | 	// m.connected.Store(0) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
| 		if m.fatalOnError { | 		// if m.fatalOnError { | ||||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) | 		m.log.Error(context.TODO(), "kgo.OnBrokerWrite", err) | ||||||
| 		} | 		//} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
|  | 		m.log.Error(context.TODO(), "kgo.OnBrokerRead", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { | func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
|  | 		m.log.Error(context.TODO(), "kgo.OnProduceRecordUnbuffered", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										30
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										30
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -19,6 +19,7 @@ import ( | |||||||
| 	"go.unistack.org/micro/v3/metadata" | 	"go.unistack.org/micro/v3/metadata" | ||||||
| 	"go.unistack.org/micro/v3/semconv" | 	"go.unistack.org/micro/v3/semconv" | ||||||
| 	"go.unistack.org/micro/v3/tracer" | 	"go.unistack.org/micro/v3/tracer" | ||||||
|  | 	mjitter "go.unistack.org/micro/v3/util/jitter" | ||||||
| 	mrand "go.unistack.org/micro/v3/util/rand" | 	mrand "go.unistack.org/micro/v3/util/rand" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -66,6 +67,8 @@ type Broker struct { | |||||||
|  |  | ||||||
| 	sync.RWMutex | 	sync.RWMutex | ||||||
| 	init bool | 	init bool | ||||||
|  |  | ||||||
|  | 	done chan struct{} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *Broker) Live() bool { | func (r *Broker) Live() bool { | ||||||
| @@ -143,6 +146,25 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho | |||||||
| 			return nil, nil, err | 			return nil, nil, err | ||||||
| 		} | 		} | ||||||
| 		k.connected.Store(1) | 		k.connected.Store(1) | ||||||
|  |  | ||||||
|  | 		if fatalOnError { | ||||||
|  | 			go func() { | ||||||
|  | 				c := 3 | ||||||
|  | 				n := 0 | ||||||
|  | 				tc := mjitter.NewTicker(500*time.Millisecond, 1*time.Second) | ||||||
|  | 				defer tc.Stop() | ||||||
|  | 				for range tc.C { | ||||||
|  | 					if k.connected.Load() == 0 { | ||||||
|  | 						if n > c { | ||||||
|  | 							k.opts.Logger.Fatal(context.Background(), "broker fatal error") | ||||||
|  | 						} | ||||||
|  | 						n++ | ||||||
|  | 					} else { | ||||||
|  | 						n = 0 | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
|  | 			}() | ||||||
|  | 		} | ||||||
| 		return c, htracer, nil | 		return c, htracer, nil | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| @@ -204,6 +226,7 @@ func (k *Broker) Disconnect(ctx context.Context) error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	k.connected.Store(0) | 	k.connected.Store(0) | ||||||
|  | 	close(k.done) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -391,12 +414,6 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if options.Context != nil { |  | ||||||
| 		if v, ok := options.Context.Value(fatalOnErrorKey{}).(bool); ok && v { |  | ||||||
| 			fatalOnError = v |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	sub := &Subscriber{ | 	sub := &Subscriber{ | ||||||
| 		topic:        topic, | 		topic:        topic, | ||||||
| 		opts:         options, | 		opts:         options, | ||||||
| @@ -492,5 +509,6 @@ func NewBroker(opts ...broker.Option) *Broker { | |||||||
| 		connected: &atomic.Uint32{}, | 		connected: &atomic.Uint32{}, | ||||||
| 		opts:      options, | 		opts:      options, | ||||||
| 		kopts:     kopts, | 		kopts:     kopts, | ||||||
|  | 		done:      make(chan struct{}), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -119,7 +119,7 @@ func (s *Subscriber) poll(ctx context.Context) { | |||||||
|  |  | ||||||
| 				s.Lock() | 				s.Lock() | ||||||
| 				for p, l := range lmap { | 				for p, l := range lmap { | ||||||
| 					s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)), "lag", strconv.Itoa(int(l.Lag))) | 					s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag)) | ||||||
| 				} | 				} | ||||||
| 				s.Unlock() | 				s.Unlock() | ||||||
|  |  | ||||||
| @@ -160,7 +160,10 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | |||||||
| 	for topic, partitions := range lost { | 	for topic, partitions := range lost { | ||||||
| 		for _, partition := range partitions { | 		for _, partition := range partitions { | ||||||
| 			tps := tp{topic, partition} | 			tps := tp{topic, partition} | ||||||
| 			pc := s.consumers[tps] | 			pc, ok := s.consumers[tps] | ||||||
|  | 			if !ok { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
| 			delete(s.consumers, tps) | 			delete(s.consumers, tps) | ||||||
| 			close(pc.quit) | 			close(pc.quit) | ||||||
| 			if s.kopts.Logger.V(logger.DebugLevel) { | 			if s.kopts.Logger.V(logger.DebugLevel) { | ||||||
| @@ -174,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | |||||||
|  |  | ||||||
| func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { | func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		s.connected.Store(0) | 		//		s.connected.Store(0) | ||||||
| 		if s.fatalOnError { | 		if s.fatalOnError { | ||||||
| 			s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) | 			s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) | ||||||
| 		} | 		} | ||||||
| @@ -276,7 +279,7 @@ func (pc *consumer) consume() { | |||||||
| 								pc.c.MarkCommitRecords(record) | 								pc.c.MarkCommitRecords(record) | ||||||
| 							} else { | 							} else { | ||||||
| 								eventPool.Put(p) | 								eventPool.Put(p) | ||||||
| 								pc.connected.Store(0) | 								//								pc.connected.Store(0) | ||||||
| 								pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | 								pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||||
| 								return | 								return | ||||||
| 							} | 							} | ||||||
| @@ -293,7 +296,7 @@ func (pc *consumer) consume() { | |||||||
| 						pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | 						pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | ||||||
| 						pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | 						pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) | ||||||
| 						eventPool.Put(p) | 						eventPool.Put(p) | ||||||
| 						pc.connected.Store(0) | 						//						pc.connected.Store(0) | ||||||
| 						pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") | 						pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") | ||||||
| 						sp.Finish() | 						sp.Finish() | ||||||
| 						return | 						return | ||||||
| @@ -331,7 +334,7 @@ func (pc *consumer) consume() { | |||||||
| 					pc.c.MarkCommitRecords(record) | 					pc.c.MarkCommitRecords(record) | ||||||
| 				} else { | 				} else { | ||||||
| 					eventPool.Put(p) | 					eventPool.Put(p) | ||||||
| 					pc.connected.Store(0) | 					//					pc.connected.Store(0) | ||||||
| 					pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | 					pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||||
| 					sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") | 					sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") | ||||||
| 					sp.Finish() | 					sp.Finish() | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user