Fix a codec race by locking the buffers. Include a buffer pool for perf. (#941)

* Fix a codec race by locking the buffers. Include a buffer pool for perf.

* Read Lock on buffer Read
This commit is contained in:
Asim Aslam 2019-11-13 11:05:53 +00:00 committed by GitHub
parent cffa5b6b50
commit 9f481542f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,6 +2,7 @@ package server
import ( import (
"bytes" "bytes"
"sync"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes" raw "github.com/micro/go-micro/codec/bytes"
@ -11,20 +12,25 @@ import (
"github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
"github.com/oxtoacart/bpool"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
type rpcCodec struct { type rpcCodec struct {
socket transport.Socket socket transport.Socket
codec codec.Codec codec codec.Codec
first bool
protocol string protocol string
req *transport.Message req *transport.Message
buf *readWriteCloser buf *readWriteCloser
// check if we're the first
sync.RWMutex
first chan bool
} }
type readWriteCloser struct { type readWriteCloser struct {
sync.RWMutex
wbuf *bytes.Buffer wbuf *bytes.Buffer
rbuf *bytes.Buffer rbuf *bytes.Buffer
} }
@ -51,19 +57,24 @@ var (
"application/proto-rpc": protorpc.NewCodec, "application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec, "application/octet-stream": protorpc.NewCodec,
} }
// the local buffer pool
bufferPool = bpool.NewSizedBufferPool(32, 1)
) )
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
rwc.RLock()
defer rwc.RUnlock()
return rwc.rbuf.Read(p) return rwc.rbuf.Read(p)
} }
func (rwc *readWriteCloser) Write(p []byte) (n int, err error) { func (rwc *readWriteCloser) Write(p []byte) (n int, err error) {
rwc.Lock()
defer rwc.Unlock()
return rwc.wbuf.Write(p) return rwc.wbuf.Write(p)
} }
func (rwc *readWriteCloser) Close() error { func (rwc *readWriteCloser) Close() error {
rwc.rbuf.Reset()
rwc.wbuf.Reset()
return nil return nil
} }
@ -155,8 +166,8 @@ func setupProtocol(msg *transport.Message) codec.NewCodec {
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec { func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec {
rwc := &readWriteCloser{ rwc := &readWriteCloser{
rbuf: bytes.NewBuffer(nil), rbuf: bufferPool.Get(),
wbuf: bytes.NewBuffer(nil), wbuf: bufferPool.Get(),
} }
r := &rpcCodec{ r := &rpcCodec{
@ -165,18 +176,20 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
req: req, req: req,
socket: socket, socket: socket,
protocol: "mucp", protocol: "mucp",
first: make(chan bool),
} }
// if grpc pre-load the buffer // if grpc pre-load the buffer
// TODO: remove this terrible hack // TODO: remove this terrible hack
switch r.codec.String() { switch r.codec.String() {
case "grpc": case "grpc":
// set as first
r.first = true
// write the body // write the body
rwc.rbuf.Write(req.Body) rwc.rbuf.Write(req.Body)
// set the protocol // set the protocol
r.protocol = "grpc" r.protocol = "grpc"
default:
// first is not preloaded
close(r.first)
} }
return r return r
@ -190,7 +203,9 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
} }
// first message could be pre-loaded // first message could be pre-loaded
if !c.first { select {
case <-c.first:
// not the first
var tm transport.Message var tm transport.Message
// read off the socket // read off the socket
@ -212,10 +227,25 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
// set req // set req
c.req = &tm c.req = &tm
} default:
// we need to lock here to prevent race conditions
// and we make use of a channel otherwise because
// this does not result in a context switch
// locking to check c.first on every call to ReadHeader
// would otherwise drastically slow the code execution
c.Lock()
// recheck before closing because the select statement
// above is not thread safe, so thread safety here is
// mandatory
select {
case <-c.first:
default:
// disable first // disable first
c.first = false close(c.first)
}
// now unlock and we never need this again
c.Unlock()
}
// set some internal things // set some internal things
getHeaders(&m) getHeaders(&m)
@ -309,9 +339,15 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
} }
func (c *rpcCodec) Close() error { func (c *rpcCodec) Close() error {
c.buf.Close() // close the codec
c.codec.Close() c.codec.Close()
return c.socket.Close() // close the socket
err := c.socket.Close()
// put back the buffers
bufferPool.Put(c.buf.rbuf)
bufferPool.Put(c.buf.wbuf)
// return the error
return err
} }
func (c *rpcCodec) String() string { func (c *rpcCodec) String() string {