Compare commits
	
		
			3 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 0845f4873b | |||
| 747e35148f | |||
| 21f06fee6c | 
							
								
								
									
										30
									
								
								broker.go
									
									
									
									
									
								
							
							
						
						
									
										30
									
								
								broker.go
									
									
									
									
									
								
							@@ -28,42 +28,44 @@ var (
 | 
				
			|||||||
func (m *hookEvent) OnGroupManageError(err error) {
 | 
					func (m *hookEvent) OnGroupManageError(err error) {
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		m.connected.Store(0)
 | 
							m.connected.Store(0)
 | 
				
			||||||
		if m.fatalOnError {
 | 
							// if m.fatalOnError {
 | 
				
			||||||
			m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err)
 | 
							m.log.Error(context.TODO(), "kgo.OnGroupManageError", err)
 | 
				
			||||||
		}
 | 
							//}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
 | 
					func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		m.connected.Store(0)
 | 
							// m.connected.Store(0)
 | 
				
			||||||
		if m.fatalOnError {
 | 
							// if m.fatalOnError {
 | 
				
			||||||
			m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err)
 | 
							m.log.Error(context.TODO(), "kgo.OnBrokerConnect", err)
 | 
				
			||||||
		}
 | 
							//}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) {
 | 
					func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) {
 | 
				
			||||||
	m.connected.Store(0)
 | 
						// m.connected.Store(0)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
 | 
					func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		m.connected.Store(0)
 | 
							// m.connected.Store(0)
 | 
				
			||||||
		if m.fatalOnError {
 | 
							// if m.fatalOnError {
 | 
				
			||||||
			m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err)
 | 
							m.log.Error(context.TODO(), "kgo.OnBrokerWrite", err)
 | 
				
			||||||
		}
 | 
							//}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
 | 
					func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		m.connected.Store(0)
 | 
							// m.connected.Store(0)
 | 
				
			||||||
 | 
							m.log.Error(context.TODO(), "kgo.OnBrokerRead", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) {
 | 
					func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) {
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		m.connected.Store(0)
 | 
							// m.connected.Store(0)
 | 
				
			||||||
 | 
							m.log.Error(context.TODO(), "kgo.OnProduceRecordUnbuffered", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										30
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										30
									
								
								kgo.go
									
									
									
									
									
								
							@@ -19,6 +19,7 @@ import (
 | 
				
			|||||||
	"go.unistack.org/micro/v3/metadata"
 | 
						"go.unistack.org/micro/v3/metadata"
 | 
				
			||||||
	"go.unistack.org/micro/v3/semconv"
 | 
						"go.unistack.org/micro/v3/semconv"
 | 
				
			||||||
	"go.unistack.org/micro/v3/tracer"
 | 
						"go.unistack.org/micro/v3/tracer"
 | 
				
			||||||
 | 
						mjitter "go.unistack.org/micro/v3/util/jitter"
 | 
				
			||||||
	mrand "go.unistack.org/micro/v3/util/rand"
 | 
						mrand "go.unistack.org/micro/v3/util/rand"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -66,6 +67,8 @@ type Broker struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	sync.RWMutex
 | 
						sync.RWMutex
 | 
				
			||||||
	init bool
 | 
						init bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						done chan struct{}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Broker) Live() bool {
 | 
					func (r *Broker) Live() bool {
 | 
				
			||||||
@@ -143,6 +146,25 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho
 | 
				
			|||||||
			return nil, nil, err
 | 
								return nil, nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		k.connected.Store(1)
 | 
							k.connected.Store(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if fatalOnError {
 | 
				
			||||||
 | 
								go func() {
 | 
				
			||||||
 | 
									c := 3
 | 
				
			||||||
 | 
									n := 0
 | 
				
			||||||
 | 
									tc := mjitter.NewTicker(500*time.Millisecond, 1*time.Second)
 | 
				
			||||||
 | 
									defer tc.Stop()
 | 
				
			||||||
 | 
									for range tc.C {
 | 
				
			||||||
 | 
										if k.connected.Load() == 0 {
 | 
				
			||||||
 | 
											if n > c {
 | 
				
			||||||
 | 
												k.opts.Logger.Fatal(context.Background(), "broker fatal error")
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
											n++
 | 
				
			||||||
 | 
										} else {
 | 
				
			||||||
 | 
											n = 0
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		return c, htracer, nil
 | 
							return c, htracer, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -204,6 +226,7 @@ func (k *Broker) Disconnect(ctx context.Context) error {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	k.connected.Store(0)
 | 
						k.connected.Store(0)
 | 
				
			||||||
 | 
						close(k.done)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -391,12 +414,6 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if options.Context != nil {
 | 
					 | 
				
			||||||
		if v, ok := options.Context.Value(fatalOnErrorKey{}).(bool); ok && v {
 | 
					 | 
				
			||||||
			fatalOnError = v
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	sub := &Subscriber{
 | 
						sub := &Subscriber{
 | 
				
			||||||
		topic:        topic,
 | 
							topic:        topic,
 | 
				
			||||||
		opts:         options,
 | 
							opts:         options,
 | 
				
			||||||
@@ -492,5 +509,6 @@ func NewBroker(opts ...broker.Option) *Broker {
 | 
				
			|||||||
		connected: &atomic.Uint32{},
 | 
							connected: &atomic.Uint32{},
 | 
				
			||||||
		opts:      options,
 | 
							opts:      options,
 | 
				
			||||||
		kopts:     kopts,
 | 
							kopts:     kopts,
 | 
				
			||||||
 | 
							done:      make(chan struct{}),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -160,7 +160,10 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
 | 
				
			|||||||
	for topic, partitions := range lost {
 | 
						for topic, partitions := range lost {
 | 
				
			||||||
		for _, partition := range partitions {
 | 
							for _, partition := range partitions {
 | 
				
			||||||
			tps := tp{topic, partition}
 | 
								tps := tp{topic, partition}
 | 
				
			||||||
			pc := s.consumers[tps]
 | 
								pc, ok := s.consumers[tps]
 | 
				
			||||||
 | 
								if !ok {
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
			delete(s.consumers, tps)
 | 
								delete(s.consumers, tps)
 | 
				
			||||||
			close(pc.quit)
 | 
								close(pc.quit)
 | 
				
			||||||
			if s.kopts.Logger.V(logger.DebugLevel) {
 | 
								if s.kopts.Logger.V(logger.DebugLevel) {
 | 
				
			||||||
@@ -196,7 +199,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
 | 
				
			|||||||
	s.killConsumers(ctx, revoked)
 | 
						s.killConsumers(ctx, revoked)
 | 
				
			||||||
	if err := c.CommitMarkedOffsets(ctx); err != nil {
 | 
						if err := c.CommitMarkedOffsets(ctx); err != nil {
 | 
				
			||||||
		s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err)
 | 
							s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err)
 | 
				
			||||||
		// s.connected.Store(0)
 | 
							s.connected.Store(0)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user