micro/broker/memory.go

343 lines
6.8 KiB
Go
Raw Normal View History

package broker
import (
"context"
"sync"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
maddr "go.unistack.org/micro/v3/util/addr"
"go.unistack.org/micro/v3/util/id"
mnet "go.unistack.org/micro/v3/util/net"
"go.unistack.org/micro/v3/util/rand"
)
type memoryBroker struct {
subscribers map[string][]*memorySubscriber
addr string
opts Options
sync.RWMutex
connected bool
}
type memoryEvent struct {
err error
message interface{}
topic string
opts Options
}
type memorySubscriber struct {
ctx context.Context
exit chan bool
handler Handler
batchhandler BatchHandler
id string
topic string
opts SubscribeOptions
}
func (m *memoryBroker) Options() Options {
return m.opts
}
func (m *memoryBroker) Address() string {
return m.addr
}
func (m *memoryBroker) Connect(ctx context.Context) error {
m.Lock()
defer m.Unlock()
if m.connected {
return nil
}
// use 127.0.0.1 to avoid scan of all network interfaces
addr, err := maddr.Extract("127.0.0.1")
if err != nil {
return err
}
var rng rand.Rand
i := rng.Intn(20000)
// set addr with port
addr = mnet.HostPort(addr, 10000+i)
m.addr = addr
m.connected = true
return nil
}
func (m *memoryBroker) Disconnect(ctx context.Context) error {
m.Lock()
defer m.Unlock()
if !m.connected {
return nil
}
m.connected = false
return nil
}
func (m *memoryBroker) Init(opts ...Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
msg.Header.Set(metadata.HeaderTopic, topic)
return m.publish(ctx, []*Message{msg}, opts...)
}
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
return m.publish(ctx, msgs, opts...)
}
func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return ErrNotConnected
}
m.RUnlock()
var err error
select {
case <-ctx.Done():
return ctx.Err()
default:
options := NewPublishOptions(opts...)
msgTopicMap := make(map[string]Events)
for _, v := range msgs {
p := &memoryEvent{opts: m.opts}
if m.opts.Codec == nil || options.BodyOnly {
p.topic, _ = v.Header.Get(metadata.HeaderTopic)
p.message = v.Body
} else {
p.topic, _ = v.Header.Get(metadata.HeaderTopic)
p.message, err = m.opts.Codec.Marshal(v)
if err != nil {
return err
}
}
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
}
beh := m.opts.BatchErrorHandler
eh := m.opts.ErrorHandler
for t, ms := range msgTopicMap {
m.RLock()
subs, ok := m.subscribers[t]
m.RUnlock()
if !ok {
continue
}
for _, sub := range subs {
if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler
}
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
switch {
// batch processing
case sub.batchhandler != nil:
if err = sub.batchhandler(ms); err != nil {
ms.SetError(err)
if beh != nil {
_ = beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
// single processing
case sub.handler != nil:
for _, p := range ms {
if err = sub.handler(p); err != nil {
p.SetError(err)
if eh != nil {
_ = eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = p.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
}
}
}
}
}
return nil
}
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
}
m.RUnlock()
sid, err := id.New()
if err != nil {
return nil, err
}
options := NewSubscribeOptions(opts...)
sub := &memorySubscriber{
exit: make(chan bool, 1),
id: sid,
topic: topic,
batchhandler: handler,
opts: options,
ctx: ctx,
}
m.Lock()
m.subscribers[topic] = append(m.subscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
for _, sb := range m.subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.subscribers[topic] = newSubscribers
m.Unlock()
}()
return sub, nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
}
m.RUnlock()
sid, err := id.New()
if err != nil {
return nil, err
}
options := NewSubscribeOptions(opts...)
sub := &memorySubscriber{
exit: make(chan bool, 1),
id: sid,
topic: topic,
handler: handler,
opts: options,
ctx: ctx,
}
m.Lock()
m.subscribers[topic] = append(m.subscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
for _, sb := range m.subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.subscribers[topic] = newSubscribers
m.Unlock()
}()
return sub, nil
}
func (m *memoryBroker) String() string {
return "memory"
}
func (m *memoryBroker) Name() string {
return m.opts.Name
}
func (m *memoryEvent) Topic() string {
return m.topic
}
func (m *memoryEvent) Message() *Message {
switch v := m.message.(type) {
case *Message:
return v
case []byte:
msg := &Message{}
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err)
}
return nil
}
return msg
}
return nil
}
func (m *memoryEvent) Ack() error {
return nil
}
func (m *memoryEvent) Error() error {
return m.err
}
func (m *memoryEvent) SetError(err error) {
m.err = err
}
func (m *memorySubscriber) Options() SubscribeOptions {
return m.opts
}
func (m *memorySubscriber) Topic() string {
return m.topic
}
func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
m.exit <- true
return nil
}
// NewBroker return new memory broker
func NewBroker(opts ...Option) Broker {
return &memoryBroker{
opts: NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber),
}
}