commit
634c55e2d7
@ -932,6 +932,11 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
|
|||||||
// non multicast so we need to find the link
|
// non multicast so we need to find the link
|
||||||
t.RLock()
|
t.RLock()
|
||||||
for _, link := range t.links {
|
for _, link := range t.links {
|
||||||
|
// use the link specified it its available
|
||||||
|
if id := options.Link; len(id) > 0 && link.id != id {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
link.RLock()
|
link.RLock()
|
||||||
_, ok := link.channels[channel]
|
_, ok := link.channels[channel]
|
||||||
link.RUnlock()
|
link.RUnlock()
|
||||||
@ -944,6 +949,11 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
|
|||||||
}
|
}
|
||||||
t.RUnlock()
|
t.RUnlock()
|
||||||
|
|
||||||
|
// link not found
|
||||||
|
if len(links) == 0 && len(options.Link) > 0 {
|
||||||
|
return nil, ErrLinkNotFound
|
||||||
|
}
|
||||||
|
|
||||||
// discovered so set the link if not multicast
|
// discovered so set the link if not multicast
|
||||||
// TODO: pick the link efficiently based
|
// TODO: pick the link efficiently based
|
||||||
// on link status and saturation.
|
// on link status and saturation.
|
||||||
|
@ -12,7 +12,8 @@ type link struct {
|
|||||||
transport.Socket
|
transport.Socket
|
||||||
|
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
|
// stops the link
|
||||||
|
closed chan bool
|
||||||
// unique id of this link e.g uuid
|
// unique id of this link e.g uuid
|
||||||
// which we define for ourselves
|
// which we define for ourselves
|
||||||
id string
|
id string
|
||||||
@ -30,8 +31,6 @@ type link struct {
|
|||||||
lastKeepAlive time.Time
|
lastKeepAlive time.Time
|
||||||
// channels keeps a mapping of channels and last seen
|
// channels keeps a mapping of channels and last seen
|
||||||
channels map[string]time.Time
|
channels map[string]time.Time
|
||||||
// stop the link
|
|
||||||
closed chan bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newLink(s transport.Socket) *link {
|
func newLink(s transport.Socket) *link {
|
||||||
@ -41,11 +40,12 @@ func newLink(s transport.Socket) *link {
|
|||||||
channels: make(map[string]time.Time),
|
channels: make(map[string]time.Time),
|
||||||
closed: make(chan bool),
|
closed: make(chan bool),
|
||||||
}
|
}
|
||||||
go l.run()
|
go l.expiry()
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) run() {
|
// watches the channel expiry
|
||||||
|
func (l *link) expiry() {
|
||||||
t := time.NewTicker(time.Minute)
|
t := time.NewTicker(time.Minute)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
||||||
@ -99,3 +99,12 @@ func (l *link) Close() error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *link) Status() string {
|
||||||
|
select {
|
||||||
|
case <-l.closed:
|
||||||
|
return "closed"
|
||||||
|
default:
|
||||||
|
return "connected"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,288 +0,0 @@
|
|||||||
// Package link provides a measured transport.Socket link
|
|
||||||
package link
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/config/options"
|
|
||||||
"github.com/micro/go-micro/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
type link struct {
|
|
||||||
sync.RWMutex
|
|
||||||
|
|
||||||
// the link id
|
|
||||||
id string
|
|
||||||
|
|
||||||
// the remote end to dial
|
|
||||||
addr string
|
|
||||||
|
|
||||||
// channel used to close the link
|
|
||||||
closed chan bool
|
|
||||||
|
|
||||||
// if its connected
|
|
||||||
connected bool
|
|
||||||
|
|
||||||
// the transport to use
|
|
||||||
transport transport.Transport
|
|
||||||
|
|
||||||
// the send queue to the socket
|
|
||||||
sendQueue chan *transport.Message
|
|
||||||
// the recv queue to the socket
|
|
||||||
recvQueue chan *transport.Message
|
|
||||||
|
|
||||||
// the socket for this link
|
|
||||||
socket transport.Socket
|
|
||||||
|
|
||||||
// determines the cost of the link
|
|
||||||
// based on queue length and roundtrip
|
|
||||||
length int
|
|
||||||
weight int
|
|
||||||
}
|
|
||||||
|
|
||||||
func newLink(options options.Options) *link {
|
|
||||||
// default values
|
|
||||||
var sock transport.Socket
|
|
||||||
var addr string
|
|
||||||
id := "local"
|
|
||||||
tr := transport.DefaultTransport
|
|
||||||
|
|
||||||
lid, ok := options.Values().Get("link.id")
|
|
||||||
if ok {
|
|
||||||
id = lid.(string)
|
|
||||||
}
|
|
||||||
|
|
||||||
laddr, ok := options.Values().Get("link.address")
|
|
||||||
if ok {
|
|
||||||
addr = laddr.(string)
|
|
||||||
}
|
|
||||||
|
|
||||||
ltr, ok := options.Values().Get("link.transport")
|
|
||||||
if ok {
|
|
||||||
tr = ltr.(transport.Transport)
|
|
||||||
}
|
|
||||||
|
|
||||||
lsock, ok := options.Values().Get("link.socket")
|
|
||||||
if ok {
|
|
||||||
sock = lsock.(transport.Socket)
|
|
||||||
}
|
|
||||||
|
|
||||||
l := &link{
|
|
||||||
// the remote end to dial
|
|
||||||
addr: addr,
|
|
||||||
// transport to dial link
|
|
||||||
transport: tr,
|
|
||||||
// the socket to use
|
|
||||||
// this is nil if not specified
|
|
||||||
socket: sock,
|
|
||||||
// unique id assigned to the link
|
|
||||||
id: id,
|
|
||||||
// the closed channel used to close the conn
|
|
||||||
closed: make(chan bool),
|
|
||||||
// then send queue
|
|
||||||
sendQueue: make(chan *transport.Message, 128),
|
|
||||||
// the receive queue
|
|
||||||
recvQueue: make(chan *transport.Message, 128),
|
|
||||||
}
|
|
||||||
|
|
||||||
// return the link
|
|
||||||
return l
|
|
||||||
}
|
|
||||||
|
|
||||||
// link methods
|
|
||||||
|
|
||||||
// process processes messages on the send and receive queues.
|
|
||||||
func (l *link) process() {
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
m := new(transport.Message)
|
|
||||||
if err := l.recv(m); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case l.recvQueue <- m:
|
|
||||||
case <-l.closed:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// messages sent
|
|
||||||
i := 0
|
|
||||||
length := 0
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case m := <-l.sendQueue:
|
|
||||||
t := time.Now()
|
|
||||||
|
|
||||||
// send the message
|
|
||||||
if err := l.send(m); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// get header size, body size and time taken
|
|
||||||
hl := len(m.Header)
|
|
||||||
bl := len(m.Body)
|
|
||||||
d := time.Since(t)
|
|
||||||
|
|
||||||
// don't calculate on empty messages
|
|
||||||
if hl == 0 && bl == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// increment sent
|
|
||||||
i++
|
|
||||||
|
|
||||||
// time take to send some bits and bytes
|
|
||||||
td := float64(hl+bl) / float64(d.Nanoseconds())
|
|
||||||
// increase the scale
|
|
||||||
td += 1
|
|
||||||
|
|
||||||
// judge the length
|
|
||||||
length = int(td) / (length + int(td))
|
|
||||||
|
|
||||||
// every 10 messages update length
|
|
||||||
if (i % 10) == 1 {
|
|
||||||
// cost average the length
|
|
||||||
// save it
|
|
||||||
l.Lock()
|
|
||||||
l.length = length
|
|
||||||
l.Unlock()
|
|
||||||
}
|
|
||||||
case <-l.closed:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// send a message over the link
|
|
||||||
func (l *link) send(m *transport.Message) error {
|
|
||||||
// TODO: measure time taken and calculate length/rate
|
|
||||||
// send via the transport socket
|
|
||||||
return l.socket.Send(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
// recv a message on the link
|
|
||||||
func (l *link) recv(m *transport.Message) error {
|
|
||||||
if m.Header == nil {
|
|
||||||
m.Header = make(map[string]string)
|
|
||||||
}
|
|
||||||
// receive the transport message
|
|
||||||
return l.socket.Recv(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect attempts to connect to an address and sets the socket
|
|
||||||
func (l *link) Connect() error {
|
|
||||||
l.Lock()
|
|
||||||
if l.connected {
|
|
||||||
l.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
defer l.Unlock()
|
|
||||||
|
|
||||||
// replace closed
|
|
||||||
l.closed = make(chan bool)
|
|
||||||
|
|
||||||
// assume existing socket
|
|
||||||
if len(l.addr) == 0 {
|
|
||||||
go l.process()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// dial the endpoint
|
|
||||||
c, err := l.transport.Dial(l.addr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the socket
|
|
||||||
l.socket = c
|
|
||||||
|
|
||||||
// kick start the processing
|
|
||||||
go l.process()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close the link
|
|
||||||
func (l *link) Close() error {
|
|
||||||
select {
|
|
||||||
case <-l.closed:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
close(l.closed)
|
|
||||||
l.Lock()
|
|
||||||
l.connected = false
|
|
||||||
l.Unlock()
|
|
||||||
return l.socket.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns the node id
|
|
||||||
func (l *link) Id() string {
|
|
||||||
l.RLock()
|
|
||||||
defer l.RUnlock()
|
|
||||||
return l.id
|
|
||||||
}
|
|
||||||
|
|
||||||
// the remote ip of the link
|
|
||||||
func (l *link) Remote() string {
|
|
||||||
l.RLock()
|
|
||||||
defer l.RUnlock()
|
|
||||||
return l.socket.Remote()
|
|
||||||
}
|
|
||||||
|
|
||||||
// the local ip of the link
|
|
||||||
func (l *link) Local() string {
|
|
||||||
l.RLock()
|
|
||||||
defer l.RUnlock()
|
|
||||||
return l.socket.Local()
|
|
||||||
}
|
|
||||||
|
|
||||||
// length/rate of the link
|
|
||||||
func (l *link) Length() int {
|
|
||||||
l.RLock()
|
|
||||||
defer l.RUnlock()
|
|
||||||
return l.length
|
|
||||||
}
|
|
||||||
|
|
||||||
// weight checks the size of the queues
|
|
||||||
func (l *link) Weight() int {
|
|
||||||
return len(l.sendQueue) + len(l.recvQueue)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept accepts a message on the socket
|
|
||||||
func (l *link) Recv(m *transport.Message) error {
|
|
||||||
select {
|
|
||||||
case <-l.closed:
|
|
||||||
return io.EOF
|
|
||||||
case rm := <-l.recvQueue:
|
|
||||||
*m = *rm
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// never reach
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send sends a message on the socket immediately
|
|
||||||
func (l *link) Send(m *transport.Message) error {
|
|
||||||
select {
|
|
||||||
case <-l.closed:
|
|
||||||
return io.EOF
|
|
||||||
case l.sendQueue <- m:
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *link) Status() string {
|
|
||||||
select {
|
|
||||||
case <-l.closed:
|
|
||||||
return "closed"
|
|
||||||
default:
|
|
||||||
return "connected"
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,61 +0,0 @@
|
|||||||
// Package link provides a measured link on top of a transport.Socket
|
|
||||||
package link
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/config/options"
|
|
||||||
"github.com/micro/go-micro/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// ErrLinkClosed is returned when attempting i/o operation on the closed link
|
|
||||||
ErrLinkClosed = errors.New("link closed")
|
|
||||||
)
|
|
||||||
|
|
||||||
// Link is a layer on top of a transport socket with the
|
|
||||||
// buffering send and recv queues with the ability to
|
|
||||||
// measure the actual transport link and reconnect if
|
|
||||||
// an address is specified.
|
|
||||||
type Link interface {
|
|
||||||
// provides the transport.Socket interface
|
|
||||||
transport.Socket
|
|
||||||
// Connect connects the link. It must be called first
|
|
||||||
// if there's an expectation to create a new socket.
|
|
||||||
Connect() error
|
|
||||||
// Id of the link is "local" if not specified
|
|
||||||
Id() string
|
|
||||||
// Status of the link
|
|
||||||
Status() string
|
|
||||||
// Depth of the buffers
|
|
||||||
Weight() int
|
|
||||||
// Rate of the link
|
|
||||||
Length() int
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewLink creates a new link on top of a socket
|
|
||||||
func NewLink(opts ...options.Option) Link {
|
|
||||||
return newLink(options.NewOptions(opts...))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sets the link id which otherwise defaults to "local"
|
|
||||||
func Id(id string) options.Option {
|
|
||||||
return options.WithValue("link.id", id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// The address to use for the link. Connect must be
|
|
||||||
// called for this to be used, its otherwise unused.
|
|
||||||
func Address(a string) options.Option {
|
|
||||||
return options.WithValue("link.address", a)
|
|
||||||
}
|
|
||||||
|
|
||||||
// The transport to use for the link where we
|
|
||||||
// want to dial the connection first.
|
|
||||||
func Transport(t transport.Transport) options.Option {
|
|
||||||
return options.WithValue("link.transport", t)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Socket sets the socket to use instead of dialing.
|
|
||||||
func Socket(s transport.Socket) options.Option {
|
|
||||||
return options.WithValue("link.socket", s)
|
|
||||||
}
|
|
@ -34,6 +34,8 @@ type Options struct {
|
|||||||
type DialOption func(*DialOptions)
|
type DialOption func(*DialOptions)
|
||||||
|
|
||||||
type DialOptions struct {
|
type DialOptions struct {
|
||||||
|
// Link specifies the link to use
|
||||||
|
Link string
|
||||||
// specify a multicast connection
|
// specify a multicast connection
|
||||||
Multicast bool
|
Multicast bool
|
||||||
// the dial timeout
|
// the dial timeout
|
||||||
@ -94,8 +96,17 @@ func DialMulticast() DialOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DialTimeout sets the dial timeout of the connection
|
||||||
func DialTimeout(t time.Duration) DialOption {
|
func DialTimeout(t time.Duration) DialOption {
|
||||||
return func(o *DialOptions) {
|
return func(o *DialOptions) {
|
||||||
o.Timeout = t
|
o.Timeout = t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DialLink specifies the link to pin this connection to.
|
||||||
|
// This is not applicable if the multicast option is set.
|
||||||
|
func DialLink(id string) DialOption {
|
||||||
|
return func(o *DialOptions) {
|
||||||
|
o.Link = id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -83,6 +83,10 @@ func (s *session) Local() string {
|
|||||||
return s.local
|
return s.local
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *session) Link() string {
|
||||||
|
return s.link
|
||||||
|
}
|
||||||
|
|
||||||
func (s *session) Id() string {
|
func (s *session) Id() string {
|
||||||
return s.session
|
return s.session
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,8 @@ var (
|
|||||||
ErrDialTimeout = errors.New("dial timeout")
|
ErrDialTimeout = errors.New("dial timeout")
|
||||||
// ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery
|
// ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery
|
||||||
ErrDiscoverChan = errors.New("failed to discover channel")
|
ErrDiscoverChan = errors.New("failed to discover channel")
|
||||||
|
// ErrLinkNotFound is returned when a link is specified at dial time and does not exist
|
||||||
|
ErrLinkNotFound = errors.New("link not found")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Tunnel creates a gre tunnel on top of the go-micro/transport.
|
// Tunnel creates a gre tunnel on top of the go-micro/transport.
|
||||||
@ -43,6 +45,8 @@ type Tunnel interface {
|
|||||||
type Link interface {
|
type Link interface {
|
||||||
// The id of the link
|
// The id of the link
|
||||||
Id() string
|
Id() string
|
||||||
|
// Status of the link e.g connected/closed
|
||||||
|
Status() string
|
||||||
// honours transport socket
|
// honours transport socket
|
||||||
transport.Socket
|
transport.Socket
|
||||||
}
|
}
|
||||||
@ -60,6 +64,8 @@ type Session interface {
|
|||||||
Id() string
|
Id() string
|
||||||
// The channel name
|
// The channel name
|
||||||
Channel() string
|
Channel() string
|
||||||
|
// The link the session is on
|
||||||
|
Link() string
|
||||||
// a transport socket
|
// a transport socket
|
||||||
transport.Socket
|
transport.Socket
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user