135 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			135 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package broker is an interface used for asynchronous messaging
 | |
| package broker
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"time"
 | |
| 
 | |
| 	"go.unistack.org/micro/v3/codec"
 | |
| 	"go.unistack.org/micro/v3/metadata"
 | |
| )
 | |
| 
 | |
| // DefaultBroker default memory broker
 | |
| var DefaultBroker Broker = NewBroker()
 | |
| 
 | |
| var (
 | |
| 	// ErrNotConnected returns when broker used but not connected yet
 | |
| 	ErrNotConnected = errors.New("broker not connected")
 | |
| 	// ErrDisconnected returns when broker disconnected
 | |
| 	ErrDisconnected = errors.New("broker disconnected")
 | |
| 	// DefaultGracefulTimeout
 | |
| 	DefaultGracefulTimeout = 5 * time.Second
 | |
| )
 | |
| 
 | |
| // Broker is an interface used for asynchronous messaging.
 | |
| type Broker interface {
 | |
| 	// Name returns broker instance name
 | |
| 	Name() string
 | |
| 	// Init initilize broker
 | |
| 	Init(opts ...Option) error
 | |
| 	// Options returns broker options
 | |
| 	Options() Options
 | |
| 	// Address return configured address
 | |
| 	Address() string
 | |
| 	// Connect connects to broker
 | |
| 	Connect(ctx context.Context) error
 | |
| 	// Disconnect disconnect from broker
 | |
| 	Disconnect(ctx context.Context) error
 | |
| 	// Publish message to broker topic
 | |
| 	Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
 | |
| 	// Subscribe subscribes to topic message via handler
 | |
| 	Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
 | |
| 	// BatchPublish messages to broker with multiple topics
 | |
| 	BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
 | |
| 	// BatchSubscribe subscribes to topic messages via handler
 | |
| 	BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
 | |
| 	// String type of broker
 | |
| 	String() string
 | |
| 	// Live returns broker liveness
 | |
| 	Live() bool
 | |
| 	// Ready returns broker readiness
 | |
| 	Ready() bool
 | |
| 	// Health returns broker health
 | |
| 	Health() bool
 | |
| }
 | |
| 
 | |
| type (
 | |
| 	FuncPublish        func(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
 | |
| 	HookPublish        func(next FuncPublish) FuncPublish
 | |
| 	FuncBatchPublish   func(ctx context.Context, msgs []*Message, opts ...PublishOption) error
 | |
| 	HookBatchPublish   func(next FuncBatchPublish) FuncBatchPublish
 | |
| 	FuncSubscribe      func(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
 | |
| 	HookSubscribe      func(next FuncSubscribe) FuncSubscribe
 | |
| 	FuncBatchSubscribe func(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
 | |
| 	HookBatchSubscribe func(next FuncBatchSubscribe) FuncBatchSubscribe
 | |
| )
 | |
| 
 | |
| // Handler is used to process messages via a subscription of a topic.
 | |
| type Handler func(Event) error
 | |
| 
 | |
| // Events contains multiple events
 | |
| type Events []Event
 | |
| 
 | |
| // Ack try to ack all events and return
 | |
| func (evs Events) Ack() error {
 | |
| 	var err error
 | |
| 	for _, ev := range evs {
 | |
| 		if err = ev.Ack(); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // SetError sets error on event
 | |
| func (evs Events) SetError(err error) {
 | |
| 	for _, ev := range evs {
 | |
| 		ev.SetError(err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // BatchHandler is used to process messages in batches via a subscription of a topic.
 | |
| type BatchHandler func(Events) error
 | |
| 
 | |
| // Event is given to a subscription handler for processing
 | |
| type Event interface {
 | |
| 	// Context return context.Context for event
 | |
| 	Context() context.Context
 | |
| 	// Topic returns event topic
 | |
| 	Topic() string
 | |
| 	// Message returns broker message
 | |
| 	Message() *Message
 | |
| 	// Ack acknowledge message
 | |
| 	Ack() error
 | |
| 	// Error returns message error (like decoding errors or some other)
 | |
| 	Error() error
 | |
| 	// SetError set event processing error
 | |
| 	SetError(err error)
 | |
| }
 | |
| 
 | |
| // Message is used to transfer data
 | |
| type Message struct {
 | |
| 	// Header contains message metadata
 | |
| 	Header metadata.Metadata
 | |
| 	// Body contains message body
 | |
| 	Body codec.RawMessage
 | |
| }
 | |
| 
 | |
| // NewMessage create broker message with topic filled
 | |
| func NewMessage(topic string) *Message {
 | |
| 	m := &Message{Header: metadata.New(2)}
 | |
| 	m.Header.Set(metadata.HeaderTopic, topic)
 | |
| 	return m
 | |
| }
 | |
| 
 | |
| // Subscriber is a convenience return type for the Subscribe method
 | |
| type Subscriber interface {
 | |
| 	// Options returns subscriber options
 | |
| 	Options() SubscribeOptions
 | |
| 	// Topic returns topic for subscription
 | |
| 	Topic() string
 | |
| 	// Unsubscribe from topic
 | |
| 	Unsubscribe(ctx context.Context) error
 | |
| }
 |