dont modify connected state on errors

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-05-02 19:08:54 +03:00
parent db97b24904
commit 05ad1a693e
3 changed files with 19 additions and 27 deletions

View File

@@ -3,58 +3,50 @@ name: sync
on: on:
schedule: schedule:
- cron: '*/5 * * * *' - cron: '*/5 * * * *'
push:
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
# Allows you to run this workflow manually from the Actions tab # Allows you to run this workflow manually from the Actions tab
workflow_dispatch: workflow_dispatch:
jobs: jobs:
sync: sync:
if: env.GITHUB_ACTION == 0 if: github.server_url == 'zhttps://github.com'
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: init - name: init
run: | run: |
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>" git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
git config --global user.name "github-actions[bot]" git config --global user.name "github-actions[bot]"
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" | tee -a /root/.netrc echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" | tee -a /root/.netrc echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
- name: sync master - name: sync master
run: | run: |
git clone --depth=10 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo cd repo
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY} git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream master git merge upstream/master
git push upstream master --progress git push upstream master --progress
git merge --allow-unrelated-histories "upstream/master"
git push origin master --progress git push origin master --progress
cd ../ cd ../
rm -rf repo rm -rf repo
- name: sync v3 - name: sync v3
run: | run: |
git clone --depth=10 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo cd repo
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY} git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v3 git merge upstream/v3
git push upstream v3 git push upstream v3 --progress
git merge --allow-unrelated-histories "upstream/v3"
git push origin v3 --progress git push origin v3 --progress
cd ../ cd ../
rm -rf repo rm -rf repo
- name: sync v4 - name: sync v4
run: | run: |
git clone --depth=10 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo cd repo
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY} git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v4 git merge upstream/v4
git push upstream v4 git push upstream v4 --progress
git merge --allow-unrelated-histories "upstream/v4"
git push origin v4 --progress git push origin v4 --progress
cd ../ cd ../
rm -rf repo rm -rf repo

View File

@@ -27,7 +27,7 @@ var (
func (m *hookEvent) OnGroupManageError(err error) { func (m *hookEvent) OnGroupManageError(err error) {
if err != nil { if err != nil {
m.connected.Store(0) // m.connected.Store(0)
if m.fatalOnError { if m.fatalOnError {
m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err) m.log.Fatal(context.TODO(), "kgo.OnGroupManageError", err)
} }
@@ -36,7 +36,7 @@ func (m *hookEvent) OnGroupManageError(err error) {
func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
if err != nil { if err != nil {
m.connected.Store(0) // m.connected.Store(0)
if m.fatalOnError { if m.fatalOnError {
m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err) m.log.Fatal(context.TODO(), "kgo.OnBrokerConnect", err)
} }
@@ -44,12 +44,12 @@ func (m *hookEvent) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net
} }
func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { func (m *hookEvent) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) {
m.connected.Store(0) // m.connected.Store(0)
} }
func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
if err != nil { if err != nil {
m.connected.Store(0) // m.connected.Store(0)
if m.fatalOnError { if m.fatalOnError {
m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err) m.log.Fatal(context.TODO(), "kgo.OnBrokerWrite", err)
} }
@@ -58,12 +58,12 @@ func (m *hookEvent) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.D
func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { func (m *hookEvent) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) {
if err != nil { if err != nil {
m.connected.Store(0) // m.connected.Store(0)
} }
} }
func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { func (m *hookEvent) OnProduceRecordUnbuffered(_ *kgo.Record, err error) {
if err != nil { if err != nil {
m.connected.Store(0) // m.connected.Store(0)
} }
} }

View File

@@ -177,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
if err != nil { if err != nil {
s.connected.Store(0) // s.connected.Store(0)
if s.fatalOnError { if s.fatalOnError {
s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err)
} }
@@ -282,7 +282,7 @@ func (pc *consumer) consume() {
pc.c.MarkCommitRecords(record) pc.c.MarkCommitRecords(record)
} else { } else {
sp.Finish() sp.Finish()
pc.connected.Store(0) // pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] message not commited") pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] message not commited")
return return
} }