diff --git a/kgo.go b/kgo.go index f723b10..0197d9a 100644 --- a/kgo.go +++ b/kgo.go @@ -11,13 +11,14 @@ import ( "time" "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" "go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/metadata" id "go.unistack.org/micro/v4/util/id" mrand "go.unistack.org/micro/v4/util/rand" ) -var _ broker.Broker = &Broker{} +var _ broker.Broker = (*Broker)(nil) var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") @@ -133,6 +134,9 @@ func (k *Broker) Disconnect(ctx context.Context) error { return nctx.Err() default: for _, sub := range k.subs { + if sub.closed { + continue + } if err := sub.Unsubscribe(ctx); err != nil { return err } @@ -192,8 +196,10 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { k.RLock() - if !k.connected { - k.RUnlock() + ok := k.connected + k.RUnlock() + + if !ok { k.Lock() c, err := k.connect(ctx, k.kopts...) if err != nil { @@ -204,7 +210,6 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.connected = true k.Unlock() } - k.RUnlock() options := broker.NewPublishOptions(opts...) records := make([]*kgo.Record, 0, len(msgs)) @@ -336,6 +341,18 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han return nil, err } + mdreq := kmsg.NewMetadataRequest() + mdreq.Topics = []kmsg.MetadataRequestTopic{ + {Topic: &topic}, + } + + mdrsp, err := mdreq.RequestWith(ctx, c) + if err != nil { + return nil, err + } else if mdrsp.Topics[0].ErrorCode != 0 { + return nil, fmt.Errorf("topic %s not exists or permission error", topic) + } + sub.c = c go sub.poll(ctx) diff --git a/subscriber.go b/subscriber.go index 8bdcd1d..bc4313f 100644 --- a/subscriber.go +++ b/subscriber.go @@ -56,6 +56,12 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() default: + s.c.PauseFetchTopics(s.topic) + kc := make(map[string][]int32) + for ctp := range s.consumers { + kc[ctp.t] = append(kc[ctp.t], ctp.p) + } + s.killConsumers(ctx, kc) close(s.done) s.closed = true } @@ -72,15 +78,14 @@ func (s *subscriber) poll(ctx context.Context) { for { select { case <-ctx.Done(): - s.c.Close() + s.c.CloseAllowingRebalance() return case <-s.done: - s.c.Close() + s.c.CloseAllowingRebalance() return default: fetches := s.c.PollRecords(ctx, maxInflight) - if fetches.IsClientClosed() { - s.kopts.Logger.Errorf(ctx, "[kgo] client closed") + if !s.closed && fetches.IsClientClosed() { s.closed = true return }