Commit fixes for MQTT
This commit is contained in:
		| @@ -5,11 +5,12 @@ package mqtt | ||||
| 	This can be integrated with any broker that supports MQTT, | ||||
| 	including Mosquito and AWS IoT. | ||||
|  | ||||
| 	TODO: Add encoding | ||||
| 	Usually we'll translate Message headers over to | ||||
| 	the equivalent in a broker. In MQTT we don't have that | ||||
| 	and we don't want to assume data format because the whole | ||||
| 	point is that it could be anything. So we'll defer for now. | ||||
| 	TODO: Strip encoding? | ||||
| 	Where brokers don't support headers we're actually | ||||
| 	encoding the broker.Message in json to simplify usage | ||||
| 	and cross broker compatibility. To actually use the | ||||
| 	MQTT broker more widely on the internet we may need to | ||||
| 	support stripping the encoding. | ||||
|  | ||||
| 	Note: Because of the way the MQTT library works, when you | ||||
| 	unsubscribe from a topic it will unsubscribe all subscribers. | ||||
| @@ -19,6 +20,7 @@ package mqtt | ||||
| */ | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| @@ -116,8 +118,8 @@ func setAddrs(addrs []string) []string { | ||||
| func newClient(addrs []string, opts broker.Options) mqtt.Client { | ||||
| 	// create opts | ||||
| 	cOpts := mqtt.NewClientOptions() | ||||
| 	cOpts.SetAutoReconnect(true) | ||||
| 	cOpts.SetClientID(fmt.Sprintf("%d-%d", rand.Intn(10), time.Now().UnixNano())) | ||||
| 	cOpts.SetClientID(fmt.Sprintf("%d%d", time.Now().UnixNano(), rand.Intn(10))) | ||||
| 	cOpts.SetCleanSession(false) | ||||
|  | ||||
| 	// setup tls | ||||
| 	if opts.TLSConfig != nil { | ||||
| @@ -157,13 +159,21 @@ func (m *mqttBroker) Address() string { | ||||
| } | ||||
|  | ||||
| func (m *mqttBroker) Connect() error { | ||||
| 	if m.client.IsConnected() { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	if t := m.client.Connect(); t.Wait() && t.Error() != nil { | ||||
| 		return t.Error() | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (m *mqttBroker) Disconnect() error { | ||||
| 	if !m.client.IsConnected() { | ||||
| 		return nil | ||||
| 	} | ||||
| 	m.client.Disconnect(0) | ||||
| 	return nil | ||||
| } | ||||
| @@ -183,19 +193,37 @@ func (m *mqttBroker) Init(opts ...broker.Option) error { | ||||
| } | ||||
|  | ||||
| func (m *mqttBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { | ||||
| 	// TODO: Support encoding to preserve headers | ||||
| 	t := m.client.Publish(topic, 0, false, msg.Body) | ||||
| 	if !m.client.IsConnected() { | ||||
| 		return errors.New("not connected") | ||||
| 	} | ||||
|  | ||||
| 	b, err := json.Marshal(msg) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	t := m.client.Publish(topic, 1, false, b) | ||||
| 	return t.Error() | ||||
| } | ||||
|  | ||||
| func (m *mqttBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { | ||||
| 	if !m.client.IsConnected() { | ||||
| 		return nil, errors.New("not connected") | ||||
| 	} | ||||
|  | ||||
| 	var options broker.SubscribeOptions | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	t := m.client.Subscribe(topic, 0, func(c mqtt.Client, m mqtt.Message) { | ||||
| 		if err := h(&mqttPub{msg: m}); err != nil { | ||||
| 	t := m.client.Subscribe(topic, 1, func(c mqtt.Client, m mqtt.Message) { | ||||
| 		var msg *broker.Message | ||||
| 		if err := json.Unmarshal(m.Payload(), &msg); err != nil { | ||||
| 			log.Println(err) | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		if err := h(&mqttPub{topic: topic, msg: msg}); err != nil { | ||||
| 			log.Println(err) | ||||
| 		} | ||||
| 	}) | ||||
|   | ||||
| @@ -7,7 +7,8 @@ import ( | ||||
|  | ||||
| // mqttPub is a broker.Publication | ||||
| type mqttPub struct { | ||||
| 	msg mqtt.Message | ||||
| 	topic string | ||||
| 	msg   *broker.Message | ||||
| } | ||||
|  | ||||
| // mqttPub is a broker.Subscriber | ||||
| @@ -22,14 +23,11 @@ func (m *mqttPub) Ack() error { | ||||
| } | ||||
|  | ||||
| func (m *mqttPub) Topic() string { | ||||
| 	return m.msg.Topic() | ||||
| 	return m.topic | ||||
| } | ||||
|  | ||||
| func (m *mqttPub) Message() *broker.Message { | ||||
| 	// TODO: Support encoding to preserve headers | ||||
| 	return &broker.Message{ | ||||
| 		Body: m.msg.Payload(), | ||||
| 	} | ||||
| 	return m.msg | ||||
| } | ||||
|  | ||||
| func (m *mqttSub) Options() broker.SubscribeOptions { | ||||
|   | ||||
| @@ -33,7 +33,8 @@ func TestMQTTMock(t *testing.T) { | ||||
|  | ||||
| func TestMQTTHandler(t *testing.T) { | ||||
| 	p := &mqttPub{ | ||||
| 		msg: newMockMessage("mock", 0, false, []byte(`hello`)), | ||||
| 		topic: "mock", | ||||
| 		msg:   &broker.Message{Body: []byte(`hello`)}, | ||||
| 	} | ||||
|  | ||||
| 	if p.Topic() != "mock" { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user