289 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			289 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package link provides a measured transport.Socket link.
 | 
						|
package link
 | 
						|
 | 
						|
import (
 | 
						|
	"io"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/micro/go-micro/config/options"
 | 
						|
	"github.com/micro/go-micro/transport"
 | 
						|
)
 | 
						|
 | 
						|
type link struct {
 | 
						|
	sync.RWMutex
 | 
						|
 | 
						|
	// the link id
 | 
						|
	id string
 | 
						|
 | 
						|
	// the remote end to dial
 | 
						|
	addr string
 | 
						|
 | 
						|
	// channel used to close the link
 | 
						|
	closed chan bool
 | 
						|
 | 
						|
	// if its connected
 | 
						|
	connected bool
 | 
						|
 | 
						|
	// the transport to use
 | 
						|
	transport transport.Transport
 | 
						|
 | 
						|
	// the send queue to the socket
 | 
						|
	sendQueue chan *transport.Message
 | 
						|
	// the recv queue to the socket
 | 
						|
	recvQueue chan *transport.Message
 | 
						|
 | 
						|
	// the socket for this link
 | 
						|
	socket transport.Socket
 | 
						|
 | 
						|
	// determines the cost of the link
 | 
						|
	// based on queue length and roundtrip
 | 
						|
	length int
 | 
						|
	weight int
 | 
						|
}
 | 
						|
 | 
						|
func newLink(options options.Options) *link {
 | 
						|
	// default values
 | 
						|
	var sock transport.Socket
 | 
						|
	var addr string
 | 
						|
	id := "local"
 | 
						|
	tr := transport.DefaultTransport
 | 
						|
 | 
						|
	lid, ok := options.Values().Get("link.id")
 | 
						|
	if ok {
 | 
						|
		id = lid.(string)
 | 
						|
	}
 | 
						|
 | 
						|
	laddr, ok := options.Values().Get("link.address")
 | 
						|
	if ok {
 | 
						|
		addr = laddr.(string)
 | 
						|
	}
 | 
						|
 | 
						|
	ltr, ok := options.Values().Get("link.transport")
 | 
						|
	if ok {
 | 
						|
		tr = ltr.(transport.Transport)
 | 
						|
	}
 | 
						|
 | 
						|
	lsock, ok := options.Values().Get("link.socket")
 | 
						|
	if ok {
 | 
						|
		sock = lsock.(transport.Socket)
 | 
						|
	}
 | 
						|
 | 
						|
	l := &link{
 | 
						|
		// the remote end to dial
 | 
						|
		addr: addr,
 | 
						|
		// transport to dial link
 | 
						|
		transport: tr,
 | 
						|
		// the socket to use
 | 
						|
		// this is nil if not specified
 | 
						|
		socket: sock,
 | 
						|
		// unique id assigned to the link
 | 
						|
		id: id,
 | 
						|
		// the closed channel used to close the conn
 | 
						|
		closed: make(chan bool),
 | 
						|
		// then send queue
 | 
						|
		sendQueue: make(chan *transport.Message, 128),
 | 
						|
		// the receive queue
 | 
						|
		recvQueue: make(chan *transport.Message, 128),
 | 
						|
	}
 | 
						|
 | 
						|
	// return the link
 | 
						|
	return l
 | 
						|
}
 | 
						|
 | 
						|
// link methods
 | 
						|
 | 
						|
// process processes messages on the send and receive queues.
 | 
						|
func (l *link) process() {
 | 
						|
	go func() {
 | 
						|
		for {
 | 
						|
			m := new(transport.Message)
 | 
						|
			if err := l.recv(m); err != nil {
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			select {
 | 
						|
			case l.recvQueue <- m:
 | 
						|
			case <-l.closed:
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	// messages sent
 | 
						|
	i := 0
 | 
						|
	length := 0
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case m := <-l.sendQueue:
 | 
						|
			t := time.Now()
 | 
						|
 | 
						|
			// send the message
 | 
						|
			if err := l.send(m); err != nil {
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			// get header size, body size and time taken
 | 
						|
			hl := len(m.Header)
 | 
						|
			bl := len(m.Body)
 | 
						|
			d := time.Since(t)
 | 
						|
 | 
						|
			// don't calculate on empty messages
 | 
						|
			if hl == 0 && bl == 0 {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			// increment sent
 | 
						|
			i++
 | 
						|
 | 
						|
			// time take to send some bits and bytes
 | 
						|
			td := float64(hl+bl) / float64(d.Nanoseconds())
 | 
						|
			// increase the scale
 | 
						|
			td += 1
 | 
						|
 | 
						|
			// judge the length
 | 
						|
			length = int(td) / (length + int(td))
 | 
						|
 | 
						|
			// every 10 messages update length
 | 
						|
			if (i % 10) == 1 {
 | 
						|
				// cost average the length
 | 
						|
				// save it
 | 
						|
				l.Lock()
 | 
						|
				l.length = length
 | 
						|
				l.Unlock()
 | 
						|
			}
 | 
						|
		case <-l.closed:
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// send a message over the link
 | 
						|
func (l *link) send(m *transport.Message) error {
 | 
						|
	// TODO: measure time taken and calculate length/rate
 | 
						|
	// send via the transport socket
 | 
						|
	return l.socket.Send(m)
 | 
						|
}
 | 
						|
 | 
						|
// recv a message on the link
 | 
						|
func (l *link) recv(m *transport.Message) error {
 | 
						|
	if m.Header == nil {
 | 
						|
		m.Header = make(map[string]string)
 | 
						|
	}
 | 
						|
	// receive the transport message
 | 
						|
	return l.socket.Recv(m)
 | 
						|
}
 | 
						|
 | 
						|
// Connect attempts to connect to an address and sets the socket
 | 
						|
func (l *link) Connect() error {
 | 
						|
	l.Lock()
 | 
						|
	if l.connected {
 | 
						|
		l.Unlock()
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	defer l.Unlock()
 | 
						|
 | 
						|
	// replace closed
 | 
						|
	l.closed = make(chan bool)
 | 
						|
 | 
						|
	// assume existing socket
 | 
						|
	if len(l.addr) == 0 {
 | 
						|
		go l.process()
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	// dial the endpoint
 | 
						|
	c, err := l.transport.Dial(l.addr)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	// set the socket
 | 
						|
	l.socket = c
 | 
						|
 | 
						|
	// kick start the processing
 | 
						|
	go l.process()
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Close the link
 | 
						|
func (l *link) Close() error {
 | 
						|
	select {
 | 
						|
	case <-l.closed:
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
		close(l.closed)
 | 
						|
		l.Lock()
 | 
						|
		l.connected = false
 | 
						|
		l.Unlock()
 | 
						|
		return l.socket.Close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// returns the node id
 | 
						|
func (l *link) Id() string {
 | 
						|
	l.RLock()
 | 
						|
	defer l.RUnlock()
 | 
						|
	return l.id
 | 
						|
}
 | 
						|
 | 
						|
// the remote ip of the link
 | 
						|
func (l *link) Remote() string {
 | 
						|
	l.RLock()
 | 
						|
	defer l.RUnlock()
 | 
						|
	return l.socket.Remote()
 | 
						|
}
 | 
						|
 | 
						|
// the local ip of the link
 | 
						|
func (l *link) Local() string {
 | 
						|
	l.RLock()
 | 
						|
	defer l.RUnlock()
 | 
						|
	return l.socket.Local()
 | 
						|
}
 | 
						|
 | 
						|
// length/rate of the link
 | 
						|
func (l *link) Length() int {
 | 
						|
	l.RLock()
 | 
						|
	defer l.RUnlock()
 | 
						|
	return l.length
 | 
						|
}
 | 
						|
 | 
						|
// weight checks the size of the queues
 | 
						|
func (l *link) Weight() int {
 | 
						|
	return len(l.sendQueue) + len(l.recvQueue)
 | 
						|
}
 | 
						|
 | 
						|
// Accept accepts a message on the socket
 | 
						|
func (l *link) Recv(m *transport.Message) error {
 | 
						|
	select {
 | 
						|
	case <-l.closed:
 | 
						|
		return io.EOF
 | 
						|
	case rm := <-l.recvQueue:
 | 
						|
		*m = *rm
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	// never reach
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Send sends a message on the socket immediately
 | 
						|
func (l *link) Send(m *transport.Message) error {
 | 
						|
	select {
 | 
						|
	case <-l.closed:
 | 
						|
		return io.EOF
 | 
						|
	case l.sendQueue <- m:
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *link) Status() string {
 | 
						|
	select {
 | 
						|
	case <-l.closed:
 | 
						|
		return "closed"
 | 
						|
	default:
 | 
						|
		return "connected"
 | 
						|
	}
 | 
						|
}
 |