diff --git a/tunnel/link.go b/tunnel/link.go index b360e826..f00f1797 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -1,6 +1,7 @@ package tunnel import ( + "io" "sync" "time" @@ -12,7 +13,8 @@ type link struct { transport.Socket sync.RWMutex - + // stops the link + closed chan bool // unique id of this link e.g uuid // which we define for ourselves id string @@ -30,22 +32,100 @@ type link struct { lastKeepAlive time.Time // channels keeps a mapping of channels and last seen channels map[string]time.Time - // stop the link - closed chan bool + + // the send queue to the socket + sendQueue chan *transport.Message + // the recv queue to the socket + recvQueue chan *transport.Message + + // determines the cost of the link + // based on queue length and roundtrip + length int + weight int } func newLink(s transport.Socket) *link { l := &link{ - Socket: s, - id: uuid.New().String(), - channels: make(map[string]time.Time), - closed: make(chan bool), + Socket: s, + id: uuid.New().String(), + channels: make(map[string]time.Time), + closed: make(chan bool), + sendQueue: make(chan *transport.Message, 128), + recvQueue: make(chan *transport.Message, 128), } - go l.run() + go l.expiry() + go l.process() return l } -func (l *link) run() { +// process processes messages on the send and receive queues. +func (l *link) process() { + go func() { + for { + m := new(transport.Message) + if err := l.Socket.Recv(m); err != nil { + return + } + + select { + case l.recvQueue <- m: + case <-l.closed: + return + } + } + }() + + // messages sent + i := 0 + length := 0 + + for { + select { + case m := <-l.sendQueue: + t := time.Now() + + // send the message + if err := l.Socket.Send(m); err != nil { + return + } + + // get header size, body size and time taken + hl := len(m.Header) + bl := len(m.Body) + d := time.Since(t) + + // don't calculate on empty messages + if hl == 0 && bl == 0 { + continue + } + + // increment sent + i++ + + // time take to send some bits and bytes + td := float64(hl+bl) / float64(d.Nanoseconds()) + // increase the scale + td += 1 + + // judge the length + length = int(td) / (length + int(td)) + + // every 10 messages update length + if (i % 10) == 1 { + // cost average the length + // save it + l.Lock() + l.length = length + l.Unlock() + } + case <-l.closed: + return + } + } +} + +// watches the channel expiry +func (l *link) expiry() { t := time.NewTicker(time.Minute) defer t.Stop() @@ -99,3 +179,47 @@ func (l *link) Close() error { return nil } + +// length/rate of the link +func (l *link) Length() int { + l.RLock() + defer l.RUnlock() + return l.length +} + +// weight checks the size of the queues +func (l *link) Weight() int { + return len(l.sendQueue) + len(l.recvQueue) +} + +// Accept accepts a message on the socket +func (l *link) Recv(m *transport.Message) error { + select { + case <-l.closed: + return io.EOF + case rm := <-l.recvQueue: + *m = *rm + return nil + } + // never reach + return nil +} + +// Send sends a message on the socket immediately +func (l *link) Send(m *transport.Message) error { + select { + case <-l.closed: + return io.EOF + case l.sendQueue <- m: + } + return nil +} + +func (l *link) Status() string { + select { + case <-l.closed: + return "closed" + default: + return "connected" + } +} diff --git a/tunnel/link/default.go b/tunnel/link/default.go deleted file mode 100644 index f6742b83..00000000 --- a/tunnel/link/default.go +++ /dev/null @@ -1,288 +0,0 @@ -// Package link provides a measured transport.Socket link -package link - -import ( - "io" - "sync" - "time" - - "github.com/micro/go-micro/config/options" - "github.com/micro/go-micro/transport" -) - -type link struct { - sync.RWMutex - - // the link id - id string - - // the remote end to dial - addr string - - // channel used to close the link - closed chan bool - - // if its connected - connected bool - - // the transport to use - transport transport.Transport - - // the send queue to the socket - sendQueue chan *transport.Message - // the recv queue to the socket - recvQueue chan *transport.Message - - // the socket for this link - socket transport.Socket - - // determines the cost of the link - // based on queue length and roundtrip - length int - weight int -} - -func newLink(options options.Options) *link { - // default values - var sock transport.Socket - var addr string - id := "local" - tr := transport.DefaultTransport - - lid, ok := options.Values().Get("link.id") - if ok { - id = lid.(string) - } - - laddr, ok := options.Values().Get("link.address") - if ok { - addr = laddr.(string) - } - - ltr, ok := options.Values().Get("link.transport") - if ok { - tr = ltr.(transport.Transport) - } - - lsock, ok := options.Values().Get("link.socket") - if ok { - sock = lsock.(transport.Socket) - } - - l := &link{ - // the remote end to dial - addr: addr, - // transport to dial link - transport: tr, - // the socket to use - // this is nil if not specified - socket: sock, - // unique id assigned to the link - id: id, - // the closed channel used to close the conn - closed: make(chan bool), - // then send queue - sendQueue: make(chan *transport.Message, 128), - // the receive queue - recvQueue: make(chan *transport.Message, 128), - } - - // return the link - return l -} - -// link methods - -// process processes messages on the send and receive queues. -func (l *link) process() { - go func() { - for { - m := new(transport.Message) - if err := l.recv(m); err != nil { - return - } - - select { - case l.recvQueue <- m: - case <-l.closed: - return - } - } - }() - - // messages sent - i := 0 - length := 0 - - for { - select { - case m := <-l.sendQueue: - t := time.Now() - - // send the message - if err := l.send(m); err != nil { - return - } - - // get header size, body size and time taken - hl := len(m.Header) - bl := len(m.Body) - d := time.Since(t) - - // don't calculate on empty messages - if hl == 0 && bl == 0 { - continue - } - - // increment sent - i++ - - // time take to send some bits and bytes - td := float64(hl+bl) / float64(d.Nanoseconds()) - // increase the scale - td += 1 - - // judge the length - length = int(td) / (length + int(td)) - - // every 10 messages update length - if (i % 10) == 1 { - // cost average the length - // save it - l.Lock() - l.length = length - l.Unlock() - } - case <-l.closed: - return - } - } -} - -// send a message over the link -func (l *link) send(m *transport.Message) error { - // TODO: measure time taken and calculate length/rate - // send via the transport socket - return l.socket.Send(m) -} - -// recv a message on the link -func (l *link) recv(m *transport.Message) error { - if m.Header == nil { - m.Header = make(map[string]string) - } - // receive the transport message - return l.socket.Recv(m) -} - -// Connect attempts to connect to an address and sets the socket -func (l *link) Connect() error { - l.Lock() - if l.connected { - l.Unlock() - return nil - } - defer l.Unlock() - - // replace closed - l.closed = make(chan bool) - - // assume existing socket - if len(l.addr) == 0 { - go l.process() - return nil - } - - // dial the endpoint - c, err := l.transport.Dial(l.addr) - if err != nil { - return err - } - - // set the socket - l.socket = c - - // kick start the processing - go l.process() - - return nil -} - -// Close the link -func (l *link) Close() error { - select { - case <-l.closed: - return nil - default: - close(l.closed) - l.Lock() - l.connected = false - l.Unlock() - return l.socket.Close() - } -} - -// returns the node id -func (l *link) Id() string { - l.RLock() - defer l.RUnlock() - return l.id -} - -// the remote ip of the link -func (l *link) Remote() string { - l.RLock() - defer l.RUnlock() - return l.socket.Remote() -} - -// the local ip of the link -func (l *link) Local() string { - l.RLock() - defer l.RUnlock() - return l.socket.Local() -} - -// length/rate of the link -func (l *link) Length() int { - l.RLock() - defer l.RUnlock() - return l.length -} - -// weight checks the size of the queues -func (l *link) Weight() int { - return len(l.sendQueue) + len(l.recvQueue) -} - -// Accept accepts a message on the socket -func (l *link) Recv(m *transport.Message) error { - select { - case <-l.closed: - return io.EOF - case rm := <-l.recvQueue: - *m = *rm - return nil - } - // never reach - return nil -} - -// Send sends a message on the socket immediately -func (l *link) Send(m *transport.Message) error { - select { - case <-l.closed: - return io.EOF - case l.sendQueue <- m: - } - return nil -} - -func (l *link) Status() string { - select { - case <-l.closed: - return "closed" - default: - return "connected" - } -} diff --git a/tunnel/link/link.go b/tunnel/link/link.go deleted file mode 100644 index 1c42831a..00000000 --- a/tunnel/link/link.go +++ /dev/null @@ -1,61 +0,0 @@ -// Package link provides a measured link on top of a transport.Socket -package link - -import ( - "errors" - - "github.com/micro/go-micro/config/options" - "github.com/micro/go-micro/transport" -) - -var ( - // ErrLinkClosed is returned when attempting i/o operation on the closed link - ErrLinkClosed = errors.New("link closed") -) - -// Link is a layer on top of a transport socket with the -// buffering send and recv queues with the ability to -// measure the actual transport link and reconnect if -// an address is specified. -type Link interface { - // provides the transport.Socket interface - transport.Socket - // Connect connects the link. It must be called first - // if there's an expectation to create a new socket. - Connect() error - // Id of the link is "local" if not specified - Id() string - // Status of the link - Status() string - // Depth of the buffers - Weight() int - // Rate of the link - Length() int -} - -// NewLink creates a new link on top of a socket -func NewLink(opts ...options.Option) Link { - return newLink(options.NewOptions(opts...)) -} - -// Sets the link id which otherwise defaults to "local" -func Id(id string) options.Option { - return options.WithValue("link.id", id) -} - -// The address to use for the link. Connect must be -// called for this to be used, its otherwise unused. -func Address(a string) options.Option { - return options.WithValue("link.address", a) -} - -// The transport to use for the link where we -// want to dial the connection first. -func Transport(t transport.Transport) options.Option { - return options.WithValue("link.transport", t) -} - -// Socket sets the socket to use instead of dialing. -func Socket(s transport.Socket) options.Option { - return options.WithValue("link.socket", s) -}