broker/segmentio: speedup publish

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-06-18 01:38:54 +03:00
parent d9c3a34d39
commit 9b9387d916
2 changed files with 52 additions and 10 deletions

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"errors" "errors"
"sync" "sync"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/broker"
@ -182,24 +183,65 @@ func (k *kBroker) Options() broker.Options {
} }
func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
var cached bool
buf, err := k.opts.Codec.Marshal(msg) buf, err := k.opts.Codec.Marshal(msg)
if err != nil { if err != nil {
return err return err
} }
cfg := k.writerConfig kmsg := kafka.Message{Value: buf}
cfg.Topic = topic
if err = cfg.Validate(); err != nil {
return err
}
writer := kafka.NewWriter(cfg)
err = writer.WriteMessages(k.opts.Context, kafka.Message{Value: buf}) k.Lock()
writer, ok := k.writers[topic]
if !ok {
cfg := k.writerConfig
cfg.Topic = topic
if err = cfg.Validate(); err != nil {
k.Unlock()
return err
}
writer = kafka.NewWriter(cfg)
k.writers[topic] = writer
} else {
cached = true
}
k.Unlock()
err = writer.WriteMessages(k.opts.Context, kmsg)
if err != nil { if err != nil {
return err switch cached {
case false:
// non cached case, we can try to wait on some errors, but not timeout
if kerr, ok := err.(kafka.Error); ok {
if kerr.Temporary() && !kerr.Timeout() {
// additional chanse to publish message
time.Sleep(200 * time.Millisecond)
err = writer.WriteMessages(k.opts.Context, kmsg)
}
}
case true:
// cached case, try to recreate writer and try again after that
k.Lock()
// close older writer to free memory
if err = writer.Close(); err != nil {
k.Unlock()
return err
}
cfg := k.writerConfig
cfg.Topic = topic
if err = cfg.Validate(); err != nil {
k.Unlock()
return err
}
writer := kafka.NewWriter(cfg)
k.writers[topic] = writer
k.Unlock()
err = writer.WriteMessages(k.opts.Context, kmsg)
}
} }
return writer.Close() return err
} }
func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {

View File

@ -12,7 +12,7 @@ import (
) )
func BenchmarkSegmentioCodecJsonPublish(b *testing.B) { func BenchmarkSegmentioCodecJsonPublish(b *testing.B) {
b.Skip() // b.Skip()
if tr := os.Getenv("TRAVIS"); len(tr) > 0 { if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
b.Skip() b.Skip()
} }