This commit is contained in:
Manfred Touron
2017-05-18 18:54:23 +02:00
parent dc386661ca
commit 5448f25fd6
645 changed files with 55908 additions and 33297 deletions

View File

@@ -35,9 +35,7 @@ package transport
import (
"fmt"
"math"
"sync"
"time"
"golang.org/x/net/http2"
)
@@ -46,12 +44,8 @@ const (
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
infinity = time.Duration(math.MaxInt64)
defaultKeepaliveTime = infinity
defaultKeepaliveTimeout = time.Duration(20 * time.Second)
defaultMaxStreamsClient = 100
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
)
// The following defines various control items which could flow through
@@ -117,9 +111,35 @@ func newQuotaPool(q int) *quotaPool {
return qb
}
// add cancels the pending quota sent on acquired, incremented by v and sends
// add adds n to the available quota and tries to send it on acquire.
func (qb *quotaPool) add(n int) {
qb.mu.Lock()
defer qb.mu.Unlock()
qb.quota += n
if qb.quota <= 0 {
return
}
select {
case qb.c <- qb.quota:
qb.quota = 0
default:
}
}
// cancel cancels the pending quota sent on acquire, if any.
func (qb *quotaPool) cancel() {
qb.mu.Lock()
defer qb.mu.Unlock()
select {
case n := <-qb.c:
qb.quota += n
default:
}
}
// reset cancels the pending quota sent on acquired, incremented by v and sends
// it back on acquire.
func (qb *quotaPool) add(v int) {
func (qb *quotaPool) reset(v int) {
qb.mu.Lock()
defer qb.mu.Unlock()
select {
@@ -131,10 +151,6 @@ func (qb *quotaPool) add(v int) {
if qb.quota <= 0 {
return
}
// After the pool has been created, this is the only place that sends on
// the channel. Since mu is held at this point and any quota that was sent
// on the channel has been retrieved, we know that this code will always
// place any positive quota value on the channel.
select {
case qb.c <- qb.quota:
qb.quota = 0

View File

@@ -268,7 +268,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
})
}
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
// With this transport type there will be exactly 1 stream: this HTTP request.
var ctx context.Context

View File

@@ -188,7 +188,7 @@ func TestHandlerTransport_NewServerHandlerTransport(t *testing.T) {
},
RequestURI: "/service/foo.bar",
},
wantErr: `stream error: code = Internal desc = "malformed time-out: transport: timeout unit is not recognized: \"tomorrow\""`,
wantErr: `stream error: code = 13 desc = "malformed time-out: transport: timeout unit is not recognized: \"tomorrow\""`,
},
{
name: "with metadata",
@@ -300,10 +300,7 @@ func TestHandlerTransport_HandleStreams(t *testing.T) {
st.bodyw.Close() // no body
st.ht.WriteStatus(s, codes.OK, "")
}
st.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
func(ctx context.Context, method string) context.Context { return ctx },
)
st.ht.HandleStreams(func(s *Stream) { go handleStream(s) })
wantHeader := http.Header{
"Date": nil,
"Content-Type": {"application/grpc"},
@@ -330,10 +327,7 @@ func handleStreamCloseBodyTest(t *testing.T, statusCode codes.Code, msg string)
handleStream := func(s *Stream) {
st.ht.WriteStatus(s, statusCode, msg)
}
st.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
func(ctx context.Context, method string) context.Context { return ctx },
)
st.ht.HandleStreams(func(s *Stream) { go handleStream(s) })
wantHeader := http.Header{
"Date": nil,
"Content-Type": {"application/grpc"},
@@ -381,10 +375,7 @@ func TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
}
ht.WriteStatus(s, codes.DeadlineExceeded, "too slow")
}
ht.HandleStreams(
func(s *Stream) { go runStream(s) },
func(ctx context.Context, method string) context.Context { return ctx },
)
ht.HandleStreams(func(s *Stream) { go runStream(s) })
wantHeader := http.Header{
"Date": nil,
"Content-Type": {"application/grpc"},

View File

@@ -41,7 +41,6 @@ import (
"net"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/context"
@@ -50,23 +49,18 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
)
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
ctx context.Context
target string // server name/addr
userAgent string
md interface{}
conn net.Conn // underlying communication channel
remoteAddr net.Addr
localAddr net.Addr
authInfo credentials.AuthInfo // auth info about the connection
nextID uint32 // the next stream ID to be used
target string // server name/addr
userAgent string
md interface{}
conn net.Conn // underlying communication channel
authInfo credentials.AuthInfo // auth info about the connection
nextID uint32 // the next stream ID to be used
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by sending a value on writableChan
@@ -82,8 +76,6 @@ type http2Client struct {
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
// awakenKeepalive is used to wake up keepalive when after it has gone dormant.
awakenKeepalive chan struct{}
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
@@ -103,13 +95,6 @@ type http2Client struct {
creds []credentials.PerRPCCredentials
// Boolean to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
kp keepalive.ClientParameters
statsHandler stats.Handler
mu sync.Mutex // guard the following variables
state transportState // the state of underlying connection
activeStreams map[uint32]*Stream
@@ -165,9 +150,6 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
scheme := "http"
conn, err := dial(ctx, opts.Dialer, addr.Addr)
if err != nil {
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err)
}
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Any further errors will close the underlying connection
@@ -191,31 +173,19 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
}
kp := opts.KeepaliveParams
// Validate keepalive parameters.
if kp.Time == 0 {
kp.Time = defaultKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultKeepaliveTimeout
}
var buf bytes.Buffer
t := &http2Client{
ctx: ctx,
target: addr.Addr,
userAgent: ua,
md: addr.Metadata,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: authInfo,
target: addr.Addr,
userAgent: ua,
md: addr.Metadata,
conn: conn,
authInfo: authInfo,
// The client initiated stream id is odd starting from 1.
nextID: 1,
writableChan: make(chan int, 1),
shutdownChan: make(chan struct{}),
errorChan: make(chan struct{}),
goAway: make(chan struct{}),
awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
@@ -226,24 +196,8 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
state: reachable,
activeStreams: make(map[uint32]*Stream),
creds: opts.PerRPCCredentials,
maxStreams: defaultMaxStreamsClient,
streamsQuota: newQuotaPool(defaultMaxStreamsClient),
maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize,
kp: kp,
statsHandler: opts.StatsHandler,
}
// Make sure awakenKeepalive can't be written upon.
// keepalive routine will make it writable, if need be.
t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connBegin)
}
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
@@ -279,9 +233,6 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
}
go t.controller()
if t.kp.Time != infinity {
go t.keepalive()
}
t.writableChan <- 0
return t, nil
}
@@ -319,13 +270,12 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
pr := &peer.Peer{
Addr: t.remoteAddr,
Addr: t.conn.RemoteAddr(),
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
userCtx := ctx
ctx = peer.NewContext(ctx, pr)
authData := make(map[string]string)
for _, c := range t.creds {
@@ -363,18 +313,21 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Unlock()
return nil, ErrConnClosing
}
checkStreamsQuota := t.streamsQuota != nil
t.mu.Unlock()
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
// Returns the quota balance back.
if sq > 1 {
t.streamsQuota.add(sq - 1)
if checkStreamsQuota {
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
// Returns the quota balance back.
if sq > 1 {
t.streamsQuota.add(sq - 1)
}
}
if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
// Return the quota back now because there is no stream returned to the caller.
if _, ok := err.(StreamError); ok {
if _, ok := err.(StreamError); ok && checkStreamsQuota {
t.streamsQuota.add(1)
}
return nil, err
@@ -382,7 +335,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Lock()
if t.state == draining {
t.mu.Unlock()
t.streamsQuota.add(1)
if checkStreamsQuota {
t.streamsQuota.add(1)
}
// Need to make t writable again so that the rpc in flight can still proceed.
t.writableChan <- 0
return nil, ErrStreamDrain
@@ -392,19 +347,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, ErrConnClosing
}
s := t.newStream(ctx, callHdr)
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// If the number of active streams change from 0 to 1, then check if keepalive
// has gone dormant. If so, wake it up.
if len(t.activeStreams) == 1 {
select {
case t.awakenKeepalive <- struct{}{}:
t.framer.writePing(false, false, [8]byte{})
default:
}
}
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
// Reset t.streamsQuota to the right value.
var reset bool
if !checkStreamsQuota && t.streamsQuota != nil {
reset = true
}
t.mu.Unlock()
if reset {
t.streamsQuota.reset(-1)
}
// HPACK encodes various headers. Note that once WriteField(...) is
// called, the corresponding headers/continuation frame has to be sent
@@ -459,7 +413,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
first := true
bufLen := t.hBuf.Len()
// Sends the headers in a single batch even when they span multiple frames.
for !endHeaders {
size := t.hBuf.Len()
@@ -494,17 +447,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
if t.statsHandler != nil {
outHeader := &stats.OutHeader{
Client: true,
WireLength: bufLen,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
}
t.statsHandler.HandleRPC(s.clientStatsCtx, outHeader)
}
t.writableChan <- 0
return s, nil
}
@@ -512,11 +454,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
// CloseStream clears the footprint of a stream when the stream is not needed any more.
// This must not be executed in reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
var updateStreams bool
t.mu.Lock()
if t.activeStreams == nil {
t.mu.Unlock()
return
}
if t.streamsQuota != nil {
updateStreams = true
}
delete(t.activeStreams, s.id)
if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
@@ -525,27 +471,10 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
return
}
t.mu.Unlock()
// rstStream is true in case the stream is being closed at the client-side
// and the server needs to be intimated about it by sending a RST_STREAM
// frame.
// To make sure this frame is written to the wire before the headers of the
// next stream waiting for streamsQuota, we add to streamsQuota pool only
// after having acquired the writableChan to send RST_STREAM out (look at
// the controller() routine).
var rstStream bool
var rstError http2.ErrCode
defer func() {
// In case, the client doesn't have to send RST_STREAM to server
// we can safely add back to streamsQuota pool now.
if !rstStream {
t.streamsQuota.add(1)
return
}
t.controlBuf.put(&resetStream{s.id, rstError})
}()
if updateStreams {
t.streamsQuota.add(1)
}
s.mu.Lock()
rstStream = s.rstStream
rstError = s.rstError
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
t.controlBuf.put(&windowUpdate{0, n})
@@ -561,9 +490,8 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
}
s.state = streamDone
s.mu.Unlock()
if _, ok := err.(StreamError); ok {
rstStream = true
rstError = http2.ErrCodeCancel
if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
}
}
@@ -597,12 +525,6 @@ func (t *http2Client) Close() (err error) {
s.mu.Unlock()
s.write(recvMsg{err: ErrConnClosing})
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
return
}
@@ -660,14 +582,19 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
var p []byte
if r.Len() > 0 {
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
if _, ok := err.(StreamError); ok || err == io.EOF {
t.sendQuotaPool.cancel()
}
return err
}
if sq < size {
@@ -777,7 +704,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
}
func (t *http2Client) handleData(f *http2.DataFrame) {
size := f.Header().Length
size := len(f.Data())
if err := t.fc.onData(uint32(size)); err != nil {
t.notifyError(connectionErrorf(true, err, "%v", err))
return
@@ -791,11 +718,6 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
}
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@@ -809,27 +731,19 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = err.Error()
s.rstStream = true
s.rstError = http2.ErrCodeFlowControl
close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
}
s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
data := make([]byte, len(f.Data()))
copy(data, f.Data())
s.write(recvMsg{data: data})
}
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
}
// The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately.
@@ -960,24 +874,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
endStream := frame.StreamEnded()
var isHeader bool
defer func() {
if t.statsHandler != nil {
if isHeader {
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
}
t.statsHandler.HandleRPC(s.clientStatsCtx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
}
t.statsHandler.HandleRPC(s.clientStatsCtx, inTrailer)
}
}
}()
s.mu.Lock()
if !endStream {
@@ -989,7 +885,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
close(s.headerChan)
s.headerDone = true
isHeader = true
}
if !endStream || s.state == streamDone {
s.mu.Unlock()
@@ -1030,7 +925,6 @@ func (t *http2Client) reader() {
t.notifyError(err)
return
}
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.notifyError(err)
@@ -1041,7 +935,6 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
for {
frame, err := t.framer.readFrame()
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
@@ -1093,15 +986,21 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
s.Val = math.MaxInt32
}
t.mu.Lock()
reset := t.streamsQuota != nil
if !reset {
t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
}
ms := t.maxStreams
t.maxStreams = int(s.Val)
t.mu.Unlock()
t.streamsQuota.add(int(s.Val) - ms)
if reset {
t.streamsQuota.reset(int(s.Val) - ms)
}
case http2.SettingInitialWindowSize:
t.mu.Lock()
for _, stream := range t.activeStreams {
// Adjust the sending quota for each stream.
stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
t.mu.Unlock()
@@ -1129,12 +1028,6 @@ func (t *http2Client) controller() {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
// If the server needs to be to intimated about stream closing,
// then we need to make sure the RST_STREAM frame is written to
// the wire before the headers of the next stream waiting on
// streamQuota. We ensure this by adding to the streamsQuota pool
// only after having acquired the writableChan to send RST_STREAM.
t.streamsQuota.add(1)
t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()
@@ -1154,61 +1047,6 @@ func (t *http2Client) controller() {
}
}
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
// Check if keepalive should go dormant.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
t.mu.Unlock()
select {
case <-t.awakenKeepalive:
// If the control gets here a ping has been sent
// need to reset the timer with keepalive.Timeout.
case <-t.shutdownChan:
return
}
} else {
t.mu.Unlock()
// Send ping.
t.controlBuf.put(p)
}
// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
t.Close()
return
case <-t.shutdownChan:
if !timer.Stop() {
<-timer.C
}
return
}
case <-t.shutdownChan:
if !timer.Stop() {
<-timer.C
}
return
}
}
}
func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
}

View File

@@ -50,8 +50,6 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
)
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
@@ -60,13 +58,9 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
conn net.Conn
remoteAddr net.Addr
localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by receiving a value on writableChan
// and releases it by sending on writableChan.
@@ -88,8 +82,6 @@ type http2Server struct {
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool
stats stats.Handler
mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
@@ -99,13 +91,12 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
framer := newFramer(conn)
// Send initial settings as connection preface to client.
var settings []http2.Setting
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
@@ -130,16 +121,12 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
}
var buf bytes.Buffer
t := &http2Server{
ctx: context.Background(),
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: config.AuthInfo,
authInfo: authInfo,
framer: framer,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
maxStreams: maxStreams,
inTapHandle: config.InTapHandle,
controlBuf: newRecvBuffer(),
fc: &inFlow{limit: initialConnWindowSize},
sendQuotaPool: newQuotaPool(defaultWindowSize),
@@ -148,15 +135,6 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
shutdownChan: make(chan struct{}),
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
stats: config.StatsHandler,
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
go t.controller()
t.writableChan <- 0
@@ -164,7 +142,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
}
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
buf := newRecvBuffer()
s := &Stream{
id: frame.Header().StreamID,
@@ -190,12 +168,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
if state.timeoutSet {
s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
s.ctx, s.cancel = context.WithCancel(context.TODO())
}
pr := &peer.Peer{
Addr: t.remoteAddr,
Addr: t.conn.RemoteAddr(),
}
// Attach Auth info if there is any.
if t.authInfo != nil {
@@ -217,18 +195,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
s.method = state.method
if t.inTapHandle != nil {
var err error
info := &tap.Info{
FullMethodName: state.method,
}
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
// TODO: Log the real error.
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
return
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
@@ -252,26 +218,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
s.ctx = traceCtx(s.ctx, s.method)
if t.stats != nil {
s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
}
t.stats.HandleRPC(s.ctx, inHeader)
}
handle(s)
return
}
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
func (t *http2Server) HandleStreams(handle func(*Stream)) {
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
@@ -326,7 +279,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
if t.operateHeaders(frame, handle) {
t.Close()
break
}
@@ -381,7 +334,7 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
}
func (t *http2Server) handleData(f *http2.DataFrame) {
size := f.Header().Length
size := len(f.Data())
if err := t.fc.onData(uint32(size)); err != nil {
grpclog.Printf("transport: http2Server %v", err)
t.Close()
@@ -396,11 +349,6 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{0, w})
}
}
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@@ -416,20 +364,13 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
}
s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
data := make([]byte, len(f.Data()))
copy(data, f.Data())
s.write(recvMsg{data: data})
}
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
// Received the end of stream from the client.
@@ -551,16 +492,9 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, false); err != nil {
return err
}
if t.stats != nil {
outHeader := &stats.OutHeader{
WireLength: bufLen,
}
t.stats.HandleRPC(s.Context(), outHeader)
}
t.writableChan <- 0
return nil
}
@@ -613,17 +547,10 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, true); err != nil {
t.Close()
return err
}
if t.stats != nil {
outTrailer := &stats.OutTrailer{
WireLength: bufLen,
}
t.stats.HandleRPC(s.Context(), outTrailer)
}
t.closeStream(s)
t.writableChan <- 0
return nil
@@ -652,14 +579,19 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
return nil
}
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
if _, ok := err.(StreamError); ok {
t.sendQuotaPool.cancel()
}
return err
}
if sq < size {
@@ -727,7 +659,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
t.mu.Lock()
defer t.mu.Unlock()
for _, stream := range t.activeStreams {
stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
}
@@ -804,10 +736,6 @@ func (t *http2Server) Close() (err error) {
for _, s := range streams {
s.cancel()
}
if t.stats != nil {
connEnd := &stats.ConnEnd{}
t.stats.HandleConn(t.ctx, connEnd)
}
return
}
@@ -839,7 +767,7 @@ func (t *http2Server) closeStream(s *Stream) {
}
func (t *http2Server) RemoteAddr() net.Addr {
return t.remoteAddr
return t.conn.RemoteAddr()
}
func (t *http2Server) Drain() {

View File

@@ -45,13 +45,10 @@ import (
"sync"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
)
// recvMsg represents the received msg from the transport. All transport
@@ -170,11 +167,6 @@ type Stream struct {
id uint32
// nil for client side Stream.
st ServerTransport
// clientStatsCtx keeps the user context for stats handling.
// It's only valid on client side. Server side stats context is same as s.ctx.
// All client side stats collection should use the clientStatsCtx (instead of the stream context)
// so that all the generated stats for a particular RPC can be associated in the processing phase.
clientStatsCtx context.Context
// ctx is the associated context of the stream.
ctx context.Context
// cancel is always nil for client side Stream.
@@ -215,11 +207,6 @@ type Stream struct {
// the status received from the server.
statusCode codes.Code
statusDesc string
// rstStream indicates whether a RST_STREAM frame needs to be sent
// to the server to signify that this stream is closing.
rstStream bool
// rstError is the error that needs to be sent along with the RST_STREAM frame.
rstError http2.ErrCode
}
// RecvCompress returns the compression algorithm applied to the inbound
@@ -279,6 +266,11 @@ func (s *Stream) Context() context.Context {
return s.ctx
}
// TraceContext recreates the context of s with a trace.Trace.
func (s *Stream) TraceContext(tr trace.Trace) {
s.ctx = trace.NewContext(s.ctx, tr)
}
// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
@@ -363,39 +355,22 @@ const (
draining
)
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
MaxStreams uint32
AuthInfo credentials.AuthInfo
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails.
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
return newHTTP2Server(conn, config)
func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) {
return newHTTP2Server(conn, maxStreams, authInfo)
}
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
// Authority is the :authority pseudo-header to use. This field has no effect if
// TransportCredentials is set.
Authority string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
FailOnNonTempDialError bool
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
}
// TargetInfo contains the information of the target such as network address and metadata.
@@ -491,7 +466,7 @@ type ClientTransport interface {
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
HandleStreams(func(*Stream), func(context.Context, string) context.Context)
HandleStreams(func(*Stream))
// WriteHeader sends the header metadata for the given stream.
// WriteHeader may not be called on all streams.
@@ -577,7 +552,7 @@ type StreamError struct {
}
func (e StreamError) Error() string {
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
return fmt.Sprintf("stream error: code = %d desc = %q", e.Code, e.Desc)
}
// ContextErr converts the error from context package into a StreamError.

View File

@@ -49,7 +49,6 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
)
type server struct {
@@ -180,10 +179,7 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
if err != nil {
return
}
config := &ServerConfig{
MaxStreams: maxStreams,
}
transport, err := NewServerTransport("http2", conn, config)
transport, err := NewServerTransport("http2", conn, maxStreams, nil)
if err != nil {
return
}
@@ -198,33 +194,22 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
h := &testStreamHandler{transport.(*http2Server)}
switch ht {
case suspended:
go transport.HandleStreams(h.handleStreamSuspension,
func(ctx context.Context, method string) context.Context {
return ctx
})
go transport.HandleStreams(h.handleStreamSuspension)
case misbehaved:
go transport.HandleStreams(func(s *Stream) {
go h.handleStreamMisbehave(t, s)
}, func(ctx context.Context, method string) context.Context {
return ctx
})
case encodingRequiredStatus:
go transport.HandleStreams(func(s *Stream) {
go h.handleStreamEncodingRequiredStatus(t, s)
}, func(ctx context.Context, method string) context.Context {
return ctx
})
case invalidHeaderField:
go transport.HandleStreams(func(s *Stream) {
go h.handleStreamInvalidHeaderField(t, s)
}, func(ctx context.Context, method string) context.Context {
return ctx
})
default:
go transport.HandleStreams(func(s *Stream) {
go h.handleStream(t, s)
}, func(ctx context.Context, method string) context.Context {
return ctx
})
}
}
@@ -252,10 +237,6 @@ func (s *server) stop() {
}
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{})
}
func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) {
server := &server{startedErr: make(chan error, 1)}
go server.start(t, port, maxStreams, ht)
server.wait(t, 2*time.Second)
@@ -267,135 +248,13 @@ func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts
target := TargetInfo{
Addr: addr,
}
ct, connErr = NewClientTransport(context.Background(), target, copts)
ct, connErr = NewClientTransport(context.Background(), target, ConnectOptions{})
if connErr != nil {
t.Fatalf("failed to create transport: %v", connErr)
}
return server, ct
}
func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) ClientTransport {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen: %v", err)
}
// Launch a non responsive server.
go func() {
defer lis.Close()
conn, err := lis.Accept()
if err != nil {
t.Errorf("Error at server-side while accepting: %v", err)
close(done)
return
}
done <- conn
}()
tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts)
if err != nil {
// Server clean-up.
lis.Close()
if conn, ok := <-done; ok {
conn.Close()
}
t.Fatalf("Failed to dial: %v", err)
}
return tr
}
func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.
}}, done)
defer tr.Close()
conn, ok := <-done
if !ok {
t.Fatalf("Server didn't return connection object")
}
defer conn.Close()
// Sleep for keepalive to close the connection.
time.Sleep(4 * time.Second)
// Assert that the connection was closed.
ct := tr.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state == reachable {
t.Fatalf("Test Failed: Expected client transport to have closed.")
}
}
func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
}}, done)
defer tr.Close()
conn, ok := <-done
if !ok {
t.Fatalf("server didn't reutrn connection object")
}
defer conn.Close()
// Give keepalive some time.
time.Sleep(4 * time.Second)
// Assert that connections is still healthy.
ct := tr.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state != reachable {
t.Fatalf("Test failed: Expected client transport to be healthy.")
}
}
func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
}}, done)
defer tr.Close()
conn, ok := <-done
if !ok {
t.Fatalf("Server didn't return connection object")
}
defer conn.Close()
// Create a stream.
_, err := tr.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("Failed to create a new stream: %v", err)
}
// Give keepalive some time.
time.Sleep(4 * time.Second)
// Assert that transport was closed.
ct := tr.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state == reachable {
t.Fatalf("Test failed: Expected client transport to have closed.")
}
}
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.
}})
defer s.stop()
defer tr.Close()
// Give keep alive some time.
time.Sleep(4 * time.Second)
// Assert that transport is healthy.
ct := tr.(*http2Client)
ct.mu.Lock()
defer ct.mu.Unlock()
if ct.state != reachable {
t.Fatalf("Test failed: Expected client transport to be healthy.")
}
}
func TestClientSendAndReceive(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, normal)
callHdr := &CallHdr{
@@ -634,10 +493,7 @@ func TestMaxStreams(t *testing.T) {
case <-cc.streamsQuota.acquire():
t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
default:
cc.streamsQuota.mu.Lock()
quota := cc.streamsQuota.quota
cc.streamsQuota.mu.Unlock()
if quota != 0 {
if cc.streamsQuota.quota != 0 {
t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
}
}