package watch

import (
	"bufio"
	"encoding/json"
	"net/http"
	"time"
)

// bodyWatcher scans the body of a request for chunks
type bodyWatcher struct {
	results chan Event
	stop    chan struct{}
	res     *http.Response
	req     *http.Request
}

// Changes returns the results channel
func (wr *bodyWatcher) ResultChan() <-chan Event {
	return wr.results
}

// Stop cancels the request
func (wr *bodyWatcher) Stop() {
	select {
	case <-wr.stop:
		return
	default:
		close(wr.stop)
		close(wr.results)
	}
}

func (wr *bodyWatcher) stream() {
	reader := bufio.NewReader(wr.res.Body)

	// ignore first few messages from stream,
	// as they are usually old.
	ignore := true

	go func() {
		<-time.After(time.Second)
		ignore = false
	}()

	go func() {
		// stop the watcher
		defer wr.Stop()

		for {
			// read a line
			b, err := reader.ReadBytes('\n')
			if err != nil {
				return
			}

			// ignore for the first second
			if ignore {
				continue
			}

			// send the event
			var event Event
			if err := json.Unmarshal(b, &event); err != nil {
				continue
			}
			wr.results <- event
		}
	}()
}

// NewBodyWatcher creates a k8s body watcher for
// a given http request
func NewBodyWatcher(req *http.Request, client *http.Client) (Watch, error) {
	stop := make(chan struct{})
	req.Cancel = stop

	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}

	wr := &bodyWatcher{
		results: make(chan Event),
		stop:    stop,
		req:     req,
		res:     res,
	}

	go wr.stream()
	return wr, nil
}