Compare commits

...

9 Commits

Author SHA1 Message Date
4cc2e888fc Merge remote-tracking branch 'github' into v4 2026-02-09 19:55:28 +03:00
vtolstov
2485d96a6e Apply Code Coverage Badge 2026-02-03 10:16:01 +00:00
c579507af6 Merge remote-tracking branch 'github/v4' into v4 2026-02-03 13:15:48 +03:00
dependabot[bot]
b98878827f Bump golang.org/x/crypto from 0.43.0 to 0.45.0 (#196)
All checks were successful
coverage / build (push) Successful in 3m13s
test / test (push) Successful in 4m51s
Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.43.0 to 0.45.0.
- [Commits](https://github.com/golang/crypto/compare/v0.43.0...v0.45.0)

---
updated-dependencies:
- dependency-name: golang.org/x/crypto
  dependency-version: 0.45.0
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2026-02-03 13:13:40 +03:00
dependabot[bot]
981bb67004 Bump golang.org/x/crypto from 0.43.0 to 0.45.0 (#196)
Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.43.0 to 0.45.0.
- [Commits](https://github.com/golang/crypto/compare/v0.43.0...v0.45.0)

---
updated-dependencies:
- dependency-name: golang.org/x/crypto
  dependency-version: 0.45.0
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2026-02-03 09:32:40 +03:00
d0a959611d Merge pull request 'correction create opts' (#160) from devstigneev/micro-broker-kgo:fix_opts into v4
Reviewed-on: #160
2026-01-30 09:58:17 +03:00
Evstigneev Denis
acb7fd2b11 correction create opts 2026-01-30 09:52:41 +03:00
5080a52834 fixup revoked error
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-02 21:50:53 +03:00
fe5d474f36 fixup error
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-02 21:46:03 +03:00
5 changed files with 30 additions and 29 deletions

View File

@@ -1,2 +1,2 @@
# micro-broker-kgo
![Coverage](https://img.shields.io/badge/Coverage-64.8%25-yellow)
![Coverage](https://img.shields.io/badge/Coverage-62.0%25-yellow)

2
go.mod
View File

@@ -21,7 +21,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/spf13/cast v1.10.0 // indirect
go.unistack.org/micro-proto/v4 v4.1.0 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/crypto v0.45.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

14
go.sum
View File

@@ -6,8 +6,6 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -26,30 +24,22 @@ github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y=
github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
github.com/twmb/franz-go v1.20.2 h1:CiwhyKZHW6vqSHJkh+RTxFAJkio0jBjM/JQhx/HZ72A=
github.com/twmb/franz-go v1.20.2/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA=
github.com/twmb/franz-go/pkg/kadm v1.16.1 h1:IEkrhTljgLHJ0/hT/InhXGjPdmWfFvxp7o/MR7vJ8cw=
github.com/twmb/franz-go/pkg/kadm v1.16.1/go.mod h1:Ue/ye1cc9ipsQFg7udFbbGiFNzQMqiH73fGC2y0rwyc=
github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw=
github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3 h1:p24opKWPySAy8xSl8NqRgOv7Q+bX7kdrQirBVRJzQfo=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3/go.mod h1:7uQs3Ae6HkWT1Y9elMbqtAcNFCI0y6+iS+Phw49L49U=
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg=
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE=
github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc=
github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY=
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
go.unistack.org/micro/v4 v4.1.21 h1:F9PrbI1BhXSDS0FopwcO5wWrT2Xh38w9VhVZZBHjfg8=
go.unistack.org/micro/v4 v4.1.21/go.mod h1:nlBXTbx0rQrSZX4HPp2m57PHmpuGPWUd0O+jpUIiPto=
go.unistack.org/micro/v4 v4.1.24 h1:PbkSWJS3ssB5A0y0tOdOw6u9e2Mk4yombF4yR0Jshvo=
go.unistack.org/micro/v4 v4.1.24/go.mod h1:nlBXTbx0rQrSZX4HPp2m57PHmpuGPWUd0O+jpUIiPto=
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

29
kgo.go
View File

@@ -568,19 +568,22 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
messagePool: messagePool,
}
kopts := append(b.kopts,
kgo.ConsumerGroup(options.Group),
kgo.ConsumeTopics(topic),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.FetchMaxWait(1*time.Second),
kgo.AutoCommitInterval(commitInterval),
kgo.OnPartitionsAssigned(sub.assigned),
kgo.OnPartitionsRevoked(sub.revoked),
kgo.StopProducerOnDataLossDetected(),
kgo.OnPartitionsLost(sub.lost),
kgo.AutoCommitCallback(sub.autocommit),
kgo.AutoCommitMarks(),
kgo.WithHooks(sub),
kopts := append(
[]kgo.Opt{
kgo.ConsumerGroup(options.Group),
kgo.ConsumeTopics(topic),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.FetchMaxWait(1 * time.Second),
kgo.AutoCommitInterval(commitInterval),
kgo.OnPartitionsAssigned(sub.assigned),
kgo.OnPartitionsRevoked(sub.revoked),
kgo.StopProducerOnDataLossDetected(),
kgo.OnPartitionsLost(sub.lost),
kgo.AutoCommitCallback(sub.autocommit),
kgo.AutoCommitMarks(),
kgo.WithHooks(sub),
},
b.kopts...,
)
if options.Context != nil {

View File

@@ -123,7 +123,7 @@ func (s *Subscriber) poll(ctx context.Context) {
c := s.consumers[tps]
s.mu.Unlock()
if c != nil {
c.recs <- newErrorFetchTopicPartition(kgo.ErrClientClosed, t, p)
c.recs <- newErrorFetchTopicPartition(err, t, p)
}
})
@@ -199,7 +199,15 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
}
s.killConsumers(ctx, revoked)
if err := c.CommitMarkedOffsets(ctx); err != nil {
s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err)
s.mu.Lock()
tpc := make(map[tp]*consumer, len(s.consumers))
maps.Copy(tpc, s.consumers)
s.mu.Unlock()
for tp, c := range tpc {
if c != nil {
c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p)
}
}
}
}