fix graceful shutdown #124
18
kgo.go
18
kgo.go
@ -11,13 +11,14 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/twmb/franz-go/pkg/kgo"
|
"github.com/twmb/franz-go/pkg/kgo"
|
||||||
|
"github.com/twmb/franz-go/pkg/kmsg"
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v3/broker"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
id "go.unistack.org/micro/v3/util/id"
|
id "go.unistack.org/micro/v3/util/id"
|
||||||
mrand "go.unistack.org/micro/v3/util/rand"
|
mrand "go.unistack.org/micro/v3/util/rand"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ broker.Broker = &Broker{}
|
var _ broker.Broker = (*Broker)(nil)
|
||||||
|
|
||||||
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
|
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
|
||||||
|
|
||||||
@ -134,6 +135,9 @@ func (k *Broker) Disconnect(ctx context.Context) error {
|
|||||||
return nctx.Err()
|
return nctx.Err()
|
||||||
default:
|
default:
|
||||||
for _, sub := range k.subs {
|
for _, sub := range k.subs {
|
||||||
|
if sub.closed {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if err := sub.Unsubscribe(ctx); err != nil {
|
if err := sub.Unsubscribe(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -306,6 +310,18 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mdreq := kmsg.NewMetadataRequest()
|
||||||
|
mdreq.Topics = []kmsg.MetadataRequestTopic{
|
||||||
|
{Topic: &topic},
|
||||||
|
}
|
||||||
|
|
||||||
|
mdrsp, err := mdreq.RequestWith(ctx, c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if mdrsp.Topics[0].ErrorCode != 0 {
|
||||||
|
return nil, fmt.Errorf("topic %s not exists or permission error", topic)
|
||||||
|
}
|
||||||
|
|
||||||
sub.c = c
|
sub.c = c
|
||||||
go sub.poll(ctx)
|
go sub.poll(ctx)
|
||||||
|
|
||||||
|
@ -55,6 +55,12 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
default:
|
default:
|
||||||
|
s.c.PauseFetchTopics(s.topic)
|
||||||
|
kc := make(map[string][]int32)
|
||||||
|
for ctp := range s.consumers {
|
||||||
|
kc[ctp.t] = append(kc[ctp.t], ctp.p)
|
||||||
|
}
|
||||||
|
s.killConsumers(ctx, kc)
|
||||||
close(s.done)
|
close(s.done)
|
||||||
s.closed = true
|
s.closed = true
|
||||||
}
|
}
|
||||||
@ -71,15 +77,14 @@ func (s *subscriber) poll(ctx context.Context) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
s.c.Close()
|
s.c.CloseAllowingRebalance()
|
||||||
return
|
return
|
||||||
case <-s.done:
|
case <-s.done:
|
||||||
s.c.Close()
|
s.c.CloseAllowingRebalance()
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
fetches := s.c.PollRecords(ctx, maxInflight)
|
fetches := s.c.PollRecords(ctx, maxInflight)
|
||||||
if fetches.IsClientClosed() {
|
if !s.closed && fetches.IsClientClosed() {
|
||||||
s.kopts.Logger.Errorf(ctx, "[kgo] client closed")
|
|
||||||
s.closed = true
|
s.closed = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user