diff --git a/broker/mqtt/mqtt_mock.go b/broker/mqtt/mqtt_mock.go new file mode 100644 index 00000000..748de41a --- /dev/null +++ b/broker/mqtt/mqtt_mock.go @@ -0,0 +1,171 @@ +package mqtt + +import ( + "math/rand" + "sync" + "time" + + "github.com/eclipse/paho.mqtt.golang" +) + +type mockClient struct { + sync.Mutex + connected bool + exit chan bool + + subs map[string][]mqtt.MessageHandler +} + +type mockMessage struct { + id uint16 + topic string + qos byte + retained bool + payload interface{} +} + +var ( + _ mqtt.Client = newMockClient() + _ mqtt.Message = newMockMessage("mock", 0, false, nil) +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +func newMockClient() mqtt.Client { + return &mockClient{ + subs: make(map[string][]mqtt.MessageHandler), + } +} + +func newMockMessage(topic string, qos byte, retained bool, payload interface{}) mqtt.Message { + return &mockMessage{ + id: uint16(rand.Int()), + topic: topic, + qos: qos, + retained: retained, + payload: payload, + } +} + +func (m *mockMessage) Duplicate() bool { + return false +} + +func (m *mockMessage) Qos() byte { + return m.qos +} + +func (m *mockMessage) Retained() bool { + return m.retained +} + +func (m *mockMessage) Topic() string { + return m.topic +} + +func (m *mockMessage) MessageID() uint16 { + return m.id +} + +func (m *mockMessage) Payload() []byte { + return m.payload.([]byte) +} + +func (m *mockClient) IsConnected() bool { + m.Lock() + defer m.Unlock() + return m.connected +} + +func (m *mockClient) Connect() mqtt.Token { + m.Lock() + defer m.Unlock() + + if m.connected { + return nil + } + + m.connected = true + m.exit = make(chan bool) + return &mqtt.ConnectToken{} +} + +func (m *mockClient) Disconnect(uint) { + m.Lock() + defer m.Unlock() + + if !m.connected { + return + } + + m.connected = false + + select { + case <-m.exit: + return + default: + close(m.exit) + } +} + +func (m *mockClient) Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token { + m.Lock() + defer m.Unlock() + + if !m.connected { + return nil + } + + msg := newMockMessage(topic, qos, retained, payload) + + for _, sub := range m.subs[topic] { + sub(m, msg) + } + + return &mqtt.PublishToken{} +} + +func (m *mockClient) Subscribe(topic string, qos byte, h mqtt.MessageHandler) mqtt.Token { + m.Lock() + defer m.Unlock() + + if !m.connected { + return nil + } + + m.subs[topic] = append(m.subs[topic], h) + + return &mqtt.SubscribeToken{} +} + +func (m *mockClient) SubscribeMultiple(topics map[string]byte, h mqtt.MessageHandler) mqtt.Token { + m.Lock() + defer m.Unlock() + + if !m.connected { + return nil + } + + for topic, _ := range topics { + m.subs[topic] = append(m.subs[topic], h) + } + + return &mqtt.SubscribeToken{} +} + +func (m *mockClient) Unsubscribe(topics ...string) mqtt.Token { + m.Lock() + defer m.Unlock() + + if !m.connected { + return nil + } + + for _, topic := range topics { + delete(m.subs, topic) + } + + return &mqtt.UnsubscribeToken{} +} diff --git a/broker/mqtt/mqtt_test.go b/broker/mqtt/mqtt_test.go new file mode 100644 index 00000000..50cd87d4 --- /dev/null +++ b/broker/mqtt/mqtt_test.go @@ -0,0 +1,88 @@ +package mqtt + +import ( + "testing" + + "github.com/eclipse/paho.mqtt.golang" + "github.com/micro/go-micro/broker" +) + +func TestMQTTMock(t *testing.T) { + c := newMockClient() + + if tk := c.Connect(); tk == nil { + t.Fatal("got nil token") + } + + if tk := c.Subscribe("mock", 0, func(cm mqtt.Client, m mqtt.Message) { + t.Logf("Received payload %+v", string(m.Payload())) + }); tk == nil { + t.Fatal("got nil token") + } + + if tk := c.Publish("mock", 0, false, []byte(`hello world`)); tk == nil { + t.Fatal("got nil token") + } + + if tk := c.Unsubscribe("mock"); tk == nil { + t.Fatal("got nil token") + } + + c.Disconnect(0) +} + +func TestMQTTHandler(t *testing.T) { + p := &mqttPub{ + msg: newMockMessage("mock", 0, false, []byte(`hello`)), + } + + if p.Topic() != "mock" { + t.Fatal("Expected topic mock got", p.Topic()) + } + + if string(p.Message().Body) != "hello" { + t.Fatal("Expected `hello` message got %s", string(p.Message().Body)) + } + + s := &mqttSub{ + topic: "mock", + client: newMockClient(), + } + + s.client.Connect() + + if s.Topic() != "mock" { + t.Fatal("Expected topic mock got", s.Topic()) + } + + if err := s.Unsubscribe(); err != nil { + t.Fatal("Error unsubscribing", err) + } + + s.client.Disconnect(0) +} + +func TestMQTT(t *testing.T) { + b := NewBroker() + + if err := b.Init(); err != nil { + t.Fatal(err) + } + + // use mock client + b.(*mqttBroker).client = newMockClient() + + if tk := b.(*mqttBroker).client.Connect(); tk == nil { + t.Fatal("got nil token") + } + + if err := b.Publish("mock", &broker.Message{Body: []byte(`hello`)}); err != nil { + t.Fatal(err) + } + + if err := b.Disconnect(); err != nil { + t.Fatal(err) + } + + b.(*mqttBroker).client.Disconnect(0) +}