269 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			269 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package broker provides an HTTP and websocket handler for publishing to and subscribing from a go-micro broker.
 | 
						|
package broker
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"io/ioutil"
 | 
						|
	"net/http"
 | 
						|
	"net/url"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/gorilla/websocket"
 | 
						|
	"github.com/micro/go-micro/v2/api/handler"
 | 
						|
	"github.com/micro/go-micro/v2/broker"
 | 
						|
	"github.com/micro/go-micro/v2/util/log"
 | 
						|
)
 | 
						|
 | 
						|
// Handler is the name of this handler.
const Handler = "broker"

// Websocket tuning values used by the read and write loops.
const (
	// readLimit is the value handed to SetReadLimit on each websocket.
	readLimit = 16384

	// readDeadline is how far into the future the read deadline is set,
	// both initially and whenever a pong is received.
	readDeadline = 60 * time.Second

	// writeDeadline is how far into the future the write deadline is set
	// before every websocket write.
	writeDeadline = 10 * time.Second

	// pingTime is the interval between pings: nine tenths of readDeadline,
	// so a ping goes out before the read deadline can lapse.
	pingTime = (readDeadline * 9) / 10
)
 | 
						|
 | 
						|
type brokerHandler struct {
 | 
						|
	opts handler.Options
 | 
						|
	u    websocket.Upgrader
 | 
						|
}
 | 
						|
 | 
						|
type conn struct {
 | 
						|
	b     broker.Broker
 | 
						|
	cType string
 | 
						|
	topic string
 | 
						|
	queue string
 | 
						|
	exit  chan bool
 | 
						|
 | 
						|
	sync.Mutex
 | 
						|
	ws *websocket.Conn
 | 
						|
}
 | 
						|
 | 
						|
// once ensures the broker Init/Connect sequence in ServeHTTP runs a single
// time for the whole package.
var once sync.Once

// contentType is the Content-Type used for websocket-originated messages
// when the upgrade request does not carry a Content-Type header.
var contentType = "text/plain"
 | 
						|
 | 
						|
// checkOrigin reports whether the request is same-origin.
//
// It returns true when the request has no Origin header at all, false when
// the first Origin value cannot be parsed as a URL, and otherwise whether
// the host of that URL matches r.Host.
func checkOrigin(r *http.Request) bool {
	origin := r.Header["Origin"]
	if len(origin) == 0 {
		return true
	}

	u, err := url.Parse(origin[0])
	if err != nil {
		return false
	}

	// Host names are case-insensitive, so "Example.com" and "example.com"
	// name the same origin; a byte-wise comparison would wrongly reject it.
	return strings.EqualFold(u.Host, r.Host)
}
 | 
						|
 | 
						|
func (c *conn) close() {
 | 
						|
	select {
 | 
						|
	case <-c.exit:
 | 
						|
		return
 | 
						|
	default:
 | 
						|
		close(c.exit)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *conn) readLoop() {
 | 
						|
	defer func() {
 | 
						|
		c.close()
 | 
						|
		c.ws.Close()
 | 
						|
	}()
 | 
						|
 | 
						|
	// set read limit/deadline
 | 
						|
	c.ws.SetReadLimit(readLimit)
 | 
						|
	c.ws.SetReadDeadline(time.Now().Add(readDeadline))
 | 
						|
 | 
						|
	// set close handler
 | 
						|
	ch := c.ws.CloseHandler()
 | 
						|
	c.ws.SetCloseHandler(func(code int, text string) error {
 | 
						|
		err := ch(code, text)
 | 
						|
		c.close()
 | 
						|
		return err
 | 
						|
	})
 | 
						|
 | 
						|
	// set pong handler
 | 
						|
	c.ws.SetPongHandler(func(string) error {
 | 
						|
		c.ws.SetReadDeadline(time.Now().Add(readDeadline))
 | 
						|
		return nil
 | 
						|
	})
 | 
						|
 | 
						|
	for {
 | 
						|
		_, message, err := c.ws.ReadMessage()
 | 
						|
		if err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		c.b.Publish(c.topic, &broker.Message{
 | 
						|
			Header: map[string]string{"Content-Type": c.cType},
 | 
						|
			Body:   message,
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *conn) write(mType int, data []byte) error {
 | 
						|
	c.Lock()
 | 
						|
	c.ws.SetWriteDeadline(time.Now().Add(writeDeadline))
 | 
						|
	err := c.ws.WriteMessage(mType, data)
 | 
						|
	c.Unlock()
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (c *conn) writeLoop() {
 | 
						|
	ticker := time.NewTicker(pingTime)
 | 
						|
 | 
						|
	var opts []broker.SubscribeOption
 | 
						|
 | 
						|
	if len(c.queue) > 0 {
 | 
						|
		opts = append(opts, broker.Queue(c.queue))
 | 
						|
	}
 | 
						|
 | 
						|
	subscriber, err := c.b.Subscribe(c.topic, func(p broker.Event) error {
 | 
						|
		b, err := json.Marshal(p.Message())
 | 
						|
		if err != nil {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		return c.write(websocket.TextMessage, b)
 | 
						|
	}, opts...)
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		subscriber.Unsubscribe()
 | 
						|
		ticker.Stop()
 | 
						|
		c.ws.Close()
 | 
						|
	}()
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		log.Log(err.Error())
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ticker.C:
 | 
						|
			if err := c.write(websocket.PingMessage, []byte{}); err != nil {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		case <-c.exit:
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (b *brokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | 
						|
	br := b.opts.Service.Client().Options().Broker
 | 
						|
 | 
						|
	// Setup the broker
 | 
						|
	once.Do(func() {
 | 
						|
		br.Init()
 | 
						|
		br.Connect()
 | 
						|
	})
 | 
						|
 | 
						|
	// Parse
 | 
						|
	r.ParseForm()
 | 
						|
	topic := r.Form.Get("topic")
 | 
						|
 | 
						|
	// Can't do anything without a topic
 | 
						|
	if len(topic) == 0 {
 | 
						|
		http.Error(w, "Topic not specified", 400)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// Post assumed to be Publish
 | 
						|
	if r.Method == "POST" {
 | 
						|
		// Create a broker message
 | 
						|
		msg := &broker.Message{
 | 
						|
			Header: make(map[string]string),
 | 
						|
		}
 | 
						|
 | 
						|
		// Set header
 | 
						|
		for k, v := range r.Header {
 | 
						|
			msg.Header[k] = strings.Join(v, ", ")
 | 
						|
		}
 | 
						|
 | 
						|
		// Read body
 | 
						|
		b, err := ioutil.ReadAll(r.Body)
 | 
						|
		if err != nil {
 | 
						|
			http.Error(w, err.Error(), 500)
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// Set body
 | 
						|
		msg.Body = b
 | 
						|
 | 
						|
		// Publish
 | 
						|
		br.Publish(topic, msg)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// now back to our regularly scheduled programming
 | 
						|
 | 
						|
	if r.Method != "GET" {
 | 
						|
		http.Error(w, "Method not allowed", 405)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	queue := r.Form.Get("queue")
 | 
						|
 | 
						|
	ws, err := b.u.Upgrade(w, r, nil)
 | 
						|
	if err != nil {
 | 
						|
		log.Log(err.Error())
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	cType := r.Header.Get("Content-Type")
 | 
						|
	if len(cType) == 0 {
 | 
						|
		cType = contentType
 | 
						|
	}
 | 
						|
 | 
						|
	c := &conn{
 | 
						|
		b:     br,
 | 
						|
		cType: cType,
 | 
						|
		topic: topic,
 | 
						|
		queue: queue,
 | 
						|
		exit:  make(chan bool),
 | 
						|
		ws:    ws,
 | 
						|
	}
 | 
						|
 | 
						|
	go c.writeLoop()
 | 
						|
	c.readLoop()
 | 
						|
}
 | 
						|
 | 
						|
func (b *brokerHandler) String() string {
 | 
						|
	return "broker"
 | 
						|
}
 | 
						|
 | 
						|
func NewHandler(opts ...handler.Option) handler.Handler {
 | 
						|
	return &brokerHandler{
 | 
						|
		u: websocket.Upgrader{
 | 
						|
			CheckOrigin: func(r *http.Request) bool {
 | 
						|
				return true
 | 
						|
			},
 | 
						|
			ReadBufferSize:  1024,
 | 
						|
			WriteBufferSize: 1024,
 | 
						|
		},
 | 
						|
		opts: handler.NewOptions(opts...),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func WithCors(cors map[string]bool, opts ...handler.Option) handler.Handler {
 | 
						|
	return &brokerHandler{
 | 
						|
		u: websocket.Upgrader{
 | 
						|
			CheckOrigin: func(r *http.Request) bool {
 | 
						|
				if origin := r.Header.Get("Origin"); cors[origin] {
 | 
						|
					return true
 | 
						|
				} else if len(origin) > 0 && cors["*"] {
 | 
						|
					return true
 | 
						|
				} else if checkOrigin(r) {
 | 
						|
					return true
 | 
						|
				}
 | 
						|
				return false
 | 
						|
			},
 | 
						|
			ReadBufferSize:  1024,
 | 
						|
			WriteBufferSize: 1024,
 | 
						|
		},
 | 
						|
		opts: handler.NewOptions(opts...),
 | 
						|
	}
 | 
						|
}
 |