add helper funcs
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		
							
								
								
									
										20
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										20
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -73,6 +73,10 @@ func (k *Broker) Name() string { | ||||
| 	return k.opts.Name | ||||
| } | ||||
|  | ||||
| func (k *Broker) Client() *kgo.Client { | ||||
| 	return k.c | ||||
| } | ||||
|  | ||||
| func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) { | ||||
| 	var c *kgo.Client | ||||
| 	var err error | ||||
| @@ -322,6 +326,22 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (k *Broker) TopicExists(ctx context.Context, topic string) error { | ||||
| 	mdreq := kmsg.NewMetadataRequest() | ||||
| 	mdreq.Topics = []kmsg.MetadataRequestTopic{ | ||||
| 		{Topic: &topic}, | ||||
| 	} | ||||
|  | ||||
| 	mdrsp, err := mdreq.RequestWith(ctx, k.c) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} else if mdrsp.Topics[0].ErrorCode != 0 { | ||||
| 		return fmt.Errorf("topic %s not exists or permission error", topic) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { | ||||
| 	return nil, nil | ||||
| } | ||||
|   | ||||
| @@ -46,6 +46,10 @@ type subscriber struct { | ||||
| 	sync.RWMutex | ||||
| } | ||||
|  | ||||
| func (s *subscriber) Client() *kgo.Client { | ||||
| 	return s.c | ||||
| } | ||||
|  | ||||
| func (s *subscriber) Options() broker.SubscribeOptions { | ||||
| 	return s.opts | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user