diff --git a/segmentio.go b/segmentio.go
index f159101..4b5248a 100644
--- a/segmentio.go
+++ b/segmentio.go
@@ -4,6 +4,7 @@ package segmentio
 import (
	"context"
	"errors"
+	"io"
	"sync"

	"github.com/google/uuid"
@@ -28,18 +29,26 @@ type kBroker struct {
 }

 type subscriber struct {
-	group *kafka.ConsumerGroup
-	t     string
-	opts  broker.SubscribeOptions
+	k         *kBroker
+	group     *kafka.ConsumerGroup
+	t         string
+	opts      broker.SubscribeOptions
+	offset    int64
+	gen       *kafka.Generation
+	partition int
+	handler   broker.Handler
+	reader    *kafka.Reader
 }

 type publication struct {
-	t   string
-	err error
-	m   *broker.Message
-	ctx context.Context
-	gen *kafka.Generation
-	mp  map[string]map[int]int64 // for commit offsets
+	t      string
+	err    error
+	m      *broker.Message
+	ctx    context.Context
+	gen    *kafka.Generation
+	reader *kafka.Reader
+	km     kafka.Message
+	mp     map[string]map[int]int64 // for commit offsets
 }

 func init() {
@@ -55,7 +64,8 @@ func (p *publication) Message() *broker.Message {
 }

 func (p *publication) Ack() error {
-	return p.gen.CommitOffsets(p.mp)
+	//return p.gen.CommitOffsets(p.mp)
+	return p.reader.CommitMessages(p.ctx, p.km)
 }

 func (p *publication) Error() error {
@@ -71,7 +81,8 @@ func (s *subscriber) Topic() string {
 }

 func (s *subscriber) Unsubscribe() error {
-	return s.group.Close()
+	//return s.group.Close()
+	return s.reader.Close()
 }

 func (k *kBroker) Address() string {
@@ -182,6 +193,83 @@ func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ
	return writer.WriteMessages(k.opts.Context, kafka.Message{Value: buf})
 }

+func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
+	opt := broker.SubscribeOptions{
+		AutoAck: true,
+		Queue:   uuid.New().String(),
+	}
+	for _, o := range opts {
+		o(&opt)
+	}
+
+	cfg := k.readerConfig
+	cfg.Topic = topic
+	cfg.GroupID = opt.Queue
+	cfg.WatchPartitionChanges = true
+	cfg.MaxAttempts = 1
+	if err := cfg.Validate(); err != nil {
+		return nil, err
+	}
+
+	reader := kafka.NewReader(cfg)
+	sub := &subscriber{opts: opt, t: topic, reader: reader}
+
+	go func() {
+		for {
+			select {
+			case <-k.opts.Context.Done():
+				return
+			default:
+				msg, err := reader.FetchMessage(k.opts.Context)
+				if err == io.EOF {
+					// reader has been closed
+					return
+				} else if err != nil {
+					logger.Errorf("[kafka] subscribe error: %v", err)
+					// msg is zero-valued on error, skip processing it
+					continue
+				}
+				p := &publication{t: topic, ctx: k.opts.Context, reader: reader, km: msg}
+
+				var m broker.Message
+				eh := k.opts.ErrorHandler
+				p.m = &m
+				if err := k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
+					p.err = err
+					p.m.Body = msg.Value
+					if eh != nil {
+						eh(p)
+					} else {
+						if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
+							logger.Errorf("[kafka]: failed to unmarshal: %v", err)
+						}
+					}
+					continue
+				}
+				err = handler(p)
+				if err == nil && opt.AutoAck {
+					if err = p.Ack(); err != nil {
+						logger.Errorf("[kafka]: unable to commit msg: %v", err)
+					}
+				} else if err != nil {
+					p.err = err
+					if eh != nil {
+						eh(p)
+					} else {
+						if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
+							logger.Errorf("[kafka]: subscriber error: %v", err)
+						}
+					}
+				}

+			}
+		}
+	}()
+
+	return sub, nil
+}
+
+/*
 func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	opt := broker.SubscribeOptions{
		AutoAck: true,
@@ -196,103 +284,135 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
		WatchPartitionChanges: true,
		Brokers:               k.readerConfig.Brokers,
		Topics:                []string{topic},
+		GroupBalancers:        []kafka.GroupBalancer{kafka.RangeGroupBalancer{}},
	}
	if err := gcfg.Validate(); err != nil {
		return nil, err
	}

-	group, err := kafka.NewConsumerGroup(gcfg)
-	if err != nil {
-		return nil, err
-	}
-
-	sub := &subscriber{group: group, opts: opt, t: topic}
+	sub := &subscriber{k: k, opts: opt, t: topic, handler: handler}
	chErr := make(chan error)

	go func() {
		for {
+			logger.Info("new consumer group")
+			group, err := kafka.NewConsumerGroup(gcfg)
+			if err != nil {
+				chErr <- err
+				time.Sleep(1 * time.Second)
+				continue
+			}
+			logger.Info("group next")
			gen, err := group.Next(k.opts.Context)
			if err == kafka.ErrGroupClosed {
				chErr <- nil
-				return
+				time.Sleep(1 * time.Second)
+				continue
			} else if err != nil {
				chErr <- err
-				return
+				time.Sleep(1 * time.Second)
+				continue
			}
			chErr <- nil
-
+			logger.Info("gen assign")
			assignments := gen.Assignments[topic]
			for _, assignment := range assignments {
				partition, offset := assignment.ID, assignment.Offset
-				p := &publication{t: topic, ctx: k.opts.Context, gen: gen}
-				p.mp = map[string]map[int]int64{p.t: {partition: offset}}
-
-				gen.Start(func(ctx context.Context) {
-					// create reader for this partition.
-					cfg := k.readerConfig
-					cfg.Topic = topic
-					cfg.Partition = partition
-					cfg.GroupID = ""
-					reader := kafka.NewReader(cfg)
-					defer reader.Close()
-					// seek to the last committed offset for this partition.
-					reader.SetOffset(offset)
-					for {
-						msg, err := reader.ReadMessage(ctx)
-						switch err {
-						case kafka.ErrGenerationEnded:
-							// generation has ended
-							if logger.V(logger.DebugLevel, logger.DefaultLogger) {
-								logger.Debug("[kafka] subscription closed")
-							}
-							return
-						case nil:
-							var m broker.Message
-							eh := k.opts.ErrorHandler
-							p.m = &m
-							if err := k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
-								p.err = err
-								p.m.Body = msg.Value
-								if eh != nil {
-									eh(p)
-								} else {
-									if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
-										logger.Errorf("[kafka]: failed to unmarshal: %v", err)
-									}
-								}
-								continue
-							}
-							err = handler(p)
-							if err == nil && opt.AutoAck {
-								if err = p.Ack(); err != nil {
-									logger.Errorf("[kafka]: unable to commit msg: %v", err)
-								}
-							} else if err != nil {
-								p.err = err
-								if eh != nil {
-									eh(p)
-								} else {
-									if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
-										logger.Errorf("[kafka]: subscriber error: %v", err)
-									}
-								}
-							}
-						}
-					}
-				})
+				sub.offset = offset
+				sub.partition = partition
+				sub.gen = gen
+				logger.Infof("gen start part %v off %v", partition, offset)
+				gen.Start(sub.run)
			}
		}
	}()

-	err = <-chErr
+	err := <-chErr
	if err != nil {
		return nil, err
	}

+	go func() {
+		for {
+			select {
+			case <-k.opts.Context.Done():
+				return
+			case err := <-chErr:
+				if err != nil {
+					logger.Error(err)
+				}
+			}
+		}
+	}()
+
	return sub, nil
 }

+func (s *subscriber) run(ctx context.Context) {
+	// create reader for this partition.
+	cfg := s.k.readerConfig
+	cfg.Topic = s.t
+	cfg.Partition = s.partition
+	cfg.GroupID = ""
+	p := &publication{t: s.t, ctx: s.k.opts.Context, gen: s.gen}
+	p.mp = map[string]map[int]int64{p.t: {s.partition: s.offset}}
+
+	reader := kafka.NewReader(cfg)
+	defer reader.Close()
+	logger.Info("set offset")
+	// seek to the last committed offset for this partition.
+	reader.SetOffset(s.offset)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			msg, err := reader.ReadMessage(ctx)
+			switch err {
+			case kafka.ErrGenerationEnded:
+				logger.Infof("reader err: %v", err)
+				// generation has ended
+				if logger.V(logger.DebugLevel, logger.DefaultLogger) {
+					logger.Debug("[kafka] subscription closed")
+				}
+				return
+			case nil:
+				var m broker.Message
+				eh := s.k.opts.ErrorHandler
+				p.m = &m
+				if err := s.k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
+					p.err = err
+					p.m.Body = msg.Value
+					if eh != nil {
+						eh(p)
+					} else {
+						if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
+							logger.Errorf("[kafka]: failed to unmarshal: %v", err)
+						}
+					}
+					continue
+				}
+				err = s.handler(p)
+				if err == nil && s.opts.AutoAck {
+					if err = p.Ack(); err != nil {
+						logger.Errorf("[kafka]: unable to commit msg: %v", err)
+					}
+				} else if err != nil {
+					p.err = err
+					if eh != nil {
+						eh(p)
+					} else {
+						if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
+							logger.Errorf("[kafka]: subscriber error: %v", err)
+						}
+					}
+				}
+			}
+		}
+	}
+}
+*/
 func (k *kBroker) String() string {
	return "kafka"
 }
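
Note on the new implementation: it relies on kafka-go's group-reader mode, in which a Reader constructed with a GroupID joins the consumer group and handles partition assignment and rebalancing internally. Below is a minimal, self-contained sketch of that pattern (broker address, group id, and topic are placeholders, not values from this patch): FetchMessage returns the next message without marking it consumed, and CommitMessages then commits its offset for the group, which is what publication.Ack now delegates to.

package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Setting GroupID makes the Reader join a consumer group; partitions
	// are assigned and rebalanced internally, with no Generation handling.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"127.0.0.1:9092"}, // placeholder address
		GroupID: "example-group",            // placeholder group id
		Topic:   "example-topic",            // placeholder topic
	})
	defer r.Close()

	ctx := context.Background()
	for {
		// FetchMessage blocks for the next message but does not commit it.
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			log.Printf("fetch: %v", err)
			return
		}
		log.Printf("partition %d offset %d: %s", msg.Partition, msg.Offset, msg.Value)
		// CommitMessages commits the message's offset for the group,
		// mirroring publication.Ack after this change.
		if err := r.CommitMessages(ctx, msg); err != nil {
			log.Printf("commit: %v", err)
			return
		}
	}
}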
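
For contrast, here is a condensed sketch of the Generation-based flow the now-commented-out implementation used (same placeholder names, error handling trimmed). Each rebalance yields a Generation; every assigned partition is consumed by a plain partition reader seeked to the committed offset, and offsets are committed through the Generation rather than the reader. The subscriber fields offset, partition, and gen in this patch exist only to carry that per-partition state into subscriber.run.

package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      "example-group",            // placeholder group id
		Brokers: []string{"127.0.0.1:9092"}, // placeholder address
		Topics:  []string{"example-topic"},  // placeholder topic
	})
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		// Next blocks until the group rebalances into a new generation.
		gen, err := group.Next(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		for _, a := range gen.Assignments["example-topic"] {
			partition, offset := a.ID, a.Offset
			gen.Start(func(ctx context.Context) {
				// Plain reader (no GroupID) pinned to one partition.
				r := kafka.NewReader(kafka.ReaderConfig{
					Brokers:   []string{"127.0.0.1:9092"},
					Topic:     "example-topic",
					Partition: partition,
				})
				defer r.Close()
				// Seek to the last committed offset for this partition.
				if err := r.SetOffset(offset); err != nil {
					log.Printf("seek: %v", err)
					return
				}
				for {
					msg, err := r.ReadMessage(ctx)
					if err == kafka.ErrGenerationEnded {
						return // rebalance: stop and let Next take over
					} else if err != nil {
						log.Printf("read: %v", err)
						return
					}
					// Commit through the generation, not the reader.
					if err := gen.CommitOffsets(map[string]map[int]int64{
						"example-topic": {partition: msg.Offset + 1},
					}); err != nil {
						log.Printf("commit: %v", err)
					}
				}
			})
		}
	}
}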
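
Finally, a hypothetical caller-side view, assuming the go-micro v2 broker interface this plugin implements (topic and queue names are invented for illustration). The Queue option becomes the Kafka consumer-group id (cfg.GroupID = opt.Queue above), and since AutoAck defaults to true, the plugin commits each message as soon as the handler returns nil.

package main

import (
	"log"

	"github.com/micro/go-micro/v2/broker"
)

func main() {
	// Stand-in broker; a real service would construct the segmentio
	// plugin broker here instead of using the default.
	b := broker.DefaultBroker
	if err := b.Connect(); err != nil {
		log.Fatal(err)
	}

	// Queue maps to the consumer-group id in the patched Subscribe.
	sub, err := b.Subscribe("example-topic", func(e broker.Event) error {
		log.Printf("[%s] %s", e.Topic(), e.Message().Body)
		// Returning nil lets AutoAck commit the offset; returning an
		// error skips the commit and routes to the ErrorHandler.
		return nil
	}, broker.Queue("example-group"))
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	select {} // block; real code would wait on a shutdown signal
}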