diff --git a/event.go b/event.go
new file mode 100644
index 0000000..6dbf963
--- /dev/null
+++ b/event.go
@@ -0,0 +1,42 @@
+package kgo
+
+import (
+	"sync"
+
+	"go.unistack.org/micro/v3/broker"
+)
+
+type event struct {
+	topic string
+	err   error
+	sync.RWMutex
+	msg *broker.Message
+	ack bool
+}
+
+func (p *event) Topic() string {
+	return p.topic
+}
+
+func (p *event) Message() *broker.Message {
+	return p.msg
+}
+
+func (p *event) Ack() error {
+	p.ack = true
+	return nil
+}
+
+func (p *event) Error() error {
+	return p.err
+}
+
+func (p *event) SetError(err error) {
+	p.err = err
+}
+
+var eventPool = sync.Pool{
+	New: func() interface{} {
+		return &event{msg: &broker.Message{}}
+	},
+}
diff --git a/kgo.go b/kgo.go
index 32df6aa..093e77f 100644
--- a/kgo.go
+++ b/kgo.go
@@ -3,26 +3,56 @@ package kgo // import "go.unistack.org/micro-broker-kgo/v3"
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"math/rand"
 	"strings"
 	"sync"
 	"time"
 
-	"github.com/twmb/franz-go/pkg/kerr"
-	kgo "github.com/twmb/franz-go/pkg/kgo"
-	"github.com/twmb/franz-go/pkg/kmsg"
-	"github.com/twmb/franz-go/pkg/kversion"
+	"github.com/twmb/franz-go/pkg/kgo"
 	"go.unistack.org/micro/v3/broker"
-	"go.unistack.org/micro/v3/logger"
 	"go.unistack.org/micro/v3/metadata"
 	"go.unistack.org/micro/v3/util/id"
 	mrand "go.unistack.org/micro/v3/util/rand"
 )
 
-var _ broker.Broker = &kBroker{}
+var _ broker.Broker = &Broker{}
 
-type kBroker struct {
-	writer *kgo.Client // used only to push messages
+// ErrLostMessage is logged when an event is neither acked by the handler nor
+// recovered by an error handler: its offset is not marked for commit, so the
+// record is effectively lost to this consumer.
+var ErrLostMessage = errors.New("message not marked for offset commit and will be lost on the next iteration")
+
+// DefaultRetryBackoffFn is an exponential backoff with jitter: it starts at
+// 100ms, doubles per failure and is capped at 1s, each value scaled by a
+// random factor in [0.8, 1.2).
+var DefaultRetryBackoffFn = func() func(int) time.Duration {
+	var rngMu sync.Mutex
+	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+	return func(fails int) time.Duration {
+		const (
+			min = 100 * time.Millisecond
+			max = time.Second
+		)
+		if fails <= 0 {
+			return min
+		}
+		if fails > 10 {
+			return max
+		}
+
+		backoff := min * time.Duration(1<<(fails-1))
+
+		rngMu.Lock()
+		jitter := 0.8 + 0.4*rng.Float64()
+		rngMu.Unlock()
+
+		backoff = time.Duration(float64(backoff) * jitter)
+
+		if backoff > max {
+			return max
+		}
+		return backoff
+	}
+}()
+
+type Broker struct {
+	c *kgo.Client
 	kopts     []kgo.Opt
 	connected bool
 	init      bool
@@ -31,79 +61,34 @@ type kBroker struct {
 	subs []*subscriber
 }
 
-type subscriber struct {
-	reader *kgo.Client // used only to pull messages
-	topic  string
-	opts   broker.SubscribeOptions
-	kopts  broker.Options
-	handler      broker.Handler
-	batchhandler broker.BatchHandler
-	closed    bool
-	done      chan struct{}
-	consumers map[string]map[int32]worker
-	sync.RWMutex
-}
-
-type publication struct {
-	topic string
-	err   error
-	sync.RWMutex
-	msg *broker.Message
-	ack bool
-}
-
-func (p *publication) Topic() string {
-	return p.topic
-}
-
-func (p *publication) Message() *broker.Message {
-	return p.msg
-}
-
-func (p *publication) Ack() error {
-	p.ack = true
-	return nil
-}
-
-func (p *publication) Error() error {
-	return p.err
-}
-
-func (p *publication) SetError(err error) {
-	p.err = err
-}
-
-func (s *subscriber) Options() broker.SubscribeOptions {
-	return s.opts
-}
-
-func (s *subscriber) Topic() string {
-	return s.topic
-}
-
-func (s *subscriber) Unsubscribe(ctx context.Context) error {
-	if s.closed {
-		return nil
-	}
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	default:
-		close(s.done)
-		s.closed = true
-	}
-	return nil
-}
-
-func (k *kBroker) Address() string {
+func (k *Broker) Address() string {
 	return strings.Join(k.opts.Addrs, ",")
 }
 
-func (k *kBroker) Name() string {
+func (k *Broker) Name() string {
 	return k.opts.Name
 }
 
-func (k *kBroker) Connect(ctx context.Context) error {
+func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, error) {
+	var c *kgo.Client
+	var err error
+
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+		c, err = kgo.NewClient(opts...)
+		if err == nil {
+			err = c.Ping(ctx) // check connectivity to cluster
+		}
+		if err != nil {
+			return nil, err
+		}
+	}
+	return c, nil
+}
+
+func (k *Broker) Connect(ctx context.Context) error {
 	k.RLock()
 	if k.connected {
 		k.RUnlock()
@@ -116,51 +101,20 @@ func (k *kBroker) Connect(ctx context.Context) error {
 		nctx = ctx
 	}
 
-	kaddrs := k.opts.Addrs
-
-	// shuffle addrs
-	var rng mrand.Rand
-	rng.Shuffle(len(kaddrs), func(i, j int) {
-		kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i]
-	})
-
-	kopts := append(k.kopts, kgo.SeedBrokers(kaddrs...))
-
-	select {
-	case <-nctx.Done():
-		return nctx.Err()
-	default:
-		c, err := kgo.NewClient(kopts...)
-		if err != nil {
-			return err
-		}
-
-		// Request versions in order to guess Kafka Cluster version
-		versionsReq := kmsg.NewApiVersionsRequest()
-		versionsRes, err := versionsReq.RequestWith(ctx, c)
-		if err != nil {
-			return fmt.Errorf("failed to request api versions: %w", err)
-		}
-		err = kerr.ErrorForCode(versionsRes.ErrorCode)
-		if err != nil {
-			return fmt.Errorf("failed to request api versions. Inner kafka error: %w", err)
-		}
-		versions := kversion.FromApiVersionsResponse(versionsRes)
-
-		if k.opts.Logger.V(logger.InfoLevel) {
-			logger.Infof(ctx, "[kgo] connected to to kafka cluster version %v", versions.VersionGuess())
-		}
-
-		k.Lock()
-		k.connected = true
-		k.writer = c
-		k.Unlock()
+	c, err := k.connect(nctx, k.kopts...)
+	if err != nil {
+		return err
 	}
 
+	k.Lock()
+	k.c = c
+	k.connected = true
+	k.Unlock()
+
 	return nil
 }
 
-func (k *kBroker) Disconnect(ctx context.Context) error {
+func (k *Broker) Disconnect(ctx context.Context) error {
 	k.RLock()
 	if !k.connected {
 		k.RUnlock()
@@ -168,14 +122,13 @@
 	}
 	k.RUnlock()
 
-	k.Lock()
-	defer k.Unlock()
-
 	nctx := k.opts.Context
 	if ctx != nil {
 		nctx = ctx
 	}
 
+	k.Lock()
+	defer k.Unlock()
 	select {
 	case <-nctx.Done():
 		return nctx.Err()
@@ -185,20 +138,20 @@ func (k *kBroker) Disconnect(ctx context.Context) error {
 			return err
 		}
 	}
-	k.writer.Close()
+	if k.c != nil {
+		k.c.CloseAllowingRebalance()
+		// k.c.Close()
+	}
 	}
 
 	k.connected = false
 	return nil
 }
 
-func (k *kBroker) Init(opts ...broker.Option) error {
+func (k *Broker) Init(opts ...broker.Option) error {
 	k.Lock()
 	defer k.Unlock()
 
-	if len(opts) == 0 && k.init {
-		return nil
-	}
 	for _, o := range opts {
 		o(&k.opts)
 	}
@@ -222,33 +175,38 @@
 		}
 	}
 
-	// kgo.RecordPartitioner(),
-
-	k.init = true
-
 	return nil
 }
 
-func (k *kBroker) Options() broker.Options {
+func (k *Broker) Options() broker.Options {
 	return k.opts
 }
 
-func (k *kBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
+func (k *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
 	return k.publish(ctx, msgs, opts...)
 }
 
-func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
+func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
 	msg.Header.Set(metadata.HeaderTopic, topic)
 	return k.publish(ctx, []*broker.Message{msg}, opts...)
 }
 
-func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
+func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
 	k.RLock()
 	if !k.connected {
 		k.RUnlock()
-		return broker.ErrNotConnected
+		// connect lazily on first publish, re-checking the flag under the
+		// write lock; the early RUnlock above must not be repeated below
+		k.Lock()
+		if !k.connected {
+			c, err := k.connect(ctx, k.kopts...)
+			if err != nil {
+				k.Unlock()
+				return err
+			}
+			k.c = c
+			k.connected = true
+		}
+		k.Unlock()
+	} else {
+		k.RUnlock()
 	}
-	k.RUnlock()
 
+	options := broker.NewPublishOptions(opts...)
 	records := make([]*kgo.Record, 0, len(msgs))
 	var errs []string
@@ -264,13 +222,13 @@
 	for _, msg := range msgs {
 		rec := &kgo.Record{Context: ctx, Key: key}
 		rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic)
-		if k.opts.Codec.String() == "noop" {
+		if options.BodyOnly {
+			rec.Value = msg.Body
+		} else if k.opts.Codec.String() == "noop" {
 			rec.Value = msg.Body
 			for k, v := range msg.Header {
 				rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)})
 			}
-		} else if options.BodyOnly {
-			rec.Value = msg.Body
 		} else {
 			rec.Value, err = k.opts.Codec.Marshal(msg)
 			if err != nil {
@@ -280,7 +238,7 @@
 		records = append(records, rec)
 	}
 
-	results := k.writer.ProduceSync(ctx, records...)
+	results := k.c.ProduceSync(ctx, records...)
 	for _, result := range results {
 		if result.Err != nil {
 			errs = append(errs, result.Err.Error())
@@ -294,53 +252,11 @@
 	return nil
 }
 
-type mlogger struct {
-	l   logger.Logger
-	ctx context.Context
-}
-
-func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) {
-	var mlvl logger.Level
-	switch lvl {
-	case kgo.LogLevelNone:
-		return
-	case kgo.LogLevelError:
-		mlvl = logger.ErrorLevel
-	case kgo.LogLevelWarn:
-		mlvl = logger.WarnLevel
-	case kgo.LogLevelInfo:
-		mlvl = logger.InfoLevel
-	case kgo.LogLevelDebug:
-		mlvl = logger.DebugLevel
-	default:
-		return
-	}
-	fields := make(map[string]interface{}, int(len(args)/2))
-	for i := 0; i < len(args)/2; i += 2 {
-		fields[fmt.Sprintf("%v", args[i])] = args[i+1]
-	}
-	l.l.Fields(fields).Log(l.ctx, mlvl, msg)
-}
-
-func (l *mlogger) Level() kgo.LogLevel {
-	switch l.l.Options().Level {
-	case logger.ErrorLevel:
-		return kgo.LogLevelError
-	case logger.WarnLevel:
-		return kgo.LogLevelWarn
-	case logger.InfoLevel:
-		return kgo.LogLevelInfo
-	case logger.DebugLevel, logger.TraceLevel:
-		return kgo.LogLevelDebug
-	}
-	return kgo.LogLevelNone
-}
-
-func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
+func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 	return nil, nil
 }
 
-func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
+func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 	options := broker.NewSubscribeOptions(opts...)
if options.Group == "" { @@ -351,54 +267,47 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha options.Group = uid } - kaddrs := k.opts.Addrs - - // shuffle addrs - var rng mrand.Rand - rng.Shuffle(len(kaddrs), func(i, j int) { - kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i] - }) - - td := DefaultCommitInterval + commitInterval := DefaultCommitInterval if k.opts.Context != nil { if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && v > 0 { - td = v + commitInterval = v } } sub := &subscriber{ topic: topic, - done: make(chan struct{}), opts: options, handler: handler, kopts: k.opts, - consumers: make(map[string]map[int32]worker), + consumers: make(map[tp]*consumer), + done: make(chan struct{}), } kopts := append(k.kopts, - kgo.SeedBrokers(kaddrs...), kgo.ConsumerGroup(options.Group), kgo.ConsumeTopics(topic), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), kgo.FetchMaxWait(1*time.Second), - // kgo.KeepControlRecords(), - kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()), - kgo.FetchIsolationLevel(kgo.ReadUncommitted()), - kgo.WithHooks(&metrics{meter: k.opts.Meter}), - kgo.AutoCommitMarks(), - kgo.AutoCommitInterval(td), + kgo.AutoCommitInterval(commitInterval), kgo.OnPartitionsAssigned(sub.assigned), kgo.OnPartitionsRevoked(sub.revoked), - kgo.OnPartitionsLost(sub.revoked), + kgo.OnPartitionsLost(sub.lost), + kgo.AutoCommitMarks(), ) - reader, err := kgo.NewClient(kopts...) + if options.Context != nil { + if v, ok := options.Context.Value(optionsKey{}).([]kgo.Opt); ok && len(v) > 0 { + kopts = append(kopts, v...) + } + } + + c, err := k.connect(ctx, kopts...) if err != nil { return nil, err } - sub.reader = reader - go sub.run(ctx) + sub.c = c + go sub.poll(ctx) k.Lock() k.subs = append(k.subs, sub) @@ -406,45 +315,32 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha return sub, nil } -func (k *kBroker) String() string { +func (k *Broker) String() string { return "kgo" } -func NewBroker(opts ...broker.Option) *kBroker { +func NewBroker(opts ...broker.Option) *Broker { + rand.Seed(time.Now().Unix()) options := broker.NewOptions(opts...) 
- if options.Codec.String() != "noop" { - options.Logger.Infof(options.Context, "broker codec not noop, disable plain kafka headers usage") - } + + kaddrs := options.Addrs + // shuffle addrs + var rng mrand.Rand + rng.Shuffle(len(kaddrs), func(i, j int) { + kaddrs[i], kaddrs[j] = kaddrs[j], kaddrs[i] + }) kopts := []kgo.Opt{ + kgo.DialTimeout(3 * time.Second), kgo.DisableIdempotentWrite(), kgo.ProducerBatchCompression(kgo.NoCompression()), kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}), - kgo.RetryBackoffFn( - func() func(int) time.Duration { - var rng mrand.Rand - return func(fails int) time.Duration { - const ( - min = 250 * time.Millisecond - max = 2 * time.Second - ) - if fails <= 0 { - return min - } - if fails > 10 { - return max - } - - backoff := min * time.Duration(1<<(fails-1)) - jitter := 0.8 + 0.4*rng.Float64() - backoff = time.Duration(float64(backoff) * jitter) - - if backoff > max { - return max - } - return backoff - } - }(), - ), + // kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, func() string { return time.Now().Format(time.StampMilli) })), + kgo.WithHooks(&metrics{meter: options.Meter}), + kgo.SeedBrokers(kaddrs...), + kgo.RetryBackoffFn(DefaultRetryBackoffFn), + kgo.BlockRebalanceOnPoll(), + kgo.Balancers(kgo.CooperativeStickyBalancer()), + kgo.FetchIsolationLevel(kgo.ReadUncommitted()), } if options.Context != nil { @@ -453,7 +349,7 @@ func NewBroker(opts ...broker.Option) *kBroker { } } - return &kBroker{ + return &Broker{ opts: options, kopts: kopts, } diff --git a/kgo_test.go b/kgo_test.go index 56a7dc2..743a848 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -28,6 +28,23 @@ var bm = &broker.Message{ Body: []byte(`"body"`), } +func TestConnect(t *testing.T) { + var addrs []string + ctx := context.TODO() + b := kgo.NewBroker( + broker.Addrs(addrs...), + kgo.CommitInterval(5*time.Second), + kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024)), + ) + if err := b.Init(); err != nil { + t.Fatal(err) + } + + if err := b.Connect(ctx); err != nil { + t.Fatal(err) + } +} + func TestPubSub(t *testing.T) { if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { t.Skip() diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..5bd5a27 --- /dev/null +++ b/logger.go @@ -0,0 +1,55 @@ +package kgo + +import ( + "context" + "fmt" + + "github.com/twmb/franz-go/pkg/kgo" + "go.unistack.org/micro/v3/logger" +) + +type mlogger struct { + l logger.Logger + ctx context.Context +} + +func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) { + var mlvl logger.Level + switch lvl { + case kgo.LogLevelNone: + return + case kgo.LogLevelError: + mlvl = logger.ErrorLevel + case kgo.LogLevelWarn: + mlvl = logger.WarnLevel + case kgo.LogLevelInfo: + mlvl = logger.InfoLevel + case kgo.LogLevelDebug: + mlvl = logger.DebugLevel + default: + return + } + if len(args) > 0 { + fields := make(map[string]interface{}, int(len(args)/2)) + for i := 0; i <= len(args)/2; i += 2 { + fields[fmt.Sprintf("%v", args[i])] = args[i+1] + } + l.l.Fields(fields).Log(l.ctx, mlvl, msg) + } else { + l.l.Log(l.ctx, mlvl, msg) + } +} + +func (l *mlogger) Level() kgo.LogLevel { + switch l.l.Options().Level { + case logger.ErrorLevel: + return kgo.LogLevelError + case logger.WarnLevel: + return kgo.LogLevelWarn + case logger.InfoLevel: + return kgo.LogLevelInfo + case logger.DebugLevel, logger.TraceLevel: + return kgo.LogLevelDebug + } + return kgo.LogLevelNone +} diff --git a/options.go b/options.go index 37fbb57..4664ee6 100644 --- a/options.go +++ 
@@ -4,10 +4,9 @@
 import (
 	"context"
 	"time"
 
-	kgo "github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/pkg/kgo"
 	"go.unistack.org/micro/v3/broker"
 	"go.unistack.org/micro/v3/client"
-	"go.unistack.org/micro/v3/server"
 )
 
 // DefaultCommitInterval specifies how fast send commit offsets to kafka
@@ -49,7 +48,7 @@ func Options(opts ...kgo.Opt) broker.Option {
 	}
 }
 
-// SubscribeOptions pass additional options to broker
+// SubscribeOptions passes additional kgo options to the consumer client created in Subscribe
 func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption {
 	return func(o *broker.SubscribeOptions) {
 		if o.Context == nil {
@@ -64,21 +63,6 @@
 	}
 }
 
-// SubscriberOptions pass additional options to broker
-func SubscriberOptions(opts ...kgo.Opt) server.SubscriberOption {
-	return func(o *server.SubscriberOptions) {
-		if o.Context == nil {
-			o.Context = context.Background()
-		}
-		options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt)
-		if !ok {
-			options = make([]kgo.Opt, 0, len(opts))
-		}
-		options = append(options, opts...)
-		o.Context = context.WithValue(o.Context, optionsKey{}, options)
-	}
-}
-
 type commitIntervalKey struct{}
 
 // CommitInterval specifies interval to send commits
@@ -86,7 +70,7 @@
 func CommitInterval(td time.Duration) broker.Option {
 	return broker.SetOption(commitIntervalKey{}, td)
 }
 
-var DefaultSubscribeMaxInflight = 1000
+var DefaultSubscribeMaxInflight = 10
 
 type subscribeMaxInflightKey struct{}
diff --git a/subscriber.go b/subscriber.go
new file mode 100644
index 0000000..b6f147f
--- /dev/null
+++ b/subscriber.go
@@ -0,0 +1,229 @@
+package kgo
+
+import (
+	"context"
+	"sync"
+
+	"github.com/twmb/franz-go/pkg/kgo"
+	"go.unistack.org/micro/v3/broker"
+	"go.unistack.org/micro/v3/logger"
+	"go.unistack.org/micro/v3/metadata"
+)
+
+type tp struct {
+	t string
+	p int32
+}
+
+type consumer struct {
+	c         *kgo.Client
+	topic     string
+	partition int32
+	opts      broker.SubscribeOptions
+	kopts     broker.Options
+	handler   broker.Handler
+	quit      chan struct{}
+	done      chan struct{}
+	recs      chan kgo.FetchTopicPartition
+}
+
+type subscriber struct {
+	c       *kgo.Client
+	topic   string
+	opts    broker.SubscribeOptions
+	kopts   broker.Options
+	handler broker.Handler
+	closed  bool
+	done    chan struct{}
+	consumers map[tp]*consumer
+	sync.RWMutex
+}
+
+func (s *subscriber) Options() broker.SubscribeOptions {
+	return s.opts
+}
+
+func (s *subscriber) Topic() string {
+	return s.topic
+}
+
+func (s *subscriber) Unsubscribe(ctx context.Context) error {
+	if s.closed {
+		return nil
+	}
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+		close(s.done)
+		s.closed = true
+	}
+	return nil
+}
+
+func (s *subscriber) poll(ctx context.Context) {
+	maxInflight := DefaultSubscribeMaxInflight
+	if s.opts.Context != nil {
+		if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); ok && n > 0 {
+			maxInflight = n
+		}
+	}
+	for {
+		select {
+		case <-ctx.Done():
+			s.c.Close()
+			return
+		case <-s.done:
+			s.c.Close()
+			return
+		default:
+			fetches := s.c.PollRecords(ctx, maxInflight)
+			if fetches.IsClientClosed() {
+				s.kopts.Logger.Errorf(ctx, "[kgo] client closed")
+				s.closed = true
+				return
+			}
+			fetches.EachError(func(t string, p int32, err error) {
+				s.kopts.Logger.Fatalf(ctx, "[kgo] fetch topic %s partition %d err: %v", t, p, err)
+			})
+
+			fetches.EachPartition(func(p kgo.FetchTopicPartition) {
+				tp := tp{p.Topic, p.Partition}
+				s.consumers[tp].recs <- p
+			})
+			s.c.AllowRebalance()
+		}
+	}
+}
+
+func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
+	var wg sync.WaitGroup
+	defer wg.Wait()
+
+	for topic, partitions := range lost {
+		for _, partition := range partitions {
+			tp := tp{topic, partition}
+			pc, ok := s.consumers[tp]
+			if !ok {
+				continue
+			}
+			delete(s.consumers, tp)
+			close(pc.quit)
+			s.kopts.Logger.Debugf(ctx, "[kgo] waiting for work to finish topic %s partition %d", topic, partition)
+			wg.Add(1)
+			go func() { <-pc.done; wg.Done() }()
+		}
+	}
+}
+
+func (s *subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
+	s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost)
+	s.killConsumers(ctx, lost)
+}
+
+func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
+	s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked)
+	s.killConsumers(ctx, revoked)
+	if err := c.CommitMarkedOffsets(ctx); err != nil {
+		s.kopts.Logger.Errorf(ctx, "[kgo] revoked CommitMarkedOffsets err: %v", err)
+	}
+}
+
+func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
+	for topic, partitions := range assigned {
+		for _, partition := range partitions {
+			pc := &consumer{
+				c:         c,
+				topic:     topic,
+				partition: partition,
+
+				quit:    make(chan struct{}),
+				done:    make(chan struct{}),
+				recs:    make(chan kgo.FetchTopicPartition, 4),
+				handler: s.handler,
+				kopts:   s.kopts,
+				opts:    s.opts,
+			}
+			s.consumers[tp{topic, partition}] = pc
+			go pc.consume()
+		}
+	}
+}
+
+func (pc *consumer) consume() {
+	defer close(pc.done)
+	pc.kopts.Logger.Debugf(pc.kopts.Context, "[kgo] starting, topic %s partition %d", pc.topic, pc.partition)
+	defer pc.kopts.Logger.Debugf(pc.kopts.Context, "[kgo] stopping, topic %s partition %d", pc.topic, pc.partition)
+
+	eh := pc.kopts.ErrorHandler
+	if pc.opts.ErrorHandler != nil {
+		eh = pc.opts.ErrorHandler
+	}
+
+	for {
+		select {
+		case <-pc.quit:
+			return
+		case fp := <-pc.recs:
+			for _, record := range fp.Records {
+				p := eventPool.Get().(*event)
+				p.msg.Header = nil
+				p.msg.Body = nil
+				p.topic = record.Topic
+				p.err = nil
+				p.ack = false
+				if pc.kopts.Codec.String() == "noop" {
+					p.msg.Header = metadata.New(len(record.Headers))
+					for _, hdr := range record.Headers {
+						p.msg.Header.Set(hdr.Key, string(hdr.Value))
+					}
+					p.msg.Body = record.Value
+				} else if pc.opts.BodyOnly {
+					p.msg.Body = record.Value
+				} else {
+					if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
+						p.err = err
+						p.msg.Body = record.Value
+						if eh != nil {
+							_ = eh(p)
+							if p.ack {
+								pc.c.MarkCommitRecords(record)
+							} else {
+								eventPool.Put(p)
+								pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo] stopping partition consumer: %v", ErrLostMessage)
+								return
+							}
+							eventPool.Put(p)
+							continue
+						} else {
+							if pc.kopts.Logger.V(logger.ErrorLevel) {
+								pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
+							}
+						}
+						eventPool.Put(p)
+						pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo] stopping partition consumer: unmarshal error not handled: %v", err)
+						return
+					}
+				}
+				err := pc.handler(p)
+				if err == nil && pc.opts.AutoAck {
+					p.ack = true
+				} else if err != nil {
+					p.err = err
+					if eh != nil {
+						_ = eh(p)
+					} else {
+						if pc.kopts.Logger.V(logger.ErrorLevel) {
+							pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: subscriber error: %v", err)
+						}
+					}
+				}
+				if p.ack {
+					eventPool.Put(p)
+					pc.c.MarkCommitRecords(record)
+				} else {
+					eventPool.Put(p)
+					pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo] stopping partition consumer: %v", ErrLostMessage)
+					return
+				}
+			}
+		}
+	}
+}
diff --git a/util.go b/util.go
deleted file mode 100644
index e08cd4e..0000000
--- a/util.go
+++ /dev/null
@@ -1,244 +0,0 @@
-package kgo
-
-import (
-	"context"
"errors" - "sync" - - kgo "github.com/twmb/franz-go/pkg/kgo" - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/metadata" -) - -var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") - -var pPool = sync.Pool{ - New: func() interface{} { - return &publication{msg: broker.NewMessage("")} - }, -} - -type worker struct { - done chan struct{} - recs chan []*kgo.Record - cherr chan error - handler broker.Handler - batchHandler broker.BatchHandler - opts broker.SubscribeOptions - kopts broker.Options - tpmap map[string][]int32 - maxInflight int - reader *kgo.Client - ctx context.Context -} - -func (s *subscriber) run(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case <-s.kopts.Context.Done(): - return - default: - fetches := s.reader.PollFetches(ctx) - if fetches.IsClientClosed() { - // TODO: fatal ? - return - } - if len(fetches.Errors()) > 0 { - for _, err := range fetches.Errors() { - s.kopts.Logger.Fatalf(ctx, "fetch err topic %s partition %d: %v", err.Topic, err.Partition, err.Err) - } - // TODO: fatal ? - return - } - - fetches.EachPartition(func(p kgo.FetchTopicPartition) { - s.Lock() - consumers := s.consumers[p.Topic] - s.Unlock() - if consumers == nil { - return - } - w, ok := consumers[p.Partition] - if !ok { - return - } - select { - case err := <-w.cherr: - s.kopts.Logger.Fatalf(ctx, "handle err: %v", err) - return - case w.recs <- p.Records: - case <-w.done: - } - }) - } - } -} - -func (s *subscriber) assigned(ctx context.Context, _ *kgo.Client, assigned map[string][]int32) { - maxInflight := DefaultSubscribeMaxInflight - - if s.opts.Context != nil { - if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok { - maxInflight = n - } - } - - s.Lock() - for topic, partitions := range assigned { - if s.consumers[topic] == nil { - s.consumers[topic] = make(map[int32]worker) - } - for _, partition := range partitions { - w := worker{ - done: make(chan struct{}), - recs: make(chan []*kgo.Record), - cherr: make(chan error), - kopts: s.kopts, - opts: s.opts, - ctx: ctx, - tpmap: map[string][]int32{topic: []int32{partition}}, - reader: s.reader, - handler: s.handler, - batchHandler: s.batchhandler, - maxInflight: maxInflight, - } - s.consumers[topic][partition] = w - go w.handle() - } - } - s.Unlock() -} - -func (s *subscriber) revoked(_ context.Context, _ *kgo.Client, revoked map[string][]int32) { - s.Lock() - for topic, partitions := range revoked { - ptopics := s.consumers[topic] - for _, partition := range partitions { - w := ptopics[partition] - delete(ptopics, partition) - if len(ptopics) == 0 { - delete(s.consumers, topic) - } - close(w.done) - } - } - s.Unlock() -} - -func (w *worker) handle() { - var err error - - eh := w.kopts.ErrorHandler - if w.opts.ErrorHandler != nil { - eh = w.opts.ErrorHandler - } - - paused := false - for { - select { - case <-w.ctx.Done(): - w.cherr <- w.ctx.Err() - return - case <-w.done: - return - case recs := <-w.recs: - if len(recs) >= w.maxInflight { - paused = true - w.reader.PauseFetchPartitions(w.tpmap) - } - for _, record := range recs { - p := pPool.Get().(*publication) - p.msg.Header = nil - p.msg.Body = nil - p.topic = record.Topic - p.err = nil - p.ack = false - if w.opts.BodyOnly { - p.msg.Body = record.Value - if l := len(record.Headers); l > 0 { - if p.msg.Header == nil { - p.msg.Header = metadata.New(l) - } - for _, h := range record.Headers { - p.msg.Header.Set(h.Key, string(h.Value)) - } - 
-						}
-					}
-				} else if w.kopts.Codec.String() == "noop" {
-					p.msg.Body = record.Value
-					p.msg.Header = metadata.New(len(record.Headers))
-					for _, h := range record.Headers {
-						p.msg.Header.Set(h.Key, string(h.Value))
-					}
-				} else {
-					if err := w.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
-						p.err = err
-						p.msg.Body = record.Value
-						if l := len(record.Headers); l > 0 {
-							if p.msg.Header == nil {
-								p.msg.Header = metadata.New(l)
-							}
-							for _, h := range record.Headers {
-								p.msg.Header.Set(h.Key, string(h.Value))
-							}
-						}
-						if eh != nil {
-							_ = eh(p)
-							if p.ack {
-								w.reader.MarkCommitRecords(record)
-							} else {
-								w.cherr <- ErrLostMessage
-								pPool.Put(p)
-								return
-							}
-							pPool.Put(p)
-							continue
-						} else {
-							if w.kopts.Logger.V(logger.ErrorLevel) {
-								w.kopts.Logger.Errorf(w.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
-							}
-						}
-						pPool.Put(p)
-						w.cherr <- err
-						return
-					}
-					if l := len(record.Headers); l > 0 {
-						if p.msg.Header == nil {
-							p.msg.Header = metadata.New(l)
-						}
-						for _, h := range record.Headers {
-							p.msg.Header.Set(h.Key, string(h.Value))
-						}
-					}
-				}
-				err = w.handler(p)
-				if err == nil && w.opts.AutoAck {
-					p.ack = true
-				} else if err != nil {
-					p.err = err
-					if eh != nil {
-						_ = eh(p)
-					} else {
-						if w.kopts.Logger.V(logger.ErrorLevel) {
-							w.kopts.Logger.Errorf(w.kopts.Context, "[kgo]: subscriber error: %v", err)
-						}
-					}
-				}
-				if p.ack {
-					pPool.Put(p)
-					w.reader.MarkCommitRecords(record)
-				} else {
-					pPool.Put(p)
-					w.cherr <- ErrLostMessage
-					return
-				}
-			}
-			if paused {
-				paused = false
-				w.reader.ResumeFetchPartitions(w.tpmap)
-			}
-		}
-	}
-}
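Reviewer notes follow; none of this is part of the patch itself. First, a minimal usage sketch of the API surface after this change (NewBroker, Init, Connect, Subscribe, Publish), modeled on TestConnect. The seed address, topic, and group are placeholders, and broker.SubscribeGroup is assumed to be the micro/v3 option that sets SubscribeOptions.Group.

```go
package main

import (
	"context"
	"log"
	"time"

	kg "github.com/twmb/franz-go/pkg/kgo"
	kgo "go.unistack.org/micro-broker-kgo/v3"
	"go.unistack.org/micro/v3/broker"
)

func main() {
	ctx := context.TODO()

	b := kgo.NewBroker(
		broker.Addrs("127.0.0.1:9092"),      // placeholder seed broker
		kgo.CommitInterval(5*time.Second),   // flush marked offsets every 5s
		kgo.Options(kg.ClientID("example")), // raw franz-go options pass straight through
	)
	if err := b.Init(); err != nil {
		log.Fatal(err)
	}
	if err := b.Connect(ctx); err != nil { // NewClient + Ping under the hood
		log.Fatal(err)
	}
	defer b.Disconnect(ctx)

	// With the default AutoAck, returning nil acks the event; an event left
	// un-acked stops its partition consumer (see consume()).
	sub, err := b.Subscribe(ctx, "example-topic", func(evt broker.Event) error {
		log.Printf("got %s: %s", evt.Topic(), evt.Message().Body)
		return nil
	}, broker.SubscribeGroup("example-group"))
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe(ctx)

	msg := &broker.Message{
		Header: map[string]string{"key": "value"},
		Body:   []byte(`"body"`),
	}
	if err := b.Publish(ctx, "example-topic", msg); err != nil {
		log.Fatal(err)
	}
}
```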
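Per-subscription franz-go options ride along on the subscribe context: SubscribeOptions stores them under optionsKey and Subscribe appends them after its own defaults, so the later options should win on conflict. A fragment continuing the sketch above (b, ctx, and handler as before; the option values are illustrative only):

```go
sub, err := b.Subscribe(ctx, "example-topic", handler,
	broker.SubscribeGroup("example-group"),
	kgo.SubscribeOptions(
		kg.FetchMaxBytes(10*1024*1024),
		kg.ConsumeResetOffset(kg.NewOffset().AtEnd()), // instead of the AtStart() default set in Subscribe
	),
)
```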
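consume() stops a partition consumer whenever an event ends up un-acked, logging ErrLostMessage. An error handler that acks failed events keeps the partition moving instead. A fragment assuming broker.ErrorHandler is the micro/v3 setter behind Options.ErrorHandler — verify that name against your micro version:

```go
b := kgo.NewBroker(
	broker.Addrs("127.0.0.1:9092"),
	broker.ErrorHandler(func(evt broker.Event) error {
		log.Printf("dropping bad message on %s: %v", evt.Topic(), evt.Error())
		return evt.Ack() // ack so the offset is still marked and committed
	}),
)
```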
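For reference, the consumer-side kgo options that cooperate with each other are split between NewBroker and Subscribe; condensed into one list they look like this. Not runnable as-is: sub (*subscriber) and commitInterval are internals of Subscribe.

```go
kopts := []kg.Opt{
	kg.BlockRebalanceOnPoll(),             // rebalances wait until poll() calls AllowRebalance()
	kg.AutoCommitMarks(),                  // only records passed to MarkCommitRecords get committed
	kg.AutoCommitInterval(commitInterval), // CommitInterval option, DefaultCommitInterval by default
	kg.OnPartitionsAssigned(sub.assigned), // starts one consume() goroutine per topic/partition
	kg.OnPartitionsRevoked(sub.revoked),   // drains consumers, then CommitMarkedOffsets
	kg.OnPartitionsLost(sub.lost),         // drains consumers; lost partitions cannot be committed
}
```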
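DefaultRetryBackoffFn is exported, so its shape is easy to probe; a small runnable check of the values it produces:

```go
package main

import (
	"fmt"

	kgo "go.unistack.org/micro-broker-kgo/v3"
)

func main() {
	// Expected shape: ~100ms, ~200ms, ~400ms, ~800ms, then pinned at 1s,
	// with each pre-cap value scaled by a jitter factor in [0.8, 1.2).
	for fails := 0; fails <= 11; fails++ {
		fmt.Printf("fails=%-2d backoff=%v\n", fails, kgo.DefaultRetryBackoffFn(fails))
	}
}
```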