micro/network/link.go

173 lines
2.7 KiB
Go
Raw Normal View History

package network
import (
2019-07-02 22:54:21 +03:00
"errors"
"io"
"sync"
2019-07-03 21:26:24 +03:00
"github.com/google/uuid"
2019-07-07 12:37:34 +03:00
"github.com/micro/go-micro/network/transport"
)
type link struct {
2019-07-03 21:26:24 +03:00
closed chan bool
2019-07-02 22:54:21 +03:00
sync.RWMutex
// the link id
id string
2019-07-02 22:54:21 +03:00
// the send queue to the socket
2019-07-03 21:26:24 +03:00
sendQueue chan *Message
// the recv queue to the socket
recvQueue chan *Message
// the socket for this link
2019-07-02 22:54:21 +03:00
socket transport.Socket
// determines the cost of the link
// based on queue length and roundtrip
length int
weight int
}
2019-07-03 21:26:24 +03:00
var (
ErrLinkClosed = errors.New("link closed")
)
2019-07-08 18:24:57 +03:00
func newLink(sock transport.Socket) *link {
l := &link{
2019-07-03 21:26:24 +03:00
id: uuid.New().String(),
socket: sock,
2019-07-08 18:24:57 +03:00
closed: make(chan bool),
2019-07-03 21:26:24 +03:00
sendQueue: make(chan *Message, 128),
recvQueue: make(chan *Message, 128),
}
2019-07-08 18:24:57 +03:00
go l.process()
return l
2019-07-03 21:26:24 +03:00
}
// link methods
2019-07-03 21:26:24 +03:00
// process processes messages on the send queue.
// these are messages to be sent to the remote side.
2019-07-02 22:54:21 +03:00
func (l *link) process() {
2019-07-03 21:26:24 +03:00
go func() {
for {
m := new(Message)
2019-07-08 18:24:57 +03:00
if err := l.recv(m); err != nil {
2019-07-03 21:26:24 +03:00
return
}
2019-07-03 21:51:40 +03:00
2019-07-03 21:26:24 +03:00
select {
case l.recvQueue <- m:
case <-l.closed:
return
}
}
}()
2019-07-02 22:54:21 +03:00
for {
select {
2019-07-03 21:26:24 +03:00
case m := <-l.sendQueue:
2019-07-08 18:24:57 +03:00
if err := l.send(m); err != nil {
2019-07-02 22:54:21 +03:00
return
}
2019-07-03 21:26:24 +03:00
case <-l.closed:
return
2019-07-02 22:54:21 +03:00
}
}
}
2019-07-02 22:54:21 +03:00
// send a message over the link
2019-07-08 18:24:57 +03:00
func (l *link) send(m *Message) error {
2019-07-02 22:54:21 +03:00
tm := new(transport.Message)
tm.Header = m.Header
tm.Body = m.Body
// send via the transport socket
2019-07-03 21:26:24 +03:00
return l.socket.Send(tm)
2019-07-02 22:54:21 +03:00
}
// recv a message on the link
2019-07-08 18:24:57 +03:00
func (l *link) recv(m *Message) error {
2019-07-02 22:54:21 +03:00
if m.Header == nil {
m.Header = make(map[string]string)
}
tm := new(transport.Message)
// receive the transport message
if err := l.socket.Recv(tm); err != nil {
return err
}
// set the message
m.Header = tm.Header
m.Body = tm.Body
2019-07-08 18:24:57 +03:00
return nil
2019-07-02 22:54:21 +03:00
}
// Close the link
func (l *link) Close() error {
2019-07-03 21:26:24 +03:00
select {
case <-l.closed:
return nil
default:
close(l.closed)
2019-07-08 18:24:57 +03:00
return l.socket.Close()
2019-07-03 21:26:24 +03:00
}
2019-07-02 22:54:21 +03:00
}
// returns the node id
func (l *link) Id() string {
l.RLock()
defer l.RUnlock()
2019-07-08 18:24:57 +03:00
return l.id
2019-07-02 22:54:21 +03:00
}
2019-07-08 18:24:57 +03:00
func (l *link) Remote() string {
2019-07-02 22:54:21 +03:00
l.RLock()
defer l.RUnlock()
2019-07-08 18:24:57 +03:00
return l.socket.Remote()
}
func (l *link) Local() string {
l.RLock()
defer l.RUnlock()
return l.socket.Local()
}
func (l *link) Length() int {
2019-07-02 22:54:21 +03:00
l.RLock()
defer l.RUnlock()
return l.length
}
func (l *link) Weight() int {
2019-07-03 21:26:24 +03:00
return len(l.sendQueue) + len(l.recvQueue)
}
2019-07-02 22:54:21 +03:00
2019-07-03 21:26:24 +03:00
// Accept accepts a message on the socket
2019-07-08 18:24:57 +03:00
func (l *link) Recv(m *Message) error {
2019-07-03 21:26:24 +03:00
select {
case <-l.closed:
2019-07-08 18:24:57 +03:00
return io.EOF
case rm := <-l.recvQueue:
*m = *rm
return nil
2019-07-02 22:54:21 +03:00
}
2019-07-03 21:26:24 +03:00
// never reach
2019-07-08 18:24:57 +03:00
return nil
2019-07-02 22:54:21 +03:00
}
2019-07-03 21:26:24 +03:00
// Send sends a message on the socket immediately
2019-07-02 22:54:21 +03:00
func (l *link) Send(m *Message) error {
2019-07-03 21:26:24 +03:00
select {
case <-l.closed:
2019-07-03 21:51:40 +03:00
return io.EOF
2019-07-03 21:26:24 +03:00
case l.sendQueue <- m:
}
return nil
2019-07-02 22:54:21 +03:00
}