diff --git a/transport/http_transport.go b/transport/http_transport.go index c18f3561..e2ee8344 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -1,6 +1,7 @@ package transport import ( + //"fmt" "bufio" "bytes" "crypto/tls" @@ -13,10 +14,11 @@ import ( "sync" "time" - "github.com/micro/go-log" + "github.com/micro/h2c" maddr "github.com/micro/util/go/lib/addr" mnet "github.com/micro/util/go/lib/net" mls "github.com/micro/util/go/lib/tls" + "golang.org/x/net/http2" ) type buffer struct { @@ -41,13 +43,14 @@ type httpTransportClient struct { } type httpTransportSocket struct { - ht *httpTransport - r chan *http.Request - conn net.Conn - once sync.Once + ht *httpTransport + w http.ResponseWriter + r *http.Request + rw *bufio.ReadWriter - sync.Mutex - buff *bufio.Reader + conn net.Conn + // for the first request + ch chan *http.Request } type httpTransportListener struct { @@ -175,28 +178,74 @@ func (h *httpTransportSocket) Recv(m *Message) error { return errors.New("message passed in is nil") } - // set timeout if its greater than 0 - if h.ht.opts.Timeout > time.Duration(0) { - h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) - } - - r, err := http.ReadRequest(h.buff) - if err != nil { - return err - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return err - } - r.Body.Close() - m.Body = b - if m.Header == nil { m.Header = make(map[string]string) } - for k, v := range r.Header { + // process http 1 + if h.r.ProtoMajor == 1 { + // set timeout if its greater than 0 + if h.ht.opts.Timeout > time.Duration(0) { + h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) + } + + var r *http.Request + + select { + // get first request + case r = <-h.ch: + // read next request + default: + rr, err := http.ReadRequest(h.rw.Reader) + if err != nil { + return err + } + r = rr + } + + // read body + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return err + } + + // set body + r.Body.Close() + m.Body = b + + // set headers + for k, v := range h.r.Header { + if len(v) > 0 { + m.Header[k] = v[0] + } else { + m.Header[k] = "" + } + } + + // return early early + return nil + } + + // processing http2 request + // read streaming body + + // set max buffer size + buf := make([]byte, 4*1024) + + // read the request body + n, err := h.r.Body.Read(buf) + // not an eof error + if err != nil { + return err + } + + // check if we have data + if n > 0 { + m.Body = buf[:n] + } + + // set headers + for k, v := range h.r.Header { if len(v) > 0 { m.Header[k] = v[0] } else { @@ -204,78 +253,73 @@ func (h *httpTransportSocket) Recv(m *Message) error { } } - select { - case h.r <- r: - default: - } - return nil } func (h *httpTransportSocket) Send(m *Message) error { - b := bytes.NewBuffer(m.Body) - defer b.Reset() + if h.r.ProtoMajor == 1 { + rsp := &http.Response{ + Header: h.r.Header, + Body: ioutil.NopCloser(bytes.NewReader(m.Body)), + Status: "200 OK", + StatusCode: 200, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + ContentLength: int64(len(m.Body)), + } - r := <-h.r + for k, v := range m.Header { + rsp.Header.Set(k, v) + } - rsp := &http.Response{ - Header: r.Header, - Body: &buffer{b}, - Status: "200 OK", - StatusCode: 200, - Proto: "HTTP/1.1", - ProtoMajor: 1, - ProtoMinor: 1, - ContentLength: int64(len(m.Body)), + // set timeout if its greater than 0 + if h.ht.opts.Timeout > time.Duration(0) { + h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) + } + + return rsp.Write(h.conn) } + // http2 request + + // set headers for k, v := range m.Header { - rsp.Header.Set(k, v) + h.w.Header().Set(k, v) } - select { - case h.r <- r: - default: - } - - // set timeout if its greater than 0 - if h.ht.opts.Timeout > time.Duration(0) { - h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) - } - - return rsp.Write(h.conn) + // write request + _, err := h.w.Write(m.Body) + return err } func (h *httpTransportSocket) error(m *Message) error { - b := bytes.NewBuffer(m.Body) - defer b.Reset() - rsp := &http.Response{ - Header: make(http.Header), - Body: &buffer{b}, - Status: "500 Internal Server Error", - StatusCode: 500, - Proto: "HTTP/1.1", - ProtoMajor: 1, - ProtoMinor: 1, - ContentLength: int64(len(m.Body)), - } + if h.r.ProtoMajor == 1 { + rsp := &http.Response{ + Header: make(http.Header), + Body: ioutil.NopCloser(bytes.NewReader(m.Body)), + Status: "500 Internal Server Error", + StatusCode: 500, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + ContentLength: int64(len(m.Body)), + } - for k, v := range m.Header { - rsp.Header.Set(k, v) - } + for k, v := range m.Header { + rsp.Header.Set(k, v) + } - return rsp.Write(h.conn) + return rsp.Write(h.conn) + } + return nil } func (h *httpTransportSocket) Close() error { - err := h.conn.Close() - h.once.Do(func() { - h.Lock() - h.buff.Reset(nil) - h.buff = nil - h.Unlock() - }) - return err + if h.r.ProtoMajor == 1 { + return h.conn.Close() + } + return nil } func (h *httpTransportListener) Addr() string { @@ -287,46 +331,67 @@ func (h *httpTransportListener) Close() error { } func (h *httpTransportListener) Accept(fn func(Socket)) error { - var tempDelay time.Duration + // create handler mux + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + var buf *bufio.ReadWriter + var con net.Conn - for { - c, err := h.listener.Accept() - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Temporary() { - if tempDelay == 0 { - tempDelay = 5 * time.Millisecond - } else { - tempDelay *= 2 - } - if max := 1 * time.Second; tempDelay > max { - tempDelay = max - } - log.Logf("http: Accept error: %v; retrying in %v\n", err, tempDelay) - time.Sleep(tempDelay) - continue + // read a regular request + if r.ProtoMajor == 1 { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } - return err + r.Body = ioutil.NopCloser(bytes.NewReader(b)) + // hijack the conn + hj, ok := w.(http.Hijacker) + if !ok { + // we're screwed + http.Error(w, "cannot serve conn", http.StatusInternalServerError) + return + } + + conn, bufrw, err := hj.Hijack() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer conn.Close() + buf = bufrw + con = conn } - sock := &httpTransportSocket{ + // save the request + ch := make(chan *http.Request, 1) + ch <- r + + fn(&httpTransportSocket{ ht: h.ht, - conn: c, - buff: bufio.NewReader(c), - r: make(chan *http.Request, 1), - } + w: w, + r: r, + rw: buf, + ch: ch, + conn: con, + }) + }) - go func() { - // TODO: think of a better error response strategy - defer func() { - if r := recover(); r != nil { - log.Log("panic recovered: ", r) - sock.Close() - } - }() - - fn(sock) - }() + // default http2 server + srv := &http.Server{ + Handler: mux, } + + // insecure connection use h2c + if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) { + srv.Handler = &h2c.HandlerH2C{ + Handler: mux, + H2Server: &http2.Server{}, + } + } + + // begin serving + return srv.Serve(h.listener) } func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {