add wgWaitGroup
Some checks failed
automerge / automerge (pull_request) Has been skipped
dependabot-automerge / automerge (pull_request) Has been skipped
autoapprove / autoapprove (pull_request) Successful in 10s
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled

This commit is contained in:
Денис Евстигнеев 2024-04-07 21:39:21 +03:00
parent 9c4d88bb69
commit 3fbac80e95
3 changed files with 27 additions and 1 deletions

9
.gitignore vendored
View File

@ -4,6 +4,7 @@
*.dll *.dll
*.so *.so
*.dylib *.dylib
bin
# Test binary, built with `go test -c` # Test binary, built with `go test -c`
*.test *.test
@ -13,3 +14,11 @@
# Dependency directories (remove the comment below to include it) # Dependency directories (remove the comment below to include it)
# vendor/ # vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

4
go.mod
View File

@ -2,6 +2,10 @@ module go.unistack.org/micro-broker-kgo/v3
go 1.19 go 1.19
replace (
go.unistack.org/micro/v3 => ../micro
)
require ( require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/twmb/franz-go v1.16.1 github.com/twmb/franz-go v1.16.1

View File

@ -106,6 +106,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
var wg sync.WaitGroup var wg sync.WaitGroup
defer wg.Wait() defer wg.Wait()
if s.kopts.Wait != nil {
ctx, cancel := context.WithTimeout(ctx, s.kopts.GracefulTimeout)
defer cancel()
s.kopts.Wait.WaitContext(ctx)
}
for topic, partitions := range lost { for topic, partitions := range lost {
for _, partition := range partitions { for _, partition := range partitions {
tp := tp{topic, partition} tp := tp{topic, partition}
@ -148,7 +154,14 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str
opts: s.opts, opts: s.opts,
} }
s.consumers[tp{topic, partition}] = pc s.consumers[tp{topic, partition}] = pc
go pc.consume()
go func() {
if s.kopts.Wait != nil {
s.kopts.Wait.Add(1)
defer s.kopts.Wait.Done()
}
pc.consume()
}()
} }
} }
} }