remove network
This commit is contained in:
parent
0bf54c122f
commit
eda380284c
@ -1,164 +0,0 @@
|
|||||||
package network
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/micro/go-micro/config/options"
|
|
||||||
"github.com/micro/go-micro/transport"
|
|
||||||
"github.com/micro/go-micro/util/addr"
|
|
||||||
)
|
|
||||||
|
|
||||||
// default network implementation
|
|
||||||
type network struct {
|
|
||||||
options.Options
|
|
||||||
|
|
||||||
// name of the network
|
|
||||||
name string
|
|
||||||
|
|
||||||
// network address used where one is specified
|
|
||||||
address string
|
|
||||||
|
|
||||||
// transport
|
|
||||||
transport transport.Transport
|
|
||||||
}
|
|
||||||
|
|
||||||
type listener struct {
|
|
||||||
// start accepting once
|
|
||||||
once sync.Once
|
|
||||||
// close channel to close the connection
|
|
||||||
closed chan bool
|
|
||||||
// the listener
|
|
||||||
listener transport.Listener
|
|
||||||
// the connection queue
|
|
||||||
conns chan Conn
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *network) Create() (*Node, error) {
|
|
||||||
ip, err := addr.Extract(n.address)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &Node{
|
|
||||||
Id: fmt.Sprintf("%s-%s", n.name, uuid.New().String()),
|
|
||||||
Address: ip,
|
|
||||||
Network: n.Name(),
|
|
||||||
Metadata: map[string]string{
|
|
||||||
"network": n.String(),
|
|
||||||
"transport": n.transport.String(),
|
|
||||||
},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *network) Name() string {
|
|
||||||
return n.name
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *network) String() string {
|
|
||||||
return "local"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *network) Connect(node *Node) (Conn, error) {
|
|
||||||
c, err := n.transport.Dial(node.Address)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return newLink(c.(transport.Socket)), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *network) Listen(node *Node) (Listener, error) {
|
|
||||||
l, err := n.transport.Listen(node.Address)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return newListener(l), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *listener) process() {
|
|
||||||
if err := l.listener.Accept(l.accept); err != nil {
|
|
||||||
// close the listener
|
|
||||||
l.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *listener) accept(sock transport.Socket) {
|
|
||||||
// create a new link and pass it through
|
|
||||||
link := newLink(sock)
|
|
||||||
|
|
||||||
// send it
|
|
||||||
l.conns <- link
|
|
||||||
|
|
||||||
// wait for it to be closed
|
|
||||||
select {
|
|
||||||
case <-l.closed:
|
|
||||||
return
|
|
||||||
case <-link.closed:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *listener) Address() string {
|
|
||||||
return l.listener.Addr()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *listener) Close() error {
|
|
||||||
select {
|
|
||||||
case <-l.closed:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
close(l.closed)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *listener) Accept() (Conn, error) {
|
|
||||||
l.once.Do(func() {
|
|
||||||
// TODO: catch the error
|
|
||||||
go l.process()
|
|
||||||
})
|
|
||||||
select {
|
|
||||||
case c := <-l.conns:
|
|
||||||
return c, nil
|
|
||||||
case <-l.closed:
|
|
||||||
return nil, io.EOF
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newListener(l transport.Listener) *listener {
|
|
||||||
return &listener{
|
|
||||||
closed: make(chan bool),
|
|
||||||
conns: make(chan Conn),
|
|
||||||
listener: l,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newNetwork(opts ...options.Option) *network {
|
|
||||||
options := options.NewOptions(opts...)
|
|
||||||
|
|
||||||
net := &network{
|
|
||||||
name: DefaultName,
|
|
||||||
transport: transport.DefaultTransport,
|
|
||||||
}
|
|
||||||
|
|
||||||
// get network name
|
|
||||||
name, ok := options.Values().Get("network.name")
|
|
||||||
if ok {
|
|
||||||
net.name = name.(string)
|
|
||||||
}
|
|
||||||
|
|
||||||
// get network name
|
|
||||||
address, ok := options.Values().Get("network.address")
|
|
||||||
if ok {
|
|
||||||
net.address = address.(string)
|
|
||||||
}
|
|
||||||
|
|
||||||
// get network transport
|
|
||||||
t, ok := options.Values().Get("network.transport")
|
|
||||||
if ok {
|
|
||||||
net.transport = t.(transport.Transport)
|
|
||||||
}
|
|
||||||
|
|
||||||
return net
|
|
||||||
}
|
|
@ -1,86 +0,0 @@
|
|||||||
package network
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestNetwork(t *testing.T) {
|
|
||||||
// create a new network
|
|
||||||
n := newNetwork()
|
|
||||||
|
|
||||||
// create a new node
|
|
||||||
node, err := n.Create()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// set ourselves a random port
|
|
||||||
node.Address = node.Address + ":0"
|
|
||||||
|
|
||||||
l, err := n.Listen(node)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
wait := make(chan error)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
var gerr error
|
|
||||||
|
|
||||||
for {
|
|
||||||
c, err := l.Accept()
|
|
||||||
if err != nil {
|
|
||||||
gerr = err
|
|
||||||
break
|
|
||||||
}
|
|
||||||
m := new(Message)
|
|
||||||
if err := c.Recv(m); err != nil {
|
|
||||||
gerr = err
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if err := c.Send(m); err != nil {
|
|
||||||
gerr = err
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
wait <- gerr
|
|
||||||
}()
|
|
||||||
|
|
||||||
node.Address = l.Address()
|
|
||||||
|
|
||||||
// connect to the node
|
|
||||||
conn, err := n.Connect(node)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// send a message
|
|
||||||
if err := conn.Send(&Message{
|
|
||||||
Header: map[string]string{"Foo": "bar"},
|
|
||||||
Body: []byte(`hello world`),
|
|
||||||
}); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
m := new(Message)
|
|
||||||
// send a message
|
|
||||||
if err := conn.Recv(m); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if m.Header["Foo"] != "bar" {
|
|
||||||
t.Fatalf("Received unexpected message %+v", m)
|
|
||||||
}
|
|
||||||
|
|
||||||
// close the listener
|
|
||||||
l.Close()
|
|
||||||
|
|
||||||
// get listener error
|
|
||||||
err = <-wait
|
|
||||||
|
|
||||||
if err != io.EOF {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
172
network/link.go
172
network/link.go
@ -1,172 +0,0 @@
|
|||||||
package network
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/micro/go-micro/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
type link struct {
|
|
||||||
closed chan bool
|
|
||||||
|
|
||||||
sync.RWMutex
|
|
||||||
|
|
||||||
// the link id
|
|
||||||
id string
|
|
||||||
|
|
||||||
// the send queue to the socket
|
|
||||||
sendQueue chan *Message
|
|
||||||
// the recv queue to the socket
|
|
||||||
recvQueue chan *Message
|
|
||||||
|
|
||||||
// the socket for this link
|
|
||||||
socket transport.Socket
|
|
||||||
|
|
||||||
// determines the cost of the link
|
|
||||||
// based on queue length and roundtrip
|
|
||||||
length int
|
|
||||||
weight int
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrLinkClosed = errors.New("link closed")
|
|
||||||
)
|
|
||||||
|
|
||||||
func newLink(sock transport.Socket) *link {
|
|
||||||
l := &link{
|
|
||||||
id: uuid.New().String(),
|
|
||||||
socket: sock,
|
|
||||||
closed: make(chan bool),
|
|
||||||
sendQueue: make(chan *Message, 128),
|
|
||||||
recvQueue: make(chan *Message, 128),
|
|
||||||
}
|
|
||||||
go l.process()
|
|
||||||
return l
|
|
||||||
}
|
|
||||||
|
|
||||||
// link methods
|
|
||||||
|
|
||||||
// process processes messages on the send queue.
|
|
||||||
// these are messages to be sent to the remote side.
|
|
||||||
func (l *link) process() {
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
m := new(Message)
|
|
||||||
if err := l.recv(m); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case l.recvQueue <- m:
|
|
||||||
case <-l.closed:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case m := <-l.sendQueue:
|
|
||||||
if err := l.send(m); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-l.closed:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// send a message over the link
|
|
||||||
func (l *link) send(m *Message) error {
|
|
||||||
tm := new(transport.Message)
|
|
||||||
tm.Header = m.Header
|
|
||||||
tm.Body = m.Body
|
|
||||||
// send via the transport socket
|
|
||||||
return l.socket.Send(tm)
|
|
||||||
}
|
|
||||||
|
|
||||||
// recv a message on the link
|
|
||||||
func (l *link) recv(m *Message) error {
|
|
||||||
if m.Header == nil {
|
|
||||||
m.Header = make(map[string]string)
|
|
||||||
}
|
|
||||||
|
|
||||||
tm := new(transport.Message)
|
|
||||||
|
|
||||||
// receive the transport message
|
|
||||||
if err := l.socket.Recv(tm); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the message
|
|
||||||
m.Header = tm.Header
|
|
||||||
m.Body = tm.Body
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close the link
|
|
||||||
func (l *link) Close() error {
|
|
||||||
select {
|
|
||||||
case <-l.closed:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
close(l.closed)
|
|
||||||
return l.socket.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns the node id
|
|
||||||
func (l *link) Id() string {
|
|
||||||
l.RLock()
|
|
||||||
defer l.RUnlock()
|
|
||||||
return l.id
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *link) Remote() string {
|
|
||||||
l.RLock()
|
|
||||||
defer l.RUnlock()
|
|
||||||
return l.socket.Remote()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *link) Local() string {
|
|
||||||
l.RLock()
|
|
||||||
defer l.RUnlock()
|
|
||||||
return l.socket.Local()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *link) Length() int {
|
|
||||||
l.RLock()
|
|
||||||
defer l.RUnlock()
|
|
||||||
return l.length
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *link) Weight() int {
|
|
||||||
return len(l.sendQueue) + len(l.recvQueue)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept accepts a message on the socket
|
|
||||||
func (l *link) Recv(m *Message) error {
|
|
||||||
select {
|
|
||||||
case <-l.closed:
|
|
||||||
return io.EOF
|
|
||||||
case rm := <-l.recvQueue:
|
|
||||||
*m = *rm
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// never reach
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send sends a message on the socket immediately
|
|
||||||
func (l *link) Send(m *Message) error {
|
|
||||||
select {
|
|
||||||
case <-l.closed:
|
|
||||||
return io.EOF
|
|
||||||
case l.sendQueue <- m:
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,88 +0,0 @@
|
|||||||
// Package network is a package for defining a network overlay
|
|
||||||
package network
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/micro/go-micro/config/options"
|
|
||||||
"github.com/micro/go-micro/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Network defines a network interface. The network is a single
|
|
||||||
// shared network between all nodes connected to it. The network
|
|
||||||
// is responsible for routing messages to the correct services.
|
|
||||||
type Network interface {
|
|
||||||
options.Options
|
|
||||||
// Name of the network
|
|
||||||
Name() string
|
|
||||||
// Create returns a new network node id/address
|
|
||||||
Create() (*Node, error)
|
|
||||||
// Connect to a node on the network
|
|
||||||
Connect(*Node) (Conn, error)
|
|
||||||
// Listen for connections for this node
|
|
||||||
Listen(*Node) (Listener, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Node is a network node represented with id/address and
|
|
||||||
// metadata which includes the network name, transport, etc
|
|
||||||
type Node struct {
|
|
||||||
Id string
|
|
||||||
Address string
|
|
||||||
Network string
|
|
||||||
Metadata map[string]string
|
|
||||||
}
|
|
||||||
|
|
||||||
// A network node listener which can be used to receive messages
|
|
||||||
type Listener interface {
|
|
||||||
Address() string
|
|
||||||
Close() error
|
|
||||||
Accept() (Conn, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// A connection from another node on the network
|
|
||||||
type Conn interface {
|
|
||||||
// Unique id of the connection
|
|
||||||
Id() string
|
|
||||||
// Close the connection
|
|
||||||
Close() error
|
|
||||||
// Send a message
|
|
||||||
Send(*Message) error
|
|
||||||
// Receive a message
|
|
||||||
Recv(*Message) error
|
|
||||||
// The remote node
|
|
||||||
Remote() string
|
|
||||||
// The local node
|
|
||||||
Local() string
|
|
||||||
}
|
|
||||||
|
|
||||||
// The message type sent over the network
|
|
||||||
type Message struct {
|
|
||||||
Header map[string]string
|
|
||||||
Body []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
// The default network name is local
|
|
||||||
DefaultName = "go.micro"
|
|
||||||
|
|
||||||
// just the standard network element
|
|
||||||
DefaultNetwork = NewNetwork()
|
|
||||||
)
|
|
||||||
|
|
||||||
// NewNetwork returns a new network interface
|
|
||||||
func NewNetwork(opts ...options.Option) Network {
|
|
||||||
return newNetwork(opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Name sets the network name
|
|
||||||
func Name(n string) options.Option {
|
|
||||||
return options.WithValue("network.name", n)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Address sets the network address
|
|
||||||
func Address(a string) options.Option {
|
|
||||||
return options.WithValue("network.address", a)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Transport sets the network transport
|
|
||||||
func Transport(t transport.Transport) options.Option {
|
|
||||||
return options.WithValue("network.transport", t)
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user