fix consume/killconsume #157

Open
devstigneev wants to merge 4 commits from devstigneev/micro-broker-kgo:v4_fix into v4
Member

Pull Request template

Please, go through these steps before clicking submit on this PR.

  1. Give a descriptive title to your PR.
  2. Provide a description of your changes.
  3. Make sure you have some relevant tests.
  4. Put closes #XXXX in your comment to auto-close the issue that your PR fixes (if applicable).

PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING

## Pull Request template Please, go through these steps before clicking submit on this PR. 1. Give a descriptive title to your PR. 2. Provide a description of your changes. 3. Make sure you have some relevant tests. 4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable). **PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**
devstigneev added 1 commit 2025-11-27 13:15:37 +03:00
fix consume/killconsume
Some checks failed
coverage / build (pull_request) Failing after 3m29s
lint / lint (pull_request) Failing after 4m59s
test / test (pull_request) Failing after 18m36s
28cb71af46
devstigneev added 2 commits 2025-12-05 15:35:42 +03:00
Merge remote-tracking branch 'fork/v4_fix' into v4_fix
Some checks failed
coverage / build (pull_request) Failing after 1m24s
lint / lint (pull_request) Failing after 1m14s
test / test (pull_request) Failing after 2m33s
553f729b25
# Conflicts:
#	subscriber.go
devstigneev added 1 commit 2025-12-05 15:38:57 +03:00
removed old version golangcylint.yaml
Some checks failed
lint / lint (pull_request) Failing after 1m27s
coverage / build (pull_request) Failing after 1m33s
test / test (pull_request) Failing after 17m0s
bd97d3416e
vtolstov approved these changes 2025-12-06 13:13:54 +03:00
Dismissed
@@ -166,0 +177,4 @@
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p))
}
case <-timeout.C:
s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))
Owner

вынести юзеру и убрать лог

вынести юзеру и убрать лог
@@ -305,3 +328,1 @@
err = h(pm)
case func([]broker.Message) error:
err = h([]broker.Message{pm})
processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) //waiting process consumer mb set to opts/cfg TimeoutProccesWaitingHandle
Owner

давай в брокер добавим https://git.unistack.org/unistack-org/micro/src/branch/v4/server/options.go#L77
грейсфул таймаут, и тут его будем использовать

давай в брокер добавим https://git.unistack.org/unistack-org/micro/src/branch/v4/server/options.go#L77 грейсфул таймаут, и тут его будем использовать
@@ -308,0 +340,4 @@
select {
case err = <-errChan:
case <-processCtx.Done():
err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)
Owner

тут лучше возвращать context.ErrTimeout

тут лучше возвращать context.ErrTimeout
@@ -335,6 +372,8 @@ func (pc *consumer) consume() {
continue
}
pc.kopts.Logger.Error(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset))
Owner

это на дебаг левел, так как обработка ошибок перенесена на юзера

это на дебаг левел, так как обработка ошибок перенесена на юзера
vtolstov requested changes 2025-12-06 13:14:16 +03:00
vtolstov left a comment
Owner

пару моментов поправить

пару моментов поправить
Some checks failed
lint / lint (pull_request) Failing after 1m27s
coverage / build (pull_request) Failing after 1m33s
test / test (pull_request) Failing after 17m0s
This pull request can be merged automatically.
You are not authorized to merge this pull request.
View command line instructions

Checkout

From your project repository, check out a new branch and test the changes.
git fetch -u v4_fix:devstigneev-v4_fix
git checkout devstigneev-v4_fix
Sign in to join this conversation.