diff --git a/tunnel/default.go b/tunnel/default.go index 7d265544..001c7156 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -932,6 +932,11 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // non multicast so we need to find the link t.RLock() for _, link := range t.links { + // use the link specified it its available + if id := options.Link; len(id) > 0 && link.id != id { + continue + } + link.RLock() _, ok := link.channels[channel] link.RUnlock() @@ -944,6 +949,11 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { } t.RUnlock() + // link not found + if len(links) == 0 && len(options.Link) > 0 { + return nil, ErrLinkNotFound + } + // discovered so set the link if not multicast // TODO: pick the link efficiently based // on link status and saturation. diff --git a/tunnel/link.go b/tunnel/link.go index f00f1797..a8423c73 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -1,7 +1,6 @@ package tunnel import ( - "io" "sync" "time" @@ -32,98 +31,19 @@ type link struct { lastKeepAlive time.Time // channels keeps a mapping of channels and last seen channels map[string]time.Time - - // the send queue to the socket - sendQueue chan *transport.Message - // the recv queue to the socket - recvQueue chan *transport.Message - - // determines the cost of the link - // based on queue length and roundtrip - length int - weight int } func newLink(s transport.Socket) *link { l := &link{ - Socket: s, - id: uuid.New().String(), - channels: make(map[string]time.Time), - closed: make(chan bool), - sendQueue: make(chan *transport.Message, 128), - recvQueue: make(chan *transport.Message, 128), + Socket: s, + id: uuid.New().String(), + channels: make(map[string]time.Time), + closed: make(chan bool), } go l.expiry() - go l.process() return l } -// process processes messages on the send and receive queues. -func (l *link) process() { - go func() { - for { - m := new(transport.Message) - if err := l.Socket.Recv(m); err != nil { - return - } - - select { - case l.recvQueue <- m: - case <-l.closed: - return - } - } - }() - - // messages sent - i := 0 - length := 0 - - for { - select { - case m := <-l.sendQueue: - t := time.Now() - - // send the message - if err := l.Socket.Send(m); err != nil { - return - } - - // get header size, body size and time taken - hl := len(m.Header) - bl := len(m.Body) - d := time.Since(t) - - // don't calculate on empty messages - if hl == 0 && bl == 0 { - continue - } - - // increment sent - i++ - - // time take to send some bits and bytes - td := float64(hl+bl) / float64(d.Nanoseconds()) - // increase the scale - td += 1 - - // judge the length - length = int(td) / (length + int(td)) - - // every 10 messages update length - if (i % 10) == 1 { - // cost average the length - // save it - l.Lock() - l.length = length - l.Unlock() - } - case <-l.closed: - return - } - } -} - // watches the channel expiry func (l *link) expiry() { t := time.NewTicker(time.Minute) @@ -180,41 +100,6 @@ func (l *link) Close() error { return nil } -// length/rate of the link -func (l *link) Length() int { - l.RLock() - defer l.RUnlock() - return l.length -} - -// weight checks the size of the queues -func (l *link) Weight() int { - return len(l.sendQueue) + len(l.recvQueue) -} - -// Accept accepts a message on the socket -func (l *link) Recv(m *transport.Message) error { - select { - case <-l.closed: - return io.EOF - case rm := <-l.recvQueue: - *m = *rm - return nil - } - // never reach - return nil -} - -// Send sends a message on the socket immediately -func (l *link) Send(m *transport.Message) error { - select { - case <-l.closed: - return io.EOF - case l.sendQueue <- m: - } - return nil -} - func (l *link) Status() string { select { case <-l.closed: diff --git a/tunnel/options.go b/tunnel/options.go index 39795671..903f7fb7 100644 --- a/tunnel/options.go +++ b/tunnel/options.go @@ -34,6 +34,8 @@ type Options struct { type DialOption func(*DialOptions) type DialOptions struct { + // Link specifies the link to use + Link string // specify a multicast connection Multicast bool // the dial timeout @@ -94,8 +96,17 @@ func DialMulticast() DialOption { } } +// DialTimeout sets the dial timeout of the connection func DialTimeout(t time.Duration) DialOption { return func(o *DialOptions) { o.Timeout = t } } + +// DialLink specifies the link to pin this connection to. +// This is not applicable if the multicast option is set. +func DialLink(id string) DialOption { + return func(o *DialOptions) { + o.Link = id + } +} diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 14604215..29a479e6 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -15,6 +15,8 @@ var ( ErrDialTimeout = errors.New("dial timeout") // ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery ErrDiscoverChan = errors.New("failed to discover channel") + // ErrLinkNotFound is returned when a link is specified at dial time and does not exist + ErrLinkNotFound = errors.New("link not found") ) // Tunnel creates a gre tunnel on top of the go-micro/transport.