Compare commits

..

2 Commits

Author SHA1 Message Date
894d6f4f20 tracing fixes
Some checks failed
build / lint (push) Successful in 27s
build / test (push) Failing after 29s
codeql / analyze (go) (push) Failing after 50s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-07-22 01:11:33 +03:00
d404fa31ab export Subscriber
Some checks failed
build / test (push) Failing after 1m38s
codeql / analyze (go) (push) Failing after 1m59s
build / lint (push) Successful in 9m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-22 18:28:51 +03:00
3 changed files with 24 additions and 17 deletions

4
kgo.go
View File

@@ -62,7 +62,7 @@ type Broker struct {
connected bool
sync.RWMutex
opts broker.Options
subs []*subscriber
subs []*Subscriber
}
func (k *Broker) Address() string {
@@ -364,7 +364,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
}
}
sub := &subscriber{
sub := &Subscriber{
topic: topic,
opts: options,
handler: handler,

View File

@@ -33,7 +33,7 @@ type consumer struct {
recs chan kgo.FetchTopicPartition
}
type subscriber struct {
type Subscriber struct {
c *kgo.Client
topic string
htracer *hookTracer
@@ -46,19 +46,19 @@ type subscriber struct {
sync.RWMutex
}
func (s *subscriber) Client() *kgo.Client {
func (s *Subscriber) Client() *kgo.Client {
return s.c
}
func (s *subscriber) Options() broker.SubscribeOptions {
func (s *Subscriber) Options() broker.SubscribeOptions {
return s.opts
}
func (s *subscriber) Topic() string {
func (s *Subscriber) Topic() string {
return s.topic
}
func (s *subscriber) Unsubscribe(ctx context.Context) error {
func (s *Subscriber) Unsubscribe(ctx context.Context) error {
if s.closed {
return nil
}
@@ -80,7 +80,7 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
return nil
}
func (s *subscriber) poll(ctx context.Context) {
func (s *Subscriber) poll(ctx context.Context) {
maxInflight := DefaultSubscribeMaxInflight
if s.opts.Context != nil {
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
@@ -148,7 +148,7 @@ func (s *subscriber) poll(ctx context.Context) {
}
}
func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) {
var wg sync.WaitGroup
defer wg.Wait()
@@ -165,12 +165,12 @@ func (s *subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
}
}
func (s *subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
s.kopts.Logger.Debugf(ctx, "[kgo] lost %#+v", lost)
s.killConsumers(ctx, lost)
}
func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) {
s.kopts.Logger.Debugf(ctx, "[kgo] revoked %#+v", revoked)
s.killConsumers(ctx, revoked)
if err := c.CommitMarkedOffsets(ctx); err != nil {
@@ -178,7 +178,7 @@ func (s *subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
}
}
func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
for topic, partitions := range assigned {
for _, partition := range partitions {
pc := &consumer{

View File

@@ -58,12 +58,15 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
md.Set(h.Key, string(h.Value))
}
// Start the "publish" span.
ctx, _ := m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
// Inject the span context into the record.
// t.propagators.Inject(ctx, NewRecordCarrier(r))
// Update the record context.
r.Context = ctx
if !ok {
r.Context, _ = m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
} else {
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" publish", opts...)
}
}
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
@@ -124,9 +127,13 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
// Extract the span context from the record.
// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r))
// Start the "receive" span.
newCtx, _ := m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
if !ok {
r.Context, _ = m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
} else {
r.Context, _ = m.tracer.Start(r.Context, r.Topic+" receive", opts...)
}
// Update the record context.
r.Context = newCtx
}
// OnFetchRecordUnbuffered continues and ends the "receive" span for an