fix consume/killconsume #157
Reference in New Issue
Block a user
Delete Branch "devstigneev/micro-broker-kgo:v4_fix"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
Pull Request template
Please, go through these steps before clicking submit on this PR.
closes #XXXXin your comment to auto-close the issue that your PR fixes (if applicable).PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING
@@ -166,0 +177,4 @@s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p))}case <-timeout.C:s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))вынести юзеру и убрать лог
@@ -305,3 +328,1 @@err = h(pm)case func([]broker.Message) error:err = h([]broker.Message{pm})processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) //waiting process consumer mb set to opts/cfg TimeoutProccesWaitingHandleдавай в брокер добавим https://git.unistack.org/unistack-org/micro/src/branch/v4/server/options.go#L77
грейсфул таймаут, и тут его будем использовать
@@ -308,0 +340,4 @@select {case err = <-errChan:case <-processCtx.Done():err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)тут лучше возвращать context.ErrTimeout
@@ -335,6 +372,8 @@ func (pc *consumer) consume() {continue}pc.kopts.Logger.Error(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset))это на дебаг левел, так как обработка ошибок перенесена на юзера
пару моментов поправить
View command line instructions
Checkout
From your project repository, check out a new branch and test the changes.