249 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			249 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package mqtt
 | |
| 
 | |
| /*
 | |
| 	MQTT is a go-micro Broker for the MQTT protocol.
 | |
| 	This can be integrated with any broker that supports MQTT,
 | |
| 	including Mosquito and AWS IoT.
 | |
| 
 | |
| 	TODO: Strip encoding?
 | |
| 	Where brokers don't support headers we're actually
 | |
| 	encoding the broker.Message in json to simplify usage
 | |
| 	and cross broker compatibility. To actually use the
 | |
| 	MQTT broker more widely on the internet we may need to
 | |
| 	support stripping the encoding.
 | |
| 
 | |
| 	Note: Because of the way the MQTT library works, when you
 | |
| 	unsubscribe from a topic it will unsubscribe all subscribers.
 | |
| 	TODO: Perhaps create a unique client per subscription.
 | |
| 	Becomes slightly more difficult to track a disconnect.
 | |
| 
 | |
| */
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"log"
 | |
| 	"math/rand"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/eclipse/paho.mqtt.golang"
 | |
| 	"github.com/micro/go-micro/broker"
 | |
| )
 | |
| 
 | |
| type mqttBroker struct {
 | |
| 	addrs  []string
 | |
| 	opts   broker.Options
 | |
| 	client mqtt.Client
 | |
| }
 | |
| 
 | |
| func init() {
 | |
| 	rand.Seed(time.Now().UnixNano())
 | |
| }
 | |
| 
 | |
| func setAddrs(addrs []string) []string {
 | |
| 	var cAddrs []string
 | |
| 
 | |
| 	for _, addr := range addrs {
 | |
| 		if len(addr) == 0 {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		var scheme string
 | |
| 		var host string
 | |
| 		var port int
 | |
| 
 | |
| 		// split on scheme
 | |
| 		parts := strings.Split(addr, "://")
 | |
| 
 | |
| 		// no scheme
 | |
| 		if len(parts) < 2 {
 | |
| 			// default tcp scheme
 | |
| 			scheme = "tcp"
 | |
| 			parts = strings.Split(parts[0], ":")
 | |
| 			// got scheme
 | |
| 		} else {
 | |
| 			scheme = parts[0]
 | |
| 			parts = strings.Split(parts[1], ":")
 | |
| 		}
 | |
| 
 | |
| 		// no parts
 | |
| 		if len(parts) == 0 {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// check scheme
 | |
| 		switch scheme {
 | |
| 		case "tcp", "ssl", "ws":
 | |
| 		default:
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if len(parts) < 2 {
 | |
| 			// no port
 | |
| 			host = parts[0]
 | |
| 
 | |
| 			switch scheme {
 | |
| 			case "tcp":
 | |
| 				port = 1883
 | |
| 			case "ssl":
 | |
| 				port = 8883
 | |
| 			case "ws":
 | |
| 				// support secure port
 | |
| 				port = 80
 | |
| 			default:
 | |
| 				port = 1883
 | |
| 			}
 | |
| 			// got host port
 | |
| 		} else {
 | |
| 			host = parts[0]
 | |
| 			port, _ = strconv.Atoi(parts[1])
 | |
| 		}
 | |
| 
 | |
| 		addr = fmt.Sprintf("%s://%s:%d", scheme, host, port)
 | |
| 		cAddrs = append(cAddrs, addr)
 | |
| 
 | |
| 	}
 | |
| 
 | |
| 	// default an address if we have none
 | |
| 	if len(cAddrs) == 0 {
 | |
| 		cAddrs = []string{"tcp://127.0.0.1:1883"}
 | |
| 	}
 | |
| 
 | |
| 	return cAddrs
 | |
| }
 | |
| 
 | |
| func newClient(addrs []string, opts broker.Options) mqtt.Client {
 | |
| 	// create opts
 | |
| 	cOpts := mqtt.NewClientOptions()
 | |
| 	cOpts.SetClientID(fmt.Sprintf("%d%d", time.Now().UnixNano(), rand.Intn(10)))
 | |
| 	cOpts.SetCleanSession(false)
 | |
| 
 | |
| 	// setup tls
 | |
| 	if opts.TLSConfig != nil {
 | |
| 		cOpts.SetTLSConfig(opts.TLSConfig)
 | |
| 	}
 | |
| 
 | |
| 	// add brokers
 | |
| 	for _, addr := range addrs {
 | |
| 		cOpts.AddBroker(addr)
 | |
| 	}
 | |
| 
 | |
| 	return mqtt.NewClient(cOpts)
 | |
| }
 | |
| 
 | |
| func newBroker(opts ...broker.Option) broker.Broker {
 | |
| 	var options broker.Options
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 
 | |
| 	addrs := setAddrs(options.Addrs)
 | |
| 	client := newClient(addrs, options)
 | |
| 
 | |
| 	return &mqttBroker{
 | |
| 		opts:   options,
 | |
| 		client: client,
 | |
| 		addrs:  addrs,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m *mqttBroker) Options() broker.Options {
 | |
| 	return m.opts
 | |
| }
 | |
| 
 | |
| func (m *mqttBroker) Address() string {
 | |
| 	return strings.Join(m.addrs, ",")
 | |
| }
 | |
| 
 | |
| func (m *mqttBroker) Connect() error {
 | |
| 	if m.client.IsConnected() {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if t := m.client.Connect(); t.Wait() && t.Error() != nil {
 | |
| 		return t.Error()
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *mqttBroker) Disconnect() error {
 | |
| 	if !m.client.IsConnected() {
 | |
| 		return nil
 | |
| 	}
 | |
| 	m.client.Disconnect(0)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *mqttBroker) Init(opts ...broker.Option) error {
 | |
| 	if m.client.IsConnected() {
 | |
| 		return errors.New("cannot init while connected")
 | |
| 	}
 | |
| 
 | |
| 	for _, o := range opts {
 | |
| 		o(&m.opts)
 | |
| 	}
 | |
| 
 | |
| 	m.addrs = setAddrs(m.opts.Addrs)
 | |
| 	m.client = newClient(m.addrs, m.opts)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *mqttBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
 | |
| 	if !m.client.IsConnected() {
 | |
| 		return errors.New("not connected")
 | |
| 	}
 | |
| 
 | |
| 	b, err := json.Marshal(msg)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	t := m.client.Publish(topic, 1, false, b)
 | |
| 	return t.Error()
 | |
| }
 | |
| 
 | |
| func (m *mqttBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 | |
| 	if !m.client.IsConnected() {
 | |
| 		return nil, errors.New("not connected")
 | |
| 	}
 | |
| 
 | |
| 	var options broker.SubscribeOptions
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 
 | |
| 	t := m.client.Subscribe(topic, 1, func(c mqtt.Client, m mqtt.Message) {
 | |
| 		var msg *broker.Message
 | |
| 		if err := json.Unmarshal(m.Payload(), &msg); err != nil {
 | |
| 			log.Println(err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		if err := h(&mqttPub{topic: topic, msg: msg}); err != nil {
 | |
| 			log.Println(err)
 | |
| 		}
 | |
| 	})
 | |
| 
 | |
| 	if t.Wait() && t.Error() != nil {
 | |
| 		return nil, t.Error()
 | |
| 	}
 | |
| 
 | |
| 	return &mqttSub{
 | |
| 		opts:   options,
 | |
| 		client: m.client,
 | |
| 		topic:  topic,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (m *mqttBroker) String() string {
 | |
| 	return "mqtt"
 | |
| }
 | |
| 
 | |
| func NewBroker(opts ...broker.Option) broker.Broker {
 | |
| 	return newBroker(opts...)
 | |
| }
 |