fix consume/killconsume #157
@@ -1,5 +0,0 @@
|
|||||||
run:
|
|
||||||
concurrency: 8
|
|
||||||
timeout: 5m
|
|
||||||
issues-exit-code: 1
|
|
||||||
tests: true
|
|
||||||
@@ -121,7 +121,7 @@ func (s *Subscriber) poll(ctx context.Context) {
|
|||||||
tps := tp{t, p}
|
tps := tp{t, p}
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
c := s.consumers[tps]
|
c := s.consumers[tps]
|
||||||
s.mu.Unlock()
|
s.mu.RUnlock()
|
||||||
if c != nil {
|
if c != nil {
|
||||||
c.recs <- newErrorFetchTopicPartition(err, t, p)
|
c.recs <- newErrorFetchTopicPartition(err, t, p)
|
||||||
}
|
}
|
||||||
@@ -129,9 +129,9 @@ func (s *Subscriber) poll(ctx context.Context) {
|
|||||||
|
|
||||||
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
|
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
|
||||||
tps := tp{p.Topic, p.Partition}
|
tps := tp{p.Topic, p.Partition}
|
||||||
s.mu.Lock()
|
s.mu.RLock()
|
||||||
c := s.consumers[tps]
|
c := s.consumers[tps]
|
||||||
s.mu.Unlock()
|
s.mu.RUnlock()
|
||||||
if c != nil {
|
if c != nil {
|
||||||
c.recs <- p
|
c.recs <- p
|
||||||
}
|
}
|
||||||
@@ -150,19 +150,36 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
|||||||
tps := tp{topic, partition}
|
tps := tp{topic, partition}
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
pc, ok := s.consumers[tps]
|
pc, ok := s.consumers[tps]
|
||||||
|
if ok {
|
||||||
|
delete(s.consumers, tps)
|
||||||
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
if !ok || pc == nil {
|
if !ok || pc == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.mu.Lock()
|
|
||||||
delete(s.consumers, tps)
|
|
||||||
s.mu.Unlock()
|
|
||||||
close(pc.quit)
|
|
||||||
if s.kopts.Logger.V(logger.DebugLevel) {
|
if s.kopts.Logger.V(logger.DebugLevel) {
|
||||||
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))
|
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
close(pc.quit)
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() { <-pc.done; wg.Done() }()
|
go func(c *consumer, t string, p int32) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
timeout := time.NewTimer(30 * time.Second) //waiting stop consumer mb set to opts/cfg TimeoutWaitKillConsumer
|
||||||
|
defer timeout.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.done:
|
||||||
|
if s.kopts.Logger.V(logger.DebugLevel) {
|
||||||
|
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p))
|
||||||
|
}
|
||||||
|
case <-timeout.C:
|
||||||
|
s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))
|
||||||
|
|
|||||||
|
}
|
||||||
|
}(pc, topic, partition)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -197,7 +214,6 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
|
|||||||
if s.kopts.Logger.V(logger.DebugLevel) {
|
if s.kopts.Logger.V(logger.DebugLevel) {
|
||||||
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] revoked %#+v", revoked))
|
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] revoked %#+v", revoked))
|
||||||
}
|
}
|
||||||
s.killConsumers(ctx, revoked)
|
|
||||||
if err := c.CommitMarkedOffsets(ctx); err != nil {
|
if err := c.CommitMarkedOffsets(ctx); err != nil {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
tpc := make(map[tp]*consumer, len(s.consumers))
|
tpc := make(map[tp]*consumer, len(s.consumers))
|
||||||
@@ -209,6 +225,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
s.killConsumers(ctx, revoked)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
|
func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
|
||||||
@@ -251,7 +268,10 @@ func (pc *consumer) consume() {
|
|||||||
select {
|
select {
|
||||||
case <-pc.quit:
|
case <-pc.quit:
|
||||||
return
|
return
|
||||||
case p := <-pc.recs:
|
case p, ok := <-pc.recs:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
if p.Err != nil || p.FetchPartition.Err != nil {
|
if p.Err != nil || p.FetchPartition.Err != nil {
|
||||||
|
|
||||||
if p.Err != nil {
|
if p.Err != nil {
|
||||||
@@ -274,6 +294,11 @@ func (pc *consumer) consume() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, record := range p.Records {
|
for _, record := range p.Records {
|
||||||
|
select {
|
||||||
|
case <-pc.quit:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
ctx, sp := pc.htracer.WithProcessSpan(record)
|
ctx, sp := pc.htracer.WithProcessSpan(record)
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc()
|
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc()
|
||||||
@@ -300,12 +325,24 @@ func (pc *consumer) consume() {
|
|||||||
pm.hdr.Set("Micro-Key", string(record.Key))
|
pm.hdr.Set("Micro-Key", string(record.Key))
|
||||||
pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10))
|
pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10))
|
||||||
|
|
||||||
switch h := pc.handler.(type) {
|
processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) //waiting process consumer mb set to opts/cfg TimeoutProccesWaitingHandle
|
||||||
|
vtolstov
commented
давай в брокер добавим https://git.unistack.org/unistack-org/micro/src/branch/v4/server/options.go#L77
грейсфул таймаут, и тут его будем использовать
|
|||||||
case func(broker.Message) error:
|
errChan := make(chan error, 1)
|
||||||
err = h(pm)
|
|
||||||
case func([]broker.Message) error:
|
go func() {
|
||||||
err = h([]broker.Message{pm})
|
switch h := pc.handler.(type) {
|
||||||
|
case func(broker.Message) error:
|
||||||
|
errChan <- h(pm)
|
||||||
|
case func([]broker.Message) error:
|
||||||
|
errChan <- h([]broker.Message{pm})
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err = <-errChan:
|
||||||
|
case <-processCtx.Done():
|
||||||
|
err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)
|
||||||
|
vtolstov
commented
тут лучше возвращать context.DeadlineExceeded (в пакете context нет ErrTimeout)
|
|||||||
}
|
}
|
||||||
|
cancel()
|
||||||
|
|
||||||
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
|
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -335,6 +372,8 @@ func (pc *consumer) consume() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pc.kopts.Logger.Error(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset))
|
||||||
|
vtolstov
commented
это на дебаг левел, так как обработка ошибок перенесена на юзера
|
|||||||
|
|
||||||
pm := pc.newErrorMessage(ErrLostMessage, p.Topic, p.Partition)
|
pm := pc.newErrorMessage(ErrLostMessage, p.Topic, p.Partition)
|
||||||
switch h := pc.handler.(type) {
|
switch h := pc.handler.(type) {
|
||||||
case func(broker.Message) error:
|
case func(broker.Message) error:
|
||||||
|
|||||||
Reference in New Issue
Block a user
вынести юзеру и убрать лог