From ac454693a613646b4056a18af3a01795b7f5ac8c Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 12 Mar 2025 16:57:25 +0300 Subject: [PATCH] update to v4 Signed-off-by: Vasiliy Tolstov --- broker.go | 2 +- carrier.go | 6 +- event.go | 50 ----------- go.mod | 15 ++-- go.sum | 51 +++-------- kgo.go | 245 ++++++++++++++++++++++++++++++++++---------------- kgo_test.go | 31 +++---- logger.go | 2 +- meter.go | 2 +- options.go | 13 +-- subscriber.go | 119 +++++++----------------- tracer.go | 4 +- 12 files changed, 240 insertions(+), 300 deletions(-) delete mode 100644 event.go diff --git a/broker.go b/broker.go index fbd6772..868d6d0 100644 --- a/broker.go +++ b/broker.go @@ -7,7 +7,7 @@ import ( "time" "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v4/logger" ) type hookEvent struct { diff --git a/carrier.go b/carrier.go index 2d3c33c..cbd0502 100644 --- a/carrier.go +++ b/carrier.go @@ -6,7 +6,7 @@ import ( "strings" "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v4/metadata" ) // RecordCarrier injects and extracts traces from a kgo.Record. @@ -77,7 +77,7 @@ loop: for i := 0; i < len(r.Headers); i++ { if strings.EqualFold(r.Headers[i].Key, k) { // Key exist, update the value. - r.Headers[i].Value = []byte(v) + r.Headers[i].Value = []byte(strings.Join(v, ",")) continue loop } } @@ -85,7 +85,7 @@ loop: // Key does not exist, append new header. r.Headers = append(r.Headers, kgo.RecordHeader{ Key: k, - Value: []byte(v), + Value: []byte(strings.Join(v, ",")), }) seen[k] = struct{}{} diff --git a/event.go b/event.go deleted file mode 100644 index 9ac6ba9..0000000 --- a/event.go +++ /dev/null @@ -1,50 +0,0 @@ -package kgo - -import ( - "context" - "sync" - - "go.unistack.org/micro/v3/broker" -) - -type event struct { - msg *broker.Message - err error - ctx context.Context - - topic string - - sync.RWMutex - ack bool -} - -func (p *event) Context() context.Context { - return p.ctx -} - -func (p *event) Topic() string { - return p.topic -} - -func (p *event) Message() *broker.Message { - return p.msg -} - -func (p *event) Ack() error { - p.ack = true - return nil -} - -func (p *event) Error() error { - return p.err -} - -func (p *event) SetError(err error) { - p.err = err -} - -var eventPool = sync.Pool{ - New: func() interface{} { - return &event{msg: &broker.Message{}} - }, -} diff --git a/go.mod b/go.mod index d77a9f0..38bfa5c 100644 --- a/go.mod +++ b/go.mod @@ -1,27 +1,24 @@ -module go.unistack.org/micro-broker-kgo/v3 +module go.unistack.org/micro-broker-kgo/v4 go 1.23.0 require ( - github.com/google/uuid v1.6.0 github.com/twmb/franz-go v1.18.1 github.com/twmb/franz-go/pkg/kadm v1.15.0 github.com/twmb/franz-go/pkg/kmsg v1.9.0 go.opentelemetry.io/otel v1.34.0 - go.unistack.org/micro/v3 v3.11.41 + go.unistack.org/micro/v4 v4.1.4 ) require ( github.com/ash3in/uuidv8 v1.2.0 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/klauspost/compress v1.18.0 // indirect - github.com/kr/pretty v0.3.1 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect - go.unistack.org/micro-proto/v3 v3.4.1 // indirect - golang.org/x/crypto v0.35.0 // indirect - golang.org/x/sys v0.30.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 // indirect - google.golang.org/grpc v1.70.0 // indirect + github.com/spf13/cast v1.7.1 // indirect + go.unistack.org/micro-proto/v4 v4.1.0 // indirect + golang.org/x/crypto v0.36.0 // indirect google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index a116105..87e6dd5 100644 --- a/go.sum +++ b/go.sum @@ -2,17 +2,14 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI= github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -23,56 +20,28 @@ github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIc github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= +github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= -github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc= github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M= -github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs= -github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4= github.com/twmb/franz-go/pkg/kadm v1.15.0 h1:Yo3NAPfcsx3Gg9/hdhq4vmwO77TqRRkvpUcGWzjworc= github.com/twmb/franz-go/pkg/kadm v1.15.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= -go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= -go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= -go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= -go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -go.unistack.org/micro/v3 v3.11.37 h1:ZcpnXAYEMcAwmnVb5b7o8/PylGnILxXMHaUlRrPmRI0= -go.unistack.org/micro/v3 v3.11.37/go.mod h1:POGU5hstnAT9LH70m8FalyQSNi2GfIew71K75JenIZk= -go.unistack.org/micro/v3 v3.11.41 h1:dP4sBLIZpMo+MWGe5bbESewK8wBzYm4Yik/67x4dEtQ= -go.unistack.org/micro/v3 v3.11.41/go.mod h1:POGU5hstnAT9LH70m8FalyQSNi2GfIew71K75JenIZk= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= -golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= -golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 h1:ZSlhAUqC4r8TPzqLXQ0m3upBNZeF+Y8jQ3c4CR3Ujms= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= -google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= -google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= -google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk= +go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec= +go.unistack.org/micro/v4 v4.1.4 h1:gyjVPP9g7UAFZlhEbla9Ih/BeKJfBLAbN/NvAhJCpKU= +go.unistack.org/micro/v4 v4.1.4/go.mod h1:lr3oYED8Ay1vjK68QqRw30QOtdk/ffpZqMFDasOUhKw= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/kgo.go b/kgo.go index 1cd57df..3904508 100644 --- a/kgo.go +++ b/kgo.go @@ -11,19 +11,27 @@ import ( "sync/atomic" "time" - "github.com/google/uuid" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/semconv" - "go.unistack.org/micro/v3/tracer" - mrand "go.unistack.org/micro/v3/util/rand" + "go.unistack.org/micro/v4/broker" + "go.unistack.org/micro/v4/codec" + "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/metadata" + "go.unistack.org/micro/v4/options" + "go.unistack.org/micro/v4/semconv" + "go.unistack.org/micro/v4/tracer" + "go.unistack.org/micro/v4/util/id" + mrand "go.unistack.org/micro/v4/util/rand" ) var _ broker.Broker = (*Broker)(nil) +var messagePool = sync.Pool{ + New: func() interface{} { + return &kgoMessage{} + }, +} + var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") var DefaultRetryBackoffFn = func() func(int) time.Duration { @@ -56,8 +64,10 @@ var DefaultRetryBackoffFn = func() func(int) time.Duration { }() type Broker struct { - c *kgo.Client - connected *atomic.Uint32 + funcPublish broker.FuncPublish + funcSubscribe broker.FuncSubscribe + c *kgo.Client + connected *atomic.Uint32 kopts []kgo.Opt subs []*Subscriber @@ -92,6 +102,68 @@ func (k *Broker) Client() *kgo.Client { return k.c } +type kgoMessage struct { + c codec.Codec + topic string + ctx context.Context + body []byte + hdr metadata.Metadata + opts broker.PublishOptions + ack bool +} + +func (m *kgoMessage) Ack() error { + m.ack = true + return nil +} + +func (m *kgoMessage) Body() []byte { + return m.body +} + +func (m *kgoMessage) Header() metadata.Metadata { + return m.hdr +} + +func (m *kgoMessage) Context() context.Context { + return m.ctx +} + +func (m *kgoMessage) Topic() string { + return "" +} + +func (m *kgoMessage) Unmarshal(dst interface{}, opts ...codec.Option) error { + return m.c.Unmarshal(m.body, dst) +} + +func (b *Broker) newCodec(ct string) (codec.Codec, error) { + if idx := strings.IndexRune(ct, ';'); idx >= 0 { + ct = ct[:idx] + } + b.RLock() + c, ok := b.opts.Codecs[ct] + b.RUnlock() + if ok { + return c, nil + } + return nil, codec.ErrUnknownContentType +} + +func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.PublishOption) (broker.Message, error) { + options := broker.NewPublishOptions(opts...) + m := &kgoMessage{ctx: ctx, hdr: hdr, opts: options} + c, err := b.newCodec(m.opts.ContentType) + if err == nil { + m.body, err = c.Marshal(body) + } + if err != nil { + return nil, err + } + + return m, nil +} + func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) { var c *kgo.Client var err error @@ -238,6 +310,18 @@ func (k *Broker) Init(opts ...broker.Option) error { } } + k.funcPublish = k.fnPublish + k.funcSubscribe = k.fnSubscribe + + k.opts.Hooks.EachPrev(func(hook options.Hook) { + switch h := hook.(type) { + case broker.HookPublish: + k.funcPublish = h(k.funcPublish) + case broker.HookSubscribe: + k.funcSubscribe = h(k.funcSubscribe) + } + }) + k.init = true return nil @@ -247,95 +331,88 @@ func (k *Broker) Options() broker.Options { return k.opts } -func (k *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { - return k.publish(ctx, msgs, opts...) +func (b *Broker) Publish(ctx context.Context, topic string, messages ...broker.Message) error { + return b.funcPublish(ctx, topic, messages...) } -func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { - msg.Header.Set(metadata.HeaderTopic, topic) - return k.publish(ctx, []*broker.Message{msg}, opts...) +func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker.Message) error { + return b.publish(ctx, topic, messages...) } -func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { - k.Lock() - if k.connected.Load() == 0 { - c, _, err := k.connect(ctx, k.kopts...) +func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error { + if b.connected.Load() == 0 { + c, _, err := b.connect(ctx, b.kopts...) if err != nil { - k.Unlock() return err } - k.c = c - k.connected.Store(1) + b.Lock() + b.c = c + b.Unlock() } - k.Unlock() - options := broker.NewPublishOptions(opts...) - records := make([]*kgo.Record, 0, len(msgs)) + records := make([]*kgo.Record, 0, len(messages)) var errs []string - var err error var key []byte var promise func(*kgo.Record, error) - if options.Context != nil { - if k, ok := options.Context.Value(publishKey{}).([]byte); ok && k != nil { - key = k - } - if p, ok := options.Context.Value(publishPromiseKey{}).(func(*kgo.Record, error)); ok && p != nil { - promise = p - } - } + for _, msg := range messages { - for _, msg := range msgs { - rec := &kgo.Record{Context: ctx, Key: key} - - rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) - msg.Header.Del(metadata.HeaderTopic) - - k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() - if options.BodyOnly || k.opts.Codec.String() == "noop" { - rec.Value = msg.Body - setHeaders(rec, msg.Header) - } else { - rec.Value, err = k.opts.Codec.Marshal(msg) - if err != nil { - return err + if mctx := msg.Context(); mctx != nil { + if k, ok := mctx.Value(publishKey{}).([]byte); ok && k != nil { + key = k + } + if p, ok := mctx.Value(publishPromiseKey{}).(func(*kgo.Record, error)); ok && p != nil { + promise = p } } + + rec := &kgo.Record{ + Context: ctx, + Key: key, + Topic: topic, + Value: msg.Body(), + } + + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() + + setHeaders(rec, msg.Header()) + records = append(records, rec) } + ts := time.Now() + if promise != nil { - ts := time.Now() + for _, rec := range records { - k.c.Produce(ctx, rec, func(r *kgo.Record, err error) { + b.c.Produce(ctx, rec, func(r *kgo.Record, err error) { te := time.Since(ts) - k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec() - k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) - k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec() + b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) + b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", rec.Topic, "topic", rec.Topic).Update(te.Seconds()) if err != nil { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "failure").Inc() } else { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", rec.Topic, "topic", rec.Topic, "status", "success").Inc() } promise(r, err) }) } return nil } - ts := time.Now() - results := k.c.ProduceSync(ctx, records...) + results := b.c.ProduceSync(ctx, records...) te := time.Since(ts) for _, result := range results { - k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) - k.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) - k.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec() + b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) + b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec() if result.Err != nil { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() errs = append(errs, result.Err.Error()) } else { - k.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() } } @@ -362,31 +439,44 @@ func (k *Broker) TopicExists(ctx context.Context, topic string) error { return nil } -func (k *Broker) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error) { - return nil, nil +func (b *Broker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + return b.funcSubscribe(ctx, topic, handler, opts...) } -func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { +func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + if err := broker.IsValidHandler(handler); err != nil { + return nil, err + } + options := broker.NewSubscribeOptions(opts...) + switch handler.(type) { + default: + return nil, broker.ErrInvalidHandler + case func(broker.Message) error: + break + case func([]broker.Message) error: + break + } + if options.Group == "" { - uid, err := uuid.NewRandom() + uid, err := id.New() if err != nil { return nil, err } - options.Group = uid.String() + options.Group = uid } commitInterval := DefaultCommitInterval - if k.opts.Context != nil { - if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 { + if b.opts.Context != nil { + if v, ok := b.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 { commitInterval = v } } var fatalOnError bool - if k.opts.Context != nil { - if v, ok := k.opts.Context.Value(fatalOnErrorKey{}).(bool); ok && v { + if b.opts.Context != nil { + if v, ok := b.opts.Context.Value(fatalOnErrorKey{}).(bool); ok && v { fatalOnError = v } } @@ -401,14 +491,14 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han topic: topic, opts: options, handler: handler, - kopts: k.opts, + kopts: b.opts, consumers: make(map[tp]*consumer), done: make(chan struct{}), fatalOnError: fatalOnError, - connected: k.connected, + connected: b.connected, } - kopts := append(k.kopts, + kopts := append(b.kopts, kgo.ConsumerGroup(options.Group), kgo.ConsumeTopics(topic), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), @@ -428,7 +518,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han } } - c, htracer, err := k.connect(ctx, kopts...) + c, htracer, err := b.connect(ctx, kopts...) if err != nil { return nil, err } @@ -450,9 +540,10 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han go sub.poll(ctx) - k.Lock() - k.subs = append(k.subs, sub) - k.Unlock() + b.Lock() + b.subs = append(b.subs, sub) + b.Unlock() + return sub, nil } diff --git a/kgo_test.go b/kgo_test.go index a1955dd..0399a4b 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -9,11 +9,11 @@ import ( "time" kg "github.com/twmb/franz-go/pkg/kgo" - kgo "go.unistack.org/micro-broker-kgo/v3" - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/logger/slog" - "go.unistack.org/micro/v3/metadata" + kgo "go.unistack.org/micro-broker-kgo/v4" + "go.unistack.org/micro/v4/broker" + "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/logger/slog" + "go.unistack.org/micro/v4/metadata" ) var ( @@ -23,11 +23,6 @@ var ( loglevel = logger.DebugLevel ) -var bm = &broker.Message{ - Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"}, - Body: []byte(`"body"`), -} - func TestFail(t *testing.T) { if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { t.Skip() @@ -72,9 +67,10 @@ func TestFail(t *testing.T) { }() t.Logf("broker health %v", b.Health()) - msgs := make([]*broker.Message, 0, msgcnt) + msgs := make([]broker.Message, 0, msgcnt) for i := int64(0); i < msgcnt; i++ { - msgs = append(msgs, bm) + m, _ := b.NewMessage(ctx, metadata.Pairs("hkey", "hval"), []byte(`test`)) + msgs = append(msgs, m) } for _, msg := range msgs { @@ -86,7 +82,7 @@ func TestFail(t *testing.T) { // t.Skip() idx := int64(0) - fn := func(msg broker.Event) error { + fn := func(msg broker.Message) error { atomic.AddInt64(&idx, 1) time.Sleep(500 * time.Millisecond) t.Logf("ack") @@ -170,19 +166,20 @@ func TestPubSub(t *testing.T) { } }() if prefill { - msgs := make([]*broker.Message, 0, msgcnt) + msgs := make([]broker.Message, 0, msgcnt) for i := int64(0); i < msgcnt; i++ { - msgs = append(msgs, bm) + m, _ := b.NewMessage(ctx, metadata.Pairs("hkey", "hval"), []byte(`test`)) + msgs = append(msgs, m) } - if err := b.BatchPublish(ctx, msgs); err != nil { + if err := b.Publish(ctx, "test", msgs...); err != nil { t.Fatal(err) } // t.Skip() } done := make(chan bool, 1) idx := int64(0) - fn := func(msg broker.Event) error { + fn := func(msg broker.Message) error { atomic.AddInt64(&idx, 1) // time.Sleep(200 * time.Millisecond) return msg.Ack() diff --git a/logger.go b/logger.go index 05dc1b4..bfafa6e 100644 --- a/logger.go +++ b/logger.go @@ -4,7 +4,7 @@ import ( "context" "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v4/logger" ) type mlogger struct { diff --git a/meter.go b/meter.go index 54a6831..104f661 100644 --- a/meter.go +++ b/meter.go @@ -6,7 +6,7 @@ import ( "time" "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v4/meter" ) type hookMeter struct { diff --git a/options.go b/options.go index 8f69673..62223c8 100644 --- a/options.go +++ b/options.go @@ -5,8 +5,7 @@ import ( "time" "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/client" + "go.unistack.org/micro/v4/broker" ) var ( @@ -35,11 +34,6 @@ func PublishKey(key []byte) broker.PublishOption { return broker.SetPublishOption(publishKey{}, key) } -// ClientPublishKey set the kafka message key (client option) -func ClientPublishKey(key []byte) client.PublishOption { - return client.SetPublishOption(publishKey{}, key) -} - type optionsKey struct{} // Options pass additional options to broker @@ -115,8 +109,3 @@ type publishPromiseKey struct{} func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption { return broker.SetPublishOption(publishPromiseKey{}, fn) } - -// ClientPublishKey set the kafka message key (client option) -func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption { - return client.SetPublishOption(publishPromiseKey{}, fn) -} diff --git a/subscriber.go b/subscriber.go index a7b5fa8..db86984 100644 --- a/subscriber.go +++ b/subscriber.go @@ -11,11 +11,11 @@ import ( "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/semconv" - "go.unistack.org/micro/v3/tracer" + "go.unistack.org/micro/v4/broker" + "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/metadata" + "go.unistack.org/micro/v4/semconv" + "go.unistack.org/micro/v4/tracer" ) type tp struct { @@ -33,7 +33,7 @@ type consumer struct { kopts broker.Options partition int32 opts broker.SubscribeOptions - handler broker.Handler + handler interface{} connected *atomic.Uint32 } @@ -43,7 +43,7 @@ type Subscriber struct { htracer *hookTracer topic string - handler broker.Handler + handler interface{} done chan struct{} kopts broker.Options opts broker.SubscribeOptions @@ -225,17 +225,14 @@ func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str } func (pc *consumer) consume() { + var err error + defer close(pc.done) if pc.kopts.Logger.V(logger.DebugLevel) { pc.kopts.Logger.Debug(pc.kopts.Context, fmt.Sprintf("starting, topic %s partition %d", pc.topic, pc.partition)) defer pc.kopts.Logger.Debug(pc.kopts.Context, fmt.Sprintf("killing, topic %s partition %d", pc.topic, pc.partition)) } - eh := pc.kopts.ErrorHandler - if pc.opts.ErrorHandler != nil { - eh = pc.opts.ErrorHandler - } - for { select { case <-pc.quit: @@ -245,98 +242,48 @@ func (pc *consumer) consume() { ctx, sp := pc.htracer.WithProcessSpan(record) ts := time.Now() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc() - p := eventPool.Get().(*event) - p.msg.Header = nil - p.msg.Body = nil + p := messagePool.Get().(*kgoMessage) + + p.body = record.Value p.topic = record.Topic - p.err = nil p.ack = false - p.msg.Header = metadata.New(len(record.Headers)) + p.hdr = metadata.New(len(record.Headers)) p.ctx = ctx for _, hdr := range record.Headers { - p.msg.Header.Set(hdr.Key, string(hdr.Value)) + p.hdr.Set(hdr.Key, string(hdr.Value)) } - if pc.kopts.Codec.String() == "noop" { - p.msg.Body = record.Value - } else if pc.opts.BodyOnly { - p.msg.Body = record.Value - } else { - sp.AddEvent("codec unmarshal start") - err := pc.kopts.Codec.Unmarshal(record.Value, p.msg) - sp.AddEvent("codec unmarshal stop") - if err != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) - pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() - p.err = err - p.msg.Body = record.Value - if eh != nil { - _ = eh(p) - pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() - if p.ack { - pc.c.MarkCommitRecords(record) - } else { - eventPool.Put(p) - pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") - return - } - eventPool.Put(p) - te := time.Since(ts) - pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - continue - } else { - pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: unmarshal error", err) - } - te := time.Since(ts) - pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() - pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - eventPool.Put(p) - pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") - sp.Finish() - return - } + + switch h := pc.handler.(type) { + case func(broker.Message) error: + err = h(p) + case func([]broker.Message) error: + err = h([]broker.Message{p}) } - sp.AddEvent("handler start") - err := pc.handler(p) - sp.AddEvent("handler stop") - if err == nil { - pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc() - } else { + + pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() + if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() - } - pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() - if err == nil && pc.opts.AutoAck { + } else if pc.opts.AutoAck { p.ack = true - } else if err != nil { - p.err = err - if eh != nil { - sp.AddEvent("error handler start") - _ = eh(p) - sp.AddEvent("error handler stop") - } else { - if pc.kopts.Logger.V(logger.ErrorLevel) { - pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err) - } - } } + te := time.Since(ts) pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) - if p.ack { - eventPool.Put(p) + + ack := p.ack + messagePool.Put(p) + + if ack { pc.c.MarkCommitRecords(record) } else { - eventPool.Put(p) - pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") - sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") sp.Finish() + pc.connected.Store(0) + pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] message not commited") return } + sp.Finish() } } diff --git a/tracer.go b/tracer.go index feca43c..291a8ca 100644 --- a/tracer.go +++ b/tracer.go @@ -6,8 +6,8 @@ import ( "github.com/twmb/franz-go/pkg/kgo" semconv "go.opentelemetry.io/otel/semconv/v1.18.0" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/tracer" + "go.unistack.org/micro/v4/metadata" + "go.unistack.org/micro/v4/tracer" ) type hookTracer struct {