rwfix #127

Merged
vtolstov merged 2 commits from rwfix into master 2023-12-20 22:57:33 +03:00
2 changed files with 30 additions and 8 deletions

23
kgo.go
View File

@ -11,13 +11,14 @@ import (
"time"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/metadata"
id "go.unistack.org/micro/v4/util/id"
mrand "go.unistack.org/micro/v4/util/rand"
)
var _ broker.Broker = &Broker{}
var _ broker.Broker = (*Broker)(nil)
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
@ -133,6 +134,9 @@ func (k *Broker) Disconnect(ctx context.Context) error {
return nctx.Err()
default:
for _, sub := range k.subs {
if sub.closed {
continue
}
if err := sub.Unsubscribe(ctx); err != nil {
return err
}
@ -192,8 +196,10 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
k.RLock()
if !k.connected {
ok := k.connected
k.RUnlock()
if !ok {
k.Lock()
c, err := k.connect(ctx, k.kopts...)
if err != nil {
@ -204,7 +210,6 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
k.connected = true
k.Unlock()
}
k.RUnlock()
options := broker.NewPublishOptions(opts...)
records := make([]*kgo.Record, 0, len(msgs))
@ -336,6 +341,18 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
return nil, err
}
mdreq := kmsg.NewMetadataRequest()
mdreq.Topics = []kmsg.MetadataRequestTopic{
{Topic: &topic},
}
mdrsp, err := mdreq.RequestWith(ctx, c)
if err != nil {
return nil, err
} else if mdrsp.Topics[0].ErrorCode != 0 {
return nil, fmt.Errorf("topic %s not exists or permission error", topic)
}
sub.c = c
go sub.poll(ctx)

View File

@ -56,6 +56,12 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
default:
s.c.PauseFetchTopics(s.topic)
kc := make(map[string][]int32)
for ctp := range s.consumers {
kc[ctp.t] = append(kc[ctp.t], ctp.p)
}
s.killConsumers(ctx, kc)
close(s.done)
s.closed = true
}
@ -72,15 +78,14 @@ func (s *subscriber) poll(ctx context.Context) {
for {
select {
case <-ctx.Done():
s.c.Close()
s.c.CloseAllowingRebalance()
return
case <-s.done:
s.c.Close()
s.c.CloseAllowingRebalance()
return
default:
fetches := s.c.PollRecords(ctx, maxInflight)
if fetches.IsClientClosed() {
s.kopts.Logger.Errorf(ctx, "[kgo] client closed")
if !s.closed && fetches.IsClientClosed() {
s.closed = true
return
}