diff --git a/network/default.go b/network/default.go deleted file mode 100644 index e282adeb..00000000 --- a/network/default.go +++ /dev/null @@ -1,164 +0,0 @@ -package network - -import ( - "fmt" - "io" - "sync" - - "github.com/google/uuid" - "github.com/micro/go-micro/config/options" - "github.com/micro/go-micro/transport" - "github.com/micro/go-micro/util/addr" -) - -// default network implementation -type network struct { - options.Options - - // name of the network - name string - - // network address used where one is specified - address string - - // transport - transport transport.Transport -} - -type listener struct { - // start accepting once - once sync.Once - // close channel to close the connection - closed chan bool - // the listener - listener transport.Listener - // the connection queue - conns chan Conn -} - -func (n *network) Create() (*Node, error) { - ip, err := addr.Extract(n.address) - if err != nil { - return nil, err - } - return &Node{ - Id: fmt.Sprintf("%s-%s", n.name, uuid.New().String()), - Address: ip, - Network: n.Name(), - Metadata: map[string]string{ - "network": n.String(), - "transport": n.transport.String(), - }, - }, nil -} - -func (n *network) Name() string { - return n.name -} - -func (n *network) String() string { - return "local" -} - -func (n *network) Connect(node *Node) (Conn, error) { - c, err := n.transport.Dial(node.Address) - if err != nil { - return nil, err - } - return newLink(c.(transport.Socket)), nil -} - -func (n *network) Listen(node *Node) (Listener, error) { - l, err := n.transport.Listen(node.Address) - if err != nil { - return nil, err - } - return newListener(l), nil -} - -func (l *listener) process() { - if err := l.listener.Accept(l.accept); err != nil { - // close the listener - l.Close() - } -} - -func (l *listener) accept(sock transport.Socket) { - // create a new link and pass it through - link := newLink(sock) - - // send it - l.conns <- link - - // wait for it to be closed - select { - case <-l.closed: - return - case <-link.closed: - return - } -} - -func (l *listener) Address() string { - return l.listener.Addr() -} - -func (l *listener) Close() error { - select { - case <-l.closed: - return nil - default: - close(l.closed) - } - return nil -} - -func (l *listener) Accept() (Conn, error) { - l.once.Do(func() { - // TODO: catch the error - go l.process() - }) - select { - case c := <-l.conns: - return c, nil - case <-l.closed: - return nil, io.EOF - } -} - -func newListener(l transport.Listener) *listener { - return &listener{ - closed: make(chan bool), - conns: make(chan Conn), - listener: l, - } -} - -func newNetwork(opts ...options.Option) *network { - options := options.NewOptions(opts...) - - net := &network{ - name: DefaultName, - transport: transport.DefaultTransport, - } - - // get network name - name, ok := options.Values().Get("network.name") - if ok { - net.name = name.(string) - } - - // get network name - address, ok := options.Values().Get("network.address") - if ok { - net.address = address.(string) - } - - // get network transport - t, ok := options.Values().Get("network.transport") - if ok { - net.transport = t.(transport.Transport) - } - - return net -} diff --git a/network/default_test.go b/network/default_test.go deleted file mode 100644 index da10fdc8..00000000 --- a/network/default_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package network - -import ( - "io" - "testing" -) - -func TestNetwork(t *testing.T) { - // create a new network - n := newNetwork() - - // create a new node - node, err := n.Create() - if err != nil { - t.Fatal(err) - } - - // set ourselves a random port - node.Address = node.Address + ":0" - - l, err := n.Listen(node) - if err != nil { - t.Fatal(err) - } - - wait := make(chan error) - - go func() { - var gerr error - - for { - c, err := l.Accept() - if err != nil { - gerr = err - break - } - m := new(Message) - if err := c.Recv(m); err != nil { - gerr = err - break - } - if err := c.Send(m); err != nil { - gerr = err - break - } - } - - wait <- gerr - }() - - node.Address = l.Address() - - // connect to the node - conn, err := n.Connect(node) - if err != nil { - t.Fatal(err) - } - - // send a message - if err := conn.Send(&Message{ - Header: map[string]string{"Foo": "bar"}, - Body: []byte(`hello world`), - }); err != nil { - t.Fatal(err) - } - - m := new(Message) - // send a message - if err := conn.Recv(m); err != nil { - t.Fatal(err) - } - - if m.Header["Foo"] != "bar" { - t.Fatalf("Received unexpected message %+v", m) - } - - // close the listener - l.Close() - - // get listener error - err = <-wait - - if err != io.EOF { - t.Fatal(err) - } -} diff --git a/network/link.go b/network/link.go deleted file mode 100644 index 319692d3..00000000 --- a/network/link.go +++ /dev/null @@ -1,172 +0,0 @@ -package network - -import ( - "errors" - "io" - "sync" - - "github.com/google/uuid" - "github.com/micro/go-micro/transport" -) - -type link struct { - closed chan bool - - sync.RWMutex - - // the link id - id string - - // the send queue to the socket - sendQueue chan *Message - // the recv queue to the socket - recvQueue chan *Message - - // the socket for this link - socket transport.Socket - - // determines the cost of the link - // based on queue length and roundtrip - length int - weight int -} - -var ( - ErrLinkClosed = errors.New("link closed") -) - -func newLink(sock transport.Socket) *link { - l := &link{ - id: uuid.New().String(), - socket: sock, - closed: make(chan bool), - sendQueue: make(chan *Message, 128), - recvQueue: make(chan *Message, 128), - } - go l.process() - return l -} - -// link methods - -// process processes messages on the send queue. -// these are messages to be sent to the remote side. -func (l *link) process() { - go func() { - for { - m := new(Message) - if err := l.recv(m); err != nil { - return - } - - select { - case l.recvQueue <- m: - case <-l.closed: - return - } - } - }() - - for { - select { - case m := <-l.sendQueue: - if err := l.send(m); err != nil { - return - } - case <-l.closed: - return - } - } -} - -// send a message over the link -func (l *link) send(m *Message) error { - tm := new(transport.Message) - tm.Header = m.Header - tm.Body = m.Body - // send via the transport socket - return l.socket.Send(tm) -} - -// recv a message on the link -func (l *link) recv(m *Message) error { - if m.Header == nil { - m.Header = make(map[string]string) - } - - tm := new(transport.Message) - - // receive the transport message - if err := l.socket.Recv(tm); err != nil { - return err - } - - // set the message - m.Header = tm.Header - m.Body = tm.Body - - return nil -} - -// Close the link -func (l *link) Close() error { - select { - case <-l.closed: - return nil - default: - close(l.closed) - return l.socket.Close() - } -} - -// returns the node id -func (l *link) Id() string { - l.RLock() - defer l.RUnlock() - return l.id -} - -func (l *link) Remote() string { - l.RLock() - defer l.RUnlock() - return l.socket.Remote() -} - -func (l *link) Local() string { - l.RLock() - defer l.RUnlock() - return l.socket.Local() -} - -func (l *link) Length() int { - l.RLock() - defer l.RUnlock() - return l.length -} - -func (l *link) Weight() int { - return len(l.sendQueue) + len(l.recvQueue) -} - -// Accept accepts a message on the socket -func (l *link) Recv(m *Message) error { - select { - case <-l.closed: - return io.EOF - case rm := <-l.recvQueue: - *m = *rm - return nil - } - // never reach - return nil -} - -// Send sends a message on the socket immediately -func (l *link) Send(m *Message) error { - select { - case <-l.closed: - return io.EOF - case l.sendQueue <- m: - } - return nil -} diff --git a/network/network.go b/network/network.go deleted file mode 100644 index b27347df..00000000 --- a/network/network.go +++ /dev/null @@ -1,88 +0,0 @@ -// Package network is a package for defining a network overlay -package network - -import ( - "github.com/micro/go-micro/config/options" - "github.com/micro/go-micro/transport" -) - -// Network defines a network interface. The network is a single -// shared network between all nodes connected to it. The network -// is responsible for routing messages to the correct services. -type Network interface { - options.Options - // Name of the network - Name() string - // Create returns a new network node id/address - Create() (*Node, error) - // Connect to a node on the network - Connect(*Node) (Conn, error) - // Listen for connections for this node - Listen(*Node) (Listener, error) -} - -// Node is a network node represented with id/address and -// metadata which includes the network name, transport, etc -type Node struct { - Id string - Address string - Network string - Metadata map[string]string -} - -// A network node listener which can be used to receive messages -type Listener interface { - Address() string - Close() error - Accept() (Conn, error) -} - -// A connection from another node on the network -type Conn interface { - // Unique id of the connection - Id() string - // Close the connection - Close() error - // Send a message - Send(*Message) error - // Receive a message - Recv(*Message) error - // The remote node - Remote() string - // The local node - Local() string -} - -// The message type sent over the network -type Message struct { - Header map[string]string - Body []byte -} - -var ( - // The default network name is local - DefaultName = "go.micro" - - // just the standard network element - DefaultNetwork = NewNetwork() -) - -// NewNetwork returns a new network interface -func NewNetwork(opts ...options.Option) Network { - return newNetwork(opts...) -} - -// Name sets the network name -func Name(n string) options.Option { - return options.WithValue("network.name", n) -} - -// Address sets the network address -func Address(a string) options.Option { - return options.WithValue("network.address", a) -} - -// Transport sets the network transport -func Transport(t transport.Transport) options.Option { - return options.WithValue("network.transport", t) -}