Compare commits

..

2 Commits

Author SHA1 Message Date
7312397abe graceful shutdown porting
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-13 15:25:31 +03:00
7676631737 fix graceful shutdown
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-13 15:15:58 +03:00
4 changed files with 32 additions and 11 deletions

4
go.mod
View File

@@ -4,11 +4,11 @@ go 1.19
require ( require (
github.com/twmb/franz-go v1.11.5 github.com/twmb/franz-go v1.11.5
go.unistack.org/micro/v4 v4.0.1 github.com/twmb/franz-go/pkg/kmsg v1.3.0
go.unistack.org/micro/v4 v4.0.2
) )
require ( require (
github.com/klauspost/compress v1.15.9 // indirect github.com/klauspost/compress v1.15.9 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.3.0 // indirect
) )

4
go.sum
View File

@@ -6,8 +6,8 @@ github.com/twmb/franz-go v1.11.5 h1:TTv5lVJd+87XkmP9dWN9Jgpf7IUUr7a7jee+byR8LBE=
github.com/twmb/franz-go v1.11.5/go.mod h1:FvaHNlpT6woVYIl6LAuIeL7yHol1Fp6Gv2Dn21AvH78= github.com/twmb/franz-go v1.11.5/go.mod h1:FvaHNlpT6woVYIl6LAuIeL7yHol1Fp6Gv2Dn21AvH78=
github.com/twmb/franz-go/pkg/kmsg v1.3.0 h1:ouBETB7nTqRxiO5E8/pySoFZtVEW2VWw55z3/bsUzTw= github.com/twmb/franz-go/pkg/kmsg v1.3.0 h1:ouBETB7nTqRxiO5E8/pySoFZtVEW2VWw55z3/bsUzTw=
github.com/twmb/franz-go/pkg/kmsg v1.3.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/franz-go/pkg/kmsg v1.3.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
go.unistack.org/micro/v4 v4.0.1 h1:xo1IxbVfgh8i0eY0VeYa3cbb13u5n/Mxnp3FOgWD4Jo= go.unistack.org/micro/v4 v4.0.2 h1:2LeG6jslE50c72f1XwJhfTiidy67xklIC3saptLoUys=
go.unistack.org/micro/v4 v4.0.1/go.mod h1:p/J5UcSJjfHsWGT31uKoghQ5rUQZzQJBAFy+Z4+ZVMs= go.unistack.org/micro/v4 v4.0.2/go.mod h1:+wBa98rSf+mRXb/MuSVFPXtDrqN0k8rzPQiC8wRCwCo=
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

18
kgo.go
View File

@@ -11,13 +11,14 @@ import (
"time" "time"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/metadata"
id "go.unistack.org/micro/v4/util/id" id "go.unistack.org/micro/v4/util/id"
mrand "go.unistack.org/micro/v4/util/rand" mrand "go.unistack.org/micro/v4/util/rand"
) )
var _ broker.Broker = &Broker{} var _ broker.Broker = (*Broker)(nil)
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
@@ -133,6 +134,9 @@ func (k *Broker) Disconnect(ctx context.Context) error {
return nctx.Err() return nctx.Err()
default: default:
for _, sub := range k.subs { for _, sub := range k.subs {
if sub.closed {
continue
}
if err := sub.Unsubscribe(ctx); err != nil { if err := sub.Unsubscribe(ctx); err != nil {
return err return err
} }
@@ -336,6 +340,18 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
return nil, err return nil, err
} }
mdreq := kmsg.NewMetadataRequest()
mdreq.Topics = []kmsg.MetadataRequestTopic{
{Topic: &topic},
}
mdrsp, err := mdreq.RequestWith(ctx, c)
if err != nil {
return nil, err
} else if mdrsp.Topics[0].ErrorCode != 0 {
return nil, fmt.Errorf("topic %s not exists or permission error", topic)
}
sub.c = c sub.c = c
go sub.poll(ctx) go sub.poll(ctx)

View File

@@ -22,7 +22,7 @@ type consumer struct {
partition int32 partition int32
opts broker.SubscribeOptions opts broker.SubscribeOptions
kopts broker.Options kopts broker.Options
handler broker.Handler handler interface{}
quit chan struct{} quit chan struct{}
done chan struct{} done chan struct{}
recs chan kgo.FetchTopicPartition recs chan kgo.FetchTopicPartition
@@ -33,7 +33,7 @@ type subscriber struct {
topic string topic string
opts broker.SubscribeOptions opts broker.SubscribeOptions
kopts broker.Options kopts broker.Options
handler broker.Handler handler interface{}
closed bool closed bool
done chan struct{} done chan struct{}
consumers map[tp]*consumer consumers map[tp]*consumer
@@ -56,6 +56,12 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
default: default:
s.c.PauseFetchTopics(s.topic)
kc := make(map[string][]int32)
for ctp := range s.consumers {
kc[ctp.t] = append(kc[ctp.t], ctp.p)
}
s.killConsumers(ctx, kc)
close(s.done) close(s.done)
s.closed = true s.closed = true
} }
@@ -72,15 +78,14 @@ func (s *subscriber) poll(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
s.c.Close() s.c.CloseAllowingRebalance()
return return
case <-s.done: case <-s.done:
s.c.Close() s.c.CloseAllowingRebalance()
return return
default: default:
fetches := s.c.PollRecords(ctx, maxInflight) fetches := s.c.PollRecords(ctx, maxInflight)
if fetches.IsClientClosed() { if !s.closed && fetches.IsClientClosed() {
s.kopts.Logger.Errorf(ctx, "[kgo] client closed")
s.closed = true s.closed = true
return return
} }