micro/broker/broker.go
2021-07-24 16:16:18 +03:00

131 lines
3.6 KiB
Go

// Package broker is an interface used for asynchronous messaging
package broker
import (
"context"
"errors"
"github.com/unistack-org/micro/v3/metadata"
)
// DefaultBroker default memory broker
var DefaultBroker Broker = NewBroker()
var (
// ErrNotConnected returns when broker used but not connected yet
ErrNotConnected = errors.New("broker not connected")
// ErrDisconnected returns when broker disconnected
ErrDisconnected = errors.New("broker disconnected")
)
// Broker is an interface used for asynchronous messaging.
type Broker interface {
// Name returns broker instance name
Name() string
// Init initilize broker
Init(opts ...Option) error
// Options returns broker options
Options() Options
// Address return configured address
Address() string
// Connect connects to broker
Connect(ctx context.Context) error
// Disconnect disconnect from broker
Disconnect(ctx context.Context) error
// Publish message to broker topic
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
// Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
// BatchPublish messages to broker with multiple topics
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
// BatchSubscribe subscribes to topic messages via handler
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
// String type of broker
String() string
}
// Handler is used to process messages via a subscription of a topic.
type Handler func(Event) error
// Events contains multiple events
type Events []Event
func (evs Events) Ack() error {
var err error
for _, ev := range evs {
if err = ev.Ack(); err != nil {
return err
}
}
return nil
}
func (evs Events) SetError(err error) {
for _, ev := range evs {
ev.SetError(err)
}
}
// BatchHandler is used to process messages in batches via a subscription of a topic.
type BatchHandler func(Events) error
// Event is given to a subscription handler for processing
type Event interface {
// Topic returns event topic
Topic() string
// Message returns broker message
Message() *Message
// Ack acknowledge message
Ack() error
// Error returns message error (like decoding errors or some other)
Error() error
// SetError set event processing error
SetError(err error)
}
// RawMessage is a raw encoded JSON value.
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
type RawMessage []byte
// MarshalJSON returns m as the JSON encoding of m.
func (m *RawMessage) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
}
return *m, nil
}
// UnmarshalJSON sets *m to a copy of data.
func (m *RawMessage) UnmarshalJSON(data []byte) error {
if m == nil {
return errors.New("RawMessage UnmarshalJSON on nil pointer")
}
*m = append((*m)[0:0], data...)
return nil
}
// Message is used to transfer data
type Message struct {
// Header contains message metadata
Header metadata.Metadata
// Body contains message body
Body RawMessage
}
// NewMessage create broker message with topic filled
func NewMessage(topic string) *Message {
m := &Message{Header: metadata.New(2)}
m.Header.Set(metadata.HeaderTopic, topic)
return m
}
// Subscriber is a convenience return type for the Subscribe method
type Subscriber interface {
// Options returns subscriber options
Options() SubscribeOptions
// Topic returns topic for subscription
Topic() string
// Unsubscribe from topic
Unsubscribe(ctx context.Context) error
}