add additional options for server and broke #26
@ -1,2 +1,9 @@
|
||||
# micro-broker-kgo
|
||||
yet another micro kafka broker alternative
|
||||
|
||||
TODO:
|
||||
* dont always append options from context on Init and New
|
||||
* add SubscriberOptions(...kgo.Opt)
|
||||
* add ServerSubscribeOptions(...kgo.Opt)
|
||||
* check PublisherOptions(...kgo.Opt)
|
||||
* check ClientPublisherOptions(...kgo.Opt)
|
8
go.mod
8
go.mod
@ -3,8 +3,8 @@ module go.unistack.org/micro-broker-kgo/v3
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/klauspost/compress v1.13.6 // indirect
|
||||
github.com/twmb/franz-go v1.1.2
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210930203137-7fbfd17de279
|
||||
go.unistack.org/micro/v3 v3.8.0
|
||||
github.com/pierrec/lz4/v4 v4.1.12 // indirect
|
||||
github.com/twmb/franz-go v1.2.6
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211207071611-6a03ca9e400b
|
||||
go.unistack.org/micro/v3 v3.8.12
|
||||
)
|
||||
|
32
go.sum
32
go.sum
@ -15,36 +15,37 @@ github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/U
|
||||
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
|
||||
github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
|
||||
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
|
||||
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
|
||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4=
|
||||
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pierrec/lz4/v4 v4.1.11/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pierrec/lz4/v4 v4.1.12 h1:44l88ehTZAUGW4VlO1QC4zkilL99M6Y9MXNwEs0uzP8=
|
||||
github.com/pierrec/lz4/v4 v4.1.12/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/silas/dag v0.0.0-20210626123444-3804bac2d6d4/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/twmb/franz-go v1.1.2 h1:i9lCDEcXWpRd3YXFlffIc/koBl+JijI13F5sg69NPRk=
|
||||
github.com/twmb/franz-go v1.1.2/go.mod h1:KerrVhzNpasYrWJLr2Yj6Cui43f1BxH4U9SJEDVOjqQ=
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210914042331-106aef61b693/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210930203137-7fbfd17de279 h1:58GX0Ju/PCG1zWVdsLEE5OUu70KlfOrxY7djPszuqiw=
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210930203137-7fbfd17de279/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
||||
github.com/twmb/franz-go v1.2.6 h1:WVub2Sml7LqER9VU0WxsiOTom4LBK7YMj+7jbqadE3U=
|
||||
github.com/twmb/franz-go v1.2.6/go.mod h1:P+i2DnBaec1o0z9EI8CyAM/WAjG99CHI3oCAhZDoy48=
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211127185622-3b34db0c6d1e/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211207071611-6a03ca9e400b h1:rCe006NN/89GvtdXqRbgsqtL/nVTj83/dQ9ok8DJFcM=
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211207071611-6a03ca9e400b/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
||||
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
|
||||
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
|
||||
github.com/unistack-org/micro-proto v0.0.9 h1:KrWLS4FUX7UAWNAilQf70uad6ZPf/0EudeddCXllRVc=
|
||||
github.com/unistack-org/micro-proto v0.0.9/go.mod h1:Cckwmzd89gvS7ThxzZp9kQR/EOdksFQcsTAtDDyKwrg=
|
||||
go.unistack.org/micro/v3 v3.8.0 h1:9k6C40xdJf65VW6cVB8sOJVSJhyz1UfFOhX7/II2ivo=
|
||||
go.unistack.org/micro/v3 v3.8.0/go.mod h1:Tkteri0wiiybbH6aPqay26pZHFIAwL9LXJc2x1Jkakk=
|
||||
go.unistack.org/micro-proto/v3 v3.1.0 h1:q39FwjFiRZn+Ux/tt+d3bJTmDtsQQWa+3SLYVo1vLfA=
|
||||
go.unistack.org/micro-proto/v3 v3.1.0/go.mod h1:DpRhYCBXlmSJ/AAXTmntvlh7kQkYU6eFvlmYAx4BQS8=
|
||||
go.unistack.org/micro/v3 v3.8.12 h1:ACaHE8ZIHFXqEGPSRvXzND4hcqCSQf04WkzOFY6Y1gQ=
|
||||
go.unistack.org/micro/v3 v3.8.12/go.mod h1:KMMmOmbgo/D52/rCAbqeKbBsgEEbSKM69he54J3ZIuA=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@ -52,7 +53,6 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
6
kgo.go
6
kgo.go
@ -243,6 +243,12 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message
|
||||
}
|
||||
|
||||
func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
k.RLock()
|
||||
if !k.connected {
|
||||
k.RUnlock()
|
||||
return broker.ErrNotConnected
|
||||
}
|
||||
k.RUnlock()
|
||||
options := broker.NewPublishOptions(opts...)
|
||||
records := make([]*kgo.Record, 0, len(msgs))
|
||||
var errs []string
|
||||
|
31
options.go
31
options.go
@ -7,6 +7,7 @@ import (
|
||||
kgo "github.com/twmb/franz-go/pkg/kgo"
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
)
|
||||
|
||||
// DefaultCommitInterval specifies how fast send commit offsets to kafka
|
||||
@ -48,6 +49,36 @@ func Options(opts ...kgo.Opt) broker.Option {
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeOptions pass additional options to broker
|
||||
func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption {
|
||||
return func(o *broker.SubscribeOptions) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt)
|
||||
if !ok {
|
||||
options = make([]kgo.Opt, 0, len(opts))
|
||||
}
|
||||
options = append(options, opts...)
|
||||
o.Context = context.WithValue(o.Context, optionsKey{}, options)
|
||||
}
|
||||
}
|
||||
|
||||
// SubscriberOptions pass additional options to broker
|
||||
func SubscriberOptions(opts ...kgo.Opt) server.SubscriberOption {
|
||||
return func(o *server.SubscriberOptions) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt)
|
||||
if !ok {
|
||||
options = make([]kgo.Opt, 0, len(opts))
|
||||
}
|
||||
options = append(options, opts...)
|
||||
o.Context = context.WithValue(o.Context, optionsKey{}, options)
|
||||
}
|
||||
}
|
||||
|
||||
type commitIntervalKey struct{}
|
||||
|
||||
// CommitInterval specifies interval to send commits
|
||||
|
24
util.go
24
util.go
@ -158,6 +158,14 @@ func (w *worker) handle() {
|
||||
p.ack = false
|
||||
if w.opts.BodyOnly {
|
||||
p.msg.Body = record.Value
|
||||
if l := len(record.Headers); l > 0 {
|
||||
if p.msg.Header == nil {
|
||||
p.msg.Header = metadata.New(l)
|
||||
}
|
||||
for _, h := range record.Headers {
|
||||
p.msg.Header.Set(h.Key, string(h.Value))
|
||||
}
|
||||
}
|
||||
} else if w.kopts.Codec.String() == "noop" {
|
||||
p.msg.Body = record.Value
|
||||
p.msg.Header = metadata.New(len(record.Headers))
|
||||
@ -168,6 +176,14 @@ func (w *worker) handle() {
|
||||
if err := w.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
|
||||
p.err = err
|
||||
p.msg.Body = record.Value
|
||||
if l := len(record.Headers); l > 0 {
|
||||
if p.msg.Header == nil {
|
||||
p.msg.Header = metadata.New(l)
|
||||
}
|
||||
for _, h := range record.Headers {
|
||||
p.msg.Header.Set(h.Key, string(h.Value))
|
||||
}
|
||||
}
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
if p.ack {
|
||||
@ -188,6 +204,14 @@ func (w *worker) handle() {
|
||||
w.cherr <- err
|
||||
return
|
||||
}
|
||||
if l := len(record.Headers); l > 0 {
|
||||
if p.msg.Header == nil {
|
||||
p.msg.Header = metadata.New(l)
|
||||
}
|
||||
for _, h := range record.Headers {
|
||||
p.msg.Header.Set(h.Key, string(h.Value))
|
||||
}
|
||||
}
|
||||
}
|
||||
err = w.handler(p)
|
||||
if err == nil && w.opts.AutoAck {
|
||||
|
Loading…
x
Reference in New Issue
Block a user