add wgWaitGroup #130
9
.gitignore
vendored
9
.gitignore
vendored
@ -4,6 +4,7 @@
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
bin
|
||||
|
||||
# Test binary, built with `go test -c`
|
||||
*.test
|
||||
@ -13,3 +14,11 @@
|
||||
|
||||
# Dependency directories (remove the comment below to include it)
|
||||
# vendor/
|
||||
|
||||
# Go workspace file
|
||||
go.work
|
||||
|
||||
# General
|
||||
.DS_Store
|
||||
.idea
|
||||
.vscode
|
2
go.mod
2
go.mod
@ -7,7 +7,7 @@ require (
|
||||
github.com/twmb/franz-go v1.16.1
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.7.0
|
||||
go.opentelemetry.io/otel v1.24.0
|
||||
go.unistack.org/micro/v3 v3.10.46
|
||||
go.unistack.org/micro/v3 v3.10.57
|
||||
)
|
||||
|
||||
require (
|
||||
|
4
go.sum
4
go.sum
@ -16,8 +16,8 @@ github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqj
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
|
||||
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
|
||||
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
|
||||
go.unistack.org/micro/v3 v3.10.46 h1:rnuEqiFkerwJmKzHmHBXRgxFemZustxWz2hRNLQQ8cU=
|
||||
go.unistack.org/micro/v3 v3.10.46/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
|
||||
go.unistack.org/micro/v3 v3.10.57 h1:VxG7Cs7kBOgxgQlP+K8TvTxIgh2pmqAwmAaKdTAQNtQ=
|
||||
go.unistack.org/micro/v3 v3.10.57/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
|
||||
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
|
||||
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
|
||||
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
|
||||
|
@ -106,6 +106,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
||||
var wg sync.WaitGroup
|
||||
defer wg.Wait()
|
||||
|
||||
if s.kopts.Wait != nil {
|
||||
ctx, cancel := context.WithTimeout(ctx, s.kopts.GracefulTimeout)
|
||||
defer cancel()
|
||||
s.kopts.Wait.WaitContext(ctx)
|
||||
}
|
||||
|
||||
for topic, partitions := range lost {
|
||||
for _, partition := range partitions {
|
||||
tp := tp{topic, partition}
|
||||
@ -148,7 +154,14 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str
|
||||
opts: s.opts,
|
||||
}
|
||||
s.consumers[tp{topic, partition}] = pc
|
||||
go pc.consume()
|
||||
|
||||
go func() {
|
||||
if s.kopts.Wait != nil {
|
||||
s.kopts.Wait.Add(1)
|
||||
defer s.kopts.Wait.Done()
|
||||
}
|
||||
pc.consume()
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user