micro/broker/broker.go

81 lines
2.5 KiB
Go
Raw Permalink Normal View History

2016-12-14 18:41:48 +03:00
// Package broker is an interface used for asynchronous messaging
package broker // import "go.unistack.org/micro/v4/broker"
import (
"context"
"errors"
2024-04-07 20:48:47 +03:00
"time"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/options"
)
// DefaultBroker default memory broker
var DefaultBroker Broker = NewBroker()
var (
// ErrNotConnected returns when broker used but not connected yet
ErrNotConnected = errors.New("broker not connected")
// ErrDisconnected returns when broker disconnected
ErrDisconnected = errors.New("broker disconnected")
// ErrInvalidMessage returns when message has nvalid format
ErrInvalidMessage = errors.New("broker message has invalid format")
2024-04-07 20:48:47 +03:00
// DefaultGracefulTimeout
DefaultGracefulTimeout = 5 * time.Second
)
2016-01-31 00:18:57 +03:00
// Broker is an interface used for asynchronous messaging.
type Broker interface {
// Name returns broker instance name
Name() string
// Init initilize broker
Init(opts ...options.Option) error
// Options returns broker options
Options() Options
// Address return configured address
2019-07-10 21:58:30 +03:00
Address() string
// Connect connects to broker
Connect(ctx context.Context) error
// Disconnect disconnect from broker
Disconnect(ctx context.Context) error
// Publish message, msg can be single broker.Message or []broker.Message
Publish(ctx context.Context, msg interface{}, opts ...options.Option) error
// Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, handler interface{}, opts ...options.Option) (Subscriber, error)
// String type of broker
2015-12-20 00:56:14 +03:00
String() string
}
// Message is given to a subscription handler for processing
type Message interface {
// Context for the message
Context() context.Context
// Topic
Topic() string
// Header returns message headers
Header() metadata.Metadata
// Body returns broker message may be []byte slice or some go struct
Body() interface{}
// Ack acknowledge message
Ack() error
// Error returns message error (like decoding errors or some other)
// In this case Body contains raw []byte from broker
Error() error
}
// Subscriber is a convenience return type for the Subscribe method
type Subscriber interface {
// Options returns subscriber options
Options() SubscribeOptions
// Topic returns topic for subscription
Topic() string
// Unsubscribe from topic
Unsubscribe(ctx context.Context) error
}
// MessageHandler func signature for single message processing
type MessageHandler func(Message) error
// MessagesHandler func signature for batch message processing
type MessagesHandler func([]Message) error