364 lines
6.7 KiB
Go
364 lines
6.7 KiB
Go
package network
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"sync"
|
|
|
|
gproto "github.com/golang/protobuf/proto"
|
|
"github.com/google/uuid"
|
|
"github.com/micro/go-micro/codec"
|
|
"github.com/micro/go-micro/codec/proto"
|
|
pb "github.com/micro/go-micro/network/proto"
|
|
"github.com/micro/go-micro/transport"
|
|
)
|
|
|
|
type link struct {
|
|
// the embedded node
|
|
*node
|
|
|
|
closed chan bool
|
|
|
|
sync.RWMutex
|
|
|
|
// the link id
|
|
id string
|
|
|
|
// the send queue to the socket
|
|
sendQueue chan *Message
|
|
// the recv queue to the socket
|
|
recvQueue chan *Message
|
|
|
|
// codec we use to marshal things
|
|
codec codec.Marshaler
|
|
|
|
// the socket for this link
|
|
socket transport.Socket
|
|
|
|
// the lease for this link
|
|
lease *pb.Lease
|
|
|
|
// determines the cost of the link
|
|
// based on queue length and roundtrip
|
|
length int
|
|
weight int
|
|
}
|
|
|
|
var (
|
|
ErrLinkClosed = errors.New("link closed")
|
|
)
|
|
|
|
func newLink(n *node, sock transport.Socket, lease *pb.Lease) *link {
|
|
return &link{
|
|
id: uuid.New().String(),
|
|
closed: make(chan bool),
|
|
codec: &proto.Marshaler{},
|
|
node: n,
|
|
lease: lease,
|
|
socket: sock,
|
|
sendQueue: make(chan *Message, 128),
|
|
recvQueue: make(chan *Message, 128),
|
|
}
|
|
}
|
|
|
|
// link methods
|
|
|
|
// process processes messages on the send queue.
|
|
// these are messages to be sent to the remote side.
|
|
func (l *link) process() {
|
|
go func() {
|
|
for {
|
|
m := new(Message)
|
|
if err := l.recv(m, nil); err != nil {
|
|
return
|
|
}
|
|
|
|
// check if it's an internal close method
|
|
if m.Header["Micro-Method"] == "close" {
|
|
l.Close()
|
|
return
|
|
}
|
|
|
|
select {
|
|
case l.recvQueue <- m:
|
|
case <-l.closed:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case m := <-l.sendQueue:
|
|
if err := l.send(m, nil); err != nil {
|
|
return
|
|
}
|
|
case <-l.closed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// accept waits for the connect message from the remote end
|
|
// if it receives anything else it throws an error
|
|
func (l *link) accept() error {
|
|
for {
|
|
m := new(transport.Message)
|
|
err := l.socket.Recv(m)
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO: pick a reliable header
|
|
event := m.Header["Micro-Method"]
|
|
|
|
switch event {
|
|
// connect event
|
|
case "connect":
|
|
// process connect events from network.Connect()
|
|
// these are new connections to join the network
|
|
|
|
// decode the connection event
|
|
conn := new(pb.Connect)
|
|
// expecting a connect message
|
|
if err := l.codec.Unmarshal(m.Body, conn); err != nil {
|
|
// skip error
|
|
continue
|
|
}
|
|
|
|
// no micro id close the link
|
|
if len(conn.Muid) == 0 {
|
|
l.Close()
|
|
return errors.New("invalid muid " + conn.Muid)
|
|
}
|
|
|
|
// get the existing lease if it exists
|
|
lease := conn.Lease
|
|
// if there's no lease create a new one
|
|
if lease == nil {
|
|
// create a new lease/node
|
|
lease = l.node.network.lease(conn.Muid)
|
|
}
|
|
|
|
// check if we connected to ourself
|
|
if conn.Muid == l.node.muid {
|
|
// check our own leasae
|
|
l.node.Lock()
|
|
if l.node.lease == nil {
|
|
l.node.lease = lease
|
|
}
|
|
l.node.Unlock()
|
|
}
|
|
|
|
// set the author to our own muid
|
|
lease.Author = l.node.muid
|
|
|
|
// send back a lease offer for the node
|
|
if err := l.send(&Message{
|
|
Header: map[string]string{
|
|
"Micro-Method": "lease",
|
|
},
|
|
}, lease); err != nil {
|
|
return err
|
|
}
|
|
|
|
// the lease is saved
|
|
l.Lock()
|
|
l.lease = lease
|
|
l.Unlock()
|
|
|
|
// we've connected
|
|
// start processing the messages
|
|
go l.process()
|
|
return nil
|
|
case "close":
|
|
l.Close()
|
|
return io.EOF
|
|
default:
|
|
return errors.New("unknown method: " + event)
|
|
}
|
|
}
|
|
}
|
|
|
|
// connect sends a connect request and waits on a lease.
|
|
// this is for a new connection. in the event we send
|
|
// an existing lease, the same lease should be returned.
|
|
// if it differs then we assume our address for this link
|
|
// is different...
|
|
func (l *link) connect() error {
|
|
// get the current lease
|
|
l.RLock()
|
|
lease := l.lease
|
|
l.RUnlock()
|
|
|
|
// send a lease request
|
|
if err := l.send(&Message{
|
|
Header: map[string]string{
|
|
"Micro-Method": "connect",
|
|
},
|
|
}, &pb.Connect{Muid: l.node.muid, Lease: lease}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// create the new things
|
|
tm := new(Message)
|
|
newLease := new(pb.Lease)
|
|
|
|
// wait for a response, hopefully a lease
|
|
if err := l.recv(tm, newLease); err != nil {
|
|
return err
|
|
}
|
|
|
|
event := tm.Header["Micro-Method"]
|
|
|
|
// check the method
|
|
switch event {
|
|
case "lease":
|
|
// save the lease
|
|
l.Lock()
|
|
l.lease = newLease
|
|
l.Unlock()
|
|
|
|
// start processing the messages
|
|
go l.process()
|
|
case "close":
|
|
l.Close()
|
|
return io.EOF
|
|
default:
|
|
l.Close()
|
|
return errors.New("unable to attain lease")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// send a message over the link
|
|
func (l *link) send(m *Message, v interface{}) error {
|
|
tm := new(transport.Message)
|
|
tm.Header = m.Header
|
|
tm.Body = m.Body
|
|
|
|
// set the body if not nil
|
|
// we're assuming this is network message
|
|
if v != nil {
|
|
// encode the data
|
|
b, err := l.codec.Marshal(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// set the content type
|
|
tm.Header["Content-Type"] = "application/protobuf"
|
|
// set the marshalled body
|
|
tm.Body = b
|
|
}
|
|
|
|
// send via the transport socket
|
|
return l.socket.Send(tm)
|
|
}
|
|
|
|
// recv a message on the link
|
|
func (l *link) recv(m *Message, v interface{}) error {
|
|
if m.Header == nil {
|
|
m.Header = make(map[string]string)
|
|
}
|
|
|
|
tm := new(transport.Message)
|
|
|
|
// receive the transport message
|
|
if err := l.socket.Recv(tm); err != nil {
|
|
return err
|
|
}
|
|
|
|
// set the message
|
|
m.Header = tm.Header
|
|
m.Body = tm.Body
|
|
|
|
// bail early
|
|
if v == nil {
|
|
return nil
|
|
}
|
|
|
|
// try unmarshal the body
|
|
// skip if there's no content-type
|
|
if tm.Header["Content-Type"] != "application/protobuf" {
|
|
return nil
|
|
}
|
|
|
|
// return unmarshalled
|
|
return l.codec.Unmarshal(m.Body, v.(gproto.Message))
|
|
}
|
|
|
|
// Close the link
|
|
func (l *link) Close() error {
|
|
select {
|
|
case <-l.closed:
|
|
return nil
|
|
default:
|
|
close(l.closed)
|
|
}
|
|
|
|
// send a final close message
|
|
return l.socket.Send(&transport.Message{
|
|
Header: map[string]string{
|
|
"Micro-Method": "close",
|
|
},
|
|
})
|
|
}
|
|
|
|
// returns the node id
|
|
func (l *link) Id() string {
|
|
l.RLock()
|
|
defer l.RUnlock()
|
|
if l.lease == nil {
|
|
return ""
|
|
}
|
|
return l.lease.Node.Id
|
|
}
|
|
|
|
// Address of the node we're connected to
|
|
func (l *link) Address() string {
|
|
l.RLock()
|
|
defer l.RUnlock()
|
|
if l.lease == nil {
|
|
return l.socket.Remote()
|
|
}
|
|
// the node in the lease
|
|
return l.lease.Node.Address
|
|
}
|
|
|
|
func (l *link) Length() int {
|
|
l.RLock()
|
|
defer l.RUnlock()
|
|
return l.length
|
|
}
|
|
|
|
func (l *link) Weight() int {
|
|
return len(l.sendQueue) + len(l.recvQueue)
|
|
}
|
|
|
|
// Accept accepts a message on the socket
|
|
func (l *link) Accept() (*Message, error) {
|
|
select {
|
|
case <-l.closed:
|
|
return nil, io.EOF
|
|
case m := <-l.recvQueue:
|
|
return m, nil
|
|
}
|
|
// never reach
|
|
return nil, nil
|
|
}
|
|
|
|
// Send sends a message on the socket immediately
|
|
func (l *link) Send(m *Message) error {
|
|
select {
|
|
case <-l.closed:
|
|
return io.EOF
|
|
case l.sendQueue <- m:
|
|
}
|
|
return nil
|
|
}
|