add stats metrics

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-07-08 14:11:14 +03:00
parent 502b8c199b
commit 17404ee935
7 changed files with 266 additions and 114 deletions

View File

@ -14,19 +14,18 @@ import (
var (
bm = &broker.Message{
Header: map[string]string{"hkey": "hval"},
Body: []byte("body"),
Body: []byte(`"body"`),
}
)
func TestPubSub(t *testing.T) {
t.Skip()
logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel))
ctx := context.Background()
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
t.Skip()
}
logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel))
ctx := context.Background()
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}

2
go.mod
View File

@ -5,5 +5,5 @@ go 1.16
require (
github.com/google/uuid v1.2.0
github.com/segmentio/kafka-go v0.4.16
github.com/unistack-org/micro/v3 v3.3.20
github.com/unistack-org/micro/v3 v3.4.7
)

4
go.sum
View File

@ -31,8 +31,8 @@ github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/unistack-org/micro/v3 v3.3.20 h1:gB+sPtvYuEKJQG/k5xnC1TK7MnZbr7wlgyqpYNREdyo=
github.com/unistack-org/micro/v3 v3.3.20/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk=
github.com/unistack-org/micro/v3 v3.4.7 h1:zmGFx2J6tIbmr4IGLcc+LNtbftQFZI42bfuNV5xNYM0=
github.com/unistack-org/micro/v3 v3.4.7/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=

View File

@ -2,6 +2,7 @@ package segmentio
import (
"context"
"time"
kafka "github.com/segmentio/kafka-go"
"github.com/unistack-org/micro/v3/broker"
@ -9,8 +10,9 @@ import (
)
var (
DefaultReaderConfig = kafka.WriterConfig{}
DefaultWriterConfig = kafka.ReaderConfig{}
DefaultReaderConfig = kafka.ReaderConfig{}
DefaultWriterConfig = kafka.WriterConfig{}
DefaultStatsInterval = time.Second * 10
)
type readerConfigKey struct{}
@ -52,3 +54,15 @@ func PublishKey(key []byte) broker.PublishOption {
// ClientPublishKey returns a client.PublishOption that stores key under the
// publishKey{} option key via client.SetPublishOption.
func ClientPublishKey(key []byte) client.PublishOption {
return client.SetPublishOption(publishKey{}, key)
}
// statsIntervalKey is the option key under which the stats reporting interval is stored.
type statsIntervalKey struct{}
// StatsInterval returns a broker.Option that stores td as the interval used
// by the reader and writer stats goroutines when reporting to the meter.
// When the option is not set, DefaultStatsInterval is used.
func StatsInterval(td time.Duration) broker.Option {
return broker.SetOption(statsIntervalKey{}, td)
}
// writerCompletionFunc is the option key under which the writer completion callback is stored.
type writerCompletionFunc struct{}
// WriterCompletionFunc returns a broker.Option that stores fn; when present,
// the broker assigns it to the Completion field of its kafka.Writer during
// configuration.
func WriterCompletionFunc(fn func([]kafka.Message, error)) broker.Option {
return broker.SetOption(writerCompletionFunc{}, fn)
}

View File

@ -19,12 +19,12 @@ type kBroker struct {
readerConfig kafka.ReaderConfig
writerConfig kafka.WriterConfig
writers map[string]*kafka.Writer
writer *kafka.Writer
connected bool
init bool
sync.RWMutex
opts broker.Options
opts broker.Options
messages []kafka.Message
}
type subscriber struct {
@ -86,11 +86,14 @@ func (s *subscriber) Topic() string {
func (s *subscriber) Unsubscribe(ctx context.Context) error {
var err error
s.Lock()
defer s.Unlock()
if s.group != nil {
err = s.group.Close()
}
s.closed = true
group := s.group
close(s.done)
s.Unlock()
if group != nil {
err = group.Close()
}
return err
}
@ -145,9 +148,49 @@ func (k *kBroker) Connect(ctx context.Context) error {
k.connected = true
k.Unlock()
td := DefaultStatsInterval
if v, ok := k.opts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 {
td = v
}
go writerStats(k.opts.Context, k.writer, td, k.opts.Meter)
if k.writer.Async {
go k.writeLoop()
}
return nil
}
// writeLoop periodically flushes the messages buffered by Publish (async
// mode) to the kafka writer. It runs until the broker context is cancelled.
//
// On every tick the pending buffer is detached under the write lock and then
// written without holding any lock, so concurrent Publish calls are neither
// blocked by the network write nor lost between the write and the buffer
// reset. A batch that fails to write is logged and put back in front of any
// newly buffered messages so that it is retried on the next tick.
func (k *kBroker) writeLoop() {
	// time.NewTicker panics on a non-positive duration, and the writer's
	// BatchTimeout field stays zero unless it was configured explicitly.
	// Fall back to one second, which is the flush interval kafka-go documents
	// as its own default.
	interval := k.writer.BatchTimeout
	if interval <= 0 {
		interval = time.Second
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-k.opts.Context.Done():
			return
		case <-ticker.C:
			// Take ownership of the current batch; Publish starts a fresh buffer.
			k.Lock()
			pending := k.messages
			k.messages = nil
			k.Unlock()

			if len(pending) == 0 {
				continue
			}

			// err is scoped to this tick so a past failure is never re-reported.
			err := k.writer.WriteMessages(k.opts.Context, pending...)
			if err == nil {
				continue
			}
			if k.opts.Logger.V(logger.ErrorLevel) {
				k.opts.Logger.Errorf(k.opts.Context, "[segmentio] publish error %v", err)
			}

			// Keep the failed batch for the next tick, preserving publish order.
			k.Lock()
			k.messages = append(pending, k.messages...)
			k.Unlock()
		}
	}
}
func (k *kBroker) Disconnect(ctx context.Context) error {
k.RLock()
if !k.connected {
@ -158,10 +201,8 @@ func (k *kBroker) Disconnect(ctx context.Context) error {
k.Lock()
defer k.Unlock()
for _, writer := range k.writers {
if err := writer.Close(); err != nil {
return err
}
if err := k.writer.Close(); err != nil {
return err
}
k.connected = false
@ -180,7 +221,6 @@ func (k *kBroker) Options() broker.Options {
}
func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
var cached bool
var val []byte
var err error
@ -194,73 +234,25 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message
return err
}
}
kmsg := kafka.Message{Value: val}
kmsg := kafka.Message{Topic: topic, Value: val}
if options.Context != nil {
if key, ok := options.Context.Value(publishKey{}).([]byte); ok && len(key) > 0 {
kmsg.Key = key
}
}
k.Lock()
writer, ok := k.writers[topic]
if !ok {
cfg := k.writerConfig
cfg.Topic = topic
if err = cfg.Validate(); err != nil {
k.Unlock()
return err
}
writer = kafka.NewWriter(cfg)
k.writers[topic] = writer
} else {
cached = true
if k.writer.Async {
k.Lock()
k.messages = append(k.messages, kmsg)
k.Unlock()
return nil
}
k.Unlock()
wCtx := k.opts.Context
if ctx != nil {
wCtx = ctx
}
err = writer.WriteMessages(wCtx, kmsg)
if err != nil {
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "write message err: %v", err)
}
switch cached {
case false:
// non cached case, we can try to wait on some errors, but not timeout
if kerr, ok := err.(kafka.Error); ok {
if kerr.Temporary() && !kerr.Timeout() {
// additional chanse to publish message
time.Sleep(200 * time.Millisecond)
err = writer.WriteMessages(wCtx, kmsg)
}
}
case true:
// cached case, try to recreate writer and try again after that
k.Lock()
// close older writer to free memory
if err = writer.Close(); err != nil {
k.Unlock()
return err
}
delete(k.writers, topic)
k.Unlock()
cfg := k.writerConfig
cfg.Topic = topic
if err = cfg.Validate(); err != nil {
return err
}
writer := kafka.NewWriter(cfg)
if err = writer.WriteMessages(wCtx, kmsg); err == nil {
k.Lock()
k.writers[topic] = writer
k.Unlock()
}
}
}
return err
return k.writer.WriteMessages(wCtx, kmsg)
}
func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
@ -279,8 +271,12 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
WatchPartitionChanges: true,
Brokers: k.readerConfig.Brokers,
Topics: []string{topic},
GroupBalancers: []kafka.GroupBalancer{kafka.RangeGroupBalancer{}},
GroupBalancers: k.readerConfig.GroupBalancers,
StartOffset: k.readerConfig.StartOffset,
Logger: k.readerConfig.Logger,
ErrorLogger: k.readerConfig.ErrorLogger,
}
cgcfg.StartOffset = kafka.LastOffset
if err := cgcfg.Validate(); err != nil {
return nil, err
}
@ -290,23 +286,16 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
return nil, err
}
sub := &subscriber{brokerOpts: k.opts, opts: opt, topic: topic, group: cgroup, cgcfg: cgcfg}
sub := &subscriber{brokerOpts: k.opts, opts: opt, topic: topic, group: cgroup, cgcfg: cgcfg, done: make(chan struct{})}
go func() {
for {
select {
case <-sub.done:
return
case <-ctx.Done():
sub.RLock()
closed := sub.closed
sub.RUnlock()
if closed {
// unsubcribed and closed
return
}
// unexpected context closed
if k.opts.Context.Err() != nil {
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "[segmentio] context closed unexpected %v", k.opts.Context.Err())
}
if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) {
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] context closed unexpected %v", k.opts.Context.Err())
}
return
case <-k.opts.Context.Done():
@ -318,16 +307,18 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
return
}
// unexpected context closed
if k.opts.Context.Err() != nil {
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "[segmentio] context closed unexpected %v", k.opts.Context.Err())
}
if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) {
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] context closed unexpected %v", k.opts.Context.Err())
}
return
default:
sub.RLock()
group := sub.group
closed := sub.closed
sub.RUnlock()
if closed {
return
}
gCtx := k.opts.Context
if ctx != nil {
gCtx = ctx
@ -341,18 +332,17 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
closed := sub.closed
sub.RUnlock()
if !closed {
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as it closed %v", k.opts.Context.Err())
if k.opts.Logger.V(logger.ErrorLevel) {
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed %v", k.opts.Context.Err())
}
if err = group.Close(); err != nil {
if k.opts.Logger.V(logger.TraceLevel) {
k.opts.Logger.Tracef(k.opts.Context, "[segmentio] consumer group close error %v", err)
}
if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) {
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err)
continue
}
sub.createGroup(gCtx)
continue
}
continue
return
default:
sub.RLock()
closed := sub.closed
@ -362,10 +352,8 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %v", err)
}
}
if err = group.Close(); err != nil {
if k.opts.Logger.V(logger.ErrorLevel) {
k.opts.Logger.Tracef(k.opts.Context, "[segmentio] consumer group close error %v", err)
}
if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) {
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err)
}
sub.createGroup(k.opts.Context)
continue
@ -410,7 +398,18 @@ func (h *cgHandler) run(ctx context.Context) {
offsets := make(map[string]map[int]int64)
offsets[h.reader.Config().Topic] = make(map[int]int64)
defer h.reader.Close()
td := DefaultStatsInterval
if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 {
td = v
}
go readerStats(ctx, h.reader, td, h.brokerOpts.Meter)
defer func() {
if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) {
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err)
}
}()
for {
select {
case <-ctx.Done():
@ -419,8 +418,8 @@ func (h *cgHandler) run(ctx context.Context) {
msg, err := h.reader.ReadMessage(ctx)
switch err {
default:
if h.brokerOpts.Logger.V(logger.TraceLevel) {
h.brokerOpts.Logger.Tracef(h.brokerOpts.Context, "[segmentio] unexpected error: %v", err)
if h.brokerOpts.Logger.V(logger.ErrorLevel) {
h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error: %v", err)
}
return
case kafka.ErrGenerationEnded:
@ -538,7 +537,7 @@ func (k *kBroker) configure(opts ...broker.Option) error {
cAddrs = []string{"127.0.0.1:9092"}
}
readerConfig := kafka.ReaderConfig{}
readerConfig := DefaultReaderConfig
if cfg, ok := k.opts.Context.Value(readerConfigKey{}).(kafka.ReaderConfig); ok {
readerConfig = cfg
}
@ -547,7 +546,7 @@ func (k *kBroker) configure(opts ...broker.Option) error {
}
readerConfig.WatchPartitionChanges = true
writerConfig := kafka.WriterConfig{CompressionCodec: nil, BatchSize: 1}
writerConfig := DefaultWriterConfig
if cfg, ok := k.opts.Context.Value(writerConfigKey{}).(kafka.WriterConfig); ok {
writerConfig = cfg
}
@ -555,8 +554,28 @@ func (k *kBroker) configure(opts ...broker.Option) error {
writerConfig.Brokers = cAddrs
}
k.addrs = cAddrs
k.writerConfig = writerConfig
k.readerConfig = readerConfig
k.writer = &kafka.Writer{
Addr: kafka.TCP(k.addrs...),
Balancer: writerConfig.Balancer,
MaxAttempts: writerConfig.MaxAttempts,
BatchSize: writerConfig.BatchSize,
BatchBytes: int64(writerConfig.BatchBytes),
BatchTimeout: writerConfig.BatchTimeout,
ReadTimeout: writerConfig.ReadTimeout,
WriteTimeout: writerConfig.WriteTimeout,
RequiredAcks: kafka.RequiredAcks(writerConfig.RequiredAcks),
Async: writerConfig.Async,
//Completion: writerConfig.Completion,
//Compression: writerConfig.Compression,
Logger: writerConfig.Logger,
ErrorLogger: writerConfig.ErrorLogger,
//Transport: writerConfig.Transport,
}
if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok {
k.writer.Completion = fn
}
k.init = true
return nil
@ -564,7 +583,6 @@ func (k *kBroker) configure(opts ...broker.Option) error {
func NewBroker(opts ...broker.Option) broker.Broker {
return &kBroker{
writers: make(map[string]*kafka.Writer),
opts: broker.NewOptions(opts...),
opts: broker.NewOptions(opts...),
}
}

View File

@ -43,6 +43,7 @@ func TestSegmentioSubscribe(t *testing.T) {
done := make(chan struct{}, 100)
fn := func(msg broker.Event) error {
if err := msg.Ack(); err != nil {
panic(err)
return err
}
done <- struct{}{}

120
stats.go Normal file
View File

@ -0,0 +1,120 @@
package segmentio
import (
"context"
"time"
kafka "github.com/segmentio/kafka-go"
"github.com/unistack-org/micro/v3/meter"
)
// readerStats publishes kafka reader statistics to the meter every td until
// ctx is cancelled.
//
// Each tick takes a snapshot via r.Stats(). Event counts (dials, fetches,
// messages, bytes, rebalances, timeouts, errors) are added to their counters,
// while point-in-time values (offset, lag, limits, queue sizes) overwrite the
// previous value. All metrics are labelled with the reader's topic, partition
// and client id.
//
// The function returns immediately when r is nil or td is not positive.
func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter.Meter) {
	// Nothing to report without a reader, and time.NewTicker panics on a
	// non-positive duration.
	if r == nil || td <= 0 {
		return
	}

	ticker := time.NewTicker(td)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rstats := r.Stats()
			labels := []string{"topic", rstats.Topic, "partition", rstats.Partition, "client_id", rstats.ClientID}

			// NOTE(review): Add assumes Stats() reports deltas since the
			// previous call — confirm against the kafka-go version in use.
			m.Counter("broker_reader_dial_count", labels...).Add(int(rstats.Dials))
			m.Counter("broker_reader_fetch_count", labels...).Add(int(rstats.Fetches))
			m.Counter("broker_reader_message_count", labels...).Add(int(rstats.Messages))
			m.Counter("broker_reader_message_bytes", labels...).Add(int(rstats.Bytes))
			m.Counter("broker_reader_rebalance_count", labels...).Add(int(rstats.Rebalances))
			m.Counter("broker_reader_timeout_count", labels...).Add(int(rstats.Timeouts))
			m.Counter("broker_reader_error", labels...).Add(int(rstats.Errors))

			// Duration summaries are not exported yet.
			/*
				m.Counter("broker_reader_dial_seconds_avg", labels...).Add(uint64(rstats.DialTime.Avg))
				m.Counter("broker_reader_dial_seconds_min", labels...).Add(uint64(rstats.DialTime.Min))
				m.Counter("broker_reader_dial_seconds_max", labels...).Add(uint64(rstats.DialTime.Max))
				m.Counter("broker_reader_read_seconds_avg", labels...).Add(uint64(rstats.ReadTime.Avg))
				m.Counter("broker_reader_read_seconds_min", labels...).Add(uint64(rstats.ReadTime.Min))
				m.Counter("broker_reader_read_seconds_max", labels...).Add(uint64(rstats.ReadTime.Max))
				m.Counter("broker_reader_wait_seconds_avg", labels...).Add(uint64(rstats.WaitTime.Avg))
				m.Counter("broker_reader_wait_seconds_min", labels...).Add(uint64(rstats.WaitTime.Min))
				m.Counter("broker_reader_wait_seconds_max", labels...).Add(uint64(rstats.WaitTime.Max))
			*/
			// Fetch size summaries are not exported yet.
			/*
				m.Counter("broker_reader_fetch_size_avg", labels...).Add(uint64(rstats.FetchSize.Avg))
				m.Counter("broker_reader_fetch_size_min", labels...).Set(uint64(rstats.FetchSize.Min))
				m.Counter("broker_reader_fetch_size_max", labels...).Set(uint64(rstats.FetchSize.Max))
				m.Counter("broker_reader_fetch_bytes_avg", labels...).Set(uint64(rstats.FetchBytes.Avg))
				m.Counter("broker_reader_fetch_bytes_min", labels...).Set(uint64(rstats.FetchBytes.Min))
				m.Counter("broker_reader_fetch_bytes_max", labels...).Set(uint64(rstats.FetchBytes.Max))
			*/

			// Point-in-time values replace whatever was recorded before.
			m.Counter("broker_reader_offset", labels...).Set(uint64(rstats.Offset))
			m.Counter("broker_reader_lag", labels...).Set(uint64(rstats.Lag))
			m.Counter("broker_reader_fetch_bytes_min", labels...).Set(uint64(rstats.MinBytes))
			m.Counter("broker_reader_fetch_bytes_max", labels...).Set(uint64(rstats.MaxBytes))
			m.Counter("broker_reader_fetch_wait_max", labels...).Set(uint64(rstats.MaxWait))
			m.Counter("broker_reader_queue_length", labels...).Set(uint64(rstats.QueueLength))
			m.Counter("broker_reader_queue_capacity", labels...).Set(uint64(rstats.QueueCapacity))
		}
	}
}
// writerStats publishes kafka writer statistics to the meter every td until
// ctx is cancelled.
//
// Each tick takes a snapshot via w.Stats(). Event counts (writes, messages,
// bytes, errors) are added to their counters, while configuration values
// (attempt/batch limits, timeouts, required acks, async flag) overwrite the
// previous value. The metrics carry no labels.
//
// The function returns immediately when w is nil or td is not positive.
func writerStats(ctx context.Context, w *kafka.Writer, td time.Duration, m meter.Meter) {
	// Nothing to report without a writer, and time.NewTicker panics on a
	// non-positive duration.
	if w == nil || td <= 0 {
		return
	}

	ticker := time.NewTicker(td)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			wstats := w.Stats()
			labels := []string{}

			// NOTE(review): Add assumes Stats() reports deltas since the
			// previous call — confirm against the kafka-go version in use.
			m.Counter("broker_writer_write_count", labels...).Add(int(wstats.Writes))
			m.Counter("broker_writer_message_count", labels...).Add(int(wstats.Messages))
			m.Counter("broker_writer_message_bytes", labels...).Add(int(wstats.Bytes))
			m.Counter("broker_writer_error_count", labels...).Add(int(wstats.Errors))

			// Duration and size summaries are not exported yet.
			/*
				m.Counter("broker_writer_batch_seconds_avg", labels...).Set(uint64(wstats.BatchTime.Avg))
				m.Counter("broker_writer_batch_seconds_min", labels...).Set(uint64(wstats.BatchTime.Min))
				m.Counter("broker_writer_batch_seconds_max", labels...).Set(uint64(wstats.BatchTime.Max))
				m.Counter("broker_writer_write_seconds_avg", labels...).Set(uint64(wstats.WriteTime.Avg))
				m.Counter("broker_writer_write_seconds_min", labels...).Set(uint64(wstats.WriteTime.Min))
				m.Counter("broker_writer_write_seconds_max", labels...).Set(uint64(wstats.WriteTime.Max))
				m.Counter("broker_writer_wait_seconds_avg", labels...).Set(uint64(wstats.WaitTime.Avg))
				m.Counter("broker_writer_wait_seconds_min", labels...).Set(uint64(wstats.WaitTime.Min))
				m.Counter("broker_writer_wait_seconds_max", labels...).Set(uint64(wstats.WaitTime.Max))
				m.Counter("broker_writer_retries_count_avg", labels...).Set(uint64(wstats.Retries.Avg))
				m.Counter("broker_writer_retries_count_min", labels...).Set(uint64(wstats.Retries.Min))
				m.Counter("broker_writer_retries_count_max", labels...).Set(uint64(wstats.Retries.Max))
				m.Counter("broker_writer_batch_size_avg", labels...).Set(uint64(wstats.BatchSize.Avg))
				m.Counter("broker_writer_batch_size_min", labels...).Set(uint64(wstats.BatchSize.Min))
				m.Counter("broker_writer_batch_size_max", labels...).Set(uint64(wstats.BatchSize.Max))
				m.Counter("broker_writer_batch_bytes_avg", labels...).Set(uint64(wstats.BatchBytes.Avg))
				m.Counter("broker_writer_batch_bytes_min", labels...).Set(uint64(wstats.BatchBytes.Min))
				m.Counter("broker_writer_batch_bytes_max", labels...).Set(uint64(wstats.BatchBytes.Max))
			*/

			// Configuration values replace whatever was recorded before.
			m.Counter("broker_writer_attempts_max", labels...).Set(uint64(wstats.MaxAttempts))
			m.Counter("broker_writer_batch_max", labels...).Set(uint64(wstats.MaxBatchSize))
			m.Counter("broker_writer_batch_timeout", labels...).Set(uint64(wstats.BatchTimeout))
			m.Counter("broker_writer_read_timeout", labels...).Set(uint64(wstats.ReadTimeout))
			m.Counter("broker_writer_write_timeout", labels...).Set(uint64(wstats.WriteTimeout))
			m.Counter("broker_writer_acks_required", labels...).Set(uint64(wstats.RequiredAcks))

			// The async flag is exported as 1 (enabled) or 0 (disabled).
			if wstats.Async {
				m.Counter("broker_writer_async", labels...).Set(1)
			} else {
				m.Counter("broker_writer_async", labels...).Set(0)
			}
		}
	}
}