180 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			180 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package mock
 | 
						|
 | 
						|
import (
 | 
						|
	"errors"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/micro/go-micro/broker"
 | 
						|
	"github.com/pborman/uuid"
 | 
						|
)
 | 
						|
 | 
						|
type mockBroker struct {
 | 
						|
	opts broker.Options
 | 
						|
 | 
						|
	sync.RWMutex
 | 
						|
	connected   bool
 | 
						|
	Subscribers map[string][]*mockSubscriber
 | 
						|
}
 | 
						|
 | 
						|
type mockPublication struct {
 | 
						|
	topic   string
 | 
						|
	message *broker.Message
 | 
						|
}
 | 
						|
 | 
						|
type mockSubscriber struct {
 | 
						|
	id      string
 | 
						|
	topic   string
 | 
						|
	exit    chan bool
 | 
						|
	handler broker.Handler
 | 
						|
	opts    broker.SubscribeOptions
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockBroker) Options() broker.Options {
 | 
						|
	return m.opts
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockBroker) Address() string {
 | 
						|
	return ""
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockBroker) Connect() error {
 | 
						|
	m.Lock()
 | 
						|
	defer m.Unlock()
 | 
						|
 | 
						|
	if m.connected {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	m.connected = true
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockBroker) Disconnect() error {
 | 
						|
	m.Lock()
 | 
						|
	defer m.Unlock()
 | 
						|
 | 
						|
	if !m.connected {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	m.connected = false
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockBroker) Init(opts ...broker.Option) error {
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&m.opts)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockBroker) Publish(topic string, message *broker.Message, opts ...broker.PublishOption) error {
 | 
						|
	m.Lock()
 | 
						|
	defer m.Unlock()
 | 
						|
 | 
						|
	if !m.connected {
 | 
						|
		return errors.New("not connected")
 | 
						|
	}
 | 
						|
 | 
						|
	subs, ok := m.Subscribers[topic]
 | 
						|
	if !ok {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	p := &mockPublication{
 | 
						|
		topic:   topic,
 | 
						|
		message: message,
 | 
						|
	}
 | 
						|
 | 
						|
	for _, sub := range subs {
 | 
						|
		if err := sub.handler(p); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 | 
						|
	m.Lock()
 | 
						|
	defer m.Unlock()
 | 
						|
 | 
						|
	if !m.connected {
 | 
						|
		return nil, errors.New("not connected")
 | 
						|
	}
 | 
						|
 | 
						|
	var options broker.SubscribeOptions
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	sub := &mockSubscriber{
 | 
						|
		exit:    make(chan bool, 1),
 | 
						|
		id:      uuid.NewUUID().String(),
 | 
						|
		topic:   topic,
 | 
						|
		handler: handler,
 | 
						|
		opts:    options,
 | 
						|
	}
 | 
						|
 | 
						|
	m.Subscribers[topic] = append(m.Subscribers[topic], sub)
 | 
						|
 | 
						|
	go func() {
 | 
						|
		<-sub.exit
 | 
						|
		m.Lock()
 | 
						|
		var newSubscribers []*mockSubscriber
 | 
						|
		for _, sb := range m.Subscribers[topic] {
 | 
						|
			if sb.id == sub.id {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			newSubscribers = append(newSubscribers, sb)
 | 
						|
		}
 | 
						|
		m.Subscribers[topic] = newSubscribers
 | 
						|
		m.Unlock()
 | 
						|
	}()
 | 
						|
 | 
						|
	return sub, nil
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockBroker) String() string {
 | 
						|
	return "mock"
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockPublication) Topic() string {
 | 
						|
	return m.topic
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockPublication) Message() *broker.Message {
 | 
						|
	return m.message
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockPublication) Ack() error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockSubscriber) Options() broker.SubscribeOptions {
 | 
						|
	return m.opts
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockSubscriber) Topic() string {
 | 
						|
	return m.topic
 | 
						|
}
 | 
						|
 | 
						|
func (m *mockSubscriber) Unsubscribe() error {
 | 
						|
	m.exit <- true
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func NewBroker(opts ...broker.Option) broker.Broker {
 | 
						|
	var options broker.Options
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	return &mockBroker{
 | 
						|
		opts:        options,
 | 
						|
		Subscribers: make(map[string][]*mockSubscriber),
 | 
						|
	}
 | 
						|
}
 |