119
stream.go
Normal file
119
stream.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package mock
|
||||
|
||||
/*
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/errors"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
)
|
||||
|
||||
// Implements the streamer interface
|
||||
type mockStream struct {
|
||||
err error
|
||||
conn net.Conn
|
||||
cf codec.Codec
|
||||
context context.Context
|
||||
logger logger.Logger
|
||||
request client.Request
|
||||
closed chan bool
|
||||
reader *bufio.Reader
|
||||
address string
|
||||
ct string
|
||||
opts client.CallOptions
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
var errShutdown = fmt.Errorf("connection is shut down")
|
||||
|
||||
func (s *mockStream) isClosed() bool {
|
||||
select {
|
||||
case <-s.closed:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (s *mockStream) Context() context.Context {
|
||||
return mock.context
|
||||
}
|
||||
|
||||
func (s *mockStream) Request() client.Request {
|
||||
return s.request
|
||||
}
|
||||
|
||||
func (s *mockStream) Response() client.Response {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *mockStream) SendMsg(msg interface{}) error {
|
||||
return s.Send(msg)
|
||||
}
|
||||
|
||||
func (s *mockStream) Send(msg interface{}) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
if h.isClosed() {
|
||||
h.err = errShutdown
|
||||
return errShutdown
|
||||
}
|
||||
|
||||
hreq, err := newRequest(h.context, h.logger, h.address, h.request, h.ct, h.cf, msg, h.opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return hreq.Write(h.conn)
|
||||
}
|
||||
|
||||
func (h *httpStream) RecvMsg(msg interface{}) error {
|
||||
return h.Recv(msg)
|
||||
}
|
||||
|
||||
func (h *httpStream) Recv(msg interface{}) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
if h.isClosed() {
|
||||
h.err = errShutdown
|
||||
return errShutdown
|
||||
}
|
||||
|
||||
hrsp, err := http.ReadResponse(h.reader, new(http.Request))
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
defer hrsp.Body.Close()
|
||||
|
||||
return h.parseRsp(h.context, h.logger, hrsp, h.cf, msg, h.opts)
|
||||
}
|
||||
|
||||
func (h *httpStream) Error() error {
|
||||
h.RLock()
|
||||
defer h.RUnlock()
|
||||
return h.err
|
||||
}
|
||||
|
||||
func (h *httpStream) CloseSend() error {
|
||||
return h.Close()
|
||||
}
|
||||
|
||||
func (h *httpStream) Close() error {
|
||||
select {
|
||||
case <-h.closed:
|
||||
return nil
|
||||
default:
|
||||
close(h.closed)
|
||||
return h.conn.Close()
|
||||
}
|
||||
}
|
||||
*/
|
Reference in New Issue
Block a user