micro/api/handler/broker/broker.go
Vasiliy Tolstov 58598d0fe0
fixes for safe conversion and avoid panics (#1213)
* fixes for safe conversion

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* fix client publish panic

If broker connect returns an error we don't check its status and use
it later to publish a message; this is mostly unexpected because the
broker connection failed and we can't use it.
The proposed solution also has a benefit: we flag the connection status
only when we have a successful broker connection.

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* api/handler/broker: fix possible broker publish panic

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2020-02-18 23:05:38 +00:00

277 lines
4.8 KiB
Go

// Package broker provides a go-micro/broker handler
package broker
import (
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/micro/go-micro/v2/api/handler"
"github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/util/log"
)
// Name of the handler plus the size and timing limits applied to every
// websocket connection it serves.
const (
	// Handler is the name of this handler.
	Handler = "broker"

	// readLimit is the value readLoop hands to the websocket's SetReadLimit.
	readLimit = 16384
	// readDeadline is how far into the future readLoop pushes the read
	// deadline, both initially and on every pong.
	readDeadline = 60 * time.Second
	// writeDeadline is the deadline applied to each individual write.
	writeDeadline = 10 * time.Second
	// pingTime is the interval between pings sent by writeLoop; it is 90% of
	// readDeadline.
	pingTime = (readDeadline * 9) / 10
)
// brokerHandler is the HTTP handler of this package. Its ServeHTTP method
// publishes the body of POST requests to the broker and upgrades GET requests
// to a websocket bound to a broker topic.
type brokerHandler struct {
// once holds a bool that ServeHTTP reads to decide whether the broker's
// Init/Connect calls still have to be made.
once atomic.Value
// opts are the handler options; ServeHTTP reaches the broker through
// opts.Service.
opts handler.Options
// u upgrades GET requests to websocket connections.
u websocket.Upgrader
}
// conn ties one websocket connection to a broker topic. readLoop publishes
// what the websocket receives; writeLoop forwards broker events to the
// websocket.
type conn struct {
// b is the broker used for Publish (readLoop) and Subscribe (writeLoop).
b broker.Broker
// cType is sent as the Content-Type header of every published message.
cType string
// topic is the broker topic published to and subscribed on.
topic string
// queue, when non-empty, is passed to Subscribe as a broker.Queue option.
queue string
// exit is closed by close(); writeLoop returns once it is closed.
exit chan bool
// The embedded mutex is held by write around each websocket write.
sync.Mutex
// ws is the underlying websocket connection.
ws *websocket.Conn
}
// contentType is the Content-Type ServeHTTP falls back to when a websocket
// request does not carry a Content-Type header.
var contentType = "text/plain"
// checkOrigin reports whether a request passes the same-host origin check.
//
// A request without an Origin header passes. Otherwise the first Origin value
// is parsed as a URL and its host must equal r.Host exactly; an Origin that
// does not parse fails the check.
func checkOrigin(r *http.Request) bool {
	values := r.Header["Origin"]
	if len(values) == 0 {
		return true
	}

	if parsed, err := url.Parse(values[0]); err == nil {
		return parsed.Host == r.Host
	}
	return false
}
// close closes the exit channel unless it has been closed already, which lets
// it be called more than once. writeLoop watches exit and returns when it is
// closed.
func (c *conn) close() {
	select {
	case <-c.exit:
		// A receive only succeeds here once exit is closed (nothing in this
		// file sends on it); closing a closed channel again would panic.
	default:
		close(c.exit)
	}
}
// readLoop reads messages from the websocket and publishes each one to the
// broker on c.topic, tagged with c.cType as its Content-Type header.
//
// It returns when reading from the websocket fails. On return it closes the
// exit channel (stopping writeLoop) and the websocket itself. A failed publish
// is logged and the loop keeps serving the connection.
func (c *conn) readLoop() {
	defer func() {
		c.close()
		c.ws.Close()
	}()

	// set read limit/deadline
	c.ws.SetReadLimit(readLimit)
	c.ws.SetReadDeadline(time.Now().Add(readDeadline))

	// set close handler: keep the previously installed behaviour, then signal exit
	ch := c.ws.CloseHandler()
	c.ws.SetCloseHandler(func(code int, text string) error {
		err := ch(code, text)
		c.close()
		return err
	})

	// set pong handler: every pong pushes the read deadline forward
	c.ws.SetPongHandler(func(string) error {
		c.ws.SetReadDeadline(time.Now().Add(readDeadline))
		return nil
	})

	for {
		_, message, err := c.ws.ReadMessage()
		if err != nil {
			return
		}

		// Do not drop publish failures silently; report them and carry on so
		// one failed publish does not tear down the connection.
		if err := c.b.Publish(c.topic, &broker.Message{
			Header: map[string]string{"Content-Type": c.cType},
			Body:   message,
		}); err != nil {
			log.Log(err.Error())
		}
	}
}
// write sends one websocket message of type mType carrying data and returns
// the error from the underlying WriteMessage call. The conn mutex is held for
// the duration so that writes issued through this method do not overlap, and
// each write gets a fresh deadline of writeDeadline from now.
func (c *conn) write(mType int, data []byte) error {
	c.Lock()
	defer c.Unlock()

	c.ws.SetWriteDeadline(time.Now().Add(writeDeadline))
	return c.ws.WriteMessage(mType, data)
}
// writeLoop subscribes to c.topic (on c.queue when one is set) and forwards
// every broker event to the websocket as a JSON text message. While the
// subscription is active it also sends a ping every pingTime.
//
// It returns when a ping cannot be written or when the exit channel is
// closed; on return it unsubscribes, stops the ticker and closes the
// websocket. If the subscription cannot be established the error is logged,
// the websocket is closed and the loop never starts.
func (c *conn) writeLoop() {
	var opts []broker.SubscribeOption
	if len(c.queue) > 0 {
		opts = append(opts, broker.Queue(c.queue))
	}

	subscriber, err := c.b.Subscribe(c.topic, func(p broker.Event) error {
		b, err := json.Marshal(p.Message())
		if err != nil {
			// A message that cannot be encoded is skipped without reporting
			// an error to the broker.
			return nil
		}
		return c.write(websocket.TextMessage, b)
	}, opts...)
	if err != nil {
		// Handle the failure before any cleanup touches subscriber: it is not
		// usable here and calling Unsubscribe on it would panic.
		log.Log(err.Error())
		c.ws.Close()
		return
	}

	ticker := time.NewTicker(pingTime)
	defer func() {
		subscriber.Unsubscribe()
		ticker.Stop()
		c.ws.Close()
	}()

	for {
		select {
		case <-ticker.C:
			if err := c.write(websocket.PingMessage, []byte{}); err != nil {
				return
			}
		case <-c.exit:
			return
		}
	}
}
// ServeHTTP exposes the service's broker over HTTP.
//
// Every request must carry a "topic" form value, otherwise it is rejected
// with 400. A POST publishes the request body to that topic, with the request
// headers copied into the message header (multiple values joined by ", ").
// A GET is upgraded to a websocket: messages received on the socket are
// published to the topic and broker events on the topic (optionally on the
// "queue" form value) are written to the socket. Any other method gets 405.
//
// If the handler has not yet been marked as set up, the broker is initialised
// and connected first; a failure there ends the request with 500 and leaves
// the handler unmarked so that a later request tries again.
func (b *brokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	br := b.opts.Service.Client().Options().Broker

	// Setup the broker. The comma-ok assertion treats a never-stored value as
	// "not set up" instead of panicking on a nil interface.
	// NOTE(review): the load and the store below are separate steps, so
	// concurrent first requests may each run Init/Connect.
	if ready, _ := b.once.Load().(bool); !ready {
		if err := br.Init(); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if err := br.Connect(); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		// Only flag the handler once the broker really is connected.
		b.once.Store(true)
	}

	// Parse. The error is not checked: r.Form is populated either way and a
	// request without a usable topic is rejected just below.
	r.ParseForm()
	topic := r.Form.Get("topic")

	// Can't do anything without a topic
	if len(topic) == 0 {
		http.Error(w, "Topic not specified", http.StatusBadRequest)
		return
	}

	// Post assumed to be Publish
	if r.Method == "POST" {
		// Create a broker message
		msg := &broker.Message{
			Header: make(map[string]string, len(r.Header)),
		}

		// Set header
		for k, v := range r.Header {
			msg.Header[k] = strings.Join(v, ", ")
		}

		// Read body
		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		// Set body
		msg.Body = body

		// Publish, and tell the client when the message was not accepted
		if err := br.Publish(topic, msg); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
		return
	}

	// now back to our regularly scheduled programming
	if r.Method != "GET" {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	queue := r.Form.Get("queue")

	ws, err := b.u.Upgrade(w, r, nil)
	if err != nil {
		log.Log(err.Error())
		return
	}

	cType := r.Header.Get("Content-Type")
	if len(cType) == 0 {
		cType = contentType
	}

	c := &conn{
		b:     br,
		cType: cType,
		topic: topic,
		queue: queue,
		exit:  make(chan bool),
		ws:    ws,
	}

	// The write side runs in its own goroutine; the read side keeps this
	// request goroutine busy until the websocket is done.
	go c.writeLoop()
	c.readLoop()
}
// String returns the handler's name, "broker".
func (b *brokerHandler) String() string {
	const name = "broker"
	return name
}
// NewHandler returns a broker handler configured with opts. Its websocket
// upgrader accepts requests from any origin and uses 1024-byte read and write
// buffers.
//
// The returned handler has its once flag set to true, so ServeHTTP skips the
// broker Init/Connect step for it.
func NewHandler(opts ...handler.Option) handler.Handler {
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		// No origin restrictions: every request may be upgraded.
		CheckOrigin: func(*http.Request) bool { return true },
	}

	h := &brokerHandler{
		u:    upgrader,
		opts: handler.NewOptions(opts...),
	}
	h.once.Store(true)

	return h
}
// WithCors returns a broker handler configured with opts whose websocket
// upgrader only accepts a request when one of the following holds:
//
//   - the request's Origin header value is enabled in cors,
//   - the request has a non-empty Origin and cors enables "*",
//   - the request passes checkOrigin (no Origin, or Origin host == request host).
//
// The upgrader uses 1024-byte read and write buffers.
func WithCors(cors map[string]bool, opts ...handler.Option) handler.Handler {
	h := &brokerHandler{
		u: websocket.Upgrader{
			CheckOrigin: func(r *http.Request) bool {
				origin := r.Header.Get("Origin")
				switch {
				case cors[origin]:
					return true
				case len(origin) > 0 && cors["*"]:
					return true
				default:
					return checkOrigin(r)
				}
			},
			ReadBufferSize:  1024,
			WriteBufferSize: 1024,
		},
		opts: handler.NewOptions(opts...),
	}

	// once must hold a bool before the handler serves a request, because
	// ServeHTTP type-asserts its content. Use the same initial value as
	// NewHandler so both constructors produce handlers that behave alike.
	h.once.Store(true)

	return h
}