package http

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"sync"

	"go.unistack.org/micro/v3/client"
	"go.unistack.org/micro/v3/codec"
	"go.unistack.org/micro/v3/errors"
	"go.unistack.org/micro/v3/logger"
)

// httpStream implements the streamer interface: every Send serialises one
// HTTP request onto conn and every Recv parses one HTTP response from reader.
type httpStream struct {
	// err is the error reported by Error; Send and Recv store errShutdown
	// here when they are called on a closed stream.
	err error
	// conn is the connection requests are written to and that Close closes.
	conn net.Conn
	// cf encodes outgoing messages (via newRequest) and decodes response bodies.
	cf codec.Codec
	// context is the stream context handed to newRequest and parseRsp.
	context context.Context
	logger  logger.Logger
	request client.Request
	// closed is closed by Close; a closed channel marks the stream as shut down.
	closed chan bool
	// reader is the source responses are parsed from.
	// NOTE(review): presumably a buffered reader over conn — confirm where
	// the stream is constructed.
	reader  *bufio.Reader
	address string
	// ct is passed to newRequest; presumably the request content type — TODO confirm.
	ct   string
	opts client.CallOptions
	// The embedded mutex serialises Send and Recv and guards err.
	sync.RWMutex
}

// errShutdown is returned by Send and Recv once the stream has been closed.
var errShutdown = fmt.Errorf("connection is shut down")

// isClosed reports, without blocking, whether Close has already been called.
func (h *httpStream) isClosed() bool {
	select {
	case <-h.closed:
		return true
	default:
		return false
	}
}

// Context returns the context the stream was created with.
func (h *httpStream) Context() context.Context {
	return h.context
}

// Request returns the request the stream was opened for.
func (h *httpStream) Request() client.Request {
	return h.request
}

// Response always returns nil; this stream does not expose a response object.
func (h *httpStream) Response() client.Response {
	return nil
}

// SendMsg is an alias for Send.
func (h *httpStream) SendMsg(msg interface{}) error {
	return h.Send(msg)
}

// Send builds an HTTP request for msg with newRequest and writes it to the
// connection. It returns errShutdown (and records it for Error) when the
// stream is already closed, otherwise the error from building or writing
// the request.
func (h *httpStream) Send(msg interface{}) error {
	h.Lock()
	defer h.Unlock()

	if h.isClosed() {
		h.err = errShutdown
		return errShutdown
	}

	hreq, err := newRequest(h.context, h.logger, h.address, h.request, h.ct, h.cf, msg, h.opts)
	if err != nil {
		return err
	}

	return hreq.Write(h.conn)
}

// RecvMsg is an alias for Recv.
func (h *httpStream) RecvMsg(msg interface{}) error {
	return h.Recv(msg)
}

// Recv reads the next HTTP response from the stream and decodes it into msg
// through parseRsp. It returns errShutdown (and records it for Error) when
// the stream is already closed; a response that cannot be read is reported
// as an internal server error.
func (h *httpStream) Recv(msg interface{}) error {
	h.Lock()
	defer h.Unlock()

	if h.isClosed() {
		h.err = errShutdown
		return errShutdown
	}

	// ReadResponse only needs a request to interpret the response framing;
	// an empty request is supplied for that purpose.
	hrsp, err := http.ReadResponse(h.reader, new(http.Request))
	if err != nil {
		return errors.InternalServerError("go.micro.client", err.Error())
	}
	defer hrsp.Body.Close()

	return h.parseRsp(h.context, h.logger, hrsp, h.cf, msg, h.opts)
}

// Error returns the error recorded by Send or Recv, or nil if none was recorded.
func (h *httpStream) Error() error {
	h.RLock()
	defer h.RUnlock()
	return h.err
}

// CloseSend is an alias for Close: the whole stream is shut down.
func (h *httpStream) CloseSend() error {
	return h.Close()
}

// Close marks the stream as closed and closes the underlying connection.
// Calling it again after it has returned is a no-op that returns nil.
//
// Close deliberately does not take the stream mutex, so it is not blocked by
// a Send or Recv that currently holds it. The flip side is that the check and
// close(h.closed) are not atomic: two goroutines calling Close at the same
// instant can both reach close(h.closed).
func (h *httpStream) Close() error {
	select {
	case <-h.closed:
		return nil
	default:
		close(h.closed)
		return h.conn.Close()
	}
}

// parseRsp decodes the HTTP response hrsp into rsp using the codec cf.
//
//   - 204 No Content returns nil without touching rsp.
//   - If ctx is already done, ctx.Err() is returned and the body is not read.
//   - Status codes below 400 are unmarshalled into rsp.
//   - Status codes of 400 and above are turned into an error: when
//     opts.Context carries an error map (errorMapKey), the entry for the
//     status code, or else the "default" entry, is filled from the body and
//     returned; otherwise a generic error holding the raw body and the status
//     code is returned.
//
// Failures to read or unmarshal the body are reported as internal server
// errors carrying the underlying error text.
func (h *httpStream) parseRsp(ctx context.Context, log logger.Logger, hrsp *http.Response, cf codec.Codec, rsp interface{}, opts client.CallOptions) error {
	// fast path return
	if hrsp.StatusCode == http.StatusNoContent {
		return nil
	}

	// Do not start reading the body for a call that is already cancelled.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	var buf []byte
	if hrsp.Body != nil {
		var err error
		if buf, err = io.ReadAll(hrsp.Body); err != nil {
			if log.V(logger.ErrorLevel) {
				log.Error(ctx, "failed to read body", err)
			}
			// Report the read failure itself; the partially read body says
			// nothing about why the read failed.
			return errors.InternalServerError("go.micro.client", err.Error())
		}
	}

	if log.V(logger.DebugLevel) {
		log.Debug(ctx, fmt.Sprintf("response %s with %v", buf, hrsp.Header))
	}

	if hrsp.StatusCode < 400 {
		if err := cf.Unmarshal(buf, rsp); err != nil {
			return errors.InternalServerError("go.micro.client", err.Error())
		}
		return nil
	}

	// Look up a caller-supplied error value for this status code. Only map
	// entries that implement error are considered. opts.Context may be nil
	// when the options were not built through the usual constructor.
	var rerr error
	if opts.Context != nil {
		if errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{}); ok {
			if rerr, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)].(error); !ok {
				rerr, _ = errmap["default"].(error)
			}
		}
	}
	if rerr == nil {
		return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
	}

	// NOTE(review): the body is decoded into the value stored in the map
	// itself, so calls sharing one map entry write to the same object —
	// confirm callers do not rely on it concurrently.
	if cerr := cf.Unmarshal(buf, rerr); cerr != nil {
		return errors.InternalServerError("go.micro.client", cerr.Error())
	}

	return rerr
}