diff --git a/go.mod b/go.mod index 0c9b901..661ffe1 100644 --- a/go.mod +++ b/go.mod @@ -8,5 +8,7 @@ require ( github.com/klauspost/compress v1.13.1 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/segmentio/kafka-go v0.4.17 - github.com/unistack-org/micro/v3 v3.4.9 + github.com/unistack-org/micro/v3 v3.5.3 ) + +//replace github.com/unistack-org/micro/v3 => ../micro diff --git a/go.sum b/go.sum index fa52e7e..70e25db 100644 --- a/go.sum +++ b/go.sum @@ -6,19 +6,15 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1 github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= -github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= @@ -28,24 +24,19 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= -github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/segmentio/kafka-go v0.4.16 h1:9dt78ehM9qzAkekA60D6A96RlqDzC3hnYYa8y5Szd+U= -github.com/segmentio/kafka-go v0.4.16/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w= github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY= github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w= github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/unistack-org/micro/v3 v3.4.7 h1:zmGFx2J6tIbmr4IGLcc+LNtbftQFZI42bfuNV5xNYM0= -github.com/unistack-org/micro/v3 v3.4.7/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= -github.com/unistack-org/micro/v3 v3.4.9 h1:IBCW/yxQijO/X+2zNzTdSsvzlgE0+y49bvjWemtY2zA= -github.com/unistack-org/micro/v3 v3.4.9/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro= +github.com/unistack-org/micro/v3 v3.5.3 h1:yb647rdyxKmzs8fwUm/YbyZupLfcYlZseJr/TpToW+4= +github.com/unistack-org/micro/v3 v3.5.3/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= diff --git a/segmentio.go b/segmentio.go index 2add790..c8bdfc3 100644 --- a/segmentio.go +++ b/segmentio.go @@ -12,6 +12,7 @@ import ( kafka "github.com/segmentio/kafka-go" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/metadata" ) type kBroker struct { @@ -72,6 +73,10 @@ func (p *publication) Error() error { return p.err } +func (p *publication) SetError(err error) { + p.err = err +} + func (s *subscriber) Options() broker.SubscribeOptions { return s.opts } @@ -216,6 +221,46 @@ func (k *kBroker) Options() broker.Options { return k.opts } +func (k *kBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + var val []byte + var err error + + options := broker.NewPublishOptions(opts...) + + kmsgs := make([]kafka.Message, 0, len(msgs)) + for _, msg := range msgs { + if options.BodyOnly { + val = msg.Body + } else { + val, err = k.opts.Codec.Marshal(msg) + if err != nil { + return err + } + } + topic, _ := msg.Header.Get(metadata.HeaderTopic) + kmsg := kafka.Message{Topic: topic, Value: val} + if options.Context != nil { + if key, ok := options.Context.Value(publishKey{}).([]byte); ok && len(key) > 0 { + kmsg.Key = key + } + } + kmsgs = append(kmsgs, kmsg) + } + + if k.writer.Async { + k.Lock() + k.messages = append(k.messages, kmsgs...) + k.Unlock() + return nil + } + + wCtx := k.opts.Context + if ctx != nil { + wCtx = ctx + } + return k.writer.WriteMessages(wCtx, kmsgs...) +} + func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { var val []byte var err error @@ -251,6 +296,190 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message return k.writer.WriteMessages(wCtx, kmsg) } +/* +func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + opt := broker.NewSubscribeOptions(opts...) + + if opt.Group == "" { + id, err := uuid.NewRandom() + if err != nil { + return nil, err + } + opt.Group = id.String() + } + + cgcfg := kafka.ConsumerGroupConfig{ + ID: opt.Group, + WatchPartitionChanges: true, + Brokers: k.readerConfig.Brokers, + Topics: []string{topic}, + GroupBalancers: k.readerConfig.GroupBalancers, + StartOffset: k.readerConfig.StartOffset, + Logger: k.readerConfig.Logger, + ErrorLogger: k.readerConfig.ErrorLogger, + Dialer: k.readerConfig.Dialer, + } + if err := cgcfg.Validate(); err != nil { + return nil, err + } + gCtx := k.opts.Context + if ctx != nil { + gCtx = ctx + } + + sub := &subscriber{brokerOpts: k.opts, opts: opt, topic: topic, cgcfg: cgcfg} + sub.createGroup(gCtx) + + go func() { + defer func() { + sub.RLock() + closed := sub.closed + sub.RUnlock() + if !closed { + if err := sub.group.Close(); err != nil { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err) + } + } + }() + + for { + select { + case <-ctx.Done(): + sub.RLock() + closed := sub.closed + sub.RUnlock() + if closed { + return + } + if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] subscribe context closed %v", k.opts.Context.Err()) + } + return + case <-k.opts.Context.Done(): + sub.RLock() + closed := sub.closed + sub.RUnlock() + if closed { + return + } + if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] broker context closed error %v", k.opts.Context.Err()) + } + return + default: + sub.RLock() + closed := sub.closed + sub.RUnlock() + if closed { + return + } + generation, err := sub.group.Next(gCtx) + switch err { + case nil: + // normal execution + case kafka.ErrGroupClosed: + k.opts.Logger.Tracef(k.opts.Context, "group closed %v", err) + sub.RLock() + closed := sub.closed + sub.RUnlock() + if closed { + return + } + if k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed by kafka %v", k.opts.Context.Err()) + } + sub.createGroup(gCtx) + continue + default: + k.opts.Logger.Tracef(k.opts.Context, "some error: %v", err) + sub.RLock() + closed := sub.closed + sub.RUnlock() + if closed { + return + } + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Tracef(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err) + } + sub.createGroup(gCtx) + continue + } + + ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize) + errChLen := 0 + for _, assignments := range generation.Assignments { + errChLen += len(assignments) + } + errChs := make([]chan error, 0, errChLen) + + commitDoneCh := make(chan bool) + readerDone := int32(0) + cntWait := int32(0) + + for topic, assignments := range generation.Assignments { + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Tracef(k.opts.Context, "topic: %s assignments: %v", topic, assignments) + } + for _, assignment := range assignments { + cfg := k.readerConfig + cfg.Topic = topic + cfg.Partition = assignment.ID + cfg.GroupID = "" + reader := kafka.NewReader(cfg) + + if err := reader.SetOffset(assignment.Offset); err != nil { + if k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "assignments offset %d can be set by reader: %v", assignment.Offset, err) + } + if err = reader.Close(); err != nil { + if k.opts.Logger.V(logger.ErrorLevel) { + k.opts.Logger.Errorf(k.opts.Context, "reader close err: %v", err) + } + } + continue + } + errCh := make(chan error) + errChs = append(errChs, errCh) + cgh := &cgBatchHandler{ + brokerOpts: k.opts, + subOpts: opt, + reader: reader, + handler: handler, + ackCh: ackCh, + errCh: errCh, + cntWait: &cntWait, + readerDone: &readerDone, + commitDoneCh: commitDoneCh, + } + atomic.AddInt32(cgh.cntWait, 1) + generation.Start(cgh.run) + } + } + if k.opts.Logger.V(logger.TraceLevel) { + k.opts.Logger.Trace(k.opts.Context, "start async commit loop") + } + // run async commit loop + go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait) + } + } + }() + + return sub, nil +} + +type cgBatchHandler struct { + brokerOpts broker.Options + subOpts broker.SubscribeOptions + reader *kafka.Reader + handler broker.BatchHandler + ackCh chan map[string]map[int]int64 + errCh chan error + readerDone *int32 + commitDoneCh chan bool + cntWait *int32 +} +*/ + func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { opt := broker.NewSubscribeOptions(opts...) @@ -645,7 +874,7 @@ func (h *cgHandler) run(ctx context.Context) { p.msg.Body = msg.Value } else { if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil { - p.err = err + p.SetError(err) p.msg.Body = msg.Value if eh != nil { _ = eh(p) @@ -710,7 +939,7 @@ func (sub *subscriber) createGroup(ctx context.Context) { } func (k *kBroker) String() string { - return "kafka" + return "segmentio" } func (k *kBroker) configure(opts ...broker.Option) error {