package http import ( "bufio" "bytes" "fmt" "io/ioutil" "net" "net/http" "net/url" "sync" "time" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" "github.com/micro/go-micro/codec" errors "github.com/micro/go-micro/errors" "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/selector" "github.com/micro/go-micro/transport" "golang.org/x/net/context" ) type httpClient struct { once sync.Once opts client.Options } func init() { cmd.DefaultClients["http"] = NewClient } func (h *httpClient) call(ctx context.Context, address string, req client.Request, rsp interface{}, opts client.CallOptions) error { header := make(http.Header) if md, ok := metadata.FromContext(ctx); ok { for k, v := range md { header.Set(k, v) } } // set timeout in nanoseconds header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout)) // set the content type for the request header.Set("Content-Type", req.ContentType()) // get codec cf, err := h.newHTTPCodec(req.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // marshal request b, err := cf.Marshal(req.Request()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } buf := &buffer{bytes.NewBuffer(b)} defer buf.Close() hreq := &http.Request{ Method: "POST", URL: &url.URL{ Scheme: "http", Host: address, Path: req.Method(), }, Header: header, Body: buf, ContentLength: int64(len(b)), Host: address, } // make the request hrsp, err := http.DefaultClient.Do(hreq.WithContext(ctx)) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } defer hrsp.Body.Close() // parse response b, err = ioutil.ReadAll(hrsp.Body) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // unmarshal if err := cf.Unmarshal(b, rsp); err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } return nil } func (h *httpClient) stream(ctx context.Context, address string, req client.Request, opts client.CallOptions) (client.Streamer, error) { header := make(http.Header) if md, ok := metadata.FromContext(ctx); ok { for k, v := range md { header.Set(k, v) } } // set timeout in nanoseconds header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout)) // set the content type for the request header.Set("Content-Type", req.ContentType()) // get codec cf, err := h.newHTTPCodec(req.ContentType()) if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } cc, err := net.Dial("tcp", address) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err)) } return &httpStream{ address: address, context: ctx, closed: make(chan bool), conn: cc, codec: cf, header: header, reader: bufio.NewReader(cc), request: req, }, nil } func (h *httpClient) newHTTPCodec(contentType string) (Codec, error) { if c, ok := defaultHTTPCodecs[contentType]; ok { return c, nil } return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } func (h *httpClient) newCodec(contentType string) (codec.NewCodec, error) { if c, ok := h.opts.Codecs[contentType]; ok { return c, nil } if cf, ok := defaultRPCCodecs[contentType]; ok { return cf, nil } return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } func (h *httpClient) Init(opts ...client.Option) error { for _, o := range opts { o(&h.opts) } return nil } func (h *httpClient) Options() client.Options { return h.opts } func (h *httpClient) NewPublication(topic string, msg interface{}) client.Publication { return newHTTPPublication(topic, msg, "application/proto") } func (h *httpClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { return newHTTPRequest(service, method, req, h.opts.ContentType, reqOpts...) } func (h *httpClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { return newHTTPRequest(service, method, req, "application/proto", reqOpts...) } func (h *httpClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { return newHTTPRequest(service, method, req, "application/json", reqOpts...) } func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { // make a copy of call opts callOpts := h.opts.CallOptions for _, opt := range opts { opt(&callOpts) } // get next nodes from the selector next, err := h.opts.Selector.Select(req.Service(), callOpts.SelectOptions...) if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // check if we already have a deadline d, ok := ctx.Deadline() if !ok { // no deadline so we create a new one ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) } else { // got a deadline so no need to setup context // but we need to set the timeout we pass along opt := client.WithRequestTimeout(d.Sub(time.Now())) opt(&callOpts) } // should we noop right here? select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) default: } // make copy of call method hcall := h.call // wrap the call in reverse for i := len(callOpts.CallWrappers); i > 0; i-- { hcall = callOpts.CallWrappers[i-1](hcall) } // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } // select next node node, err := next() if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } // set the address addr := node.Address if node.Port > 0 { addr = fmt.Sprintf("%s:%d", addr, node.Port) } // make the call err = hcall(ctx, addr, req, rsp, callOpts) h.opts.Selector.Mark(req.Service(), node, err) return err } ch := make(chan error, callOpts.Retries) var gerr error for i := 0; i < callOpts.Retries; i++ { go func() { ch <- call(i) }() select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) case err := <-ch: // if the call succeeded lets bail early if err == nil { return nil } retry, rerr := callOpts.Retry(ctx, req, i, err) if rerr != nil { return rerr } if !retry { return err } gerr = err } } return gerr } func (h *httpClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error { callOpts := h.opts.CallOptions for _, opt := range opts { opt(&callOpts) } return h.call(ctx, addr, req, rsp, callOpts) } func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) { // make a copy of call opts callOpts := h.opts.CallOptions for _, opt := range opts { opt(&callOpts) } // get next nodes from the selector next, err := h.opts.Selector.Select(req.Service(), callOpts.SelectOptions...) if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } // check if we already have a deadline d, ok := ctx.Deadline() if !ok { // no deadline so we create a new one ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) } else { // got a deadline so no need to setup context // but we need to set the timeout we pass along opt := client.WithRequestTimeout(d.Sub(time.Now())) opt(&callOpts) } // should we noop right here? select { case <-ctx.Done(): return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) default: } call := func(i int) (client.Streamer, error) { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } node, err := next() if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error()) } else if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } addr := node.Address if node.Port > 0 { addr = fmt.Sprintf("%s:%d", addr, node.Port) } stream, err := h.stream(ctx, addr, req, callOpts) h.opts.Selector.Mark(req.Service(), node, err) return stream, err } type response struct { stream client.Streamer err error } ch := make(chan response, callOpts.Retries) var grr error for i := 0; i < callOpts.Retries; i++ { go func() { s, err := call(i) ch <- response{s, err} }() select { case <-ctx.Done(): return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) case rsp := <-ch: // if the call succeeded lets bail early if rsp.err == nil { return rsp.stream, nil } retry, rerr := callOpts.Retry(ctx, req, i, err) if rerr != nil { return nil, rerr } if !retry { return nil, rsp.err } grr = rsp.err } } return nil, grr } func (h *httpClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) { callOpts := h.opts.CallOptions for _, opt := range opts { opt(&callOpts) } return h.stream(ctx, addr, req, callOpts) } func (h *httpClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error { md, ok := metadata.FromContext(ctx) if !ok { md = make(map[string]string) } md["Content-Type"] = p.ContentType() cf, err := h.newCodec(p.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } b := &buffer{bytes.NewBuffer(nil)} if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } h.once.Do(func() { h.opts.Broker.Connect() }) return h.opts.Broker.Publish(p.Topic(), &broker.Message{ Header: md, Body: b.Bytes(), }) } func (h *httpClient) String() string { return "http" } func newClient(opts ...client.Option) client.Client { options := client.Options{ CallOptions: client.CallOptions{ Backoff: client.DefaultBackoff, Retry: client.DefaultRetry, Retries: client.DefaultRetries, RequestTimeout: client.DefaultRequestTimeout, DialTimeout: transport.DefaultDialTimeout, }, } for _, o := range opts { o(&options) } if len(options.ContentType) == 0 { options.ContentType = "application/proto" } if options.Broker == nil { options.Broker = broker.DefaultBroker } if options.Registry == nil { options.Registry = registry.DefaultRegistry } if options.Selector == nil { options.Selector = selector.NewSelector( selector.Registry(options.Registry), ) } rc := &httpClient{ once: sync.Once{}, opts: options, } c := client.Client(rc) // wrap in reverse for i := len(options.Wrappers); i > 0; i-- { c = options.Wrappers[i-1](c) } return c } func NewClient(opts ...client.Option) client.Client { return newClient(opts...) }