diff --git a/README.md b/README.md
index 50c0b12..cdae321 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,2 @@
-# micro-broker-kgo
-yet another micro kafka broker alternative
+# broker-kgo
 
-TODO:
-* dont always append options from context on Init and New
-* add SubscriberOptions(...kgo.Opt)
-* add ServerSubscribeOptions(...kgo.Opt)
-* check PublisherOptions(...kgo.Opt)
-* check ClientPublisherOptions(...kgo.Opt)
\ No newline at end of file
diff --git a/carrier.go b/carrier.go
new file mode 100644
index 0000000..cd77d9f
--- /dev/null
+++ b/carrier.go
@@ -0,0 +1,53 @@
+package kgo
+
+import (
+	"github.com/twmb/franz-go/pkg/kgo"
+)
+
+// RecordCarrier injects and extracts traces from a kgo.Record.
+//
+// This type exists to satisfy the otel/propagation.TextMapCarrier interface.
+type RecordCarrier struct {
+	record *kgo.Record
+}
+
+// NewRecordCarrier creates a new RecordCarrier.
+func NewRecordCarrier(record *kgo.Record) RecordCarrier {
+	return RecordCarrier{record: record}
+}
+
+// Get retrieves a single value for a given key if it exists.
+func (c RecordCarrier) Get(key string) string {
+	for _, h := range c.record.Headers {
+		if h.Key == key {
+			return string(h.Value)
+		}
+	}
+	return ""
+}
+
+// Set sets a header.
+func (c RecordCarrier) Set(key, val string) {
+	// Check if the key already exists.
+	for i, h := range c.record.Headers {
+		if h.Key == key {
+			// Key exists; update the value.
+			c.record.Headers[i].Value = []byte(val)
+			return
+		}
+	}
+	// Key does not exist; append a new header.
+	c.record.Headers = append(c.record.Headers, kgo.RecordHeader{
+		Key:   key,
+		Value: []byte(val),
+	})
+}
+
+// Keys returns a slice of all key identifiers in the carrier.
+func (c RecordCarrier) Keys() []string {
+	out := make([]string, len(c.record.Headers))
+	for i, h := range c.record.Headers {
+		out[i] = h.Key
+	}
+	return out
+}
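Since the propagator call in tracer.go is still commented out, here is a minimal sketch of how RecordCarrier is meant to plug into an OpenTelemetry propagator; the wiring is currently left to the caller, and the topic name below is illustrative:

```go
package main

import (
	"context"

	franz "github.com/twmb/franz-go/pkg/kgo"
	"go.opentelemetry.io/otel/propagation"
	kbroker "go.unistack.org/micro-broker-kgo/v3"
)

func main() {
	prop := propagation.TraceContext{} // W3C trace-context propagator

	rec := &franz.Record{Topic: "events"}
	ctx := context.Background() // assume this context carries an active span

	// Producer side: copy the span context into the record headers.
	prop.Inject(ctx, kbroker.NewRecordCarrier(rec))

	// Consumer side: recover the span context from the record headers.
	ctx = prop.Extract(context.Background(), kbroker.NewRecordCarrier(rec))
	_ = ctx
}
```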
diff --git a/go.mod b/go.mod
index ad174aa..e8b077f 100644
--- a/go.mod
+++ b/go.mod
@@ -1,19 +1,18 @@
 module go.unistack.org/micro-broker-kgo/v3
 
-go 1.17
+go 1.21
 
 require (
+	github.com/google/uuid v1.6.0
 	github.com/twmb/franz-go v1.12.1
 	github.com/twmb/franz-go/pkg/kmsg v1.4.0
+	go.opentelemetry.io/otel v1.23.1
 	go.unistack.org/micro/v3 v3.10.38
 )
 
 require (
 	github.com/golang/protobuf v1.5.3 // indirect
-	github.com/google/uuid v1.6.0 // indirect
-	github.com/imdario/mergo v0.3.16 // indirect
 	github.com/klauspost/compress v1.16.0 // indirect
-	github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
 	github.com/pierrec/lz4/v4 v4.1.17 // indirect
 	golang.org/x/sys v0.17.0 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 // indirect
diff --git a/go.sum b/go.sum
index 28676c3..16523ec 100644
--- a/go.sum
+++ b/go.sum
@@ -1,29 +1,35 @@
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
 github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
 github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
-github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
 github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
 github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
 github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
-github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
-github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
 github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
-github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/twmb/franz-go v1.12.1 h1:8lWT8q0spL40Nfw6eonJ8OoPGLvF9arvadRRmcSiu9Y=
 github.com/twmb/franz-go v1.12.1/go.mod h1:Ofc5tSSUJKLmpRNUYSejUsAZKYAHDHywTS322KWdChQ=
 github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s=
 github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
-go.unistack.org/micro/v3 v3.10.23 h1:4BE7NwwyJbCWOfzjzztamBxJSgRHHW1uQtMGNDLHG3s=
+go.opentelemetry.io/otel v1.23.1 h1:Za4UzOqJYS+MUczKI320AtqZHZb7EqxO00jAHE0jmQY=
+go.opentelemetry.io/otel v1.23.1/go.mod h1:Td0134eafDLcTS4y+zQ26GE8u3dEuRBiBCTUIRHaikA=
 go.unistack.org/micro/v3 v3.10.38 h1:g87FalXGCFcW6oUT8PtBIFHQYKAl8Ro69/dXIb3AoCo=
 go.unistack.org/micro/v3 v3.10.38/go.mod h1:eUgtvbtiiz6te93m0ZdmoecbitWwjdBmmr84srmEIKA=
 golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg=
+golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ=
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -31,6 +37,8 @@ golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
 golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
+golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 h1:hZB7eLIaYlW9qXRfCq/qDaPdbeY3757uARz5Vvfv+cY=
@@ -41,5 +49,5 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
 google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
-gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/kadmtest.go b/kadmtest.go
new file mode 100644
index 0000000..474ef68
--- /dev/null
+++ b/kadmtest.go
@@ -0,0 +1,65 @@
+//go:build ignore
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/twmb/franz-go/pkg/kadm"
+	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/pkg/kversion"
+
+	//"github.com/twmb/franz-go/pkg/sasl/scram"
+	"github.com/twmb/franz-go/pkg/sasl/plain"
+)
+
+func die(msg string, args ...any) {
+	fmt.Fprintf(os.Stderr, msg, args...)
+	os.Exit(1)
+}
+
+func main() {
+	seeds := []string{"vm-kafka-ump01tn.mbrd.ru:9092", "vm-kafka-ump02tn.mbrd.ru:9092", "vm-kafka-ump03tn.mbrd.ru:9092"}
+
+	pass := "XXXXX"
+	user := "XXXXX"
+
+	var adminClient *kadm.Client
+	{
+		client, err := kgo.NewClient(
+			kgo.SeedBrokers(seeds...),
+			// kgo.SASL((scram.Auth{User: user, Pass: pass}).AsSha512Mechanism()),
+			kgo.SASL((plain.Auth{User: user, Pass: pass}).AsMechanism()),
+
+			// Cap requests at version 2.4.0 to avoid breaking changes in the request
+			// structs: newer versions sometimes require additional fields to be set.
+			kgo.MaxVersions(kversion.V2_4_0()),
+		)
+		if err != nil {
+			panic(err)
+		}
+		defer client.Close()
+
+		adminClient = kadm.NewClient(client)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+
+	dg, err := adminClient.DescribeGroups(ctx, "interestrate_loader")
+	if err != nil {
+		die("failed to describe group: %v", err)
+	}
+
+	for _, m := range dg["interestrate_loader"].Members {
+		mc, _ := m.Assigned.AsConsumer()
+		for _, mt := range mc.Topics {
+			for _, p := range mt.Partitions {
+				fmt.Printf("client:%s\tpartitions: %d\n", m.ClientID, p)
+			}
+		}
+	}
+}
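Note: the `//go:build ignore` tag keeps kadmtest.go out of normal builds; it is a standalone debugging aid that can be run directly with `go run kadmtest.go` once real seed brokers and credentials are filled in.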
diff --git a/kgo.go b/kgo.go
index 081b976..97415b2 100644
--- a/kgo.go
+++ b/kgo.go
@@ -1,5 +1,5 @@
 // Package kgo provides a kafka broker using kgo
-package kgo // import "go.unistack.org/micro-broker-kgo/v3"
+package kgo
 
 import (
 	"context"
@@ -10,11 +10,12 @@ import (
 	"sync"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/pkg/kmsg"
 	"go.unistack.org/micro/v3/broker"
 	"go.unistack.org/micro/v3/metadata"
-	id "go.unistack.org/micro/v3/util/id"
+	"go.unistack.org/micro/v3/tracer"
 	mrand "go.unistack.org/micro/v3/util/rand"
 )
@@ -56,7 +57,6 @@ type Broker struct {
 	c         *kgo.Client
 	kopts     []kgo.Opt
 	connected bool
-	init      bool
 	sync.RWMutex
 	opts broker.Options
 	subs []*subscriber
@@ -73,9 +73,31 @@ func (k *Broker) Name() string {
 func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, error) {
 	var c *kgo.Client
 	var err error
+	var span tracer.Span
+	ctx, span = k.opts.Tracer.Start(ctx, "Connect")
+	defer span.Finish()
+
+	clientID := "kgo"
+	group := ""
+	if k.opts.Context != nil {
+		if id, ok := k.opts.Context.Value(clientIDKey{}).(string); ok {
+			clientID = id
+		}
+		if id, ok := k.opts.Context.Value(groupKey{}).(string); ok {
+			group = id
+		}
+	}
+
+	opts = append(opts,
+		kgo.WithHooks(&hookMeter{meter: k.opts.Meter}),
+		kgo.WithHooks(&hookTracer{group: group, clientID: clientID, tracer: k.opts.Tracer}),
+	)
 
 	select {
 	case <-ctx.Done():
+		if ctx.Err() != nil {
+			span.SetStatus(tracer.SpanStatusError, ctx.Err().Error())
+		}
 		return nil, ctx.Err()
 	default:
 		c, err = kgo.NewClient(opts...)
@@ -83,6 +105,7 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err
 		err = c.Ping(ctx) // check connectivity to cluster
 	}
 	if err != nil {
+		span.SetStatus(tracer.SpanStatusError, err.Error())
 		return nil, err
 	}
 }
@@ -127,6 +150,9 @@ func (k *Broker) Disconnect(ctx context.Context) error {
 	if ctx != nil {
 		nctx = ctx
 	}
+	var span tracer.Span
+	nctx, span = k.opts.Tracer.Start(nctx, "Disconnect")
+	defer span.Finish()
 
 	k.Lock()
 	defer k.Unlock()
@@ -196,12 +222,12 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
 }
 
 func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
-	k.RLock()
-	ok := k.connected
-	k.RUnlock()
+	var span tracer.Span
+	ctx, span = k.opts.Tracer.Start(ctx, "Publish")
+	defer span.Finish()
 
-	if !ok {
-		k.Lock()
+	k.Lock()
+	if !k.connected {
 		c, err := k.connect(ctx, k.kopts...)
 		if err != nil {
 			k.Unlock()
@@ -209,24 +235,30 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
 		}
 		k.c = c
 		k.connected = true
-		k.Unlock()
 	}
+	k.Unlock()
 
 	options := broker.NewPublishOptions(opts...)
 	records := make([]*kgo.Record, 0, len(msgs))
 	var errs []string
 	var err error
 	var key []byte
+	var promise func(*kgo.Record, error)
 
 	if options.Context != nil {
 		if k, ok := options.Context.Value(publishKey{}).([]byte); ok && k != nil {
 			key = k
 		}
+		if p, ok := options.Context.Value(publishPromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
+			promise = p
+		}
 	}
 
 	for _, msg := range msgs {
 		rec := &kgo.Record{Context: ctx, Key: key}
 		rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
+		msg.Header.Del(metadata.HeaderTopic)
+		// k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Inc()
 		if options.BodyOnly || k.opts.Codec.String() == "noop" {
 			rec.Value = msg.Body
 			for k, v := range msg.Header {
@@ -241,10 +273,36 @@
 		records = append(records, rec)
 	}
 
+	if promise != nil {
+		// ts := time.Now()
+		for _, rec := range records {
+			k.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
+				// te := time.Since(ts)
+				// k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Dec()
+				// k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", r.Topic).Update(te.Seconds())
+				// k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", r.Topic).Update(te.Seconds())
+				if err != nil {
+					// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "failure").Inc()
+				} else {
+					// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "success").Inc()
+				}
+				promise(r, err)
+			})
+		}
+		return nil
+	}
+
+	// ts := time.Now()
 	results := k.c.ProduceSync(ctx, records...)
+	// te := time.Since(ts)
 	for _, result := range results {
+		// k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic).Update(te.Seconds())
+		// k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", result.Record.Topic).Update(te.Seconds())
+		// k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", result.Record.Topic).Dec()
 		if result.Err != nil {
+			// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "failure").Inc()
			errs = append(errs, result.Err.Error())
+		} else {
+			// k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "success").Inc()
 		}
 	}
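A sketch of how the new asynchronous path might be used from application code — assuming the stock micro v3 broker API; broker address and topic name are illustrative:

```go
package main

import (
	"context"
	"log"

	franz "github.com/twmb/franz-go/pkg/kgo"
	kbroker "go.unistack.org/micro-broker-kgo/v3"
	"go.unistack.org/micro/v3/broker"
	"go.unistack.org/micro/v3/metadata"
)

func main() {
	ctx := context.Background()

	b := kbroker.NewBroker(broker.Addrs("127.0.0.1:9092"))
	if err := b.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer b.Disconnect(ctx)

	msg := &broker.Message{Header: metadata.New(0), Body: []byte(`{"hello":"world"}`)}

	// With PublishPromise set, publish() hands each record to kgo.Produce and
	// returns immediately; the promise fires once per record on completion.
	err := b.Publish(ctx, "events", msg, kbroker.PublishPromise(func(r *franz.Record, err error) {
		if err != nil {
			log.Printf("async produce to %s failed: %v", r.Topic, err)
		}
	}))
	if err != nil {
		log.Fatal(err)
	}
}
```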
@@ -263,11 +321,11 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
 	options := broker.NewSubscribeOptions(opts...)
 
 	if options.Group == "" {
-		uid, err := id.New()
+		uid, err := uuid.NewRandom()
 		if err != nil {
 			return nil, err
 		}
-		options.Group = uid
+		options.Group = uid.String()
 	}
 
 	commitInterval := DefaultCommitInterval
@@ -335,7 +393,6 @@ func (k *Broker) String() string {
 }
 
 func NewBroker(opts ...broker.Option) *Broker {
-	rand.Seed(time.Now().Unix())
 	options := broker.NewOptions(opts...)
 
 	kaddrs := options.Addrs
@@ -349,8 +406,6 @@ func NewBroker(opts ...broker.Option) *Broker {
 		kgo.DisableIdempotentWrite(),
 		kgo.ProducerBatchCompression(kgo.NoCompression()),
 		kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}),
-		// kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, func() string { return time.Now().Format(time.StampMilli) })),
-		kgo.WithHooks(&metrics{meter: options.Meter}),
 		kgo.SeedBrokers(kaddrs...),
 		kgo.RetryBackoffFn(DefaultRetryBackoffFn),
 		kgo.BlockRebalanceOnPoll(),
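The new ClientID and Group options land in the broker's context and are read back in connect(), where they are attached to hookTracer. A sketch, assuming the usual micro v3 Init/Connect lifecycle (address illustrative):

```go
package main

import (
	"context"
	"log"

	kbroker "go.unistack.org/micro-broker-kgo/v3"
	"go.unistack.org/micro/v3/broker"
)

func main() {
	b := kbroker.NewBroker(
		broker.Addrs("127.0.0.1:9092"),
		kbroker.ClientID("my-service"), // reported on producer/consumer spans
		kbroker.Group("my-group"),      // reported on consumer spans
	)
	if err := b.Init(); err != nil {
		log.Fatal(err)
	}
	if err := b.Connect(context.Background()); err != nil {
		log.Fatal(err)
	}
	defer b.Disconnect(context.Background())
}
```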
diff --git a/logger.go b/logger.go
index 73f8796..5bd5a27 100644
--- a/logger.go
+++ b/logger.go
@@ -30,11 +30,11 @@ func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) {
 		return
 	}
 	if len(args) > 0 {
-		fields := make([]interface{}, 0, len(args))
+		fields := make(map[string]interface{}, len(args)/2)
 		for i := 0; i <= len(args)/2; i += 2 {
-			fields = append(fields, fmt.Sprintf("%v", args[i]), args[i+1])
+			fields[fmt.Sprintf("%v", args[i])] = args[i+1]
 		}
-		l.l.Fields(fields...).Log(l.ctx, mlvl, msg)
+		l.l.Fields(fields).Log(l.ctx, mlvl, msg)
 	} else {
 		l.l.Log(l.ctx, mlvl, msg)
 	}
diff --git a/metrics.go b/meter.go
similarity index 72%
rename from metrics.go
rename to meter.go
index bab138b..4b3871d 100644
--- a/metrics.go
+++ b/meter.go
@@ -9,19 +9,19 @@ import (
 	"go.unistack.org/micro/v3/meter"
 )
 
-type metrics struct {
+type hookMeter struct {
 	meter meter.Meter
 }
 
 var (
-	_ kgo.HookBrokerConnect       = &metrics{}
-	_ kgo.HookBrokerDisconnect    = &metrics{}
-	_ kgo.HookBrokerRead          = &metrics{}
-	_ kgo.HookBrokerThrottle      = &metrics{}
-	_ kgo.HookBrokerWrite         = &metrics{}
-	_ kgo.HookFetchBatchRead      = &metrics{}
-	_ kgo.HookProduceBatchWritten = &metrics{}
-	_ kgo.HookGroupManageError    = &metrics{}
+	_ kgo.HookBrokerConnect       = &hookMeter{}
+	_ kgo.HookBrokerDisconnect    = &hookMeter{}
+	_ kgo.HookBrokerRead          = &hookMeter{}
+	_ kgo.HookBrokerThrottle      = &hookMeter{}
+	_ kgo.HookBrokerWrite         = &hookMeter{}
+	_ kgo.HookFetchBatchRead      = &hookMeter{}
+	_ kgo.HookProduceBatchWritten = &hookMeter{}
+	_ kgo.HookGroupManageError    = &hookMeter{}
 )
 
 const (
@@ -54,11 +54,11 @@ const (
 	labelTopic = "topic"
 )
 
-func (m *metrics) OnGroupManageError(err error) {
+func (m *hookMeter) OnGroupManageError(err error) {
 	m.meter.Counter(metricBrokerGroupErrors).Inc()
 }
 
-func (m *metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
+func (m *hookMeter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
 	node := strconv.Itoa(int(meta.NodeID))
 	if err != nil {
 		m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFaulure).Inc()
@@ -67,12 +67,12 @@ func (m *metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ ne
 	m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc()
 }
 
-func (m *metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
+func (m *hookMeter) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
 	node := strconv.Itoa(int(meta.NodeID))
 	m.meter.Counter(metricBrokerDisconnects, labelNode, node).Inc()
 }
 
-func (m *metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
+func (m *hookMeter) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
 	node := strconv.Itoa(int(meta.NodeID))
 	if err != nil {
 		m.meter.Counter(metricBrokerWriteErrors, labelNode, node).Inc()
@@ -83,7 +83,7 @@ func (m *metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten i
 	m.meter.Histogram(metricBrokerWriteLatencies, labelNode, node).Update(timeToWrite.Seconds())
 }
 
-func (m *metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
+func (m *hookMeter) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
 	node := strconv.Itoa(int(meta.NodeID))
 	if err != nil {
 		m.meter.Counter(metricBrokerReadErrors, labelNode, node).Inc()
@@ -95,18 +95,18 @@ func (m *metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int,
 	m.meter.Histogram(metricBrokerReadLatencies, labelNode, node).Update(timeToRead.Seconds())
 }
 
-func (m *metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
+func (m *hookMeter) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
 	node := strconv.Itoa(int(meta.NodeID))
 	m.meter.Histogram(metricBrokerThrottleLatencies, labelNode, node).Update(throttleInterval.Seconds())
 }
 
-func (m *metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) {
+func (m *hookMeter) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) {
 	node := strconv.Itoa(int(meta.NodeID))
 	m.meter.Counter(metricBrokerProduceBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes)
 	m.meter.Counter(metricBrokerProduceBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes)
 }
 
-func (m *metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) {
+func (m *hookMeter) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) {
 	node := strconv.Itoa(int(meta.NodeID))
 	m.meter.Counter(metricBrokerFetchBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes)
 	m.meter.Counter(metricBrokerFetchBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes)
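Because hookMeter is unexported, it can only be wired up manually from inside the package; connect() now does this for users automatically. A package-internal sketch (assuming the micro meter package exposes a default instance; no broker contact happens until the first request):

```go
package kgo

import (
	"testing"

	"github.com/twmb/franz-go/pkg/kgo"
	"go.unistack.org/micro/v3/meter"
)

// A sketch, not a real test: attach the renamed hookMeter to a bare client.
func TestHookMeterWiring(t *testing.T) {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("127.0.0.1:9092"), // illustrative address
		kgo.WithHooks(&hookMeter{meter: meter.DefaultMeter}),
	)
	if err != nil {
		t.Fatal(err)
	}
	client.Close()
}
```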
diff --git a/options.go b/options.go
index 4664ee6..a69011b 100644
--- a/options.go
+++ b/options.go
@@ -63,6 +63,18 @@ func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption {
 	}
 }
 
+type clientIDKey struct{}
+
+func ClientID(id string) broker.Option {
+	return broker.SetOption(clientIDKey{}, id)
+}
+
+type groupKey struct{}
+
+func Group(id string) broker.Option {
+	return broker.SetOption(groupKey{}, id)
+}
+
 type commitIntervalKey struct{}
 
 // CommitInterval specifies interval to send commits
@@ -78,3 +90,15 @@ type subscribeMaxInflightKey struct{}
 func SubscribeMaxInFlight(n int) broker.SubscribeOption {
 	return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n)
 }
+
+type publishPromiseKey struct{}
+
+// PublishPromise sets the kafka promise func for Produce
+func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption {
+	return broker.SetPublishOption(publishPromiseKey{}, fn)
+}
+
+// ClientPublishPromise sets the kafka promise func for Produce (client option)
+func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption {
+	return client.SetPublishOption(publishPromiseKey{}, fn)
+}
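For context, a sketch of consuming with the subscribe-side option defined here — topic, group, and address are illustrative, and SubscribeGroup is assumed from the stock micro v3 broker package:

```go
package main

import (
	"context"
	"log"

	kbroker "go.unistack.org/micro-broker-kgo/v3"
	"go.unistack.org/micro/v3/broker"
)

func main() {
	ctx := context.Background()

	b := kbroker.NewBroker(broker.Addrs("127.0.0.1:9092"))
	if err := b.Connect(ctx); err != nil {
		log.Fatal(err)
	}

	sub, err := b.Subscribe(ctx, "events",
		func(evt broker.Event) error {
			log.Printf("message on %s", evt.Topic())
			return evt.Ack()
		},
		broker.SubscribeGroup("my-group"),
		// Bound how many records the subscriber keeps in flight.
		kbroker.SubscribeMaxInFlight(100),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe(ctx)
}
```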
diff --git a/subscriber.go b/subscriber.go
index 1cf97fa..dc85d92 100644
--- a/subscriber.go
+++ b/subscriber.go
@@ -64,6 +64,7 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
 		s.killConsumers(ctx, kc)
 		close(s.done)
 		s.closed = true
+		s.c.ResumeFetchTopics(s.topic)
 	}
 	return nil
 }
@@ -141,7 +142,7 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str
 		quit:      make(chan struct{}),
 		done:      make(chan struct{}),
-		recs:      make(chan kgo.FetchTopicPartition, 4),
+		recs:      make(chan kgo.FetchTopicPartition, 100),
 		handler:   s.handler,
 		kopts:     s.kopts,
 		opts:      s.opts,
@@ -168,46 +169,63 @@ func (pc *consumer) consume() {
 			return
 		case p := <-pc.recs:
 			for _, record := range p.Records {
+				// ts := time.Now()
+				// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Inc()
 				p := eventPool.Get().(*event)
 				p.msg.Header = nil
 				p.msg.Body = nil
 				p.topic = record.Topic
 				p.err = nil
 				p.ack = false
+				p.msg.Header = metadata.New(len(record.Headers))
+				for _, hdr := range record.Headers {
+					p.msg.Header.Set(hdr.Key, string(hdr.Value))
+				}
 				if pc.kopts.Codec.String() == "noop" {
-					p.msg.Header = metadata.New(len(record.Headers))
-					for _, hdr := range record.Headers {
-						p.msg.Header.Set(hdr.Key, string(hdr.Value))
-					}
 					p.msg.Body = record.Value
 				} else if pc.opts.BodyOnly {
 					p.msg.Body = record.Value
 				} else {
 					if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
+						// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
 						p.err = err
 						p.msg.Body = record.Value
 						if eh != nil {
 							_ = eh(p)
+							// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
 							if p.ack {
 								pc.c.MarkCommitRecords(record)
 							} else {
 								eventPool.Put(p)
-								pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
+								pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
 								return
 							}
 							eventPool.Put(p)
+							// te := time.Since(ts)
+							// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
+							// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
 							continue
 						} else {
 							if pc.kopts.Logger.V(logger.ErrorLevel) {
 								pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
 							}
 						}
+						// te := time.Since(ts)
+						// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
+						// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
+						// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
 						eventPool.Put(p)
-						pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
+						pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
 						return
 					}
 				}
 				err := pc.handler(p)
+				if err == nil {
+					// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc()
+				} else {
+					// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
+				}
+				// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
 				if err == nil && pc.opts.AutoAck {
 					p.ack = true
 				} else if err != nil {
@@ -220,6 +238,9 @@
 					}
 				}
 			}
+			// te := time.Since(ts)
+			// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
+			// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
 			if p.ack {
 				eventPool.Put(p)
 				pc.c.MarkCommitRecords(record)
"time" + "unicode/utf8" + + "github.com/twmb/franz-go/pkg/kgo" + "go.unistack.org/micro/v3/tracer" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.18.0" +) + +type hookTracer struct { + clientID string + group string + tracer tracer.Tracer +} + +var ( + _ kgo.HookBrokerConnect = &hookTracer{} + _ kgo.HookBrokerDisconnect = &hookTracer{} + _ kgo.HookBrokerRead = &hookTracer{} + _ kgo.HookBrokerThrottle = &hookTracer{} + _ kgo.HookBrokerWrite = &hookTracer{} + _ kgo.HookFetchBatchRead = &hookTracer{} + _ kgo.HookProduceBatchWritten = &hookTracer{} + _ kgo.HookGroupManageError = &hookTracer{} +) + +func (m *hookTracer) OnGroupManageError(err error) { +} + +func (m *hookTracer) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { +} + +func (m *hookTracer) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { +} + +func (m *hookTracer) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) { +} + +func (m *hookTracer) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) { +} + +func (m *hookTracer) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) { +} + +func (m *hookTracer) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) { +} + +func (m *hookTracer) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) { +} + +// OnProduceRecordBuffered starts a new span for the "publish" operation on a +// buffered record. +// +// It sets span options and injects the span context into record and updates +// the record's context, so it can be ended in the OnProduceRecordUnbuffered +// hook. +func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) { + // Set up span options. + attrs := []attribute.KeyValue{ + semconv.MessagingSystemKey.String("kafka"), + semconv.MessagingDestinationKindTopic, + semconv.MessagingDestinationName(r.Topic), + semconv.MessagingOperationPublish, + } + attrs = maybeKeyAttr(attrs, r) + if m.clientID != "" { + attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID)) + } + ifattrs := make([]interface{}, 0, len(attrs)) + for _, attr := range attrs { + ifattrs = append(ifattrs, attr) + } + opts := []tracer.SpanOption{ + tracer.WithSpanLabels(ifattrs...), + tracer.WithSpanKind(tracer.SpanKindProducer), + } + // Start the "publish" span. + ctx, _ := m.tracer.Start(r.Context, r.Topic+" publish", opts...) + // Inject the span context into the record. + // t.propagators.Inject(ctx, NewRecordCarrier(r)) + // Update the record context. + r.Context = ctx +} + +// OnProduceRecordUnbuffered continues and ends the "publish" span for an +// unbuffered record. +// +// It sets attributes with values unset when producing and records any error +// that occurred during the publish operation. +func (m *hookTracer) OnProduceRecordUnbuffered(r *kgo.Record, err error) { + span, ok := tracer.SpanFromContext(r.Context) + if !ok { + return + } + defer span.Finish() + span.AddLabels( + semconv.MessagingKafkaDestinationPartition(int(r.Partition)), + ) + if err != nil { + span.SetStatus(tracer.SpanStatusError, err.Error()) + } +} + +// OnFetchRecordBuffered starts a new span for the "receive" operation on a +// buffered record. 
+
+// OnFetchRecordBuffered starts a new span for the "receive" operation on a
+// buffered record.
+//
+// It sets the span options, extracts the span context from the record, and
+// updates the record's context so the span can be ended in the
+// OnFetchRecordUnbuffered hook and used in downstream consumer processing.
+func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
+	// Set up the span options.
+	attrs := []attribute.KeyValue{
+		semconv.MessagingSystemKey.String("kafka"),
+		semconv.MessagingSourceKindTopic,
+		semconv.MessagingSourceName(r.Topic),
+		semconv.MessagingOperationReceive,
+		semconv.MessagingKafkaSourcePartition(int(r.Partition)),
+	}
+	attrs = maybeKeyAttr(attrs, r)
+	if m.clientID != "" {
+		attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
+	}
+	if m.group != "" {
+		attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
+	}
+	ifattrs := make([]interface{}, 0, len(attrs))
+	for _, attr := range attrs {
+		ifattrs = append(ifattrs, attr)
+	}
+	opts := []tracer.SpanOption{
+		tracer.WithSpanLabels(ifattrs...),
+		tracer.WithSpanKind(tracer.SpanKindConsumer),
+	}
+
+	if r.Context == nil {
+		r.Context = context.Background()
+	}
+	// Extract the span context from the record.
+	// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r))
+	// Start the "receive" span.
+	newCtx, _ := m.tracer.Start(r.Context, r.Topic+" receive", opts...)
+	// Update the record context.
+	r.Context = newCtx
+}
+
+// OnFetchRecordUnbuffered continues and ends the "receive" span for an
+// unbuffered record.
+func (m *hookTracer) OnFetchRecordUnbuffered(r *kgo.Record, _ bool) {
+	if span, ok := tracer.SpanFromContext(r.Context); ok {
+		defer span.Finish()
+	}
+}
+
+// WithProcessSpan starts a new span for the "process" operation on a consumer
+// record.
+//
+// It sets up the span options. The user's application code is responsible for
+// ending the span.
+//
+// This should only ever be called within a polling loop of a consumed record and
+// not a record which has been created for producing, so call this at the start of each
+// iteration of your processing for the record.
+func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Span) {
+	// Set up the span options.
+	attrs := []attribute.KeyValue{
+		semconv.MessagingSystemKey.String("kafka"),
+		semconv.MessagingSourceKindTopic,
+		semconv.MessagingSourceName(r.Topic),
+		semconv.MessagingOperationProcess,
+		semconv.MessagingKafkaSourcePartition(int(r.Partition)),
+		semconv.MessagingKafkaMessageOffset(int(r.Offset)),
+	}
+	attrs = maybeKeyAttr(attrs, r)
+	if m.clientID != "" {
+		attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(m.clientID))
+	}
+	if m.group != "" {
+		attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(m.group))
+	}
+	ifattrs := make([]interface{}, 0, len(attrs))
+	for _, attr := range attrs {
+		ifattrs = append(ifattrs, attr)
+	}
+	opts := []tracer.SpanOption{
+		tracer.WithSpanLabels(ifattrs...),
+		tracer.WithSpanKind(tracer.SpanKindConsumer),
+	}
+
+	if r.Context == nil {
+		r.Context = context.Background()
+	}
+	// Start a new span using the provided context and options.
+	return m.tracer.Start(r.Context, r.Topic+" process", opts...)
+}
+
+func maybeKeyAttr(attrs []attribute.KeyValue, r *kgo.Record) []attribute.KeyValue {
+	if r.Key == nil || !utf8.Valid(r.Key) {
+		return attrs
+	}
+	return append(attrs, semconv.MessagingKafkaMessageKeyKey.String(string(r.Key)))
+}
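WithProcessSpan is only reachable from inside the package for now, since hookTracer is unexported. A sketch of the intended call pattern in a poll loop — the function and parameter names below are illustrative, and the handler is responsible for ending the span:

```go
package kgo

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
	"go.unistack.org/micro/v3/tracer"
)

// pollLoop is illustrative: wrap each record's processing in a "process" span.
func pollLoop(ctx context.Context, cl *kgo.Client, ht *hookTracer, process func(*kgo.Record) error) {
	for {
		fetches := cl.PollFetches(ctx)
		if fetches.IsClientClosed() {
			return
		}
		fetches.EachRecord(func(r *kgo.Record) {
			// Start the "process" span; the caller must end it.
			_, span := ht.WithProcessSpan(r)
			if err := process(r); err != nil {
				span.SetStatus(tracer.SpanStatusError, err.Error())
			}
			span.Finish()
		})
	}
}
```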