multiplexing cruft
This commit is contained in:
parent
f6b8045dd5
commit
ef04331b86
@ -108,7 +108,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request,
|
||||
|
||||
seq := atomic.LoadUint64(&r.seq)
|
||||
atomic.AddUint64(&r.seq, 1)
|
||||
codec := newRpcCodec(msg, c, cf)
|
||||
codec := newRpcCodec(msg, c, cf, "")
|
||||
|
||||
rsp := &rpcResponse{
|
||||
socket: c,
|
||||
@ -206,7 +206,13 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
|
||||
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
|
||||
}
|
||||
|
||||
codec := newRpcCodec(msg, c, cf)
|
||||
// increment the sequence number
|
||||
seq := atomic.LoadUint64(&r.seq)
|
||||
atomic.AddUint64(&r.seq, 1)
|
||||
id := fmt.Sprintf("%v", seq)
|
||||
|
||||
// create codec with stream id
|
||||
codec := newRpcCodec(msg, c, cf, id)
|
||||
|
||||
rsp := &rpcResponse{
|
||||
socket: c,
|
||||
@ -224,6 +230,9 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
|
||||
response: rsp,
|
||||
closed: make(chan bool),
|
||||
codec: codec,
|
||||
id: id,
|
||||
// signal the end of stream,
|
||||
eos: true,
|
||||
}
|
||||
|
||||
ch := make(chan error, 1)
|
||||
|
@ -39,6 +39,9 @@ type rpcCodec struct {
|
||||
|
||||
req *transport.Message
|
||||
buf *readWriteCloser
|
||||
|
||||
// signify if its a stream
|
||||
stream string
|
||||
}
|
||||
|
||||
type readWriteCloser struct {
|
||||
@ -113,7 +116,7 @@ func getHeaders(m *codec.Message) {
|
||||
}
|
||||
}
|
||||
|
||||
func setHeaders(m *codec.Message) {
|
||||
func setHeaders(m *codec.Message, stream string) {
|
||||
set := func(hdr, v string) {
|
||||
if len(v) == 0 {
|
||||
return
|
||||
@ -127,6 +130,10 @@ func setHeaders(m *codec.Message) {
|
||||
set("Micro-Method", m.Method)
|
||||
set("Micro-Endpoint", m.Endpoint)
|
||||
set("Micro-Error", m.Error)
|
||||
|
||||
if len(stream) > 0 {
|
||||
set("Micro-Stream", stream)
|
||||
}
|
||||
}
|
||||
|
||||
// setupProtocol sets up the old protocol
|
||||
@ -150,7 +157,7 @@ func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
|
||||
return defaultCodecs[msg.Header["Content-Type"]]
|
||||
}
|
||||
|
||||
func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec {
|
||||
func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec, stream string) codec.Codec {
|
||||
rwc := &readWriteCloser{
|
||||
wbuf: bytes.NewBuffer(nil),
|
||||
rbuf: bytes.NewBuffer(nil),
|
||||
@ -160,6 +167,7 @@ func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCod
|
||||
client: client,
|
||||
codec: c(rwc),
|
||||
req: req,
|
||||
stream: stream,
|
||||
}
|
||||
return r
|
||||
}
|
||||
@ -178,7 +186,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
|
||||
}
|
||||
|
||||
// set the mucp headers
|
||||
setHeaders(m)
|
||||
setHeaders(m, c.stream)
|
||||
|
||||
// if body is bytes Frame don't encode
|
||||
if body != nil {
|
||||
|
@ -18,6 +18,9 @@ type rpcStream struct {
|
||||
response Response
|
||||
codec codec.Codec
|
||||
context context.Context
|
||||
|
||||
// signal whether we should send EOS
|
||||
eos bool
|
||||
}
|
||||
|
||||
func (r *rpcStream) isClosed() bool {
|
||||
@ -120,6 +123,20 @@ func (r *rpcStream) Close() error {
|
||||
return nil
|
||||
default:
|
||||
close(r.closed)
|
||||
|
||||
// send the end of stream message
|
||||
if r.eos {
|
||||
// no need to check for error
|
||||
r.codec.Write(&codec.Message{
|
||||
Id: r.id,
|
||||
Target: r.request.Service(),
|
||||
Method: r.request.Method(),
|
||||
Endpoint: r.request.Endpoint(),
|
||||
Type: codec.Error,
|
||||
Error: lastStreamResponseError,
|
||||
}, nil)
|
||||
}
|
||||
|
||||
return r.codec.Close()
|
||||
}
|
||||
}
|
||||
|
@ -158,7 +158,6 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
|
||||
wbuf: bytes.NewBuffer(nil),
|
||||
}
|
||||
r := &rpcCodec{
|
||||
first: true,
|
||||
buf: rwc,
|
||||
codec: c(rwc),
|
||||
req: req,
|
||||
@ -174,33 +173,27 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
|
||||
Body: c.req.Body,
|
||||
}
|
||||
|
||||
// if its a follow on request read it
|
||||
if !c.first {
|
||||
var tm transport.Message
|
||||
var tm transport.Message
|
||||
|
||||
// read off the socket
|
||||
if err := c.socket.Recv(&tm); err != nil {
|
||||
return err
|
||||
}
|
||||
// reset the read buffer
|
||||
c.buf.rbuf.Reset()
|
||||
// read off the socket
|
||||
if err := c.socket.Recv(&tm); err != nil {
|
||||
return err
|
||||
}
|
||||
// reset the read buffer
|
||||
c.buf.rbuf.Reset()
|
||||
|
||||
// write the body to the buffer
|
||||
if _, err := c.buf.rbuf.Write(tm.Body); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// set the message header
|
||||
m.Header = tm.Header
|
||||
// set the message body
|
||||
m.Body = tm.Body
|
||||
|
||||
// set req
|
||||
c.req = &tm
|
||||
// write the body to the buffer
|
||||
if _, err := c.buf.rbuf.Write(tm.Body); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// no longer first read
|
||||
c.first = false
|
||||
// set the message header
|
||||
m.Header = tm.Header
|
||||
// set the message body
|
||||
m.Body = tm.Body
|
||||
|
||||
// set req
|
||||
c.req = &tm
|
||||
|
||||
// set some internal things
|
||||
getHeaders(&m)
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"github.com/micro/go-micro/util/addr"
|
||||
log "github.com/micro/go-micro/util/log"
|
||||
mnet "github.com/micro/go-micro/util/net"
|
||||
"github.com/micro/go-micro/util/socket"
|
||||
)
|
||||
|
||||
type rpcServer struct {
|
||||
@ -70,23 +71,108 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
||||
}
|
||||
}()
|
||||
|
||||
// multiplex the streams on a single socket by Micro-Stream
|
||||
var mtx sync.RWMutex
|
||||
sockets := make(map[string]*socket.Socket)
|
||||
|
||||
log.Info("New socket")
|
||||
|
||||
for {
|
||||
var msg transport.Message
|
||||
if err := sock.Recv(&msg); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// use Micro-Id as the stream identifier
|
||||
// in the event its blank we'll always process
|
||||
// on the same socket
|
||||
id := msg.Header["Micro-Stream"]
|
||||
|
||||
// if there's no stream id then its a standard request
|
||||
if len(id) == 0 {
|
||||
id = msg.Header["Micro-Id"]
|
||||
}
|
||||
|
||||
// add to wait group if "wait" is opt-in
|
||||
if s.wg != nil {
|
||||
s.wg.Add(1)
|
||||
}
|
||||
|
||||
// check we have an existing socket
|
||||
mtx.RLock()
|
||||
psock, ok := sockets[id]
|
||||
mtx.RUnlock()
|
||||
|
||||
log.Infof("Got socket %v %v", id, ok)
|
||||
|
||||
// got the socket
|
||||
if ok {
|
||||
// accept the message
|
||||
if err := psock.Accept(&msg); err != nil {
|
||||
log.Infof("Accept Error %+v", err)
|
||||
// close the socket
|
||||
psock.Close()
|
||||
|
||||
// delete the socket
|
||||
mtx.Lock()
|
||||
delete(sockets, id)
|
||||
mtx.Unlock()
|
||||
}
|
||||
|
||||
// done(1)
|
||||
if s.wg != nil {
|
||||
s.wg.Done()
|
||||
}
|
||||
|
||||
// continue to the next message
|
||||
continue
|
||||
}
|
||||
|
||||
// no socket was found
|
||||
psock = socket.New()
|
||||
psock.SetLocal(sock.Local())
|
||||
psock.SetRemote(sock.Remote())
|
||||
|
||||
// load the socket
|
||||
psock.Accept(&msg)
|
||||
|
||||
// save a new socket
|
||||
mtx.Lock()
|
||||
sockets[id] = psock
|
||||
mtx.Unlock()
|
||||
|
||||
// process the outbound messages from the socket
|
||||
go func(id string, psock *socket.Socket) {
|
||||
defer psock.Close()
|
||||
|
||||
for {
|
||||
// get the message from our internal handler/stream
|
||||
m := new(transport.Message)
|
||||
if err := psock.Process(m); err != nil {
|
||||
log.Infof("Process Error %+v", err)
|
||||
|
||||
// delete the socket
|
||||
mtx.Lock()
|
||||
delete(sockets, id)
|
||||
mtx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// send the message back over the socket
|
||||
if err := sock.Send(m); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}(id, psock)
|
||||
|
||||
// now walk the usual path
|
||||
|
||||
// we use this Timeout header to set a server deadline
|
||||
to := msg.Header["Timeout"]
|
||||
// we use this Content-Type header to identify the codec needed
|
||||
ct := msg.Header["Content-Type"]
|
||||
|
||||
// strip our headers
|
||||
// copy the message headers
|
||||
hdr := make(map[string]string)
|
||||
for k, v := range msg.Header {
|
||||
hdr[k] = v
|
||||
@ -96,17 +182,17 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
||||
hdr["Local"] = sock.Local()
|
||||
hdr["Remote"] = sock.Remote()
|
||||
|
||||
// create new context
|
||||
// create new context with the metadata
|
||||
ctx := metadata.NewContext(context.Background(), hdr)
|
||||
|
||||
// set the timeout if we have it
|
||||
// set the timeout from the header if we have it
|
||||
if len(to) > 0 {
|
||||
if n, err := strconv.ParseUint(to, 10, 64); err == nil {
|
||||
ctx, _ = context.WithTimeout(ctx, time.Duration(n))
|
||||
}
|
||||
}
|
||||
|
||||
// no content type
|
||||
// if there's no content type default it
|
||||
if len(ct) == 0 {
|
||||
msg.Header["Content-Type"] = DefaultContentType
|
||||
ct = DefaultContentType
|
||||
@ -133,7 +219,13 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
||||
}
|
||||
}
|
||||
|
||||
rcodec := newRpcCodec(&msg, sock, cf)
|
||||
rcodec := newRpcCodec(&msg, psock, cf)
|
||||
|
||||
// check stream id
|
||||
var stream bool
|
||||
if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 {
|
||||
stream = true
|
||||
}
|
||||
|
||||
// internal request
|
||||
request := &rpcRequest{
|
||||
@ -144,15 +236,14 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
||||
codec: rcodec,
|
||||
header: msg.Header,
|
||||
body: msg.Body,
|
||||
socket: sock,
|
||||
stream: true,
|
||||
first: true,
|
||||
socket: psock,
|
||||
stream: stream,
|
||||
}
|
||||
|
||||
// internal response
|
||||
response := &rpcResponse{
|
||||
header: make(map[string]string),
|
||||
socket: sock,
|
||||
socket: psock,
|
||||
codec: rcodec,
|
||||
}
|
||||
|
||||
@ -175,25 +266,36 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
||||
r = rpcRouter{handler}
|
||||
}
|
||||
|
||||
// serve the actual request using the request router
|
||||
if err := r.ServeRequest(ctx, request, response); err != nil {
|
||||
// write an error response
|
||||
err = rcodec.Write(&codec.Message{
|
||||
Header: msg.Header,
|
||||
Error: err.Error(),
|
||||
Type: codec.Error,
|
||||
}, nil)
|
||||
// could not write the error response
|
||||
if err != nil {
|
||||
log.Logf("rpc: unable to write error response: %v", err)
|
||||
// serve the request in a go routine as this may be a stream
|
||||
go func(id string, psock *socket.Socket) {
|
||||
// serve the actual request using the request router
|
||||
if err := r.ServeRequest(ctx, request, response); err != nil {
|
||||
// write an error response
|
||||
err = rcodec.Write(&codec.Message{
|
||||
Header: msg.Header,
|
||||
Error: err.Error(),
|
||||
Type: codec.Error,
|
||||
}, nil)
|
||||
|
||||
// could not write the error response
|
||||
if err != nil {
|
||||
log.Logf("rpc: unable to write error response: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
mtx.Lock()
|
||||
delete(sockets, id)
|
||||
mtx.Unlock()
|
||||
|
||||
psock.Close()
|
||||
|
||||
// once done serving signal we're done
|
||||
if s.wg != nil {
|
||||
s.wg.Done()
|
||||
}
|
||||
return
|
||||
}
|
||||
}(id, psock)
|
||||
|
||||
// done
|
||||
// signal we're done
|
||||
if s.wg != nil {
|
||||
s.wg.Done()
|
||||
}
|
||||
|
@ -7,8 +7,8 @@ import (
|
||||
"github.com/micro/go-micro/transport"
|
||||
)
|
||||
|
||||
// socket is our pseudo socket for transport.Socket
|
||||
type socket struct {
|
||||
// Socket is our pseudo socket for transport.Socket
|
||||
type Socket struct {
|
||||
// closed
|
||||
closed chan bool
|
||||
// remote addr
|
||||
@ -21,16 +21,16 @@ type socket struct {
|
||||
recv chan *transport.Message
|
||||
}
|
||||
|
||||
func (s *socket) SetLocal(l string) {
|
||||
func (s *Socket) SetLocal(l string) {
|
||||
s.local = l
|
||||
}
|
||||
|
||||
func (s *socket) SetRemote(r string) {
|
||||
func (s *Socket) SetRemote(r string) {
|
||||
s.remote = r
|
||||
}
|
||||
|
||||
// Accept passes a message to the socket which will be processed by the call to Recv
|
||||
func (s *socket) Accept(m *transport.Message) error {
|
||||
func (s *Socket) Accept(m *transport.Message) error {
|
||||
select {
|
||||
case <-s.closed:
|
||||
return io.EOF
|
||||
@ -41,7 +41,7 @@ func (s *socket) Accept(m *transport.Message) error {
|
||||
}
|
||||
|
||||
// Process takes the next message off the send queue created by a call to Send
|
||||
func (s *socket) Process(m *transport.Message) error {
|
||||
func (s *Socket) Process(m *transport.Message) error {
|
||||
select {
|
||||
case <-s.closed:
|
||||
return io.EOF
|
||||
@ -51,15 +51,15 @@ func (s *socket) Process(m *transport.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *socket) Remote() string {
|
||||
func (s *Socket) Remote() string {
|
||||
return s.remote
|
||||
}
|
||||
|
||||
func (s *socket) Local() string {
|
||||
func (s *Socket) Local() string {
|
||||
return s.local
|
||||
}
|
||||
|
||||
func (s *socket) Send(m *transport.Message) error {
|
||||
func (s *Socket) Send(m *transport.Message) error {
|
||||
select {
|
||||
case <-s.closed:
|
||||
return io.EOF
|
||||
@ -70,13 +70,17 @@ func (s *socket) Send(m *transport.Message) error {
|
||||
// make copy
|
||||
msg := &transport.Message{
|
||||
Header: make(map[string]string),
|
||||
Body: m.Body,
|
||||
Body: make([]byte, len(m.Body)),
|
||||
}
|
||||
|
||||
// copy headers
|
||||
for k, v := range m.Header {
|
||||
msg.Header[k] = v
|
||||
}
|
||||
|
||||
// copy body
|
||||
copy(msg.Body, m.Body)
|
||||
|
||||
// send a message
|
||||
select {
|
||||
case s.send <- msg:
|
||||
@ -87,7 +91,7 @@ func (s *socket) Send(m *transport.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *socket) Recv(m *transport.Message) error {
|
||||
func (s *Socket) Recv(m *transport.Message) error {
|
||||
select {
|
||||
case <-s.closed:
|
||||
return io.EOF
|
||||
@ -109,7 +113,7 @@ func (s *socket) Recv(m *transport.Message) error {
|
||||
}
|
||||
|
||||
// Close closes the socket
|
||||
func (s *socket) Close() error {
|
||||
func (s *Socket) Close() error {
|
||||
select {
|
||||
case <-s.closed:
|
||||
// no op
|
||||
@ -119,11 +123,21 @@ func (s *socket) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Indicates its closed
|
||||
func (s *socket) Done() bool {
|
||||
select {
|
||||
case <-s.closed:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// New returns a new pseudo socket which can be used in the place of a transport socket.
|
||||
// Messages are sent to the socket via Accept and receives from the socket via Process.
|
||||
// SetLocal/SetRemote should be called before using the socket.
|
||||
func New() *socket {
|
||||
return &socket{
|
||||
func New() *Socket {
|
||||
return &Socket{
|
||||
closed: make(chan bool),
|
||||
local: "local",
|
||||
remote: "remote",
|
||||
|
Loading…
Reference in New Issue
Block a user