use own fork

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-01-10 14:48:10 +03:00
parent 280d3a64c9
commit 59d77f7b8c
22 changed files with 812 additions and 866 deletions

View File

@@ -4,21 +4,23 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"github.com/micro/go-micro/v2/client"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/errors"
)
// Implements the streamer interface
type httpStream struct {
sync.RWMutex
address string
codec Codec
opts client.CallOptions
codec codec.Codec
context context.Context
header http.Header
seq uint64
@@ -30,7 +32,7 @@ type httpStream struct {
}
var (
errShutdown = errors.New("connection is shut down")
errShutdown = fmt.Errorf("connection is shut down")
)
func (h *httpStream) isClosed() bool {
@@ -68,22 +70,16 @@ func (h *httpStream) Send(msg interface{}) error {
return err
}
buf := &buffer{bytes.NewBuffer(b)}
defer buf.Close()
req := &http.Request{
Method: "POST",
URL: &url.URL{
Scheme: "http",
Host: h.address,
Path: h.request.Endpoint(),
},
Header: h.header,
Body: buf,
ContentLength: int64(len(b)),
Host: h.address,
buf := bytes.NewBuffer(b)
req, err := newRequest(h.address, h.request, h.opts)
if err != nil {
return err
}
req.Header = h.header
req.Body = ioutil.NopCloser(buf)
req.ContentLength = int64(len(b))
return req.Write(h.conn)
}
@@ -96,22 +92,13 @@ func (h *httpStream) Recv(msg interface{}) error {
return errShutdown
}
rsp, err := http.ReadResponse(h.reader, new(http.Request))
hrsp, err := http.ReadResponse(h.reader, new(http.Request))
if err != nil {
return err
return errors.InternalServerError("go.micro.client", err.Error())
}
defer rsp.Body.Close()
defer hrsp.Body.Close()
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return err
}
if rsp.StatusCode != 200 {
return errors.New(rsp.Status + ": " + string(b))
}
return h.codec.Unmarshal(b, msg)
return parseRsp(h.context, hrsp, h.codec, msg, h.opts)
}
func (h *httpStream) Error() error {