add connect checking

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-09-17 07:56:31 +03:00
parent 174e759b29
commit cc5d0ba4af

20
kgo.go
View File

@ -8,7 +8,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/twmb/franz-go/pkg/kerr"
kgo "github.com/twmb/franz-go/pkg/kgo" kgo "github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kversion"
"github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
@ -129,6 +132,23 @@ func (k *kBroker) Connect(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
// Request versions in order to guess Kafka Cluster version
versionsReq := kmsg.NewApiVersionsRequest()
versionsRes, err := versionsReq.RequestWith(ctx, c)
if err != nil {
return fmt.Errorf("failed to request api versions: %w", err)
}
err = kerr.ErrorForCode(versionsRes.ErrorCode)
if err != nil {
return fmt.Errorf("failed to request api versions. Inner kafka error: %w", err)
}
versions := kversion.FromApiVersionsResponse(versionsRes)
if k.opts.Logger.V(logger.InfoLevel) {
logger.Infof(ctx, "[kgo] connected to to kafka cluster version %v", versions.VersionGuess())
}
k.Lock() k.Lock()
k.connected = true k.connected = true
k.writer = c k.writer = c