update for latest micro
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
2a935a51f3
commit
5934963d81
4
go.mod
4
go.mod
@ -8,5 +8,7 @@ require (
|
|||||||
github.com/klauspost/compress v1.13.1 // indirect
|
github.com/klauspost/compress v1.13.1 // indirect
|
||||||
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
|
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
|
||||||
github.com/segmentio/kafka-go v0.4.17
|
github.com/segmentio/kafka-go v0.4.17
|
||||||
github.com/unistack-org/micro/v3 v3.4.9
|
github.com/unistack-org/micro/v3 v3.5.3
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//replace github.com/unistack-org/micro/v3 => ../micro
|
||||||
|
13
go.sum
13
go.sum
@ -6,19 +6,15 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
|
|||||||
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
||||||
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
|
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
|
||||||
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
|
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
|
||||||
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
|
|
||||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
|
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
|
||||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
|
|
||||||
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
|
||||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||||
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
|
|
||||||
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
|
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
|
||||||
github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ=
|
github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ=
|
||||||
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||||
@ -28,24 +24,19 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
|||||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||||
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
|
|
||||||
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||||
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
|
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
|
||||||
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/segmentio/kafka-go v0.4.16 h1:9dt78ehM9qzAkekA60D6A96RlqDzC3hnYYa8y5Szd+U=
|
|
||||||
github.com/segmentio/kafka-go v0.4.16/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
|
|
||||||
github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY=
|
github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY=
|
||||||
github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
|
github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
|
||||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
||||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/unistack-org/micro/v3 v3.4.7 h1:zmGFx2J6tIbmr4IGLcc+LNtbftQFZI42bfuNV5xNYM0=
|
github.com/unistack-org/micro/v3 v3.5.3 h1:yb647rdyxKmzs8fwUm/YbyZupLfcYlZseJr/TpToW+4=
|
||||||
github.com/unistack-org/micro/v3 v3.4.7/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk=
|
github.com/unistack-org/micro/v3 v3.5.3/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro=
|
||||||
github.com/unistack-org/micro/v3 v3.4.9 h1:IBCW/yxQijO/X+2zNzTdSsvzlgE0+y49bvjWemtY2zA=
|
|
||||||
github.com/unistack-org/micro/v3 v3.4.9/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro=
|
|
||||||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
|
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
|
||||||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
|
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
|
||||||
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
|
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
|
||||||
|
233
segmentio.go
233
segmentio.go
@ -12,6 +12,7 @@ import (
|
|||||||
kafka "github.com/segmentio/kafka-go"
|
kafka "github.com/segmentio/kafka-go"
|
||||||
"github.com/unistack-org/micro/v3/broker"
|
"github.com/unistack-org/micro/v3/broker"
|
||||||
"github.com/unistack-org/micro/v3/logger"
|
"github.com/unistack-org/micro/v3/logger"
|
||||||
|
"github.com/unistack-org/micro/v3/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
type kBroker struct {
|
type kBroker struct {
|
||||||
@ -72,6 +73,10 @@ func (p *publication) Error() error {
|
|||||||
return p.err
|
return p.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *publication) SetError(err error) {
|
||||||
|
p.err = err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *subscriber) Options() broker.SubscribeOptions {
|
func (s *subscriber) Options() broker.SubscribeOptions {
|
||||||
return s.opts
|
return s.opts
|
||||||
}
|
}
|
||||||
@ -216,6 +221,46 @@ func (k *kBroker) Options() broker.Options {
|
|||||||
return k.opts
|
return k.opts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *kBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||||
|
var val []byte
|
||||||
|
var err error
|
||||||
|
|
||||||
|
options := broker.NewPublishOptions(opts...)
|
||||||
|
|
||||||
|
kmsgs := make([]kafka.Message, 0, len(msgs))
|
||||||
|
for _, msg := range msgs {
|
||||||
|
if options.BodyOnly {
|
||||||
|
val = msg.Body
|
||||||
|
} else {
|
||||||
|
val, err = k.opts.Codec.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||||
|
kmsg := kafka.Message{Topic: topic, Value: val}
|
||||||
|
if options.Context != nil {
|
||||||
|
if key, ok := options.Context.Value(publishKey{}).([]byte); ok && len(key) > 0 {
|
||||||
|
kmsg.Key = key
|
||||||
|
}
|
||||||
|
}
|
||||||
|
kmsgs = append(kmsgs, kmsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if k.writer.Async {
|
||||||
|
k.Lock()
|
||||||
|
k.messages = append(k.messages, kmsgs...)
|
||||||
|
k.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
wCtx := k.opts.Context
|
||||||
|
if ctx != nil {
|
||||||
|
wCtx = ctx
|
||||||
|
}
|
||||||
|
return k.writer.WriteMessages(wCtx, kmsgs...)
|
||||||
|
}
|
||||||
|
|
||||||
func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||||
var val []byte
|
var val []byte
|
||||||
var err error
|
var err error
|
||||||
@ -251,6 +296,190 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message
|
|||||||
return k.writer.WriteMessages(wCtx, kmsg)
|
return k.writer.WriteMessages(wCtx, kmsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
|
opt := broker.NewSubscribeOptions(opts...)
|
||||||
|
|
||||||
|
if opt.Group == "" {
|
||||||
|
id, err := uuid.NewRandom()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
opt.Group = id.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
cgcfg := kafka.ConsumerGroupConfig{
|
||||||
|
ID: opt.Group,
|
||||||
|
WatchPartitionChanges: true,
|
||||||
|
Brokers: k.readerConfig.Brokers,
|
||||||
|
Topics: []string{topic},
|
||||||
|
GroupBalancers: k.readerConfig.GroupBalancers,
|
||||||
|
StartOffset: k.readerConfig.StartOffset,
|
||||||
|
Logger: k.readerConfig.Logger,
|
||||||
|
ErrorLogger: k.readerConfig.ErrorLogger,
|
||||||
|
Dialer: k.readerConfig.Dialer,
|
||||||
|
}
|
||||||
|
if err := cgcfg.Validate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
gCtx := k.opts.Context
|
||||||
|
if ctx != nil {
|
||||||
|
gCtx = ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
sub := &subscriber{brokerOpts: k.opts, opts: opt, topic: topic, cgcfg: cgcfg}
|
||||||
|
sub.createGroup(gCtx)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
sub.RLock()
|
||||||
|
closed := sub.closed
|
||||||
|
sub.RUnlock()
|
||||||
|
if !closed {
|
||||||
|
if err := sub.group.Close(); err != nil {
|
||||||
|
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
sub.RLock()
|
||||||
|
closed := sub.closed
|
||||||
|
sub.RUnlock()
|
||||||
|
if closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] subscribe context closed %v", k.opts.Context.Err())
|
||||||
|
}
|
||||||
|
return
|
||||||
|
case <-k.opts.Context.Done():
|
||||||
|
sub.RLock()
|
||||||
|
closed := sub.closed
|
||||||
|
sub.RUnlock()
|
||||||
|
if closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] broker context closed error %v", k.opts.Context.Err())
|
||||||
|
}
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
sub.RLock()
|
||||||
|
closed := sub.closed
|
||||||
|
sub.RUnlock()
|
||||||
|
if closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
generation, err := sub.group.Next(gCtx)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
// normal execution
|
||||||
|
case kafka.ErrGroupClosed:
|
||||||
|
k.opts.Logger.Tracef(k.opts.Context, "group closed %v", err)
|
||||||
|
sub.RLock()
|
||||||
|
closed := sub.closed
|
||||||
|
sub.RUnlock()
|
||||||
|
if closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if k.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed by kafka %v", k.opts.Context.Err())
|
||||||
|
}
|
||||||
|
sub.createGroup(gCtx)
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
k.opts.Logger.Tracef(k.opts.Context, "some error: %v", err)
|
||||||
|
sub.RLock()
|
||||||
|
closed := sub.closed
|
||||||
|
sub.RUnlock()
|
||||||
|
if closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if k.opts.Logger.V(logger.TraceLevel) {
|
||||||
|
k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
|
||||||
|
}
|
||||||
|
sub.createGroup(gCtx)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize)
|
||||||
|
errChLen := 0
|
||||||
|
for _, assignments := range generation.Assignments {
|
||||||
|
errChLen += len(assignments)
|
||||||
|
}
|
||||||
|
errChs := make([]chan error, 0, errChLen)
|
||||||
|
|
||||||
|
commitDoneCh := make(chan bool)
|
||||||
|
readerDone := int32(0)
|
||||||
|
cntWait := int32(0)
|
||||||
|
|
||||||
|
for topic, assignments := range generation.Assignments {
|
||||||
|
if k.opts.Logger.V(logger.TraceLevel) {
|
||||||
|
k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
|
||||||
|
}
|
||||||
|
for _, assignment := range assignments {
|
||||||
|
cfg := k.readerConfig
|
||||||
|
cfg.Topic = topic
|
||||||
|
cfg.Partition = assignment.ID
|
||||||
|
cfg.GroupID = ""
|
||||||
|
reader := kafka.NewReader(cfg)
|
||||||
|
|
||||||
|
if err := reader.SetOffset(assignment.Offset); err != nil {
|
||||||
|
if k.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
k.opts.Logger.Errorf(k.opts.Context, "assignments offset %d can be set by reader: %v", assignment.Offset, err)
|
||||||
|
}
|
||||||
|
if err = reader.Close(); err != nil {
|
||||||
|
if k.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
k.opts.Logger.Errorf(k.opts.Context, "reader close err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
errCh := make(chan error)
|
||||||
|
errChs = append(errChs, errCh)
|
||||||
|
cgh := &cgBatchHandler{
|
||||||
|
brokerOpts: k.opts,
|
||||||
|
subOpts: opt,
|
||||||
|
reader: reader,
|
||||||
|
handler: handler,
|
||||||
|
ackCh: ackCh,
|
||||||
|
errCh: errCh,
|
||||||
|
cntWait: &cntWait,
|
||||||
|
readerDone: &readerDone,
|
||||||
|
commitDoneCh: commitDoneCh,
|
||||||
|
}
|
||||||
|
atomic.AddInt32(cgh.cntWait, 1)
|
||||||
|
generation.Start(cgh.run)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if k.opts.Logger.V(logger.TraceLevel) {
|
||||||
|
k.opts.Logger.Trace(k.opts.Context, "start async commit loop")
|
||||||
|
}
|
||||||
|
// run async commit loop
|
||||||
|
go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return sub, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type cgBatchHandler struct {
|
||||||
|
brokerOpts broker.Options
|
||||||
|
subOpts broker.SubscribeOptions
|
||||||
|
reader *kafka.Reader
|
||||||
|
handler broker.BatchHandler
|
||||||
|
ackCh chan map[string]map[int]int64
|
||||||
|
errCh chan error
|
||||||
|
readerDone *int32
|
||||||
|
commitDoneCh chan bool
|
||||||
|
cntWait *int32
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
opt := broker.NewSubscribeOptions(opts...)
|
opt := broker.NewSubscribeOptions(opts...)
|
||||||
|
|
||||||
@ -645,7 +874,7 @@ func (h *cgHandler) run(ctx context.Context) {
|
|||||||
p.msg.Body = msg.Value
|
p.msg.Body = msg.Value
|
||||||
} else {
|
} else {
|
||||||
if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil {
|
if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil {
|
||||||
p.err = err
|
p.SetError(err)
|
||||||
p.msg.Body = msg.Value
|
p.msg.Body = msg.Value
|
||||||
if eh != nil {
|
if eh != nil {
|
||||||
_ = eh(p)
|
_ = eh(p)
|
||||||
@ -710,7 +939,7 @@ func (sub *subscriber) createGroup(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (k *kBroker) String() string {
|
func (k *kBroker) String() string {
|
||||||
return "kafka"
|
return "segmentio"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kBroker) configure(opts ...broker.Option) error {
|
func (k *kBroker) configure(opts ...broker.Option) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user