diff --git a/broker/mqtt/mqtt.go b/broker/mqtt/mqtt.go index 7b4e917c..13bf21a5 100644 --- a/broker/mqtt/mqtt.go +++ b/broker/mqtt/mqtt.go @@ -5,11 +5,12 @@ package mqtt This can be integrated with any broker that supports MQTT, including Mosquito and AWS IoT. - TODO: Add encoding - Usually we'll translate Message headers over to - the equivalent in a broker. In MQTT we don't have that - and we don't want to assume data format because the whole - point is that it could be anything. So we'll defer for now. + TODO: Strip encoding? + Where brokers don't support headers we're actually + encoding the broker.Message in json to simplify usage + and cross broker compatibility. To actually use the + MQTT broker more widely on the internet we may need to + support stripping the encoding. Note: Because of the way the MQTT library works, when you unsubscribe from a topic it will unsubscribe all subscribers. @@ -19,6 +20,7 @@ package mqtt */ import ( + "encoding/json" "errors" "fmt" "log" @@ -116,8 +118,8 @@ func setAddrs(addrs []string) []string { func newClient(addrs []string, opts broker.Options) mqtt.Client { // create opts cOpts := mqtt.NewClientOptions() - cOpts.SetAutoReconnect(true) - cOpts.SetClientID(fmt.Sprintf("%d-%d", rand.Intn(10), time.Now().UnixNano())) + cOpts.SetClientID(fmt.Sprintf("%d%d", time.Now().UnixNano(), rand.Intn(10))) + cOpts.SetCleanSession(false) // setup tls if opts.TLSConfig != nil { @@ -157,13 +159,21 @@ func (m *mqttBroker) Address() string { } func (m *mqttBroker) Connect() error { + if m.client.IsConnected() { + return nil + } + if t := m.client.Connect(); t.Wait() && t.Error() != nil { return t.Error() } + return nil } func (m *mqttBroker) Disconnect() error { + if !m.client.IsConnected() { + return nil + } m.client.Disconnect(0) return nil } @@ -183,19 +193,37 @@ func (m *mqttBroker) Init(opts ...broker.Option) error { } func (m *mqttBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { - // TODO: Support encoding to preserve headers - t := m.client.Publish(topic, 0, false, msg.Body) + if !m.client.IsConnected() { + return errors.New("not connected") + } + + b, err := json.Marshal(msg) + if err != nil { + return err + } + + t := m.client.Publish(topic, 1, false, b) return t.Error() } func (m *mqttBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + if !m.client.IsConnected() { + return nil, errors.New("not connected") + } + var options broker.SubscribeOptions for _, o := range opts { o(&options) } - t := m.client.Subscribe(topic, 0, func(c mqtt.Client, m mqtt.Message) { - if err := h(&mqttPub{msg: m}); err != nil { + t := m.client.Subscribe(topic, 1, func(c mqtt.Client, m mqtt.Message) { + var msg *broker.Message + if err := json.Unmarshal(m.Payload(), &msg); err != nil { + log.Println(err) + return + } + + if err := h(&mqttPub{topic: topic, msg: msg}); err != nil { log.Println(err) } }) diff --git a/broker/mqtt/mqtt_handler.go b/broker/mqtt/mqtt_handler.go index 6300f8d9..00ff30e5 100644 --- a/broker/mqtt/mqtt_handler.go +++ b/broker/mqtt/mqtt_handler.go @@ -7,7 +7,8 @@ import ( // mqttPub is a broker.Publication type mqttPub struct { - msg mqtt.Message + topic string + msg *broker.Message } // mqttPub is a broker.Subscriber @@ -22,14 +23,11 @@ func (m *mqttPub) Ack() error { } func (m *mqttPub) Topic() string { - return m.msg.Topic() + return m.topic } func (m *mqttPub) Message() *broker.Message { - // TODO: Support encoding to preserve headers - return &broker.Message{ - Body: m.msg.Payload(), - } + return m.msg } func (m *mqttSub) Options() broker.SubscribeOptions { diff --git a/broker/mqtt/mqtt_test.go b/broker/mqtt/mqtt_test.go index 50cd87d4..6d7fd1cd 100644 --- a/broker/mqtt/mqtt_test.go +++ b/broker/mqtt/mqtt_test.go @@ -33,7 +33,8 @@ func TestMQTTMock(t *testing.T) { func TestMQTTHandler(t *testing.T) { p := &mqttPub{ - msg: newMockMessage("mock", 0, false, []byte(`hello`)), + topic: "mock", + msg: &broker.Message{Body: []byte(`hello`)}, } if p.Topic() != "mock" {