// Package link provides a measured transport.Socket link
package link

import (
	"io"
	"sync"
	"time"

	"github.com/micro/go-micro/config/options"
	"github.com/micro/go-micro/transport"
)

type link struct {
	sync.RWMutex

	// the link id
	id string

	// the remote end to dial
	addr string

	// channel used to close the link
	closed chan bool

	// if its connected
	connected bool

	// the transport to use
	transport transport.Transport

	// the send queue to the socket
	sendQueue chan *transport.Message
	// the recv queue to the socket
	recvQueue chan *transport.Message

	// the socket for this link
	socket transport.Socket

	// determines the cost of the link
	// based on queue length and roundtrip
	length int
	weight int
}

func newLink(options options.Options) *link {
	// default values
	var sock transport.Socket
	var addr string
	id := "local"
	tr := transport.DefaultTransport

	lid, ok := options.Values().Get("link.id")
	if ok {
		id = lid.(string)
	}

	laddr, ok := options.Values().Get("link.address")
	if ok {
		addr = laddr.(string)
	}

	ltr, ok := options.Values().Get("link.transport")
	if ok {
		tr = ltr.(transport.Transport)
	}

	lsock, ok := options.Values().Get("link.socket")
	if ok {
		sock = lsock.(transport.Socket)
	}

	l := &link{
		// the remote end to dial
		addr: addr,
		// transport to dial link
		transport: tr,
		// the socket to use
		// this is nil if not specified
		socket: sock,
		// unique id assigned to the link
		id: id,
		// the closed channel used to close the conn
		closed: make(chan bool),
		// then send queue
		sendQueue: make(chan *transport.Message, 128),
		// the receive queue
		recvQueue: make(chan *transport.Message, 128),
	}

	// return the link
	return l
}

// link methods

// process processes messages on the send and receive queues.
func (l *link) process() {
	go func() {
		for {
			m := new(transport.Message)
			if err := l.recv(m); err != nil {
				return
			}

			select {
			case l.recvQueue <- m:
			case <-l.closed:
				return
			}
		}
	}()

	// messages sent
	i := 0
	length := 0

	for {
		select {
		case m := <-l.sendQueue:
			t := time.Now()

			// send the message
			if err := l.send(m); err != nil {
				return
			}

			// get header size, body size and time taken
			hl := len(m.Header)
			bl := len(m.Body)
			d := time.Since(t)

			// don't calculate on empty messages
			if hl == 0 && bl == 0 {
				continue
			}

			// increment sent
			i++

			// time take to send some bits and bytes
			td := float64(hl+bl) / float64(d.Nanoseconds())
			// increase the scale
			td += 1

			// judge the length
			length = int(td) / (length + int(td))

			// every 10 messages update length
			if (i % 10) == 1 {
				// cost average the length
				// save it
				l.Lock()
				l.length = length
				l.Unlock()
			}
		case <-l.closed:
			return
		}
	}
}

// send a message over the link
func (l *link) send(m *transport.Message) error {
	// TODO: measure time taken and calculate length/rate
	// send via the transport socket
	return l.socket.Send(m)
}

// recv a message on the link
func (l *link) recv(m *transport.Message) error {
	if m.Header == nil {
		m.Header = make(map[string]string)
	}
	// receive the transport message
	return l.socket.Recv(m)
}

// Connect attempts to connect to an address and sets the socket
func (l *link) Connect() error {
	l.Lock()
	if l.connected {
		l.Unlock()
		return nil
	}
	defer l.Unlock()

	// replace closed
	l.closed = make(chan bool)

	// assume existing socket
	if len(l.addr) == 0 {
		go l.process()
		return nil
	}

	// dial the endpoint
	c, err := l.transport.Dial(l.addr)
	if err != nil {
		return err
	}

	// set the socket
	l.socket = c

	// kick start the processing
	go l.process()

	return nil
}

// Close the link
func (l *link) Close() error {
	select {
	case <-l.closed:
		return nil
	default:
		close(l.closed)
		l.Lock()
		l.connected = false
		l.Unlock()
		return l.socket.Close()
	}
}

// returns the node id
func (l *link) Id() string {
	l.RLock()
	defer l.RUnlock()
	return l.id
}

// the remote ip of the link
func (l *link) Remote() string {
	l.RLock()
	defer l.RUnlock()
	return l.socket.Remote()
}

// the local ip of the link
func (l *link) Local() string {
	l.RLock()
	defer l.RUnlock()
	return l.socket.Local()
}

// length/rate of the link
func (l *link) Length() int {
	l.RLock()
	defer l.RUnlock()
	return l.length
}

// weight checks the size of the queues
func (l *link) Weight() int {
	return len(l.sendQueue) + len(l.recvQueue)
}

// Accept accepts a message on the socket
func (l *link) Recv(m *transport.Message) error {
	select {
	case <-l.closed:
		return io.EOF
	case rm := <-l.recvQueue:
		*m = *rm
		return nil
	}
	// never reach
	return nil
}

// Send sends a message on the socket immediately
func (l *link) Send(m *transport.Message) error {
	select {
	case <-l.closed:
		return io.EOF
	case l.sendQueue <- m:
	}
	return nil
}

func (l *link) Status() string {
	select {
	case <-l.closed:
		return "closed"
	default:
		return "connected"
	}
}