Compare commits

..

1 Commits
master ... v4

Author SHA1 Message Date
7312397abe graceful shutdown porting
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-13 15:25:31 +03:00
5 changed files with 34 additions and 29 deletions

10
go.mod
View File

@ -3,12 +3,12 @@ module go.unistack.org/micro-broker-kgo/v4
go 1.19 go 1.19
require ( require (
github.com/twmb/franz-go v1.16.1 github.com/twmb/franz-go v1.11.5
github.com/twmb/franz-go/pkg/kmsg v1.7.0 github.com/twmb/franz-go/pkg/kmsg v1.3.0
go.unistack.org/micro/v4 v4.0.17 go.unistack.org/micro/v4 v4.0.2
) )
require ( require (
github.com/klauspost/compress v1.17.7 // indirect github.com/klauspost/compress v1.15.9 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect
) )

29
go.sum
View File

@ -1,11 +1,18 @@
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE= github.com/twmb/franz-go v1.11.5 h1:TTv5lVJd+87XkmP9dWN9Jgpf7IUUr7a7jee+byR8LBE=
github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA= github.com/twmb/franz-go v1.11.5/go.mod h1:FvaHNlpT6woVYIl6LAuIeL7yHol1Fp6Gv2Dn21AvH78=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= github.com/twmb/franz-go/pkg/kmsg v1.3.0 h1:ouBETB7nTqRxiO5E8/pySoFZtVEW2VWw55z3/bsUzTw=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= github.com/twmb/franz-go/pkg/kmsg v1.3.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
go.unistack.org/micro/v4 v4.0.17 h1:mF7uM+J4ILdG+1fcwzKYCwDlxhdbF/e1WnGzKKLnIXc= go.unistack.org/micro/v4 v4.0.2 h1:2LeG6jslE50c72f1XwJhfTiidy67xklIC3saptLoUys=
go.unistack.org/micro/v4 v4.0.17/go.mod h1:ZDgU9931vm2l7X6RN/6UuwRIVp24GRdmQ7dKmegArk4= go.unistack.org/micro/v4 v4.0.2/go.mod h1:+wBa98rSf+mRXb/MuSVFPXtDrqN0k8rzPQiC8wRCwCo=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

11
kgo.go
View File

@ -53,7 +53,6 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
}() }()
type Broker struct { type Broker struct {
init bool
c *kgo.Client c *kgo.Client
kopts []kgo.Opt kopts []kgo.Opt
connected bool connected bool
@ -156,10 +155,6 @@ func (k *Broker) Init(opts ...broker.Option) error {
k.Lock() k.Lock()
defer k.Unlock() defer k.Unlock()
if len(opts) == 0 && k.init {
return nil
}
for _, o := range opts { for _, o := range opts {
o(&k.opts) o(&k.opts)
} }
@ -183,7 +178,6 @@ func (k *Broker) Init(opts ...broker.Option) error {
} }
} }
k.init = true
return nil return nil
} }
@ -202,10 +196,8 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
k.RLock() k.RLock()
ok := k.connected if !k.connected {
k.RUnlock() k.RUnlock()
if !ok {
k.Lock() k.Lock()
c, err := k.connect(ctx, k.kopts...) c, err := k.connect(ctx, k.kopts...)
if err != nil { if err != nil {
@ -216,6 +208,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
k.connected = true k.connected = true
k.Unlock() k.Unlock()
} }
k.RUnlock()
options := broker.NewPublishOptions(opts...) options := broker.NewPublishOptions(opts...)
records := make([]*kgo.Record, 0, len(msgs)) records := make([]*kgo.Record, 0, len(msgs))

View File

@ -2,6 +2,7 @@ package kgo
import ( import (
"context" "context"
"fmt"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
@ -29,7 +30,11 @@ func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) {
return return
} }
if len(args) > 0 { if len(args) > 0 {
l.l.Log(l.ctx, mlvl, append([]interface{}{msg}, args...)...) fields := make(map[string]interface{}, int(len(args)/2))
for i := 0; i <= len(args)/2; i += 2 {
fields[fmt.Sprintf("%v", args[i])] = args[i+1]
}
l.l.Fields(fields).Log(l.ctx, mlvl, msg)
} else { } else {
l.l.Log(l.ctx, mlvl, msg) l.l.Log(l.ctx, mlvl, msg)
} }

View File

@ -22,7 +22,7 @@ type consumer struct {
partition int32 partition int32
opts broker.SubscribeOptions opts broker.SubscribeOptions
kopts broker.Options kopts broker.Options
handler broker.Handler handler interface{}
quit chan struct{} quit chan struct{}
done chan struct{} done chan struct{}
recs chan kgo.FetchTopicPartition recs chan kgo.FetchTopicPartition
@ -33,7 +33,7 @@ type subscriber struct {
topic string topic string
opts broker.SubscribeOptions opts broker.SubscribeOptions
kopts broker.Options kopts broker.Options
handler broker.Handler handler interface{}
closed bool closed bool
done chan struct{} done chan struct{}
consumers map[tp]*consumer consumers map[tp]*consumer