// Package broker provides a go-micro/broker handler
package broker

import (
	"encoding/json"
	"io/ioutil"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/micro/go-micro/api/handler"
	"github.com/micro/go-micro/broker"
	"github.com/micro/go-micro/util/log"
)

const (
	Handler = "broker"

	pingTime      = (readDeadline * 9) / 10
	readLimit     = 16384
	readDeadline  = 60 * time.Second
	writeDeadline = 10 * time.Second
)

type brokerHandler struct {
	opts handler.Options
	u    websocket.Upgrader
}

type conn struct {
	b     broker.Broker
	cType string
	topic string
	queue string
	exit  chan bool

	sync.Mutex
	ws *websocket.Conn
}

var (
	once        sync.Once
	contentType = "text/plain"
)

func checkOrigin(r *http.Request) bool {
	origin := r.Header["Origin"]
	if len(origin) == 0 {
		return true
	}
	u, err := url.Parse(origin[0])
	if err != nil {
		return false
	}
	return u.Host == r.Host
}

func (c *conn) close() {
	select {
	case <-c.exit:
		return
	default:
		close(c.exit)
	}
}

func (c *conn) readLoop() {
	defer func() {
		c.close()
		c.ws.Close()
	}()

	// set read limit/deadline
	c.ws.SetReadLimit(readLimit)
	c.ws.SetReadDeadline(time.Now().Add(readDeadline))

	// set close handler
	ch := c.ws.CloseHandler()
	c.ws.SetCloseHandler(func(code int, text string) error {
		err := ch(code, text)
		c.close()
		return err
	})

	// set pong handler
	c.ws.SetPongHandler(func(string) error {
		c.ws.SetReadDeadline(time.Now().Add(readDeadline))
		return nil
	})

	for {
		_, message, err := c.ws.ReadMessage()
		if err != nil {
			return
		}
		c.b.Publish(c.topic, &broker.Message{
			Header: map[string]string{"Content-Type": c.cType},
			Body:   message,
		})
	}
}

func (c *conn) write(mType int, data []byte) error {
	c.Lock()
	c.ws.SetWriteDeadline(time.Now().Add(writeDeadline))
	err := c.ws.WriteMessage(mType, data)
	c.Unlock()
	return err
}

func (c *conn) writeLoop() {
	ticker := time.NewTicker(pingTime)

	var opts []broker.SubscribeOption

	if len(c.queue) > 0 {
		opts = append(opts, broker.Queue(c.queue))
	}

	subscriber, err := c.b.Subscribe(c.topic, func(p broker.Event) error {
		b, err := json.Marshal(p.Message())
		if err != nil {
			return nil
		}
		return c.write(websocket.TextMessage, b)
	}, opts...)

	defer func() {
		subscriber.Unsubscribe()
		ticker.Stop()
		c.ws.Close()
	}()

	if err != nil {
		log.Log(err.Error())
		return
	}

	for {
		select {
		case <-ticker.C:
			if err := c.write(websocket.PingMessage, []byte{}); err != nil {
				return
			}
		case <-c.exit:
			return
		}
	}
}

func (b *brokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	br := b.opts.Service.Client().Options().Broker

	// Setup the broker
	once.Do(func() {
		br.Init()
		br.Connect()
	})

	// Parse
	r.ParseForm()
	topic := r.Form.Get("topic")

	// Can't do anything without a topic
	if len(topic) == 0 {
		http.Error(w, "Topic not specified", 400)
		return
	}

	// Post assumed to be Publish
	if r.Method == "POST" {
		// Create a broker message
		msg := &broker.Message{
			Header: make(map[string]string),
		}

		// Set header
		for k, v := range r.Header {
			msg.Header[k] = strings.Join(v, ", ")
		}

		// Read body
		b, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), 500)
			return
		}

		// Set body
		msg.Body = b

		// Publish
		br.Publish(topic, msg)
		return
	}

	// now back to our regularly scheduled programming

	if r.Method != "GET" {
		http.Error(w, "Method not allowed", 405)
		return
	}

	queue := r.Form.Get("queue")

	ws, err := b.u.Upgrade(w, r, nil)
	if err != nil {
		log.Log(err.Error())
		return
	}

	cType := r.Header.Get("Content-Type")
	if len(cType) == 0 {
		cType = contentType
	}

	c := &conn{
		b:     br,
		cType: cType,
		topic: topic,
		queue: queue,
		exit:  make(chan bool),
		ws:    ws,
	}

	go c.writeLoop()
	c.readLoop()
}

func (b *brokerHandler) String() string {
	return "broker"
}

func NewHandler(opts ...handler.Option) handler.Handler {
	return &brokerHandler{
		u: websocket.Upgrader{
			CheckOrigin: func(r *http.Request) bool {
				return true
			},
			ReadBufferSize:  1024,
			WriteBufferSize: 1024,
		},
		opts: handler.NewOptions(opts...),
	}
}

func WithCors(cors map[string]bool, opts ...handler.Option) handler.Handler {
	return &brokerHandler{
		u: websocket.Upgrader{
			CheckOrigin: func(r *http.Request) bool {
				if origin := r.Header.Get("Origin"); cors[origin] {
					return true
				} else if len(origin) > 0 && cors["*"] {
					return true
				} else if checkOrigin(r) {
					return true
				}
				return false
			},
			ReadBufferSize:  1024,
			WriteBufferSize: 1024,
		},
		opts: handler.NewOptions(opts...),
	}
}