many fixes

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-08-25 21:51:15 +03:00
parent 70a55ced3f
commit e3b2a0d62b
5 changed files with 163 additions and 235 deletions

169
kgo.go
View File

@@ -5,14 +5,12 @@ import (
"context"
"fmt"
"math/rand"
"net"
"strings"
"sync"
"sync/atomic"
"time"
kgo "github.com/twmb/franz-go/pkg/kgo"
sasl "github.com/twmb/franz-go/pkg/sasl"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
@@ -201,92 +199,14 @@ func (k *kBroker) Init(opts ...broker.Option) error {
return err
}
var kopts []kgo.Opt
if k.opts.Context != nil {
if v, ok := k.opts.Context.Value(optionsKey{}).([]kgo.Opt); ok && len(v) > 0 {
kopts = append(kopts, v...)
}
if v, ok := k.opts.Context.Value(clientIDKey{}).(string); ok && v != "" {
kopts = append(kopts, kgo.ClientID(v))
}
if v, ok := k.opts.Context.Value(maxReadBytesKey{}).(int32); ok {
kopts = append(kopts, kgo.BrokerMaxReadBytes(v))
}
if v, ok := k.opts.Context.Value(maxWriteBytesKey{}).(int32); ok {
kopts = append(kopts, kgo.BrokerMaxWriteBytes(v))
}
if v, ok := k.opts.Context.Value(connIdleTimeoutKey{}).(time.Duration); ok {
kopts = append(kopts, kgo.ConnIdleTimeout(v))
}
if v, ok := k.opts.Context.Value(connTimeoutOverheadKey{}).(time.Duration); ok {
kopts = append(kopts, kgo.ConnTimeoutOverhead(v))
}
if v, ok := k.opts.Context.Value(dialerKey{}).(func(ctx context.Context, network, host string) (net.Conn, error)); ok {
kopts = append(kopts, kgo.Dialer(v))
}
if v, ok := k.opts.Context.Value(metadataMaxAgeKey{}).(time.Duration); ok {
kopts = append(kopts, kgo.MetadataMaxAge(v))
}
if v, ok := k.opts.Context.Value(metadataMinAgeKey{}).(time.Duration); ok {
kopts = append(kopts, kgo.MetadataMinAge(v))
}
// if v, ok := k.opts.Context.Value(produceRetriesKey{}).(int); ok {
// kopts = append(kopts, kgo.ProduceRetries(v))
// }
if v, ok := k.opts.Context.Value(requestRetriesKey{}).(int); ok {
kopts = append(kopts, kgo.RequestRetries(v))
}
if v, ok := k.opts.Context.Value(retryBackoffFnKey{}).(func(int) time.Duration); ok {
kopts = append(kopts, kgo.RetryBackoffFn(v))
} else {
kopts = append(kopts, kgo.RetryBackoffFn(
func() func(int) time.Duration {
var rng mrand.Rand
return func(fails int) time.Duration {
const (
min = 250 * time.Millisecond
max = 2 * time.Second
)
if fails <= 0 {
return min
}
if fails > 10 {
return max
}
backoff := min * time.Duration(1<<(fails-1))
jitter := 0.8 + 0.4*rng.Float64()
backoff = time.Duration(float64(backoff) * jitter)
if backoff > max {
return max
}
return backoff
}
}(),
))
}
if v, ok := k.opts.Context.Value(retryTimeoutFnKey{}).(func(int16) time.Duration); ok {
kopts = append(kopts, kgo.RetryTimeoutFn(v))
}
if v, ok := k.opts.Context.Value(retryTimeoutKey{}).(time.Duration); ok {
kopts = append(kopts, kgo.RetryTimeout(v))
}
if v, ok := k.opts.Context.Value(saslKey{}).([]sasl.Mechanism); ok {
kopts = append(kopts, kgo.SASL(v...))
}
if v, ok := k.opts.Context.Value(hooksKey{}).([]kgo.Hook); ok {
kopts = append(kopts, kgo.WithHooks(v...))
k.kopts = append(k.kopts, v...)
}
}
kopts = append(kopts,
kgo.WithLogger(&mlogger{l: k.opts.Logger, ctx: k.opts.Context}),
kgo.RequiredAcks(kgo.AllISRAcks()),
// kgo.RecordPartitioner(),
)
k.kopts = kopts
// kgo.RecordPartitioner(),
k.init = true
return nil
@@ -306,12 +226,23 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message
}
func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
options := broker.NewPublishOptions(opts...)
records := make([]*kgo.Record, 0, len(msgs))
var errs []string
var err error
var buf []byte
for _, msg := range msgs {
if options.BodyOnly {
buf = msg.Body
} else {
buf, err = k.opts.Codec.Marshal(msg)
if err != nil {
return err
}
}
topic, _ := msg.Header.Get(metadata.HeaderTopic)
rec := &kgo.Record{Value: msg.Body, Topic: topic}
rec := &kgo.Record{Value: buf, Topic: topic}
records = append(records, rec)
}
@@ -328,10 +259,6 @@ func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...b
return nil
}
func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil
}
type mlogger struct {
l logger.Logger
ctx context.Context
@@ -353,7 +280,11 @@ func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) {
default:
return
}
l.l.Fields(args...).Log(l.ctx, mlvl, msg)
fields := make(map[string]interface{}, int(len(args)/2))
for i := 0; i < len(args)/2; i += 2 {
fields[fmt.Sprintf("%v", args[i])] = args[i+1]
}
l.l.Fields(fields).Log(l.ctx, mlvl, msg)
}
func (l *mlogger) Level() kgo.LogLevel {
@@ -370,6 +301,10 @@ func (l *mlogger) Level() kgo.LogLevel {
return kgo.LogLevelNone
}
func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil
}
func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
options := broker.NewSubscribeOptions(opts...)
@@ -440,21 +375,15 @@ func (s *subscriber) run(ctx context.Context) {
if err := s.handleFetches(ctx, fetches); err != nil {
s.kopts.Logger.Errorf(ctx, "fetch handler err: %v", err)
// TODO: fatal ?
//return
// return
}
}
}
}
func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) error {
var batch bool
var err error
if s.batchhandler != nil {
batch = true
}
_ = batch
mprecords := make(map[int32][]*kgo.Record)
cnt := int64(0)
@@ -500,7 +429,9 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
mu.Unlock()
return err
case <-ticker.C:
if atomic.LoadInt64(&cnt) == 0 {
v := atomic.LoadInt64(&cnt)
s.kopts.Logger.Debugf(ctx, "records to commit: %v", v)
if v == 0 {
return nil
}
mu.Lock()
@@ -508,8 +439,9 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
mu.Unlock()
return err
}
s.kopts.Logger.Debugf(ctx, "records to need process after commit: %v", v-int64(len(crecords)))
atomic.AddInt64(&cnt, -int64(len(crecords)))
crecords = crecords[:]
crecords = nil
mu.Unlock()
}
}
@@ -591,7 +523,46 @@ func (k *kBroker) String() string {
}
func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.NewOptions(opts...)
kopts := []kgo.Opt{
kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}),
kgo.RequiredAcks(kgo.AllISRAcks()),
kgo.RetryBackoffFn(
func() func(int) time.Duration {
var rng mrand.Rand
return func(fails int) time.Duration {
const (
min = 250 * time.Millisecond
max = 2 * time.Second
)
if fails <= 0 {
return min
}
if fails > 10 {
return max
}
backoff := min * time.Duration(1<<(fails-1))
jitter := 0.8 + 0.4*rng.Float64()
backoff = time.Duration(float64(backoff) * jitter)
if backoff > max {
return max
}
return backoff
}
}(),
),
}
if options.Context != nil {
if v, ok := options.Context.Value(optionsKey{}).([]kgo.Opt); ok && len(v) > 0 {
kopts = append(kopts, v...)
}
}
return &kBroker{
opts: broker.NewOptions(opts...),
opts: options,
kopts: kopts,
}
}