Compare commits

..

8 Commits

Author SHA1 Message Date
d2ac0c1360 update deps
Some checks failed
build / test (push) Failing after 1m49s
codeql / analyze (go) (push) Failing after 2m6s
build / lint (push) Successful in 9m18s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-07 00:46:24 +03:00
69dd8c4eea fixup fields again
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-22 09:04:02 +03:00
27a6a923cd gomod
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-21 15:45:13 +03:00
0a395235d6 backport
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-21 15:08:58 +03:00
23f0ad0f2f Merge pull request 'change log fields' (#129) from fix into v3
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Reviewed-on: #129
2024-02-15 10:45:37 +03:00
4fcbe0a770 change log fields
Some checks failed
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled
autoapprove / autoapprove (pull_request) Has been cancelled
automerge / automerge (pull_request) Has been cancelled
dependabot-automerge / automerge (pull_request) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-15 10:44:59 +03:00
28c9865121 Merge pull request 'logger fix' (#128) from logger_fix into v3
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Reviewed-on: #128
2024-02-15 10:21:06 +03:00
697413d829 logger fix
Some checks failed
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled
autoapprove / autoapprove (pull_request) Has been cancelled
automerge / automerge (pull_request) Has been cancelled
dependabot-automerge / automerge (pull_request) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-15 10:20:15 +03:00
11 changed files with 516 additions and 80 deletions

View File

@@ -1,9 +1,2 @@
# micro-broker-kgo
yet another micro kafka broker alternative
# broker-kgo
TODO:
* dont always append options from context on Init and New
* add SubscriberOptions(...kgo.Opt)
* add ServerSubscribeOptions(...kgo.Opt)
* check PublisherOptions(...kgo.Opt)
* check ClientPublisherOptions(...kgo.Opt)

53
carrier.go Normal file
View File

@@ -0,0 +1,53 @@
package kgo
import (
"github.com/twmb/franz-go/pkg/kgo"
)
// RecordCarrier injects and extracts traces from a kgo.Record.
//
// This type exists to satisfy the otel/propagation.TextMapCarrier interface.
type RecordCarrier struct {
record *kgo.Record
}
// NewRecordCarrier creates a new RecordCarrier.
func NewRecordCarrier(record *kgo.Record) RecordCarrier {
return RecordCarrier{record: record}
}
// Get retrieves a single value for a given key if it exists.
func (c RecordCarrier) Get(key string) string {
for _, h := range c.record.Headers {
if h.Key == key {
return string(h.Value)
}
}
return ""
}
// Set sets a header.
func (c RecordCarrier) Set(key, val string) {
// Check if key already exists.
for i, h := range c.record.Headers {
if h.Key == key {
// Key exist, update the value.
c.record.Headers[i].Value = []byte(val)
return
}
}
// Key does not exist, append new header.
c.record.Headers = append(c.record.Headers, kgo.RecordHeader{
Key: key,
Value: []byte(val),
})
}
// Keys returns a slice of all key identifiers in the carrier.
func (c RecordCarrier) Keys() []string {
out := make([]string, len(c.record.Headers))
for i, h := range c.record.Headers {
out[i] = h.Key
}
return out
}

19
go.mod
View File

@@ -1,14 +1,21 @@
module go.unistack.org/micro-broker-kgo/v3
go 1.17
go 1.19
require (
github.com/twmb/franz-go v1.12.1
github.com/twmb/franz-go/pkg/kmsg v1.4.0
go.unistack.org/micro/v3 v3.10.23
github.com/google/uuid v1.6.0
github.com/twmb/franz-go v1.16.1
github.com/twmb/franz-go/pkg/kmsg v1.7.0
go.opentelemetry.io/otel v1.24.0
go.unistack.org/micro/v3 v3.10.46
)
require (
github.com/klauspost/compress v1.16.0 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
golang.org/x/sys v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect
google.golang.org/grpc v1.62.1 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)

51
go.sum
View File

@@ -1,19 +1,32 @@
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/twmb/franz-go v1.12.1 h1:8lWT8q0spL40Nfw6eonJ8OoPGLvF9arvadRRmcSiu9Y=
github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s=
go.unistack.org/micro/v3 v3.10.23 h1:4BE7NwwyJbCWOfzjzztamBxJSgRHHW1uQtMGNDLHG3s=
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.unistack.org/micro/v3 v3.10.46 h1:rnuEqiFkerwJmKzHmHBXRgxFemZustxWz2hRNLQQ8cU=
go.unistack.org/micro/v3 v3.10.46/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 h1:IR+hp6ypxjH24bkMfEJ0yHR21+gwPWdV+/IBrPQyn3k=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

65
kadmtest.go Normal file
View File

@@ -0,0 +1,65 @@
//go:build ignore
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kversion"
//"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/pkg/sasl/plain"
)
func die(msg string, args ...any) {
fmt.Fprintf(os.Stderr, msg, args...)
os.Exit(1)
}
func main() {
seeds := []string{"vm-kafka-ump01tn.mbrd.ru:9092", "vm-kafka-ump02tn.mbrd.ru:9092", "vm-kafka-ump03tn.mbrd.ru:9092"}
pass := "XXXXX"
user := "XXXXX"
var adminClient *kadm.Client
{
client, err := kgo.NewClient(
kgo.SeedBrokers(seeds...),
// kgo.SASL((scram.Auth{User: user, Pass: pass}).AsSha512Mechanism()),
kgo.SASL((plain.Auth{User: user, Pass: pass}).AsMechanism()),
// Do not try to send requests newer than 2.4.0 to avoid breaking changes in the request struct.
// Sometimes there are breaking changes for newer versions where more properties are required to set.
kgo.MaxVersions(kversion.V2_4_0()),
)
if err != nil {
panic(err)
}
defer client.Close()
adminClient = kadm.NewClient(client)
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
dg, err := adminClient.DescribeGroups(ctx, "interestrate_loader")
if err != nil {
die("failed to describe group: %v", err)
}
for _, m := range dg["interestrate_loader"].Members {
mc, _ := m.Assigned.AsConsumer()
for _, mt := range mc.Topics {
for _, p := range mt.Partitions {
fmt.Printf("client:%s\tpartitions: %d\n", m.ClientID, p)
}
}
}
}

85
kgo.go
View File

@@ -1,5 +1,5 @@
// Package kgo provides a kafka broker using kgo
package kgo // import "go.unistack.org/micro-broker-kgo/v3"
package kgo
import (
"context"
@@ -10,11 +10,12 @@ import (
"sync"
"time"
"github.com/google/uuid"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/metadata"
id "go.unistack.org/micro/v3/util/id"
"go.unistack.org/micro/v3/tracer"
mrand "go.unistack.org/micro/v3/util/rand"
)
@@ -56,7 +57,6 @@ type Broker struct {
c *kgo.Client
kopts []kgo.Opt
connected bool
init bool
sync.RWMutex
opts broker.Options
subs []*subscriber
@@ -73,9 +73,31 @@ func (k *Broker) Name() string {
func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, error) {
var c *kgo.Client
var err error
var span tracer.Span
ctx, span = k.opts.Tracer.Start(ctx, "Connect")
defer span.Finish()
clientID := "kgo"
group := ""
if k.opts.Context != nil {
if id, ok := k.opts.Context.Value(clientIDKey{}).(string); ok {
clientID = id
}
if id, ok := k.opts.Context.Value(groupKey{}).(string); ok {
group = id
}
}
opts = append(opts,
kgo.WithHooks(&hookMeter{meter: k.opts.Meter}),
kgo.WithHooks(&hookTracer{group: group, clientID: clientID, tracer: k.opts.Tracer}),
)
select {
case <-ctx.Done():
if ctx.Err() != nil {
span.SetStatus(tracer.SpanStatusError, ctx.Err().Error())
}
return nil, ctx.Err()
default:
c, err = kgo.NewClient(opts...)
@@ -83,6 +105,7 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err
err = c.Ping(ctx) // check connectivity to cluster
}
if err != nil {
span.SetStatus(tracer.SpanStatusError, err.Error())
return nil, err
}
}
@@ -127,6 +150,9 @@ func (k *Broker) Disconnect(ctx context.Context) error {
if ctx != nil {
nctx = ctx
}
var span tracer.Span
ctx, span = k.opts.Tracer.Start(ctx, "Disconnect")
defer span.Finish()
k.Lock()
defer k.Unlock()
@@ -196,12 +222,12 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
}
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
k.RLock()
ok := k.connected
k.RUnlock()
var span tracer.Span
ctx, span = k.opts.Tracer.Start(ctx, "Publish")
defer span.Finish()
if !ok {
k.Lock()
if !k.connected {
c, err := k.connect(ctx, k.kopts...)
if err != nil {
k.Unlock()
@@ -209,27 +235,31 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
}
k.c = c
k.connected = true
k.Unlock()
}
k.Unlock()
options := broker.NewPublishOptions(opts...)
records := make([]*kgo.Record, 0, len(msgs))
var errs []string
var err error
var key []byte
var promise func(*kgo.Record, error)
if options.Context != nil {
if k, ok := options.Context.Value(publishKey{}).([]byte); ok && k != nil {
key = k
}
if p, ok := options.Context.Value(publishPromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
promise = p
}
}
for _, msg := range msgs {
rec := &kgo.Record{Context: ctx, Key: key}
rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
if options.BodyOnly {
rec.Value = msg.Body
} else if k.opts.Codec.String() == "noop" {
msg.Header.Del(metadata.HeaderTopic)
// k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Inc()
if options.BodyOnly || k.opts.Codec.String() == "noop" {
rec.Value = msg.Body
for k, v := range msg.Header {
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)})
@@ -243,10 +273,36 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
records = append(records, rec)
}
if promise != nil {
// ts := time.Now()
for _, rec := range records {
k.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
// te := time.Since(ts)
// k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Dec()
// k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", r.Topic).Update(te.Seconds())
// k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", r.Topic).Update(te.Seconds())
if err != nil {
// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "failure").Inc()
} else {
// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "success").Inc()
}
promise(r, err)
})
}
return nil
}
// ts := time.Now()
results := k.c.ProduceSync(ctx, records...)
// te := time.Since(ts)
for _, result := range results {
// k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic).Update(te.Seconds())
// k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", result.Record.Topic).Update(te.Seconds())
/// k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", result.Record.Topic).Dec()
if result.Err != nil {
// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "failure").Inc()
errs = append(errs, result.Err.Error())
} else {
// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "success").Inc()
}
}
@@ -265,11 +321,11 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
options := broker.NewSubscribeOptions(opts...)
if options.Group == "" {
uid, err := id.New()
uid, err := uuid.NewRandom()
if err != nil {
return nil, err
}
options.Group = uid
options.Group = uid.String()
}
commitInterval := DefaultCommitInterval
@@ -337,7 +393,6 @@ func (k *Broker) String() string {
}
func NewBroker(opts ...broker.Option) *Broker {
rand.Seed(time.Now().Unix())
options := broker.NewOptions(opts...)
kaddrs := options.Addrs
@@ -351,8 +406,6 @@ func NewBroker(opts ...broker.Option) *Broker {
kgo.DisableIdempotentWrite(),
kgo.ProducerBatchCompression(kgo.NoCompression()),
kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}),
// kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, func() string { return time.Now().Format(time.StampMilli) })),
kgo.WithHooks(&metrics{meter: options.Meter}),
kgo.SeedBrokers(kaddrs...),
kgo.RetryBackoffFn(DefaultRetryBackoffFn),
kgo.BlockRebalanceOnPoll(),

View File

@@ -2,7 +2,6 @@ package kgo
import (
"context"
"fmt"
"github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/logger"
@@ -30,11 +29,7 @@ func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) {
return
}
if len(args) > 0 {
fields := make(map[string]interface{}, int(len(args)/2))
for i := 0; i <= len(args)/2; i += 2 {
fields[fmt.Sprintf("%v", args[i])] = args[i+1]
}
l.l.Fields(fields).Log(l.ctx, mlvl, msg)
l.l.Log(l.ctx, mlvl, append([]interface{}{msg}, args...)...)
} else {
l.l.Log(l.ctx, mlvl, msg)
}

View File

@@ -9,19 +9,19 @@ import (
"go.unistack.org/micro/v3/meter"
)
type metrics struct {
type hookMeter struct {
meter meter.Meter
}
var (
_ kgo.HookBrokerConnect = &metrics{}
_ kgo.HookBrokerDisconnect = &metrics{}
_ kgo.HookBrokerRead = &metrics{}
_ kgo.HookBrokerThrottle = &metrics{}
_ kgo.HookBrokerWrite = &metrics{}
_ kgo.HookFetchBatchRead = &metrics{}
_ kgo.HookProduceBatchWritten = &metrics{}
_ kgo.HookGroupManageError = &metrics{}
_ kgo.HookBrokerConnect = &hookMeter{}
_ kgo.HookBrokerDisconnect = &hookMeter{}
_ kgo.HookBrokerRead = &hookMeter{}
_ kgo.HookBrokerThrottle = &hookMeter{}
_ kgo.HookBrokerWrite = &hookMeter{}
_ kgo.HookFetchBatchRead = &hookMeter{}
_ kgo.HookProduceBatchWritten = &hookMeter{}
_ kgo.HookGroupManageError = &hookMeter{}
)
const (
@@ -54,11 +54,11 @@ const (
labelTopic = "topic"
)
func (m *metrics) OnGroupManageError(err error) {
func (m *hookMeter) OnGroupManageError(err error) {
m.meter.Counter(metricBrokerGroupErrors).Inc()
}
func (m *metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
func (m *hookMeter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
node := strconv.Itoa(int(meta.NodeID))
if err != nil {
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFaulure).Inc()
@@ -67,12 +67,12 @@ func (m *metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ ne
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc()
}
func (m *metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
func (m *hookMeter) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
node := strconv.Itoa(int(meta.NodeID))
m.meter.Counter(metricBrokerDisconnects, labelNode, node).Inc()
}
func (m *metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
func (m *hookMeter) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
node := strconv.Itoa(int(meta.NodeID))
if err != nil {
m.meter.Counter(metricBrokerWriteErrors, labelNode, node).Inc()
@@ -83,7 +83,7 @@ func (m *metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten i
m.meter.Histogram(metricBrokerWriteLatencies, labelNode, node).Update(timeToWrite.Seconds())
}
func (m *metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
func (m *hookMeter) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
node := strconv.Itoa(int(meta.NodeID))
if err != nil {
m.meter.Counter(metricBrokerReadErrors, labelNode, node).Inc()
@@ -95,18 +95,18 @@ func (m *metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int,
m.meter.Histogram(metricBrokerReadLatencies, labelNode, node).Update(timeToRead.Seconds())
}
func (m *metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
func (m *hookMeter) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
node := strconv.Itoa(int(meta.NodeID))
m.meter.Histogram(metricBrokerThrottleLatencies, labelNode, node).Update(throttleInterval.Seconds())
}
func (m *metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) {
func (m *hookMeter) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) {
node := strconv.Itoa(int(meta.NodeID))
m.meter.Counter(metricBrokerProduceBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes)
m.meter.Counter(metricBrokerProduceBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes)
}
func (m *metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) {
func (m *hookMeter) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) {
node := strconv.Itoa(int(meta.NodeID))
m.meter.Counter(metricBrokerFetchBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes)
m.meter.Counter(metricBrokerFetchBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes)

View File

@@ -63,6 +63,18 @@ func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption {
}
}
type clientIDKey struct{}
func ClientID(id string) broker.Option {
return broker.SetOption(clientIDKey{}, id)
}
type groupKey struct{}
func Group(id string) broker.Option {
return broker.SetOption(groupKey{}, id)
}
type commitIntervalKey struct{}
// CommitInterval specifies interval to send commits
@@ -78,3 +90,15 @@ type subscribeMaxInflightKey struct{}
func SubscribeMaxInFlight(n int) broker.SubscribeOption {
return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n)
}
type publishPromiseKey struct{}
// PublishPromise set the kafka promise func for Produce
func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption {
return broker.SetPublishOption(publishPromiseKey{}, fn)
}
// ClientPublishKey set the kafka message key (client option)
func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption {
return client.SetPublishOption(publishPromiseKey{}, fn)
}

View File

@@ -64,6 +64,7 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
s.killConsumers(ctx, kc)
close(s.done)
s.closed = true
s.c.ResumeFetchTopics(s.topic)
}
return nil
}
@@ -141,7 +142,7 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str
quit: make(chan struct{}),
done: make(chan struct{}),
recs: make(chan kgo.FetchTopicPartition, 4),
recs: make(chan kgo.FetchTopicPartition, 100),
handler: s.handler,
kopts: s.kopts,
opts: s.opts,
@@ -168,46 +169,63 @@ func (pc *consumer) consume() {
return
case p := <-pc.recs:
for _, record := range p.Records {
// ts := time.Now()
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Inc()
p := eventPool.Get().(*event)
p.msg.Header = nil
p.msg.Body = nil
p.topic = record.Topic
p.err = nil
p.ack = false
if pc.kopts.Codec.String() == "noop" {
p.msg.Header = metadata.New(len(record.Headers))
for _, hdr := range record.Headers {
p.msg.Header.Set(hdr.Key, string(hdr.Value))
}
if pc.kopts.Codec.String() == "noop" {
p.msg.Body = record.Value
} else if pc.opts.BodyOnly {
p.msg.Body = record.Value
} else {
if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
p.err = err
p.msg.Body = record.Value
if eh != nil {
_ = eh(p)
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
if p.ack {
pc.c.MarkCommitRecords(record)
} else {
eventPool.Put(p)
pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
return
}
eventPool.Put(p)
// te := time.Since(ts)
// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
continue
} else {
if pc.kopts.Logger.V(logger.ErrorLevel) {
pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
}
}
// te := time.Since(ts)
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
eventPool.Put(p)
pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
return
}
}
err := pc.handler(p)
if err == nil {
// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc()
} else {
// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
}
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
if err == nil && pc.opts.AutoAck {
p.ack = true
} else if err != nil {
@@ -220,6 +238,9 @@ func (pc *consumer) consume() {
}
}
}
// te := time.Since(ts)
// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
if p.ack {
eventPool.Put(p)
pc.c.MarkCommitRecords(record)

212
tracer.go Normal file
View File

@@ -0,0 +1,212 @@
package kgo
import (
"context"
"net"
"time"
"unicode/utf8"
"github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v3/tracer"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
)
type hookTracer struct {
clientID string
group string
tracer tracer.Tracer
}
var (
_ kgo.HookBrokerConnect = &hookTracer{}
_ kgo.HookBrokerDisconnect = &hookTracer{}
_ kgo.HookBrokerRead = &hookTracer{}
_ kgo.HookBrokerThrottle = &hookTracer{}
_ kgo.HookBrokerWrite = &hookTracer{}
_ kgo.HookFetchBatchRead = &hookTracer{}
_ kgo.HookProduceBatchWritten = &hookTracer{}
_ kgo.HookGroupManageError = &hookTracer{}
)
func (m *hookTracer) OnGroupManageError(err error) {
}
func (m *hookTracer) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
}
func (m *hookTracer) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
}
func (m *hookTracer) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
}
func (m *hookTracer) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
}
func (m *hookTracer) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
}
func (m *hookTracer) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) {
}
func (m *hookTracer) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) {
}
// OnProduceRecordBuffered starts a new span for the "publish" operation on a
// buffered record.
//
// It sets span options and injects the span context into record and updates
// the record's context, so it can be ended in the OnProduceRecordUnbuffered
// hook.
func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
// Set up span options.
attrs := []attribute.KeyValue{
semconv.MessagingSystemKey.String("kafka"),
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationName(r.Topic),
semconv.MessagingOperationPublish,
}
attrs = maybeKeyAttr(attrs, r)
if m.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
}
ifattrs := make([]interface{}, 0, len(attrs))
for _, attr := range attrs {
ifattrs = append(ifattrs, attr)
}
opts := []tracer.SpanOption{
tracer.WithSpanLabels(ifattrs...),
tracer.WithSpanKind(tracer.SpanKindProducer),
}
// Start the "publish" span.
ctx, _ := m.tracer.Start(r.Context, r.Topic+" publish", opts...)
// Inject the span context into the record.
// t.propagators.Inject(ctx, NewRecordCarrier(r))
// Update the record context.
r.Context = ctx
}
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
// unbuffered record.
//
// It sets attributes with values unset when producing and records any error
// that occurred during the publish operation.
func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
span, ok := tracer.SpanFromContext(r.Context)
if !ok {
return
}
defer span.Finish()
span.AddLabels(
semconv.MessagingKafkaDestinationPartition(int(r.Partition)),
)
if err != nil {
span.SetStatus(tracer.SpanStatusError, err.Error())
}
}
// OnFetchRecordBuffered starts a new span for the "receive" operation on a
// buffered record.
//
// It sets the span options and extracts the span context from the record,
// updates the record's context to ensure it can be ended in the
// OnFetchRecordUnbuffered hook and can be used in downstream consumer
// processing.
func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
// Set up the span options.
attrs := []attribute.KeyValue{
semconv.MessagingSystemKey.String("kafka"),
semconv.MessagingSourceKindTopic,
semconv.MessagingSourceName(r.Topic),
semconv.MessagingOperationReceive,
semconv.MessagingKafkaSourcePartition(int(r.Partition)),
}
attrs = maybeKeyAttr(attrs, r)
if m.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
}
if m.group != "" {
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
}
ifattrs := make([]interface{}, 0, len(attrs))
for _, attr := range attrs {
ifattrs = append(ifattrs, attr)
}
opts := []tracer.SpanOption{
tracer.WithSpanLabels(ifattrs...),
tracer.WithSpanKind(tracer.SpanKindConsumer),
}
if r.Context == nil {
r.Context = context.Background()
}
// Extract the span context from the record.
// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r))
// Start the "receive" span.
newCtx, _ := m.tracer.Start(r.Context, r.Topic+" receive", opts...)
// Update the record context.
r.Context = newCtx
}
// OnFetchRecordUnbuffered continues and ends the "receive" span for an
// unbuffered record.
func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) {
if span, ok := tracer.SpanFromContext(r.Context); ok {
defer span.Finish()
}
}
// WithProcessSpan starts a new span for the "process" operation on a consumer
// record.
//
// It sets up the span options. The user's application code is responsible for
// ending the span.
//
// This should only ever be called within a polling loop of a consumed record and
// not a record which has been created for producing, so call this at the start of each
// iteration of your processing for the record.
func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) {
// Set up the span options.
attrs := []attribute.KeyValue{
semconv.MessagingSystemKey.String("kafka"),
semconv.MessagingSourceKindTopic,
semconv.MessagingSourceName(r.Topic),
semconv.MessagingOperationProcess,
semconv.MessagingKafkaSourcePartition(int(r.Partition)),
semconv.MessagingKafkaMessageOffset(int(r.Offset)),
}
attrs = maybeKeyAttr(attrs, r)
ifattrs := make([]interface{}, 0, len(attrs))
for _, attr := range attrs {
ifattrs = append(ifattrs, attr)
}
if m.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
}
if m.group != "" {
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
}
opts := []tracer.SpanOption{
tracer.WithSpanLabels(ifattrs...),
tracer.WithSpanKind(tracer.SpanKindConsumer),
}
if r.Context == nil {
r.Context = context.Background()
}
// Start a new span using the provided context and options.
return m.tracer.Start(r.Context, r.Topic+" process", opts...)
}
func maybeKeyAttr(attrs []attribute.KeyValue, r *kgo.Record) []attribute.KeyValue {
if r.Key == nil {
return attrs
}
var keykey string
if !utf8.Valid(r.Key) {
return attrs
}
keykey = string(r.Key)
return append(attrs, semconv.MessagingKafkaMessageKeyKey.String(keykey))
}