2020-04-08 11:53:02 +03:00
// Package segmentio provides a kafka broker implementation based on the segmentio/kafka-go library
package segmentio
import (
"context"
2021-01-19 15:53:47 +03:00
"fmt"
2021-08-04 16:40:48 +03:00
"strings"
2020-04-08 11:53:02 +03:00
"sync"
2021-07-20 15:05:42 +03:00
"sync/atomic"
2020-06-18 01:38:54 +03:00
"time"
2020-04-08 11:53:02 +03:00
"github.com/google/uuid"
kafka "github.com/segmentio/kafka-go"
2021-01-19 15:53:47 +03:00
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/logger"
2021-07-26 09:41:11 +03:00
"github.com/unistack-org/micro/v3/metadata"
2020-04-08 11:53:02 +03:00
)
// kBroker is a broker.Broker implementation backed by segmentio/kafka-go.
type kBroker struct {
	addrs        []string           // broker addresses; filtered to reachable ones by Connect
	readerConfig kafka.ReaderConfig // template config for per-partition readers
	writerConfig kafka.WriterConfig // config used to create (and recreate) the writer

	writer *kafka.Writer // shared writer used by Publish/BatchPublish

	connected bool // set by Connect, cleared by Disconnect; guarded by the RWMutex

	init bool // set once configure() has completed

	sync.RWMutex

	opts     broker.Options
	messages []kafka.Message // pending messages flushed by writeLoop when writer.Async is set
}
// subscriber tracks a single consumer-group subscription.
type subscriber struct {
	topic      string                    // subscribed topic
	opts       broker.SubscribeOptions   // options captured at Subscribe time
	closed     bool                      // set by Unsubscribe; guarded by the RWMutex
	group      *kafka.ConsumerGroup      // current group; replaced by createGroup on errors
	cgcfg      kafka.ConsumerGroupConfig // config used to (re)create the group
	brokerOpts broker.Options            // broker options (logger, codec, ...)
	sync.RWMutex
}
// publication is a single received message handed to subscriber handlers.
type publication struct {
	topic     string        // topic the message was read from
	partition int           // partition the message was read from
	offset    int64         // offset to commit (message offset + 1)
	err       error         // handler/codec error attached via SetError or decode failure
	ackErr    atomic.Value  // commit error recorded by the commit loop, checked in Ack
	msg       *broker.Message
	ackCh     chan map[string]map[int]int64 // channel feeding offsets to commitLoop
	readerDone *int32                       // 1 once the partition reader exited; Ack then refuses
}
// Topic returns the topic the message was received from.
func (p *publication) Topic() string {
	return p.topic
}
// Message returns the decoded broker message.
func (p *publication) Message() *broker.Message {
	return p.msg
}
// Ack reports this message's offset to the commit loop. It fails if an
// earlier commit already errored, or if the partition reader has finished
// (in which case nothing drains the ack channel anymore).
func (p *publication) Ack() error {
	// Surface any commit error previously recorded by the commit loop.
	if cerr := p.ackErr.Load(); cerr != nil {
		return cerr.(error)
	}
	// Reader is done: sending on ackCh would block forever.
	if atomic.LoadInt32(p.readerDone) == 1 {
		return kafka.ErrGroupClosed
	}
	// Hand topic/partition/offset to commitLoop (offset is already +1).
	p.ackCh <- map[string]map[int]int64{p.topic: {p.partition: p.offset}}
	return nil
}
// Error returns the error attached to this publication, if any.
func (p *publication) Error() error {
	return p.err
}
2021-07-26 09:41:11 +03:00
// SetError attaches an error to this publication.
func (p *publication) SetError(err error) {
	p.err = err
}
2020-04-08 11:53:02 +03:00
// Options returns the subscribe options captured at Subscribe time.
func (s *subscriber) Options() broker.SubscribeOptions {
	return s.opts
}
// Topic returns the subscribed topic.
func (s *subscriber) Topic() string {
	return s.topic
}
2021-01-19 15:53:47 +03:00
func ( s * subscriber ) Unsubscribe ( ctx context . Context ) error {
2020-04-22 00:43:14 +03:00
var err error
s . Lock ( )
2021-07-08 14:11:14 +03:00
group := s . group
s . Unlock ( )
if group != nil {
err = group . Close ( )
}
2021-08-04 16:40:48 +03:00
if err == nil {
s . Lock ( )
s . closed = true
s . Unlock ( )
}
2020-04-22 00:43:14 +03:00
return err
2020-04-08 11:53:02 +03:00
}
// Address returns the configured broker addresses as a comma-separated list.
func (k *kBroker) Address() string {
	return strings.Join(k.addrs, ",")
}
2021-02-10 00:19:08 +03:00
// Name returns the broker name from options.
func (k *kBroker) Name() string {
	return k.opts.Name
}
2021-01-19 15:53:47 +03:00
func ( k * kBroker ) Connect ( ctx context . Context ) error {
2020-04-08 11:53:02 +03:00
k . RLock ( )
if k . connected {
k . RUnlock ( )
return nil
}
k . RUnlock ( )
2021-01-19 15:53:47 +03:00
dialCtx := k . opts . Context
if ctx != nil {
dialCtx = ctx
}
2020-04-08 11:53:02 +03:00
kaddrs := make ( [ ] string , 0 , len ( k . addrs ) )
for _ , addr := range k . addrs {
2021-01-19 15:53:47 +03:00
conn , err := kafka . DialContext ( dialCtx , "tcp" , addr )
2020-04-08 11:53:02 +03:00
if err != nil {
continue
}
if _ , err = conn . Brokers ( ) ; err != nil {
2021-01-19 15:53:47 +03:00
if err = conn . Close ( ) ; err != nil {
return err
}
2020-04-08 11:53:02 +03:00
continue
}
kaddrs = append ( kaddrs , addr )
2021-01-19 15:53:47 +03:00
if err = conn . Close ( ) ; err != nil {
return err
}
2020-04-08 11:53:02 +03:00
}
if len ( kaddrs ) == 0 {
2021-01-19 15:53:47 +03:00
return fmt . Errorf ( "no available brokers: %v" , k . addrs )
2020-04-08 11:53:02 +03:00
}
k . Lock ( )
k . addrs = kaddrs
2020-04-13 17:56:40 +03:00
k . readerConfig . Brokers = k . addrs
k . writerConfig . Brokers = k . addrs
2020-04-08 11:53:02 +03:00
k . connected = true
k . Unlock ( )
2021-07-08 14:11:14 +03:00
td := DefaultStatsInterval
if v , ok := k . opts . Context . Value ( statsIntervalKey { } ) . ( time . Duration ) ; ok && td > 0 {
td = v
}
go writerStats ( k . opts . Context , k . writer , td , k . opts . Meter )
if k . writer . Async {
go k . writeLoop ( )
}
2020-04-08 11:53:02 +03:00
return nil
}
2021-07-08 14:11:14 +03:00
// writeLoop periodically flushes messages buffered by Publish/BatchPublish
// when the writer is async. It runs until the broker context is canceled.
// On a failed flush the buffer is kept and retried on the next tick.
func (k *kBroker) writeLoop() {
	ticker := time.NewTicker(k.writer.BatchTimeout)
	defer ticker.Stop()

	for {
		select {
		case <-k.opts.Context.Done():
			return
		case <-ticker.C:
			// Fix: err is scoped per tick. The original declared it outside
			// the loop, so a stale failure was re-logged on later ticks even
			// when nothing was written.
			var err error
			k.RLock()
			if len(k.messages) != 0 {
				err = k.writer.WriteMessages(k.opts.Context, k.messages...)
			}
			k.RUnlock()
			if err == nil {
				// flush succeeded (or there was nothing to send): reset buffer
				k.Lock()
				k.messages = k.messages[0:0]
				k.Unlock()
			} else if k.opts.Logger.V(logger.ErrorLevel) {
				k.opts.Logger.Errorf(k.opts.Context, "[segmentio] publish error %v", err)
			}
		}
	}
}
2021-01-19 15:53:47 +03:00
func ( k * kBroker ) Disconnect ( ctx context . Context ) error {
2020-04-08 11:53:02 +03:00
k . RLock ( )
if ! k . connected {
k . RUnlock ( )
return nil
}
k . RUnlock ( )
k . Lock ( )
defer k . Unlock ( )
2021-07-08 14:11:14 +03:00
if err := k . writer . Close ( ) ; err != nil {
return err
2020-04-08 11:53:02 +03:00
}
k . connected = false
return nil
}
func ( k * kBroker ) Init ( opts ... broker . Option ) error {
2021-03-24 23:22:00 +03:00
if len ( opts ) == 0 && k . init {
return nil
}
2021-01-19 15:53:47 +03:00
return k . configure ( opts ... )
2020-04-08 11:53:02 +03:00
}
// Options returns the broker options.
func (k *kBroker) Options() broker.Options {
	return k.opts
}
2021-07-26 09:41:11 +03:00
// BatchPublish publishes several messages at once. The destination topic is
// taken from each message's metadata header. With an async writer the
// messages are buffered for writeLoop; otherwise they are written
// synchronously using ctx (falling back to the broker context).
func (k *kBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
	options := broker.NewPublishOptions(opts...)

	kmsgs := make([]kafka.Message, 0, len(msgs))
	for _, msg := range msgs {
		var body []byte
		if options.BodyOnly {
			body = msg.Body
		} else {
			var err error
			if body, err = k.opts.Codec.Marshal(msg); err != nil {
				return err
			}
		}
		topic, _ := msg.Header.Get(metadata.HeaderTopic)
		kmsg := kafka.Message{Topic: topic, Value: body}
		if options.Context != nil {
			// optional partition key supplied via publish context
			if key, ok := options.Context.Value(publishKey{}).([]byte); ok && len(key) > 0 {
				kmsg.Key = key
			}
		}
		kmsgs = append(kmsgs, kmsg)
	}

	if k.writer.Async {
		k.Lock()
		k.messages = append(k.messages, kmsgs...)
		k.Unlock()
		return nil
	}

	wCtx := k.opts.Context
	if ctx != nil {
		wCtx = ctx
	}
	return k.writer.WriteMessages(wCtx, kmsgs...)
}
2021-01-19 15:53:47 +03:00
func ( k * kBroker ) Publish ( ctx context . Context , topic string , msg * broker . Message , opts ... broker . PublishOption ) error {
2021-06-11 14:18:43 +03:00
var val [ ] byte
var err error
2020-06-18 01:38:54 +03:00
2021-01-19 15:53:47 +03:00
options := broker . NewPublishOptions ( opts ... )
2021-06-11 14:18:43 +03:00
if options . BodyOnly {
val = msg . Body
} else {
val , err = k . opts . Codec . Marshal ( msg )
if err != nil {
return err
}
2020-04-08 11:53:02 +03:00
}
2021-07-08 14:11:14 +03:00
kmsg := kafka . Message { Topic : topic , Value : val }
2021-01-19 15:53:47 +03:00
if options . Context != nil {
if key , ok := options . Context . Value ( publishKey { } ) . ( [ ] byte ) ; ok && len ( key ) > 0 {
kmsg . Key = key
}
}
2020-06-18 01:38:54 +03:00
2021-07-08 14:11:14 +03:00
if k . writer . Async {
k . Lock ( )
k . messages = append ( k . messages , kmsg )
k . Unlock ( )
return nil
2020-04-08 11:53:02 +03:00
}
2021-07-08 14:11:14 +03:00
2021-01-19 15:53:47 +03:00
wCtx := k . opts . Context
if ctx != nil {
wCtx = ctx
}
2021-08-04 16:40:48 +03:00
if err = k . writer . WriteMessages ( wCtx , kmsg ) ; err == nil {
return nil
}
logger . Debugf ( wCtx , "recreate writer because of err: %v" , err )
k . Lock ( )
if err = k . writer . Close ( ) ; err != nil {
logger . Errorf ( wCtx , "failed to close writer: %v" , err )
k . Unlock ( )
return err
}
k . writer = newWriter ( k . writerConfig )
k . Unlock ( )
2021-07-08 14:11:14 +03:00
return k . writer . WriteMessages ( wCtx , kmsg )
2020-04-08 11:53:02 +03:00
}
2021-07-26 09:41:11 +03:00
// BatchSubscribe joins a kafka consumer group for topic and dispatches
// messages via a batch handler. When no group is supplied a random UUID is
// used, so the subscriber receives all messages. A background goroutine
// (re)joins the group on each rebalance/generation, starts one partition
// reader per assignment and one async commit loop per generation.
func (k *kBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	opt := broker.NewSubscribeOptions(opts...)
	if opt.Group == "" {
		// no consumer group: generate a unique one
		id, err := uuid.NewRandom()
		if err != nil {
			return nil, err
		}
		opt.Group = id.String()
	}
	cgcfg := kafka.ConsumerGroupConfig{
		ID:                    opt.Group,
		WatchPartitionChanges: true,
		Brokers:               k.readerConfig.Brokers,
		Topics:                []string{topic},
		GroupBalancers:        k.readerConfig.GroupBalancers,
		StartOffset:           k.readerConfig.StartOffset,
		Logger:                k.readerConfig.Logger,
		ErrorLogger:           k.readerConfig.ErrorLogger,
		Dialer:                k.readerConfig.Dialer,
	}
	if err := cgcfg.Validate(); err != nil {
		return nil, err
	}
	gCtx := k.opts.Context
	if ctx != nil {
		gCtx = ctx
	}
	sub := &subscriber{brokerOpts: k.opts, opts: opt, topic: topic, cgcfg: cgcfg}
	sub.createGroup(gCtx)
	go func() {
		defer func() {
			// close the group unless Unsubscribe already did
			sub.RLock()
			closed := sub.closed
			sub.RUnlock()
			if !closed {
				if err := sub.group.Close(); err != nil {
					k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err)
				}
			}
		}()
		for {
			select {
			case <-ctx.Done():
				sub.RLock()
				closed := sub.closed
				sub.RUnlock()
				if closed {
					return
				}
				// NOTE(review): logs the broker context error on subscribe
				// context cancellation — looks copy/pasted; confirm intent.
				if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) {
					k.opts.Logger.Errorf(k.opts.Context, "[segmentio] subscribe context closed %v", k.opts.Context.Err())
				}
				return
			case <-k.opts.Context.Done():
				sub.RLock()
				closed := sub.closed
				sub.RUnlock()
				if closed {
					return
				}
				if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) {
					k.opts.Logger.Errorf(k.opts.Context, "[segmentio] broker context closed error %v", k.opts.Context.Err())
				}
				return
			default:
				sub.RLock()
				closed := sub.closed
				sub.RUnlock()
				if closed {
					return
				}
				// block until the next generation (rebalance) or an error
				generation, err := sub.group.Next(gCtx)
				switch err {
				case nil:
					// normal execution
				case kafka.ErrGroupClosed:
					k.opts.Logger.Debugf(k.opts.Context, "group closed %v", err)
					sub.RLock()
					closed := sub.closed
					sub.RUnlock()
					if closed {
						return
					}
					if k.opts.Logger.V(logger.ErrorLevel) {
						k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed by kafka %v", k.opts.Context.Err())
					}
					sub.createGroup(gCtx)
					continue
				default:
					k.opts.Logger.Debugf(k.opts.Context, "some error: %v", err)
					sub.RLock()
					closed := sub.closed
					sub.RUnlock()
					if closed {
						return
					}
					if k.opts.Logger.V(logger.DebugLevel) {
						k.opts.Logger.Debugf(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
					}
					sub.createGroup(gCtx)
					continue
				}
				// per-generation plumbing shared by all partition readers
				ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize)
				errChLen := 0
				for _, assignments := range generation.Assignments {
					errChLen += len(assignments)
				}
				errChs := make([]chan error, 0, errChLen)
				commitDoneCh := make(chan bool)
				readerDone := int32(0)
				cntWait := int32(0)
				for topic, assignments := range generation.Assignments {
					if k.opts.Logger.V(logger.DebugLevel) {
						k.opts.Logger.Debugf(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
					}
					for _, assignment := range assignments {
						// one dedicated reader per assigned partition, with
						// group id cleared so the reader does not rejoin
						cfg := k.readerConfig
						cfg.Topic = topic
						cfg.Partition = assignment.ID
						cfg.GroupID = ""
						reader := kafka.NewReader(cfg)
						if err := reader.SetOffset(assignment.Offset); err != nil {
							if k.opts.Logger.V(logger.ErrorLevel) {
								k.opts.Logger.Errorf(k.opts.Context, "assignments offset %d can be set by reader: %v", assignment.Offset, err)
							}
							if err = reader.Close(); err != nil {
								if k.opts.Logger.V(logger.ErrorLevel) {
									k.opts.Logger.Errorf(k.opts.Context, "reader close err: %v", err)
								}
							}
							continue
						}
						errCh := make(chan error)
						errChs = append(errChs, errCh)
						cgh := &cgHandler{
							brokerOpts:   k.opts,
							subOpts:      opt,
							reader:       reader,
							batchhandler: handler,
							ackCh:        ackCh,
							errCh:        errCh,
							cntWait:      &cntWait,
							readerDone:   &readerDone,
							commitDoneCh: commitDoneCh,
						}
						atomic.AddInt32(cgh.cntWait, 1)
						generation.Start(cgh.run)
					}
				}
				if k.opts.Logger.V(logger.DebugLevel) {
					k.opts.Logger.Debug(k.opts.Context, "start commit loop")
				}
				// run async commit loop
				go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait)
			}
		}
	}()
	return sub, nil
}
2021-01-19 15:53:47 +03:00
// Subscribe joins a kafka consumer group for topic and dispatches messages
// to handler one at a time. When no group is supplied a random UUID is used,
// so the subscriber receives all messages. A background goroutine (re)joins
// the group on each rebalance/generation, starts one partition reader per
// assignment and one async commit loop per generation.
func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	opt := broker.NewSubscribeOptions(opts...)
	if opt.Group == "" {
		// no consumer group: generate a unique one
		id, err := uuid.NewRandom()
		if err != nil {
			return nil, err
		}
		opt.Group = id.String()
	}

	cgcfg := kafka.ConsumerGroupConfig{
		ID:                    opt.Group,
		WatchPartitionChanges: true,
		Brokers:               k.readerConfig.Brokers,
		Topics:                []string{topic},
		GroupBalancers:        k.readerConfig.GroupBalancers,
		StartOffset:           k.readerConfig.StartOffset,
		Logger:                k.readerConfig.Logger,
		ErrorLogger:           k.readerConfig.ErrorLogger,
		Dialer:                k.readerConfig.Dialer,
	}
	if err := cgcfg.Validate(); err != nil {
		return nil, err
	}

	gCtx := k.opts.Context
	if ctx != nil {
		gCtx = ctx
	}

	sub := &subscriber{brokerOpts: k.opts, opts: opt, topic: topic, cgcfg: cgcfg}
	sub.createGroup(gCtx)

	go func() {
		defer func() {
			// close the group unless Unsubscribe already did
			sub.RLock()
			closed := sub.closed
			sub.RUnlock()
			if !closed {
				if err := sub.group.Close(); err != nil {
					k.opts.Logger.Errorf(k.opts.Context, "[segmentio] consumer group close error %v", err)
				}
			}
		}()

		for {
			select {
			case <-ctx.Done():
				sub.RLock()
				closed := sub.closed
				sub.RUnlock()
				if closed {
					return
				}
				// NOTE(review): logs the broker context error on subscribe
				// context cancellation — looks copy/pasted; confirm intent.
				if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) {
					k.opts.Logger.Errorf(k.opts.Context, "[segmentio] subscribe context closed %v", k.opts.Context.Err())
				}
				return
			case <-k.opts.Context.Done():
				sub.RLock()
				closed := sub.closed
				sub.RUnlock()
				if closed {
					return
				}
				if k.opts.Context.Err() != nil && k.opts.Logger.V(logger.ErrorLevel) {
					k.opts.Logger.Errorf(k.opts.Context, "[segmentio] broker context closed error %v", k.opts.Context.Err())
				}
				return
			default:
				sub.RLock()
				closed := sub.closed
				sub.RUnlock()
				if closed {
					return
				}
				// block until the next generation (rebalance) or an error
				generation, err := sub.group.Next(gCtx)
				switch err {
				case nil:
					// normal execution
				case kafka.ErrGroupClosed:
					k.opts.Logger.Debugf(k.opts.Context, "group closed %v", err)
					sub.RLock()
					closed := sub.closed
					sub.RUnlock()
					if closed {
						return
					}
					if k.opts.Logger.V(logger.ErrorLevel) {
						k.opts.Logger.Errorf(k.opts.Context, "[segmentio] recreate consumer group, as it closed by kafka %v", k.opts.Context.Err())
					}
					sub.createGroup(gCtx)
					continue
				default:
					k.opts.Logger.Debugf(k.opts.Context, "some error: %v", err)
					sub.RLock()
					closed := sub.closed
					sub.RUnlock()
					if closed {
						return
					}
					if k.opts.Logger.V(logger.DebugLevel) {
						k.opts.Logger.Debugf(k.opts.Context, "[segmentio] recreate consumer group, as unexpected consumer error %T %v", err, err)
					}
					sub.createGroup(gCtx)
					continue
				}
				//k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(0))

				// per-generation plumbing shared by all partition readers
				ackCh := make(chan map[string]map[int]int64, DefaultCommitQueueSize)
				errChLen := 0
				for _, assignments := range generation.Assignments {
					errChLen += len(assignments)
				}

				errChs := make([]chan error, 0, errChLen)
				commitDoneCh := make(chan bool)
				readerDone := int32(0)
				cntWait := int32(0)

				for topic, assignments := range generation.Assignments {
					//k.opts.Meter.Counter("broker_reader_partitions", "topic", topic).Set(uint64(len(assignments)))

					if k.opts.Logger.V(logger.DebugLevel) {
						k.opts.Logger.Debugf(k.opts.Context, "topic: %s assignments: %v", topic, assignments)
					}
					for _, assignment := range assignments {
						// one dedicated reader per assigned partition, with
						// group id cleared so the reader does not rejoin
						cfg := k.readerConfig
						cfg.Topic = topic
						cfg.Partition = assignment.ID
						cfg.GroupID = ""
						reader := kafka.NewReader(cfg)

						// as we dont use consumer group in reader, reader not started before actuall fetch, so we can ignore all errors
						_ = reader.SetOffset(assignment.Offset)

						errCh := make(chan error)
						errChs = append(errChs, errCh)
						cgh := &cgHandler{
							brokerOpts:   k.opts,
							subOpts:      opt,
							reader:       reader,
							handler:      handler,
							ackCh:        ackCh,
							errCh:        errCh,
							cntWait:      &cntWait,
							readerDone:   &readerDone,
							commitDoneCh: commitDoneCh,
						}
						atomic.AddInt32(cgh.cntWait, 1)
						generation.Start(cgh.run)
					}
				}
				// run async commit loop
				go k.commitLoop(generation, k.readerConfig.CommitInterval, ackCh, errChs, &readerDone, commitDoneCh, &cntWait)
			}
		}
	}()

	return sub, nil
}
2020-04-22 00:43:14 +03:00
// cgHandler is the per-partition consumer state passed to generation.Start.
type cgHandler struct {
	brokerOpts   broker.Options
	subOpts      broker.SubscribeOptions
	reader       *kafka.Reader        // dedicated reader for one partition
	handler      broker.Handler       // set by Subscribe
	batchhandler broker.BatchHandler  // set by BatchSubscribe
	ackCh        chan map[string]map[int]int64 // offsets flowing to commitLoop
	errCh        chan error           // commit errors flowing back from commitLoop
	readerDone   *int32               // set to 1 when any reader of the generation exits
	commitDoneCh chan bool            // closed when commitLoop's filler goroutine returns
	cntWait      *int32               // count of live readers in this generation
}
2021-07-20 15:05:42 +03:00
// commitLoop commits offsets for one generation. Acks arrive on ackCh; with
// a zero commit interval they are committed synchronously, otherwise they
// are accumulated in `offsets` and flushed on a ticker. Commit errors are
// fanned out to every reader via errChs. The filler goroutine closes
// commitDoneCh when it finishes, which unblocks the readers' deferred exit.
func (k *kBroker) commitLoop(generation *kafka.Generation, commitInterval time.Duration, ackCh chan map[string]map[int]int64, errChs []chan error, readerDone *int32, commitDoneCh chan bool, cntWait *int32) {
	if k.opts.Logger.V(logger.DebugLevel) {
		k.opts.Logger.Debug(k.opts.Context, "start async commit loop")
	}

	// effective commit interval: reader config, then context override
	td := DefaultCommitInterval
	if commitInterval > 0 {
		td = commitInterval
	}
	// NOTE(review): guard checks `td > 0` (already true here), not `v > 0` —
	// a non-positive context override is still applied; likely should be
	// `ok && v > 0`. Same pattern exists in Connect and cgHandler.run.
	if v, ok := k.opts.Context.Value(commitIntervalKey{}).(time.Duration); ok && td > 0 {
		td = v
	}

	var mapMu sync.Mutex
	offsets := make(map[string]map[int]int64, 4)

	// filler goroutine: drains ackCh into `offsets` (or commits directly in
	// sync mode) and performs the final commit once all readers exited.
	go func() {
		defer func() {
			k.opts.Logger.Debug(k.opts.Context, "return from commitLoop and close commitDoneCh")
			close(commitDoneCh)
		}()
		checkTicker := time.NewTicker(300 * time.Millisecond)
		defer checkTicker.Stop()

		for {
			select {
			case <-checkTicker.C:
				// keep draining while any reader is still alive
				if atomic.LoadInt32(cntWait) != 0 {
					continue
				}
				// all readers done: final commit, then exit
				mapMu.Lock()
				if err := generation.CommitOffsets(offsets); err != nil {
					for _, errCh := range errChs {
						errCh <- err
					}
					mapMu.Unlock()
					return
				}
				mapMu.Unlock()
				if k.opts.Logger.V(logger.DebugLevel) {
					k.opts.Logger.Debug(k.opts.Context, "stop commit filling loop")
				}
				return
			case ack := <-ackCh:
				switch td {
				case 0: // sync commits as CommitInterval == 0
					if len(ack) > 0 {
						err := generation.CommitOffsets(ack)
						if err != nil {
							for _, errCh := range errChs {
								errCh <- err
							}
							return
						}
					}
				default: // async commits as CommitInterval > 0
					// merge ack into the pending offsets map
					mapMu.Lock()
					for t, p := range ack {
						if _, ok := offsets[t]; !ok {
							offsets[t] = make(map[int]int64, 4)
						}
						for k, v := range p {
							offsets[t][k] = v
						}
					}
					mapMu.Unlock()
				}
			}
		}
	}()

	if td == 0 {
		// sync commit loop: just wait for all readers to finish
		for {
			if atomic.LoadInt32(readerDone) == 1 && atomic.LoadInt32(cntWait) == 0 {
				break
			}
			time.Sleep(1 * time.Second)
		}
	}

	// async commit loop
	if td > 0 {
		ticker := time.NewTicker(td)
		doneTicker := time.NewTicker(300 * time.Millisecond)
		defer doneTicker.Stop()
		for {
			select {
			case <-doneTicker.C:
				if atomic.LoadInt32(readerDone) == 1 && atomic.LoadInt32(cntWait) == 0 {
					// fire immediate commit offsets
					ticker.Stop()
				}
			case <-ticker.C:
				mapMu.Lock()
				if k.opts.Logger.V(logger.DebugLevel) && len(offsets) > 0 {
					k.opts.Logger.Debugf(k.opts.Context, "async commit offsets: %v", offsets)
				}
				err := generation.CommitOffsets(offsets)
				if err != nil {
					for _, errCh := range errChs {
						errCh <- err
					}
					mapMu.Unlock()
					return
				}
				offsets = make(map[string]map[int]int64, 4)
				mapMu.Unlock()
				if atomic.LoadInt32(readerDone) == 1 && atomic.LoadInt32(cntWait) == 0 {
					return
				}
			}
		}
	}
}
// run is the per-partition consumer body passed to generation.Start. It
// reads messages from its dedicated reader, decodes them, invokes the
// subscriber handler and acks on success when AutoAck is set. It exits on
// context cancellation, generation end, kafka errors, or a commit error
// reported through errCh.
func (h *cgHandler) run(ctx context.Context) {
	if h.brokerOpts.Logger.V(logger.DebugLevel) {
		h.brokerOpts.Logger.Debugf(ctx, "start partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
	}

	td := DefaultStatsInterval
	// NOTE(review): guard checks `td > 0` (already true), not `v > 0` — a
	// non-positive override would still be applied; likely `ok && v > 0`.
	if v, ok := h.brokerOpts.Context.Value(statsIntervalKey{}).(time.Duration); ok && td > 0 {
		td = v
	}

	// start stats loop
	go readerStats(ctx, h.reader, td, h.brokerOpts.Meter)

	var commitErr atomic.Value

	defer func() {
		// tell commitLoop this reader is gone, then wait for it to finish so
		// the final offsets get committed before the generation ends
		atomic.AddInt32(h.cntWait, -1)
		atomic.CompareAndSwapInt32(h.readerDone, 0, 1)
		if err := h.reader.Close(); err != nil && h.brokerOpts.Logger.V(logger.ErrorLevel) {
			h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] reader for topic %s partition %d close error: %v", h.reader.Config().Topic, h.reader.Config().Partition, err)
		}
		h.brokerOpts.Logger.Debug(h.brokerOpts.Context, "wait start for commitDoneCh channel closing")
		<-h.commitDoneCh
		h.brokerOpts.Logger.Debug(h.brokerOpts.Context, "wait stop for commitDoneCh channel closing")
		if h.brokerOpts.Logger.V(logger.DebugLevel) {
			h.brokerOpts.Logger.Debugf(ctx, "stop partition reader topic: %s partition: %d", h.reader.Config().Topic, h.reader.Config().Partition)
		}
	}()

	/*
		tc := time.NewTicker(3 * time.Second)
		defer tc.Stop()
	*/

	// watch for commit errors fanned out by commitLoop
	go func() {
		for {
			select {
			// case <-tc.C:
			// commitErr.Store(errors.New("my err"))
			// return
			case err := <-h.errCh:
				if err != nil {
					commitErr.Store(err)
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return
		default:
			msg, err := h.reader.ReadMessage(ctx)
			switch err {
			default:
				// unexpected read error: log and stop this reader
				switch kerr := err.(type) {
				case kafka.Error:
					if h.brokerOpts.Logger.V(logger.DebugLevel) {
						h.brokerOpts.Logger.Debugf(h.brokerOpts.Context, "[segmentio] kafka error %T err: %v", kerr, kerr)
					}
					return
				default:
					if h.brokerOpts.Logger.V(logger.ErrorLevel) {
						h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] unexpected error type: %T err: %v", err, err)
					}
					return
				}
			case kafka.ErrGenerationEnded:
				// generation has ended
				if h.brokerOpts.Logger.V(logger.DebugLevel) {
					h.brokerOpts.Logger.Debug(h.brokerOpts.Context, "[segmentio] generation ended, rebalance or close")
				}
				return
			case nil:
				// stop if a commit already failed
				if cerr := commitErr.Load(); cerr != nil {
					if h.brokerOpts.Logger.V(logger.ErrorLevel) {
						h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] commit error: %v", cerr)
					}
					return
				}

				eh := h.brokerOpts.ErrorHandler
				if h.subOpts.ErrorHandler != nil {
					eh = h.subOpts.ErrorHandler
				}
				// offset+1 so committing it marks this message consumed
				p := &publication{ackCh: h.ackCh, partition: msg.Partition, offset: msg.Offset + 1, topic: msg.Topic, msg: &broker.Message{}, readerDone: h.readerDone}

				if h.subOpts.BodyOnly {
					p.msg.Body = msg.Value
				} else {
					if err := h.brokerOpts.Codec.Unmarshal(msg.Value, p.msg); err != nil {
						// decode failure: hand the raw body to the error handler
						p.err = err
						p.msg.Body = msg.Value
						if eh != nil {
							_ = eh(p)
						} else {
							if h.brokerOpts.Logger.V(logger.ErrorLevel) {
								h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: failed to unmarshal: %v", err)
							}
						}
						continue
					}
				}

				// re-check for a commit error that arrived while decoding
				if cerr := commitErr.Load(); cerr != nil {
					if h.brokerOpts.Logger.V(logger.ErrorLevel) {
						h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio] commit error: %v", cerr)
					}
					return
				}

				// NOTE(review): only h.handler is invoked here; batchhandler
				// set by BatchSubscribe is never called, so a batch
				// subscription would invoke a nil handler — confirm dispatch.
				err = h.handler(p)
				if err == nil && h.subOpts.AutoAck {
					if err = p.Ack(); err != nil {
						if h.brokerOpts.Logger.V(logger.ErrorLevel) {
							h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: message ack error: %v", err)
						}
						return
					}
				} else if err != nil {
					p.err = err
					if eh != nil {
						_ = eh(p)
					} else {
						if h.brokerOpts.Logger.V(logger.ErrorLevel) {
							h.brokerOpts.Logger.Errorf(h.brokerOpts.Context, "[segmentio]: subscriber error: %v", err)
						}
					}
				}
			}
		}
	}
}
2020-04-22 00:43:14 +03:00
2020-06-02 01:01:53 +03:00
func ( sub * subscriber ) createGroup ( ctx context . Context ) {
2021-08-04 16:40:48 +03:00
var err error
2020-06-02 01:01:53 +03:00
for {
select {
case <- ctx . Done ( ) :
return
default :
2021-07-29 23:50:26 +03:00
sub . RLock ( )
cgcfg := sub . cgcfg
2021-08-04 16:40:48 +03:00
closed := sub . closed
cgroup := sub . group
2021-07-29 23:50:26 +03:00
sub . RUnlock ( )
2021-08-04 16:40:48 +03:00
if closed {
return
}
if cgroup != nil {
if err = cgroup . Close ( ) ; err != nil {
if sub . brokerOpts . Logger . V ( logger . ErrorLevel ) {
sub . brokerOpts . Logger . Errorf ( sub . brokerOpts . Context , "[segmentio]: consumer group close error %v" , err )
}
}
}
cgroup , err = kafka . NewConsumerGroup ( cgcfg )
2020-06-02 01:01:53 +03:00
if err != nil {
2021-01-19 15:53:47 +03:00
if sub . brokerOpts . Logger . V ( logger . ErrorLevel ) {
sub . brokerOpts . Logger . Errorf ( sub . brokerOpts . Context , "[segmentio]: consumer group error %v" , err )
2020-06-02 01:01:53 +03:00
}
continue
}
sub . Lock ( )
sub . group = cgroup
sub . Unlock ( )
return
}
}
}
2020-04-08 11:53:02 +03:00
// String returns the broker implementation name.
func (k *kBroker) String() string {
	return "segmentio"
}
2021-01-19 15:53:47 +03:00
// configure applies options, initializes the configured components
// (register, tracer, logger, meter) and builds the reader/writer configs
// and the shared writer. Called by Init.
func (k *kBroker) configure(opts ...broker.Option) error {
	for _, o := range opts {
		o(&k.opts)
	}

	if err := k.opts.Register.Init(); err != nil {
		return err
	}
	if err := k.opts.Tracer.Init(); err != nil {
		return err
	}
	if err := k.opts.Logger.Init(); err != nil {
		return err
	}
	if err := k.opts.Meter.Init(); err != nil {
		return err
	}

	// collect non-empty addresses; fall back to localhost
	var cAddrs []string
	for _, addr := range k.opts.Addrs {
		if len(addr) == 0 {
			continue
		}
		cAddrs = append(cAddrs, addr)
	}
	if len(cAddrs) == 0 {
		cAddrs = []string{"127.0.0.1:9092"}
	}

	// reader config: default, then context override, then address fallback
	readerConfig := DefaultReaderConfig
	if cfg, ok := k.opts.Context.Value(readerConfigKey{}).(kafka.ReaderConfig); ok {
		readerConfig = cfg
	}
	if len(readerConfig.Brokers) == 0 {
		readerConfig.Brokers = cAddrs
	}
	readerConfig.WatchPartitionChanges = true

	// writer config: same layering as the reader config
	writerConfig := DefaultWriterConfig
	if cfg, ok := k.opts.Context.Value(writerConfigKey{}).(kafka.WriterConfig); ok {
		writerConfig = cfg
	}
	if len(writerConfig.Brokers) == 0 {
		writerConfig.Brokers = cAddrs
	}

	k.addrs = cAddrs
	k.readerConfig = readerConfig
	k.writerConfig = writerConfig
	// writer always targets the resolved address list
	k.writerConfig.Brokers = k.addrs

	if k.readerConfig.Dialer == nil {
		k.readerConfig.Dialer = kafka.DefaultDialer
	}
	if k.writerConfig.Dialer == nil {
		k.writerConfig.Dialer = kafka.DefaultDialer
	}
	// optional client id from context, applied to both dialers
	if id, ok := k.opts.Context.Value(clientIDKey{}).(string); ok {
		k.writerConfig.Dialer.ClientID = id
		k.readerConfig.Dialer.ClientID = id
	}

	k.writer = newWriter(k.writerConfig)
	// optional per-batch completion callback from context
	if fn, ok := k.opts.Context.Value(writerCompletionFunc{}).(func([]kafka.Message, error)); ok {
		k.writer.Completion = fn
	}

	k.init = true
	return nil
}
2021-08-04 16:40:48 +03:00
// newWriter builds a kafka.Writer from the legacy kafka.WriterConfig,
// mirroring the dialer settings into the writer's Transport.
func newWriter(writerConfig kafka.WriterConfig) *kafka.Writer {
	return &kafka.Writer{
		Addr:         kafka.TCP(writerConfig.Brokers...),
		Balancer:     writerConfig.Balancer,
		MaxAttempts:  writerConfig.MaxAttempts,
		BatchSize:    writerConfig.BatchSize,
		BatchBytes:   int64(writerConfig.BatchBytes),
		BatchTimeout: writerConfig.BatchTimeout,
		ReadTimeout:  writerConfig.ReadTimeout,
		WriteTimeout: writerConfig.WriteTimeout,
		RequiredAcks: kafka.RequiredAcks(writerConfig.RequiredAcks),
		Async:        writerConfig.Async,
		Logger:       writerConfig.Logger,
		ErrorLogger:  writerConfig.ErrorLogger,
		Transport: &kafka.Transport{
			Dial:     writerConfig.Dialer.DialFunc,
			ClientID: writerConfig.Dialer.ClientID,
			// fixed transport tuning; not taken from the config
			IdleTimeout: time.Second * 5,
			MetadataTTL: time.Second * 9,
			SASL:        writerConfig.Dialer.SASLMechanism,
		},
	}
}
2021-01-19 15:53:47 +03:00
// NewBroker creates a new segmentio kafka broker with the given options.
// Init must be called before use.
func NewBroker(opts ...broker.Option) broker.Broker {
	return &kBroker{
		opts: broker.NewOptions(opts...),
	}
}