diff --git a/go.mod b/go.mod index dbbcdd9..075ec18 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/unistack-org/micro-broker-kgo/v3 go 1.16 require ( + github.com/google/uuid v1.3.0 github.com/twmb/franz-go v0.8.6 - github.com/unistack-org/micro/v3 v3.4.8 + github.com/unistack-org/micro/v3 v3.5.4 ) diff --git a/go.sum b/go.sum index 1cd027c..99e41ca 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= @@ -31,8 +31,8 @@ github.com/twmb/franz-go v0.8.6 h1:m49t7tcgUz70hvTpnzHrsvQ/38Q/VKCpEj1FvDetVTE= github.com/twmb/franz-go v0.8.6/go.mod h1:v6QnB3abhlVAzlIEIO5L/1Emu8NlkreCI2HSps9utH0= github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= -github.com/unistack-org/micro/v3 v3.4.8 h1:9+qGlNHgChC3aMuFrtTFUtG55PEAjneSvplg7phwoCI= -github.com/unistack-org/micro/v3 v3.4.8/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= +github.com/unistack-org/micro/v3 v3.5.4 h1:6nIljqND355f+Fhc2mtCxYb5IRwer6nsMoAXpN8kka0= +github.com/unistack-org/micro/v3 v3.5.4/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= diff --git a/kgo.go b/kgo.go index 45bf53f..a45335f 100644 --- a/kgo.go +++ b/kgo.go @@ -3,37 +3,45 @@ package kgo import ( "context" + "fmt" "net" + "strings" "sync" "time" + "github.com/google/uuid" kgo "github.com/twmb/franz-go/pkg/kgo" sasl "github.com/twmb/franz-go/pkg/sasl" "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/metadata" ) type kBroker struct { - client *kgo.Client + writer *kgo.Client // used only to push messages + kopts []kgo.Opt connected bool init bool sync.RWMutex opts broker.Options + subs []*subscriber } type subscriber struct { - topic string - opts broker.SubscribeOptions - handler broker.Handler - closed bool - done chan struct{} + reader *kgo.Client // used only to pull messages + topic string + opts broker.SubscribeOptions + kopts broker.Options + handler broker.Handler + batchhandler broker.BatchHandler + closed bool + done chan struct{} sync.RWMutex } type publication struct { - topic string - partition int - offset int64 - err error + topic string + err error sync.RWMutex msg *broker.Message } @@ -54,6 +62,10 @@ func (p *publication) Error() error { return p.err } +func (p *publication) SetError(err error) { + p.err = err +} + func (s *subscriber) Options() broker.SubscribeOptions { return s.opts } @@ -63,6 +75,12 @@ func (s *subscriber) Topic() string { } func (s *subscriber) Unsubscribe(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + s.reader.Close() + } return nil } @@ -70,7 +88,7 @@ func (k *kBroker) Address() string { if len(k.opts.Addrs) > 0 { return k.opts.Addrs[0] } - return "127.0.0.1:9092" + return strings.Join(k.opts.Addrs, ",") } func (k *kBroker) Name() string { @@ -90,68 +108,20 @@ func (k *kBroker) Connect(ctx context.Context) error { nctx = ctx } - opts := []kgo.Opt{kgo.SeedBrokers(k.opts.Addrs...)} - if k.opts.Context != nil { - if v, ok := k.opts.Context.Value(clientIDKey{}).(string); ok && v != "" { - opts = append(opts, kgo.ClientID(v)) - } - if v, ok := k.opts.Context.Value(maxReadBytesKey{}).(int32); ok { - opts = append(opts, kgo.BrokerMaxReadBytes(v)) - } - if v, ok := k.opts.Context.Value(maxWriteBytesKey{}).(int32); ok { - opts = append(opts, kgo.BrokerMaxWriteBytes(v)) - } - - if v, ok := k.opts.Context.Value(connIdleTimeoutKey{}).(time.Duration); ok { - opts = append(opts, kgo.ConnIdleTimeout(v)) - } - if v, ok := k.opts.Context.Value(connTimeoutOverheadKey{}).(time.Duration); ok { - opts = append(opts, kgo.ConnTimeoutOverhead(v)) - } - if v, ok := k.opts.Context.Value(dialerKey{}).(func(ctx context.Context, network, host string) (net.Conn, error)); ok { - opts = append(opts, kgo.Dialer(v)) - } - if v, ok := k.opts.Context.Value(metadataMaxAgeKey{}).(time.Duration); ok { - opts = append(opts, kgo.MetadataMaxAge(v)) - } - if v, ok := k.opts.Context.Value(metadataMinAgeKey{}).(time.Duration); ok { - opts = append(opts, kgo.MetadataMinAge(v)) - } - if v, ok := k.opts.Context.Value(produceRetriesKey{}).(int); ok { - opts = append(opts, kgo.ProduceRetries(v)) - } - if v, ok := k.opts.Context.Value(requestRetriesKey{}).(int); ok { - opts = append(opts, kgo.RequestRetries(v)) - } - if v, ok := k.opts.Context.Value(retryBackoffKey{}).(func(int) time.Duration); ok { - opts = append(opts, kgo.RetryBackoff(v)) - } - if v, ok := k.opts.Context.Value(retryTimeoutKey{}).(func(int16) time.Duration); ok { - opts = append(opts, kgo.RetryTimeout(v)) - } - if v, ok := k.opts.Context.Value(saslKey{}).([]sasl.Mechanism); ok { - opts = append(opts, kgo.SASL(v...)) - } - if v, ok := k.opts.Context.Value(hooksKey{}).([]kgo.Hook); ok { - opts = append(opts, kgo.WithHooks(v...)) - } - } - - var c *kgo.Client - var err error - select { case <-nctx.Done(): return nctx.Err() default: - c, err = kgo.NewClient(opts...) + c, err := kgo.NewClient(k.kopts...) if err != nil { return err } + k.Lock() + k.connected = true + k.writer = c + k.Unlock() } - k.client = c - return nil } @@ -175,7 +145,12 @@ func (k *kBroker) Disconnect(ctx context.Context) error { case <-nctx.Done(): return nctx.Err() default: - k.client.Close() + for _, sub := range k.subs { + if err := sub.Unsubscribe(ctx); err != nil { + return err + } + } + k.writer.Close() } k.connected = false @@ -206,6 +181,56 @@ func (k *kBroker) Init(opts ...broker.Option) error { return err } + kopts := append(k.kopts, kgo.SeedBrokers(k.opts.Addrs...)) + if k.opts.Context != nil { + if v, ok := k.opts.Context.Value(optionsKey{}).([]kgo.Opt); ok && len(v) > 0 { + kopts = append(kopts, v...) + } + if v, ok := k.opts.Context.Value(clientIDKey{}).(string); ok && v != "" { + kopts = append(kopts, kgo.ClientID(v)) + } + if v, ok := k.opts.Context.Value(maxReadBytesKey{}).(int32); ok { + kopts = append(kopts, kgo.BrokerMaxReadBytes(v)) + } + if v, ok := k.opts.Context.Value(maxWriteBytesKey{}).(int32); ok { + kopts = append(kopts, kgo.BrokerMaxWriteBytes(v)) + } + if v, ok := k.opts.Context.Value(connIdleTimeoutKey{}).(time.Duration); ok { + kopts = append(kopts, kgo.ConnIdleTimeout(v)) + } + if v, ok := k.opts.Context.Value(connTimeoutOverheadKey{}).(time.Duration); ok { + kopts = append(kopts, kgo.ConnTimeoutOverhead(v)) + } + if v, ok := k.opts.Context.Value(dialerKey{}).(func(ctx context.Context, network, host string) (net.Conn, error)); ok { + kopts = append(kopts, kgo.Dialer(v)) + } + if v, ok := k.opts.Context.Value(metadataMaxAgeKey{}).(time.Duration); ok { + kopts = append(kopts, kgo.MetadataMaxAge(v)) + } + if v, ok := k.opts.Context.Value(metadataMinAgeKey{}).(time.Duration); ok { + kopts = append(kopts, kgo.MetadataMinAge(v)) + } + if v, ok := k.opts.Context.Value(produceRetriesKey{}).(int); ok { + kopts = append(kopts, kgo.ProduceRetries(v)) + } + if v, ok := k.opts.Context.Value(requestRetriesKey{}).(int); ok { + kopts = append(kopts, kgo.RequestRetries(v)) + } + if v, ok := k.opts.Context.Value(retryBackoffKey{}).(func(int) time.Duration); ok { + kopts = append(kopts, kgo.RetryBackoff(v)) + } + if v, ok := k.opts.Context.Value(retryTimeoutKey{}).(func(int16) time.Duration); ok { + kopts = append(kopts, kgo.RetryTimeout(v)) + } + if v, ok := k.opts.Context.Value(saslKey{}).([]sasl.Mechanism); ok { + kopts = append(kopts, kgo.SASL(v...)) + } + if v, ok := k.opts.Context.Value(hooksKey{}).([]kgo.Hook); ok { + kopts = append(kopts, kgo.WithHooks(v...)) + } + } + + k.kopts = kopts k.init = true return nil @@ -215,16 +240,192 @@ func (k *kBroker) Options() broker.Options { return k.opts } +func (k *kBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + return k.publish(ctx, msgs, opts...) +} + func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { + msg.Header.Set(metadata.HeaderTopic, topic) + return k.publish(ctx, []*broker.Message{msg}, opts...) +} + +func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + records := make([]*kgo.Record, 0, len(msgs)) + var errs []string + results := k.writer.ProduceSync(ctx, records...) + for _, result := range results { + if result.Err != nil { + errs = append(errs, result.Err.Error()) + } + } + if len(errs) > 0 { + return fmt.Errorf("publish error: %s", strings.Join(errs, "\n")) + } return nil } +func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + return nil, nil +} + +type mlogger struct { + l logger.Logger + ctx context.Context +} + +func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) { + mlvl := logger.ErrorLevel + switch lvl { + case kgo.LogLevelNone: + return + case kgo.LogLevelError: + mlvl = logger.ErrorLevel + case kgo.LogLevelWarn: + mlvl = logger.WarnLevel + case kgo.LogLevelInfo: + mlvl = logger.InfoLevel + case kgo.LogLevelDebug: + mlvl = logger.DebugLevel + default: + return + } + l.l.Fields(l.ctx, mlvl, msg, args...) +} + +func (l *mlogger) Level() kgo.LogLevel { + switch l.l.Options().Level { + case logger.ErrorLevel: + return kgo.LogLevelError + case logger.WarnLevel: + return kgo.LogLevelWarn + case logger.InfoLevel: + return kgo.LogLevelInfo + case logger.DebugLevel, logger.TraceLevel: + return kgo.LogLevelDebug + } + return kgo.LogLevelNone +} + func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { options := broker.NewSubscribeOptions(opts...) - sub := &subscriber{opts: options} + + if options.Group == "" { + uid, err := uuid.NewRandom() + if err != nil { + return nil, err + } + options.Group = uid.String() + } + + kopts := append(k.kopts, + kgo.ConsumerGroup(options.Group), + kgo.ConsumeTopics(topic), + kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), + kgo.DisableAutoCommit(), + kgo.WithLogger(&mlogger{l: k.opts.Logger, ctx: k.opts.Context}), + // TODO: must set https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnRevoked + ) + + reader, err := kgo.NewClient(kopts...) + if err != nil { + return nil, err + } + + sub := &subscriber{opts: options, reader: reader, handler: handler, kopts: k.opts} + go sub.run(ctx) + + k.Lock() + k.subs = append(k.subs, sub) + k.Unlock() return sub, nil } +func (s *subscriber) run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-s.kopts.Context.Done(): + return + default: + fetches := s.reader.PollFetches(ctx) + if fetches.IsClientClosed() { + return + } + fetches.EachError(func(t string, p int32, err error) { + s.kopts.Logger.Errorf(ctx, "fetch err topic %s partition %d: %v", t, p, err) + }) + + s.handleFetches(ctx, fetches) + } + } +} + +func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) error { + var records []*kgo.Record + var batch bool + var err error + + if s.batchhandler != nil { + batch = true + } + _ = batch + + for _, fetch := range fetches { + for _, ftopic := range fetch.Topics { + ridx := 0 + for { + for _, partition := range ftopic.Partitions { + if ridx >= len(partition.Records) { + continue + } + records = append(records, partition.Records[ridx]) + } + + for _, record := range records { + eh := s.kopts.ErrorHandler + if s.opts.ErrorHandler != nil { + eh = s.opts.ErrorHandler + } + p := &publication{topic: record.Topic, msg: &broker.Message{}} + + if s.opts.BodyOnly { + p.msg.Body = record.Value + } else { + if err := s.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { + p.err = err + p.msg.Body = record.Value + if eh != nil { + _ = eh(p) + } else { + if s.kopts.Logger.V(logger.ErrorLevel) { + s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to unmarshal: %v", err) + } + } + continue + } + } + err = s.handler(p) + if err == nil && s.opts.AutoAck { + records = append(records, record) + } else if err != nil { + p.err = err + if eh != nil { + _ = eh(p) + } else { + if s.kopts.Logger.V(logger.ErrorLevel) { + s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: subscriber error: %v", err) + } + } + } + } + } + ridx++ + } + } + return s.reader.CommitRecords(ctx, records...) +} + func (k *kBroker) String() string { return "kgo" } diff --git a/kgo_test.go b/kgo_test.go index 905ad64..496da98 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -2,6 +2,7 @@ package kgo_test import ( "context" + "fmt" "os" "strings" "testing" @@ -50,6 +51,7 @@ func TestPubSub(t *testing.T) { done := make(chan bool, 1) fn := func(msg broker.Event) error { + fmt.Printf("EEEE %s\n", msg.Message().Body) done <- true return msg.Ack() } diff --git a/options.go b/options.go index cac3440..566b370 100644 --- a/options.go +++ b/options.go @@ -111,3 +111,9 @@ type hooksKey struct{} func Hooks(hooks ...kgo.Hook) broker.Option { return broker.SetOption(hooksKey{}, hooks) } + +type optionsKey struct{} + +func Options(opts ...kgo.Opt) broker.Option { + return broker.SetOption(optionsKey{}, opts) +}