93 lines
2.6 KiB
Go
93 lines
2.6 KiB
Go
package kgo
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/twmb/franz-go/pkg/kgo"
|
|
"go.unistack.org/micro/v4/broker"
|
|
"go.unistack.org/micro/v4/client"
|
|
)
|
|
|
|
// DefaultCommitInterval specifies how often offset commits are sent to kafka
|
|
// The value is 5 seconds; it is a package-level variable, so it can be reassigned.
// NOTE(review): presumably the fallback used when no CommitInterval option is given — confirm against the broker implementation.
var DefaultCommitInterval = 5 * time.Second
|
|
|
|
type subscribeContextKey struct{}
|
|
|
|
// SubscribeContext set the context for broker.SubscribeOption
|
|
func SubscribeContext(ctx context.Context) broker.SubscribeOption {
|
|
return broker.SetSubscribeOption(subscribeContextKey{}, ctx)
|
|
}
|
|
|
|
type publishKey struct{}
|
|
|
|
// PublishKey set the kafka message key (broker option)
|
|
func PublishKey(key []byte) broker.PublishOption {
|
|
return broker.SetPublishOption(publishKey{}, key)
|
|
}
|
|
|
|
// ClientPublishKey set the kafka message key (client option)
|
|
func ClientPublishKey(key []byte) client.PublishOption {
|
|
return client.SetPublishOption(publishKey{}, key)
|
|
}
|
|
|
|
type optionsKey struct{}
|
|
|
|
// Options pass additional options to broker
|
|
func Options(opts ...kgo.Opt) broker.Option {
|
|
return func(o *broker.Options) {
|
|
if o.Context == nil {
|
|
o.Context = context.Background()
|
|
}
|
|
options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt)
|
|
if !ok {
|
|
options = make([]kgo.Opt, 0, len(opts))
|
|
}
|
|
options = append(options, opts...)
|
|
o.Context = context.WithValue(o.Context, optionsKey{}, options)
|
|
}
|
|
}
|
|
|
|
// SubscribeOptions pass additional options to broker in Subscribe
|
|
func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption {
|
|
return func(o *broker.SubscribeOptions) {
|
|
if o.Context == nil {
|
|
o.Context = context.Background()
|
|
}
|
|
options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt)
|
|
if !ok {
|
|
options = make([]kgo.Opt, 0, len(opts))
|
|
}
|
|
options = append(options, opts...)
|
|
o.Context = context.WithValue(o.Context, optionsKey{}, options)
|
|
}
|
|
}
|
|
|
|
type commitIntervalKey struct{}
|
|
|
|
// CommitInterval specifies interval to send commits
|
|
func CommitInterval(td time.Duration) broker.Option {
|
|
return broker.SetOption(commitIntervalKey{}, td)
|
|
}
|
|
|
|
// DefaultSubscribeMaxInflight holds the value 10.
// NOTE(review): presumably the default used for the SubscribeMaxInFlight option (max queued messages) when it is not set — confirm against the subscriber code.
var DefaultSubscribeMaxInflight = 10
|
|
|
|
type subscribeMaxInflightKey struct{}
|
|
|
|
// SubscribeMaxInFlight max queued messages
|
|
func SubscribeMaxInFlight(n int) broker.SubscribeOption {
|
|
return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n)
|
|
}
|
|
|
|
type publishPromiseKey struct{}
|
|
|
|
// PublishPromise set the kafka promise func for Produce
|
|
func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption {
|
|
return broker.SetPublishOption(publishPromiseKey{}, fn)
|
|
}
|
|
|
|
// ClientPublishKey set the kafka message key (client option)
|
|
func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption {
|
|
return client.SetPublishOption(publishPromiseKey{}, fn)
|
|
}
|