Vasiliy Tolstov
58598d0fe0
* fixes for safe convertation Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * fix client publish panic If broker connect returns error we dont check it status and use it later to publish message, mostly this is unexpected because broker connection failed and we cant use it. Also proposed solution have benefit - we flag connection status only when we have succeseful broker connection Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * api/handler/broker: fix possible broker publish panic Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
277 lines
4.8 KiB
Go
277 lines
4.8 KiB
Go
// Package broker provides a go-micro/broker handler
|
|
package broker
|
|
|
|
import (
|
|
"encoding/json"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/micro/go-micro/v2/api/handler"
|
|
"github.com/micro/go-micro/v2/broker"
|
|
"github.com/micro/go-micro/v2/util/log"
|
|
)
|
|
|
|
const (
|
|
Handler = "broker"
|
|
|
|
pingTime = (readDeadline * 9) / 10
|
|
readLimit = 16384
|
|
readDeadline = 60 * time.Second
|
|
writeDeadline = 10 * time.Second
|
|
)
|
|
|
|
type brokerHandler struct {
|
|
once atomic.Value
|
|
opts handler.Options
|
|
u websocket.Upgrader
|
|
}
|
|
|
|
type conn struct {
|
|
b broker.Broker
|
|
cType string
|
|
topic string
|
|
queue string
|
|
exit chan bool
|
|
|
|
sync.Mutex
|
|
ws *websocket.Conn
|
|
}
|
|
|
|
var (
|
|
contentType = "text/plain"
|
|
)
|
|
|
|
func checkOrigin(r *http.Request) bool {
|
|
origin := r.Header["Origin"]
|
|
if len(origin) == 0 {
|
|
return true
|
|
}
|
|
u, err := url.Parse(origin[0])
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return u.Host == r.Host
|
|
}
|
|
|
|
func (c *conn) close() {
|
|
select {
|
|
case <-c.exit:
|
|
return
|
|
default:
|
|
close(c.exit)
|
|
}
|
|
}
|
|
|
|
func (c *conn) readLoop() {
|
|
defer func() {
|
|
c.close()
|
|
c.ws.Close()
|
|
}()
|
|
|
|
// set read limit/deadline
|
|
c.ws.SetReadLimit(readLimit)
|
|
c.ws.SetReadDeadline(time.Now().Add(readDeadline))
|
|
|
|
// set close handler
|
|
ch := c.ws.CloseHandler()
|
|
c.ws.SetCloseHandler(func(code int, text string) error {
|
|
err := ch(code, text)
|
|
c.close()
|
|
return err
|
|
})
|
|
|
|
// set pong handler
|
|
c.ws.SetPongHandler(func(string) error {
|
|
c.ws.SetReadDeadline(time.Now().Add(readDeadline))
|
|
return nil
|
|
})
|
|
|
|
for {
|
|
_, message, err := c.ws.ReadMessage()
|
|
if err != nil {
|
|
return
|
|
}
|
|
c.b.Publish(c.topic, &broker.Message{
|
|
Header: map[string]string{"Content-Type": c.cType},
|
|
Body: message,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (c *conn) write(mType int, data []byte) error {
|
|
c.Lock()
|
|
c.ws.SetWriteDeadline(time.Now().Add(writeDeadline))
|
|
err := c.ws.WriteMessage(mType, data)
|
|
c.Unlock()
|
|
return err
|
|
}
|
|
|
|
func (c *conn) writeLoop() {
|
|
ticker := time.NewTicker(pingTime)
|
|
|
|
var opts []broker.SubscribeOption
|
|
|
|
if len(c.queue) > 0 {
|
|
opts = append(opts, broker.Queue(c.queue))
|
|
}
|
|
|
|
subscriber, err := c.b.Subscribe(c.topic, func(p broker.Event) error {
|
|
b, err := json.Marshal(p.Message())
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return c.write(websocket.TextMessage, b)
|
|
}, opts...)
|
|
|
|
defer func() {
|
|
subscriber.Unsubscribe()
|
|
ticker.Stop()
|
|
c.ws.Close()
|
|
}()
|
|
|
|
if err != nil {
|
|
log.Log(err.Error())
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if err := c.write(websocket.PingMessage, []byte{}); err != nil {
|
|
return
|
|
}
|
|
case <-c.exit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *brokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
br := b.opts.Service.Client().Options().Broker
|
|
|
|
// Setup the broker
|
|
if !b.once.Load().(bool) {
|
|
if err := br.Init(); err != nil {
|
|
http.Error(w, err.Error(), 500)
|
|
}
|
|
if err := br.Connect(); err != nil {
|
|
http.Error(w, err.Error(), 500)
|
|
}
|
|
b.once.Store(true)
|
|
}
|
|
|
|
// Parse
|
|
r.ParseForm()
|
|
topic := r.Form.Get("topic")
|
|
|
|
// Can't do anything without a topic
|
|
if len(topic) == 0 {
|
|
http.Error(w, "Topic not specified", 400)
|
|
return
|
|
}
|
|
|
|
// Post assumed to be Publish
|
|
if r.Method == "POST" {
|
|
// Create a broker message
|
|
msg := &broker.Message{
|
|
Header: make(map[string]string),
|
|
}
|
|
|
|
// Set header
|
|
for k, v := range r.Header {
|
|
msg.Header[k] = strings.Join(v, ", ")
|
|
}
|
|
|
|
// Read body
|
|
b, err := ioutil.ReadAll(r.Body)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), 500)
|
|
return
|
|
}
|
|
|
|
// Set body
|
|
msg.Body = b
|
|
|
|
// Publish
|
|
br.Publish(topic, msg)
|
|
return
|
|
}
|
|
|
|
// now back to our regularly scheduled programming
|
|
|
|
if r.Method != "GET" {
|
|
http.Error(w, "Method not allowed", 405)
|
|
return
|
|
}
|
|
|
|
queue := r.Form.Get("queue")
|
|
|
|
ws, err := b.u.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Log(err.Error())
|
|
return
|
|
}
|
|
|
|
cType := r.Header.Get("Content-Type")
|
|
if len(cType) == 0 {
|
|
cType = contentType
|
|
}
|
|
|
|
c := &conn{
|
|
b: br,
|
|
cType: cType,
|
|
topic: topic,
|
|
queue: queue,
|
|
exit: make(chan bool),
|
|
ws: ws,
|
|
}
|
|
|
|
go c.writeLoop()
|
|
c.readLoop()
|
|
}
|
|
|
|
func (b *brokerHandler) String() string {
|
|
return "broker"
|
|
}
|
|
|
|
func NewHandler(opts ...handler.Option) handler.Handler {
|
|
h := &brokerHandler{
|
|
u: websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
return true
|
|
},
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
},
|
|
opts: handler.NewOptions(opts...),
|
|
}
|
|
h.once.Store(true)
|
|
return h
|
|
}
|
|
|
|
func WithCors(cors map[string]bool, opts ...handler.Option) handler.Handler {
|
|
return &brokerHandler{
|
|
u: websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
if origin := r.Header.Get("Origin"); cors[origin] {
|
|
return true
|
|
} else if len(origin) > 0 && cors["*"] {
|
|
return true
|
|
} else if checkOrigin(r) {
|
|
return true
|
|
}
|
|
return false
|
|
},
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
},
|
|
opts: handler.NewOptions(opts...),
|
|
}
|
|
}
|