Compare commits

..

2 Commits

Author SHA1 Message Date
d404fa31ab export Subscriber
Some checks failed
build / test (push) Failing after 1m38s
codeql / analyze (go) (push) Failing after 1m59s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-22 18:28:51 +03:00
88777a29ad add helper funcs
Some checks failed
build / test (push) Failing after 1m39s
codeql / analyze (go) (push) Failing after 2m8s
build / lint (push) Successful in 9m13s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-22 17:49:36 +03:00
2 changed files with 35 additions and 11 deletions

24
kgo.go
View File

@@ -62,7 +62,7 @@ type Broker struct {
connected bool connected bool
sync.RWMutex sync.RWMutex
opts broker.Options opts broker.Options
subs []*subscriber subs []*Subscriber
} }
func (k *Broker) Address() string { func (k *Broker) Address() string {
@@ -73,6 +73,10 @@ func (k *Broker) Name() string {
return k.opts.Name return k.opts.Name
} }
func (k *Broker) Client() *kgo.Client {
return k.c
}
func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) { func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) {
var c *kgo.Client var c *kgo.Client
var err error var err error
@@ -322,6 +326,22 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
return nil return nil
} }
func (k *Broker) TopicExists(ctx context.Context, topic string) error {
mdreq := kmsg.NewMetadataRequest()
mdreq.Topics = []kmsg.MetadataRequestTopic{
{Topic: &topic},
}
mdrsp, err := mdreq.RequestWith(ctx, k.c)
if err != nil {
return err
} else if mdrsp.Topics[0].ErrorCode != 0 {
return fmt.Errorf("topic %s not exists or permission error", topic)
}
return nil
}
func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil return nil, nil
} }
@@ -344,7 +364,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
} }
} }
sub := &subscriber{ sub := &Subscriber{
topic: topic, topic: topic,
opts: options, opts: options,
handler: handler, handler: handler,

View File

@@ -33,7 +33,7 @@ type consumer struct {
recs chan kgo.FetchTopicPartition recs chan kgo.FetchTopicPartition
} }
type subscriber struct { type Subscriber struct {
c *kgo.Client c *kgo.Client
topic string topic string
htracer *hookTracer htracer *hookTracer
@@ -46,15 +46,19 @@ type subscriber struct {
sync.RWMutex sync.RWMutex
} }
func (s *subscriber) Options() broker.SubscribeOptions { func (s *Subscriber) Client() *kgo.Client {
return s.c
}
func (s *Subscriber) Options() broker.SubscribeOptions {
return s.opts return s.opts
} }
func (s *subscriber) Topic() string { func (s *Subscriber) Topic() string {
return s.topic return s.topic
} }
func (s *subscriber) Unsubscribe(ctx context.Context) error { func (s *Subscriber) Unsubscribe(ctx context.Context) error {
if s.closed { if s.closed {
return nil return nil
} }
@@ -76,7 +80,7 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
return nil return nil
} }
func (s *subscriber) poll(ctx context.Context) { func (s *Subscriber) poll(ctx context.Context) {
maxInflight := DefaultSubscribeMaxInflight maxInflight := DefaultSubscribeMaxInflight
if s.opts.Context != nil { if s.opts.Context != nil {
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok { if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
@@ -144,7 +148,7 @@ func (s *subscriber) poll(ctx context.Context) {
} }
} }
func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) { func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
var wg sync.WaitGroup var wg sync.WaitGroup
defer wg.Wait() defer wg.Wait()
@@ -161,12 +165,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
} }
} }
func (s *subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) { func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost) s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost)
s.killConsumers(ctx, lost) s.killConsumers(ctx, lost)
} }
func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) { func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked) s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked)
s.killConsumers(ctx, revoked) s.killConsumers(ctx, revoked)
if err := c.CommitMarkedOffsets(ctx); err != nil { if err := c.CommitMarkedOffsets(ctx); err != nil {
@@ -174,7 +178,7 @@ func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
} }
} }
func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) { func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
for topic, partitions := range assigned { for topic, partitions := range assigned {
for _, partition := range partitions { for _, partition := range partitions {
pc := &consumer{ pc := &consumer{