micro/broker/mqtt/mqtt.go

249 lines
4.6 KiB
Go
Raw Normal View History

2016-04-23 00:06:29 +01:00
package mqtt
/*
MQTT is a go-micro Broker for the MQTT protocol.
This can be integrated with any broker that supports MQTT,
including Mosquitto and AWS IoT.

TODO: Strip encoding?
Where brokers don't support headers we're actually
encoding the broker.Message in json to simplify usage
and cross broker compatibility. To actually use the
MQTT broker more widely on the internet we may need to
support stripping the encoding.

Note: Because of the way the MQTT library works, when you
unsubscribe from a topic it will unsubscribe all subscribers.
TODO: Perhaps create a unique client per subscription.
Becomes slightly more difficult to track a disconnect.
*/
import (
2016-04-25 22:15:01 +01:00
"encoding/json"
2016-04-23 00:06:29 +01:00
"errors"
"fmt"
"log"
"math/rand"
"strconv"
"strings"
"time"
"github.com/eclipse/paho.mqtt.golang"
"github.com/micro/go-micro/broker"
2016-04-23 00:06:29 +01:00
)
// mqttBroker is the broker.Broker implementation returned by newBroker.
type mqttBroker struct {
	// addrs holds the normalised broker addresses produced by setAddrs.
	addrs []string
	// opts holds the broker options applied in newBroker and Init.
	opts broker.Options
	// client is the MQTT client built by newClient for addrs.
	client mqtt.Client
}
// init seeds the global math/rand source from the current time; newClient
// draws from that source when it generates a client ID.
func init() {
	rand.Seed(time.Now().UnixNano())
}
// setAddrs normalises a list of broker addresses into "scheme://host:port"
// form.
//
// Each entry may be written as "host", "host:port", "scheme://host" or
// "scheme://host:port". The rules are:
//   - empty entries are skipped;
//   - a missing scheme defaults to "tcp";
//   - only the "tcp", "ssl" and "ws" schemes are accepted, entries with any
//     other scheme are skipped;
//   - a missing or empty port defaults to 1883 for tcp, 8883 for ssl and 80
//     for ws;
//   - an entry whose port is not a number in the range 1-65535 is skipped
//     instead of being silently rewritten to port 0.
//
// When no usable entry remains the result is the single default address
// "tcp://127.0.0.1:1883".
func setAddrs(addrs []string) []string {
	// supported schemes and the port assumed when the address has none
	defaultPorts := map[string]int{
		"tcp": 1883,
		"ssl": 8883,
		"ws":  80,
	}

	var cAddrs []string

	for _, addr := range addrs {
		if len(addr) == 0 {
			continue
		}

		// split on scheme, falling back to tcp when there is none
		scheme, rest := "tcp", addr
		if parts := strings.Split(addr, "://"); len(parts) >= 2 {
			scheme, rest = parts[0], parts[1]
		}

		// check scheme and pick up its default port in one lookup
		port, ok := defaultPorts[scheme]
		if !ok {
			continue
		}

		// split host and port; anything after a second ":" is ignored
		parts := strings.Split(rest, ":")
		host := parts[0]

		// an explicit port overrides the default, but only if it is usable
		if len(parts) > 1 && len(parts[1]) > 0 {
			p, err := strconv.Atoi(parts[1])
			if err != nil || p < 1 || p > 65535 {
				continue
			}
			port = p
		}

		cAddrs = append(cAddrs, fmt.Sprintf("%s://%s:%d", scheme, host, port))
	}

	// default an address if we have none
	if len(cAddrs) == 0 {
		cAddrs = []string{"tcp://127.0.0.1:1883"}
	}

	return cAddrs
}
func newClient(addrs []string, opts broker.Options) mqtt.Client {
// create opts
cOpts := mqtt.NewClientOptions()
2016-04-25 22:15:01 +01:00
cOpts.SetClientID(fmt.Sprintf("%d%d", time.Now().UnixNano(), rand.Intn(10)))
cOpts.SetCleanSession(false)
2016-04-23 00:06:29 +01:00
// setup tls
if opts.TLSConfig != nil {
cOpts.SetTLSConfig(opts.TLSConfig)
}
// add brokers
for _, addr := range addrs {
cOpts.AddBroker(addr)
}
return mqtt.NewClient(cOpts)
}
// newBroker applies the given options to a fresh broker.Options value,
// normalises its addresses with setAddrs and returns an mqttBroker holding
// a client built for those addresses.
func newBroker(opts ...broker.Option) broker.Broker {
	var options broker.Options
	for _, apply := range opts {
		apply(&options)
	}

	b := &mqttBroker{opts: options}
	b.addrs = setAddrs(options.Addrs)
	b.client = newClient(b.addrs, options)
	return b
}
// Options returns the broker options currently held by the broker.
func (m *mqttBroker) Options() broker.Options {
	return m.opts
}
// Address returns the broker's normalised addresses joined with commas.
func (m *mqttBroker) Address() string {
	return strings.Join(m.addrs, ",")
}
func (m *mqttBroker) Connect() error {
2016-04-25 22:15:01 +01:00
if m.client.IsConnected() {
return nil
}
if t := m.client.Connect(); t.Wait() && t.Error() != nil {
return t.Error()
}
2016-04-25 22:15:01 +01:00
return nil
2016-04-23 00:06:29 +01:00
}
func (m *mqttBroker) Disconnect() error {
2016-04-25 22:15:01 +01:00
if !m.client.IsConnected() {
return nil
}
2016-04-23 00:06:29 +01:00
m.client.Disconnect(0)
return nil
}
// Init applies additional options to the broker, then recomputes the
// normalised addresses and replaces the client with one built for them.
// It returns an error, without changing anything, while the current client
// is connected.
func (m *mqttBroker) Init(opts ...broker.Option) error {
	if m.client.IsConnected() {
		return errors.New("cannot init while connected")
	}

	for i := range opts {
		opts[i](&m.opts)
	}

	addrs := setAddrs(m.opts.Addrs)
	m.addrs = addrs
	m.client = newClient(addrs, m.opts)
	return nil
}
func (m *mqttBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
2016-04-25 22:15:01 +01:00
if !m.client.IsConnected() {
return errors.New("not connected")
}
b, err := json.Marshal(msg)
if err != nil {
return err
}
t := m.client.Publish(topic, 1, false, b)
2016-04-23 00:06:29 +01:00
return t.Error()
}
func (m *mqttBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
2016-04-25 22:15:01 +01:00
if !m.client.IsConnected() {
return nil, errors.New("not connected")
}
2016-04-23 00:06:29 +01:00
var options broker.SubscribeOptions
for _, o := range opts {
o(&options)
}
2016-04-25 22:15:01 +01:00
t := m.client.Subscribe(topic, 1, func(c mqtt.Client, m mqtt.Message) {
var msg *broker.Message
if err := json.Unmarshal(m.Payload(), &msg); err != nil {
log.Println(err)
return
}
if err := h(&mqttPub{topic: topic, msg: msg}); err != nil {
2016-04-23 00:06:29 +01:00
log.Println(err)
}
})
if t.Wait() && t.Error() != nil {
2016-04-23 00:06:29 +01:00
return nil, t.Error()
}
return &mqttSub{
opts: options,
client: m.client,
topic: topic,
}, nil
}
// String returns the name of this broker implementation, "mqtt".
func (m *mqttBroker) String() string {
	return "mqtt"
}
// NewBroker returns a new MQTT broker configured with the given options.
// It delegates to newBroker.
func NewBroker(opts ...broker.Option) broker.Broker {
	return newBroker(opts...)
}