From b776fbb766d83307184b703da6ee426ddbe8b572 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 15 Aug 2019 15:09:56 +0100 Subject: [PATCH 1/9] add a pseudo socket implementation --- util/socket/socket.go | 133 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 util/socket/socket.go diff --git a/util/socket/socket.go b/util/socket/socket.go new file mode 100644 index 00000000..19c671bb --- /dev/null +++ b/util/socket/socket.go @@ -0,0 +1,133 @@ +// Package socket provides a pseudo socket +package socket + +import ( + "io" + + "github.com/micro/go-micro/transport" +) + +// socket is our pseudo socket for transport.Socket +type socket struct { + // closed + closed chan bool + // remote addr + remote string + // local addr + local string + // send chan + send chan *transport.Message + // recv chan + recv chan *transport.Message +} + +func (s *socket) SetLocal(l string) { + s.local = l +} + +func (s *socket) SetRemote(r string) { + s.remote = r +} + +// Accept passes a message to the socket which will be processed by the call to Recv +func (s *socket) Accept(m *transport.Message) error { + select { + case <-s.closed: + return io.EOF + case s.recv <- m: + return nil + } + return nil +} + +// Process takes the next message off the send queue created by a call to Send +func (s *socket) Process(m *transport.Message) error { + select { + case <-s.closed: + return io.EOF + case msg := <-s.send: + *m = *msg + } + return nil +} + +func (s *socket) Remote() string { + return s.remote +} + +func (s *socket) Local() string { + return s.local +} + +func (s *socket) Send(m *transport.Message) error { + select { + case <-s.closed: + return io.EOF + default: + // no op + } + + // make copy + msg := &transport.Message{ + Header: make(map[string]string), + Body: m.Body, + } + + for k, v := range m.Header { + msg.Header[k] = v + } + + // send a message + select { + case s.send <- msg: + case <-s.closed: + return io.EOF + } + + return nil +} + +func (s *socket) Recv(m *transport.Message) error { + select { + case <-s.closed: + return io.EOF + default: + // no op + } + + // receive a message + select { + case msg := <-s.recv: + // set message + *m = *msg + case <-s.closed: + return io.EOF + } + + // return nil + return nil +} + +// Close closes the socket +func (s *socket) Close() error { + select { + case <-s.closed: + // no op + default: + close(s.closed) + } + return nil +} + +// New returns a new pseudo socket which can be used in the place of a transport socket. +// Messages are sent to the socket via Accept and receives from the socket via Process. +// SetLocal/SetRemote should be called before using the socket. +func New() *socket { + return &socket{ + closed: make(chan bool), + local: "local", + remote: "remote", + send: make(chan *transport.Message, 128), + recv: make(chan *transport.Message, 128), + } +} From f6b8045dd5b322defdb17cfad84bd8c5c5a1cba0 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 15 Aug 2019 15:22:53 +0100 Subject: [PATCH 2/9] send client error if it exists --- client/rpc_codec.go | 1 + server/rpc_stream.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 6ff84a64..93ed9fbf 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -126,6 +126,7 @@ func setHeaders(m *codec.Message) { set("Micro-Service", m.Target) set("Micro-Method", m.Method) set("Micro-Endpoint", m.Endpoint) + set("Micro-Error", m.Error) } // setupProtocol sets up the old protocol diff --git a/server/rpc_stream.go b/server/rpc_stream.go index 185f1ff9..a4e64af8 100644 --- a/server/rpc_stream.go +++ b/server/rpc_stream.go @@ -2,6 +2,8 @@ package server import ( "context" + "errors" + "io" "sync" "github.com/micro/go-micro/codec" @@ -59,6 +61,20 @@ func (r *rpcStream) Recv(msg interface{}) error { return err } + // check the error + if len(req.Error) > 0 { + // Check the client closed the stream + switch req.Error { + case lastStreamResponseError.Error(): + // discard body + r.codec.ReadBody(nil) + r.err = io.EOF + return io.EOF + default: + return errors.New(req.Error) + } + } + // we need to stay up to date with sequence numbers r.id = req.Id if err := r.codec.ReadBody(msg); err != nil { From ef04331b8699462866bb11360d3243b582e69ffc Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 15 Aug 2019 20:08:49 +0100 Subject: [PATCH 3/9] multiplexing cruft --- client/rpc_client.go | 13 +++- client/rpc_codec.go | 14 +++- client/rpc_stream.go | 17 +++++ server/rpc_codec.go | 41 +++++------- server/rpc_server.go | 148 +++++++++++++++++++++++++++++++++++------- util/socket/socket.go | 42 ++++++++---- 6 files changed, 209 insertions(+), 66 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index 07b7d6e0..3619540c 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -108,7 +108,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, seq := atomic.LoadUint64(&r.seq) atomic.AddUint64(&r.seq, 1) - codec := newRpcCodec(msg, c, cf) + codec := newRpcCodec(msg, c, cf, "") rsp := &rpcResponse{ socket: c, @@ -206,7 +206,13 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err) } - codec := newRpcCodec(msg, c, cf) + // increment the sequence number + seq := atomic.LoadUint64(&r.seq) + atomic.AddUint64(&r.seq, 1) + id := fmt.Sprintf("%v", seq) + + // create codec with stream id + codec := newRpcCodec(msg, c, cf, id) rsp := &rpcResponse{ socket: c, @@ -224,6 +230,9 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request response: rsp, closed: make(chan bool), codec: codec, + id: id, + // signal the end of stream, + eos: true, } ch := make(chan error, 1) diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 93ed9fbf..37987496 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -39,6 +39,9 @@ type rpcCodec struct { req *transport.Message buf *readWriteCloser + + // signify if its a stream + stream string } type readWriteCloser struct { @@ -113,7 +116,7 @@ func getHeaders(m *codec.Message) { } } -func setHeaders(m *codec.Message) { +func setHeaders(m *codec.Message, stream string) { set := func(hdr, v string) { if len(v) == 0 { return @@ -127,6 +130,10 @@ func setHeaders(m *codec.Message) { set("Micro-Method", m.Method) set("Micro-Endpoint", m.Endpoint) set("Micro-Error", m.Error) + + if len(stream) > 0 { + set("Micro-Stream", stream) + } } // setupProtocol sets up the old protocol @@ -150,7 +157,7 @@ func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec { return defaultCodecs[msg.Header["Content-Type"]] } -func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec { +func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec, stream string) codec.Codec { rwc := &readWriteCloser{ wbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil), @@ -160,6 +167,7 @@ func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCod client: client, codec: c(rwc), req: req, + stream: stream, } return r } @@ -178,7 +186,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error { } // set the mucp headers - setHeaders(m) + setHeaders(m, c.stream) // if body is bytes Frame don't encode if body != nil { diff --git a/client/rpc_stream.go b/client/rpc_stream.go index f605c11e..6db6b596 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -18,6 +18,9 @@ type rpcStream struct { response Response codec codec.Codec context context.Context + + // signal whether we should send EOS + eos bool } func (r *rpcStream) isClosed() bool { @@ -120,6 +123,20 @@ func (r *rpcStream) Close() error { return nil default: close(r.closed) + + // send the end of stream message + if r.eos { + // no need to check for error + r.codec.Write(&codec.Message{ + Id: r.id, + Target: r.request.Service(), + Method: r.request.Method(), + Endpoint: r.request.Endpoint(), + Type: codec.Error, + Error: lastStreamResponseError, + }, nil) + } + return r.codec.Close() } } diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 797fa79c..ff2e4261 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -158,7 +158,6 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod wbuf: bytes.NewBuffer(nil), } r := &rpcCodec{ - first: true, buf: rwc, codec: c(rwc), req: req, @@ -174,33 +173,27 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { Body: c.req.Body, } - // if its a follow on request read it - if !c.first { - var tm transport.Message + var tm transport.Message - // read off the socket - if err := c.socket.Recv(&tm); err != nil { - return err - } - // reset the read buffer - c.buf.rbuf.Reset() + // read off the socket + if err := c.socket.Recv(&tm); err != nil { + return err + } + // reset the read buffer + c.buf.rbuf.Reset() - // write the body to the buffer - if _, err := c.buf.rbuf.Write(tm.Body); err != nil { - return err - } - - // set the message header - m.Header = tm.Header - // set the message body - m.Body = tm.Body - - // set req - c.req = &tm + // write the body to the buffer + if _, err := c.buf.rbuf.Write(tm.Body); err != nil { + return err } - // no longer first read - c.first = false + // set the message header + m.Header = tm.Header + // set the message body + m.Body = tm.Body + + // set req + c.req = &tm // set some internal things getHeaders(&m) diff --git a/server/rpc_server.go b/server/rpc_server.go index 516825db..86951451 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -19,6 +19,7 @@ import ( "github.com/micro/go-micro/util/addr" log "github.com/micro/go-micro/util/log" mnet "github.com/micro/go-micro/util/net" + "github.com/micro/go-micro/util/socket" ) type rpcServer struct { @@ -70,23 +71,108 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { } }() + // multiplex the streams on a single socket by Micro-Stream + var mtx sync.RWMutex + sockets := make(map[string]*socket.Socket) + + log.Info("New socket") + for { var msg transport.Message if err := sock.Recv(&msg); err != nil { return } + // use Micro-Id as the stream identifier + // in the event its blank we'll always process + // on the same socket + id := msg.Header["Micro-Stream"] + + // if there's no stream id then its a standard request + if len(id) == 0 { + id = msg.Header["Micro-Id"] + } + // add to wait group if "wait" is opt-in if s.wg != nil { s.wg.Add(1) } + // check we have an existing socket + mtx.RLock() + psock, ok := sockets[id] + mtx.RUnlock() + + log.Infof("Got socket %v %v", id, ok) + + // got the socket + if ok { + // accept the message + if err := psock.Accept(&msg); err != nil { + log.Infof("Accept Error %+v", err) + // close the socket + psock.Close() + + // delete the socket + mtx.Lock() + delete(sockets, id) + mtx.Unlock() + } + + // done(1) + if s.wg != nil { + s.wg.Done() + } + + // continue to the next message + continue + } + + // no socket was found + psock = socket.New() + psock.SetLocal(sock.Local()) + psock.SetRemote(sock.Remote()) + + // load the socket + psock.Accept(&msg) + + // save a new socket + mtx.Lock() + sockets[id] = psock + mtx.Unlock() + + // process the outbound messages from the socket + go func(id string, psock *socket.Socket) { + defer psock.Close() + + for { + // get the message from our internal handler/stream + m := new(transport.Message) + if err := psock.Process(m); err != nil { + log.Infof("Process Error %+v", err) + + // delete the socket + mtx.Lock() + delete(sockets, id) + mtx.Unlock() + return + } + + // send the message back over the socket + if err := sock.Send(m); err != nil { + return + } + } + }(id, psock) + + // now walk the usual path + // we use this Timeout header to set a server deadline to := msg.Header["Timeout"] // we use this Content-Type header to identify the codec needed ct := msg.Header["Content-Type"] - // strip our headers + // copy the message headers hdr := make(map[string]string) for k, v := range msg.Header { hdr[k] = v @@ -96,17 +182,17 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { hdr["Local"] = sock.Local() hdr["Remote"] = sock.Remote() - // create new context + // create new context with the metadata ctx := metadata.NewContext(context.Background(), hdr) - // set the timeout if we have it + // set the timeout from the header if we have it if len(to) > 0 { if n, err := strconv.ParseUint(to, 10, 64); err == nil { ctx, _ = context.WithTimeout(ctx, time.Duration(n)) } } - // no content type + // if there's no content type default it if len(ct) == 0 { msg.Header["Content-Type"] = DefaultContentType ct = DefaultContentType @@ -133,7 +219,13 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { } } - rcodec := newRpcCodec(&msg, sock, cf) + rcodec := newRpcCodec(&msg, psock, cf) + + // check stream id + var stream bool + if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 { + stream = true + } // internal request request := &rpcRequest{ @@ -144,15 +236,14 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { codec: rcodec, header: msg.Header, body: msg.Body, - socket: sock, - stream: true, - first: true, + socket: psock, + stream: stream, } // internal response response := &rpcResponse{ header: make(map[string]string), - socket: sock, + socket: psock, codec: rcodec, } @@ -175,25 +266,36 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { r = rpcRouter{handler} } - // serve the actual request using the request router - if err := r.ServeRequest(ctx, request, response); err != nil { - // write an error response - err = rcodec.Write(&codec.Message{ - Header: msg.Header, - Error: err.Error(), - Type: codec.Error, - }, nil) - // could not write the error response - if err != nil { - log.Logf("rpc: unable to write error response: %v", err) + // serve the request in a go routine as this may be a stream + go func(id string, psock *socket.Socket) { + // serve the actual request using the request router + if err := r.ServeRequest(ctx, request, response); err != nil { + // write an error response + err = rcodec.Write(&codec.Message{ + Header: msg.Header, + Error: err.Error(), + Type: codec.Error, + }, nil) + + // could not write the error response + if err != nil { + log.Logf("rpc: unable to write error response: %v", err) + } } + + mtx.Lock() + delete(sockets, id) + mtx.Unlock() + + psock.Close() + + // once done serving signal we're done if s.wg != nil { s.wg.Done() } - return - } + }(id, psock) - // done + // signal we're done if s.wg != nil { s.wg.Done() } diff --git a/util/socket/socket.go b/util/socket/socket.go index 19c671bb..fc8cb089 100644 --- a/util/socket/socket.go +++ b/util/socket/socket.go @@ -7,8 +7,8 @@ import ( "github.com/micro/go-micro/transport" ) -// socket is our pseudo socket for transport.Socket -type socket struct { +// Socket is our pseudo socket for transport.Socket +type Socket struct { // closed closed chan bool // remote addr @@ -21,16 +21,16 @@ type socket struct { recv chan *transport.Message } -func (s *socket) SetLocal(l string) { +func (s *Socket) SetLocal(l string) { s.local = l } -func (s *socket) SetRemote(r string) { +func (s *Socket) SetRemote(r string) { s.remote = r } // Accept passes a message to the socket which will be processed by the call to Recv -func (s *socket) Accept(m *transport.Message) error { +func (s *Socket) Accept(m *transport.Message) error { select { case <-s.closed: return io.EOF @@ -41,7 +41,7 @@ func (s *socket) Accept(m *transport.Message) error { } // Process takes the next message off the send queue created by a call to Send -func (s *socket) Process(m *transport.Message) error { +func (s *Socket) Process(m *transport.Message) error { select { case <-s.closed: return io.EOF @@ -51,15 +51,15 @@ func (s *socket) Process(m *transport.Message) error { return nil } -func (s *socket) Remote() string { +func (s *Socket) Remote() string { return s.remote } -func (s *socket) Local() string { +func (s *Socket) Local() string { return s.local } -func (s *socket) Send(m *transport.Message) error { +func (s *Socket) Send(m *transport.Message) error { select { case <-s.closed: return io.EOF @@ -70,13 +70,17 @@ func (s *socket) Send(m *transport.Message) error { // make copy msg := &transport.Message{ Header: make(map[string]string), - Body: m.Body, + Body: make([]byte, len(m.Body)), } + // copy headers for k, v := range m.Header { msg.Header[k] = v } + // copy body + copy(msg.Body, m.Body) + // send a message select { case s.send <- msg: @@ -87,7 +91,7 @@ func (s *socket) Send(m *transport.Message) error { return nil } -func (s *socket) Recv(m *transport.Message) error { +func (s *Socket) Recv(m *transport.Message) error { select { case <-s.closed: return io.EOF @@ -109,7 +113,7 @@ func (s *socket) Recv(m *transport.Message) error { } // Close closes the socket -func (s *socket) Close() error { +func (s *Socket) Close() error { select { case <-s.closed: // no op @@ -119,11 +123,21 @@ func (s *socket) Close() error { return nil } +// Indicates its closed +func (s *socket) Done() bool { + select { + case <-s.closed: + return true + default: + return false + } +} + // New returns a new pseudo socket which can be used in the place of a transport socket. // Messages are sent to the socket via Accept and receives from the socket via Process. // SetLocal/SetRemote should be called before using the socket. -func New() *socket { - return &socket{ +func New() *Socket { + return &Socket{ closed: make(chan bool), local: "local", remote: "remote", From 88817dc53fc83ff40e877b3644ac3def82438410 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 15 Aug 2019 20:54:00 +0100 Subject: [PATCH 4/9] Strip some dead code --- server/rpc_server.go | 12 ------------ util/socket/socket.go | 10 ---------- 2 files changed, 22 deletions(-) diff --git a/server/rpc_server.go b/server/rpc_server.go index 86951451..0b9883c2 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -75,8 +75,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { var mtx sync.RWMutex sockets := make(map[string]*socket.Socket) - log.Info("New socket") - for { var msg transport.Message if err := sock.Recv(&msg); err != nil { @@ -103,16 +101,10 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { psock, ok := sockets[id] mtx.RUnlock() - log.Infof("Got socket %v %v", id, ok) - // got the socket if ok { // accept the message if err := psock.Accept(&msg); err != nil { - log.Infof("Accept Error %+v", err) - // close the socket - psock.Close() - // delete the socket mtx.Lock() delete(sockets, id) @@ -149,8 +141,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // get the message from our internal handler/stream m := new(transport.Message) if err := psock.Process(m); err != nil { - log.Infof("Process Error %+v", err) - // delete the socket mtx.Lock() delete(sockets, id) @@ -287,8 +277,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { delete(sockets, id) mtx.Unlock() - psock.Close() - // once done serving signal we're done if s.wg != nil { s.wg.Done() diff --git a/util/socket/socket.go b/util/socket/socket.go index fc8cb089..77b0aac0 100644 --- a/util/socket/socket.go +++ b/util/socket/socket.go @@ -123,16 +123,6 @@ func (s *Socket) Close() error { return nil } -// Indicates its closed -func (s *socket) Done() bool { - select { - case <-s.closed: - return true - default: - return false - } -} - // New returns a new pseudo socket which can be used in the place of a transport socket. // Messages are sent to the socket via Accept and receives from the socket via Process. // SetLocal/SetRemote should be called before using the socket. From 58bc4c103fb4081c46c5caf2376e7cd4f4934c8d Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 15 Aug 2019 20:54:09 +0100 Subject: [PATCH 5/9] go fmt --- client/rpc_client.go | 2 +- client/rpc_stream.go | 2 +- util/socket/socket.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index 3619540c..91d11a40 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -230,7 +230,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request response: rsp, closed: make(chan bool), codec: codec, - id: id, + id: id, // signal the end of stream, eos: true, } diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 6db6b596..269e6299 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -133,7 +133,7 @@ func (r *rpcStream) Close() error { Method: r.request.Method(), Endpoint: r.request.Endpoint(), Type: codec.Error, - Error: lastStreamResponseError, + Error: lastStreamResponseError, }, nil) } diff --git a/util/socket/socket.go b/util/socket/socket.go index 77b0aac0..59bb538d 100644 --- a/util/socket/socket.go +++ b/util/socket/socket.go @@ -70,7 +70,7 @@ func (s *Socket) Send(m *transport.Message) error { // make copy msg := &transport.Message{ Header: make(map[string]string), - Body: make([]byte, len(m.Body)), + Body: make([]byte, len(m.Body)), } // copy headers From 5a5b1b8f6e0ad129ae2a273167b304a13c56f440 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 15 Aug 2019 20:54:28 +0100 Subject: [PATCH 6/9] only continue to stream when its a stream --- proxy/mucp/mucp.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index 3f54d9c0..9df28aec 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -42,6 +42,11 @@ type Proxy struct { // read client request and write to server func readLoop(r server.Request, s client.Stream) error { + // we don't loop unless its a stream + if !r.Stream() { + return nil + } + // request to backend server req := s.Request() @@ -266,6 +271,11 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server } else if err != nil { return err } + + // we don't continue unless its a stream + if !req.Stream() { + return nil + } } } From 991142cd5737d2a667bfc3f249ac899b3b548c1b Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 16 Aug 2019 14:42:45 +0100 Subject: [PATCH 7/9] No need to set request in the buffer --- server/rpc_codec.go | 2 +- server/rpc_server.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/rpc_codec.go b/server/rpc_codec.go index ff2e4261..19cdc564 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -154,7 +154,7 @@ func setupProtocol(msg *transport.Message) codec.NewCodec { func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec { rwc := &readWriteCloser{ - rbuf: bytes.NewBuffer(req.Body), + rbuf: bytes.NewBuffer(nil), wbuf: bytes.NewBuffer(nil), } r := &rpcCodec{ diff --git a/server/rpc_server.go b/server/rpc_server.go index 0b9883c2..5fb4af29 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -81,12 +81,13 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { return } - // use Micro-Id as the stream identifier + // use Micro-Stream as the stream identifier // in the event its blank we'll always process // on the same socket id := msg.Header["Micro-Stream"] // if there's no stream id then its a standard request + // use the Micro-Id if len(id) == 0 { id = msg.Header["Micro-Id"] } From 0b0eee41d0e50f54461384e4e696e59ae29a219c Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 16 Aug 2019 16:46:29 +0100 Subject: [PATCH 8/9] functioning proxy code --- client/options.go | 10 ++++++++++ client/rpc_client.go | 46 ++++++++++++++++++++++++++++++++------------ client/rpc_stream.go | 15 ++++++++++++--- proxy/mucp/mucp.go | 16 ++++++++------- 4 files changed, 65 insertions(+), 22 deletions(-) diff --git a/client/options.go b/client/options.go index 9f363d74..0ba57ad7 100644 --- a/client/options.go +++ b/client/options.go @@ -59,6 +59,9 @@ type CallOptions struct { // Middleware for low level call func CallWrappers []CallWrapper + // SendEOS specifies whether to send EOS + SendEOS bool + // Other options for implementations of the interface // can be stored in a context Context context.Context @@ -305,6 +308,13 @@ func WithDialTimeout(d time.Duration) CallOption { } } +// SendEOS specifies whether to send the end of stream message +func SendEOS(b bool) CallOption { + return func(o *CallOptions) { + o.SendEOS = b + } +} + // Request Options func WithContentType(ct string) RequestOption { diff --git a/client/rpc_client.go b/client/rpc_client.go index 91d11a40..5449dcd9 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -96,15 +96,10 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, } } - var grr error c, err := r.pool.Get(address, transport.WithTimeout(opts.DialTimeout)) if err != nil { return errors.InternalServerError("go.micro.client", "connection error: %v", err) } - defer func() { - // defer execution of release - r.pool.Release(c, grr) - }() seq := atomic.LoadUint64(&r.seq) atomic.AddUint64(&r.seq, 1) @@ -116,15 +111,19 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, } stream := &rpcStream{ + id: fmt.Sprintf("%v", seq), context: ctx, request: req, response: rsp, codec: codec, closed: make(chan bool), - id: fmt.Sprintf("%v", seq), + release: func(err error) { r.pool.Release(c, err) }, + sendEOS: opts.SendEOS, } + // close the stream on exiting this function defer stream.Close() + // wait for error response ch := make(chan error, 1) go func() { @@ -150,14 +149,26 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, ch <- nil }() + var grr error + select { case err := <-ch: grr = err return err case <-ctx.Done(): - grr = ctx.Err() - return errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err())) + grr = errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err())) } + + // set the stream error + if grr != nil { + stream.Lock() + stream.err = grr + stream.Unlock() + + return grr + } + + return nil } func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) { @@ -201,7 +212,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout)) } - c, err := r.opts.Transport.Dial(address, dOpts...) + c, err := r.pool.Get(address, dOpts...) if err != nil { return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err) } @@ -225,19 +236,24 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request } stream := &rpcStream{ + id: id, context: ctx, request: req, response: rsp, - closed: make(chan bool), codec: codec, - id: id, + // used to close the stream + closed: make(chan bool), // signal the end of stream, - eos: true, + sendEOS: opts.SendEOS, + // release func + release: func(err error) { r.pool.Release(c, err) }, } + // wait for error response ch := make(chan error, 1) go func() { + // send the first message ch <- stream.Send(req.Body()) }() @@ -251,6 +267,12 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request } if grr != nil { + // set the error + stream.Lock() + stream.err = grr + stream.Unlock() + + // close the stream stream.Close() return nil, grr } diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 269e6299..f904d2dd 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -20,7 +20,10 @@ type rpcStream struct { context context.Context // signal whether we should send EOS - eos bool + sendEOS bool + + // release releases the connection back to the pool + release func(err error) } func (r *rpcStream) isClosed() bool { @@ -125,7 +128,7 @@ func (r *rpcStream) Close() error { close(r.closed) // send the end of stream message - if r.eos { + if r.sendEOS { // no need to check for error r.codec.Write(&codec.Message{ Id: r.id, @@ -137,6 +140,12 @@ func (r *rpcStream) Close() error { }, nil) } - return r.codec.Close() + err := r.codec.Close() + + // release the connection + r.release(r.Error()) + + // return the codec error + return err } } diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index 9df28aec..e4303d16 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -42,11 +42,6 @@ type Proxy struct { // read client request and write to server func readLoop(r server.Request, s client.Stream) error { - // we don't loop unless its a stream - if !r.Stream() { - return nil - } - // request to backend server req := s.Request() @@ -225,6 +220,11 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server // create new request with raw bytes body creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType())) + if !req.Stream() { + // specify not to send eos + opts = append(opts, client.SendEOS(false)) + } + // create new stream stream, err := p.Client.Stream(ctx, creq, opts...) if err != nil { @@ -232,8 +232,10 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server } defer stream.Close() - // create client request read loop - go readLoop(req, stream) + // create client request read loop if streaming + if req.Stream() { + go readLoop(req, stream) + } // get raw response resp := stream.Response() From 4495ca383968021631506d4e8fc9570089936346 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 16 Aug 2019 17:24:17 +0100 Subject: [PATCH 9/9] Use client.Call for non streaming requests --- client/options.go | 10 ---------- client/rpc_client.go | 4 ++-- client/rpc_codec.go | 6 ++++++ proxy/mucp/mucp.go | 25 +++++++++++++++---------- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/client/options.go b/client/options.go index 0ba57ad7..9f363d74 100644 --- a/client/options.go +++ b/client/options.go @@ -59,9 +59,6 @@ type CallOptions struct { // Middleware for low level call func CallWrappers []CallWrapper - // SendEOS specifies whether to send EOS - SendEOS bool - // Other options for implementations of the interface // can be stored in a context Context context.Context @@ -308,13 +305,6 @@ func WithDialTimeout(d time.Duration) CallOption { } } -// SendEOS specifies whether to send the end of stream message -func SendEOS(b bool) CallOption { - return func(o *CallOptions) { - o.SendEOS = b - } -} - // Request Options func WithContentType(ct string) RequestOption { diff --git a/client/rpc_client.go b/client/rpc_client.go index 5449dcd9..754e4329 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -118,7 +118,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, codec: codec, closed: make(chan bool), release: func(err error) { r.pool.Release(c, err) }, - sendEOS: opts.SendEOS, + sendEOS: false, } // close the stream on exiting this function defer stream.Close() @@ -244,7 +244,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request // used to close the stream closed: make(chan bool), // signal the end of stream, - sendEOS: opts.SendEOS, + sendEOS: true, // release func release: func(err error) { r.pool.Release(c, err) }, } diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 37987496..c20537ea 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -249,6 +249,12 @@ func (c *rpcCodec) ReadHeader(m *codec.Message, r codec.MessageType) error { func (c *rpcCodec) ReadBody(b interface{}) error { // read body + // read raw data + if v, ok := b.(*raw.Frame); ok { + v.Data = c.buf.rbuf.Bytes() + return nil + } + if err := c.codec.ReadBody(b); err != nil { return errors.InternalServerError("go.micro.client.codec", err.Error()) } diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index e4303d16..095c10d8 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -220,9 +220,21 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server // create new request with raw bytes body creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType())) + // not a stream so make a client.Call request if !req.Stream() { - // specify not to send eos - opts = append(opts, client.SendEOS(false)) + crsp := new(bytes.Frame) + + // make a call to the backend + if err := p.Client.Call(ctx, creq, crsp, opts...); err != nil { + return err + } + + // write the response + if err := rsp.Write(crsp.Data); err != nil { + return err + } + + return nil } // create new stream @@ -233,9 +245,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server defer stream.Close() // create client request read loop if streaming - if req.Stream() { - go readLoop(req, stream) - } + go readLoop(req, stream) // get raw response resp := stream.Response() @@ -273,11 +283,6 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server } else if err != nil { return err } - - // we don't continue unless its a stream - if !req.Stream() { - return nil - } } }