343 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			343 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package broker
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"sync"
 | |
| 
 | |
| 	"go.unistack.org/micro/v3/logger"
 | |
| 	"go.unistack.org/micro/v3/metadata"
 | |
| 	maddr "go.unistack.org/micro/v3/util/addr"
 | |
| 	"go.unistack.org/micro/v3/util/id"
 | |
| 	mnet "go.unistack.org/micro/v3/util/net"
 | |
| 	"go.unistack.org/micro/v3/util/rand"
 | |
| )
 | |
| 
 | |
| type memoryBroker struct {
 | |
| 	subscribers map[string][]*memorySubscriber
 | |
| 	addr        string
 | |
| 	opts        Options
 | |
| 	sync.RWMutex
 | |
| 	connected bool
 | |
| }
 | |
| 
 | |
| type memoryEvent struct {
 | |
| 	err     error
 | |
| 	message interface{}
 | |
| 	topic   string
 | |
| 	opts    Options
 | |
| }
 | |
| 
 | |
| type memorySubscriber struct {
 | |
| 	ctx          context.Context
 | |
| 	exit         chan bool
 | |
| 	handler      Handler
 | |
| 	batchhandler BatchHandler
 | |
| 	id           string
 | |
| 	topic        string
 | |
| 	opts         SubscribeOptions
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) Options() Options {
 | |
| 	return m.opts
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) Address() string {
 | |
| 	return m.addr
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) Connect(ctx context.Context) error {
 | |
| 	m.Lock()
 | |
| 	defer m.Unlock()
 | |
| 
 | |
| 	if m.connected {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// use 127.0.0.1 to avoid scan of all network interfaces
 | |
| 	addr, err := maddr.Extract("127.0.0.1")
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	var rng rand.Rand
 | |
| 	i := rng.Intn(20000)
 | |
| 	// set addr with port
 | |
| 	addr = mnet.HostPort(addr, 10000+i)
 | |
| 
 | |
| 	m.addr = addr
 | |
| 	m.connected = true
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) Disconnect(ctx context.Context) error {
 | |
| 	m.Lock()
 | |
| 	defer m.Unlock()
 | |
| 
 | |
| 	if !m.connected {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	m.connected = false
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) Init(opts ...Option) error {
 | |
| 	for _, o := range opts {
 | |
| 		o(&m.opts)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
 | |
| 	msg.Header.Set(metadata.HeaderTopic, topic)
 | |
| 	return m.publish(ctx, []*Message{msg}, opts...)
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
 | |
| 	return m.publish(ctx, msgs, opts...)
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
 | |
| 	m.RLock()
 | |
| 	if !m.connected {
 | |
| 		m.RUnlock()
 | |
| 		return ErrNotConnected
 | |
| 	}
 | |
| 	m.RUnlock()
 | |
| 
 | |
| 	var err error
 | |
| 
 | |
| 	select {
 | |
| 	case <-ctx.Done():
 | |
| 		return ctx.Err()
 | |
| 	default:
 | |
| 		options := NewPublishOptions(opts...)
 | |
| 
 | |
| 		msgTopicMap := make(map[string]Events)
 | |
| 		for _, v := range msgs {
 | |
| 			p := &memoryEvent{opts: m.opts}
 | |
| 
 | |
| 			if m.opts.Codec == nil || options.BodyOnly {
 | |
| 				p.topic, _ = v.Header.Get(metadata.HeaderTopic)
 | |
| 				p.message = v.Body
 | |
| 			} else {
 | |
| 				p.topic, _ = v.Header.Get(metadata.HeaderTopic)
 | |
| 				p.message, err = m.opts.Codec.Marshal(v)
 | |
| 				if err != nil {
 | |
| 					return err
 | |
| 				}
 | |
| 			}
 | |
| 			msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
 | |
| 		}
 | |
| 
 | |
| 		beh := m.opts.BatchErrorHandler
 | |
| 		eh := m.opts.ErrorHandler
 | |
| 
 | |
| 		for t, ms := range msgTopicMap {
 | |
| 			m.RLock()
 | |
| 			subs, ok := m.subscribers[t]
 | |
| 			m.RUnlock()
 | |
| 			if !ok {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			for _, sub := range subs {
 | |
| 				if sub.opts.BatchErrorHandler != nil {
 | |
| 					beh = sub.opts.BatchErrorHandler
 | |
| 				}
 | |
| 				if sub.opts.ErrorHandler != nil {
 | |
| 					eh = sub.opts.ErrorHandler
 | |
| 				}
 | |
| 
 | |
| 				switch {
 | |
| 				// batch processing
 | |
| 				case sub.batchhandler != nil:
 | |
| 					if err = sub.batchhandler(ms); err != nil {
 | |
| 						ms.SetError(err)
 | |
| 						if beh != nil {
 | |
| 							_ = beh(ms)
 | |
| 						} else if m.opts.Logger.V(logger.ErrorLevel) {
 | |
| 							m.opts.Logger.Error(m.opts.Context, err.Error())
 | |
| 						}
 | |
| 					} else if sub.opts.AutoAck {
 | |
| 						if err = ms.Ack(); err != nil {
 | |
| 							m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
 | |
| 						}
 | |
| 					}
 | |
| 					// single processing
 | |
| 				case sub.handler != nil:
 | |
| 					for _, p := range ms {
 | |
| 						if err = sub.handler(p); err != nil {
 | |
| 							p.SetError(err)
 | |
| 							if eh != nil {
 | |
| 								_ = eh(p)
 | |
| 							} else if m.opts.Logger.V(logger.ErrorLevel) {
 | |
| 								m.opts.Logger.Error(m.opts.Context, err.Error())
 | |
| 							}
 | |
| 						} else if sub.opts.AutoAck {
 | |
| 							if err = p.Ack(); err != nil {
 | |
| 								m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
 | |
| 							}
 | |
| 						}
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
 | |
| 	m.RLock()
 | |
| 	if !m.connected {
 | |
| 		m.RUnlock()
 | |
| 		return nil, ErrNotConnected
 | |
| 	}
 | |
| 	m.RUnlock()
 | |
| 
 | |
| 	sid, err := id.New()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	options := NewSubscribeOptions(opts...)
 | |
| 
 | |
| 	sub := &memorySubscriber{
 | |
| 		exit:         make(chan bool, 1),
 | |
| 		id:           sid,
 | |
| 		topic:        topic,
 | |
| 		batchhandler: handler,
 | |
| 		opts:         options,
 | |
| 		ctx:          ctx,
 | |
| 	}
 | |
| 
 | |
| 	m.Lock()
 | |
| 	m.subscribers[topic] = append(m.subscribers[topic], sub)
 | |
| 	m.Unlock()
 | |
| 
 | |
| 	go func() {
 | |
| 		<-sub.exit
 | |
| 		m.Lock()
 | |
| 		newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
 | |
| 		for _, sb := range m.subscribers[topic] {
 | |
| 			if sb.id == sub.id {
 | |
| 				continue
 | |
| 			}
 | |
| 			newSubscribers = append(newSubscribers, sb)
 | |
| 		}
 | |
| 		m.subscribers[topic] = newSubscribers
 | |
| 		m.Unlock()
 | |
| 	}()
 | |
| 
 | |
| 	return sub, nil
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
 | |
| 	m.RLock()
 | |
| 	if !m.connected {
 | |
| 		m.RUnlock()
 | |
| 		return nil, ErrNotConnected
 | |
| 	}
 | |
| 	m.RUnlock()
 | |
| 
 | |
| 	sid, err := id.New()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	options := NewSubscribeOptions(opts...)
 | |
| 
 | |
| 	sub := &memorySubscriber{
 | |
| 		exit:    make(chan bool, 1),
 | |
| 		id:      sid,
 | |
| 		topic:   topic,
 | |
| 		handler: handler,
 | |
| 		opts:    options,
 | |
| 		ctx:     ctx,
 | |
| 	}
 | |
| 
 | |
| 	m.Lock()
 | |
| 	m.subscribers[topic] = append(m.subscribers[topic], sub)
 | |
| 	m.Unlock()
 | |
| 
 | |
| 	go func() {
 | |
| 		<-sub.exit
 | |
| 		m.Lock()
 | |
| 		newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
 | |
| 		for _, sb := range m.subscribers[topic] {
 | |
| 			if sb.id == sub.id {
 | |
| 				continue
 | |
| 			}
 | |
| 			newSubscribers = append(newSubscribers, sb)
 | |
| 		}
 | |
| 		m.subscribers[topic] = newSubscribers
 | |
| 		m.Unlock()
 | |
| 	}()
 | |
| 
 | |
| 	return sub, nil
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) String() string {
 | |
| 	return "memory"
 | |
| }
 | |
| 
 | |
| func (m *memoryBroker) Name() string {
 | |
| 	return m.opts.Name
 | |
| }
 | |
| 
 | |
| func (m *memoryEvent) Topic() string {
 | |
| 	return m.topic
 | |
| }
 | |
| 
 | |
| func (m *memoryEvent) Message() *Message {
 | |
| 	switch v := m.message.(type) {
 | |
| 	case *Message:
 | |
| 		return v
 | |
| 	case []byte:
 | |
| 		msg := &Message{}
 | |
| 		if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
 | |
| 			if m.opts.Logger.V(logger.ErrorLevel) {
 | |
| 				m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 		return msg
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *memoryEvent) Ack() error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *memoryEvent) Error() error {
 | |
| 	return m.err
 | |
| }
 | |
| 
 | |
| func (m *memoryEvent) SetError(err error) {
 | |
| 	m.err = err
 | |
| }
 | |
| 
 | |
| func (m *memorySubscriber) Options() SubscribeOptions {
 | |
| 	return m.opts
 | |
| }
 | |
| 
 | |
| func (m *memorySubscriber) Topic() string {
 | |
| 	return m.topic
 | |
| }
 | |
| 
 | |
| func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
 | |
| 	m.exit <- true
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // NewBroker return new memory broker
 | |
| func NewBroker(opts ...Option) Broker {
 | |
| 	return &memoryBroker{
 | |
| 		opts:        NewOptions(opts...),
 | |
| 		subscribers: make(map[string][]*memorySubscriber),
 | |
| 	}
 | |
| }
 |