From b776fbb766d83307184b703da6ee426ddbe8b572 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 15 Aug 2019 15:09:56 +0100 Subject: [PATCH] add a pseudo socket implementation --- util/socket/socket.go | 133 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 util/socket/socket.go diff --git a/util/socket/socket.go b/util/socket/socket.go new file mode 100644 index 00000000..19c671bb --- /dev/null +++ b/util/socket/socket.go @@ -0,0 +1,133 @@ +// Package socket provides a pseudo socket +package socket + +import ( + "io" + + "github.com/micro/go-micro/transport" +) + +// socket is our pseudo socket for transport.Socket +type socket struct { + // closed + closed chan bool + // remote addr + remote string + // local addr + local string + // send chan + send chan *transport.Message + // recv chan + recv chan *transport.Message +} + +func (s *socket) SetLocal(l string) { + s.local = l +} + +func (s *socket) SetRemote(r string) { + s.remote = r +} + +// Accept passes a message to the socket which will be processed by the call to Recv +func (s *socket) Accept(m *transport.Message) error { + select { + case <-s.closed: + return io.EOF + case s.recv <- m: + return nil + } + return nil +} + +// Process takes the next message off the send queue created by a call to Send +func (s *socket) Process(m *transport.Message) error { + select { + case <-s.closed: + return io.EOF + case msg := <-s.send: + *m = *msg + } + return nil +} + +func (s *socket) Remote() string { + return s.remote +} + +func (s *socket) Local() string { + return s.local +} + +func (s *socket) Send(m *transport.Message) error { + select { + case <-s.closed: + return io.EOF + default: + // no op + } + + // make copy + msg := &transport.Message{ + Header: make(map[string]string), + Body: m.Body, + } + + for k, v := range m.Header { + msg.Header[k] = v + } + + // send a message + select { + case s.send <- msg: + case <-s.closed: + return io.EOF + } + + return nil +} + +func (s *socket) Recv(m *transport.Message) error { + select { + case <-s.closed: + return io.EOF + default: + // no op + } + + // receive a message + select { + case msg := <-s.recv: + // set message + *m = *msg + case <-s.closed: + return io.EOF + } + + // return nil + return nil +} + +// Close closes the socket +func (s *socket) Close() error { + select { + case <-s.closed: + // no op + default: + close(s.closed) + } + return nil +} + +// New returns a new pseudo socket which can be used in the place of a transport socket. +// Messages are sent to the socket via Accept and receives from the socket via Process. +// SetLocal/SetRemote should be called before using the socket. +func New() *socket { + return &socket{ + closed: make(chan bool), + local: "local", + remote: "remote", + send: make(chan *transport.Message, 128), + recv: make(chan *transport.Message, 128), + } +}