120 lines
2.0 KiB
Go
120 lines
2.0 KiB
Go
package mock
|
|
|
|
/*
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"sync"
|
|
|
|
"go.unistack.org/micro/v3/client"
|
|
"go.unistack.org/micro/v3/codec"
|
|
"go.unistack.org/micro/v3/errors"
|
|
"go.unistack.org/micro/v3/logger"
|
|
)
|
|
|
|
// mockStream carries the per-stream state used by the stream methods in this
// file. According to the original note it implements the streamer interface.
type mockStream struct {
	// err is set to errShutdown when an operation is attempted on a stream
	// that has already been closed.
	err error
	// conn is the underlying network connection; requests are written to it.
	conn net.Conn
	// cf is the codec handed to the request builder.
	cf codec.Codec
	// context is the context associated with this stream.
	context context.Context
	// logger is passed along to the request builder.
	logger logger.Logger
	// request is the client request this stream was opened for.
	request client.Request
	// closed is polled by isClosed; a receive on it succeeding means the
	// stream is shut down.
	closed chan bool
	// reader is a buffered reader used when reading from the stream.
	// NOTE(review): presumably wraps conn - confirm at the construction site.
	reader *bufio.Reader
	// address is the remote address passed to the request builder.
	address string
	// ct is passed to the request builder.
	// NOTE(review): presumably the content type - confirm.
	ct string
	// opts are the call options this stream was created with.
	opts client.CallOptions
	// RWMutex guards the stream state; Send takes the write lock.
	sync.RWMutex
}
|
|
|
|
// errShutdown is the error reported when a stream operation is attempted
// after the stream has been closed. fmt.Errorf is used because the name
// "errors" in this file refers to go.unistack.org/micro/v3/errors.
var (
	errShutdown = fmt.Errorf("connection is shut down")
)
|
|
|
|
func (s *mockStream) isClosed() bool {
|
|
select {
|
|
case <-s.closed:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (s *mockStream) Context() context.Context {
|
|
return mock.context
|
|
}
|
|
|
|
func (s *mockStream) Request() client.Request {
|
|
return s.request
|
|
}
|
|
|
|
func (s *mockStream) Response() client.Response {
|
|
return nil
|
|
}
|
|
|
|
func (s *mockStream) SendMsg(msg interface{}) error {
|
|
return s.Send(msg)
|
|
}
|
|
|
|
func (s *mockStream) Send(msg interface{}) error {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
|
|
if h.isClosed() {
|
|
h.err = errShutdown
|
|
return errShutdown
|
|
}
|
|
|
|
hreq, err := newRequest(h.context, h.logger, h.address, h.request, h.ct, h.cf, msg, h.opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return hreq.Write(h.conn)
|
|
}
|
|
|
|
func (h *httpStream) RecvMsg(msg interface{}) error {
|
|
return h.Recv(msg)
|
|
}
|
|
|
|
func (h *httpStream) Recv(msg interface{}) error {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
|
|
if h.isClosed() {
|
|
h.err = errShutdown
|
|
return errShutdown
|
|
}
|
|
|
|
hrsp, err := http.ReadResponse(h.reader, new(http.Request))
|
|
if err != nil {
|
|
return errors.InternalServerError("go.micro.client", err.Error())
|
|
}
|
|
defer hrsp.Body.Close()
|
|
|
|
return h.parseRsp(h.context, h.logger, hrsp, h.cf, msg, h.opts)
|
|
}
|
|
|
|
func (h *httpStream) Error() error {
|
|
h.RLock()
|
|
defer h.RUnlock()
|
|
return h.err
|
|
}
|
|
|
|
func (h *httpStream) CloseSend() error {
|
|
return h.Close()
|
|
}
|
|
|
|
func (h *httpStream) Close() error {
|
|
select {
|
|
case <-h.closed:
|
|
return nil
|
|
default:
|
|
close(h.closed)
|
|
return h.conn.Close()
|
|
}
|
|
}
|
|
*/
|