Merge pull request #700 from micro/h2-grpc

H2 grpc
This commit is contained in:
Asim Aslam 2019-08-26 15:55:31 +01:00 committed by GitHub
commit 443fc0ebde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 202 additions and 82 deletions

View File

@ -89,9 +89,22 @@ func (c *Codec) Write(m *codec.Message, b interface{}) error {
m.Header[":authority"] = m.Target m.Header[":authority"] = m.Target
m.Header["content-type"] = c.ContentType m.Header["content-type"] = c.ContentType
case codec.Response: case codec.Response:
m.Header["Trailer"] = "grpc-status, grpc-message" m.Header["Trailer"] = "grpc-status" //, grpc-message"
m.Header["content-type"] = c.ContentType
m.Header[":status"] = "200"
m.Header["grpc-status"] = "0" m.Header["grpc-status"] = "0"
m.Header["grpc-message"] = "" // m.Header["grpc-message"] = ""
case codec.Error:
m.Header["Trailer"] = "grpc-status, grpc-message"
// micro end of stream
if m.Error == "EOS" {
m.Header["grpc-status"] = "0"
} else {
m.Header["grpc-message"] = m.Error
m.Header["grpc-status"] = "13"
}
return nil
} }
// marshal content // marshal content

View File

@ -15,9 +15,10 @@ import (
) )
type rpcCodec struct { type rpcCodec struct {
socket transport.Socket socket transport.Socket
codec codec.Codec codec codec.Codec
first bool first bool
protocol string
req *transport.Message req *transport.Message
buf *readWriteCloser buf *readWriteCloser
@ -157,12 +158,27 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
rbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil),
wbuf: bytes.NewBuffer(nil), wbuf: bytes.NewBuffer(nil),
} }
r := &rpcCodec{ r := &rpcCodec{
buf: rwc, buf: rwc,
codec: c(rwc), codec: c(rwc),
req: req, req: req,
socket: socket, socket: socket,
protocol: "mucp",
} }
// if grpc pre-load the buffer
// TODO: remove this terrible hack
switch r.codec.String() {
case "grpc":
// set as first
r.first = true
// write the body
rwc.rbuf.Write(req.Body)
// set the protocol
r.protocol = "grpc"
}
return r return r
} }
@ -173,27 +189,33 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
Body: c.req.Body, Body: c.req.Body,
} }
var tm transport.Message // first message could be pre-loaded
if !c.first {
var tm transport.Message
// read off the socket // read off the socket
if err := c.socket.Recv(&tm); err != nil { if err := c.socket.Recv(&tm); err != nil {
return err return err
} }
// reset the read buffer // reset the read buffer
c.buf.rbuf.Reset() c.buf.rbuf.Reset()
// write the body to the buffer // write the body to the buffer
if _, err := c.buf.rbuf.Write(tm.Body); err != nil { if _, err := c.buf.rbuf.Write(tm.Body); err != nil {
return err return err
}
// set the message header
m.Header = tm.Header
// set the message body
m.Body = tm.Body
// set req
c.req = &tm
} }
// set the message header // disable first
m.Header = tm.Header c.first = false
// set the message body
m.Body = tm.Body
// set req
c.req = &tm
// set some internal things // set some internal things
getHeaders(&m) getHeaders(&m)
@ -293,5 +315,5 @@ func (c *rpcCodec) Close() error {
} }
func (c *rpcCodec) String() string { func (c *rpcCodec) String() string {
return "rpc" return c.protocol
} }

View File

@ -63,20 +63,33 @@ func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response)
// ServeConn serves a single connection // ServeConn serves a single connection
func (s *rpcServer) ServeConn(sock transport.Socket) { func (s *rpcServer) ServeConn(sock transport.Socket) {
var wg sync.WaitGroup
var mtx sync.RWMutex
// streams are multiplexed on Micro-Stream or Micro-Id header
sockets := make(map[string]*socket.Socket)
defer func() { defer func() {
// close socket // wait till done
wg.Wait()
// close underlying socket
sock.Close() sock.Close()
// close the sockets
mtx.Lock()
for id, psock := range sockets {
psock.Close()
delete(sockets, id)
}
mtx.Unlock()
// recover any panics
if r := recover(); r != nil { if r := recover(); r != nil {
log.Log("panic recovered: ", r) log.Log("panic recovered: ", r)
log.Log(string(debug.Stack())) log.Log(string(debug.Stack()))
} }
}() }()
// multiplex the streams on a single socket by Micro-Stream
var mtx sync.RWMutex
sockets := make(map[string]*socket.Socket)
for { for {
var msg transport.Message var msg transport.Message
if err := sock.Recv(&msg); err != nil { if err := sock.Recv(&msg); err != nil {
@ -94,6 +107,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
id = msg.Header["Micro-Id"] id = msg.Header["Micro-Id"]
} }
// we're starting processing
wg.Add(1)
// add to wait group if "wait" is opt-in // add to wait group if "wait" is opt-in
if s.wg != nil { if s.wg != nil {
s.wg.Add(1) s.wg.Add(1)
@ -119,6 +135,8 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
s.wg.Done() s.wg.Done()
} }
wg.Done()
// continue to the next message // continue to the next message
continue continue
} }
@ -136,28 +154,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
sockets[id] = psock sockets[id] = psock
mtx.Unlock() mtx.Unlock()
// process the outbound messages from the socket
go func(id string, psock *socket.Socket) {
defer psock.Close()
for {
// get the message from our internal handler/stream
m := new(transport.Message)
if err := psock.Process(m); err != nil {
// delete the socket
mtx.Lock()
delete(sockets, id)
mtx.Unlock()
return
}
// send the message back over the socket
if err := sock.Send(m); err != nil {
return
}
}
}(id, psock)
// now walk the usual path // now walk the usual path
// we use this Timeout header to set a server deadline // we use this Timeout header to set a server deadline
@ -205,17 +201,23 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
}, },
Body: []byte(err.Error()), Body: []byte(err.Error()),
}) })
if s.wg != nil { if s.wg != nil {
s.wg.Done() s.wg.Done()
} }
wg.Done()
return return
} }
} }
rcodec := newRpcCodec(&msg, psock, cf) rcodec := newRpcCodec(&msg, psock, cf)
protocol := rcodec.String()
// check stream id // check stream id
var stream bool var stream bool
if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 { if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 {
stream = true stream = true
} }
@ -259,8 +261,44 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
r = rpcRouter{handler} r = rpcRouter{handler}
} }
// wait for processing to exit
wg.Add(1)
// process the outbound messages from the socket
go func(id string, psock *socket.Socket) {
defer func() {
// TODO: don't hack this but if its grpc just break out of the stream
// We do this because the underlying connection is h2 and its a stream
switch protocol {
case "grpc":
sock.Close()
}
wg.Done()
}()
for {
// get the message from our internal handler/stream
m := new(transport.Message)
if err := psock.Process(m); err != nil {
// delete the socket
mtx.Lock()
delete(sockets, id)
mtx.Unlock()
return
}
// send the message back over the socket
if err := sock.Send(m); err != nil {
return
}
}
}(id, psock)
// serve the request in a go routine as this may be a stream // serve the request in a go routine as this may be a stream
go func(id string, psock *socket.Socket) { go func(id string, psock *socket.Socket) {
defer psock.Close()
// serve the actual request using the request router // serve the actual request using the request router
if err := r.ServeRequest(ctx, request, response); err != nil { if err := r.ServeRequest(ctx, request, response); err != nil {
// write an error response // write an error response
@ -285,8 +323,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
s.wg.Done() s.wg.Done()
} }
// done with this socket
wg.Done()
}(id, psock) }(id, psock)
} }
} }

View File

@ -33,6 +33,8 @@ type httpTransportClient struct {
once sync.Once once sync.Once
sync.RWMutex sync.RWMutex
// request must be stored for response processing
r chan *http.Request r chan *http.Request
bl []*http.Request bl []*http.Request
buff *bufio.Reader buff *bufio.Reader
@ -48,10 +50,18 @@ type httpTransportSocket struct {
r *http.Request r *http.Request
rw *bufio.ReadWriter rw *bufio.ReadWriter
mtx sync.RWMutex
// the hijacked when using http 1
conn net.Conn conn net.Conn
// for the first request // for the first request
ch chan *http.Request ch chan *http.Request
// h2 things
buf *bufio.Reader
// indicate if socket is closed
closed chan bool
// local/remote ip // local/remote ip
local string local string
remote string remote string
@ -161,14 +171,13 @@ func (h *httpTransportClient) Recv(m *Message) error {
} }
func (h *httpTransportClient) Close() error { func (h *httpTransportClient) Close() error {
err := h.conn.Close()
h.once.Do(func() { h.once.Do(func() {
h.Lock() h.Lock()
h.buff.Reset(nil) h.buff.Reset(nil)
h.Unlock() h.Unlock()
close(h.r) close(h.r)
}) })
return err return h.conn.Close()
} }
func (h *httpTransportSocket) Local() string { func (h *httpTransportSocket) Local() string {
@ -232,14 +241,23 @@ func (h *httpTransportSocket) Recv(m *Message) error {
return nil return nil
} }
// only process if the socket is open
select {
case <-h.closed:
return io.EOF
default:
// no op
}
// processing http2 request // processing http2 request
// read streaming body // read streaming body
// set max buffer size // set max buffer size
buf := make([]byte, 4*1024) // TODO: adjustable buffer size
buf := make([]byte, 4*1024*1024)
// read the request body // read the request body
n, err := h.r.Body.Read(buf) n, err := h.buf.Read(buf)
// not an eof error // not an eof error
if err != nil { if err != nil {
return err return err
@ -290,7 +308,13 @@ func (h *httpTransportSocket) Send(m *Message) error {
return rsp.Write(h.conn) return rsp.Write(h.conn)
} }
// http2 request // only process if the socket is open
select {
case <-h.closed:
return io.EOF
default:
// no op
}
// set headers // set headers
for k, v := range m.Header { for k, v := range m.Header {
@ -299,6 +323,10 @@ func (h *httpTransportSocket) Send(m *Message) error {
// write request // write request
_, err := h.w.Write(m.Body) _, err := h.w.Write(m.Body)
// flush the trailers
h.w.(http.Flusher).Flush()
return err return err
} }
@ -321,13 +349,29 @@ func (h *httpTransportSocket) error(m *Message) error {
return rsp.Write(h.conn) return rsp.Write(h.conn)
} }
return nil return nil
} }
func (h *httpTransportSocket) Close() error { func (h *httpTransportSocket) Close() error {
if h.r.ProtoMajor == 1 { h.mtx.Lock()
return h.conn.Close() defer h.mtx.Unlock()
select {
case <-h.closed:
return nil
default:
// close the channel
close(h.closed)
// close the buffer
h.r.Body.Close()
// close the connection
if h.r.ProtoMajor == 1 {
return h.conn.Close()
}
} }
return nil return nil
} }
@ -374,20 +418,29 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error {
con = conn con = conn
} }
// buffered reader
bufr := bufio.NewReader(r.Body)
// save the request // save the request
ch := make(chan *http.Request, 1) ch := make(chan *http.Request, 1)
ch <- r ch <- r
fn(&httpTransportSocket{ // create a new transport socket
sock := &httpTransportSocket{
ht: h.ht, ht: h.ht,
w: w, w: w,
r: r, r: r,
rw: buf, rw: buf,
buf: bufr,
ch: ch, ch: ch,
conn: con, conn: con,
local: h.Addr(), local: h.Addr(),
remote: r.RemoteAddr, remote: r.RemoteAddr,
}) closed: make(chan bool),
}
// execute the socket
fn(sock)
}) })
// get optional handlers // get optional handlers

View File

@ -32,10 +32,10 @@ func (s *Socket) SetRemote(r string) {
// Accept passes a message to the socket which will be processed by the call to Recv // Accept passes a message to the socket which will be processed by the call to Recv
func (s *Socket) Accept(m *transport.Message) error { func (s *Socket) Accept(m *transport.Message) error {
select { select {
case <-s.closed:
return io.EOF
case s.recv <- m: case s.recv <- m:
return nil return nil
case <-s.closed:
return io.EOF
} }
return nil return nil
} }
@ -43,10 +43,17 @@ func (s *Socket) Accept(m *transport.Message) error {
// Process takes the next message off the send queue created by a call to Send // Process takes the next message off the send queue created by a call to Send
func (s *Socket) Process(m *transport.Message) error { func (s *Socket) Process(m *transport.Message) error {
select { select {
case <-s.closed:
return io.EOF
case msg := <-s.send: case msg := <-s.send:
*m = *msg *m = *msg
case <-s.closed:
// see if we need to drain
select {
case msg := <-s.send:
*m = *msg
return nil
default:
return io.EOF
}
} }
return nil return nil
} }
@ -60,13 +67,6 @@ func (s *Socket) Local() string {
} }
func (s *Socket) Send(m *transport.Message) error { func (s *Socket) Send(m *transport.Message) error {
select {
case <-s.closed:
return io.EOF
default:
// no op
}
// make copy // make copy
msg := &transport.Message{ msg := &transport.Message{
Header: make(map[string]string), Header: make(map[string]string),
@ -92,13 +92,6 @@ func (s *Socket) Send(m *transport.Message) error {
} }
func (s *Socket) Recv(m *transport.Message) error { func (s *Socket) Recv(m *transport.Message) error {
select {
case <-s.closed:
return io.EOF
default:
// no op
}
// receive a message // receive a message
select { select {
case msg := <-s.recv: case msg := <-s.recv: