package mock /* import ( "bufio" "context" "fmt" "net" "net/http" "sync" "go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v3/logger" ) // Implements the streamer interface type mockStream struct { err error conn net.Conn cf codec.Codec context context.Context logger logger.Logger request client.Request closed chan bool reader *bufio.Reader address string ct string opts client.CallOptions sync.RWMutex } var errShutdown = fmt.Errorf("connection is shut down") func (s *mockStream) isClosed() bool { select { case <-s.closed: return true default: return false } } func (s *mockStream) Context() context.Context { return mock.context } func (s *mockStream) Request() client.Request { return s.request } func (s *mockStream) Response() client.Response { return nil } func (s *mockStream) SendMsg(msg interface{}) error { return s.Send(msg) } func (s *mockStream) Send(msg interface{}) error { h.Lock() defer h.Unlock() if h.isClosed() { h.err = errShutdown return errShutdown } hreq, err := newRequest(h.context, h.logger, h.address, h.request, h.ct, h.cf, msg, h.opts) if err != nil { return err } return hreq.Write(h.conn) } func (h *httpStream) RecvMsg(msg interface{}) error { return h.Recv(msg) } func (h *httpStream) Recv(msg interface{}) error { h.Lock() defer h.Unlock() if h.isClosed() { h.err = errShutdown return errShutdown } hrsp, err := http.ReadResponse(h.reader, new(http.Request)) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } defer hrsp.Body.Close() return h.parseRsp(h.context, h.logger, hrsp, h.cf, msg, h.opts) } func (h *httpStream) Error() error { h.RLock() defer h.RUnlock() return h.err } func (h *httpStream) CloseSend() error { return h.Close() } func (h *httpStream) Close() error { select { case <-h.closed: return nil default: close(h.closed) return h.conn.Close() } } */