broker/segmentio: intermediate rewrite

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-04-22 00:43:14 +03:00
parent 6a3018a9bc
commit 03a583d9d4

View File

@ -4,7 +4,6 @@ package segmentio
import ( import (
"context" "context"
"errors" "errors"
"io"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
@ -30,25 +29,28 @@ type kBroker struct {
type subscriber struct { type subscriber struct {
k *kBroker k *kBroker
group *kafka.ConsumerGroup topic string
t string
opts broker.SubscribeOptions opts broker.SubscribeOptions
offset int64 offset int64
gen *kafka.Generation gen *kafka.Generation
partition int partition int
handler broker.Handler handler broker.Handler
reader *kafka.Reader reader *kafka.Reader
exit bool
done chan struct{}
group *kafka.ConsumerGroup
sync.RWMutex
} }
type publication struct { type publication struct {
t string topic string
err error err error
m *broker.Message m *broker.Message
ctx context.Context ctx context.Context
gen *kafka.Generation generation *kafka.Generation
reader *kafka.Reader reader *kafka.Reader
km kafka.Message km kafka.Message
mp map[string]map[int]int64 // for commit offsets offsets map[string]map[int]int64 // for commit offsets
} }
func init() { func init() {
@ -56,7 +58,7 @@ func init() {
} }
func (p *publication) Topic() string { func (p *publication) Topic() string {
return p.t return p.topic
} }
func (p *publication) Message() *broker.Message { func (p *publication) Message() *broker.Message {
@ -64,8 +66,7 @@ func (p *publication) Message() *broker.Message {
} }
func (p *publication) Ack() error { func (p *publication) Ack() error {
//return p.gen.CommitOffsets(p.mp) return p.generation.CommitOffsets(p.offsets)
return p.reader.CommitMessages(p.ctx, p.km)
} }
func (p *publication) Error() error { func (p *publication) Error() error {
@ -77,12 +78,17 @@ func (s *subscriber) Options() broker.SubscribeOptions {
} }
func (s *subscriber) Topic() string { func (s *subscriber) Topic() string {
return s.t return s.topic
} }
func (s *subscriber) Unsubscribe() error { func (s *subscriber) Unsubscribe() error {
//return s.group.Close() var err error
return s.reader.Close() s.Lock()
defer s.Unlock()
if s.group != nil {
err = s.group.Close()
}
return err
} }
func (k *kBroker) Address() string { func (k *kBroker) Address() string {
@ -202,142 +208,73 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
o(&opt) o(&opt)
} }
cfg := k.readerConfig cgcfg := kafka.ConsumerGroupConfig{
cfg.Topic = topic
cfg.GroupID = opt.Queue
cfg.WatchPartitionChanges = true
cfg.MaxAttempts = 1
if err := cfg.Validate(); err != nil {
return nil, err
}
reader := kafka.NewReader(cfg)
sub := &subscriber{opts: opt, t: topic, reader: reader}
go func() {
for {
select {
case <-k.opts.Context.Done():
return
default:
msg, err := reader.FetchMessage(k.opts.Context)
if err != nil && err == io.EOF {
return
} else if err != nil {
logger.Errorf("[kafka] subscribe error: %v", err)
}
p := &publication{t: topic, ctx: k.opts.Context, reader: reader, km: msg}
var m broker.Message
eh := k.opts.ErrorHandler
p.m = &m
if err := k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
p.err = err
p.m.Body = msg.Value
if eh != nil {
eh(p)
} else {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[kafka]: failed to unmarshal: %v", err)
}
}
continue
}
err = handler(p)
if err == nil && opt.AutoAck {
if err = p.Ack(); err != nil {
logger.Errorf("[kafka]: unable to commit msg: %v", err)
}
} else if err != nil {
p.err = err
if eh != nil {
eh(p)
} else {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[kafka]: subscriber error: %v", err)
}
}
}
}
}
}()
return sub, nil
}
/*
func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
opt := broker.SubscribeOptions{
AutoAck: true,
Queue: uuid.New().String(),
}
for _, o := range opts {
o(&opt)
}
gcfg := kafka.ConsumerGroupConfig{
ID: opt.Queue, ID: opt.Queue,
WatchPartitionChanges: true, WatchPartitionChanges: true,
Brokers: k.readerConfig.Brokers, Brokers: k.readerConfig.Brokers,
Topics: []string{topic}, Topics: []string{topic},
GroupBalancers: []kafka.GroupBalancer{kafka.RangeGroupBalancer{}}, GroupBalancers: []kafka.GroupBalancer{kafka.RangeGroupBalancer{}},
} }
if err := gcfg.Validate(); err != nil { if err := cgcfg.Validate(); err != nil {
return nil, err return nil, err
} }
sub := &subscriber{k: k, opts: opt, t: topic, handler: handler} group, err := kafka.NewConsumerGroup(cgcfg)
chErr := make(chan error)
go func() {
for {
logger.Info("new consumer group")
group, err := kafka.NewConsumerGroup(gcfg)
if err != nil {
chErr <- err
time.Sleep(1 * time.Second)
continue
}
logger.Info("group next")
gen, err := group.Next(k.opts.Context)
if err == kafka.ErrGroupClosed {
chErr <- nil
time.Sleep(1 * time.Second)
continue
} else if err != nil {
chErr <- err
time.Sleep(1 * time.Second)
continue
}
chErr <- nil
logger.Info("gen assign")
assignments := gen.Assignments[topic]
for _, assignment := range assignments {
partition, offset := assignment.ID, assignment.Offset
sub.offset = offset
sub.partition = partition
sub.gen = gen
logger.Infof("gen start part %v off %v", partition, offset)
gen.Start(sub.run)
}
}
}()
err := <-chErr
if err != nil { if err != nil {
return nil, err return nil, err
} }
sub := &subscriber{opts: opt, topic: topic, group: group}
go func() { go func() {
for { for {
select { select {
case <-k.opts.Context.Done(): case <-k.opts.Context.Done():
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Trace("[segmentio] consumer group closed %v", k.opts.Context.Err())
}
// consumer group closed
return return
case err := <-chErr: default:
if err != nil { /*
logger.Error(err) group, err := kafka.NewConsumerGroup(cgcfg)
if err != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Trace("[segmentio] consumer group error %v", err)
}
continue
}
sub.Lock()
*(sub.group) = *group
sub.Unlock()
*/
generation, err := group.Next(k.opts.Context)
switch err {
case kafka.ErrGroupClosed:
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Trace("[segmentio] consumer generation ended %v", k.opts.Context.Err())
}
continue
default:
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Trace("[segmentio] consumer error %v", k.opts.Context.Err())
}
continue
case nil:
// continue
}
for _, t := range cgcfg.Topics {
assignments := generation.Assignments[t]
for _, assignment := range assignments {
cfg := k.readerConfig
cfg.Topic = t
cfg.Partition = assignment.ID
cfg.GroupID = ""
// cfg.StartOffset = assignment.Offset
cgh := &cgHandler{generation: generation, brokerOpts: k.opts, subOpts: opt, reader: kafka.NewReader(cfg), handler: handler}
generation.Start(cgh.run)
}
} }
} }
} }
@ -346,54 +283,60 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
return sub, nil return sub, nil
} }
func (s *subscriber) run(ctx context.Context) { type cgHandler struct {
// create reader for this partition. topic string
cfg := s.k.readerConfig generation *kafka.Generation
cfg.Topic = s.t brokerOpts broker.Options
cfg.Partition = s.partition subOpts broker.SubscribeOptions
cfg.GroupID = "" reader *kafka.Reader
p := &publication{t: s.t, ctx: s.k.opts.Context, gen: s.gen} handler broker.Handler
p.mp = map[string]map[int]int64{p.t: {s.partition: s.offset}} }
reader := kafka.NewReader(cfg) func (h *cgHandler) run(ctx context.Context) {
defer reader.Close() offsets := make(map[string]map[int]int64)
logger.Info("set offset") offsets[h.reader.Config().Topic] = make(map[int]int64)
// seek to the last committed offset for this partition.
reader.SetOffset(s.offset) defer h.reader.Close()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
msg, err := reader.ReadMessage(ctx) msg, err := h.reader.ReadMessage(ctx)
switch err { switch err {
default:
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("[segmentio] unexpected error: %v", err)
}
return
case kafka.ErrGenerationEnded: case kafka.ErrGenerationEnded:
logger.Infof("reader err: %v", err)
// generation has ended // generation has ended
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Debug("[kafka] subscription closed") logger.Trace("[segmentio] generation ended and subscription closed")
} }
return return
case nil: case nil:
var m broker.Message var m broker.Message
eh := s.k.opts.ErrorHandler eh := h.brokerOpts.ErrorHandler
p.m = &m offsets[msg.Topic][msg.Partition] = msg.Offset
if err := s.k.opts.Codec.Unmarshal(msg.Value, &m); err != nil { p := &publication{generation: h.generation, m: &m, offsets: offsets}
if err := h.brokerOpts.Codec.Unmarshal(msg.Value, &m); err != nil {
p.err = err p.err = err
p.m.Body = msg.Value p.m.Body = msg.Value
if eh != nil { if eh != nil {
eh(p) eh(p)
} else { } else {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) { if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[kafka]: failed to unmarshal: %v", err) logger.Errorf("[segmentio]: failed to unmarshal: %v", err)
} }
} }
continue continue
} }
err = s.handler(p) err = h.handler(p)
if err == nil && s.opts.AutoAck { if err == nil && h.subOpts.AutoAck {
if err = p.Ack(); err != nil { if err = p.Ack(); err != nil {
logger.Errorf("[kafka]: unable to commit msg: %v", err) logger.Errorf("[segmentio]: unable to commit msg: %v", err)
} }
} else if err != nil { } else if err != nil {
p.err = err p.err = err
@ -401,7 +344,7 @@ func (s *subscriber) run(ctx context.Context) {
eh(p) eh(p)
} else { } else {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) { if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[kafka]: subscriber error: %v", err) logger.Errorf("[segmentio]: subscriber error: %v", err)
} }
} }
} }
@ -409,7 +352,7 @@ func (s *subscriber) run(ctx context.Context) {
} }
} }
} }
*/
func (k *kBroker) String() string { func (k *kBroker) String() string {
return "kafka" return "kafka"
} }