Merge pull request #673 from micro/multiplex

Stream Multiplexing
This commit is contained in:
Asim Aslam 2019-08-16 17:41:45 +01:00 committed by GitHub
commit d9a699ae6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 391 additions and 65 deletions

View File

@ -96,19 +96,14 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request,
} }
} }
var grr error
c, err := r.pool.Get(address, transport.WithTimeout(opts.DialTimeout)) c, err := r.pool.Get(address, transport.WithTimeout(opts.DialTimeout))
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", "connection error: %v", err) return errors.InternalServerError("go.micro.client", "connection error: %v", err)
} }
defer func() {
// defer execution of release
r.pool.Release(c, grr)
}()
seq := atomic.LoadUint64(&r.seq) seq := atomic.LoadUint64(&r.seq)
atomic.AddUint64(&r.seq, 1) atomic.AddUint64(&r.seq, 1)
codec := newRpcCodec(msg, c, cf) codec := newRpcCodec(msg, c, cf, "")
rsp := &rpcResponse{ rsp := &rpcResponse{
socket: c, socket: c,
@ -116,15 +111,19 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request,
} }
stream := &rpcStream{ stream := &rpcStream{
id: fmt.Sprintf("%v", seq),
context: ctx, context: ctx,
request: req, request: req,
response: rsp, response: rsp,
codec: codec, codec: codec,
closed: make(chan bool), closed: make(chan bool),
id: fmt.Sprintf("%v", seq), release: func(err error) { r.pool.Release(c, err) },
sendEOS: false,
} }
// close the stream on exiting this function
defer stream.Close() defer stream.Close()
// wait for error response
ch := make(chan error, 1) ch := make(chan error, 1)
go func() { go func() {
@ -150,14 +149,26 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request,
ch <- nil ch <- nil
}() }()
var grr error
select { select {
case err := <-ch: case err := <-ch:
grr = err grr = err
return err return err
case <-ctx.Done(): case <-ctx.Done():
grr = ctx.Err() grr = errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
return errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
} }
// set the stream error
if grr != nil {
stream.Lock()
stream.err = grr
stream.Unlock()
return grr
}
return nil
} }
func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) { func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) {
@ -201,12 +212,18 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout)) dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
} }
c, err := r.opts.Transport.Dial(address, dOpts...) c, err := r.pool.Get(address, dOpts...)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err) return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
} }
codec := newRpcCodec(msg, c, cf) // increment the sequence number
seq := atomic.LoadUint64(&r.seq)
atomic.AddUint64(&r.seq, 1)
id := fmt.Sprintf("%v", seq)
// create codec with stream id
codec := newRpcCodec(msg, c, cf, id)
rsp := &rpcResponse{ rsp := &rpcResponse{
socket: c, socket: c,
@ -219,16 +236,24 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
} }
stream := &rpcStream{ stream := &rpcStream{
id: id,
context: ctx, context: ctx,
request: req, request: req,
response: rsp, response: rsp,
closed: make(chan bool),
codec: codec, codec: codec,
// used to close the stream
closed: make(chan bool),
// signal the end of stream,
sendEOS: true,
// release func
release: func(err error) { r.pool.Release(c, err) },
} }
// wait for error response
ch := make(chan error, 1) ch := make(chan error, 1)
go func() { go func() {
// send the first message
ch <- stream.Send(req.Body()) ch <- stream.Send(req.Body())
}() }()
@ -242,6 +267,12 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
} }
if grr != nil { if grr != nil {
// set the error
stream.Lock()
stream.err = grr
stream.Unlock()
// close the stream
stream.Close() stream.Close()
return nil, grr return nil, grr
} }

View File

@ -39,6 +39,9 @@ type rpcCodec struct {
req *transport.Message req *transport.Message
buf *readWriteCloser buf *readWriteCloser
// signify if its a stream
stream string
} }
type readWriteCloser struct { type readWriteCloser struct {
@ -113,7 +116,7 @@ func getHeaders(m *codec.Message) {
} }
} }
func setHeaders(m *codec.Message) { func setHeaders(m *codec.Message, stream string) {
set := func(hdr, v string) { set := func(hdr, v string) {
if len(v) == 0 { if len(v) == 0 {
return return
@ -126,6 +129,11 @@ func setHeaders(m *codec.Message) {
set("Micro-Service", m.Target) set("Micro-Service", m.Target)
set("Micro-Method", m.Method) set("Micro-Method", m.Method)
set("Micro-Endpoint", m.Endpoint) set("Micro-Endpoint", m.Endpoint)
set("Micro-Error", m.Error)
if len(stream) > 0 {
set("Micro-Stream", stream)
}
} }
// setupProtocol sets up the old protocol // setupProtocol sets up the old protocol
@ -149,7 +157,7 @@ func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
return defaultCodecs[msg.Header["Content-Type"]] return defaultCodecs[msg.Header["Content-Type"]]
} }
func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec { func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec, stream string) codec.Codec {
rwc := &readWriteCloser{ rwc := &readWriteCloser{
wbuf: bytes.NewBuffer(nil), wbuf: bytes.NewBuffer(nil),
rbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil),
@ -159,6 +167,7 @@ func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCod
client: client, client: client,
codec: c(rwc), codec: c(rwc),
req: req, req: req,
stream: stream,
} }
return r return r
} }
@ -177,7 +186,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
} }
// set the mucp headers // set the mucp headers
setHeaders(m) setHeaders(m, c.stream)
// if body is bytes Frame don't encode // if body is bytes Frame don't encode
if body != nil { if body != nil {
@ -240,6 +249,12 @@ func (c *rpcCodec) ReadHeader(m *codec.Message, r codec.MessageType) error {
func (c *rpcCodec) ReadBody(b interface{}) error { func (c *rpcCodec) ReadBody(b interface{}) error {
// read body // read body
// read raw data
if v, ok := b.(*raw.Frame); ok {
v.Data = c.buf.rbuf.Bytes()
return nil
}
if err := c.codec.ReadBody(b); err != nil { if err := c.codec.ReadBody(b); err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error()) return errors.InternalServerError("go.micro.client.codec", err.Error())
} }

View File

@ -18,6 +18,12 @@ type rpcStream struct {
response Response response Response
codec codec.Codec codec codec.Codec
context context.Context context context.Context
// signal whether we should send EOS
sendEOS bool
// release releases the connection back to the pool
release func(err error)
} }
func (r *rpcStream) isClosed() bool { func (r *rpcStream) isClosed() bool {
@ -120,6 +126,26 @@ func (r *rpcStream) Close() error {
return nil return nil
default: default:
close(r.closed) close(r.closed)
return r.codec.Close()
// send the end of stream message
if r.sendEOS {
// no need to check for error
r.codec.Write(&codec.Message{
Id: r.id,
Target: r.request.Service(),
Method: r.request.Method(),
Endpoint: r.request.Endpoint(),
Type: codec.Error,
Error: lastStreamResponseError,
}, nil)
}
err := r.codec.Close()
// release the connection
r.release(r.Error())
// return the codec error
return err
} }
} }

View File

@ -220,6 +220,23 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
// create new request with raw bytes body // create new request with raw bytes body
creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType())) creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType()))
// not a stream so make a client.Call request
if !req.Stream() {
crsp := new(bytes.Frame)
// make a call to the backend
if err := p.Client.Call(ctx, creq, crsp, opts...); err != nil {
return err
}
// write the response
if err := rsp.Write(crsp.Data); err != nil {
return err
}
return nil
}
// create new stream // create new stream
stream, err := p.Client.Stream(ctx, creq, opts...) stream, err := p.Client.Stream(ctx, creq, opts...)
if err != nil { if err != nil {
@ -227,7 +244,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
} }
defer stream.Close() defer stream.Close()
// create client request read loop // create client request read loop if streaming
go readLoop(req, stream) go readLoop(req, stream)
// get raw response // get raw response

View File

@ -154,11 +154,10 @@ func setupProtocol(msg *transport.Message) codec.NewCodec {
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec { func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec {
rwc := &readWriteCloser{ rwc := &readWriteCloser{
rbuf: bytes.NewBuffer(req.Body), rbuf: bytes.NewBuffer(nil),
wbuf: bytes.NewBuffer(nil), wbuf: bytes.NewBuffer(nil),
} }
r := &rpcCodec{ r := &rpcCodec{
first: true,
buf: rwc, buf: rwc,
codec: c(rwc), codec: c(rwc),
req: req, req: req,
@ -174,33 +173,27 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
Body: c.req.Body, Body: c.req.Body,
} }
// if its a follow on request read it var tm transport.Message
if !c.first {
var tm transport.Message
// read off the socket // read off the socket
if err := c.socket.Recv(&tm); err != nil { if err := c.socket.Recv(&tm); err != nil {
return err return err
} }
// reset the read buffer // reset the read buffer
c.buf.rbuf.Reset() c.buf.rbuf.Reset()
// write the body to the buffer // write the body to the buffer
if _, err := c.buf.rbuf.Write(tm.Body); err != nil { if _, err := c.buf.rbuf.Write(tm.Body); err != nil {
return err return err
}
// set the message header
m.Header = tm.Header
// set the message body
m.Body = tm.Body
// set req
c.req = &tm
} }
// no longer first read // set the message header
c.first = false m.Header = tm.Header
// set the message body
m.Body = tm.Body
// set req
c.req = &tm
// set some internal things // set some internal things
getHeaders(&m) getHeaders(&m)

View File

@ -19,6 +19,7 @@ import (
"github.com/micro/go-micro/util/addr" "github.com/micro/go-micro/util/addr"
log "github.com/micro/go-micro/util/log" log "github.com/micro/go-micro/util/log"
mnet "github.com/micro/go-micro/util/net" mnet "github.com/micro/go-micro/util/net"
"github.com/micro/go-micro/util/socket"
) )
type rpcServer struct { type rpcServer struct {
@ -70,23 +71,99 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
} }
}() }()
// multiplex the streams on a single socket by Micro-Stream
var mtx sync.RWMutex
sockets := make(map[string]*socket.Socket)
for { for {
var msg transport.Message var msg transport.Message
if err := sock.Recv(&msg); err != nil { if err := sock.Recv(&msg); err != nil {
return return
} }
// use Micro-Stream as the stream identifier
// in the event its blank we'll always process
// on the same socket
id := msg.Header["Micro-Stream"]
// if there's no stream id then its a standard request
// use the Micro-Id
if len(id) == 0 {
id = msg.Header["Micro-Id"]
}
// add to wait group if "wait" is opt-in // add to wait group if "wait" is opt-in
if s.wg != nil { if s.wg != nil {
s.wg.Add(1) s.wg.Add(1)
} }
// check we have an existing socket
mtx.RLock()
psock, ok := sockets[id]
mtx.RUnlock()
// got the socket
if ok {
// accept the message
if err := psock.Accept(&msg); err != nil {
// delete the socket
mtx.Lock()
delete(sockets, id)
mtx.Unlock()
}
// done(1)
if s.wg != nil {
s.wg.Done()
}
// continue to the next message
continue
}
// no socket was found
psock = socket.New()
psock.SetLocal(sock.Local())
psock.SetRemote(sock.Remote())
// load the socket
psock.Accept(&msg)
// save a new socket
mtx.Lock()
sockets[id] = psock
mtx.Unlock()
// process the outbound messages from the socket
go func(id string, psock *socket.Socket) {
defer psock.Close()
for {
// get the message from our internal handler/stream
m := new(transport.Message)
if err := psock.Process(m); err != nil {
// delete the socket
mtx.Lock()
delete(sockets, id)
mtx.Unlock()
return
}
// send the message back over the socket
if err := sock.Send(m); err != nil {
return
}
}
}(id, psock)
// now walk the usual path
// we use this Timeout header to set a server deadline // we use this Timeout header to set a server deadline
to := msg.Header["Timeout"] to := msg.Header["Timeout"]
// we use this Content-Type header to identify the codec needed // we use this Content-Type header to identify the codec needed
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
// strip our headers // copy the message headers
hdr := make(map[string]string) hdr := make(map[string]string)
for k, v := range msg.Header { for k, v := range msg.Header {
hdr[k] = v hdr[k] = v
@ -96,17 +173,17 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
hdr["Local"] = sock.Local() hdr["Local"] = sock.Local()
hdr["Remote"] = sock.Remote() hdr["Remote"] = sock.Remote()
// create new context // create new context with the metadata
ctx := metadata.NewContext(context.Background(), hdr) ctx := metadata.NewContext(context.Background(), hdr)
// set the timeout if we have it // set the timeout from the header if we have it
if len(to) > 0 { if len(to) > 0 {
if n, err := strconv.ParseUint(to, 10, 64); err == nil { if n, err := strconv.ParseUint(to, 10, 64); err == nil {
ctx, _ = context.WithTimeout(ctx, time.Duration(n)) ctx, _ = context.WithTimeout(ctx, time.Duration(n))
} }
} }
// no content type // if there's no content type default it
if len(ct) == 0 { if len(ct) == 0 {
msg.Header["Content-Type"] = DefaultContentType msg.Header["Content-Type"] = DefaultContentType
ct = DefaultContentType ct = DefaultContentType
@ -133,7 +210,13 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
} }
} }
rcodec := newRpcCodec(&msg, sock, cf) rcodec := newRpcCodec(&msg, psock, cf)
// check stream id
var stream bool
if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 {
stream = true
}
// internal request // internal request
request := &rpcRequest{ request := &rpcRequest{
@ -144,15 +227,14 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
codec: rcodec, codec: rcodec,
header: msg.Header, header: msg.Header,
body: msg.Body, body: msg.Body,
socket: sock, socket: psock,
stream: true, stream: stream,
first: true,
} }
// internal response // internal response
response := &rpcResponse{ response := &rpcResponse{
header: make(map[string]string), header: make(map[string]string),
socket: sock, socket: psock,
codec: rcodec, codec: rcodec,
} }
@ -175,25 +257,34 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
r = rpcRouter{handler} r = rpcRouter{handler}
} }
// serve the actual request using the request router // serve the request in a go routine as this may be a stream
if err := r.ServeRequest(ctx, request, response); err != nil { go func(id string, psock *socket.Socket) {
// write an error response // serve the actual request using the request router
err = rcodec.Write(&codec.Message{ if err := r.ServeRequest(ctx, request, response); err != nil {
Header: msg.Header, // write an error response
Error: err.Error(), err = rcodec.Write(&codec.Message{
Type: codec.Error, Header: msg.Header,
}, nil) Error: err.Error(),
// could not write the error response Type: codec.Error,
if err != nil { }, nil)
log.Logf("rpc: unable to write error response: %v", err)
// could not write the error response
if err != nil {
log.Logf("rpc: unable to write error response: %v", err)
}
} }
mtx.Lock()
delete(sockets, id)
mtx.Unlock()
// once done serving signal we're done
if s.wg != nil { if s.wg != nil {
s.wg.Done() s.wg.Done()
} }
return }(id, psock)
}
// done // signal we're done
if s.wg != nil { if s.wg != nil {
s.wg.Done() s.wg.Done()
} }

View File

@ -2,6 +2,8 @@ package server
import ( import (
"context" "context"
"errors"
"io"
"sync" "sync"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
@ -59,6 +61,20 @@ func (r *rpcStream) Recv(msg interface{}) error {
return err return err
} }
// check the error
if len(req.Error) > 0 {
// Check the client closed the stream
switch req.Error {
case lastStreamResponseError.Error():
// discard body
r.codec.ReadBody(nil)
r.err = io.EOF
return io.EOF
default:
return errors.New(req.Error)
}
}
// we need to stay up to date with sequence numbers // we need to stay up to date with sequence numbers
r.id = req.Id r.id = req.Id
if err := r.codec.ReadBody(msg); err != nil { if err := r.codec.ReadBody(msg); err != nil {

137
util/socket/socket.go Normal file
View File

@ -0,0 +1,137 @@
// Package socket provides a pseudo socket
package socket
import (
"io"
"github.com/micro/go-micro/transport"
)
// Socket is our pseudo socket for transport.Socket
type Socket struct {
// closed
closed chan bool
// remote addr
remote string
// local addr
local string
// send chan
send chan *transport.Message
// recv chan
recv chan *transport.Message
}
func (s *Socket) SetLocal(l string) {
s.local = l
}
func (s *Socket) SetRemote(r string) {
s.remote = r
}
// Accept passes a message to the socket which will be processed by the call to Recv
func (s *Socket) Accept(m *transport.Message) error {
select {
case <-s.closed:
return io.EOF
case s.recv <- m:
return nil
}
return nil
}
// Process takes the next message off the send queue created by a call to Send
func (s *Socket) Process(m *transport.Message) error {
select {
case <-s.closed:
return io.EOF
case msg := <-s.send:
*m = *msg
}
return nil
}
func (s *Socket) Remote() string {
return s.remote
}
func (s *Socket) Local() string {
return s.local
}
func (s *Socket) Send(m *transport.Message) error {
select {
case <-s.closed:
return io.EOF
default:
// no op
}
// make copy
msg := &transport.Message{
Header: make(map[string]string),
Body: make([]byte, len(m.Body)),
}
// copy headers
for k, v := range m.Header {
msg.Header[k] = v
}
// copy body
copy(msg.Body, m.Body)
// send a message
select {
case s.send <- msg:
case <-s.closed:
return io.EOF
}
return nil
}
func (s *Socket) Recv(m *transport.Message) error {
select {
case <-s.closed:
return io.EOF
default:
// no op
}
// receive a message
select {
case msg := <-s.recv:
// set message
*m = *msg
case <-s.closed:
return io.EOF
}
// return nil
return nil
}
// Close closes the socket
func (s *Socket) Close() error {
select {
case <-s.closed:
// no op
default:
close(s.closed)
}
return nil
}
// New returns a new pseudo socket which can be used in the place of a transport socket.
// Messages are sent to the socket via Accept and receives from the socket via Process.
// SetLocal/SetRemote should be called before using the socket.
func New() *Socket {
return &Socket{
closed: make(chan bool),
local: "local",
remote: "remote",
send: make(chan *transport.Message, 128),
recv: make(chan *transport.Message, 128),
}
}