diff --git a/kgo.go b/kgo.go index b295b95..6ad858f 100644 --- a/kgo.go +++ b/kgo.go @@ -280,18 +280,6 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, } func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { - k.Lock() - if k.connected.Load() == 0 { - c, _, err := k.connect(ctx, k.kopts...) - if err != nil { - k.Unlock() - return err - } - k.c = c - k.connected.Store(1) - } - k.Unlock() - options := broker.NewPublishOptions(opts...) records := make([]*kgo.Record, 0, len(msgs)) var errs []string diff --git a/subscriber.go b/subscriber.go index 0ff859b..e0066cd 100644 --- a/subscriber.go +++ b/subscriber.go @@ -274,11 +274,17 @@ func (pc *consumer) consume() { } else if pc.opts.BodyOnly { p.msg.Body = record.Value } else { - sp.AddEvent("codec unmarshal start") + if sp != nil { + sp.AddEvent("codec unmarshal start") + } err := pc.kopts.Codec.Unmarshal(record.Value, p.msg) - sp.AddEvent("codec unmarshal stop") + if sp != nil { + sp.AddEvent("codec unmarshal stop") + } if err != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) + if sp != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + } pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() p.err = err p.msg.Body = record.Value @@ -308,17 +314,25 @@ func (pc *consumer) consume() { eventPool.Put(p) // pc.connected.Store(0) pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") - sp.Finish() + if sp != nil { + sp.Finish() + } return } } - sp.AddEvent("handler start") + if sp != nil { + sp.AddEvent("handler start") + } err := pc.handler(p) - sp.AddEvent("handler stop") + if sp != nil { + sp.AddEvent("handler stop") + } if err == nil { pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "success").Inc() } else { - sp.SetStatus(tracer.SpanStatusError, err.Error()) + if sp != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + } pc.kopts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", record.Topic, "topic", record.Topic, "status", "failure").Inc() } pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() @@ -327,9 +341,13 @@ func (pc *consumer) consume() { } else if err != nil { p.err = err if eh != nil { - sp.AddEvent("error handler start") + if sp != nil { + sp.AddEvent("error handler start") + } _ = eh(p) - sp.AddEvent("error handler stop") + if sp != nil { + sp.AddEvent("error handler stop") + } } else { if pc.kopts.Logger.V(logger.ErrorLevel) { pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: subscriber error", err) @@ -346,11 +364,15 @@ func (pc *consumer) consume() { eventPool.Put(p) // pc.connected.Store(0) pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") - sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") - sp.Finish() + if sp != nil { + sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") + sp.Finish() + } return } - sp.Finish() + if sp != nil { + sp.Finish() + } } } }