Compare commits
	
		
			4 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 6360feb351 | |||
| 0845f4873b | |||
| 747e35148f | |||
| 21f06fee6c | 
							
								
								
									
										30
									
								
								broker.go
									
									
									
									
									
								
							
							
						
						
									
										30
									
								
								broker.go
									
									
									
									
									
								
							| @@ -28,42 +28,44 @@ var ( | |||||||
| func (m *hookEvent) OnGroupManageError(err error) { | func (m *hookEvent) OnGroupManageError(err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		m.connected.Store(0) | ||||||
| 		if m.fatalOnError { | 		// if m.fatalOnError { | ||||||
| 			m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) | 		m.log.Error(context.TODO(), "kgo.OnGroupManageError", err) | ||||||
| 		} | 		//} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { | func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
| 		if m.fatalOnError { | 		// if m.fatalOnError { | ||||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) | 		m.log.Error(context.TODO(), "kgo.OnBrokerConnect", err) | ||||||
| 		} | 		//} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { | func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { | ||||||
| 	m.connected.Store(0) | 	// m.connected.Store(0) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
| 		if m.fatalOnError { | 		// if m.fatalOnError { | ||||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) | 		m.log.Error(context.TODO(), "kgo.OnBrokerWrite", err) | ||||||
| 		} | 		//} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
|  | 		m.log.Error(context.TODO(), "kgo.OnBrokerRead", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { | func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		m.connected.Store(0) | 		// m.connected.Store(0) | ||||||
|  | 		m.log.Error(context.TODO(), "kgo.OnProduceRecordUnbuffered", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										30
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										30
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -19,6 +19,7 @@ import ( | |||||||
| 	"go.unistack.org/micro/v3/metadata" | 	"go.unistack.org/micro/v3/metadata" | ||||||
| 	"go.unistack.org/micro/v3/semconv" | 	"go.unistack.org/micro/v3/semconv" | ||||||
| 	"go.unistack.org/micro/v3/tracer" | 	"go.unistack.org/micro/v3/tracer" | ||||||
|  | 	mjitter "go.unistack.org/micro/v3/util/jitter" | ||||||
| 	mrand "go.unistack.org/micro/v3/util/rand" | 	mrand "go.unistack.org/micro/v3/util/rand" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -66,6 +67,8 @@ type Broker struct { | |||||||
|  |  | ||||||
| 	sync.RWMutex | 	sync.RWMutex | ||||||
| 	init bool | 	init bool | ||||||
|  |  | ||||||
|  | 	done chan struct{} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *Broker) Live() bool { | func (r *Broker) Live() bool { | ||||||
| @@ -143,6 +146,25 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho | |||||||
| 			return nil, nil, err | 			return nil, nil, err | ||||||
| 		} | 		} | ||||||
| 		k.connected.Store(1) | 		k.connected.Store(1) | ||||||
|  |  | ||||||
|  | 		if fatalOnError { | ||||||
|  | 			go func() { | ||||||
|  | 				c := 3 | ||||||
|  | 				n := 0 | ||||||
|  | 				tc := mjitter.NewTicker(500*time.Millisecond, 1*time.Second) | ||||||
|  | 				defer tc.Stop() | ||||||
|  | 				for range tc.C { | ||||||
|  | 					if k.connected.Load() == 0 { | ||||||
|  | 						if n > c { | ||||||
|  | 							k.opts.Logger.Fatal(context.Background(), "broker fatal error") | ||||||
|  | 						} | ||||||
|  | 						n++ | ||||||
|  | 					} else { | ||||||
|  | 						n = 0 | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
|  | 			}() | ||||||
|  | 		} | ||||||
| 		return c, htracer, nil | 		return c, htracer, nil | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| @@ -204,6 +226,7 @@ func (k *Broker) Disconnect(ctx context.Context) error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	k.connected.Store(0) | 	k.connected.Store(0) | ||||||
|  | 	close(k.done) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -391,12 +414,6 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if options.Context != nil { |  | ||||||
| 		if v, ok := options.Context.Value(fatalOnErrorKey{}).(bool); ok && v { |  | ||||||
| 			fatalOnError = v |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	sub := &Subscriber{ | 	sub := &Subscriber{ | ||||||
| 		topic:        topic, | 		topic:        topic, | ||||||
| 		opts:         options, | 		opts:         options, | ||||||
| @@ -492,5 +509,6 @@ func NewBroker(opts ...broker.Option) *Broker { | |||||||
| 		connected: &atomic.Uint32{}, | 		connected: &atomic.Uint32{}, | ||||||
| 		opts:      options, | 		opts:      options, | ||||||
| 		kopts:     kopts, | 		kopts:     kopts, | ||||||
|  | 		done:      make(chan struct{}), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -119,7 +119,7 @@ func (s *Subscriber) poll(ctx context.Context) { | |||||||
|  |  | ||||||
| 				s.Lock() | 				s.Lock() | ||||||
| 				for p, l := range lmap { | 				for p, l := range lmap { | ||||||
| 					s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)), "lag", strconv.Itoa(int(l.Lag))) | 					s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag)) | ||||||
| 				} | 				} | ||||||
| 				s.Unlock() | 				s.Unlock() | ||||||
|  |  | ||||||
| @@ -160,7 +160,10 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | |||||||
| 	for topic, partitions := range lost { | 	for topic, partitions := range lost { | ||||||
| 		for _, partition := range partitions { | 		for _, partition := range partitions { | ||||||
| 			tps := tp{topic, partition} | 			tps := tp{topic, partition} | ||||||
| 			pc := s.consumers[tps] | 			pc, ok := s.consumers[tps] | ||||||
|  | 			if !ok { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
| 			delete(s.consumers, tps) | 			delete(s.consumers, tps) | ||||||
| 			close(pc.quit) | 			close(pc.quit) | ||||||
| 			if s.kopts.Logger.V(logger.DebugLevel) { | 			if s.kopts.Logger.V(logger.DebugLevel) { | ||||||
| @@ -196,7 +199,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str | |||||||
| 	s.killConsumers(ctx, revoked) | 	s.killConsumers(ctx, revoked) | ||||||
| 	if err := c.CommitMarkedOffsets(ctx); err != nil { | 	if err := c.CommitMarkedOffsets(ctx); err != nil { | ||||||
| 		s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) | 		s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) | ||||||
| 		// s.connected.Store(0) | 		s.connected.Store(0) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user