Full support for grpc server side

This commit is contained in:
Asim Aslam 2019-08-26 12:33:59 +01:00
parent 36623bfe50
commit 6daf4fda72
3 changed files with 59 additions and 28 deletions

View File

@ -15,9 +15,10 @@ import (
) )
type rpcCodec struct { type rpcCodec struct {
socket transport.Socket socket transport.Socket
codec codec.Codec codec codec.Codec
first bool first bool
protocol string
req *transport.Message req *transport.Message
buf *readWriteCloser buf *readWriteCloser
@ -157,12 +158,27 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
rbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil),
wbuf: bytes.NewBuffer(nil), wbuf: bytes.NewBuffer(nil),
} }
r := &rpcCodec{ r := &rpcCodec{
buf: rwc, buf: rwc,
codec: c(rwc), codec: c(rwc),
req: req, req: req,
socket: socket, socket: socket,
protocol: "mucp",
} }
// if grpc pre-load the buffer
// TODO: remove this terrible hack
switch r.codec.String() {
case "grpc":
// set as first
r.first = true
// write the body
rwc.rbuf.Write(req.Body)
// set the protocol
r.protocol = "grpc"
}
return r return r
} }
@ -173,27 +189,33 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
Body: c.req.Body, Body: c.req.Body,
} }
var tm transport.Message // first message could be pre-loaded
if !c.first {
var tm transport.Message
// read off the socket // read off the socket
if err := c.socket.Recv(&tm); err != nil { if err := c.socket.Recv(&tm); err != nil {
return err return err
} }
// reset the read buffer // reset the read buffer
c.buf.rbuf.Reset() c.buf.rbuf.Reset()
// write the body to the buffer // write the body to the buffer
if _, err := c.buf.rbuf.Write(tm.Body); err != nil { if _, err := c.buf.rbuf.Write(tm.Body); err != nil {
return err return err
}
// set the message header
m.Header = tm.Header
// set the message body
m.Body = tm.Body
// set req
c.req = &tm
} }
// set the message header // disable first
m.Header = tm.Header c.first = false
// set the message body
m.Body = tm.Body
// set req
c.req = &tm
// set some internal things // set some internal things
getHeaders(&m) getHeaders(&m)
@ -293,5 +315,5 @@ func (c *rpcCodec) Close() error {
} }
func (c *rpcCodec) String() string { func (c *rpcCodec) String() string {
return "rpc" return c.protocol
} }

View File

@ -213,9 +213,11 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
} }
rcodec := newRpcCodec(&msg, psock, cf) rcodec := newRpcCodec(&msg, psock, cf)
protocol := rcodec.String()
// check stream id // check stream id
var stream bool var stream bool
if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 { if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 {
stream = true stream = true
} }
@ -265,6 +267,13 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// process the outbound messages from the socket // process the outbound messages from the socket
go func(id string, psock *socket.Socket) { go func(id string, psock *socket.Socket) {
defer func() { defer func() {
// TODO: don't hack this but if its grpc just break out of the stream
// We do this because the underlying connection is h2 and its a stream
switch protocol {
case "grpc":
sock.Close()
}
wg.Done() wg.Done()
}() }()

View File

@ -363,6 +363,9 @@ func (h *httpTransportSocket) Close() error {
// close the channel // close the channel
close(h.closed) close(h.closed)
// close the buffer
h.r.Body.Close()
// close the connection // close the connection
if h.r.ProtoMajor == 1 { if h.r.ProtoMajor == 1 {
return h.conn.Close() return h.conn.Close()
@ -436,9 +439,6 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error {
closed: make(chan bool), closed: make(chan bool),
} }
// cleanup
//defer sock.Close()
// execute the socket // execute the socket
fn(sock) fn(sock)
}) })