Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
7312397abe |
10
go.mod
10
go.mod
@ -3,12 +3,12 @@ module go.unistack.org/micro-broker-kgo/v4
|
|||||||
go 1.19
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/twmb/franz-go v1.16.1
|
github.com/twmb/franz-go v1.11.5
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.7.0
|
github.com/twmb/franz-go/pkg/kmsg v1.3.0
|
||||||
go.unistack.org/micro/v4 v4.0.17
|
go.unistack.org/micro/v4 v4.0.2
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/klauspost/compress v1.17.7 // indirect
|
github.com/klauspost/compress v1.15.9 // indirect
|
||||||
github.com/pierrec/lz4/v4 v4.1.21 // indirect
|
github.com/pierrec/lz4/v4 v4.1.15 // indirect
|
||||||
)
|
)
|
||||||
|
29
go.sum
29
go.sum
@ -1,11 +1,18 @@
|
|||||||
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
|
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
|
||||||
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
|
||||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
|
||||||
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||||
github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
|
github.com/twmb/franz-go v1.11.5 h1:TTv5lVJd+87XkmP9dWN9Jgpf7IUUr7a7jee+byR8LBE=
|
||||||
github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
|
github.com/twmb/franz-go v1.11.5/go.mod h1:FvaHNlpT6woVYIl6LAuIeL7yHol1Fp6Gv2Dn21AvH78=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
|
github.com/twmb/franz-go/pkg/kmsg v1.3.0 h1:ouBETB7nTqRxiO5E8/pySoFZtVEW2VWw55z3/bsUzTw=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
|
github.com/twmb/franz-go/pkg/kmsg v1.3.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
||||||
go.unistack.org/micro/v4 v4.0.17 h1:mF7uM+J4ILdG+1fcwzKYCwDlxhdbF/e1WnGzKKLnIXc=
|
go.unistack.org/micro/v4 v4.0.2 h1:2LeG6jslE50c72f1XwJhfTiidy67xklIC3saptLoUys=
|
||||||
go.unistack.org/micro/v4 v4.0.17/go.mod h1:ZDgU9931vm2l7X6RN/6UuwRIVp24GRdmQ7dKmegArk4=
|
go.unistack.org/micro/v4 v4.0.2/go.mod h1:+wBa98rSf+mRXb/MuSVFPXtDrqN0k8rzPQiC8wRCwCo=
|
||||||
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
|
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||||
|
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||||
|
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
|
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
13
kgo.go
13
kgo.go
@ -53,7 +53,6 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
type Broker struct {
|
type Broker struct {
|
||||||
init bool
|
|
||||||
c *kgo.Client
|
c *kgo.Client
|
||||||
kopts []kgo.Opt
|
kopts []kgo.Opt
|
||||||
connected bool
|
connected bool
|
||||||
@ -156,10 +155,6 @@ func (k *Broker) Init(opts ...broker.Option) error {
|
|||||||
k.Lock()
|
k.Lock()
|
||||||
defer k.Unlock()
|
defer k.Unlock()
|
||||||
|
|
||||||
if len(opts) == 0 && k.init {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&k.opts)
|
o(&k.opts)
|
||||||
}
|
}
|
||||||
@ -183,7 +178,6 @@ func (k *Broker) Init(opts ...broker.Option) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
k.init = true
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,10 +196,8 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
|
|||||||
|
|
||||||
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||||
k.RLock()
|
k.RLock()
|
||||||
ok := k.connected
|
if !k.connected {
|
||||||
k.RUnlock()
|
k.RUnlock()
|
||||||
|
|
||||||
if !ok {
|
|
||||||
k.Lock()
|
k.Lock()
|
||||||
c, err := k.connect(ctx, k.kopts...)
|
c, err := k.connect(ctx, k.kopts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -216,6 +208,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
|||||||
k.connected = true
|
k.connected = true
|
||||||
k.Unlock()
|
k.Unlock()
|
||||||
}
|
}
|
||||||
|
k.RUnlock()
|
||||||
|
|
||||||
options := broker.NewPublishOptions(opts...)
|
options := broker.NewPublishOptions(opts...)
|
||||||
records := make([]*kgo.Record, 0, len(msgs))
|
records := make([]*kgo.Record, 0, len(msgs))
|
||||||
|
@ -2,6 +2,7 @@ package kgo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/twmb/franz-go/pkg/kgo"
|
"github.com/twmb/franz-go/pkg/kgo"
|
||||||
"go.unistack.org/micro/v4/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
@ -29,7 +30,11 @@ func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(args) > 0 {
|
if len(args) > 0 {
|
||||||
l.l.Log(l.ctx, mlvl, append([]interface{}{msg}, args...)...)
|
fields := make(map[string]interface{}, int(len(args)/2))
|
||||||
|
for i := 0; i <= len(args)/2; i += 2 {
|
||||||
|
fields[fmt.Sprintf("%v", args[i])] = args[i+1]
|
||||||
|
}
|
||||||
|
l.l.Fields(fields).Log(l.ctx, mlvl, msg)
|
||||||
} else {
|
} else {
|
||||||
l.l.Log(l.ctx, mlvl, msg)
|
l.l.Log(l.ctx, mlvl, msg)
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,7 @@ type consumer struct {
|
|||||||
partition int32
|
partition int32
|
||||||
opts broker.SubscribeOptions
|
opts broker.SubscribeOptions
|
||||||
kopts broker.Options
|
kopts broker.Options
|
||||||
handler broker.Handler
|
handler interface{}
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
recs chan kgo.FetchTopicPartition
|
recs chan kgo.FetchTopicPartition
|
||||||
@ -33,7 +33,7 @@ type subscriber struct {
|
|||||||
topic string
|
topic string
|
||||||
opts broker.SubscribeOptions
|
opts broker.SubscribeOptions
|
||||||
kopts broker.Options
|
kopts broker.Options
|
||||||
handler broker.Handler
|
handler interface{}
|
||||||
closed bool
|
closed bool
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
consumers map[tp]*consumer
|
consumers map[tp]*consumer
|
||||||
|
Loading…
Reference in New Issue
Block a user