From cc5d0ba4afe2c2e3b813f7b42042dd38cd96bdd9 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 17 Sep 2021 07:56:31 +0300 Subject: [PATCH] add connect checking Signed-off-by: Vasiliy Tolstov --- kgo.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/kgo.go b/kgo.go index 42a538a..3c2ba7b 100644 --- a/kgo.go +++ b/kgo.go @@ -8,7 +8,10 @@ import ( "sync" "time" + "github.com/twmb/franz-go/pkg/kerr" kgo "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" + "github.com/twmb/franz-go/pkg/kversion" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/metadata" @@ -129,6 +132,23 @@ func (k *kBroker) Connect(ctx context.Context) error { if err != nil { return err } + + // Request versions in order to guess Kafka Cluster version + versionsReq := kmsg.NewApiVersionsRequest() + versionsRes, err := versionsReq.RequestWith(ctx, c) + if err != nil { + return fmt.Errorf("failed to request api versions: %w", err) + } + err = kerr.ErrorForCode(versionsRes.ErrorCode) + if err != nil { + return fmt.Errorf("failed to request api versions. Inner kafka error: %w", err) + } + versions := kversion.FromApiVersionsResponse(versionsRes) + + if k.opts.Logger.V(logger.InfoLevel) { + logger.Infof(ctx, "[kgo] connected to to kafka cluster version %v", versions.VersionGuess()) + } + k.Lock() k.connected = true k.writer = c