Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
88777a29ad | |||
23c2903c21 |
20
kgo.go
20
kgo.go
@@ -73,6 +73,10 @@ func (k *Broker) Name() string {
|
|||||||
return k.opts.Name
|
return k.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *Broker) Client() *kgo.Client {
|
||||||
|
return k.c
|
||||||
|
}
|
||||||
|
|
||||||
func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) {
|
func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *hookTracer, error) {
|
||||||
var c *kgo.Client
|
var c *kgo.Client
|
||||||
var err error
|
var err error
|
||||||
@@ -322,6 +326,22 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *Broker) TopicExists(ctx context.Context, topic string) error {
|
||||||
|
mdreq := kmsg.NewMetadataRequest()
|
||||||
|
mdreq.Topics = []kmsg.MetadataRequestTopic{
|
||||||
|
{Topic: &topic},
|
||||||
|
}
|
||||||
|
|
||||||
|
mdrsp, err := mdreq.RequestWith(ctx, k.c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if mdrsp.Topics[0].ErrorCode != 0 {
|
||||||
|
return fmt.Errorf("topic %s not exists or permission error", topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
func (k *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@@ -46,6 +46,10 @@ type subscriber struct {
|
|||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Client() *kgo.Client {
|
||||||
|
return s.c
|
||||||
|
}
|
||||||
|
|
||||||
func (s *subscriber) Options() broker.SubscribeOptions {
|
func (s *subscriber) Options() broker.SubscribeOptions {
|
||||||
return s.opts
|
return s.opts
|
||||||
}
|
}
|
||||||
|
34
tracer.go
34
tracer.go
@@ -6,6 +6,7 @@ import (
|
|||||||
|
|
||||||
"github.com/twmb/franz-go/pkg/kgo"
|
"github.com/twmb/franz-go/pkg/kgo"
|
||||||
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
|
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
|
||||||
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v3/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -44,8 +45,21 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
|
|||||||
tracer.WithSpanLabels(attrs...),
|
tracer.WithSpanLabels(attrs...),
|
||||||
tracer.WithSpanKind(tracer.SpanKindProducer),
|
tracer.WithSpanKind(tracer.SpanKindProducer),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if r.Context == nil {
|
||||||
|
r.Context = context.Background()
|
||||||
|
}
|
||||||
|
|
||||||
|
md, ok := metadata.FromOutgoingContext(r.Context)
|
||||||
|
if !ok {
|
||||||
|
md = metadata.New(len(r.Headers))
|
||||||
|
}
|
||||||
|
for _, h := range r.Headers {
|
||||||
|
md.Set(h.Key, string(h.Value))
|
||||||
|
}
|
||||||
|
|
||||||
// Start the "publish" span.
|
// Start the "publish" span.
|
||||||
ctx, _ := m.tracer.Start(r.Context, r.Topic+" publish", opts...)
|
ctx, _ := m.tracer.Start(metadata.NewOutgoingContext(r.Context, md), r.Topic+" publish", opts...)
|
||||||
// Inject the span context into the record.
|
// Inject the span context into the record.
|
||||||
// t.propagators.Inject(ctx, NewRecordCarrier(r))
|
// t.propagators.Inject(ctx, NewRecordCarrier(r))
|
||||||
// Update the record context.
|
// Update the record context.
|
||||||
@@ -99,10 +113,18 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
|
|||||||
if r.Context == nil {
|
if r.Context == nil {
|
||||||
r.Context = context.Background()
|
r.Context = context.Background()
|
||||||
}
|
}
|
||||||
|
md, ok := metadata.FromIncomingContext(r.Context)
|
||||||
|
if !ok {
|
||||||
|
md = metadata.New(len(r.Headers))
|
||||||
|
}
|
||||||
|
for _, h := range r.Headers {
|
||||||
|
md.Set(h.Key, string(h.Value))
|
||||||
|
}
|
||||||
|
|
||||||
// Extract the span context from the record.
|
// Extract the span context from the record.
|
||||||
// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r))
|
// ctx := t.propagators.Extract(r.Context, NewRecordCarrier(r))
|
||||||
// Start the "receive" span.
|
// Start the "receive" span.
|
||||||
newCtx, _ := m.tracer.Start(r.Context, r.Topic+" receive", opts...)
|
newCtx, _ := m.tracer.Start(metadata.NewIncomingContext(r.Context, md), r.Topic+" receive", opts...)
|
||||||
// Update the record context.
|
// Update the record context.
|
||||||
r.Context = newCtx
|
r.Context = newCtx
|
||||||
}
|
}
|
||||||
@@ -148,6 +170,14 @@ func (m *hookTracer) WithProcessSpan(r *kgo.Record) (context.Context, tracer.Spa
|
|||||||
if r.Context == nil {
|
if r.Context == nil {
|
||||||
r.Context = context.Background()
|
r.Context = context.Background()
|
||||||
}
|
}
|
||||||
|
md, ok := metadata.FromIncomingContext(r.Context)
|
||||||
|
if !ok {
|
||||||
|
md = metadata.New(len(r.Headers))
|
||||||
|
}
|
||||||
|
for _, h := range r.Headers {
|
||||||
|
md.Set(h.Key, string(h.Value))
|
||||||
|
}
|
||||||
|
|
||||||
// Start a new span using the provided context and options.
|
// Start a new span using the provided context and options.
|
||||||
return m.tracer.Start(r.Context, r.Topic+" process", opts...)
|
return m.tracer.Start(r.Context, r.Topic+" process", opts...)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user