Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
af8d81f3c6 | |||
5c9b3dae33 | |||
9f3957d101 | |||
8fd8bdcb39 | |||
80e3d239ab | |||
419cd486cf | |||
e64269b2a8 | |||
d18429e024 |
@@ -34,10 +34,10 @@ type Broker interface {
|
||||
Disconnect(ctx context.Context) error
|
||||
// Publish message to broker topic
|
||||
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
|
||||
// BatchPublish messages to broker with multiple topics
|
||||
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
|
||||
// Subscribe subscribes to topic message via handler
|
||||
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
|
||||
// BatchPublish messages to broker with multiple topics
|
||||
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
|
||||
// BatchSubscribe subscribes to topic messages via handler
|
||||
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
|
||||
// String type of broker
|
||||
|
278
broker/memory.go
278
broker/memory.go
@@ -13,10 +13,9 @@ import (
|
||||
)
|
||||
|
||||
type memoryBroker struct {
|
||||
subscribers map[string][]*memorySubscriber
|
||||
batchsubscribers map[string][]*memoryBatchSubscriber
|
||||
addr string
|
||||
opts Options
|
||||
subscribers map[string][]*memorySubscriber
|
||||
addr string
|
||||
opts Options
|
||||
sync.RWMutex
|
||||
connected bool
|
||||
}
|
||||
@@ -29,21 +28,13 @@ type memoryEvent struct {
|
||||
}
|
||||
|
||||
type memorySubscriber struct {
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
handler Handler
|
||||
id string
|
||||
topic string
|
||||
opts SubscribeOptions
|
||||
}
|
||||
|
||||
type memoryBatchSubscriber struct {
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
handler BatchHandler
|
||||
id string
|
||||
topic string
|
||||
opts SubscribeOptions
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
handler Handler
|
||||
batchhandler BatchHandler
|
||||
id string
|
||||
topic string
|
||||
opts SubscribeOptions
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Options() Options {
|
||||
@@ -97,6 +88,36 @@ func (m *memoryBroker) Init(opts ...Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
options := NewPublishOptions(opts...)
|
||||
vs := make([]msgWrapper, 0, 1)
|
||||
if m.opts.Codec == nil || options.BodyOnly {
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
vs = append(vs, msgWrapper{topic: topic, body: msg})
|
||||
} else {
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
buf, err := m.opts.Codec.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vs = append(vs, msgWrapper{topic: topic, body: buf})
|
||||
}
|
||||
|
||||
return m.publish(ctx, vs, opts...)
|
||||
}
|
||||
|
||||
type msgWrapper struct {
|
||||
topic string
|
||||
body interface{}
|
||||
}
|
||||
|
||||
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
@@ -105,155 +126,90 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts .
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
type msgWrapper struct {
|
||||
topic string
|
||||
body interface{}
|
||||
}
|
||||
|
||||
options := NewPublishOptions(opts...)
|
||||
vs := make([]msgWrapper, 0, len(msgs))
|
||||
if m.opts.Codec == nil {
|
||||
m.RLock()
|
||||
if m.opts.Codec == nil || options.BodyOnly {
|
||||
for _, msg := range msgs {
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
vs = append(vs, msgWrapper{topic: topic, body: m})
|
||||
vs = append(vs, msgWrapper{topic: topic, body: msg})
|
||||
}
|
||||
m.RUnlock()
|
||||
} else {
|
||||
m.RLock()
|
||||
for _, msg := range msgs {
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
buf, err := m.opts.Codec.Marshal(msg)
|
||||
if err != nil {
|
||||
m.RUnlock()
|
||||
return err
|
||||
}
|
||||
vs = append(vs, msgWrapper{topic: topic, body: buf})
|
||||
}
|
||||
m.RUnlock()
|
||||
}
|
||||
|
||||
if len(m.batchsubscribers) > 0 {
|
||||
eh := m.opts.BatchErrorHandler
|
||||
return m.publish(ctx, vs, opts...)
|
||||
}
|
||||
|
||||
msgTopicMap := make(map[string]Events)
|
||||
for _, v := range vs {
|
||||
p := &memoryEvent{
|
||||
topic: v.topic,
|
||||
message: v.body,
|
||||
opts: m.opts,
|
||||
}
|
||||
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
|
||||
}
|
||||
|
||||
for t, ms := range msgTopicMap {
|
||||
m.RLock()
|
||||
subs, ok := m.batchsubscribers[t]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for _, sub := range subs {
|
||||
if err := sub.handler(ms); err != nil {
|
||||
ms.SetError(err)
|
||||
if sub.opts.BatchErrorHandler != nil {
|
||||
eh = sub.opts.BatchErrorHandler
|
||||
}
|
||||
if eh != nil {
|
||||
eh(ms)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err := ms.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
eh := m.opts.ErrorHandler
|
||||
func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...PublishOption) error {
|
||||
var err error
|
||||
|
||||
msgTopicMap := make(map[string]Events)
|
||||
for _, v := range vs {
|
||||
p := &memoryEvent{
|
||||
topic: v.topic,
|
||||
message: v.body,
|
||||
opts: m.opts,
|
||||
}
|
||||
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
|
||||
}
|
||||
|
||||
beh := m.opts.BatchErrorHandler
|
||||
eh := m.opts.ErrorHandler
|
||||
|
||||
for t, ms := range msgTopicMap {
|
||||
m.RLock()
|
||||
subs, ok := m.subscribers[p.topic]
|
||||
subs, ok := m.subscribers[t]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, sub := range subs {
|
||||
if err := sub.handler(p); err != nil {
|
||||
p.SetError(err)
|
||||
if sub.opts.ErrorHandler != nil {
|
||||
eh = sub.opts.ErrorHandler
|
||||
}
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err := p.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
// batch processing
|
||||
if sub.batchhandler != nil {
|
||||
if err = sub.batchhandler(ms); err != nil {
|
||||
ms.SetError(err)
|
||||
if sub.opts.BatchErrorHandler != nil {
|
||||
beh = sub.opts.BatchErrorHandler
|
||||
}
|
||||
if beh != nil {
|
||||
beh(ms)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err = ms.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return ErrNotConnected
|
||||
}
|
||||
|
||||
subs, ok := m.subscribers[topic]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
var v interface{}
|
||||
if m.opts.Codec != nil {
|
||||
buf, err := m.opts.Codec.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v = buf
|
||||
} else {
|
||||
v = msg
|
||||
}
|
||||
|
||||
p := &memoryEvent{
|
||||
topic: topic,
|
||||
message: v,
|
||||
opts: m.opts,
|
||||
}
|
||||
|
||||
eh := m.opts.ErrorHandler
|
||||
|
||||
for _, sub := range subs {
|
||||
if err := sub.handler(p); err != nil {
|
||||
p.err = err
|
||||
if sub.opts.ErrorHandler != nil {
|
||||
eh = sub.opts.ErrorHandler
|
||||
// single processing
|
||||
if sub.handler != nil {
|
||||
for _, p := range ms {
|
||||
if err = sub.handler(p); err != nil {
|
||||
p.SetError(err)
|
||||
if sub.opts.ErrorHandler != nil {
|
||||
eh = sub.opts.ErrorHandler
|
||||
}
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err = p.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,43 +224,41 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
options := NewSubscribeOptions(opts...)
|
||||
|
||||
id, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sub := &memoryBatchSubscriber{
|
||||
exit: make(chan bool, 1),
|
||||
id: id.String(),
|
||||
topic: topic,
|
||||
handler: handler,
|
||||
opts: options,
|
||||
ctx: ctx,
|
||||
options := NewSubscribeOptions(opts...)
|
||||
|
||||
sub := &memorySubscriber{
|
||||
exit: make(chan bool, 1),
|
||||
id: id.String(),
|
||||
topic: topic,
|
||||
batchhandler: handler,
|
||||
opts: options,
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.batchsubscribers[topic] = append(m.batchsubscribers[topic], sub)
|
||||
m.subscribers[topic] = append(m.subscribers[topic], sub)
|
||||
m.Unlock()
|
||||
|
||||
go func() {
|
||||
<-sub.exit
|
||||
m.Lock()
|
||||
var newSubscribers []*memoryBatchSubscriber
|
||||
for _, sb := range m.batchsubscribers[topic] {
|
||||
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
|
||||
for _, sb := range m.subscribers[topic] {
|
||||
if sb.id == sub.id {
|
||||
continue
|
||||
}
|
||||
newSubscribers = append(newSubscribers, sb)
|
||||
}
|
||||
m.batchsubscribers[topic] = newSubscribers
|
||||
m.subscribers[topic] = newSubscribers
|
||||
m.Unlock()
|
||||
}()
|
||||
|
||||
return sub, nil
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
@@ -315,13 +269,13 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
options := NewSubscribeOptions(opts...)
|
||||
|
||||
id, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
options := NewSubscribeOptions(opts...)
|
||||
|
||||
sub := &memorySubscriber{
|
||||
exit: make(chan bool, 1),
|
||||
id: id.String(),
|
||||
@@ -338,7 +292,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
|
||||
go func() {
|
||||
<-sub.exit
|
||||
m.Lock()
|
||||
var newSubscribers []*memorySubscriber
|
||||
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
|
||||
for _, sb := range m.subscribers[topic] {
|
||||
if sb.id == sub.id {
|
||||
continue
|
||||
@@ -394,19 +348,6 @@ func (m *memoryEvent) SetError(err error) {
|
||||
m.err = err
|
||||
}
|
||||
|
||||
func (m *memoryBatchSubscriber) Options() SubscribeOptions {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *memoryBatchSubscriber) Topic() string {
|
||||
return m.topic
|
||||
}
|
||||
|
||||
func (m *memoryBatchSubscriber) Unsubscribe(ctx context.Context) error {
|
||||
m.exit <- true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memorySubscriber) Options() SubscribeOptions {
|
||||
return m.opts
|
||||
}
|
||||
@@ -423,8 +364,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
|
||||
// NewBroker return new memory broker
|
||||
func NewBroker(opts ...Option) Broker {
|
||||
return &memoryBroker{
|
||||
opts: NewOptions(opts...),
|
||||
subscribers: make(map[string][]*memorySubscriber),
|
||||
batchsubscribers: make(map[string][]*memoryBatchSubscriber),
|
||||
opts: NewOptions(opts...),
|
||||
subscribers: make(map[string][]*memorySubscriber),
|
||||
}
|
||||
}
|
||||
|
@@ -3,6 +3,7 @@ package broker
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"time"
|
||||
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
@@ -73,11 +74,9 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
|
||||
options := PublishOptions{
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
@@ -95,6 +94,10 @@ type SubscribeOptions struct {
|
||||
AutoAck bool
|
||||
// BodyOnly flag specifies that message contains only body bytes without header
|
||||
BodyOnly bool
|
||||
// BatchSize flag specifies max batch size
|
||||
BatchSize int
|
||||
// BatchWait flag specifies max wait time for batch filling
|
||||
BatchWait time.Duration
|
||||
}
|
||||
|
||||
// Option func
|
||||
@@ -117,23 +120,6 @@ func PublishContext(ctx context.Context) PublishOption {
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeOption func
|
||||
type SubscribeOption func(*SubscribeOptions)
|
||||
|
||||
// NewSubscribeOptions creates new SubscribeOptions
|
||||
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
||||
options := SubscribeOptions{
|
||||
AutoAck: true,
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
// Addrs sets the host addresses to be used by the broker
|
||||
func Addrs(addrs ...string) Option {
|
||||
return func(o *Options) {
|
||||
@@ -149,28 +135,6 @@ func Codec(c codec.Codec) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// DisableAutoAck disables auto ack
|
||||
func DisableAutoAck() SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.AutoAck = false
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeAutoAck will disable auto acking of messages
|
||||
// after they have been handled.
|
||||
func SubscribeAutoAck(b bool) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.AutoAck = b
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeBodyOnly consumes only body of the message
|
||||
func SubscribeBodyOnly(b bool) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.BodyOnly = b
|
||||
}
|
||||
}
|
||||
|
||||
// ErrorHandler will catch all broker errors that cant be handled
|
||||
// in normal way, for example Codec errors
|
||||
func ErrorHandler(h Handler) Option {
|
||||
@@ -266,3 +230,55 @@ func SubscribeContext(ctx context.Context) SubscribeOption {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// DisableAutoAck disables auto ack
|
||||
// DEPRECATED
|
||||
func DisableAutoAck() SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.AutoAck = false
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeAutoAck contol auto acking of messages
|
||||
// after they have been handled.
|
||||
func SubscribeAutoAck(b bool) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.AutoAck = b
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeBodyOnly consumes only body of the message
|
||||
func SubscribeBodyOnly(b bool) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.BodyOnly = b
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeBatchSize specifies max batch size
|
||||
func SubscribeBatchSize(n int) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.BatchSize = n
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeBatchWait specifies max batch wait time
|
||||
func SubscribeBatchWait(td time.Duration) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.BatchWait = td
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeOption func
|
||||
type SubscribeOption func(*SubscribeOptions)
|
||||
|
||||
// NewSubscribeOptions creates new SubscribeOptions
|
||||
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
||||
options := SubscribeOptions{
|
||||
AutoAck: true,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
@@ -40,6 +40,7 @@ type Client interface {
|
||||
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
|
||||
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
|
||||
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
|
||||
BatchPublish(ctx context.Context, msg []Message, opts ...PublishOption) error
|
||||
String() string
|
||||
}
|
||||
|
||||
|
@@ -181,47 +181,59 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
|
||||
return &noopStream{}, nil
|
||||
}
|
||||
|
||||
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
|
||||
var body []byte
|
||||
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
||||
return n.publish(ctx, ps, opts...)
|
||||
}
|
||||
|
||||
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
|
||||
return n.publish(ctx, []Message{p}, opts...)
|
||||
}
|
||||
|
||||
func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
||||
options := NewPublishOptions(opts...)
|
||||
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
md = metadata.New(0)
|
||||
}
|
||||
md[metadata.HeaderContentType] = p.ContentType()
|
||||
md[metadata.HeaderTopic] = p.Topic()
|
||||
msgs := make([]*broker.Message, 0, len(ps))
|
||||
|
||||
// passed in raw data
|
||||
if d, ok := p.Payload().(*codec.Frame); ok {
|
||||
body = d.Data
|
||||
} else {
|
||||
// use codec for payload
|
||||
cf, err := n.newCodec(p.ContentType())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
for _, p := range ps {
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
md = metadata.New(0)
|
||||
}
|
||||
md[metadata.HeaderContentType] = p.ContentType()
|
||||
|
||||
topic := p.Topic()
|
||||
|
||||
// get the exchange
|
||||
if len(options.Exchange) > 0 {
|
||||
topic = options.Exchange
|
||||
}
|
||||
|
||||
// set the body
|
||||
b, err := cf.Marshal(p.Payload())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
md[metadata.HeaderTopic] = topic
|
||||
|
||||
var body []byte
|
||||
|
||||
// passed in raw data
|
||||
if d, ok := p.Payload().(*codec.Frame); ok {
|
||||
body = d.Data
|
||||
} else {
|
||||
// use codec for payload
|
||||
cf, err := n.newCodec(p.ContentType())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
|
||||
// set the body
|
||||
b, err := cf.Marshal(p.Payload())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
body = b
|
||||
}
|
||||
body = b
|
||||
|
||||
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
||||
}
|
||||
|
||||
topic := p.Topic()
|
||||
|
||||
// get the exchange
|
||||
if len(options.Exchange) > 0 {
|
||||
topic = options.Exchange
|
||||
}
|
||||
|
||||
return n.opts.Broker.Publish(ctx, topic, &broker.Message{
|
||||
Header: md,
|
||||
Body: body,
|
||||
},
|
||||
return n.opts.Broker.BatchPublish(ctx, msgs,
|
||||
broker.PublishContext(options.Context),
|
||||
broker.PublishBodyOnly(options.BodyOnly),
|
||||
)
|
||||
|
@@ -373,19 +373,35 @@ func DialTimeout(d time.Duration) Option {
|
||||
}
|
||||
|
||||
// WithExchange sets the exchange to route a message through
|
||||
// DEPRECATED
|
||||
func WithExchange(e string) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.Exchange = e
|
||||
}
|
||||
}
|
||||
|
||||
// PublishExchange sets the exchange to route a message through
|
||||
func PublishExchange(e string) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.Exchange = e
|
||||
}
|
||||
}
|
||||
|
||||
// WithBodyOnly publish only message body
|
||||
// DERECATED
|
||||
func WithBodyOnly(b bool) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.BodyOnly = b
|
||||
}
|
||||
}
|
||||
|
||||
// PublishBodyOnly publish only message body
|
||||
func PublishBodyOnly(b bool) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.BodyOnly = b
|
||||
}
|
||||
}
|
||||
|
||||
// PublishContext sets the context in publish options
|
||||
func PublishContext(ctx context.Context) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
@@ -498,12 +514,20 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption {
|
||||
}
|
||||
|
||||
// WithMessageContentType sets the message content type
|
||||
// DEPRECATED
|
||||
func WithMessageContentType(ct string) MessageOption {
|
||||
return func(o *MessageOptions) {
|
||||
o.ContentType = ct
|
||||
}
|
||||
}
|
||||
|
||||
// MessageContentType sets the message content type
|
||||
func MessageContentType(ct string) MessageOption {
|
||||
return func(o *MessageOptions) {
|
||||
o.ContentType = ct
|
||||
}
|
||||
}
|
||||
|
||||
// StreamingRequest specifies that request is streaming
|
||||
func StreamingRequest(b bool) RequestOption {
|
||||
return func(o *RequestOptions) {
|
||||
|
@@ -35,9 +35,6 @@ func (l *defaultLogger) Init(opts ...Option) error {
|
||||
o(&l.opts)
|
||||
}
|
||||
l.enc = json.NewEncoder(l.opts.Out)
|
||||
|
||||
l.logFunc = l.Log
|
||||
l.logfFunc = l.Logf
|
||||
// wrap the Log func
|
||||
for i := len(l.opts.Wrappers); i > 0; i-- {
|
||||
l.logFunc = l.opts.Wrappers[i-1].Log(l.logFunc)
|
||||
@@ -218,7 +215,11 @@ func (l *defaultLogger) Options() Options {
|
||||
|
||||
// NewLogger builds a new logger based on options
|
||||
func NewLogger(opts ...Option) Logger {
|
||||
l := &defaultLogger{opts: NewOptions(opts...)}
|
||||
l := &defaultLogger{
|
||||
opts: NewOptions(opts...),
|
||||
}
|
||||
l.logFunc = l.Log
|
||||
l.logfFunc = l.Logf
|
||||
l.enc = json.NewEncoder(l.opts.Out)
|
||||
return l
|
||||
}
|
||||
|
@@ -8,6 +8,8 @@ var (
|
||||
DefaultLogger Logger = NewLogger()
|
||||
// DefaultLevel used by logger
|
||||
DefaultLevel Level = InfoLevel
|
||||
// DefaultCallerSkipCount used by logger
|
||||
DefaultCallerSkipCount = 2
|
||||
)
|
||||
|
||||
// Logger is a generic logging interface
|
||||
|
@@ -33,7 +33,7 @@ func NewOptions(opts ...Option) Options {
|
||||
Level: DefaultLevel,
|
||||
Fields: make(map[string]interface{}),
|
||||
Out: os.Stderr,
|
||||
CallerSkipCount: 0,
|
||||
CallerSkipCount: DefaultCallerSkipCount,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
|
@@ -17,6 +17,8 @@ var (
|
||||
HeaderService = "Micro-Service"
|
||||
// HeaderTimeout specifies timeout of operation
|
||||
HeaderTimeout = "Micro-Timeout"
|
||||
// HeaderAuthorization specifies Authorization header
|
||||
HeaderAuthorization = "Authorization"
|
||||
)
|
||||
|
||||
// Metadata is our way of representing request headers internally.
|
||||
|
Reference in New Issue
Block a user