micro/transport/http_transport.go
Jelmer Snoeck af0028d821 HTTPTransport: implement queue for listener.
Before this patch, when an error occurred while trying to accept a connection
from the listener, the error was returned to the caller. This also happened
for temporary issues like `too many open files`.

Temporary issues are "self-healing" and will resolve over time. This
means that we can put the requests in a queue to wait until the issue is
resolved and start processing the connections once it is resolved.

This patch implements such a mechanism, copied from the standard
library http package. It retries temporary errors but returns
permanent errors (or errors that are not of the net.Error type).
2016-04-01 13:10:18 +01:00

446 lines
7.4 KiB
Go

package transport
import (
"bufio"
"bytes"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
mls "github.com/micro/misc/lib/tls"
)
// buffer wraps an io.ReadWriter. Together with the no-op Close method defined
// on *buffer in this file, it is used as the Body of the HTTP requests and
// responses built by this transport.
type buffer struct {
io.ReadWriter
}
// httpTransport is the HTTP-based transport. It dials clients and creates
// listeners according to its options.
type httpTransport struct {
// opts holds the transport options; Secure and TLSConfig are consulted
// when dialing and listening.
opts Options
}
// httpTransportClient is the client side of a dialed connection. Send writes
// HTTP requests to conn and Recv reads HTTP responses back from it.
type httpTransportClient struct {
// ht is the transport that dialed this client.
ht *httpTransport
// addr is the dialed address; it is used as the request URL host and Host.
addr string
// conn is the underlying network connection.
conn net.Conn
// dialOpts are the options the client was dialed with.
dialOpts DialOptions
// once ensures the teardown in Close runs a single time.
once sync.Once
// The embedded mutex guards bl and buff.
sync.Mutex
// r passes sent requests from Send to Recv; Close closes it.
r chan *http.Request
// bl is the backlog of sent requests that have not yet been placed on r.
bl []*http.Request
// buff is the buffered reader over conn; Close sets it to nil.
buff *bufio.Reader
}
// httpTransportSocket is the server side of an accepted connection. Recv reads
// HTTP requests from conn and Send writes HTTP responses to it.
type httpTransportSocket struct {
// r carries the most recently received request from Recv to Send.
r chan *http.Request
// conn is the underlying network connection.
conn net.Conn
// once ensures the teardown in Close runs a single time.
once sync.Once
// The embedded mutex is held by Close while it clears buff.
sync.Mutex
// buff is the buffered reader over conn; Close sets it to nil.
buff *bufio.Reader
}
// httpTransportListener accepts incoming connections and hands each one to a
// caller-supplied handler as a socket.
type httpTransportListener struct {
// listener is the underlying network listener.
listener net.Listener
}
// listen creates a listener for addr by calling fn.
//
// addr is either "host:port" or "host:min-max". An address without a colon, or
// whose last colon-separated component contains no "-", is passed to fn
// unchanged. For a port range, every port from min to max (inclusive) is tried
// in order and the first listener that fn creates successfully is returned.
// If all attempts fail, the error from the last attempt is returned. If the
// range bounds are not integers, or min is greater than max, an error is
// returned without calling fn.
func listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, error) {
	// host:port || host:min-max
	parts := strings.Split(addr, ":")

	// no colon at all: nothing to parse, let fn handle the address as given
	if len(parts) < 2 {
		return fn(addr)
	}

	// try to extract port range
	ports := strings.Split(parts[len(parts)-1], "-")

	// single port
	if len(ports) < 2 {
		return fn(addr)
	}

	// we have a port range

	// extract min port
	minPort, err := strconv.Atoi(ports[0])
	if err != nil {
		return nil, errors.New("unable to extract port range")
	}

	// extract max port
	maxPort, err := strconv.Atoi(ports[1])
	if err != nil {
		return nil, errors.New("unable to extract port range")
	}

	// Rebuild the host as a plain string. Everything before the last colon is
	// rejoined so hosts that themselves contain colons (e.g. "[::1]") survive;
	// formatting the []string directly with %s would wrap it in brackets.
	host := strings.Join(parts[:len(parts)-1], ":")

	// range the ports
	for port := minPort; port <= maxPort; port++ {
		// try bind to host:port
		ln, err := fn(fmt.Sprintf("%s:%d", host, port))
		if err == nil {
			return ln, nil
		}

		// hit max port: report the last bind error
		if port == maxPort {
			return nil, err
		}
	}

	// only reachable when the range is empty (min > max)
	return nil, fmt.Errorf("unable to bind to %s", addr)
}
// Close is a no-op that always returns nil; it exists so *buffer has a Close
// method in addition to the embedded reader/writer methods.
func (*buffer) Close() error {
	return nil
}
// Send writes m to the connection as an HTTP POST request whose host is the
// dialed address. The request is also queued so that Recv can later pair it
// with the response read from the connection. It returns any error from
// writing the request.
func (h *httpTransportClient) Send(m *Message) error {
	payload := bytes.NewBuffer(m.Body)
	defer payload.Reset()

	// copy the message headers into an HTTP header set
	hdr := make(http.Header)
	for key, val := range m.Header {
		hdr.Set(key, val)
	}

	target := &url.URL{
		Scheme: "http",
		Host:   h.addr,
	}

	req := &http.Request{
		Method:        "POST",
		URL:           target,
		Header:        hdr,
		Body:          &buffer{payload},
		ContentLength: int64(payload.Len()),
		Host:          h.addr,
	}

	// Append to the backlog, then move the oldest backlog entry onto the
	// channel if it has room. When the channel is full the entry simply
	// stays in the backlog; the send never blocks.
	h.Lock()
	h.bl = append(h.bl, req)
	select {
	case h.r <- h.bl[0]:
		h.bl = h.bl[1:]
	default:
	}
	h.Unlock()

	// the write happens outside the lock
	return req.Write(h.conn)
}
// Recv reads one HTTP response from the connection and stores its body and
// headers in m. For each header only the first value is kept.
//
// Unless the client was dialed in stream mode, Recv first waits for the
// matching request queued by Send. It returns io.EOF when the request channel
// has been closed or the client's reader has been released by Close, and
// otherwise any error from reading the response.
func (h *httpTransportClient) Recv(m *Message) error {
	var req *http.Request

	if !h.dialOpts.Stream {
		var open bool
		if req, open = <-h.r; !open {
			return io.EOF
		}
	}

	h.Lock()
	defer h.Unlock()

	// Close sets buff to nil
	if h.buff == nil {
		return io.EOF
	}

	rsp, err := http.ReadResponse(h.buff, req)
	if err != nil {
		return err
	}
	defer rsp.Body.Close()

	body, err := ioutil.ReadAll(rsp.Body)
	if err != nil {
		return err
	}

	// flatten the multi-valued HTTP headers to their first value
	hdr := make(map[string]string)
	for key, vals := range rsp.Header {
		first := ""
		if len(vals) > 0 {
			first = vals[0]
		}
		hdr[key] = first
	}

	*m = Message{
		Header: hdr,
		Body:   body,
	}
	return nil
}
// Close closes the underlying connection and returns its error. On the first
// call it also releases the buffered reader and closes the request channel;
// later calls only close the connection again.
func (h *httpTransportClient) Close() error {
	closeErr := h.conn.Close()

	teardown := func() {
		// drop the reader under the lock so Recv observes a nil buff
		h.Lock()
		h.buff.Reset(nil)
		h.buff = nil
		h.Unlock()

		close(h.r)
	}
	h.once.Do(teardown)

	return closeErr
}
// Recv reads one HTTP request from the connection and stores its body and
// headers in m. For each header only the first value is kept. The request is
// also offered to Send through the socket's channel (without blocking) so the
// response can be built from it.
//
// It returns an error if m is nil, io.EOF if the socket's reader has already
// been released by Close, and otherwise any error from reading the request.
func (h *httpTransportSocket) Recv(m *Message) error {
	if m == nil {
		return errors.New("message passed in is nil")
	}

	// Close clears buff while holding this lock, so read it under the same
	// lock and treat a released reader as end of stream instead of
	// dereferencing a nil reader. Close closes the connection before taking
	// the lock, which unblocks a read in progress here.
	h.Lock()
	defer h.Unlock()

	if h.buff == nil {
		return io.EOF
	}

	r, err := http.ReadRequest(h.buff)
	if err != nil {
		return err
	}
	// close the body on every path, including a failed read below
	defer r.Body.Close()

	b, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return err
	}

	mr := &Message{
		Header: make(map[string]string),
		Body:   b,
	}

	// flatten the multi-valued HTTP headers to their first value
	for k, v := range r.Header {
		if len(v) > 0 {
			mr.Header[k] = v[0]
		} else {
			mr.Header[k] = ""
		}
	}

	// hand the request to Send; drop it if the channel is already full
	select {
	case h.r <- r:
	default:
	}

	*m = *mr
	return nil
}
// Send writes m to the connection as an HTTP 200 response. It blocks until a
// request received by Recv is available, reuses that request's header set for
// the response (overlaid with m's headers), and then offers the request back
// to the channel without blocking. It returns any error from writing the
// response.
func (h *httpTransportSocket) Send(m *Message) error {
	payload := bytes.NewBuffer(m.Body)
	defer payload.Reset()

	// wait for the request this response belongs to
	req := <-h.r

	// the response shares the request's header map, so these writes are
	// visible on the request as well
	hdr := req.Header
	for key, val := range m.Header {
		hdr.Set(key, val)
	}

	rsp := &http.Response{
		Header:        hdr,
		Body:          &buffer{payload},
		Status:        "200 OK",
		StatusCode:    200,
		Proto:         "HTTP/1.1",
		ProtoMajor:    1,
		ProtoMinor:    1,
		ContentLength: int64(len(m.Body)),
	}

	// put the request back for a later Send; drop it if the channel is full
	select {
	case h.r <- req:
	default:
	}

	return rsp.Write(h.conn)
}
// error writes m to the connection as an HTTP 500 response carrying m's
// headers and body. It returns any error from writing the response.
func (h *httpTransportSocket) error(m *Message) error {
	payload := bytes.NewBuffer(m.Body)
	defer payload.Reset()

	hdr := make(http.Header)
	for key, val := range m.Header {
		hdr.Set(key, val)
	}

	rsp := &http.Response{
		Header:        hdr,
		Body:          &buffer{payload},
		Status:        "500 Internal Server Error",
		StatusCode:    500,
		Proto:         "HTTP/1.1",
		ProtoMajor:    1,
		ProtoMinor:    1,
		ContentLength: int64(len(m.Body)),
	}

	return rsp.Write(h.conn)
}
// Close closes the underlying connection and returns its error. On the first
// call it also releases the buffered reader; later calls only close the
// connection again.
func (h *httpTransportSocket) Close() error {
	closeErr := h.conn.Close()

	release := func() {
		h.Lock()
		h.buff.Reset(nil)
		h.buff = nil
		h.Unlock()
	}
	h.once.Do(release)

	return closeErr
}
// Addr returns the string form of the address the underlying listener is
// bound to.
func (h *httpTransportListener) Addr() string {
	bound := h.listener.Addr()
	return bound.String()
}
// Close closes the underlying listener and returns its error.
func (h *httpTransportListener) Close() error {
	err := h.listener.Close()
	return err
}
// Accept accepts connections in a loop and runs fn for each one in its own
// goroutine, passing it a socket wrapping the connection.
//
// Temporary accept errors (net.Error with Temporary() true) are logged and
// retried after a delay that starts at 5ms, doubles on each consecutive
// failure and is capped at 1s. The delay is reset once an accept succeeds.
// Any other error ends the loop and is returned. If fn panics, the panic is
// recovered and the socket is closed.
func (h *httpTransportListener) Accept(fn func(Socket)) error {
	var tempDelay time.Duration

	for {
		c, err := h.listener.Accept()
		if err != nil {
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}

				if maxDelay := 1 * time.Second; tempDelay > maxDelay {
					tempDelay = maxDelay
				}

				log.Printf("http: Accept error: %v; retrying in %v\n", err, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return err
		}

		// A successful accept ends the current run of temporary failures, so
		// the next failure starts again from the smallest delay.
		tempDelay = 0

		sock := &httpTransportSocket{
			conn: c,
			buff: bufio.NewReader(c),
			r:    make(chan *http.Request, 1),
		}

		go func() {
			// TODO: think of a better error response strategy
			defer func() {
				if r := recover(); r != nil {
					sock.Close()
				}
			}()

			fn(sock)
		}()
	}
}
// Dial opens a TCP connection to addr and returns a client for it. The dial
// timeout defaults to DefaultDialTimeout and may be changed through opts.
//
// When the transport is marked secure or has a TLS config, the connection is
// made over TLS; if no TLS config was supplied, one with InsecureSkipVerify
// enabled is used. Any dial error is returned.
func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
	dopts := DialOptions{Timeout: DefaultDialTimeout}
	for _, apply := range opts {
		apply(&dopts)
	}

	var (
		conn net.Conn
		err  error
	)

	// TODO: support dial option here rather than using internal config
	useTLS := h.opts.Secure || h.opts.TLSConfig != nil
	if !useTLS {
		conn, err = net.DialTimeout("tcp", addr, dopts.Timeout)
	} else {
		tlsConf := h.opts.TLSConfig
		if tlsConf == nil {
			tlsConf = &tls.Config{InsecureSkipVerify: true}
		}
		dialer := &net.Dialer{Timeout: dopts.Timeout}
		conn, err = tls.DialWithDialer(dialer, "tcp", addr, tlsConf)
	}

	if err != nil {
		return nil, err
	}

	client := &httpTransportClient{
		ht:       h,
		addr:     addr,
		conn:     conn,
		buff:     bufio.NewReader(conn),
		dialOpts: dopts,
		r:        make(chan *http.Request, 1),
	}
	return client, nil
}
// Listen creates a TCP listener for addr, which may name a single port or a
// port range (resolved by the listen helper), and returns it wrapped as a
// transport listener.
//
// When the transport is marked secure or has a TLS config, a TLS listener is
// created. If no TLS config was supplied, one is built from the certificate
// returned by mls.Certificate for the address being bound; once built it is
// reused for any further bind attempts. Any listen error is returned.
func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, error) {
	var options ListenOptions
	for _, apply := range opts {
		apply(&options)
	}

	// plain TCP unless TLS is requested below
	bind := func(addr string) (net.Listener, error) {
		return net.Listen("tcp", addr)
	}

	// TODO: support use of listen options
	if h.opts.Secure || h.opts.TLSConfig != nil {
		tlsConf := h.opts.TLSConfig

		bind = func(addr string) (net.Listener, error) {
			if tlsConf == nil {
				cert, err := mls.Certificate(addr)
				if err != nil {
					return nil, err
				}
				tlsConf = &tls.Config{Certificates: []tls.Certificate{cert}}
			}
			return tls.Listen("tcp", addr, tlsConf)
		}
	}

	l, err := listen(addr, bind)
	if err != nil {
		return nil, err
	}

	return &httpTransportListener{listener: l}, nil
}
// String returns the name of this transport, "http".
func (*httpTransport) String() string {
	return "http"
}
// newHttpTransport builds an httpTransport whose options are the zero value
// of Options with each of opts applied in order.
func newHttpTransport(opts ...Option) *httpTransport {
	var cfg Options
	for i := range opts {
		opts[i](&cfg)
	}

	t := &httpTransport{opts: cfg}
	return t
}