add wgWaitGroup #130

Open
devstigneev wants to merge 2 commits from devstigneev/micro-broker-kgo:v3_issue_120 into v3
4 changed files with 26 additions and 4 deletions

9
.gitignore vendored
View File

@ -4,6 +4,7 @@
*.dll *.dll
*.so *.so
*.dylib *.dylib
bin
# Test binary, built with `go test -c` # Test binary, built with `go test -c`
*.test *.test
@ -13,3 +14,11 @@
# Dependency directories (remove the comment below to include it) # Dependency directories (remove the comment below to include it)
# vendor/ # vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

2
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/twmb/franz-go v1.16.1 github.com/twmb/franz-go v1.16.1
github.com/twmb/franz-go/pkg/kmsg v1.7.0 github.com/twmb/franz-go/pkg/kmsg v1.7.0
go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel v1.24.0
go.unistack.org/micro/v3 v3.10.46 go.unistack.org/micro/v3 v3.10.57
) )
require ( require (

4
go.sum
View File

@ -16,8 +16,8 @@ github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqj
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.unistack.org/micro/v3 v3.10.46 h1:rnuEqiFkerwJmKzHmHBXRgxFemZustxWz2hRNLQQ8cU= go.unistack.org/micro/v3 v3.10.57 h1:VxG7Cs7kBOgxgQlP+K8TvTxIgh2pmqAwmAaKdTAQNtQ=
go.unistack.org/micro/v3 v3.10.46/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= go.unistack.org/micro/v3 v3.10.57/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=

View File

@ -106,6 +106,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
var wg sync.WaitGroup var wg sync.WaitGroup
defer wg.Wait() defer wg.Wait()
if s.kopts.Wait != nil {
ctx, cancel := context.WithTimeout(ctx, s.kopts.GracefulTimeout)
defer cancel()
s.kopts.Wait.WaitContext(ctx)
}
for topic, partitions := range lost { for topic, partitions := range lost {
for _, partition := range partitions { for _, partition := range partitions {
tp := tp{topic, partition} tp := tp{topic, partition}
@ -148,7 +154,14 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str
opts: s.opts, opts: s.opts,
} }
s.consumers[tp{topic, partition}] = pc s.consumers[tp{topic, partition}] = pc
go pc.consume()
go func() {
if s.kopts.Wait != nil {
s.kopts.Wait.Add(1)
defer s.kopts.Wait.Done()
}
pc.consume()
}()
} }
} }
} }