From ef04331b8699462866bb11360d3243b582e69ffc Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 15 Aug 2019 20:08:49 +0100 Subject: [PATCH] multiplexing cruft --- client/rpc_client.go | 13 +++- client/rpc_codec.go | 14 +++- client/rpc_stream.go | 17 +++++ server/rpc_codec.go | 41 +++++------- server/rpc_server.go | 148 +++++++++++++++++++++++++++++++++++------- util/socket/socket.go | 42 ++++++++---- 6 files changed, 209 insertions(+), 66 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index 07b7d6e0..3619540c 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -108,7 +108,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, seq := atomic.LoadUint64(&r.seq) atomic.AddUint64(&r.seq, 1) - codec := newRpcCodec(msg, c, cf) + codec := newRpcCodec(msg, c, cf, "") rsp := &rpcResponse{ socket: c, @@ -206,7 +206,13 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err) } - codec := newRpcCodec(msg, c, cf) + // increment the sequence number + seq := atomic.LoadUint64(&r.seq) + atomic.AddUint64(&r.seq, 1) + id := fmt.Sprintf("%v", seq) + + // create codec with stream id + codec := newRpcCodec(msg, c, cf, id) rsp := &rpcResponse{ socket: c, @@ -224,6 +230,9 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request response: rsp, closed: make(chan bool), codec: codec, + id: id, + // signal the end of stream, + eos: true, } ch := make(chan error, 1) diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 93ed9fbf..37987496 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -39,6 +39,9 @@ type rpcCodec struct { req *transport.Message buf *readWriteCloser + + // signify if its a stream + stream string } type readWriteCloser struct { @@ -113,7 +116,7 @@ func getHeaders(m *codec.Message) { } } -func setHeaders(m *codec.Message) { +func setHeaders(m *codec.Message, stream string) { set := func(hdr, v string) { if len(v) == 0 { return @@ -127,6 +130,10 @@ func setHeaders(m *codec.Message) { set("Micro-Method", m.Method) set("Micro-Endpoint", m.Endpoint) set("Micro-Error", m.Error) + + if len(stream) > 0 { + set("Micro-Stream", stream) + } } // setupProtocol sets up the old protocol @@ -150,7 +157,7 @@ func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec { return defaultCodecs[msg.Header["Content-Type"]] } -func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec { +func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec, stream string) codec.Codec { rwc := &readWriteCloser{ wbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil), @@ -160,6 +167,7 @@ func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCod client: client, codec: c(rwc), req: req, + stream: stream, } return r } @@ -178,7 +186,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error { } // set the mucp headers - setHeaders(m) + setHeaders(m, c.stream) // if body is bytes Frame don't encode if body != nil { diff --git a/client/rpc_stream.go b/client/rpc_stream.go index f605c11e..6db6b596 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -18,6 +18,9 @@ type rpcStream struct { response Response codec codec.Codec context context.Context + + // signal whether we should send EOS + eos bool } func (r *rpcStream) isClosed() bool { @@ -120,6 +123,20 @@ func (r *rpcStream) Close() error { return nil default: close(r.closed) + + // send the end of stream message + if r.eos { + // no need to check for error + r.codec.Write(&codec.Message{ + Id: r.id, + Target: r.request.Service(), + Method: r.request.Method(), + Endpoint: r.request.Endpoint(), + Type: codec.Error, + Error: lastStreamResponseError, + }, nil) + } + return r.codec.Close() } } diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 797fa79c..ff2e4261 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -158,7 +158,6 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod wbuf: bytes.NewBuffer(nil), } r := &rpcCodec{ - first: true, buf: rwc, codec: c(rwc), req: req, @@ -174,33 +173,27 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { Body: c.req.Body, } - // if its a follow on request read it - if !c.first { - var tm transport.Message + var tm transport.Message - // read off the socket - if err := c.socket.Recv(&tm); err != nil { - return err - } - // reset the read buffer - c.buf.rbuf.Reset() + // read off the socket + if err := c.socket.Recv(&tm); err != nil { + return err + } + // reset the read buffer + c.buf.rbuf.Reset() - // write the body to the buffer - if _, err := c.buf.rbuf.Write(tm.Body); err != nil { - return err - } - - // set the message header - m.Header = tm.Header - // set the message body - m.Body = tm.Body - - // set req - c.req = &tm + // write the body to the buffer + if _, err := c.buf.rbuf.Write(tm.Body); err != nil { + return err } - // no longer first read - c.first = false + // set the message header + m.Header = tm.Header + // set the message body + m.Body = tm.Body + + // set req + c.req = &tm // set some internal things getHeaders(&m) diff --git a/server/rpc_server.go b/server/rpc_server.go index 516825db..86951451 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -19,6 +19,7 @@ import ( "github.com/micro/go-micro/util/addr" log "github.com/micro/go-micro/util/log" mnet "github.com/micro/go-micro/util/net" + "github.com/micro/go-micro/util/socket" ) type rpcServer struct { @@ -70,23 +71,108 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { } }() + // multiplex the streams on a single socket by Micro-Stream + var mtx sync.RWMutex + sockets := make(map[string]*socket.Socket) + + log.Info("New socket") + for { var msg transport.Message if err := sock.Recv(&msg); err != nil { return } + // use Micro-Id as the stream identifier + // in the event its blank we'll always process + // on the same socket + id := msg.Header["Micro-Stream"] + + // if there's no stream id then its a standard request + if len(id) == 0 { + id = msg.Header["Micro-Id"] + } + // add to wait group if "wait" is opt-in if s.wg != nil { s.wg.Add(1) } + // check we have an existing socket + mtx.RLock() + psock, ok := sockets[id] + mtx.RUnlock() + + log.Infof("Got socket %v %v", id, ok) + + // got the socket + if ok { + // accept the message + if err := psock.Accept(&msg); err != nil { + log.Infof("Accept Error %+v", err) + // close the socket + psock.Close() + + // delete the socket + mtx.Lock() + delete(sockets, id) + mtx.Unlock() + } + + // done(1) + if s.wg != nil { + s.wg.Done() + } + + // continue to the next message + continue + } + + // no socket was found + psock = socket.New() + psock.SetLocal(sock.Local()) + psock.SetRemote(sock.Remote()) + + // load the socket + psock.Accept(&msg) + + // save a new socket + mtx.Lock() + sockets[id] = psock + mtx.Unlock() + + // process the outbound messages from the socket + go func(id string, psock *socket.Socket) { + defer psock.Close() + + for { + // get the message from our internal handler/stream + m := new(transport.Message) + if err := psock.Process(m); err != nil { + log.Infof("Process Error %+v", err) + + // delete the socket + mtx.Lock() + delete(sockets, id) + mtx.Unlock() + return + } + + // send the message back over the socket + if err := sock.Send(m); err != nil { + return + } + } + }(id, psock) + + // now walk the usual path + // we use this Timeout header to set a server deadline to := msg.Header["Timeout"] // we use this Content-Type header to identify the codec needed ct := msg.Header["Content-Type"] - // strip our headers + // copy the message headers hdr := make(map[string]string) for k, v := range msg.Header { hdr[k] = v @@ -96,17 +182,17 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { hdr["Local"] = sock.Local() hdr["Remote"] = sock.Remote() - // create new context + // create new context with the metadata ctx := metadata.NewContext(context.Background(), hdr) - // set the timeout if we have it + // set the timeout from the header if we have it if len(to) > 0 { if n, err := strconv.ParseUint(to, 10, 64); err == nil { ctx, _ = context.WithTimeout(ctx, time.Duration(n)) } } - // no content type + // if there's no content type default it if len(ct) == 0 { msg.Header["Content-Type"] = DefaultContentType ct = DefaultContentType @@ -133,7 +219,13 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { } } - rcodec := newRpcCodec(&msg, sock, cf) + rcodec := newRpcCodec(&msg, psock, cf) + + // check stream id + var stream bool + if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 { + stream = true + } // internal request request := &rpcRequest{ @@ -144,15 +236,14 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { codec: rcodec, header: msg.Header, body: msg.Body, - socket: sock, - stream: true, - first: true, + socket: psock, + stream: stream, } // internal response response := &rpcResponse{ header: make(map[string]string), - socket: sock, + socket: psock, codec: rcodec, } @@ -175,25 +266,36 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { r = rpcRouter{handler} } - // serve the actual request using the request router - if err := r.ServeRequest(ctx, request, response); err != nil { - // write an error response - err = rcodec.Write(&codec.Message{ - Header: msg.Header, - Error: err.Error(), - Type: codec.Error, - }, nil) - // could not write the error response - if err != nil { - log.Logf("rpc: unable to write error response: %v", err) + // serve the request in a go routine as this may be a stream + go func(id string, psock *socket.Socket) { + // serve the actual request using the request router + if err := r.ServeRequest(ctx, request, response); err != nil { + // write an error response + err = rcodec.Write(&codec.Message{ + Header: msg.Header, + Error: err.Error(), + Type: codec.Error, + }, nil) + + // could not write the error response + if err != nil { + log.Logf("rpc: unable to write error response: %v", err) + } } + + mtx.Lock() + delete(sockets, id) + mtx.Unlock() + + psock.Close() + + // once done serving signal we're done if s.wg != nil { s.wg.Done() } - return - } + }(id, psock) - // done + // signal we're done if s.wg != nil { s.wg.Done() } diff --git a/util/socket/socket.go b/util/socket/socket.go index 19c671bb..fc8cb089 100644 --- a/util/socket/socket.go +++ b/util/socket/socket.go @@ -7,8 +7,8 @@ import ( "github.com/micro/go-micro/transport" ) -// socket is our pseudo socket for transport.Socket -type socket struct { +// Socket is our pseudo socket for transport.Socket +type Socket struct { // closed closed chan bool // remote addr @@ -21,16 +21,16 @@ type socket struct { recv chan *transport.Message } -func (s *socket) SetLocal(l string) { +func (s *Socket) SetLocal(l string) { s.local = l } -func (s *socket) SetRemote(r string) { +func (s *Socket) SetRemote(r string) { s.remote = r } // Accept passes a message to the socket which will be processed by the call to Recv -func (s *socket) Accept(m *transport.Message) error { +func (s *Socket) Accept(m *transport.Message) error { select { case <-s.closed: return io.EOF @@ -41,7 +41,7 @@ func (s *socket) Accept(m *transport.Message) error { } // Process takes the next message off the send queue created by a call to Send -func (s *socket) Process(m *transport.Message) error { +func (s *Socket) Process(m *transport.Message) error { select { case <-s.closed: return io.EOF @@ -51,15 +51,15 @@ func (s *socket) Process(m *transport.Message) error { return nil } -func (s *socket) Remote() string { +func (s *Socket) Remote() string { return s.remote } -func (s *socket) Local() string { +func (s *Socket) Local() string { return s.local } -func (s *socket) Send(m *transport.Message) error { +func (s *Socket) Send(m *transport.Message) error { select { case <-s.closed: return io.EOF @@ -70,13 +70,17 @@ func (s *socket) Send(m *transport.Message) error { // make copy msg := &transport.Message{ Header: make(map[string]string), - Body: m.Body, + Body: make([]byte, len(m.Body)), } + // copy headers for k, v := range m.Header { msg.Header[k] = v } + // copy body + copy(msg.Body, m.Body) + // send a message select { case s.send <- msg: @@ -87,7 +91,7 @@ func (s *socket) Send(m *transport.Message) error { return nil } -func (s *socket) Recv(m *transport.Message) error { +func (s *Socket) Recv(m *transport.Message) error { select { case <-s.closed: return io.EOF @@ -109,7 +113,7 @@ func (s *socket) Recv(m *transport.Message) error { } // Close closes the socket -func (s *socket) Close() error { +func (s *Socket) Close() error { select { case <-s.closed: // no op @@ -119,11 +123,21 @@ func (s *socket) Close() error { return nil } +// Indicates its closed +func (s *socket) Done() bool { + select { + case <-s.closed: + return true + default: + return false + } +} + // New returns a new pseudo socket which can be used in the place of a transport socket. // Messages are sent to the socket via Accept and receives from the socket via Process. // SetLocal/SetRemote should be called before using the socket. -func New() *socket { - return &socket{ +func New() *Socket { + return &Socket{ closed: make(chan bool), local: "local", remote: "remote",