176 lines
3.4 KiB
Go
176 lines
3.4 KiB
Go
package http
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"sync"
|
|
|
|
"go.unistack.org/micro/v3/client"
|
|
"go.unistack.org/micro/v3/codec"
|
|
"go.unistack.org/micro/v3/errors"
|
|
"go.unistack.org/micro/v3/logger"
|
|
)
|
|
|
|
// Implements the streamer interface
|
|
type httpStream struct {
|
|
err error
|
|
conn net.Conn
|
|
cf codec.Codec
|
|
context context.Context
|
|
logger logger.Logger
|
|
request client.Request
|
|
closed chan bool
|
|
reader *bufio.Reader
|
|
address string
|
|
ct string
|
|
opts client.CallOptions
|
|
sync.RWMutex
|
|
}
|
|
|
|
var errShutdown = fmt.Errorf("connection is shut down")
|
|
|
|
func (h *httpStream) isClosed() bool {
|
|
select {
|
|
case <-h.closed:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (h *httpStream) Context() context.Context {
|
|
return h.context
|
|
}
|
|
|
|
func (h *httpStream) Request() client.Request {
|
|
return h.request
|
|
}
|
|
|
|
func (h *httpStream) Response() client.Response {
|
|
return nil
|
|
}
|
|
|
|
func (h *httpStream) SendMsg(msg interface{}) error {
|
|
return h.Send(msg)
|
|
}
|
|
|
|
func (h *httpStream) Send(msg interface{}) error {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
|
|
if h.isClosed() {
|
|
h.err = errShutdown
|
|
return errShutdown
|
|
}
|
|
|
|
hreq, err := newRequest(h.context, h.logger, h.address, h.request, h.ct, h.cf, msg, h.opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return hreq.Write(h.conn)
|
|
}
|
|
|
|
func (h *httpStream) RecvMsg(msg interface{}) error {
|
|
return h.Recv(msg)
|
|
}
|
|
|
|
func (h *httpStream) Recv(msg interface{}) error {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
|
|
if h.isClosed() {
|
|
h.err = errShutdown
|
|
return errShutdown
|
|
}
|
|
|
|
hrsp, err := http.ReadResponse(h.reader, new(http.Request))
|
|
if err != nil {
|
|
return errors.InternalServerError("go.micro.client", "%+v", err)
|
|
}
|
|
defer hrsp.Body.Close()
|
|
|
|
return h.parseRsp(h.context, h.logger, hrsp, h.cf, msg, h.opts)
|
|
}
|
|
|
|
func (h *httpStream) Error() error {
|
|
h.RLock()
|
|
defer h.RUnlock()
|
|
return h.err
|
|
}
|
|
|
|
func (h *httpStream) CloseSend() error {
|
|
return h.Close()
|
|
}
|
|
|
|
func (h *httpStream) Close() error {
|
|
select {
|
|
case <-h.closed:
|
|
return nil
|
|
default:
|
|
close(h.closed)
|
|
return h.conn.Close()
|
|
}
|
|
}
|
|
|
|
func (h *httpStream) parseRsp(ctx context.Context, log logger.Logger, hrsp *http.Response, cf codec.Codec, rsp interface{}, opts client.CallOptions) error {
|
|
var err error
|
|
var buf []byte
|
|
|
|
// fast path return
|
|
if hrsp.StatusCode == http.StatusNoContent {
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
default:
|
|
if hrsp.Body != nil {
|
|
buf, err = io.ReadAll(hrsp.Body)
|
|
if err != nil {
|
|
if log.V(logger.ErrorLevel) {
|
|
log.Error(ctx, "failed to read body", err)
|
|
}
|
|
return errors.InternalServerError("go.micro.client", "%s", buf)
|
|
}
|
|
}
|
|
|
|
if log.V(logger.DebugLevel) {
|
|
log.Debug(ctx, fmt.Sprintf("response %s with %v", buf, hrsp.Header))
|
|
}
|
|
|
|
if hrsp.StatusCode < 400 {
|
|
if err = cf.Unmarshal(buf, rsp); err != nil {
|
|
return errors.InternalServerError("go.micro.client", "%+v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
var rerr interface{}
|
|
errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{})
|
|
if ok && errmap != nil {
|
|
if rerr, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)].(error); !ok {
|
|
rerr, ok = errmap["default"].(error)
|
|
}
|
|
}
|
|
if !ok || rerr == nil {
|
|
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
|
|
}
|
|
|
|
if cerr := cf.Unmarshal(buf, rerr); cerr != nil {
|
|
return errors.InternalServerError("go.micro.client", "%+v", cerr)
|
|
}
|
|
|
|
if err, ok = rerr.(error); !ok {
|
|
err = &Error{rerr}
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|