Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
4b0b70e7d0 | |||
|
da88718e03 | ||
aed304b7e3 | |||
b796c43257 | |||
b0691edf82 | |||
d43b59f2f3 | |||
f976516d6b | |||
8f5c03ad60 | |||
|
9e9c1434ff |
@@ -1,2 +1,2 @@
|
|||||||
# micro-broker-kgo
|
# micro-broker-kgo
|
||||||

|

|
||||||
|
12
go.sum
12
go.sum
@@ -20,16 +20,10 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
|
|||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||||
github.com/spf13/cast v1.8.0 h1:gEN9K4b8Xws4EX0+a0reLmhq8moKn7ntRlQYgjPeCDk=
|
|
||||||
github.com/spf13/cast v1.8.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
|
|
||||||
github.com/spf13/cast v1.9.2 h1:SsGfm7M8QOFtEzumm7UZrZdLLquNdzFYfIbEXntcFbE=
|
github.com/spf13/cast v1.9.2 h1:SsGfm7M8QOFtEzumm7UZrZdLLquNdzFYfIbEXntcFbE=
|
||||||
github.com/spf13/cast v1.9.2/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
|
github.com/spf13/cast v1.9.2/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
|
||||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
github.com/twmb/franz-go v1.19.1 h1:cOhDFUkGvUFHSQ7UYW6bO77BJa2fYEk5mA2AX+1NIdE=
|
|
||||||
github.com/twmb/franz-go v1.19.1/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
|
|
||||||
github.com/twmb/franz-go v1.19.4 h1:0ktflzm5YU7+YYdie8RQWFcU9uDJ03xLefplO1iMwO4=
|
|
||||||
github.com/twmb/franz-go v1.19.4/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
|
|
||||||
github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y=
|
github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y=
|
||||||
github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
|
github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM=
|
||||||
github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE=
|
github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE=
|
||||||
@@ -38,18 +32,12 @@ github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3 h1:p24opKW
|
|||||||
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3/go.mod h1:7uQs3Ae6HkWT1Y9elMbqtAcNFCI0y6+iS+Phw49L49U=
|
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250508175730-72e1646135e3/go.mod h1:7uQs3Ae6HkWT1Y9elMbqtAcNFCI0y6+iS+Phw49L49U=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg=
|
github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg=
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE=
|
github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE=
|
||||||
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
|
|
||||||
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
|
|
||||||
go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
|
go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
|
||||||
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
|
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
|
||||||
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
|
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
|
||||||
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
|
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
|
||||||
go.unistack.org/micro/v4 v4.1.14 h1:6EotPq9kz/gaFb5YulHdKuuUwmj/7Hk44DpOlzh/A6k=
|
|
||||||
go.unistack.org/micro/v4 v4.1.14/go.mod h1:xleO2M5Yxh4s6I+RUcLrEpUjobefh+71ctrdIfn7TUs=
|
|
||||||
go.unistack.org/micro/v4 v4.1.17 h1:26QDtRSYVpozYuassyvLP4sEQRo3dxgD3sVILRXmIPo=
|
go.unistack.org/micro/v4 v4.1.17 h1:26QDtRSYVpozYuassyvLP4sEQRo3dxgD3sVILRXmIPo=
|
||||||
go.unistack.org/micro/v4 v4.1.17/go.mod h1:xleO2M5Yxh4s6I+RUcLrEpUjobefh+71ctrdIfn7TUs=
|
go.unistack.org/micro/v4 v4.1.17/go.mod h1:xleO2M5Yxh4s6I+RUcLrEpUjobefh+71ctrdIfn7TUs=
|
||||||
golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8=
|
|
||||||
golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw=
|
|
||||||
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
|
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
|
||||||
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
|
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
|
||||||
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
||||||
|
94
kgo.go
94
kgo.go
@@ -156,7 +156,7 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
|||||||
options.ContentType = b.opts.ContentType
|
options.ContentType = b.opts.ContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
m := &kgoMessage{ctx: ctx, hdr: hdr, opts: options}
|
m := &kgoMessage{ctx: ctx, hdr: hdr.Copy(), opts: options}
|
||||||
c, err := b.newCodec(m.opts.ContentType)
|
c, err := b.newCodec(m.opts.ContentType)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
m.body, err = c.Marshal(body)
|
m.body, err = c.Marshal(body)
|
||||||
@@ -165,6 +165,8 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.hdr.Set(metadata.HeaderContentType, m.opts.ContentType)
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -344,40 +346,44 @@ func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
||||||
records := make([]*kgo.Record, 0, len(messages))
|
var records []*kgo.Record
|
||||||
var errs []string
|
|
||||||
var key []byte
|
|
||||||
var promise func(*kgo.Record, error)
|
|
||||||
|
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
if mctx := msg.Context(); mctx != nil {
|
|
||||||
if k, ok := mctx.Value(messageKey{}).([]byte); ok && k != nil {
|
|
||||||
key = k
|
|
||||||
}
|
|
||||||
if p, ok := mctx.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
|
||||||
promise = p
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rec := &kgo.Record{
|
rec := &kgo.Record{
|
||||||
Context: ctx,
|
Context: msg.Context(),
|
||||||
Key: key,
|
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
Value: msg.Body(),
|
Value: msg.Body(),
|
||||||
}
|
}
|
||||||
|
|
||||||
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
|
var promise func(*kgo.Record, error)
|
||||||
|
if rec.Context != nil {
|
||||||
|
if k, ok := rec.Context.Value(messageKey{}).([]byte); ok && k != nil {
|
||||||
|
rec.Key = k
|
||||||
|
}
|
||||||
|
if p, ok := rec.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
||||||
|
promise = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
kmsg, ok := msg.(*kgoMessage)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if kmsg.opts.Context != nil {
|
||||||
|
if k, ok := kmsg.opts.Context.Value(messageKey{}).([]byte); ok && k != nil {
|
||||||
|
rec.Key = k
|
||||||
|
}
|
||||||
|
if p, ok := kmsg.opts.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
||||||
|
promise = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
setHeaders(rec, msg.Header())
|
setHeaders(rec, msg.Header())
|
||||||
|
|
||||||
records = append(records, rec)
|
if promise != nil {
|
||||||
}
|
ts := time.Now()
|
||||||
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
|
||||||
ts := time.Now()
|
|
||||||
|
|
||||||
if promise != nil {
|
|
||||||
|
|
||||||
for _, rec := range records {
|
|
||||||
b.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
|
b.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
|
||||||
@@ -390,27 +396,33 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
|||||||
}
|
}
|
||||||
promise(r, err)
|
promise(r, err)
|
||||||
})
|
})
|
||||||
}
|
continue
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
results := b.c.ProduceSync(ctx, records...)
|
|
||||||
|
|
||||||
te := time.Since(ts)
|
|
||||||
for _, result := range results {
|
|
||||||
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
|
||||||
b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
|
||||||
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec()
|
|
||||||
if result.Err != nil {
|
|
||||||
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc()
|
|
||||||
errs = append(errs, result.Err.Error())
|
|
||||||
} else {
|
} else {
|
||||||
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc()
|
records = append(records, rec)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(errs) > 0 {
|
if len(records) > 0 {
|
||||||
return fmt.Errorf("publish error: %s", strings.Join(errs, "\n"))
|
var errs []string
|
||||||
|
ts := time.Now()
|
||||||
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Set(uint64(len(records)))
|
||||||
|
results := b.c.ProduceSync(ctx, records...)
|
||||||
|
te := time.Since(ts)
|
||||||
|
for _, result := range results {
|
||||||
|
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
||||||
|
b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
||||||
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec()
|
||||||
|
if result.Err != nil {
|
||||||
|
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc()
|
||||||
|
errs = append(errs, result.Err.Error())
|
||||||
|
} else {
|
||||||
|
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errs) > 0 {
|
||||||
|
return fmt.Errorf("publish error: %s", strings.Join(errs, "\n"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@@ -81,7 +81,7 @@ func TestFail(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
// t.Logf("broker publish")
|
// t.Logf("broker publish")
|
||||||
if err := b.Publish(ctx, "test", msg); err != nil {
|
if err := b.Publish(ctx, "test.fail", msg); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -96,7 +96,7 @@ func TestFail(t *testing.T) {
|
|||||||
return msg.Ack()
|
return msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, err := b.Subscribe(ctx, "test", fn,
|
sub, err := b.Subscribe(ctx, "test.fail", fn,
|
||||||
broker.SubscribeAutoAck(true),
|
broker.SubscribeAutoAck(true),
|
||||||
broker.SubscribeGroup(group),
|
broker.SubscribeGroup(group),
|
||||||
broker.SubscribeBodyOnly(true))
|
broker.SubscribeBodyOnly(true))
|
||||||
@@ -184,7 +184,7 @@ func TestPubSub(t *testing.T) {
|
|||||||
msgs = append(msgs, m)
|
msgs = append(msgs, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.Publish(ctx, "test", msgs...); err != nil {
|
if err := b.Publish(ctx, "test.pubsub", msgs...); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
// t.Skip()
|
// t.Skip()
|
||||||
@@ -197,7 +197,7 @@ func TestPubSub(t *testing.T) {
|
|||||||
return msg.Ack()
|
return msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, err := b.Subscribe(ctx, "test", fn,
|
sub, err := b.Subscribe(ctx, "test.pubsub", fn,
|
||||||
broker.SubscribeAutoAck(true),
|
broker.SubscribeAutoAck(true),
|
||||||
broker.SubscribeGroup(group),
|
broker.SubscribeGroup(group),
|
||||||
broker.SubscribeBodyOnly(true))
|
broker.SubscribeBodyOnly(true))
|
||||||
|
@@ -39,16 +39,15 @@ type consumer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Subscriber struct {
|
type Subscriber struct {
|
||||||
consumers map[tp]*consumer
|
consumers map[tp]*consumer
|
||||||
c *kgo.Client
|
c *kgo.Client
|
||||||
htracer *hookTracer
|
htracer *hookTracer
|
||||||
topic string
|
topic string
|
||||||
messagePool bool
|
messagePool bool
|
||||||
handler interface{}
|
handler interface{}
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
kopts broker.Options
|
kopts broker.Options
|
||||||
opts broker.SubscribeOptions
|
opts broker.SubscribeOptions
|
||||||
|
|
||||||
connected *atomic.Uint32
|
connected *atomic.Uint32
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
closed bool
|
closed bool
|
||||||
|
Reference in New Issue
Block a user