diff --git a/network/link/default.go b/network/link/default.go new file mode 100644 index 00000000..0e55f033 --- /dev/null +++ b/network/link/default.go @@ -0,0 +1,231 @@ +// Package link provides a measured transport.Socket link +package link + +import ( + "io" + "sync" + + "github.com/micro/go-micro/config/options" + "github.com/micro/go-micro/transport" +) + +type link struct { + sync.RWMutex + + // the link id + id string + + // the remote end to dial + addr string + + // channel used to close the link + closed chan bool + + // if its connected + connected bool + + // the transport to use + transport transport.Transport + + // the send queue to the socket + sendQueue chan *transport.Message + // the recv queue to the socket + recvQueue chan *transport.Message + + // the socket for this link + socket transport.Socket + + // determines the cost of the link + // based on queue length and roundtrip + length int + weight int +} + +func newLink(options options.Options) *link { + // default values + id := "local" + addr := "127.0.0.1:10001" + tr := transport.DefaultTransport + + lid, ok := options.Values().Get("link.id") + if ok { + id = lid.(string) + } + + laddr, ok := options.Values().Get("link.address") + if ok { + addr = laddr.(string) + } + + ltr, ok := options.Values().Get("link.transport") + if ok { + tr = ltr.(transport.Transport) + } + + l := &link{ + // the remote end to dial + addr: addr, + // transport to dial link + transport: tr, + // unique id assigned to the link + id: id, + // the closed channel used to close the conn + closed: make(chan bool), + // then send queue + sendQueue: make(chan *transport.Message, 128), + // the receive queue + recvQueue: make(chan *transport.Message, 128), + } + + // return the link + return l +} + +// link methods + +// process processes messages on the send queue. +// these are messages to be sent to the remote side. +func (l *link) process() { + go func() { + for { + m := new(transport.Message) + if err := l.recv(m); err != nil { + return + } + + select { + case l.recvQueue <- m: + case <-l.closed: + return + } + } + }() + + for { + select { + case m := <-l.sendQueue: + if err := l.send(m); err != nil { + return + } + case <-l.closed: + return + } + } +} + +// send a message over the link +func (l *link) send(m *transport.Message) error { + // TODO: measure time taken and calculate length/rate + // send via the transport socket + return l.socket.Send(m) +} + +// recv a message on the link +func (l *link) recv(m *transport.Message) error { + if m.Header == nil { + m.Header = make(map[string]string) + } + // receive the transport message + return l.socket.Recv(m) +} + +func (l *link) Connect() error { + l.Lock() + if l.connected { + l.Unlock() + return nil + } + + // replace closed + l.closed = make(chan bool) + + // dial the endpoint + c, err := l.transport.Dial(l.addr) + if err != nil { + l.Unlock() + return nil + } + + // set the socket + l.socket = c + + // kick start the processing + go l.process() + + return nil +} + +// Close the link +func (l *link) Close() error { + select { + case <-l.closed: + return nil + default: + close(l.closed) + l.Lock() + l.connected = false + l.Unlock() + return l.socket.Close() + } +} + +// returns the node id +func (l *link) Id() string { + l.RLock() + defer l.RUnlock() + return l.id +} + +func (l *link) Remote() string { + l.RLock() + defer l.RUnlock() + return l.socket.Remote() +} + +func (l *link) Local() string { + l.RLock() + defer l.RUnlock() + return l.socket.Local() +} + +func (l *link) Length() int { + l.RLock() + defer l.RUnlock() + return l.length +} + +func (l *link) Weight() int { + return len(l.sendQueue) + len(l.recvQueue) +} + +// Accept accepts a message on the socket +func (l *link) Recv(m *transport.Message) error { + select { + case <-l.closed: + return io.EOF + case rm := <-l.recvQueue: + *m = *rm + return nil + } + // never reach + return nil +} + +// Send sends a message on the socket immediately +func (l *link) Send(m *transport.Message) error { + select { + case <-l.closed: + return io.EOF + case l.sendQueue <- m: + } + return nil +} + +func (l *link) Status() string { + select { + case <-l.closed: + return "closed" + default: + return "connected" + } +} diff --git a/network/link/link.go b/network/link/link.go index e50c683f..7f3c7496 100644 --- a/network/link/link.go +++ b/network/link/link.go @@ -3,8 +3,6 @@ package link import ( "errors" - "io" - "sync" "github.com/micro/go-micro/config/options" "github.com/micro/go-micro/transport" @@ -28,231 +26,10 @@ type Link interface { Length() int } -type link struct { - sync.RWMutex - - // the link id - id string - - // the remote end to dial - addr string - - // channel used to close the link - closed chan bool - - // if its connected - connected bool - - // the transport to use - transport transport.Transport - - // the send queue to the socket - sendQueue chan *transport.Message - // the recv queue to the socket - recvQueue chan *transport.Message - - // the socket for this link - socket transport.Socket - - // determines the cost of the link - // based on queue length and roundtrip - length int - weight int -} - var ( ErrLinkClosed = errors.New("link closed") ) -func newLink(options options.Options) *link { - // default values - id := "local" - addr := "127.0.0.1:10001" - tr := transport.DefaultTransport - - lid, ok := options.Values().Get("link.id") - if ok { - id = lid.(string) - } - - laddr, ok := options.Values().Get("link.address") - if ok { - addr = laddr.(string) - } - - ltr, ok := options.Values().Get("link.transport") - if ok { - tr = ltr.(transport.Transport) - } - - l := &link{ - // the remote end to dial - addr: addr, - // transport to dial link - transport: tr, - // unique id assigned to the link - id: id, - // the closed channel used to close the conn - closed: make(chan bool), - // then send queue - sendQueue: make(chan *transport.Message, 128), - // the receive queue - recvQueue: make(chan *transport.Message, 128), - } - - // return the link - return l -} - -// link methods - -// process processes messages on the send queue. -// these are messages to be sent to the remote side. -func (l *link) process() { - go func() { - for { - m := new(transport.Message) - if err := l.recv(m); err != nil { - return - } - - select { - case l.recvQueue <- m: - case <-l.closed: - return - } - } - }() - - for { - select { - case m := <-l.sendQueue: - if err := l.send(m); err != nil { - return - } - case <-l.closed: - return - } - } -} - -// send a message over the link -func (l *link) send(m *transport.Message) error { - // TODO: measure time taken and calculate length/rate - // send via the transport socket - return l.socket.Send(m) -} - -// recv a message on the link -func (l *link) recv(m *transport.Message) error { - if m.Header == nil { - m.Header = make(map[string]string) - } - // receive the transport message - return l.socket.Recv(m) -} - -func (l *link) Connect() error { - l.Lock() - if l.connected { - l.Unlock() - return nil - } - - // replace closed - l.closed = make(chan bool) - - // dial the endpoint - c, err := l.transport.Dial(l.addr) - if err != nil { - l.Unlock() - return nil - } - - // set the socket - l.socket = c - - // kick start the processing - go l.process() - - return nil -} - -// Close the link -func (l *link) Close() error { - select { - case <-l.closed: - return nil - default: - close(l.closed) - l.Lock() - l.connected = false - l.Unlock() - return l.socket.Close() - } -} - -// returns the node id -func (l *link) Id() string { - l.RLock() - defer l.RUnlock() - return l.id -} - -func (l *link) Remote() string { - l.RLock() - defer l.RUnlock() - return l.socket.Remote() -} - -func (l *link) Local() string { - l.RLock() - defer l.RUnlock() - return l.socket.Local() -} - -func (l *link) Length() int { - l.RLock() - defer l.RUnlock() - return l.length -} - -func (l *link) Weight() int { - return len(l.sendQueue) + len(l.recvQueue) -} - -// Accept accepts a message on the socket -func (l *link) Recv(m *transport.Message) error { - select { - case <-l.closed: - return io.EOF - case rm := <-l.recvQueue: - *m = *rm - return nil - } - // never reach - return nil -} - -// Send sends a message on the socket immediately -func (l *link) Send(m *transport.Message) error { - select { - case <-l.closed: - return io.EOF - case l.sendQueue <- m: - } - return nil -} - -func (l *link) Status() string { - select { - case <-l.closed: - return "closed" - default: - return "connected" - } -} - // NewLink creates a new link on top of a socket func NewLink(opts ...options.Option) Link { return newLink(options.NewOptions(opts...))