Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
cbab0b5cb6 | |||
947132c25b | |||
98da69fbe8 | |||
d6d2483d8d |
10
go.mod
10
go.mod
@ -3,12 +3,12 @@ module go.unistack.org/micro-broker-kgo/v4
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/twmb/franz-go v1.11.5
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.3.0
|
||||
go.unistack.org/micro/v4 v4.0.2
|
||||
github.com/twmb/franz-go v1.16.1
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.7.0
|
||||
go.unistack.org/micro/v4 v4.0.17
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/klauspost/compress v1.15.9 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.15 // indirect
|
||||
github.com/klauspost/compress v1.17.7 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.21 // indirect
|
||||
)
|
||||
|
29
go.sum
29
go.sum
@ -1,18 +1,11 @@
|
||||
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
|
||||
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
|
||||
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
|
||||
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/twmb/franz-go v1.11.5 h1:TTv5lVJd+87XkmP9dWN9Jgpf7IUUr7a7jee+byR8LBE=
|
||||
github.com/twmb/franz-go v1.11.5/go.mod h1:FvaHNlpT6woVYIl6LAuIeL7yHol1Fp6Gv2Dn21AvH78=
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.3.0 h1:ouBETB7nTqRxiO5E8/pySoFZtVEW2VWw55z3/bsUzTw=
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.3.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
||||
go.unistack.org/micro/v4 v4.0.2 h1:2LeG6jslE50c72f1XwJhfTiidy67xklIC3saptLoUys=
|
||||
go.unistack.org/micro/v4 v4.0.2/go.mod h1:+wBa98rSf+mRXb/MuSVFPXtDrqN0k8rzPQiC8wRCwCo=
|
||||
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
|
||||
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
|
||||
github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
|
||||
go.unistack.org/micro/v4 v4.0.17 h1:mF7uM+J4ILdG+1fcwzKYCwDlxhdbF/e1WnGzKKLnIXc=
|
||||
go.unistack.org/micro/v4 v4.0.17/go.mod h1:ZDgU9931vm2l7X6RN/6UuwRIVp24GRdmQ7dKmegArk4=
|
||||
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
|
||||
|
13
kgo.go
13
kgo.go
@ -53,6 +53,7 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
|
||||
}()
|
||||
|
||||
type Broker struct {
|
||||
init bool
|
||||
c *kgo.Client
|
||||
kopts []kgo.Opt
|
||||
connected bool
|
||||
@ -155,6 +156,10 @@ func (k *Broker) Init(opts ...broker.Option) error {
|
||||
k.Lock()
|
||||
defer k.Unlock()
|
||||
|
||||
if len(opts) == 0 && k.init {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&k.opts)
|
||||
}
|
||||
@ -178,6 +183,7 @@ func (k *Broker) Init(opts ...broker.Option) error {
|
||||
}
|
||||
}
|
||||
|
||||
k.init = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -196,8 +202,10 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
|
||||
|
||||
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
k.RLock()
|
||||
if !k.connected {
|
||||
k.RUnlock()
|
||||
ok := k.connected
|
||||
k.RUnlock()
|
||||
|
||||
if !ok {
|
||||
k.Lock()
|
||||
c, err := k.connect(ctx, k.kopts...)
|
||||
if err != nil {
|
||||
@ -208,7 +216,6 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
||||
k.connected = true
|
||||
k.Unlock()
|
||||
}
|
||||
k.RUnlock()
|
||||
|
||||
options := broker.NewPublishOptions(opts...)
|
||||
records := make([]*kgo.Record, 0, len(msgs))
|
||||
|
@ -2,7 +2,6 @@ package kgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
@ -30,11 +29,7 @@ func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) {
|
||||
return
|
||||
}
|
||||
if len(args) > 0 {
|
||||
fields := make(map[string]interface{}, int(len(args)/2))
|
||||
for i := 0; i <= len(args)/2; i += 2 {
|
||||
fields[fmt.Sprintf("%v", args[i])] = args[i+1]
|
||||
}
|
||||
l.l.Fields(fields).Log(l.ctx, mlvl, msg)
|
||||
l.l.Log(l.ctx, mlvl, append([]interface{}{msg}, args...)...)
|
||||
} else {
|
||||
l.l.Log(l.ctx, mlvl, msg)
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ type consumer struct {
|
||||
partition int32
|
||||
opts broker.SubscribeOptions
|
||||
kopts broker.Options
|
||||
handler interface{}
|
||||
handler broker.Handler
|
||||
quit chan struct{}
|
||||
done chan struct{}
|
||||
recs chan kgo.FetchTopicPartition
|
||||
@ -33,7 +33,7 @@ type subscriber struct {
|
||||
topic string
|
||||
opts broker.SubscribeOptions
|
||||
kopts broker.Options
|
||||
handler interface{}
|
||||
handler broker.Handler
|
||||
closed bool
|
||||
done chan struct{}
|
||||
consumers map[tp]*consumer
|
||||
|
Loading…
Reference in New Issue
Block a user