diff --git a/kgo.go b/kgo.go index 093e77f..f83cc81 100644 --- a/kgo.go +++ b/kgo.go @@ -11,13 +11,14 @@ import ( "time" "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/metadata" id "go.unistack.org/micro/v3/util/id" mrand "go.unistack.org/micro/v3/util/rand" ) -var _ broker.Broker = &Broker{} +var _ broker.Broker = (*Broker)(nil) var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") @@ -134,6 +135,9 @@ func (k *Broker) Disconnect(ctx context.Context) error { return nctx.Err() default: for _, sub := range k.subs { + if sub.closed { + continue + } if err := sub.Unsubscribe(ctx); err != nil { return err } @@ -306,6 +310,18 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han return nil, err } + mdreq := kmsg.NewMetadataRequest() + mdreq.Topics = []kmsg.MetadataRequestTopic{ + {Topic: &topic}, + } + + mdrsp, err := mdreq.RequestWith(ctx, c) + if err != nil { + return nil, err + } else if mdrsp.Topics[0].ErrorCode != 0 { + return nil, fmt.Errorf("topic %s not exists or permission error", topic) + } + sub.c = c go sub.poll(ctx) diff --git a/subscriber.go b/subscriber.go index b6f147f..8099fc0 100644 --- a/subscriber.go +++ b/subscriber.go @@ -55,6 +55,12 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() default: + s.c.PauseFetchTopics(s.topic) + kc := make(map[string][]int32) + for ctp := range s.consumers { + kc[ctp.t] = append(kc[ctp.t], ctp.p) + } + s.killConsumers(ctx, kc) close(s.done) s.closed = true } @@ -71,15 +77,14 @@ func (s *subscriber) poll(ctx context.Context) { for { select { case <-ctx.Done(): - s.c.Close() + s.c.CloseAllowingRebalance() return case <-s.done: - s.c.Close() + s.c.CloseAllowingRebalance() return default: fetches := s.c.PollRecords(ctx, maxInflight) - if fetches.IsClientClosed() { - s.kopts.Logger.Errorf(ctx, "[kgo] client closed") + if !s.closed && fetches.IsClientClosed() { s.closed = true return }