add additional options for server and broke #26
6
kgo.go
6
kgo.go
@ -243,6 +243,12 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||||
|
k.RLock()
|
||||||
|
if !k.connected {
|
||||||
|
k.RUnlock()
|
||||||
|
return broker.ErrNotConnected
|
||||||
|
}
|
||||||
|
k.RUnlock()
|
||||||
options := broker.NewPublishOptions(opts...)
|
options := broker.NewPublishOptions(opts...)
|
||||||
records := make([]*kgo.Record, 0, len(msgs))
|
records := make([]*kgo.Record, 0, len(msgs))
|
||||||
var errs []string
|
var errs []string
|
||||||
|
Loading…
Reference in New Issue
Block a user