broker/segmentio: tmp revert
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
be82d76d81
commit
6a3018a9bc
271
segmentio.go
271
segmentio.go
@ -4,6 +4,7 @@ package segmentio
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@ -28,18 +29,26 @@ type kBroker struct {
|
||||
}
|
||||
|
||||
type subscriber struct {
|
||||
group *kafka.ConsumerGroup
|
||||
t string
|
||||
opts broker.SubscribeOptions
|
||||
k *kBroker
|
||||
group *kafka.ConsumerGroup
|
||||
t string
|
||||
opts broker.SubscribeOptions
|
||||
offset int64
|
||||
gen *kafka.Generation
|
||||
partition int
|
||||
handler broker.Handler
|
||||
reader *kafka.Reader
|
||||
}
|
||||
|
||||
type publication struct {
|
||||
t string
|
||||
err error
|
||||
m *broker.Message
|
||||
ctx context.Context
|
||||
gen *kafka.Generation
|
||||
mp map[string]map[int]int64 // for commit offsets
|
||||
t string
|
||||
err error
|
||||
m *broker.Message
|
||||
ctx context.Context
|
||||
gen *kafka.Generation
|
||||
reader *kafka.Reader
|
||||
km kafka.Message
|
||||
mp map[string]map[int]int64 // for commit offsets
|
||||
}
|
||||
|
||||
func init() {
|
||||
@ -55,7 +64,8 @@ func (p *publication) Message() *broker.Message {
|
||||
}
|
||||
|
||||
func (p *publication) Ack() error {
|
||||
return p.gen.CommitOffsets(p.mp)
|
||||
//return p.gen.CommitOffsets(p.mp)
|
||||
return p.reader.CommitMessages(p.ctx, p.km)
|
||||
}
|
||||
|
||||
func (p *publication) Error() error {
|
||||
@ -71,7 +81,8 @@ func (s *subscriber) Topic() string {
|
||||
}
|
||||
|
||||
func (s *subscriber) Unsubscribe() error {
|
||||
return s.group.Close()
|
||||
//return s.group.Close()
|
||||
return s.reader.Close()
|
||||
}
|
||||
|
||||
func (k *kBroker) Address() string {
|
||||
@ -182,6 +193,80 @@ func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ
|
||||
return writer.WriteMessages(k.opts.Context, kafka.Message{Value: buf})
|
||||
}
|
||||
|
||||
func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
opt := broker.SubscribeOptions{
|
||||
AutoAck: true,
|
||||
Queue: uuid.New().String(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&opt)
|
||||
}
|
||||
|
||||
cfg := k.readerConfig
|
||||
cfg.Topic = topic
|
||||
cfg.GroupID = opt.Queue
|
||||
cfg.WatchPartitionChanges = true
|
||||
cfg.MaxAttempts = 1
|
||||
if err := cfg.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader := kafka.NewReader(cfg)
|
||||
sub := &subscriber{opts: opt, t: topic, reader: reader}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-k.opts.Context.Done():
|
||||
return
|
||||
default:
|
||||
msg, err := reader.FetchMessage(k.opts.Context)
|
||||
if err != nil && err == io.EOF {
|
||||
return
|
||||
} else if err != nil {
|
||||
logger.Errorf("[kafka] subscribe error: %v", err)
|
||||
}
|
||||
p := &publication{t: topic, ctx: k.opts.Context, reader: reader, km: msg}
|
||||
|
||||
var m broker.Message
|
||||
eh := k.opts.ErrorHandler
|
||||
p.m = &m
|
||||
if err := k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
|
||||
p.err = err
|
||||
p.m.Body = msg.Value
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("[kafka]: failed to unmarshal: %v", err)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
err = handler(p)
|
||||
if err == nil && opt.AutoAck {
|
||||
if err = p.Ack(); err != nil {
|
||||
logger.Errorf("[kafka]: unable to commit msg: %v", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
p.err = err
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("[kafka]: subscriber error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
/*
|
||||
func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
opt := broker.SubscribeOptions{
|
||||
AutoAck: true,
|
||||
@ -196,103 +281,135 @@ func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker
|
||||
WatchPartitionChanges: true,
|
||||
Brokers: k.readerConfig.Brokers,
|
||||
Topics: []string{topic},
|
||||
GroupBalancers: []kafka.GroupBalancer{kafka.RangeGroupBalancer{}},
|
||||
}
|
||||
if err := gcfg.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
group, err := kafka.NewConsumerGroup(gcfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sub := &subscriber{group: group, opts: opt, t: topic}
|
||||
sub := &subscriber{k: k, opts: opt, t: topic, handler: handler}
|
||||
|
||||
chErr := make(chan error)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
logger.Info("new consumer group")
|
||||
group, err := kafka.NewConsumerGroup(gcfg)
|
||||
if err != nil {
|
||||
chErr <- err
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
logger.Info("group next")
|
||||
gen, err := group.Next(k.opts.Context)
|
||||
if err == kafka.ErrGroupClosed {
|
||||
chErr <- nil
|
||||
return
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
} else if err != nil {
|
||||
chErr <- err
|
||||
return
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
chErr <- nil
|
||||
|
||||
logger.Info("gen assign")
|
||||
assignments := gen.Assignments[topic]
|
||||
for _, assignment := range assignments {
|
||||
partition, offset := assignment.ID, assignment.Offset
|
||||
p := &publication{t: topic, ctx: k.opts.Context, gen: gen}
|
||||
p.mp = map[string]map[int]int64{p.t: {partition: offset}}
|
||||
|
||||
gen.Start(func(ctx context.Context) {
|
||||
// create reader for this partition.
|
||||
cfg := k.readerConfig
|
||||
cfg.Topic = topic
|
||||
cfg.Partition = partition
|
||||
cfg.GroupID = ""
|
||||
reader := kafka.NewReader(cfg)
|
||||
defer reader.Close()
|
||||
// seek to the last committed offset for this partition.
|
||||
reader.SetOffset(offset)
|
||||
for {
|
||||
msg, err := reader.ReadMessage(ctx)
|
||||
switch err {
|
||||
case kafka.ErrGenerationEnded:
|
||||
// generation has ended
|
||||
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
||||
logger.Debug("[kafka] subscription closed")
|
||||
}
|
||||
return
|
||||
case nil:
|
||||
var m broker.Message
|
||||
eh := k.opts.ErrorHandler
|
||||
p.m = &m
|
||||
if err := k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
|
||||
p.err = err
|
||||
p.m.Body = msg.Value
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("[kafka]: failed to unmarshal: %v", err)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
err = handler(p)
|
||||
if err == nil && opt.AutoAck {
|
||||
if err = p.Ack(); err != nil {
|
||||
logger.Errorf("[kafka]: unable to commit msg: %v", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
p.err = err
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("[kafka]: subscriber error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
sub.offset = offset
|
||||
sub.partition = partition
|
||||
sub.gen = gen
|
||||
logger.Infof("gen start part %v off %v", partition, offset)
|
||||
gen.Start(sub.run)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err = <-chErr
|
||||
err := <-chErr
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-k.opts.Context.Done():
|
||||
return
|
||||
case err := <-chErr:
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
func (s *subscriber) run(ctx context.Context) {
|
||||
// create reader for this partition.
|
||||
cfg := s.k.readerConfig
|
||||
cfg.Topic = s.t
|
||||
cfg.Partition = s.partition
|
||||
cfg.GroupID = ""
|
||||
p := &publication{t: s.t, ctx: s.k.opts.Context, gen: s.gen}
|
||||
p.mp = map[string]map[int]int64{p.t: {s.partition: s.offset}}
|
||||
|
||||
reader := kafka.NewReader(cfg)
|
||||
defer reader.Close()
|
||||
logger.Info("set offset")
|
||||
// seek to the last committed offset for this partition.
|
||||
reader.SetOffset(s.offset)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
msg, err := reader.ReadMessage(ctx)
|
||||
switch err {
|
||||
case kafka.ErrGenerationEnded:
|
||||
logger.Infof("reader err: %v", err)
|
||||
// generation has ended
|
||||
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
||||
logger.Debug("[kafka] subscription closed")
|
||||
}
|
||||
return
|
||||
case nil:
|
||||
var m broker.Message
|
||||
eh := s.k.opts.ErrorHandler
|
||||
p.m = &m
|
||||
if err := s.k.opts.Codec.Unmarshal(msg.Value, &m); err != nil {
|
||||
p.err = err
|
||||
p.m.Body = msg.Value
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("[kafka]: failed to unmarshal: %v", err)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
err = s.handler(p)
|
||||
if err == nil && s.opts.AutoAck {
|
||||
if err = p.Ack(); err != nil {
|
||||
logger.Errorf("[kafka]: unable to commit msg: %v", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
p.err = err
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("[kafka]: subscriber error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
func (k *kBroker) String() string {
|
||||
return "kafka"
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user