link crufT

This commit is contained in:
Asim Aslam 2019-09-11 07:11:40 -07:00
parent b5eea02f7a
commit 9ca7d90f11
3 changed files with 133 additions and 358 deletions

View File

@ -1,6 +1,7 @@
package tunnel package tunnel
import ( import (
"io"
"sync" "sync"
"time" "time"
@ -12,7 +13,8 @@ type link struct {
transport.Socket transport.Socket
sync.RWMutex sync.RWMutex
// stops the link
closed chan bool
// unique id of this link e.g uuid // unique id of this link e.g uuid
// which we define for ourselves // which we define for ourselves
id string id string
@ -30,22 +32,100 @@ type link struct {
lastKeepAlive time.Time lastKeepAlive time.Time
// channels keeps a mapping of channels and last seen // channels keeps a mapping of channels and last seen
channels map[string]time.Time channels map[string]time.Time
// stop the link
closed chan bool // the send queue to the socket
sendQueue chan *transport.Message
// the recv queue to the socket
recvQueue chan *transport.Message
// determines the cost of the link
// based on queue length and roundtrip
length int
weight int
} }
func newLink(s transport.Socket) *link { func newLink(s transport.Socket) *link {
l := &link{ l := &link{
Socket: s, Socket: s,
id: uuid.New().String(), id: uuid.New().String(),
channels: make(map[string]time.Time), channels: make(map[string]time.Time),
closed: make(chan bool), closed: make(chan bool),
sendQueue: make(chan *transport.Message, 128),
recvQueue: make(chan *transport.Message, 128),
} }
go l.run() go l.expiry()
go l.process()
return l return l
} }
func (l *link) run() { // process processes messages on the send and receive queues.
func (l *link) process() {
go func() {
for {
m := new(transport.Message)
if err := l.Socket.Recv(m); err != nil {
return
}
select {
case l.recvQueue <- m:
case <-l.closed:
return
}
}
}()
// messages sent
i := 0
length := 0
for {
select {
case m := <-l.sendQueue:
t := time.Now()
// send the message
if err := l.Socket.Send(m); err != nil {
return
}
// get header size, body size and time taken
hl := len(m.Header)
bl := len(m.Body)
d := time.Since(t)
// don't calculate on empty messages
if hl == 0 && bl == 0 {
continue
}
// increment sent
i++
// time take to send some bits and bytes
td := float64(hl+bl) / float64(d.Nanoseconds())
// increase the scale
td += 1
// judge the length
length = int(td) / (length + int(td))
// every 10 messages update length
if (i % 10) == 1 {
// cost average the length
// save it
l.Lock()
l.length = length
l.Unlock()
}
case <-l.closed:
return
}
}
}
// watches the channel expiry
func (l *link) expiry() {
t := time.NewTicker(time.Minute) t := time.NewTicker(time.Minute)
defer t.Stop() defer t.Stop()
@ -99,3 +179,47 @@ func (l *link) Close() error {
return nil return nil
} }
// length/rate of the link
func (l *link) Length() int {
l.RLock()
defer l.RUnlock()
return l.length
}
// weight checks the size of the queues
func (l *link) Weight() int {
return len(l.sendQueue) + len(l.recvQueue)
}
// Accept accepts a message on the socket
func (l *link) Recv(m *transport.Message) error {
select {
case <-l.closed:
return io.EOF
case rm := <-l.recvQueue:
*m = *rm
return nil
}
// never reach
return nil
}
// Send sends a message on the socket immediately
func (l *link) Send(m *transport.Message) error {
select {
case <-l.closed:
return io.EOF
case l.sendQueue <- m:
}
return nil
}
func (l *link) Status() string {
select {
case <-l.closed:
return "closed"
default:
return "connected"
}
}

View File

@ -1,288 +0,0 @@
// Package link provides a measured transport.Socket link
package link
import (
"io"
"sync"
"time"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/transport"
)
type link struct {
sync.RWMutex
// the link id
id string
// the remote end to dial
addr string
// channel used to close the link
closed chan bool
// if its connected
connected bool
// the transport to use
transport transport.Transport
// the send queue to the socket
sendQueue chan *transport.Message
// the recv queue to the socket
recvQueue chan *transport.Message
// the socket for this link
socket transport.Socket
// determines the cost of the link
// based on queue length and roundtrip
length int
weight int
}
func newLink(options options.Options) *link {
// default values
var sock transport.Socket
var addr string
id := "local"
tr := transport.DefaultTransport
lid, ok := options.Values().Get("link.id")
if ok {
id = lid.(string)
}
laddr, ok := options.Values().Get("link.address")
if ok {
addr = laddr.(string)
}
ltr, ok := options.Values().Get("link.transport")
if ok {
tr = ltr.(transport.Transport)
}
lsock, ok := options.Values().Get("link.socket")
if ok {
sock = lsock.(transport.Socket)
}
l := &link{
// the remote end to dial
addr: addr,
// transport to dial link
transport: tr,
// the socket to use
// this is nil if not specified
socket: sock,
// unique id assigned to the link
id: id,
// the closed channel used to close the conn
closed: make(chan bool),
// then send queue
sendQueue: make(chan *transport.Message, 128),
// the receive queue
recvQueue: make(chan *transport.Message, 128),
}
// return the link
return l
}
// link methods
// process processes messages on the send and receive queues.
func (l *link) process() {
go func() {
for {
m := new(transport.Message)
if err := l.recv(m); err != nil {
return
}
select {
case l.recvQueue <- m:
case <-l.closed:
return
}
}
}()
// messages sent
i := 0
length := 0
for {
select {
case m := <-l.sendQueue:
t := time.Now()
// send the message
if err := l.send(m); err != nil {
return
}
// get header size, body size and time taken
hl := len(m.Header)
bl := len(m.Body)
d := time.Since(t)
// don't calculate on empty messages
if hl == 0 && bl == 0 {
continue
}
// increment sent
i++
// time take to send some bits and bytes
td := float64(hl+bl) / float64(d.Nanoseconds())
// increase the scale
td += 1
// judge the length
length = int(td) / (length + int(td))
// every 10 messages update length
if (i % 10) == 1 {
// cost average the length
// save it
l.Lock()
l.length = length
l.Unlock()
}
case <-l.closed:
return
}
}
}
// send a message over the link
func (l *link) send(m *transport.Message) error {
// TODO: measure time taken and calculate length/rate
// send via the transport socket
return l.socket.Send(m)
}
// recv a message on the link
func (l *link) recv(m *transport.Message) error {
if m.Header == nil {
m.Header = make(map[string]string)
}
// receive the transport message
return l.socket.Recv(m)
}
// Connect attempts to connect to an address and sets the socket
func (l *link) Connect() error {
l.Lock()
if l.connected {
l.Unlock()
return nil
}
defer l.Unlock()
// replace closed
l.closed = make(chan bool)
// assume existing socket
if len(l.addr) == 0 {
go l.process()
return nil
}
// dial the endpoint
c, err := l.transport.Dial(l.addr)
if err != nil {
return err
}
// set the socket
l.socket = c
// kick start the processing
go l.process()
return nil
}
// Close the link
func (l *link) Close() error {
select {
case <-l.closed:
return nil
default:
close(l.closed)
l.Lock()
l.connected = false
l.Unlock()
return l.socket.Close()
}
}
// returns the node id
func (l *link) Id() string {
l.RLock()
defer l.RUnlock()
return l.id
}
// the remote ip of the link
func (l *link) Remote() string {
l.RLock()
defer l.RUnlock()
return l.socket.Remote()
}
// the local ip of the link
func (l *link) Local() string {
l.RLock()
defer l.RUnlock()
return l.socket.Local()
}
// length/rate of the link
func (l *link) Length() int {
l.RLock()
defer l.RUnlock()
return l.length
}
// weight checks the size of the queues
func (l *link) Weight() int {
return len(l.sendQueue) + len(l.recvQueue)
}
// Accept accepts a message on the socket
func (l *link) Recv(m *transport.Message) error {
select {
case <-l.closed:
return io.EOF
case rm := <-l.recvQueue:
*m = *rm
return nil
}
// never reach
return nil
}
// Send sends a message on the socket immediately
func (l *link) Send(m *transport.Message) error {
select {
case <-l.closed:
return io.EOF
case l.sendQueue <- m:
}
return nil
}
func (l *link) Status() string {
select {
case <-l.closed:
return "closed"
default:
return "connected"
}
}

View File

@ -1,61 +0,0 @@
// Package link provides a measured link on top of a transport.Socket
package link
import (
"errors"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/transport"
)
var (
// ErrLinkClosed is returned when attempting i/o operation on the closed link
ErrLinkClosed = errors.New("link closed")
)
// Link is a layer on top of a transport socket with the
// buffering send and recv queues with the ability to
// measure the actual transport link and reconnect if
// an address is specified.
type Link interface {
// provides the transport.Socket interface
transport.Socket
// Connect connects the link. It must be called first
// if there's an expectation to create a new socket.
Connect() error
// Id of the link is "local" if not specified
Id() string
// Status of the link
Status() string
// Depth of the buffers
Weight() int
// Rate of the link
Length() int
}
// NewLink creates a new link on top of a socket
func NewLink(opts ...options.Option) Link {
return newLink(options.NewOptions(opts...))
}
// Sets the link id which otherwise defaults to "local"
func Id(id string) options.Option {
return options.WithValue("link.id", id)
}
// The address to use for the link. Connect must be
// called for this to be used, its otherwise unused.
func Address(a string) options.Option {
return options.WithValue("link.address", a)
}
// The transport to use for the link where we
// want to dial the connection first.
func Transport(t transport.Transport) options.Option {
return options.WithValue("link.transport", t)
}
// Socket sets the socket to use instead of dialing.
func Socket(s transport.Socket) options.Option {
return options.WithValue("link.socket", s)
}