Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
d404fa31ab | |||
88777a29ad |
24
kgo.go
24
kgo.go
@@ -62,7 +62,7 @@ type Broker struct {
|
||||
connected bool
|
||||
sync.RWMutex
|
||||
opts broker.Options
|
||||
subs []*subscriber
|
||||
subs []*Subscriber
|
||||
}
|
||||
|
||||
func (k *Broker) Address() string {
|
||||
@@ -73,6 +73,10 @@ func (k *Broker) Name() string {
|
||||
return k.opts.Name
|
||||
}
|
||||
|
||||
func (k *Broker) Client() *kgo.Client {
|
||||
return k.c
|
||||
}
|
||||
|
||||
func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) {
|
||||
var c *kgo.Client
|
||||
var err error
|
||||
@@ -322,6 +326,22 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *Broker) TopicExists(ctx context.Context, topic string) error {
|
||||
mdreq := kmsg.NewMetadataRequest()
|
||||
mdreq.Topics = []kmsg.MetadataRequestTopic{
|
||||
{Topic: &topic},
|
||||
}
|
||||
|
||||
mdrsp, err := mdreq.RequestWith(ctx, k.c)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if mdrsp.Topics[0].ErrorCode != 0 {
|
||||
return fmt.Errorf("topic %s not exists or permission error", topic)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -344,7 +364,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
||||
}
|
||||
}
|
||||
|
||||
sub := &subscriber{
|
||||
sub := &Subscriber{
|
||||
topic: topic,
|
||||
opts: options,
|
||||
handler: handler,
|
||||
|
@@ -33,7 +33,7 @@ type consumer struct {
|
||||
recs chan kgo.FetchTopicPartition
|
||||
}
|
||||
|
||||
type subscriber struct {
|
||||
type Subscriber struct {
|
||||
c *kgo.Client
|
||||
topic string
|
||||
htracer *hookTracer
|
||||
@@ -46,15 +46,19 @@ type subscriber struct {
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func (s *subscriber) Options() broker.SubscribeOptions {
|
||||
func (s *Subscriber) Client() *kgo.Client {
|
||||
return s.c
|
||||
}
|
||||
|
||||
func (s *Subscriber) Options() broker.SubscribeOptions {
|
||||
return s.opts
|
||||
}
|
||||
|
||||
func (s *subscriber) Topic() string {
|
||||
func (s *Subscriber) Topic() string {
|
||||
return s.topic
|
||||
}
|
||||
|
||||
func (s *subscriber) Unsubscribe(ctx context.Context) error {
|
||||
func (s *Subscriber) Unsubscribe(ctx context.Context) error {
|
||||
if s.closed {
|
||||
return nil
|
||||
}
|
||||
@@ -76,7 +80,7 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *subscriber) poll(ctx context.Context) {
|
||||
func (s *Subscriber) poll(ctx context.Context) {
|
||||
maxInflight := DefaultSubscribeMaxInflight
|
||||
if s.opts.Context != nil {
|
||||
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
|
||||
@@ -144,7 +148,7 @@ func (s *subscriber) poll(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
|
||||
func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
|
||||
var wg sync.WaitGroup
|
||||
defer wg.Wait()
|
||||
|
||||
@@ -161,12 +165,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
|
||||
func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
|
||||
s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost)
|
||||
s.killConsumers(ctx, lost)
|
||||
}
|
||||
|
||||
func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
|
||||
func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
|
||||
s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked)
|
||||
s.killConsumers(ctx, revoked)
|
||||
if err := c.CommitMarkedOffsets(ctx); err != nil {
|
||||
@@ -174,7 +178,7 @@ func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
|
||||
func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
|
||||
for topic, partitions := range assigned {
|
||||
for _, partition := range partitions {
|
||||
pc := &consumer{
|
||||
|
Reference in New Issue
Block a user