add backoff for reconnect

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-08-24 13:20:31 +03:00
parent 3f7fcaf4aa
commit 70a55ced3f
6 changed files with 451 additions and 79 deletions

3
go.mod
View File

@ -3,10 +3,11 @@ module github.com/unistack-org/micro-broker-kgo/v3
go 1.16 go 1.16
require ( require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v0.0.4 // indirect github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.13.4 // indirect github.com/klauspost/compress v1.13.4 // indirect
github.com/pierrec/lz4/v4 v4.1.8 // indirect github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/twmb/franz-go v0.10.2-0.20210820215004-4c20135fffc1 github.com/twmb/franz-go v0.10.2-0.20210823212011-0d01f7456b4d
github.com/unistack-org/micro-codec-json/v3 v3.2.5 github.com/unistack-org/micro-codec-json/v3 v3.2.5
github.com/unistack-org/micro/v3 v3.6.3 github.com/unistack-org/micro/v3 v3.6.3
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sync v0.0.0-20210220032951-036812b2e83c

9
go.sum
View File

@ -1,5 +1,6 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
@ -36,10 +37,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/twmb/franz-go v0.10.1 h1:z05pd/6cBy7ywBK9TIjGko1wa2F1zuFgmVPR6uil0ik= github.com/twmb/franz-go v0.10.2-0.20210823212011-0d01f7456b4d h1:O7QsIa7lVn+P60nsOBAjkajGKcOpaN3zsOGYboCgfs4=
github.com/twmb/franz-go v0.10.1/go.mod h1:Qa6npC7EIi4WoLmnUz9Ue/sPL0k+ex4OyHCYJ2pCAqw= github.com/twmb/franz-go v0.10.2-0.20210823212011-0d01f7456b4d/go.mod h1:Qa6npC7EIi4WoLmnUz9Ue/sPL0k+ex4OyHCYJ2pCAqw=
github.com/twmb/franz-go v0.10.2-0.20210820215004-4c20135fffc1 h1:JNd7gUA9uggKfso+hxeWg0CJbfM8yo124XH8tBv4v/E=
github.com/twmb/franz-go v0.10.2-0.20210820215004-4c20135fffc1/go.mod h1:Qa6npC7EIi4WoLmnUz9Ue/sPL0k+ex4OyHCYJ2pCAqw=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210726202344-c376dbc9081f h1:Opx7EKsXb4IOPj1ammVNksPFpbXx6aaxdIn4hGjiIpk= github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210726202344-c376dbc9081f h1:Opx7EKsXb4IOPj1ammVNksPFpbXx6aaxdIn4hGjiIpk=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210726202344-c376dbc9081f/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210726202344-c376dbc9081f/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=

148
kgo.go
View File

@ -8,6 +8,7 @@ import (
"net" "net"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
kgo "github.com/twmb/franz-go/pkg/kgo" kgo "github.com/twmb/franz-go/pkg/kgo"
@ -16,6 +17,7 @@ import (
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/util/id" "github.com/unistack-org/micro/v3/util/id"
mrand "github.com/unistack-org/micro/v3/util/rand"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -49,6 +51,10 @@ type publication struct {
ack bool ack bool
} }
func init() {
rand.Seed(time.Now().UnixNano())
}
func (p *publication) Topic() string { func (p *publication) Topic() string {
return p.topic return p.topic
} }
@ -79,11 +85,15 @@ func (s *subscriber) Topic() string {
} }
func (s *subscriber) Unsubscribe(ctx context.Context) error { func (s *subscriber) Unsubscribe(ctx context.Context) error {
if s.closed {
return nil
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
default: default:
s.reader.Close() close(s.done)
s.closed = true
} }
return nil return nil
} }
@ -112,7 +122,6 @@ func (k *kBroker) Connect(ctx context.Context) error {
kaddrs := k.opts.Addrs kaddrs := k.opts.Addrs
// shuffle addrs // shuffle addrs
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(kaddrs), func(i, j int) { rand.Shuffle(len(kaddrs), func(i, j int) {
kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i] kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i]
}) })
@ -228,12 +237,42 @@ func (k *kBroker) Init(opts ...broker.Option) error {
if v, ok := k.opts.Context.Value(requestRetriesKey{}).(int); ok { if v, ok := k.opts.Context.Value(requestRetriesKey{}).(int); ok {
kopts = append(kopts, kgo.RequestRetries(v)) kopts = append(kopts, kgo.RequestRetries(v))
} }
/// if v, ok := k.opts.Context.Value(retryBackoffKey{}).(func(int) time.Duration); ok { if v, ok := k.opts.Context.Value(retryBackoffFnKey{}).(func(int) time.Duration); ok {
// kopts = append(kopts, kgo.RetryBackoff(v)) kopts = append(kopts, kgo.RetryBackoffFn(v))
// } } else {
// if v, ok := k.opts.Context.Value(retryTimeoutKey{}).(func(int16) time.Duration); ok { kopts = append(kopts, kgo.RetryBackoffFn(
// kopts = append(kopts, kgo.RetryTimeout(v)) func() func(int) time.Duration {
// } var rng mrand.Rand
return func(fails int) time.Duration {
const (
min = 250 * time.Millisecond
max = 2 * time.Second
)
if fails <= 0 {
return min
}
if fails > 10 {
return max
}
backoff := min * time.Duration(1<<(fails-1))
jitter := 0.8 + 0.4*rng.Float64()
backoff = time.Duration(float64(backoff) * jitter)
if backoff > max {
return max
}
return backoff
}
}(),
))
}
if v, ok := k.opts.Context.Value(retryTimeoutFnKey{}).(func(int16) time.Duration); ok {
kopts = append(kopts, kgo.RetryTimeoutFn(v))
}
if v, ok := k.opts.Context.Value(retryTimeoutKey{}).(time.Duration); ok {
kopts = append(kopts, kgo.RetryTimeout(v))
}
if v, ok := k.opts.Context.Value(saslKey{}).([]sasl.Mechanism); ok { if v, ok := k.opts.Context.Value(saslKey{}).([]sasl.Mechanism); ok {
kopts = append(kopts, kgo.SASL(v...)) kopts = append(kopts, kgo.SASL(v...))
} }
@ -242,8 +281,9 @@ func (k *kBroker) Init(opts ...broker.Option) error {
} }
} }
kopts = append(kopts, kopts = append(kopts,
//kgo.WithLogger(&mlogger{l: k.opts.Logger, ctx: k.opts.Context}), kgo.WithLogger(&mlogger{l: k.opts.Logger, ctx: k.opts.Context}),
kgo.RequiredAcks(kgo.AllISRAcks()), kgo.RequiredAcks(kgo.AllISRAcks()),
// kgo.RecordPartitioner(),
) )
k.kopts = kopts k.kopts = kopts
@ -298,7 +338,7 @@ type mlogger struct {
} }
func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) { func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) {
mlvl := logger.ErrorLevel var mlvl logger.Level
switch lvl { switch lvl {
case kgo.LogLevelNone: case kgo.LogLevelNone:
return return
@ -344,7 +384,6 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
kaddrs := k.opts.Addrs kaddrs := k.opts.Addrs
// shuffle addrs // shuffle addrs
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(kaddrs), func(i, j int) { rand.Shuffle(len(kaddrs), func(i, j int) {
kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i] kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i]
}) })
@ -356,7 +395,10 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.DisableAutoCommit(), kgo.DisableAutoCommit(),
kgo.FetchMaxWait(1*time.Second), kgo.FetchMaxWait(1*time.Second),
kgo.KeepControlRecords(),
kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()),
kgo.FetchIsolationLevel(kgo.ReadUncommitted()),
// kgo.WithHooks(&metrics{meter: k.opts.Meter}),
// TODO: must set https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnRevoked // TODO: must set https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnRevoked
) )
@ -365,7 +407,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
return nil, err return nil, err
} }
sub := &subscriber{opts: options, reader: reader, handler: handler, kopts: k.opts} sub := &subscriber{done: make(chan struct{}), opts: options, reader: reader, handler: handler, kopts: k.opts}
go sub.run(ctx) go sub.run(ctx)
k.Lock() k.Lock()
@ -382,17 +424,24 @@ func (s *subscriber) run(ctx context.Context) {
case <-s.kopts.Context.Done(): case <-s.kopts.Context.Done():
return return
default: default:
fmt.Printf("poll fetches\n")
fetches := s.reader.PollFetches(ctx) fetches := s.reader.PollFetches(ctx)
fmt.Printf("fetches polled\n")
if fetches.IsClientClosed() { if fetches.IsClientClosed() {
fmt.Printf("CCCCCC\n") // TODO: fatal ?
return return
} }
fetches.EachError(func(t string, p int32, err error) { if len(fetches.Errors()) > 0 {
s.kopts.Logger.Errorf(ctx, "fetch err topic %s partition %d: %v", t, p, err) for _, err := range fetches.Errors() {
}) s.kopts.Logger.Errorf(ctx, "fetch err topic %s partition %d: %v", err.Topic, err.Partition, err.Err)
s.handleFetches(ctx, fetches) }
// TODO: fatal ?
return
}
if err := s.handleFetches(ctx, fetches); err != nil {
s.kopts.Logger.Errorf(ctx, "fetch handler err: %v", err)
// TODO: fatal ?
//return
}
} }
} }
} }
@ -408,30 +457,74 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
mprecords := make(map[int32][]*kgo.Record) mprecords := make(map[int32][]*kgo.Record)
cnt := 0 cnt := int64(0)
for _, fetch := range fetches { for _, fetch := range fetches {
for _, ftopic := range fetch.Topics { for _, ftopic := range fetch.Topics {
for _, partition := range ftopic.Partitions { for _, partition := range ftopic.Partitions {
mprecords[partition.Partition] = append(mprecords[partition.Partition], partition.Records...) mprecords[partition.Partition] = append(mprecords[partition.Partition], partition.Records...)
cnt += len(partition.Records) cnt += int64(len(partition.Records))
} }
} }
} }
// preallocate optimistic // preallocate optimistic
crecords := make([]*kgo.Record, 0, cnt) crecords := make([]*kgo.Record, 0, 1000)
eh := s.kopts.ErrorHandler eh := s.kopts.ErrorHandler
if s.opts.ErrorHandler != nil { if s.opts.ErrorHandler != nil {
eh = s.opts.ErrorHandler eh = s.opts.ErrorHandler
} }
g := &errgroup.Group{} var mu sync.Mutex
g, gctx := errgroup.WithContext(ctx)
td := DefaultCommitInterval
if s.kopts.Context != nil {
if v, ok := s.kopts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 {
td = v
}
}
ticker := time.NewTicker(td)
defer ticker.Stop()
g.Go(func() error {
for {
select {
case <-gctx.Done():
return gctx.Err()
case <-s.done:
mu.Lock()
err := s.reader.CommitRecords(ctx, crecords...)
mu.Unlock()
return err
case <-ticker.C:
if atomic.LoadInt64(&cnt) == 0 {
return nil
}
mu.Lock()
if err := s.reader.CommitRecords(ctx, crecords...); err != nil {
mu.Unlock()
return err
}
atomic.AddInt64(&cnt, -int64(len(crecords)))
crecords = crecords[:]
mu.Unlock()
}
}
})
for _, records := range mprecords { for _, records := range mprecords {
precords := records precords := records
g.Go(func() error { g.Go(func() error {
for _, record := range precords { for _, record := range precords {
select {
case <-s.done:
return nil
case <-gctx.Done():
return gctx.Err()
default:
p := &publication{topic: record.Topic, msg: &broker.Message{}} p := &publication{topic: record.Topic, msg: &broker.Message{}}
if s.opts.BodyOnly { if s.opts.BodyOnly {
p.msg.Body = record.Value p.msg.Body = record.Value
@ -442,7 +535,9 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
if eh != nil { if eh != nil {
_ = eh(p) _ = eh(p)
if p.ack { if p.ack {
mu.Lock()
crecords = append(crecords, record) crecords = append(crecords, record)
mu.Unlock()
} }
return nil return nil
} else { } else {
@ -455,7 +550,9 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
} }
err = s.handler(p) err = s.handler(p)
if err == nil && (s.opts.AutoAck || p.ack) { if err == nil && (s.opts.AutoAck || p.ack) {
mu.Lock()
crecords = append(crecords, record) crecords = append(crecords, record)
mu.Unlock()
} }
if err != nil { if err != nil {
p.err = err p.err = err
@ -467,7 +564,10 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err
} }
} }
if p.ack { if p.ack {
mu.Lock()
crecords = append(crecords, record) crecords = append(crecords, record)
mu.Unlock()
}
} }
} }
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -37,7 +38,12 @@ func TestPubSub(t *testing.T) {
addrs = strings.Split(addr, ",") addrs = strings.Split(addr, ",")
} }
b := kgo.NewBroker(broker.Codec(jsoncodec.NewCodec()), broker.Addrs(addrs...), kgo.ClientID("test")) b := kgo.NewBroker(
broker.Codec(jsoncodec.NewCodec()),
broker.Addrs(addrs...),
kgo.ClientID("test"),
kgo.CommitInterval(1*time.Second),
)
if err := b.Init(); err != nil { if err := b.Init(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -51,7 +57,7 @@ func TestPubSub(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
}() }()
_ = bm
/* /*
fmt.Printf("prefill") fmt.Printf("prefill")
@ -63,15 +69,17 @@ func TestPubSub(t *testing.T) {
if err := b.BatchPublish(ctx, msgs); err != nil { if err := b.BatchPublish(ctx, msgs); err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Skip()
*/ */
done := make(chan bool, 1) done := make(chan bool, 1)
idx := 0 idx := int64(0)
fn := func(msg broker.Event) error { fn := func(msg broker.Event) error {
idx++ atomic.AddInt64(&idx, 1)
//time.Sleep(200 * time.Millisecond)
return msg.Ack() return msg.Ack()
} }
sub, err := b.Subscribe(ctx, "test", fn, broker.SubscribeAutoAck(true), broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true)) sub, err := b.Subscribe(ctx, "test", fn, broker.SubscribeAutoAck(true), broker.SubscribeGroup("test14"), broker.SubscribeBodyOnly(true))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -82,7 +90,12 @@ func TestPubSub(t *testing.T) {
}() }()
for { for {
fmt.Printf("processed %v\n", idx) if v := atomic.LoadInt64(&idx); v == 12637303 {
close(done)
break
} else {
fmt.Printf("processed %v\n", v)
}
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
<-done <-done

240
metrics.go Normal file
View File

@ -0,0 +1,240 @@
package kgo
/*
import (
"net"
"net/http"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/unistack-org/micro/v3/meter"
)
type metrics struct {
meter meter.Meter
connects *prometheus.CounterVec
connectErrs *prometheus.CounterVec
disconnects *prometheus.CounterVec
writeErrs *prometheus.CounterVec
writeBytes *prometheus.CounterVec
writeWaits *prometheus.HistogramVec
writeTimings *prometheus.HistogramVec
readErrs *prometheus.CounterVec
readBytes *prometheus.CounterVec
readWaits *prometheus.HistogramVec
readTimings *prometheus.HistogramVec
throttles *prometheus.HistogramVec
produceBatchesUncompressed *prometheus.CounterVec
produceBatchesCompressed *prometheus.CounterVec
fetchBatchesUncompressed *prometheus.CounterVec
fetchBatchesCompressed *prometheus.CounterVec
}
var (
_ kgo.HookBrokerConnect = &metrics{}
_ kgo.HookBrokerDisconnect = &metrics{}
_ kgo.HookBrokerE2E = &metrics{}
_ kgo.HookBrokerRead = &metrics{}
_ kgo.HookBrokerThrottle = &metrics{}
_ kgo.HookBrokerWrite = &metrics{}
_ kgo.HookFetchBatchRead = &metrics{}
_ kgo.HookProduceBatchWritten = &metrics{}
/*
HookFetchRecordBuffered
HookFetchRecordUnbuffered
HookGroupManageError
HookNewClient
HookProduceRecordBuffered
HookProduceRecordUnbuffered
*/
/*
)
func (m *Metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
node := strconv.Itoa(int(meta.NodeID))
if err != nil {
m.connectErrs.WithLabelValues(node).Inc()
return
}
m.connects.WithLabelValues(node).Inc()
}
func (m *Metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
node := strconv.Itoa(int(meta.NodeID))
m.disconnects.WithLabelValues(node).Inc()
}
func (m *Metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
node := strconv.Itoa(int(meta.NodeID))
if err != nil {
m.writeErrs.WithLabelValues(node).Inc()
return
}
m.writeBytes.WithLabelValues(node).Add(float64(bytesWritten))
m.writeWaits.WithLabelValues(node).Observe(writeWait.Seconds())
m.writeTimings.WithLabelValues(node).Observe(timeToWrite.Seconds())
}
func (m *Metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
node := strconv.Itoa(int(meta.NodeID))
if err != nil {
m.readErrs.WithLabelValues(node).Inc()
return
}
m.readBytes.WithLabelValues(node).Add(float64(bytesRead))
m.readWaits.WithLabelValues(node).Observe(readWait.Seconds())
m.readTimings.WithLabelValues(node).Observe(timeToRead.Seconds())
}
func (m *Metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
node := strconv.Itoa(int(meta.NodeID))
m.throttles.WithLabelValues(node).Observe(throttleInterval.Seconds())
}
func (m *Metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, metrics kgo.ProduceBatchMetrics) {
node := strconv.Itoa(int(meta.NodeID))
m.produceBatchesUncompressed.WithLabelValues(node, topic).Add(float64(metrics.UncompressedBytes))
m.produceBatchesCompressed.WithLabelValues(node, topic).Add(float64(metrics.CompressedBytes))
}
func (m *Metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, metrics kgo.FetchBatchMetrics) {
node := strconv.Itoa(int(meta.NodeID))
m.fetchBatchesUncompressed.WithLabelValues(node, topic).Add(float64(metrics.UncompressedBytes))
m.fetchBatchesCompressed.WithLabelValues(node, topic).Add(float64(metrics.CompressedBytes))
}
func NewMetrics(namespace string) (m *Metrics) {
reg := prometheus.NewRegistry()
factory := promauto.With(reg)
reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
reg.MustRegister(prometheus.NewGoCollector())
return &Metrics{
reg: reg,
// connects and disconnects
connects: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "connects_total",
Help: "Total number of connections opened, by broker",
}, []string{"node_id"}),
connectErrs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "connect_errors_total",
Help: "Total number of connection errors, by broker",
}, []string{"node_id"}),
disconnects: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "disconnects_total",
Help: "Total number of connections closed, by broker",
}, []string{"node_id"}),
// write
writeErrs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "write_errors_total",
Help: "Total number of write errors, by broker",
}, []string{"node_id"}),
writeBytes: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "write_bytes_total",
Help: "Total number of bytes written, by broker",
}, []string{"node_id"}),
writeWaits: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Name: "write_wait_latencies",
Help: "Latency of time spent waiting to write to Kafka, in seconds by broker",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20),
}, []string{"node_id"}),
writeTimings: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Name: "write_latencies",
Help: "Latency of time spent writing to Kafka, in seconds by broker",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20),
}, []string{"node_id"}),
// read
readErrs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "read_errors_total",
Help: "Total number of read errors, by broker",
}, []string{"node_id"}),
readBytes: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "read_bytes_total",
Help: "Total number of bytes read, by broker",
}, []string{"node_id"}),
readWaits: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Name: "read_wait_latencies",
Help: "Latency of time spent waiting to read from Kafka, in seconds by broker",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20),
}, []string{"node_id"}),
readTimings: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Name: "read_latencies",
Help: "Latency of time spent reading from Kafka, in seconds by broker",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20),
}, []string{"node_id"}),
// throttles
throttles: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Name: "throttle_latencies",
Help: "Latency of Kafka request throttles, in seconds by broker",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20),
}, []string{"node_id"}),
// produces & consumes
produceBatchesUncompressed: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "produce_bytes_uncompressed_total",
Help: "Total number of uncompressed bytes produced, by broker and topic",
}, []string{"broker", "topic"}),
produceBatchesCompressed: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "produce_bytes_compressed_total",
Help: "Total number of compressed bytes actually produced, by topic and partition",
}, []string{"topic", "partition"}),
fetchBatchesUncompressed: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "fetch_bytes_uncompressed_total",
Help: "Total number of uncompressed bytes fetched, by topic and partition",
}, []string{"topic", "partition"}),
fetchBatchesCompressed: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "fetch_bytes_compressed_total",
Help: "Total number of compressed bytes actually fetched, by topic and partition",
}, []string{"topic", "partition"}),
}
}
*/

View File

@ -11,6 +11,11 @@ import (
"github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/client"
) )
var (
// DefaultCommitInterval specifies how fast send commit offsets to kafka
DefaultCommitInterval = 5 * time.Second
)
type subscribeContextKey struct{} type subscribeContextKey struct{}
// SubscribeContext set the context for broker.SubscribeOption // SubscribeContext set the context for broker.SubscribeOption
@ -100,18 +105,25 @@ func RequestRetries(n int) broker.Option {
return broker.SetOption(requestRetriesKey{}, n) return broker.SetOption(requestRetriesKey{}, n)
} }
type retryBackoffKey struct{} type retryBackoffFnKey struct{}
// RetryBackoff set backoff func for retry // RetryBackoffFn set backoff func for retry
func RetryBackoff(fn func(int) time.Duration) broker.Option { func RetryBackoffFn(fn func(int) time.Duration) broker.Option {
return broker.SetOption(retryBackoffKey{}, fn) return broker.SetOption(retryBackoffFnKey{}, fn)
} }
type retryTimeoutKey struct{} type retryTimeoutKey struct{}
// RetryTimeout limit retry timeout // RetryTimeout limit retry timeout
func RetryTimeout(fn func(int16) time.Duration) broker.Option { func RetryTimeout(td time.Duration) broker.Option {
return broker.SetOption(retryTimeoutKey{}, fn) return broker.SetOption(retryTimeoutKey{}, td)
}
type retryTimeoutFnKey struct{}
// RetryTimeoutFn set func limit retry timeout
func RetryTimeoutFn(fn func(int16) time.Duration) broker.Option {
return broker.SetOption(retryTimeoutFnKey{}, fn)
} }
type saslKey struct{} type saslKey struct{}
@ -134,3 +146,10 @@ type optionsKey struct{}
func Options(opts ...kgo.Opt) broker.Option { func Options(opts ...kgo.Opt) broker.Option {
return broker.SetOption(optionsKey{}, opts) return broker.SetOption(optionsKey{}, opts)
} }
type commitIntervalKey struct{}
// CommitInterval specifies interval to send commits
func CommitInterval(td time.Duration) broker.Option {
return broker.SetOption(commitIntervalKey{}, td)
}