add ability to fail probes and fatal on broker errors
All checks were successful
test / test (push) Successful in 3m12s
All checks were successful
test / test (push) Successful in 3m12s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
41
kgo.go
41
kgo.go
@@ -109,11 +109,18 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho
|
||||
}
|
||||
}
|
||||
|
||||
var fatalOnError bool
|
||||
if k.opts.Context != nil {
|
||||
if v, ok := k.opts.Context.Value(fatalOnErrorKey{}).(bool); ok && v {
|
||||
fatalOnError = v
|
||||
}
|
||||
}
|
||||
|
||||
htracer := &hookTracer{group: group, clientID: clientID, tracer: k.opts.Tracer}
|
||||
opts = append(opts,
|
||||
kgo.WithHooks(&hookMeter{meter: k.opts.Meter}),
|
||||
kgo.WithHooks(htracer),
|
||||
kgo.WithHooks(&hookEvent{connected: k.connected}),
|
||||
kgo.WithHooks(&hookEvent{log: k.opts.Logger, fatalOnError: fatalOnError, connected: k.connected}),
|
||||
)
|
||||
|
||||
select {
|
||||
@@ -260,7 +267,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
||||
k.connected.Store(1)
|
||||
}
|
||||
k.Unlock()
|
||||
|
||||
fmt.Printf("EEE\n")
|
||||
options := broker.NewPublishOptions(opts...)
|
||||
records := make([]*kgo.Record, 0, len(msgs))
|
||||
var errs []string
|
||||
@@ -315,7 +322,9 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
||||
return nil
|
||||
}
|
||||
ts := time.Now()
|
||||
fmt.Printf("SSSSSSEEE\n")
|
||||
results := k.c.ProduceSync(ctx, records...)
|
||||
fmt.Printf("SSSSSS\n")
|
||||
te := time.Since(ts)
|
||||
for _, result := range results {
|
||||
k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
||||
@@ -374,13 +383,27 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
||||
}
|
||||
}
|
||||
|
||||
var fatalOnError bool
|
||||
if k.opts.Context != nil {
|
||||
if v, ok := k.opts.Context.Value(fatalOnErrorKey{}).(bool); ok && v {
|
||||
fatalOnError = v
|
||||
}
|
||||
}
|
||||
|
||||
if options.Context != nil {
|
||||
if v, ok := options.Context.Value(fatalOnErrorKey{}).(bool); ok && v {
|
||||
fatalOnError = v
|
||||
}
|
||||
}
|
||||
|
||||
sub := &Subscriber{
|
||||
topic: topic,
|
||||
opts: options,
|
||||
handler: handler,
|
||||
kopts: k.opts,
|
||||
consumers: make(map[tp]*consumer),
|
||||
done: make(chan struct{}),
|
||||
topic: topic,
|
||||
opts: options,
|
||||
handler: handler,
|
||||
kopts: k.opts,
|
||||
consumers: make(map[tp]*consumer),
|
||||
done: make(chan struct{}),
|
||||
fatalOnError: fatalOnError,
|
||||
}
|
||||
|
||||
kopts := append(k.kopts,
|
||||
@@ -454,7 +477,7 @@ func NewBroker(opts ...broker.Option) *Broker {
|
||||
kgo.BlockRebalanceOnPoll(),
|
||||
kgo.Balancers(kgo.CooperativeStickyBalancer()),
|
||||
kgo.FetchIsolationLevel(kgo.ReadUncommitted()),
|
||||
kgo.UnknownTopicRetries(0),
|
||||
kgo.UnknownTopicRetries(1),
|
||||
}
|
||||
|
||||
if options.Context != nil {
|
||||
|
Reference in New Issue
Block a user