From d404fa31ab8074d746b94a64b1854f27490947e9 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 22 May 2024 18:28:51 +0300 Subject: [PATCH] export Subscriber Signed-off-by: Vasiliy Tolstov --- kgo.go | 4 ++-- subscriber.go | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/kgo.go b/kgo.go index c897786..5b0d53b 100644 --- a/kgo.go +++ b/kgo.go @@ -62,7 +62,7 @@ type Broker struct { connected bool sync.RWMutex opts broker.Options - subs []*subscriber + subs []*Subscriber } func (k *Broker) Address() string { @@ -364,7 +364,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han } } - sub := &subscriber{ + sub := &Subscriber{ topic: topic, opts: options, handler: handler, diff --git a/subscriber.go b/subscriber.go index 1050c3c..454246a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -33,7 +33,7 @@ type consumer struct { recs chan kgo.FetchTopicPartition } -type subscriber struct { +type Subscriber struct { c *kgo.Client topic string htracer *hookTracer @@ -46,19 +46,19 @@ type subscriber struct { sync.RWMutex } -func (s *subscriber) Client() *kgo.Client { +func (s *Subscriber) Client() *kgo.Client { return s.c } -func (s *subscriber) Options() broker.SubscribeOptions { +func (s *Subscriber) Options() broker.SubscribeOptions { return s.opts } -func (s *subscriber) Topic() string { +func (s *Subscriber) Topic() string { return s.topic } -func (s *subscriber) Unsubscribe(ctx context.Context) error { +func (s *Subscriber) Unsubscribe(ctx context.Context) error { if s.closed { return nil } @@ -80,7 +80,7 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error { return nil } -func (s *subscriber) poll(ctx context.Context) { +func (s *Subscriber) poll(ctx context.Context) { maxInflight := DefaultSubscribeMaxInflight if s.opts.Context != nil { if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok { @@ -148,7 +148,7 @@ func (s *subscriber) poll(ctx context.Context) { } } -func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) { +func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) { var wg sync.WaitGroup defer wg.Wait() @@ -165,12 +165,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) } } -func (s *subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) { +func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) { s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost) s.killConsumers(ctx, lost) } -func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) { +func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) { s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked) s.killConsumers(ctx, revoked) if err := c.CommitMarkedOffsets(ctx); err != nil { @@ -178,7 +178,7 @@ func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str } } -func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) { +func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) { for topic, partitions := range assigned { for _, partition := range partitions { pc := &consumer{