From 3fbac80e956497912e3cc69c556ec2ec04210566 Mon Sep 17 00:00:00 2001 From: devstigneev Date: Sun, 7 Apr 2024 21:39:21 +0300 Subject: [PATCH] add wgWaitGroup --- .gitignore | 9 +++++++++ go.mod | 4 ++++ subscriber.go | 15 ++++++++++++++- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 66fd13c..9e16696 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.dll *.so *.dylib +bin # Test binary, built with `go test -c` *.test @@ -13,3 +14,11 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +# Go workspace file +go.work + +# General +.DS_Store +.idea +.vscode \ No newline at end of file diff --git a/go.mod b/go.mod index b65849b..27467d2 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,10 @@ module go.unistack.org/micro-broker-kgo/v3 go 1.19 +replace ( + go.unistack.org/micro/v3 => ../micro +) + require ( github.com/google/uuid v1.6.0 github.com/twmb/franz-go v1.16.1 diff --git a/subscriber.go b/subscriber.go index dc85d92..6d227a0 100644 --- a/subscriber.go +++ b/subscriber.go @@ -106,6 +106,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) var wg sync.WaitGroup defer wg.Wait() + if s.kopts.Wait != nil { + ctx, cancel := context.WithTimeout(ctx, s.kopts.GracefulTimeout) + defer cancel() + s.kopts.Wait.WaitContext(ctx) + } + for topic, partitions := range lost { for _, partition := range partitions { tp := tp{topic, partition} @@ -148,7 +154,14 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str opts: s.opts, } s.consumers[tp{topic, partition}] = pc - go pc.consume() + + go func() { + if s.kopts.Wait != nil { + s.kopts.Wait.Add(1) + defer s.kopts.Wait.Done() + } + pc.consume() + }() } } }