fix graceful shutdown
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		
							
								
								
									
										18
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -11,13 +11,14 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/twmb/franz-go/pkg/kgo" | 	"github.com/twmb/franz-go/pkg/kgo" | ||||||
|  | 	"github.com/twmb/franz-go/pkg/kmsg" | ||||||
| 	"go.unistack.org/micro/v4/broker" | 	"go.unistack.org/micro/v4/broker" | ||||||
| 	"go.unistack.org/micro/v4/metadata" | 	"go.unistack.org/micro/v4/metadata" | ||||||
| 	id "go.unistack.org/micro/v4/util/id" | 	id "go.unistack.org/micro/v4/util/id" | ||||||
| 	mrand "go.unistack.org/micro/v4/util/rand" | 	mrand "go.unistack.org/micro/v4/util/rand" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var _ broker.Broker = &Broker{} | var _ broker.Broker = (*Broker)(nil) | ||||||
|  |  | ||||||
| var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") | var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") | ||||||
|  |  | ||||||
| @@ -133,6 +134,9 @@ func (k *Broker) Disconnect(ctx context.Context) error { | |||||||
| 		return nctx.Err() | 		return nctx.Err() | ||||||
| 	default: | 	default: | ||||||
| 		for _, sub := range k.subs { | 		for _, sub := range k.subs { | ||||||
|  | 			if sub.closed { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
| 			if err := sub.Unsubscribe(ctx); err != nil { | 			if err := sub.Unsubscribe(ctx); err != nil { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
| @@ -336,6 +340,18 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han | |||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	mdreq := kmsg.NewMetadataRequest() | ||||||
|  | 	mdreq.Topics = []kmsg.MetadataRequestTopic{ | ||||||
|  | 		{Topic: &topic}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	mdrsp, err := mdreq.RequestWith(ctx, c) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} else if mdrsp.Topics[0].ErrorCode != 0 { | ||||||
|  | 		return nil, fmt.Errorf("topic %s not exists or permission error", topic) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	sub.c = c | 	sub.c = c | ||||||
| 	go sub.poll(ctx) | 	go sub.poll(ctx) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -56,6 +56,12 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error { | |||||||
| 	case <-ctx.Done(): | 	case <-ctx.Done(): | ||||||
| 		return ctx.Err() | 		return ctx.Err() | ||||||
| 	default: | 	default: | ||||||
|  | 		s.c.PauseFetchTopics(s.topic) | ||||||
|  | 		kc := make(map[string][]int32) | ||||||
|  | 		for ctp := range s.consumers { | ||||||
|  | 			kc[ctp.t] = append(kc[ctp.t], ctp.p) | ||||||
|  | 		} | ||||||
|  | 		s.killConsumers(ctx, kc) | ||||||
| 		close(s.done) | 		close(s.done) | ||||||
| 		s.closed = true | 		s.closed = true | ||||||
| 	} | 	} | ||||||
| @@ -72,15 +78,14 @@ func (s *subscriber) poll(ctx context.Context) { | |||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case <-ctx.Done(): | 		case <-ctx.Done(): | ||||||
| 			s.c.Close() | 			s.c.CloseAllowingRebalance() | ||||||
| 			return | 			return | ||||||
| 		case <-s.done: | 		case <-s.done: | ||||||
| 			s.c.Close() | 			s.c.CloseAllowingRebalance() | ||||||
| 			return | 			return | ||||||
| 		default: | 		default: | ||||||
| 			fetches := s.c.PollRecords(ctx, maxInflight) | 			fetches := s.c.PollRecords(ctx, maxInflight) | ||||||
| 			if fetches.IsClientClosed() { | 			if !s.closed && fetches.IsClientClosed() { | ||||||
| 				s.kopts.Logger.Errorf(ctx, "[kgo] client closed") |  | ||||||
| 				s.closed = true | 				s.closed = true | ||||||
| 				return | 				return | ||||||
| 			} | 			} | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user