add helper funcs
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
23c2903c21
commit
88777a29ad
20
kgo.go
20
kgo.go
@ -73,6 +73,10 @@ func (k *Broker) Name() string {
|
|||||||
return k.opts.Name
|
return k.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *Broker) Client() *kgo.Client {
|
||||||
|
return k.c
|
||||||
|
}
|
||||||
|
|
||||||
func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) {
|
func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) {
|
||||||
var c *kgo.Client
|
var c *kgo.Client
|
||||||
var err error
|
var err error
|
||||||
@ -322,6 +326,22 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *Broker) TopicExists(ctx context.Context, topic string) error {
|
||||||
|
mdreq := kmsg.NewMetadataRequest()
|
||||||
|
mdreq.Topics = []kmsg.MetadataRequestTopic{
|
||||||
|
{Topic: &topic},
|
||||||
|
}
|
||||||
|
|
||||||
|
mdrsp, err := mdreq.RequestWith(ctx, k.c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if mdrsp.Topics[0].ErrorCode != 0 {
|
||||||
|
return fmt.Errorf("topic %s not exists or permission error", topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -46,6 +46,10 @@ type subscriber struct {
|
|||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Client() *kgo.Client {
|
||||||
|
return s.c
|
||||||
|
}
|
||||||
|
|
||||||
func (s *subscriber) Options() broker.SubscribeOptions {
|
func (s *subscriber) Options() broker.SubscribeOptions {
|
||||||
return s.opts
|
return s.opts
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user