From ccd912adb25b3f4cf0a616bda449ea4425e4b455 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 2 May 2025 19:08:54 +0300 Subject: [PATCH] dont modify connected state on errors Signed-off-by: Vasiliy Tolstov --- .github/workflows/job_sync.yml | 30 +++++++++++------------------- broker.go | 12 ++++++------ subscriber.go | 4 ++-- 3 files changed, 19 insertions(+), 27 deletions(-) diff --git a/.github/workflows/job_sync.yml b/.github/workflows/job_sync.yml index 7f4af85..ad2db1e 100644 --- a/.github/workflows/job_sync.yml +++ b/.github/workflows/job_sync.yml @@ -3,58 +3,50 @@ name: sync on: schedule: - cron: '*/5 * * * *' - push: - branches: [ master, v3, v4 ] - paths-ignore: - - '.github/**' - - '.gitea/**' # Allows you to run this workflow manually from the Actions tab workflow_dispatch: jobs: sync: - if: env.GITHUB_ACTION == 0 + if: github.server_url == 'zhttps://github.com' runs-on: ubuntu-latest steps: - name: init run: | git config --global user.email "vtolstov " git config --global user.name "github-actions[bot]" - echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" | tee -a /root/.netrc - echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" | tee -a /root/.netrc + echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc + echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc - name: sync master run: | - git clone --depth=10 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo + git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo cd repo git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY} - git pull --rebase upstream master + git merge upstream/master git push upstream master --progress - git merge --allow-unrelated-histories "upstream/master" git push origin master --progress cd ../ rm -rf repo - name: sync v3 run: | - git clone --depth=10 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo + git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo cd repo git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY} - git pull --rebase upstream v3 - git push upstream v3 - git merge --allow-unrelated-histories "upstream/v3" + git merge upstream/v3 + git push upstream v3 --progress git push origin v3 --progress cd ../ rm -rf repo - name: sync v4 run: | - git clone --depth=10 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo + git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo cd repo git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY} - git pull --rebase upstream v4 - git push upstream v4 - git merge --allow-unrelated-histories "upstream/v4" + git merge upstream/v4 + git push upstream v4 --progress git push origin v4 --progress cd ../ rm -rf repo diff --git a/broker.go b/broker.go index 868d6d0..fc0d129 100644 --- a/broker.go +++ b/broker.go @@ -27,7 +27,7 @@ var ( func (m *hookEvent) OnGroupManageError(err error) { if err != nil { - m.connected.Store(0) + // m.connected.Store(0) if m.fatalOnError { m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) } @@ -36,7 +36,7 @@ func (m *hookEvent) OnGroupManageError(err error) { func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { if err != nil { - m.connected.Store(0) + // m.connected.Store(0) if m.fatalOnError { m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) } @@ -44,12 +44,12 @@ func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net } func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { - m.connected.Store(0) + // m.connected.Store(0) } func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { if err != nil { - m.connected.Store(0) + // m.connected.Store(0) if m.fatalOnError { m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) } @@ -58,12 +58,12 @@ func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.D func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { if err != nil { - m.connected.Store(0) + // m.connected.Store(0) } } func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { if err != nil { - m.connected.Store(0) + // m.connected.Store(0) } } diff --git a/subscriber.go b/subscriber.go index d6b6eac..d23feb5 100644 --- a/subscriber.go +++ b/subscriber.go @@ -177,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { if err != nil { - s.connected.Store(0) + // s.connected.Store(0) if s.fatalOnError { s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) } @@ -282,7 +282,7 @@ func (pc *consumer) consume() { pc.c.MarkCommitRecords(record) } else { sp.Finish() - pc.connected.Store(0) + // pc.connected.Store(0) pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] message not commited") return }