micro/network/link.go

368 lines
6.9 KiB
Go
Raw Normal View History

package network
import (
2019-07-02 22:54:21 +03:00
"errors"
"io"
"sync"
2019-07-02 22:54:21 +03:00
gproto "github.com/golang/protobuf/proto"
2019-07-03 21:26:24 +03:00
"github.com/google/uuid"
2019-07-02 22:54:21 +03:00
"github.com/micro/go-micro/codec"
2019-07-03 21:26:24 +03:00
"github.com/micro/go-micro/codec/proto"
pb "github.com/micro/go-micro/network/proto"
2019-07-02 22:54:21 +03:00
"github.com/micro/go-micro/transport"
2019-07-03 21:26:24 +03:00
"github.com/micro/go-micro/util/log"
)
type link struct {
// the embedded node
*node
2019-07-03 21:26:24 +03:00
closed chan bool
2019-07-02 22:54:21 +03:00
sync.RWMutex
// the link id
id string
2019-07-02 22:54:21 +03:00
// the send queue to the socket
2019-07-03 21:26:24 +03:00
sendQueue chan *Message
// the recv queue to the socket
recvQueue chan *Message
2019-07-02 22:54:21 +03:00
// codec we use to marshal things
codec codec.Marshaler
// the socket for this link
2019-07-02 22:54:21 +03:00
socket transport.Socket
// the lease for this link
lease *pb.Lease
// determines the cost of the link
// based on queue length and roundtrip
length int
weight int
}
2019-07-03 21:26:24 +03:00
var (
ErrLinkClosed = errors.New("link closed")
)
func newLink(n *node, sock transport.Socket, lease *pb.Lease) *link {
return &link{
id: uuid.New().String(),
closed: make(chan bool),
codec: &proto.Marshaler{},
node: n,
lease: lease,
socket: sock,
sendQueue: make(chan *Message, 128),
recvQueue: make(chan *Message, 128),
}
}
// link methods
2019-07-03 21:26:24 +03:00
// process processes messages on the send queue.
// these are messages to be sent to the remote side.
2019-07-02 22:54:21 +03:00
func (l *link) process() {
2019-07-03 21:26:24 +03:00
go func() {
for {
m := new(Message)
if err := l.recv(m, nil); err != nil {
l.Close()
return
}
select {
case l.recvQueue <- m:
log.Debugf("%s processing recv", l.id)
case <-l.closed:
return
}
}
}()
2019-07-02 22:54:21 +03:00
for {
select {
2019-07-03 21:26:24 +03:00
case m := <-l.sendQueue:
2019-07-02 22:54:21 +03:00
if err := l.send(m, nil); err != nil {
2019-07-03 21:26:24 +03:00
l.Close()
2019-07-02 22:54:21 +03:00
return
}
2019-07-03 21:26:24 +03:00
case <-l.closed:
return
2019-07-02 22:54:21 +03:00
}
}
}
2019-07-02 22:54:21 +03:00
// accept waits for the connect message from the remote end
// if it receives anything else it throws an error
func (l *link) accept() error {
for {
m := new(transport.Message)
err := l.socket.Recv(m)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// TODO: pick a reliable header
event := m.Header["Micro-Method"]
switch event {
// connect event
2019-07-03 21:26:24 +03:00
case "connect":
2019-07-02 22:54:21 +03:00
// process connect events from network.Connect()
// these are new connections to join the network
// decode the connection event
conn := new(pb.Connect)
2019-07-03 21:26:24 +03:00
// expecting a connect message
2019-07-02 22:54:21 +03:00
if err := l.codec.Unmarshal(m.Body, conn); err != nil {
// skip error
continue
}
2019-07-03 21:26:24 +03:00
// no micro id close the link
if len(conn.Muid) == 0 {
l.Close()
return errors.New("invalid muid " + conn.Muid)
}
2019-07-02 22:54:21 +03:00
// get the existing lease if it exists
lease := conn.Lease
// if there's no lease create a new one
if lease == nil {
// create a new lease/node
2019-07-03 21:26:24 +03:00
lease = l.node.network.lease(conn.Muid)
2019-07-02 22:54:21 +03:00
}
2019-07-03 21:26:24 +03:00
// check if we connected to ourself
if conn.Muid == l.node.muid {
// check our own leasae
l.node.Lock()
if l.node.lease == nil {
l.node.lease = lease
}
l.node.Unlock()
}
// set the author to our own muid
lease.Author = l.node.muid
2019-07-02 22:54:21 +03:00
// send back a lease offer for the node
if err := l.send(&Message{
Header: map[string]string{
2019-07-03 21:26:24 +03:00
"Micro-Method": "lease",
2019-07-02 22:54:21 +03:00
},
}, lease); err != nil {
return err
}
// the lease is saved
l.Lock()
l.lease = lease
l.Unlock()
// we've connected
// start processing the messages
go l.process()
return nil
2019-07-03 21:26:24 +03:00
case "close":
2019-07-02 22:54:21 +03:00
l.Close()
2019-07-03 21:26:24 +03:00
return io.EOF
2019-07-02 22:54:21 +03:00
default:
return errors.New("unknown method: " + event)
}
}
}
// connect sends a connect request and waits on a lease.
// this is for a new connection. in the event we send
// an existing lease, the same lease should be returned.
// if it differs then we assume our address for this link
// is different...
func (l *link) connect() error {
// get the current lease
l.RLock()
lease := l.lease
l.RUnlock()
// send a lease request
if err := l.send(&Message{
Header: map[string]string{
2019-07-03 21:26:24 +03:00
"Micro-Method": "connect",
2019-07-02 22:54:21 +03:00
},
2019-07-03 21:26:24 +03:00
}, &pb.Connect{Muid: l.node.muid, Lease: lease}); err != nil {
2019-07-02 22:54:21 +03:00
return err
}
// create the new things
tm := new(Message)
newLease := new(pb.Lease)
// wait for a response, hopefully a lease
if err := l.recv(tm, newLease); err != nil {
return err
}
event := tm.Header["Micro-Method"]
// check the method
switch event {
2019-07-03 21:26:24 +03:00
case "lease":
2019-07-02 22:54:21 +03:00
// save the lease
l.Lock()
l.lease = newLease
l.Unlock()
// start processing the messages
go l.process()
2019-07-03 21:26:24 +03:00
case "close":
l.Close()
return io.EOF
2019-07-02 22:54:21 +03:00
default:
2019-07-03 21:26:24 +03:00
l.Close()
2019-07-02 22:54:21 +03:00
return errors.New("unable to attain lease")
}
return nil
}
// send a message over the link
func (l *link) send(m *Message, v interface{}) error {
tm := new(transport.Message)
tm.Header = m.Header
tm.Body = m.Body
// set the body if not nil
// we're assuming this is network message
if v != nil {
// encode the data
b, err := l.codec.Marshal(v)
if err != nil {
return err
}
// set the content type
tm.Header["Content-Type"] = "application/protobuf"
// set the marshalled body
tm.Body = b
}
// send via the transport socket
2019-07-03 21:26:24 +03:00
return l.socket.Send(tm)
2019-07-02 22:54:21 +03:00
}
// recv a message on the link
func (l *link) recv(m *Message, v interface{}) error {
if m.Header == nil {
m.Header = make(map[string]string)
}
tm := new(transport.Message)
2019-07-03 21:26:24 +03:00
log.Debugf("link %s attempting receiving", l.id)
2019-07-02 22:54:21 +03:00
// receive the transport message
if err := l.socket.Recv(tm); err != nil {
return err
}
2019-07-03 21:26:24 +03:00
log.Debugf("link %s received %+v %+v\n", l.id, tm, v)
2019-07-02 22:54:21 +03:00
// set the message
m.Header = tm.Header
m.Body = tm.Body
// bail early
if v == nil {
return nil
}
// try unmarshal the body
// skip if there's no content-type
if tm.Header["Content-Type"] != "application/protobuf" {
return nil
}
// return unmarshalled
return l.codec.Unmarshal(m.Body, v.(gproto.Message))
}
// Close the link
func (l *link) Close() error {
2019-07-03 21:26:24 +03:00
select {
case <-l.closed:
return nil
default:
close(l.closed)
}
2019-07-02 22:54:21 +03:00
// send a final close message
l.socket.Send(&transport.Message{
Header: map[string]string{
2019-07-03 21:26:24 +03:00
"Micro-Method": "close",
2019-07-02 22:54:21 +03:00
},
})
2019-07-03 21:26:24 +03:00
2019-07-02 22:54:21 +03:00
// close the socket
return l.socket.Close()
}
// returns the node id
func (l *link) Id() string {
l.RLock()
defer l.RUnlock()
if l.lease == nil {
return ""
}
return l.lease.Node.Id
}
// Address of the node we're connected to
func (l *link) Address() string {
l.RLock()
defer l.RUnlock()
if l.lease == nil {
return l.socket.Remote()
}
// the node in the lease
return l.lease.Node.Address
}
func (l *link) Length() int {
2019-07-02 22:54:21 +03:00
l.RLock()
defer l.RUnlock()
return l.length
}
func (l *link) Weight() int {
2019-07-03 21:26:24 +03:00
return len(l.sendQueue) + len(l.recvQueue)
}
2019-07-02 22:54:21 +03:00
2019-07-03 21:26:24 +03:00
// Accept accepts a message on the socket
2019-07-02 22:54:21 +03:00
func (l *link) Accept() (*Message, error) {
2019-07-03 21:26:24 +03:00
select {
case <-l.closed:
return nil, ErrLinkClosed
case m := <-l.recvQueue:
return m, nil
2019-07-02 22:54:21 +03:00
}
2019-07-03 21:26:24 +03:00
// never reach
return nil, nil
2019-07-02 22:54:21 +03:00
}
2019-07-03 21:26:24 +03:00
// Send sends a message on the socket immediately
2019-07-02 22:54:21 +03:00
func (l *link) Send(m *Message) error {
2019-07-03 21:26:24 +03:00
select {
case <-l.closed:
return ErrLinkClosed
case l.sendQueue <- m:
}
return nil
2019-07-02 22:54:21 +03:00
}