fix consume/killconsume #157

Open
devstigneev wants to merge 7 commits from devstigneev/micro-broker-kgo:v4_fix into v4
Member

Pull Request template

Please, go through these steps before clicking submit on this PR.

  1. Give a descriptive title to your PR.
  2. Provide a description of your changes.
  3. Make sure you have some relevant tests.
  4. Put closes #XXXX in your comment to auto-close the issue that your PR fixes (if applicable).

PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING

## Pull Request template Please, go through these steps before clicking submit on this PR. 1. Give a descriptive title to your PR. 2. Provide a description of your changes. 3. Make sure you have some relevant tests. 4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable). **PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**
devstigneev added 1 commit 2025-11-27 13:15:37 +03:00
fix consume/killconsume
Some checks failed
coverage / build (pull_request) Failing after 3m29s
lint / lint (pull_request) Failing after 4m59s
test / test (pull_request) Failing after 18m36s
28cb71af46
devstigneev added 2 commits 2025-12-05 15:35:42 +03:00
Merge remote-tracking branch 'fork/v4_fix' into v4_fix
Some checks failed
coverage / build (pull_request) Failing after 1m24s
lint / lint (pull_request) Failing after 1m14s
test / test (pull_request) Failing after 2m33s
553f729b25
# Conflicts:
#	subscriber.go
devstigneev added 1 commit 2025-12-05 15:38:57 +03:00
removed old version golangcylint.yaml
Some checks failed
lint / lint (pull_request) Failing after 1m27s
coverage / build (pull_request) Failing after 1m33s
test / test (pull_request) Failing after 17m0s
bd97d3416e
vtolstov approved these changes 2025-12-06 13:13:54 +03:00
Dismissed
subscriber.go Outdated
@@ -166,0 +177,4 @@
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p))
}
case <-timeout.C:
s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))
Owner

вынести юзеру и убрать лог

вынести юзеру и убрать лог
Author
Member

killConsumers, вроде отсюда особо юзеру нечего отправить, оставил просто на дебаг уровне ошибки

killConsumers, вроде отсюда особо юзеру нечего отправить, оставил просто на дебаг уровне ошибки
subscriber.go Outdated
@@ -305,3 +328,1 @@
err = h(pm)
case func([]broker.Message) error:
err = h([]broker.Message{pm})
processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) //waiting process consumer mb set to opts/cfg TimeoutProccesWaitingHandle
Owner

давай в брокер добавим https://git.unistack.org/unistack-org/micro/src/branch/v4/server/options.go#L77
грейсфул таймаут, и тут его будем использовать

давай в брокер добавим https://git.unistack.org/unistack-org/micro/src/branch/v4/server/options.go#L77 грейсфул таймаут, и тут его будем использовать
@@ -308,0 +340,4 @@
select {
case err = <-errChan:
case <-processCtx.Done():
err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)
Owner

тут лучше возвращать context.ErrTimeout

тут лучше возвращать context.ErrTimeout
subscriber.go Outdated
@@ -335,6 +372,8 @@ func (pc *consumer) consume() {
continue
}
pc.kopts.Logger.Error(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset))
Owner

это на дебаг левел, так как обработка ошибок перенесена на юзера

это на дебаг левел, так как обработка ошибок перенесена на юзера
vtolstov requested changes 2025-12-06 13:14:16 +03:00
vtolstov left a comment
Owner

пару моментов поправить

пару моментов поправить
devstigneev added 1 commit 2025-12-10 15:30:07 +03:00
corrected by comments
Some checks failed
test / test (pull_request) Failing after 14m30s
lint / lint (pull_request) Failing after 14m42s
coverage / build (pull_request) Failing after 14m48s
2919dd5994
devstigneev added 1 commit 2025-12-10 16:34:54 +03:00
add test rebalance for killconsumers
Some checks failed
test / test (pull_request) Failing after 16m5s
lint / lint (pull_request) Failing after 16m17s
coverage / build (pull_request) Failing after 16m22s
f98811b93e
devstigneev added 1 commit 2025-12-10 16:52:00 +03:00
fix datarace
Some checks failed
test / test (pull_request) Failing after 18m15s
lint / lint (pull_request) Failing after 18m27s
coverage / build (pull_request) Failing after 18m33s
813999e969
Some checks failed
test / test (pull_request) Failing after 18m15s
lint / lint (pull_request) Failing after 18m27s
coverage / build (pull_request) Failing after 18m33s
This pull request can be merged automatically.
You are not authorized to merge this pull request.
View command line instructions

Checkout

From your project repository, check out a new branch and test the changes.
git fetch -u v4_fix:devstigneev-v4_fix
git checkout devstigneev-v4_fix
Sign in to join this conversation.