diff --git a/event.go b/event.go index 6dbf963..6f601ca 100644 --- a/event.go +++ b/event.go @@ -3,7 +3,7 @@ package kgo import ( "sync" - "go.unistack.org/micro/v3/broker" + "go.unistack.org/micro/v4/broker" ) type event struct { diff --git a/go.mod b/go.mod index a989f3a..dc73b5a 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,14 @@ -module go.unistack.org/micro-broker-kgo/v3 +module go.unistack.org/micro-broker-kgo/v4 -go 1.17 +go 1.19 require ( github.com/twmb/franz-go v1.11.5 - github.com/twmb/franz-go/pkg/kmsg v1.3.0 - go.unistack.org/micro/v3 v3.10.14 + go.unistack.org/micro/v4 v4.0.1 ) require ( github.com/klauspost/compress v1.15.9 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.3.0 // indirect ) diff --git a/go.sum b/go.sum index 5633005..76f1a45 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,13 @@ -github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/twmb/franz-go v1.11.5 h1:TTv5lVJd+87XkmP9dWN9Jgpf7IUUr7a7jee+byR8LBE= github.com/twmb/franz-go v1.11.5/go.mod h1:FvaHNlpT6woVYIl6LAuIeL7yHol1Fp6Gv2Dn21AvH78= github.com/twmb/franz-go/pkg/kmsg v1.3.0 h1:ouBETB7nTqRxiO5E8/pySoFZtVEW2VWw55z3/bsUzTw= github.com/twmb/franz-go/pkg/kmsg v1.3.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= -go.unistack.org/micro/v3 v3.10.14 h1:7fgLpwGlCN67twhwtngJDEQvrMkUBDSA5vzZqxIDqNE= -go.unistack.org/micro/v3 v3.10.14/go.mod h1:uMAc0U/x7dmtICCrblGf0ZLgYegu3VwQAquu+OFCw1Q= +go.unistack.org/micro/v4 v4.0.1 h1:xo1IxbVfgh8i0eY0VeYa3cbb13u5n/Mxnp3FOgWD4Jo= +go.unistack.org/micro/v4 v4.0.1/go.mod h1:p/J5UcSJjfHsWGT31uKoghQ5rUQZzQJBAFy+Z4+ZVMs= golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -19,6 +16,3 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kgo.go b/kgo.go index 093e77f..d85ccdb 100644 --- a/kgo.go +++ b/kgo.go @@ -1,5 +1,5 @@ // Package kgo provides a kafka broker using kgo -package kgo // import "go.unistack.org/micro-broker-kgo/v3" +package kgo // import "go.unistack.org/micro-broker-kgo/v4" import ( "context" @@ -11,10 +11,10 @@ import ( "time" "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/metadata" - id "go.unistack.org/micro/v3/util/id" - mrand "go.unistack.org/micro/v3/util/rand" + "go.unistack.org/micro/v4/broker" + "go.unistack.org/micro/v4/metadata" + id "go.unistack.org/micro/v4/util/id" + mrand "go.unistack.org/micro/v4/util/rand" ) var _ broker.Broker = &Broker{} @@ -212,16 +212,21 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br var errs []string var err error var key []byte + var promise func(*kgo.Record, error) if options.Context != nil { if k, ok := options.Context.Value(publishKey{}).([]byte); ok && k != nil { key = k } + if p, ok := options.Context.Value(publishPromiseKey{}).(func(*kgo.Record, error)); ok && p != nil { + promise = p + } } for _, msg := range msgs { rec := &kgo.Record{Context: ctx, Key: key} rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) + k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Inc() if options.BodyOnly { rec.Value = msg.Body } else if k.opts.Codec.String() == "noop" { @@ -238,10 +243,36 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br records = append(records, rec) } + if promise != nil { + ts := time.Now() + for _, rec := range records { + k.c.Produce(ctx, rec, func(r *kgo.Record, err error) { + te := time.Since(ts) + k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Dec() + k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", r.Topic).Update(te.Seconds()) + k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", r.Topic).Update(te.Seconds()) + if err != nil { + k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "failure").Inc() + } else { + k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "success").Inc() + } + promise(r, err) + }) + } + return nil + } + ts := time.Now() results := k.c.ProduceSync(ctx, records...) + te := time.Since(ts) for _, result := range results { + k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic).Update(te.Seconds()) + k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", result.Record.Topic).Update(te.Seconds()) + k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", result.Record.Topic).Dec() if result.Err != nil { + k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "failure").Inc() errs = append(errs, result.Err.Error()) + } else { + k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "success").Inc() } } diff --git a/kgo_test.go b/kgo_test.go index 743a848..8d1d266 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -10,10 +10,10 @@ import ( "time" kg "github.com/twmb/franz-go/pkg/kgo" - kgo "go.unistack.org/micro-broker-kgo/v3" - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/metadata" + kgo "go.unistack.org/micro-broker-kgo/v4" + "go.unistack.org/micro/v4/broker" + "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/metadata" ) var ( diff --git a/logger.go b/logger.go index 5bd5a27..09654fe 100644 --- a/logger.go +++ b/logger.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v4/logger" ) type mlogger struct { diff --git a/metrics.go b/metrics.go index bab138b..0018330 100644 --- a/metrics.go +++ b/metrics.go @@ -6,9 +6,44 @@ import ( "time" "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v4/meter" ) +/* + func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper { + handler := &wrapper{ + opts: NewOptions(opts...), + } + return handler.SubscriberFunc + } + + func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc { + return func(ctx context.Context, msg server.Message) error { + endpoint := msg.Topic() + + labels := make([]string, 0, 4) + labels = append(labels, labelEndpoint, endpoint) + + w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc() + ts := time.Now() + err := fn(ctx, msg) + te := time.Since(ts) + w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec() + + w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds()) + w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds()) + + if err == nil { + labels = append(labels, labelStatus, labelSuccess) + } else { + labels = append(labels, labelStatus, labelFailure) + } + w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc() + + return err + } + } +*/ type metrics struct { meter meter.Meter } diff --git a/options.go b/options.go index 4664ee6..a3707b6 100644 --- a/options.go +++ b/options.go @@ -5,8 +5,8 @@ import ( "time" "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/client" + "go.unistack.org/micro/v4/broker" + "go.unistack.org/micro/v4/client" ) // DefaultCommitInterval specifies how fast send commit offsets to kafka @@ -78,3 +78,15 @@ type subscribeMaxInflightKey struct{} func SubscribeMaxInFlight(n int) broker.SubscribeOption { return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n) } + +type publishPromiseKey struct{} + +// PublishPromise set the kafka promise func for Produce +func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption { + return broker.SetPublishOption(publishPromiseKey{}, fn) +} + +// ClientPublishKey set the kafka message key (client option) +func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption { + return client.SetPublishOption(publishPromiseKey{}, fn) +} diff --git a/subscriber.go b/subscriber.go index b6f147f..913c9f6 100644 --- a/subscriber.go +++ b/subscriber.go @@ -5,9 +5,9 @@ import ( "sync" "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v4/broker" + "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/metadata" ) type tp struct {