improve performance and correctness

* properly handle rebalances
* simplify code
* return on NewBroker instance and not interface

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2023-03-13 21:56:44 +03:00
parent 9a69314c25
commit a9b13378f3
7 changed files with 478 additions and 499 deletions

View File

@@ -4,10 +4,9 @@ import (
"context"
"time"
kgo "github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/server"
)
// DefaultCommitInterval specifies how fast send commit offsets to kafka
@@ -49,7 +48,7 @@ func Options(opts ...kgo.Opt) broker.Option {
}
}
// SubscribeOptions pass additional options to broker
// SubscribeOptions pass additional options to broker in Subscribe
func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption {
return func(o *broker.SubscribeOptions) {
if o.Context == nil {
@@ -64,21 +63,6 @@ func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption {
}
}
// SubscriberOptions pass additional options to broker
func SubscriberOptions(opts ...kgo.Opt) server.SubscriberOption {
return func(o *server.SubscriberOptions) {
if o.Context == nil {
o.Context = context.Background()
}
options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt)
if !ok {
options = make([]kgo.Opt, 0, len(opts))
}
options = append(options, opts...)
o.Context = context.WithValue(o.Context, optionsKey{}, options)
}
}
type commitIntervalKey struct{}
// CommitInterval specifies interval to send commits
@@ -86,7 +70,7 @@ func CommitInterval(td time.Duration) broker.Option {
return broker.SetOption(commitIntervalKey{}, td)
}
var DefaultSubscribeMaxInflight = 1000
var DefaultSubscribeMaxInflight = 10
type subscribeMaxInflightKey struct{}