minimize checks on publish
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
11
kgo.go
11
kgo.go
@@ -344,23 +344,12 @@ func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
||||||
if b.connected.Load() == 0 {
|
|
||||||
c, _, err := b.connect(ctx, b.kopts...)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
b.Lock()
|
|
||||||
b.c = c
|
|
||||||
b.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
records := make([]*kgo.Record, 0, len(messages))
|
records := make([]*kgo.Record, 0, len(messages))
|
||||||
var errs []string
|
var errs []string
|
||||||
var key []byte
|
var key []byte
|
||||||
var promise func(*kgo.Record, error)
|
var promise func(*kgo.Record, error)
|
||||||
|
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
|
|
||||||
if mctx := msg.Context(); mctx != nil {
|
if mctx := msg.Context(); mctx != nil {
|
||||||
if k, ok := mctx.Value(publishKey{}).([]byte); ok && k != nil {
|
if k, ok := mctx.Value(publishKey{}).([]byte); ok && k != nil {
|
||||||
key = k
|
key = k
|
||||||
|
@@ -222,7 +222,7 @@ func TestPubSub(t *testing.T) {
|
|||||||
if prc := atomic.LoadInt64(&idx); prc == msgcnt {
|
if prc := atomic.LoadInt64(&idx); prc == msgcnt {
|
||||||
close(done)
|
close(done)
|
||||||
} else {
|
} else {
|
||||||
t.Logf("processed %v\n", prc)
|
t.Logf("processed %v of %v\n", prc, msgcnt)
|
||||||
}
|
}
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
close(done)
|
close(done)
|
||||||
|
Reference in New Issue
Block a user