diff --git a/go.mod b/go.mod index 3677ae7..3269540 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,12 @@ module go.unistack.org/micro-broker-kgo/v4 go 1.23.8 require ( - github.com/twmb/franz-go v1.19.4 + github.com/twmb/franz-go v1.19.5 github.com/twmb/franz-go/pkg/kadm v1.16.0 github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3 github.com/twmb/franz-go/pkg/kmsg v1.11.2 go.opentelemetry.io/otel v1.36.0 - go.unistack.org/micro/v4 v4.1.14 + go.unistack.org/micro/v4 v4.1.17 ) require ( @@ -16,8 +16,8 @@ require ( github.com/klauspost/compress v1.18.0 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect - github.com/spf13/cast v1.8.0 // indirect + github.com/spf13/cast v1.9.2 // indirect go.unistack.org/micro-proto/v4 v4.1.0 // indirect - golang.org/x/crypto v0.38.0 // indirect + golang.org/x/crypto v0.39.0 // indirect google.golang.org/protobuf v1.36.6 // indirect ) diff --git a/go.sum b/go.sum index e3e7248..e0ec902 100644 --- a/go.sum +++ b/go.sum @@ -22,12 +22,16 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/spf13/cast v1.8.0 h1:gEN9K4b8Xws4EX0+a0reLmhq8moKn7ntRlQYgjPeCDk= github.com/spf13/cast v1.8.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/cast v1.9.2 h1:SsGfm7M8QOFtEzumm7UZrZdLLquNdzFYfIbEXntcFbE= +github.com/spf13/cast v1.9.2/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twmb/franz-go v1.19.1 h1:cOhDFUkGvUFHSQ7UYW6bO77BJa2fYEk5mA2AX+1NIdE= github.com/twmb/franz-go v1.19.1/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= github.com/twmb/franz-go v1.19.4 h1:0ktflzm5YU7+YYdie8RQWFcU9uDJ03xLefplO1iMwO4= github.com/twmb/franz-go v1.19.4/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= +github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y= +github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE= github.com/twmb/franz-go/pkg/kadm v1.16.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3 h1:p24opKWPySAy8xSl8NqRgOv7Q+bX7kdrQirBVRJzQfo= @@ -42,8 +46,12 @@ go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9 go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec= go.unistack.org/micro/v4 v4.1.14 h1:6EotPq9kz/gaFb5YulHdKuuUwmj/7Hk44DpOlzh/A6k= go.unistack.org/micro/v4 v4.1.14/go.mod h1:xleO2M5Yxh4s6I+RUcLrEpUjobefh+71ctrdIfn7TUs= +go.unistack.org/micro/v4 v4.1.17 h1:26QDtRSYVpozYuassyvLP4sEQRo3dxgD3sVILRXmIPo= +go.unistack.org/micro/v4 v4.1.17/go.mod h1:xleO2M5Yxh4s6I+RUcLrEpUjobefh+71ctrdIfn7TUs= golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= +golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= +golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/kgo.go b/kgo.go index 6af9d39..ce59452 100644 --- a/kgo.go +++ b/kgo.go @@ -108,7 +108,7 @@ type kgoMessage struct { ctx context.Context body []byte hdr metadata.Metadata - opts broker.PublishOptions + opts broker.MessageOptions ack bool } @@ -150,8 +150,8 @@ func (b *Broker) newCodec(ct string) (codec.Codec, error) { return nil, codec.ErrUnknownContentType } -func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.PublishOption) (broker.Message, error) { - options := broker.NewPublishOptions(opts...) +func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.MessageOption) (broker.Message, error) { + options := broker.NewMessageOptions(opts...) if options.ContentType == "" { options.ContentType = b.opts.ContentType } @@ -351,10 +351,10 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M for _, msg := range messages { if mctx := msg.Context(); mctx != nil { - if k, ok := mctx.Value(publishKey{}).([]byte); ok && k != nil { + if k, ok := mctx.Value(messageKey{}).([]byte); ok && k != nil { key = k } - if p, ok := mctx.Value(publishPromiseKey{}).(func(*kgo.Record, error)); ok && p != nil { + if p, ok := mctx.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil { promise = p } } diff --git a/options.go b/options.go index 40d5645..4f7eae3 100644 --- a/options.go +++ b/options.go @@ -27,11 +27,11 @@ func SubscribeContext(ctx context.Context) broker.SubscribeOption { return broker.SetSubscribeOption(subscribeContextKey{}, ctx) } -type publishKey struct{} +type messageKey struct{} -// PublishKey set the kafka message key (broker option) -func PublishKey(key []byte) broker.PublishOption { - return broker.SetPublishOption(publishKey{}, key) +// MessageKey set the kafka message key (broker option) +func MessageKey(key []byte) broker.MessageOption { + return broker.SetMessageOption(messageKey{}, key) } type optionsKey struct{} @@ -103,11 +103,11 @@ func SubscribeFatalOnError(b bool) broker.SubscribeOption { return broker.SetSubscribeOption(fatalOnErrorKey{}, b) } -type publishPromiseKey struct{} +type messagePromiseKey struct{} -// PublishPromise set the kafka promise func for Produce -func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption { - return broker.SetPublishOption(publishPromiseKey{}, fn) +// MessagePromise set the kafka promise func for Produce +func MessagePromise(fn func(*kgo.Record, error)) broker.MessageOption { + return broker.SetMessageOption(messagePromiseKey{}, fn) } type subscribeMessagePoolKey struct{}