diff --git a/broker_test.go b/broker_test.go index e2b287c..e9d55c7 100644 --- a/broker_test.go +++ b/broker_test.go @@ -14,19 +14,18 @@ import ( var ( bm = &broker.Message{ Header: map[string]string{"hkey": "hval"}, - Body: []byte("body"), + Body: []byte(`"body"`), } ) func TestPubSub(t *testing.T) { - t.Skip() - logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)) - ctx := context.Background() - if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { t.Skip() } + logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)) + ctx := context.Background() + var addrs []string if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { addrs = []string{"127.0.0.1:9092"} diff --git a/go.mod b/go.mod index 65e4477..cdf3918 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,5 @@ go 1.16 require ( github.com/google/uuid v1.2.0 github.com/segmentio/kafka-go v0.4.16 - github.com/unistack-org/micro/v3 v3.3.20 + github.com/unistack-org/micro/v3 v3.4.7 ) diff --git a/go.sum b/go.sum index 6694b8b..018b23a 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,8 @@ github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/unistack-org/micro/v3 v3.3.20 h1:gB+sPtvYuEKJQG/k5xnC1TK7MnZbr7wlgyqpYNREdyo= -github.com/unistack-org/micro/v3 v3.3.20/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= +github.com/unistack-org/micro/v3 v3.4.7 h1:zmGFx2J6tIbmr4IGLcc+LNtbftQFZI42bfuNV5xNYM0= +github.com/unistack-org/micro/v3 v3.4.7/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= diff --git a/options.go b/options.go index a2c5626..cc03ccf 100644 --- a/options.go +++ b/options.go @@ -2,6 +2,7 @@ package segmentio import ( "context" + "time" kafka "github.com/segmentio/kafka-go" "github.com/unistack-org/micro/v3/broker" @@ -9,8 +10,9 @@ import ( ) var ( - DefaultReaderConfig = kafka.WriterConfig{} - DefaultWriterConfig = kafka.ReaderConfig{} + DefaultReaderConfig = kafka.ReaderConfig{} + DefaultWriterConfig = kafka.WriterConfig{} + DefaultStatsInterval = time.Second * 10 ) type readerConfigKey struct{} @@ -52,3 +54,15 @@ func PublishKey(key []byte) broker.PublishOption { func ClientPublishKey(key []byte) client.PublishOption { return client.SetPublishOption(publishKey{}, key) } + +type statsIntervalKey struct{} + +func StatsInterval(td time.Duration) broker.Option { + return broker.SetOption(statsIntervalKey{}, td) +} + +type writerCompletionFunc struct{} + +func WriterCompletionFunc(fn func([]kafka.Message, error)) broker.Option { + return broker.SetOption(writerCompletionFunc{}, fn) +} diff --git a/segmentio.go b/segmentio.go index 8afdc97..5f17325 100644 --- a/segmentio.go +++ b/segmentio.go @@ -19,12 +19,12 @@ type kBroker struct { readerConfig kafka.ReaderConfig writerConfig kafka.WriterConfig - writers map[string]*kafka.Writer - + writer *kafka.Writer connected bool init bool sync.RWMutex - opts broker.Options + opts broker.Options + messages []kafka.Message } type subscriber struct { @@ -86,11 +86,14 @@ func (s *subscriber) Topic() string { func (s *subscriber) Unsubscribe(ctx context.Context) error { var err error s.Lock() - defer s.Unlock() - if s.group != nil { - err = s.group.Close() - } s.closed = true + group := s.group + close(s.done) + s.Unlock() + + if group != nil { + err = group.Close() + } return err } @@ -145,9 +148,49 @@ func (k *kBroker) Connect(ctx context.Context) error { k.connected = true k.Unlock() + td := DefaultStatsInterval + if v, ok := k.opts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 { + td = v + } + + go writerStats(k.opts.Context, k.writer, td, k.opts.Meter) + + if k.writer.Async { + go k.writeLoop() + } + return nil } +func (k *kBroker) writeLoop() { + var err error + + ticker := time.NewTicker(k.writer.BatchTimeout) + defer ticker.Stop() + + for { + select { + case <-k.opts.Context.Done(): + return + case <-ticker.C: + k.RLock() + if len(k.messages) != 0 { + err = k.writer.WriteMessages(k.opts.Context, k.messages...) + } + k.RUnlock() + if err == nil { + k.Lock() + k.messages = k.messages[0:0] + k.Unlock() + } else { + if k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] publish error %v", err) + } + } + } + } +} + func (k *kBroker) Disconnect(ctx context.Context) error { k.RLock() if !k.connected { @@ -158,10 +201,8 @@ func (k *kBroker) Disconnect(ctx context.Context) error { k.Lock() defer k.Unlock() - for _, writer := range k.writers { - if err := writer.Close(); err != nil { - return err - } + if err := k.writer.Close(); err != nil { + return err } k.connected = false @@ -180,7 +221,6 @@ func (k *kBroker) Options() broker.Options { } func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { - var cached bool var val []byte var err error @@ -194,73 +234,25 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message return err } } - kmsg := kafka.Message{Value: val} + kmsg := kafka.Message{Topic: topic, Value: val} if options.Context != nil { if key, ok := options.Context.Value(publishKey{}).([]byte); ok && len(key) > 0 { kmsg.Key = key } } - k.Lock() - writer, ok := k.writers[topic] - if !ok { - cfg := k.writerConfig - cfg.Topic = topic - if err = cfg.Validate(); err != nil { - k.Unlock() - return err - } - writer = kafka.NewWriter(cfg) - k.writers[topic] = writer - } else { - cached = true + if k.writer.Async { + k.Lock() + k.messages = append(k.messages, kmsg) + k.Unlock() + return nil } - k.Unlock() + wCtx := k.opts.Context if ctx != nil { wCtx = ctx } - err = writer.WriteMessages(wCtx, kmsg) - if err != nil { - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "write message err: %v", err) - } - switch cached { - case false: - // non cached case, we can try to wait on some errors, but not timeout - if kerr, ok := err.(kafka.Error); ok { - if kerr.Temporary() && !kerr.Timeout() { - // additional chanse to publish message - time.Sleep(200 * time.Millisecond) - err = writer.WriteMessages(wCtx, kmsg) - } - } - case true: - // cached case, try to recreate writer and try again after that - k.Lock() - // close older writer to free memory - if err = writer.Close(); err != nil { - k.Unlock() - return err - } - delete(k.writers, topic) - k.Unlock() - - cfg := k.writerConfig - cfg.Topic = topic - if err = cfg.Validate(); err != nil { - return err - } - writer := kafka.NewWriter(cfg) - if err = writer.WriteMessages(wCtx, kmsg); err == nil { - k.Lock() - k.writers[topic] = writer - k.Unlock() - } - } - } - - return err + return k.writer.WriteMessages(wCtx, kmsg) } func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { @@ -279,8 +271,12 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha WatchPartitionChanges: true, Brokers: k.readerConfig.Brokers, Topics: []string{topic}, - GroupBalancers: []kafka.GroupBalancer{kafka.RangeGroupBalancer{}}, + GroupBalancers: k.readerConfig.GroupBalancers, + StartOffset: k.readerConfig.StartOffset, + Logger: k.readerConfig.Logger, + ErrorLogger: k.readerConfig.ErrorLogger, } + cgcfg.StartOffset = kafka.LastOffset if err := cgcfg.Validate(); err != nil { return nil, err } @@ -290,23 +286,16 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha return nil, err } - sub := &subscriber{brokerOpts: k.opts, opts: opt, topic: topic, group: cgroup, cgcfg: cgcfg} + sub := &subscriber{brokerOpts: k.opts, opts: opt, topic: topic, group: cgroup, cgcfg: cgcfg, done: make(chan struct{})} go func() { for { select { + case <-sub.done: + return case <-ctx.Done(): - sub.RLock() - closed := sub.closed - sub.RUnlock() - if closed { - // unsubcribed and closed - return - } // unexpected context closed - if k.opts.Context.Err() != nil { - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "[segmentio] context closed unexpected %v", k.opts.Context.Err()) - } + if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] context closed unexpected %v", k.opts.Context.Err()) } return case <-k.opts.Context.Done(): @@ -318,16 +307,18 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha return } // unexpected context closed - if k.opts.Context.Err() != nil { - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "[segmentio] context closed unexpected %v", k.opts.Context.Err()) - } + if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] context closed unexpected %v", k.opts.Context.Err()) } return default: sub.RLock() group := sub.group + closed := sub.closed sub.RUnlock() + if closed { + return + } gCtx := k.opts.Context if ctx != nil { gCtx = ctx @@ -341,18 +332,17 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha closed := sub.closed sub.RUnlock() if !closed { - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as it closed %v", k.opts.Context.Err()) + if k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed %v", k.opts.Context.Err()) } - if err = group.Close(); err != nil { - if k.opts.Logger.V(logger.TraceLevel) { - k.opts.Logger.Tracef(k.opts.Context, "[segmentio] consumer group close error %v", err) - } + if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err) continue } sub.createGroup(gCtx) + continue } - continue + return default: sub.RLock() closed := sub.closed @@ -362,10 +352,8 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %v", err) } } - if err = group.Close(); err != nil { - if k.opts.Logger.V(logger.ErrorLevel) { - k.opts.Logger.Tracef(k.opts.Context, "[segmentio] consumer group close error %v", err) - } + if err = group.Close(); err != nil && k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err) } sub.createGroup(k.opts.Context) continue @@ -410,7 +398,18 @@ func (h *cgHandler) run(ctx context.Context) { offsets := make(map[string]map[int]int64) offsets[h.reader.Config().Topic] = make(map[int]int64) - defer h.reader.Close() + td := DefaultStatsInterval + if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 { + td = v + } + + go readerStats(ctx, h.reader, td, h.brokerOpts.Meter) + + defer func() { + if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) { + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader close error: %v", err) + } + }() for { select { case <-ctx.Done(): @@ -419,8 +418,8 @@ func (h *cgHandler) run(ctx context.Context) { msg, err := h.reader.ReadMessage(ctx) switch err { default: - if h.brokerOpts.Logger.V(logger.TraceLevel) { - h.brokerOpts.Logger.Tracef(h.brokerOpts.Context, "[segmentio] unexpected error: %v", err) + if h.brokerOpts.Logger.V(logger.ErrorLevel) { + h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error: %v", err) } return case kafka.ErrGenerationEnded: @@ -538,7 +537,7 @@ func (k *kBroker) configure(opts ...broker.Option) error { cAddrs = []string{"127.0.0.1:9092"} } - readerConfig := kafka.ReaderConfig{} + readerConfig := DefaultReaderConfig if cfg, ok := k.opts.Context.Value(readerConfigKey{}).(kafka.ReaderConfig); ok { readerConfig = cfg } @@ -547,7 +546,7 @@ func (k *kBroker) configure(opts ...broker.Option) error { } readerConfig.WatchPartitionChanges = true - writerConfig := kafka.WriterConfig{CompressionCodec: nil, BatchSize: 1} + writerConfig := DefaultWriterConfig if cfg, ok := k.opts.Context.Value(writerConfigKey{}).(kafka.WriterConfig); ok { writerConfig = cfg } @@ -555,8 +554,28 @@ func (k *kBroker) configure(opts ...broker.Option) error { writerConfig.Brokers = cAddrs } k.addrs = cAddrs - k.writerConfig = writerConfig k.readerConfig = readerConfig + k.writer = &kafka.Writer{ + Addr: kafka.TCP(k.addrs...), + Balancer: writerConfig.Balancer, + MaxAttempts: writerConfig.MaxAttempts, + BatchSize: writerConfig.BatchSize, + BatchBytes: int64(writerConfig.BatchBytes), + BatchTimeout: writerConfig.BatchTimeout, + ReadTimeout: writerConfig.ReadTimeout, + WriteTimeout: writerConfig.WriteTimeout, + RequiredAcks: kafka.RequiredAcks(writerConfig.RequiredAcks), + Async: writerConfig.Async, + //Completion: writerConfig.Completion, + //Compression: writerConfig.Compression, + Logger: writerConfig.Logger, + ErrorLogger: writerConfig.ErrorLogger, + //Transport: writerConfig.Transport, + } + + if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok { + k.writer.Completion = fn + } k.init = true return nil @@ -564,7 +583,6 @@ func (k *kBroker) configure(opts ...broker.Option) error { func NewBroker(opts ...broker.Option) broker.Broker { return &kBroker{ - writers: make(map[string]*kafka.Writer), - opts: broker.NewOptions(opts...), + opts: broker.NewOptions(opts...), } } diff --git a/segmentio_test.go b/segmentio_test.go index 3d3bf24..e8cf849 100644 --- a/segmentio_test.go +++ b/segmentio_test.go @@ -43,6 +43,7 @@ func TestSegmentioSubscribe(t *testing.T) { done := make(chan struct{}, 100) fn := func(msg broker.Event) error { if err := msg.Ack(); err != nil { + panic(err) return err } done <- struct{}{} diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..de84350 --- /dev/null +++ b/stats.go @@ -0,0 +1,120 @@ +package segmentio + +import ( + "context" + "time" + + kafka "github.com/segmentio/kafka-go" + "github.com/unistack-org/micro/v3/meter" +) + +func readerStats(ctx context.Context, r *kafka.Reader, td time.Duration, m meter.Meter) { + ticker := time.NewTicker(td) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if r == nil { + return + } + rstats := r.Stats() + labels := []string{"topic", rstats.Topic, "partition", rstats.Partition, "client_id", rstats.ClientID} + + m.Counter("broker_reader_dial_count", labels...).Add(int(rstats.Dials)) + m.Counter("broker_reader_fetch_count", labels...).Add(int(rstats.Fetches)) + m.Counter("broker_reader_message_count", labels...).Add(int(rstats.Messages)) + m.Counter("broker_reader_message_bytes", labels...).Add(int(rstats.Bytes)) + m.Counter("broker_reader_rebalance_count", labels...).Add(int(rstats.Rebalances)) + m.Counter("broker_reader_timeout_count", labels...).Add(int(rstats.Timeouts)) + m.Counter("broker_reader_error", labels...).Add(int(rstats.Errors)) + + /* + m.Counter("broker_reader_dial_seconds_avg", labels...).Add(uint64(rstats.DialTime.Avg)) + m.Counter("broker_reader_dial_seconds_min", labels...).Add(uint64(rstats.DialTime.Min)) + m.Counter("broker_reader_dial_seconds_max", labels...).Add(uint64(rstats.DialTime.Max)) + m.Counter("broker_reader_read_seconds_avg", labels...).Add(uint64(rstats.ReadTime.Avg)) + m.Counter("broker_reader_read_seconds_min", labels...).Add(uint64(rstats.ReadTime.Min)) + m.Counter("broker_reader_read_seconds_max", labels...).Add(uint64(rstats.ReadTime.Max)) + m.Counter("broker_reader_wait_seconds_avg", labels...).Add(uint64(rstats.WaitTime.Avg)) + m.Counter("broker_reader_wait_seconds_min", labels...).Add(uint64(rstats.WaitTime.Min)) + m.Counter("broker_reader_wait_seconds_max", labels...).Add(uint64(rstats.WaitTime.Max)) + */ + /* + m.Counter("broker_reader_fetch_size_avg", labels...).Add(uint64(rstats.FetchSize.Avg)) + m.Counter("broker_reader_fetch_size_min", labels...).Set(uint64(rstats.FetchSize.Min)) + m.Counter("broker_reader_fetch_size_max", labels...).Set(uint64(rstats.FetchSize.Max)) + m.Counter("broker_reader_fetch_bytes_avg", labels...).Set(uint64(rstats.FetchBytes.Avg)) + m.Counter("broker_reader_fetch_bytes_min", labels...).Set(uint64(rstats.FetchBytes.Min)) + m.Counter("broker_reader_fetch_bytes_max", labels...).Set(uint64(rstats.FetchBytes.Max)) + */ + + m.Counter("broker_reader_offset", labels...).Set(uint64(rstats.Offset)) + m.Counter("broker_reader_lag", labels...).Set(uint64(rstats.Lag)) + m.Counter("broker_reader_fetch_bytes_min", labels...).Set(uint64(rstats.MinBytes)) + m.Counter("broker_reader_fetch_bytes_max", labels...).Set(uint64(rstats.MaxBytes)) + m.Counter("broker_reader_fetch_wait_max", labels...).Set(uint64(rstats.MaxWait)) + m.Counter("broker_reader_queue_length", labels...).Set(uint64(rstats.QueueLength)) + m.Counter("broker_reader_queue_capacity", labels...).Set(uint64(rstats.QueueCapacity)) + } + } +} + +func writerStats(ctx context.Context, w *kafka.Writer, td time.Duration, m meter.Meter) { + ticker := time.NewTicker(td) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if w == nil { + return + } + wstats := w.Stats() + labels := []string{} + + m.Counter("broker_writer_write_count", labels...).Add(int(wstats.Writes)) + m.Counter("broker_writer_message_count", labels...).Add(int(wstats.Messages)) + m.Counter("broker_writer_message_bytes", labels...).Add(int(wstats.Bytes)) + m.Counter("broker_writer_error_count", labels...).Add(int(wstats.Errors)) + + /* + m.Counter("broker_writer_batch_seconds_avg", labels...).Set(uint64(wstats.BatchTime.Avg)) + m.Counter("broker_writer_batch_seconds_min", labels...).Set(uint64(wstats.BatchTime.Min)) + m.Counter("broker_writer_batch_seconds_max", labels...).Set(uint64(wstats.BatchTime.Max)) + m.Counter("broker_writer_write_seconds_avg", labels...).Set(uint64(wstats.WriteTime.Avg)) + m.Counter("broker_writer_write_seconds_min", labels...).Set(uint64(wstats.WriteTime.Min)) + m.Counter("broker_writer_write_seconds_max", labels...).Set(uint64(wstats.WriteTime.Max)) + m.Counter("broker_writer_wait_seconds_avg", labels...).Set(uint64(wstats.WaitTime.Avg)) + m.Counter("broker_writer_wait_seconds_min", labels...).Set(uint64(wstats.WaitTime.Min)) + m.Counter("broker_writer_wait_seconds_max", labels...).Set(uint64(wstats.WaitTime.Max)) + + m.Counter("broker_writer_retries_count_avg", labels...).Set(uint64(wstats.Retries.Avg)) + m.Counter("broker_writer_retries_count_min", labels...).Set(uint64(wstats.Retries.Min)) + m.Counter("broker_writer_retries_count_max", labels...).Set(uint64(wstats.Retries.Max)) + m.Counter("broker_writer_batch_size_avg", labels...).Set(uint64(wstats.BatchSize.Avg)) + m.Counter("broker_writer_batch_size_min", labels...).Set(uint64(wstats.BatchSize.Min)) + m.Counter("broker_writer_batch_size_max", labels...).Set(uint64(wstats.BatchSize.Max)) + m.Counter("broker_writer_batch_bytes_avg", labels...).Set(uint64(wstats.BatchBytes.Avg)) + m.Counter("broker_writer_batch_bytes_min", labels...).Set(uint64(wstats.BatchBytes.Min)) + m.Counter("broker_writer_batch_bytes_max", labels...).Set(uint64(wstats.BatchBytes.Max)) + */ + + m.Counter("broker_writer_attempts_max", labels...).Set(uint64(wstats.MaxAttempts)) + m.Counter("broker_writer_batch_max", labels...).Set(uint64(wstats.MaxBatchSize)) + m.Counter("broker_writer_batch_timeout", labels...).Set(uint64(wstats.BatchTimeout)) + m.Counter("broker_writer_read_timeout", labels...).Set(uint64(wstats.ReadTimeout)) + m.Counter("broker_writer_write_timeout", labels...).Set(uint64(wstats.WriteTimeout)) + m.Counter("broker_writer_acks_required", labels...).Set(uint64(wstats.RequiredAcks)) + if wstats.Async { + m.Counter("broker_writer_async", labels...).Set(1) + } else { + m.Counter("broker_writer_async", labels...).Set(0) + } + } + } +}