diff --git a/kgo.go b/kgo.go index 35878d1..c897786 100644 --- a/kgo.go +++ b/kgo.go @@ -73,6 +73,10 @@ func (k *Broker) Name() string { return k.opts.Name } +func (k *Broker) Client() *kgo.Client { + return k.c +} + func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) { var c *kgo.Client var err error @@ -322,6 +326,22 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br return nil } +func (k *Broker) TopicExists(ctx context.Context, topic string) error { + mdreq := kmsg.NewMetadataRequest() + mdreq.Topics = []kmsg.MetadataRequestTopic{ + {Topic: &topic}, + } + + mdrsp, err := mdreq.RequestWith(ctx, k.c) + if err != nil { + return err + } else if mdrsp.Topics[0].ErrorCode != 0 { + return fmt.Errorf("topic %s not exists or permission error", topic) + } + + return nil +} + func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { return nil, nil } diff --git a/subscriber.go b/subscriber.go index 0500753..1050c3c 100644 --- a/subscriber.go +++ b/subscriber.go @@ -46,6 +46,10 @@ type subscriber struct { sync.RWMutex } +func (s *subscriber) Client() *kgo.Client { + return s.c +} + func (s *subscriber) Options() broker.SubscribeOptions { return s.opts }