rewrite network interface

This commit is contained in:
Asim Aslam 2019-07-08 16:24:57 +01:00
parent d3edad474e
commit 4a02e1ff2f
8 changed files with 244 additions and 896 deletions

View File

@ -1,144 +1,136 @@
package network
import (
"crypto/sha256"
"fmt"
"io"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/network/proxy"
"github.com/micro/go-micro/network/proxy/mucp"
"github.com/micro/go-micro/network/resolver"
"github.com/micro/go-micro/network/router"
"github.com/micro/go-micro/registry"
pb "github.com/micro/go-micro/network/proto"
nreg "github.com/micro/go-micro/network/resolver/registry"
"github.com/micro/go-micro/network/transport"
"github.com/micro/go-micro/util/addr"
)
// default network implementation
type network struct {
options.Options
// resolver use to connect to the network
resolver resolver.Resolver
// router used to find routes in the network
router router.Router
// proxy used to route through the network
proxy proxy.Proxy
// name of this network
// name of the network
name string
// links maintained for this network
// based on peers not nodes. maybe maintain
// node separately or note that links have nodes
mtx sync.RWMutex
links []Link
// transport
transport transport.Transport
}
// network methods
type listener struct {
// start accepting once
once sync.Once
// close channel to close the connection
closed chan bool
// the listener
listener transport.Listener
// the connection queue
conns chan Conn
}
// lease generates a new lease with a node id/address
// TODO: use a consensus mechanism, pool or some deterministic
// unique addressing method.
func (n *network) lease(muid string) *pb.Lease {
// create the id
id := uuid.New().String()
// create a timestamp
now := time.Now().UnixNano()
// create the address by hashing the id and timestamp
h := sha256.New()
h.Write([]byte(fmt.Sprintf("%s-%d\n", id, now)))
// magic new address
address := fmt.Sprintf("%x", h.Sum(nil))
// return the node
return &pb.Lease{
Id: id,
Timestamp: now,
Node: &pb.Node{
Muid: muid,
Id: id,
Address: address,
Network: n.name,
func (n *network) Create() (*Node, error) {
ip, err := addr.Extract("")
if err != nil {
return nil, err
}
return &Node{
Id: fmt.Sprintf("%s-%s", n.name, uuid.New().String()),
Address: ip,
Metadata: map[string]string{
"network": n.Name(),
},
}
}
// lookup returns a list of network records in priority order of local
func (n *network) lookup(r registry.Registry) []*resolver.Record {
// create a registry resolver to find local nodes
rr := nreg.Resolver{Registry: r}
// get all the nodes for the network that are local
localRecords, err := rr.Resolve(n.Name())
if err != nil {
// we're not in a good place here
}
// if its a local network we never try lookup anything else
if n.Name() == "local" {
return localRecords
}
// now resolve incrementally based on resolvers specified
networkRecords, err := n.resolver.Resolve(n.Name())
if err != nil {
// still not in a good place
}
// return aggregate records
return append(localRecords, networkRecords...)
}, nil
}
func (n *network) Name() string {
return n.name
}
// Connect connects to the network and returns a new node.
// The node is the callers connection to the network. They
// should advertise this address to people. Anyone else
// on the network should be able to route to it.
func (n *network) Connect() (Node, error) {
return newNode(n)
func (n *network) Connect(node *Node) (Conn, error) {
c, err := n.transport.Dial(node.Address)
if err != nil {
return nil, err
}
return newLink(c.(transport.Socket)), nil
}
// Peer is used to establish a link between two networks.
// e.g micro.mu connects to example.com and share routes
// This is done by creating a new node on both networks
// and creating a link between them.
func (n *network) Peer(Network) (Link, error) {
// New network was created using NewNetwork after receiving routes from a different node
// Connect to the new network and be assigned a node
// Transfer data between the networks
// take other resolver
// order: registry (local), ...resolver
// resolve the network
// periodically connect to nodes resolved in the network
// and add to the network links
return nil, nil
func (n *network) Listen(node *Node) (Listener, error) {
l, err := n.transport.Listen(node.Address)
if err != nil {
return nil, err
}
return newListener(l), nil
}
func (l *listener) process() {
if err := l.listener.Accept(l.accept); err != nil {
// close the listener
l.Close()
}
}
func (l *listener) accept(sock transport.Socket) {
// create a new link and pass it through
link := newLink(sock)
// send it
l.conns <- link
// wait for it to be closed
select {
case <-l.closed:
return
case <-link.closed:
return
}
}
func (l *listener) Address() string {
return l.listener.Addr()
}
func (l *listener) Close() error {
select {
case <-l.closed:
return nil
default:
close(l.closed)
}
return nil
}
func (l *listener) Accept() (Conn, error) {
l.once.Do(func() {
// TODO: catch the error
go l.process()
})
select {
case c := <-l.conns:
return c, nil
case <-l.closed:
return nil, io.EOF
}
}
func newListener(l transport.Listener) *listener {
return &listener{
closed: make(chan bool),
conns: make(chan Conn),
listener: l,
}
}
// newNetwork returns a new network interface
func newNetwork(opts ...options.Option) *network {
options := options.NewOptions(opts...)
// new network instance with defaults
net := &network{
Options: options,
name: DefaultName,
router: router.DefaultRouter,
proxy: new(mucp.Proxy),
resolver: new(nreg.Resolver),
name: DefaultName,
transport: transport.DefaultTransport,
}
// get network name
@ -147,22 +139,10 @@ func newNetwork(opts ...options.Option) *network {
net.name = name.(string)
}
// get router
r, ok := options.Values().Get("network.router")
// get network transport
t, ok := options.Values().Get("network.transport")
if ok {
net.router = r.(router.Router)
}
// get proxy
p, ok := options.Values().Get("network.proxy")
if ok {
net.proxy = p.(proxy.Proxy)
}
// get resolver
res, ok := options.Values().Get("network.resolver")
if ok {
net.resolver = res.(resolver.Resolver)
net.transport = t.(transport.Transport)
}
return net

86
network/default_test.go Normal file
View File

@ -0,0 +1,86 @@
package network
import (
"io"
"testing"
)
func TestNetwork(t *testing.T) {
// create a new network
n := newNetwork()
// create a new node
node, err := n.Create()
if err != nil {
t.Fatal(err)
}
// set ourselves a random port
node.Address = node.Address + ":0"
l, err := n.Listen(node)
if err != nil {
t.Fatal(err)
}
wait := make(chan error)
go func() {
var gerr error
for {
c, err := l.Accept()
if err != nil {
gerr = err
break
}
m := new(Message)
if err := c.Recv(m); err != nil {
gerr = err
break
}
if err := c.Send(m); err != nil {
gerr = err
break
}
}
wait <- gerr
}()
node.Address = l.Address()
// connect to the node
conn, err := n.Connect(node)
if err != nil {
t.Fatal(err)
}
// send a message
if err := conn.Send(&Message{
Header: map[string]string{"Foo": "bar"},
Body: []byte(`hello world`),
}); err != nil {
t.Fatal(err)
}
m := new(Message)
// send a message
if err := conn.Recv(m); err != nil {
t.Fatal(err)
}
if m.Header["Foo"] != "bar" {
t.Fatalf("Received unexpected message %+v", m)
}
// close the listener
l.Close()
// get listener error
err = <-wait
if err != io.EOF {
t.Fatal(err)
}
}

View File

@ -5,18 +5,11 @@ import (
"io"
"sync"
gproto "github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/proto"
pb "github.com/micro/go-micro/network/proto"
"github.com/micro/go-micro/network/transport"
)
type link struct {
// the embedded node
*node
closed chan bool
sync.RWMutex
@ -29,15 +22,9 @@ type link struct {
// the recv queue to the socket
recvQueue chan *Message
// codec we use to marshal things
codec codec.Marshaler
// the socket for this link
socket transport.Socket
// the lease for this link
lease *pb.Lease
// determines the cost of the link
// based on queue length and roundtrip
length int
@ -48,17 +35,16 @@ var (
ErrLinkClosed = errors.New("link closed")
)
func newLink(n *node, sock transport.Socket, lease *pb.Lease) *link {
return &link{
func newLink(sock transport.Socket) *link {
l := &link{
id: uuid.New().String(),
closed: make(chan bool),
codec: &proto.Marshaler{},
node: n,
lease: lease,
socket: sock,
closed: make(chan bool),
sendQueue: make(chan *Message, 128),
recvQueue: make(chan *Message, 128),
}
go l.process()
return l
}
// link methods
@ -69,13 +55,7 @@ func (l *link) process() {
go func() {
for {
m := new(Message)
if err := l.recv(m, nil); err != nil {
return
}
// check if it's an internal close method
if m.Header["Micro-Method"] == "close" {
l.Close()
if err := l.recv(m); err != nil {
return
}
@ -90,7 +70,7 @@ func (l *link) process() {
for {
select {
case m := <-l.sendQueue:
if err := l.send(m, nil); err != nil {
if err := l.send(m); err != nil {
return
}
case <-l.closed:
@ -99,169 +79,17 @@ func (l *link) process() {
}
}
// accept waits for the connect message from the remote end
// if it receives anything else it throws an error
func (l *link) accept() error {
for {
m := new(transport.Message)
err := l.socket.Recv(m)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// TODO: pick a reliable header
event := m.Header["Micro-Method"]
switch event {
// connect event
case "connect":
// process connect events from network.Connect()
// these are new connections to join the network
// decode the connection event
conn := new(pb.Connect)
// expecting a connect message
if err := l.codec.Unmarshal(m.Body, conn); err != nil {
// skip error
continue
}
// no micro id close the link
if len(conn.Muid) == 0 {
l.Close()
return errors.New("invalid muid " + conn.Muid)
}
// get the existing lease if it exists
lease := conn.Lease
// if there's no lease create a new one
if lease == nil {
// create a new lease/node
lease = l.node.network.lease(conn.Muid)
}
// check if we connected to ourself
if conn.Muid == l.node.muid {
// check our own leasae
l.node.Lock()
if l.node.lease == nil {
l.node.lease = lease
}
l.node.Unlock()
}
// set the author to our own muid
lease.Author = l.node.muid
// send back a lease offer for the node
if err := l.send(&Message{
Header: map[string]string{
"Micro-Method": "lease",
},
}, lease); err != nil {
return err
}
// the lease is saved
l.Lock()
l.lease = lease
l.Unlock()
// we've connected
// start processing the messages
go l.process()
return nil
case "close":
l.Close()
return io.EOF
default:
return errors.New("unknown method: " + event)
}
}
}
// connect sends a connect request and waits on a lease.
// this is for a new connection. in the event we send
// an existing lease, the same lease should be returned.
// if it differs then we assume our address for this link
// is different...
func (l *link) connect() error {
// get the current lease
l.RLock()
lease := l.lease
l.RUnlock()
// send a lease request
if err := l.send(&Message{
Header: map[string]string{
"Micro-Method": "connect",
},
}, &pb.Connect{Muid: l.node.muid, Lease: lease}); err != nil {
return err
}
// create the new things
tm := new(Message)
newLease := new(pb.Lease)
// wait for a response, hopefully a lease
if err := l.recv(tm, newLease); err != nil {
return err
}
event := tm.Header["Micro-Method"]
// check the method
switch event {
case "lease":
// save the lease
l.Lock()
l.lease = newLease
l.Unlock()
// start processing the messages
go l.process()
case "close":
l.Close()
return io.EOF
default:
l.Close()
return errors.New("unable to attain lease")
}
return nil
}
// send a message over the link
func (l *link) send(m *Message, v interface{}) error {
func (l *link) send(m *Message) error {
tm := new(transport.Message)
tm.Header = m.Header
tm.Body = m.Body
// set the body if not nil
// we're assuming this is network message
if v != nil {
// encode the data
b, err := l.codec.Marshal(v)
if err != nil {
return err
}
// set the content type
tm.Header["Content-Type"] = "application/protobuf"
// set the marshalled body
tm.Body = b
}
// send via the transport socket
return l.socket.Send(tm)
}
// recv a message on the link
func (l *link) recv(m *Message, v interface{}) error {
func (l *link) recv(m *Message) error {
if m.Header == nil {
m.Header = make(map[string]string)
}
@ -277,19 +105,7 @@ func (l *link) recv(m *Message, v interface{}) error {
m.Header = tm.Header
m.Body = tm.Body
// bail early
if v == nil {
return nil
}
// try unmarshal the body
// skip if there's no content-type
if tm.Header["Content-Type"] != "application/protobuf" {
return nil
}
// return unmarshalled
return l.codec.Unmarshal(m.Body, v.(gproto.Message))
return nil
}
// Close the link
@ -299,35 +115,27 @@ func (l *link) Close() error {
return nil
default:
close(l.closed)
return l.socket.Close()
}
// send a final close message
return l.socket.Send(&transport.Message{
Header: map[string]string{
"Micro-Method": "close",
},
})
}
// returns the node id
func (l *link) Id() string {
l.RLock()
defer l.RUnlock()
if l.lease == nil {
return ""
}
return l.lease.Node.Id
return l.id
}
// Address of the node we're connected to
func (l *link) Address() string {
func (l *link) Remote() string {
l.RLock()
defer l.RUnlock()
if l.lease == nil {
return l.socket.Remote()
}
// the node in the lease
return l.lease.Node.Address
return l.socket.Remote()
}
func (l *link) Local() string {
l.RLock()
defer l.RUnlock()
return l.socket.Local()
}
func (l *link) Length() int {
@ -341,15 +149,16 @@ func (l *link) Weight() int {
}
// Accept accepts a message on the socket
func (l *link) Accept() (*Message, error) {
func (l *link) Recv(m *Message) error {
select {
case <-l.closed:
return nil, io.EOF
case m := <-l.recvQueue:
return m, nil
return io.EOF
case rm := <-l.recvQueue:
*m = *rm
return nil
}
// never reach
return nil, nil
return nil
}
// Send sends a message on the socket immediately

View File

@ -10,46 +10,46 @@ import (
// is responsible for routing messages to the correct services.
type Network interface {
options.Options
// Create starts the network
Create() (*Node, error)
// Name of the network
Name() string
// Connect to the network
Connect() (Node, error)
// Peer with a neighboring network
Peer(Network) (Link, error)
// Connect to a node
Connect(*Node) (Conn, error)
// Listen for connections
Listen(*Node) (Listener, error)
}
// Node represents a single node on a network
type Node interface {
// Id of the node
Id() string
// Address of the node
type Node struct {
Id string
Address string
Metadata map[string]string
}
type Listener interface {
Address() string
// The network of the node
Network() string
// Close the network connection
Close() error
// Accept messages on the network
Accept() (*Message, error)
// Send a message to the network
Accept() (Conn, error)
}
type Conn interface {
// Unique id of the connection
Id() string
// Close the connection
Close() error
// Send a message
Send(*Message) error
// Receive a message
Recv(*Message) error
// The remote node
Remote() string
// The local node
Local() string
}
// Link is a connection between one network and another
type Link interface {
// remote node the link is peered with
Node
// length defines the speed or distance of the link
Length() int
// weight defines the saturation or usage of the link
Weight() int
}
// Message is the base type for opaque data
type Message struct {
// Headers which provide local/remote info
Header map[string]string
// The opaque data being sent
Body []byte
Body []byte
}
var (

View File

@ -1,44 +0,0 @@
package mucp
import (
"github.com/micro/go-micro/network/transport"
)
type listener struct {
// stream id
id string
// address of the listener
addr string
// close channel
closed chan bool
// accept socket
accept chan *socket
}
func (n *listener) Addr() string {
return n.addr
}
func (n *listener) Close() error {
select {
case <-n.closed:
default:
close(n.closed)
}
return nil
}
func (n *listener) Accept(fn func(s transport.Socket)) error {
for {
select {
case <-n.closed:
return nil
case s, ok := <-n.accept:
if !ok {
return nil
}
go fn(s)
}
}
return nil
}

View File

@ -1,342 +0,0 @@
// Package mucp provides a mucp network transport
package mucp
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"sync"
"github.com/micro/go-micro/network"
"github.com/micro/go-micro/network/transport"
)
type networkKey struct{}
// Transport is a mucp transport. It should only
// be created with NewTransport and cast to
// *Transport if there's a need to close it.
type Transport struct {
options transport.Options
// the network interface
network network.Network
// protect all the things
sync.RWMutex
// connect
connected bool
// connected node
node network.Node
// the send channel
send chan *message
// close channel
closed chan bool
// sockets
sockets map[string]*socket
// listeners
listeners map[string]*listener
}
func (n *Transport) newListener(addr string) *listener {
// hash the id
h := sha256.New()
h.Write([]byte(addr))
id := fmt.Sprintf("%x", h.Sum(nil))
// create the listener
l := &listener{
id: id,
addr: addr,
closed: make(chan bool),
accept: make(chan *socket, 128),
}
// save it
n.Lock()
n.listeners[id] = l
n.Unlock()
return l
}
func (n *Transport) getListener(id string) (*listener, bool) {
// get the listener
n.RLock()
s, ok := n.listeners[id]
n.RUnlock()
return s, ok
}
func (n *Transport) getSocket(id string) (*socket, bool) {
// get the socket
n.RLock()
s, ok := n.sockets[id]
n.RUnlock()
return s, ok
}
func (n *Transport) newSocket(id string) *socket {
// hash the id
h := sha256.New()
h.Write([]byte(id))
id = fmt.Sprintf("%x", h.Sum(nil))
// new socket
s := &socket{
id: id,
closed: make(chan bool),
recv: make(chan *message, 128),
send: n.send,
}
// save socket
n.Lock()
n.sockets[id] = s
n.Unlock()
// return socket
return s
}
// process outgoing messages
func (n *Transport) process() {
// manage the send buffer
// all pseudo sockets throw everything down this
for {
select {
case msg := <-n.send:
netmsg := &network.Message{
Header: msg.data.Header,
Body: msg.data.Body,
}
// set the stream id on the outgoing message
netmsg.Header["Micro-Stream"] = msg.id
// send the message via the interface
if err := n.node.Send(netmsg); err != nil {
// no op
// TODO: do something
}
case <-n.closed:
return
}
}
}
// process incoming messages
func (n *Transport) listen() {
for {
// process anything via the net interface
msg, err := n.node.Accept()
if err != nil {
return
}
// a stream id
id := msg.Header["Micro-Stream"]
// get the socket
s, exists := n.getSocket(id)
if !exists {
// get the listener
l, ok := n.getListener(id)
// there's no socket and there's no listener
if !ok {
continue
}
// listener is closed
select {
case <-l.closed:
// delete it
n.Lock()
delete(n.listeners, l.id)
n.Unlock()
continue
default:
}
// no socket, create one
s = n.newSocket(id)
// set remote address
s.remote = msg.Header["Remote"]
// drop that to the listener
// TODO: non blocking
l.accept <- s
}
// is the socket closed?
select {
case <-s.closed:
// closed
delete(n.sockets, id)
continue
default:
// process
}
tmsg := &transport.Message{
Header: msg.Header,
Body: msg.Body,
}
// TODO: don't block on queuing
// append to recv backlog
s.recv <- &message{id: id, data: tmsg}
}
}
func (n *Transport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&n.options)
}
return nil
}
func (n *Transport) Options() transport.Options {
return n.options
}
// Close the tunnel
func (n *Transport) Close() error {
n.Lock()
defer n.Unlock()
if !n.connected {
return nil
}
select {
case <-n.closed:
return nil
default:
// close all the sockets
for _, s := range n.sockets {
s.Close()
}
for _, l := range n.listeners {
l.Close()
}
// close the connection
close(n.closed)
// close node connection
n.node.Close()
// reset connected
n.connected = false
}
return nil
}
// Connect the tunnel
func (n *Transport) Connect() error {
n.Lock()
defer n.Unlock()
// already connected
if n.connected {
return nil
}
// get a new node
node, err := n.network.Connect()
if err != nil {
return err
}
// set as connected
n.connected = true
// create new close channel
n.closed = make(chan bool)
// save node
n.node = node
// process messages to be sent
go n.process()
// process incoming messages
go n.listen()
return nil
}
// Dial an address
func (n *Transport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
if err := n.Connect(); err != nil {
return nil, err
}
// create new socket
s := n.newSocket(addr)
// set remote
s.remote = addr
// set local
n.RLock()
s.local = n.node.Address()
n.RUnlock()
return s, nil
}
func (n *Transport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) {
// check existing listeners
n.RLock()
for _, l := range n.listeners {
if l.addr == addr {
n.RUnlock()
return nil, errors.New("already listening on " + addr)
}
}
n.RUnlock()
// try to connect to the network
if err := n.Connect(); err != nil {
return nil, err
}
return n.newListener(addr), nil
}
func (n *Transport) String() string {
return "network"
}
// NewTransport creates a new network transport
func NewTransport(opts ...transport.Option) transport.Transport {
options := transport.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
// get the network interface
n, ok := options.Context.Value(networkKey{}).(network.Network)
if !ok {
n = network.DefaultNetwork
}
return &Transport{
options: options,
network: n,
send: make(chan *message, 128),
closed: make(chan bool),
sockets: make(map[string]*socket),
}
}
// WithNetwork sets the network interface
func WithNetwork(n network.Network) transport.Option {
return func(o *transport.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, networkKey{}, n)
}
}

View File

@ -1,80 +0,0 @@
package mucp
import (
"errors"
"github.com/micro/go-micro/network/transport"
)
// socket is our pseudo socket for transport.Socket
type socket struct {
// socket id based on Micro-Stream
id string
// closed
closed chan bool
// remote addr
remote string
// local addr
local string
// send chan
send chan *message
// recv chan
recv chan *message
}
// message is sent over the send channel
type message struct {
// socket id
id string
// transport data
data *transport.Message
}
func (s *socket) Remote() string {
return s.remote
}
func (s *socket) Local() string {
return s.local
}
func (s *socket) Id() string {
return s.id
}
func (s *socket) Send(m *transport.Message) error {
select {
case <-s.closed:
return errors.New("socket is closed")
default:
// no op
}
// append to backlog
s.send <- &message{id: s.id, data: m}
return nil
}
func (s *socket) Recv(m *transport.Message) error {
select {
case <-s.closed:
return errors.New("socket is closed")
default:
// no op
}
// recv from backlog
msg := <-s.recv
// set message
*m = *msg.data
// return nil
return nil
}
func (s *socket) Close() error {
select {
case <-s.closed:
// no op
default:
close(s.closed)
}
return nil
}

View File

@ -1,61 +0,0 @@
package mucp
import (
"testing"
"github.com/micro/go-micro/network/transport"
)
func TestTunnelSocket(t *testing.T) {
s := &socket{
id: "1",
closed: make(chan bool),
remote: "remote",
local: "local",
send: make(chan *message, 1),
recv: make(chan *message, 1),
}
// check addresses local and remote
if s.Local() != s.local {
t.Fatalf("Expected s.Local %s got %s", s.local, s.Local())
}
if s.Remote() != s.remote {
t.Fatalf("Expected s.Remote %s got %s", s.remote, s.Remote())
}
// send a message
s.Send(&transport.Message{Header: map[string]string{}})
// get sent message
msg := <-s.send
if msg.id != s.id {
t.Fatalf("Expected sent message id %s got %s", s.id, msg.id)
}
// recv a message
msg.data.Header["Foo"] = "bar"
s.recv <- msg
m := new(transport.Message)
s.Recv(m)
// check header
if m.Header["Foo"] != "bar" {
t.Fatalf("Did not receive correct message %+v", m)
}
// close the connection
s.Close()
// check connection
err := s.Send(m)
if err == nil {
t.Fatal("Expected closed connection")
}
err = s.Recv(m)
if err == nil {
t.Fatal("Expected closed connection")
}
}