| @@ -1,6 +1,7 @@ | |||||||
| package transport | package transport | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	//"fmt" | ||||||
| 	"bufio" | 	"bufio" | ||||||
| 	"bytes" | 	"bytes" | ||||||
| 	"crypto/tls" | 	"crypto/tls" | ||||||
| @@ -13,10 +14,11 @@ import ( | |||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-log" | 	"github.com/micro/h2c" | ||||||
| 	maddr "github.com/micro/util/go/lib/addr" | 	maddr "github.com/micro/util/go/lib/addr" | ||||||
| 	mnet "github.com/micro/util/go/lib/net" | 	mnet "github.com/micro/util/go/lib/net" | ||||||
| 	mls "github.com/micro/util/go/lib/tls" | 	mls "github.com/micro/util/go/lib/tls" | ||||||
|  | 	"golang.org/x/net/http2" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type buffer struct { | type buffer struct { | ||||||
| @@ -42,12 +44,13 @@ type httpTransportClient struct { | |||||||
|  |  | ||||||
| type httpTransportSocket struct { | type httpTransportSocket struct { | ||||||
| 	ht *httpTransport | 	ht *httpTransport | ||||||
| 	r    chan *http.Request | 	w  http.ResponseWriter | ||||||
| 	conn net.Conn | 	r  *http.Request | ||||||
| 	once sync.Once | 	rw *bufio.ReadWriter | ||||||
|  |  | ||||||
| 	sync.Mutex | 	conn net.Conn | ||||||
| 	buff *bufio.Reader | 	// for the first request | ||||||
|  | 	ch chan *http.Request | ||||||
| } | } | ||||||
|  |  | ||||||
| type httpTransportListener struct { | type httpTransportListener struct { | ||||||
| @@ -175,28 +178,43 @@ func (h *httpTransportSocket) Recv(m *Message) error { | |||||||
| 		return errors.New("message passed in is nil") | 		return errors.New("message passed in is nil") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if m.Header == nil { | ||||||
|  | 		m.Header = make(map[string]string) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// process http 1 | ||||||
|  | 	if h.r.ProtoMajor == 1 { | ||||||
| 		// set timeout if its greater than 0 | 		// set timeout if its greater than 0 | ||||||
| 		if h.ht.opts.Timeout > time.Duration(0) { | 		if h.ht.opts.Timeout > time.Duration(0) { | ||||||
| 			h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) | 			h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 	r, err := http.ReadRequest(h.buff) | 		var r *http.Request | ||||||
|  |  | ||||||
|  | 		select { | ||||||
|  | 		// get first request | ||||||
|  | 		case r = <-h.ch: | ||||||
|  | 		// read next request | ||||||
|  | 		default: | ||||||
|  | 			rr, err := http.ReadRequest(h.rw.Reader) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
|  | 			r = rr | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// read body | ||||||
| 		b, err := ioutil.ReadAll(r.Body) | 		b, err := ioutil.ReadAll(r.Body) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		// set body | ||||||
| 		r.Body.Close() | 		r.Body.Close() | ||||||
| 		m.Body = b | 		m.Body = b | ||||||
|  |  | ||||||
| 	if m.Header == nil { | 		// set headers | ||||||
| 		m.Header = make(map[string]string) | 		for k, v := range h.r.Header { | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for k, v := range r.Header { |  | ||||||
| 			if len(v) > 0 { | 			if len(v) > 0 { | ||||||
| 				m.Header[k] = v[0] | 				m.Header[k] = v[0] | ||||||
| 			} else { | 			} else { | ||||||
| @@ -204,23 +222,45 @@ func (h *httpTransportSocket) Recv(m *Message) error { | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 	select { | 		// return early early | ||||||
| 	case h.r <- r: | 		return nil | ||||||
| 	default: | 	} | ||||||
|  |  | ||||||
|  | 	// processing http2 request | ||||||
|  | 	// read streaming body | ||||||
|  |  | ||||||
|  | 	// set max buffer size | ||||||
|  | 	buf := make([]byte, 4*1024) | ||||||
|  |  | ||||||
|  | 	// read the request body | ||||||
|  | 	n, err := h.r.Body.Read(buf) | ||||||
|  | 	// not an eof error | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// check if we have data | ||||||
|  | 	if n > 0 { | ||||||
|  | 		m.Body = buf[:n] | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// set headers | ||||||
|  | 	for k, v := range h.r.Header { | ||||||
|  | 		if len(v) > 0 { | ||||||
|  | 			m.Header[k] = v[0] | ||||||
|  | 		} else { | ||||||
|  | 			m.Header[k] = "" | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpTransportSocket) Send(m *Message) error { | func (h *httpTransportSocket) Send(m *Message) error { | ||||||
| 	b := bytes.NewBuffer(m.Body) | 	if h.r.ProtoMajor == 1 { | ||||||
| 	defer b.Reset() |  | ||||||
|  |  | ||||||
| 	r := <-h.r |  | ||||||
|  |  | ||||||
| 		rsp := &http.Response{ | 		rsp := &http.Response{ | ||||||
| 		Header:        r.Header, | 			Header:        h.r.Header, | ||||||
| 		Body:          &buffer{b}, | 			Body:          ioutil.NopCloser(bytes.NewReader(m.Body)), | ||||||
| 			Status:        "200 OK", | 			Status:        "200 OK", | ||||||
| 			StatusCode:    200, | 			StatusCode:    200, | ||||||
| 			Proto:         "HTTP/1.1", | 			Proto:         "HTTP/1.1", | ||||||
| @@ -233,11 +273,6 @@ func (h *httpTransportSocket) Send(m *Message) error { | |||||||
| 			rsp.Header.Set(k, v) | 			rsp.Header.Set(k, v) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 	select { |  | ||||||
| 	case h.r <- r: |  | ||||||
| 	default: |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 		// set timeout if its greater than 0 | 		// set timeout if its greater than 0 | ||||||
| 		if h.ht.opts.Timeout > time.Duration(0) { | 		if h.ht.opts.Timeout > time.Duration(0) { | ||||||
| 			h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) | 			h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) | ||||||
| @@ -246,12 +281,23 @@ func (h *httpTransportSocket) Send(m *Message) error { | |||||||
| 		return rsp.Write(h.conn) | 		return rsp.Write(h.conn) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// http2 request | ||||||
|  |  | ||||||
|  | 	// set headers | ||||||
|  | 	for k, v := range m.Header { | ||||||
|  | 		h.w.Header().Set(k, v) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// write request | ||||||
|  | 	_, err := h.w.Write(m.Body) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
| func (h *httpTransportSocket) error(m *Message) error { | func (h *httpTransportSocket) error(m *Message) error { | ||||||
| 	b := bytes.NewBuffer(m.Body) | 	if h.r.ProtoMajor == 1 { | ||||||
| 	defer b.Reset() |  | ||||||
| 		rsp := &http.Response{ | 		rsp := &http.Response{ | ||||||
| 			Header:        make(http.Header), | 			Header:        make(http.Header), | ||||||
| 		Body:          &buffer{b}, | 			Body:          ioutil.NopCloser(bytes.NewReader(m.Body)), | ||||||
| 			Status:        "500 Internal Server Error", | 			Status:        "500 Internal Server Error", | ||||||
| 			StatusCode:    500, | 			StatusCode:    500, | ||||||
| 			Proto:         "HTTP/1.1", | 			Proto:         "HTTP/1.1", | ||||||
| @@ -266,16 +312,14 @@ func (h *httpTransportSocket) error(m *Message) error { | |||||||
|  |  | ||||||
| 		return rsp.Write(h.conn) | 		return rsp.Write(h.conn) | ||||||
| 	} | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| func (h *httpTransportSocket) Close() error { | func (h *httpTransportSocket) Close() error { | ||||||
| 	err := h.conn.Close() | 	if h.r.ProtoMajor == 1 { | ||||||
| 	h.once.Do(func() { | 		return h.conn.Close() | ||||||
| 		h.Lock() | 	} | ||||||
| 		h.buff.Reset(nil) | 	return nil | ||||||
| 		h.buff = nil |  | ||||||
| 		h.Unlock() |  | ||||||
| 	}) |  | ||||||
| 	return err |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpTransportListener) Addr() string { | func (h *httpTransportListener) Addr() string { | ||||||
| @@ -287,46 +331,67 @@ func (h *httpTransportListener) Close() error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpTransportListener) Accept(fn func(Socket)) error { | func (h *httpTransportListener) Accept(fn func(Socket)) error { | ||||||
| 	var tempDelay time.Duration | 	// create handler mux | ||||||
|  | 	mux := http.NewServeMux() | ||||||
|  | 	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { | ||||||
|  | 		var buf *bufio.ReadWriter | ||||||
|  | 		var con net.Conn | ||||||
|  |  | ||||||
| 	for { | 		// read a regular request | ||||||
| 		c, err := h.listener.Accept() | 		if r.ProtoMajor == 1 { | ||||||
|  | 			b, err := ioutil.ReadAll(r.Body) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 			if ne, ok := err.(net.Error); ok && ne.Temporary() { | 				http.Error(w, err.Error(), http.StatusInternalServerError) | ||||||
| 				if tempDelay == 0 { | 				return | ||||||
| 					tempDelay = 5 * time.Millisecond |  | ||||||
| 				} else { |  | ||||||
| 					tempDelay *= 2 |  | ||||||
| 			} | 			} | ||||||
| 				if max := 1 * time.Second; tempDelay > max { | 			r.Body = ioutil.NopCloser(bytes.NewReader(b)) | ||||||
| 					tempDelay = max | 			// hijack the conn | ||||||
| 				} | 			hj, ok := w.(http.Hijacker) | ||||||
| 				log.Logf("http: Accept error: %v; retrying in %v\n", err, tempDelay) | 			if !ok { | ||||||
| 				time.Sleep(tempDelay) | 				// we're screwed | ||||||
| 				continue | 				http.Error(w, "cannot serve conn", http.StatusInternalServerError) | ||||||
| 			} | 				return | ||||||
| 			return err |  | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 		sock := &httpTransportSocket{ | 			conn, bufrw, err := hj.Hijack() | ||||||
|  | 			if err != nil { | ||||||
|  | 				http.Error(w, err.Error(), http.StatusInternalServerError) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			defer conn.Close() | ||||||
|  | 			buf = bufrw | ||||||
|  | 			con = conn | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// save the request | ||||||
|  | 		ch := make(chan *http.Request, 1) | ||||||
|  | 		ch <- r | ||||||
|  |  | ||||||
|  | 		fn(&httpTransportSocket{ | ||||||
| 			ht:   h.ht, | 			ht:   h.ht, | ||||||
| 			conn: c, | 			w:    w, | ||||||
| 			buff: bufio.NewReader(c), | 			r:    r, | ||||||
| 			r:    make(chan *http.Request, 1), | 			rw:   buf, | ||||||
|  | 			ch:   ch, | ||||||
|  | 			conn: con, | ||||||
|  | 		}) | ||||||
|  | 	}) | ||||||
|  |  | ||||||
|  | 	// default http2 server | ||||||
|  | 	srv := &http.Server{ | ||||||
|  | 		Handler: mux, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 		go func() { | 	// insecure connection use h2c | ||||||
| 			// TODO: think of a better error response strategy | 	if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) { | ||||||
| 			defer func() { | 		srv.Handler = &h2c.HandlerH2C{ | ||||||
| 				if r := recover(); r != nil { | 			Handler:  mux, | ||||||
| 					log.Log("panic recovered: ", r) | 			H2Server: &http2.Server{}, | ||||||
| 					sock.Close() | 		} | ||||||
| 	} | 	} | ||||||
| 			}() |  | ||||||
|  |  | ||||||
| 			fn(sock) | 	// begin serving | ||||||
| 		}() | 	return srv.Serve(h.listener) | ||||||
| 	} |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) { | func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user