Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
6a68533824 | ||
058b6354c0 |
9
kgo.go
9
kgo.go
@@ -76,9 +76,8 @@ func (k *Broker) Name() string {
|
||||
func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, error) {
|
||||
var c *kgo.Client
|
||||
var err error
|
||||
var span tracer.Span
|
||||
ctx, span = k.opts.Tracer.Start(ctx, "Connect")
|
||||
defer span.Finish()
|
||||
|
||||
sp, _ := tracer.SpanFromContext(ctx)
|
||||
|
||||
clientID := "kgo"
|
||||
group := ""
|
||||
@@ -99,7 +98,7 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if ctx.Err() != nil {
|
||||
span.SetStatus(tracer.SpanStatusError, ctx.Err().Error())
|
||||
sp.SetStatus(tracer.SpanStatusError, ctx.Err().Error())
|
||||
}
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
@@ -108,7 +107,7 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err
|
||||
err = c.Ping(ctx) // check connectivity to cluster
|
||||
}
|
||||
if err != nil {
|
||||
span.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
@@ -105,11 +105,13 @@ func (s *subscriber) poll(ctx context.Context) {
|
||||
continue
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
for tp := range s.consumers {
|
||||
if v, ok := lmap[tp.p]; ok {
|
||||
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(tp.p)), "lag", strconv.Itoa(int(v.Lag)))
|
||||
}
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
}
|
||||
}
|
||||
@@ -186,7 +188,9 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str
|
||||
kopts: s.kopts,
|
||||
opts: s.opts,
|
||||
}
|
||||
s.Lock()
|
||||
s.consumers[tp{topic, partition}] = pc
|
||||
s.Unlock()
|
||||
go pc.consume()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user