Compare commits
	
		
			3 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 0845f4873b | |||
| 747e35148f | |||
| 21f06fee6c | 
							
								
								
									
										30
									
								
								broker.go
									
									
									
									
									
								
							
							
						
						
									
										30
									
								
								broker.go
									
									
									
									
									
								
							| @@ -28,42 +28,44 @@ var ( | ||||
| func (m *hookEvent) OnGroupManageError(err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		if m.fatalOnError { | ||||
| 			m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) | ||||
| 		} | ||||
| 		// if m.fatalOnError { | ||||
| 		m.log.Error(context.TODO(), "kgo.OnGroupManageError", err) | ||||
| 		//} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		if m.fatalOnError { | ||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) | ||||
| 		} | ||||
| 		// m.connected.Store(0) | ||||
| 		// if m.fatalOnError { | ||||
| 		m.log.Error(context.TODO(), "kgo.OnBrokerConnect", err) | ||||
| 		//} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { | ||||
| 	m.connected.Store(0) | ||||
| 	// m.connected.Store(0) | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		if m.fatalOnError { | ||||
| 			m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) | ||||
| 		} | ||||
| 		// m.connected.Store(0) | ||||
| 		// if m.fatalOnError { | ||||
| 		m.log.Error(context.TODO(), "kgo.OnBrokerWrite", err) | ||||
| 		//} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		// m.connected.Store(0) | ||||
| 		m.log.Error(context.TODO(), "kgo.OnBrokerRead", err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { | ||||
| 	if err != nil { | ||||
| 		m.connected.Store(0) | ||||
| 		// m.connected.Store(0) | ||||
| 		m.log.Error(context.TODO(), "kgo.OnProduceRecordUnbuffered", err) | ||||
| 	} | ||||
| } | ||||
|   | ||||
							
								
								
									
										30
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										30
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -19,6 +19,7 @@ import ( | ||||
| 	"go.unistack.org/micro/v3/metadata" | ||||
| 	"go.unistack.org/micro/v3/semconv" | ||||
| 	"go.unistack.org/micro/v3/tracer" | ||||
| 	mjitter "go.unistack.org/micro/v3/util/jitter" | ||||
| 	mrand "go.unistack.org/micro/v3/util/rand" | ||||
| ) | ||||
|  | ||||
| @@ -66,6 +67,8 @@ type Broker struct { | ||||
|  | ||||
| 	sync.RWMutex | ||||
| 	init bool | ||||
|  | ||||
| 	done chan struct{} | ||||
| } | ||||
|  | ||||
| func (r *Broker) Live() bool { | ||||
| @@ -143,6 +146,25 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho | ||||
| 			return nil, nil, err | ||||
| 		} | ||||
| 		k.connected.Store(1) | ||||
|  | ||||
| 		if fatalOnError { | ||||
| 			go func() { | ||||
| 				c := 3 | ||||
| 				n := 0 | ||||
| 				tc := mjitter.NewTicker(500*time.Millisecond, 1*time.Second) | ||||
| 				defer tc.Stop() | ||||
| 				for range tc.C { | ||||
| 					if k.connected.Load() == 0 { | ||||
| 						if n > c { | ||||
| 							k.opts.Logger.Fatal(context.Background(), "broker fatal error") | ||||
| 						} | ||||
| 						n++ | ||||
| 					} else { | ||||
| 						n = 0 | ||||
| 					} | ||||
| 				} | ||||
| 			}() | ||||
| 		} | ||||
| 		return c, htracer, nil | ||||
| 	} | ||||
| } | ||||
| @@ -204,6 +226,7 @@ func (k *Broker) Disconnect(ctx context.Context) error { | ||||
| 	} | ||||
|  | ||||
| 	k.connected.Store(0) | ||||
| 	close(k.done) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -391,12 +414,6 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if options.Context != nil { | ||||
| 		if v, ok := options.Context.Value(fatalOnErrorKey{}).(bool); ok && v { | ||||
| 			fatalOnError = v | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	sub := &Subscriber{ | ||||
| 		topic:        topic, | ||||
| 		opts:         options, | ||||
| @@ -492,5 +509,6 @@ func NewBroker(opts ...broker.Option) *Broker { | ||||
| 		connected: &atomic.Uint32{}, | ||||
| 		opts:      options, | ||||
| 		kopts:     kopts, | ||||
| 		done:      make(chan struct{}), | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -160,7 +160,10 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) | ||||
| 	for topic, partitions := range lost { | ||||
| 		for _, partition := range partitions { | ||||
| 			tps := tp{topic, partition} | ||||
| 			pc := s.consumers[tps] | ||||
| 			pc, ok := s.consumers[tps] | ||||
| 			if !ok { | ||||
| 				continue | ||||
| 			} | ||||
| 			delete(s.consumers, tps) | ||||
| 			close(pc.quit) | ||||
| 			if s.kopts.Logger.V(logger.DebugLevel) { | ||||
| @@ -196,7 +199,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str | ||||
| 	s.killConsumers(ctx, revoked) | ||||
| 	if err := c.CommitMarkedOffsets(ctx); err != nil { | ||||
| 		s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) | ||||
| 		// s.connected.Store(0) | ||||
| 		s.connected.Store(0) | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user