From 3fbac80e956497912e3cc69c556ec2ec04210566 Mon Sep 17 00:00:00 2001 From: devstigneev Date: Sun, 7 Apr 2024 21:39:21 +0300 Subject: [PATCH 1/2] add wgWaitGroup --- .gitignore | 9 +++++++++ go.mod | 4 ++++ subscriber.go | 15 ++++++++++++++- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 66fd13c..9e16696 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.dll *.so *.dylib +bin # Test binary, built with `go test -c` *.test @@ -13,3 +14,11 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +# Go workspace file +go.work + +# General +.DS_Store +.idea +.vscode \ No newline at end of file diff --git a/go.mod b/go.mod index b65849b..27467d2 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,10 @@ module go.unistack.org/micro-broker-kgo/v3 go 1.19 +replace ( + go.unistack.org/micro/v3 => ../micro +) + require ( github.com/google/uuid v1.6.0 github.com/twmb/franz-go v1.16.1 diff --git a/subscriber.go b/subscriber.go index dc85d92..6d227a0 100644 --- a/subscriber.go +++ b/subscriber.go @@ -106,6 +106,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) var wg sync.WaitGroup defer wg.Wait() + if s.kopts.Wait != nil { + ctx, cancel := context.WithTimeout(ctx, s.kopts.GracefulTimeout) + defer cancel() + s.kopts.Wait.WaitContext(ctx) + } + for topic, partitions := range lost { for _, partition := range partitions { tp := tp{topic, partition} @@ -148,7 +154,14 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str opts: s.opts, } s.consumers[tp{topic, partition}] = pc - go pc.consume() + + go func() { + if s.kopts.Wait != nil { + s.kopts.Wait.Add(1) + defer s.kopts.Wait.Done() + } + pc.consume() + }() } } } -- 2.45.2 From 6ad8e2910b9c62ba218ec9d873640abc7f7eb54d Mon Sep 17 00:00:00 2001 From: devstigneev Date: Tue, 9 Apr 2024 17:25:32 +0300 Subject: [PATCH 2/2] removed replace in gomod --- go.mod | 6 +----- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 27467d2..8dd8ee3 100644 --- a/go.mod +++ b/go.mod @@ -2,16 +2,12 @@ module go.unistack.org/micro-broker-kgo/v3 go 1.19 -replace ( - go.unistack.org/micro/v3 => ../micro -) - require ( github.com/google/uuid v1.6.0 github.com/twmb/franz-go v1.16.1 github.com/twmb/franz-go/pkg/kmsg v1.7.0 go.opentelemetry.io/otel v1.24.0 - go.unistack.org/micro/v3 v3.10.46 + go.unistack.org/micro/v3 v3.10.57 ) require ( diff --git a/go.sum b/go.sum index 6e7a5f7..e986226 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqj github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.unistack.org/micro/v3 v3.10.46 h1:rnuEqiFkerwJmKzHmHBXRgxFemZustxWz2hRNLQQ8cU= -go.unistack.org/micro/v3 v3.10.46/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= +go.unistack.org/micro/v3 v3.10.57 h1:VxG7Cs7kBOgxgQlP+K8TvTxIgh2pmqAwmAaKdTAQNtQ= +go.unistack.org/micro/v3 v3.10.57/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -- 2.45.2