@@ -122,11 +122,10 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
|
||||
p.message = v.Body
|
||||
} else {
|
||||
p.topic, _ = v.Header.Get(metadata.HeaderTopic)
|
||||
buf, err := m.opts.Codec.Marshal(v)
|
||||
p.message, err = m.opts.Codec.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.message = buf
|
||||
}
|
||||
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ func TestMemoryBatchBroker(t *testing.T) {
|
||||
t.Fatalf("Unexpected error subscribing %v", err)
|
||||
}
|
||||
|
||||
msgs := make([]*Message, 0, 0)
|
||||
msgs := make([]*Message, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
message := &Message{
|
||||
Header: map[string]string{
|
||||
@@ -74,7 +74,7 @@ func TestMemoryBroker(t *testing.T) {
|
||||
t.Fatalf("Unexpected error subscribing %v", err)
|
||||
}
|
||||
|
||||
msgs := make([]*Message, 0, 0)
|
||||
msgs := make([]*Message, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
message := &Message{
|
||||
Header: map[string]string{
|
||||
|
||||
Reference in New Issue
Block a user