From 3f5fc8ec0676dc75e5734daffe44be344e4c3d68 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 30 Mar 2023 02:24:54 +0300 Subject: [PATCH] initial import Signed-off-by: Vasiliy Tolstov --- go.mod | 5 ++ go.sum | 8 ++ mock.go | 251 +++++++++++++++++++++++++++++++++++++++++++++++++++++ request.go | 57 ++++++++++++ stream.go | 119 +++++++++++++++++++++++++ 5 files changed, 440 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 mock.go create mode 100644 request.go create mode 100644 stream.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..b490a82 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module go.unistack.org/micro-client-mock/v3 + +go 1.20 + +require go.unistack.org/micro/v3 v3.10.18 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..1d8a6c5 --- /dev/null +++ b/go.sum @@ -0,0 +1,8 @@ +github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= +go.unistack.org/micro/v3 v3.10.18 h1:iz193N8eZKGrKPXuX6XMsGIRHMqdvUaZSfb9mzwlUYM= +go.unistack.org/micro/v3 v3.10.18/go.mod h1:uMAc0U/x7dmtICCrblGf0ZLgYegu3VwQAquu+OFCw1Q= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/mock.go b/mock.go new file mode 100644 index 0000000..70c4f98 --- /dev/null +++ b/mock.go @@ -0,0 +1,251 @@ +package mock + +import ( + "context" + "fmt" + "reflect" + "strings" + "sync" + "time" + + "go.unistack.org/micro/v3/client" + "go.unistack.org/micro/v3/codec" + "go.unistack.org/micro/v3/errors" + rutil "go.unistack.org/micro/v3/util/reflect" +) + +var _ client.Client = (*MockClient)(nil) + +type MockClient struct { + opts client.Options + mu sync.Mutex + expected []expectation +} + +func (c *MockClient) newCodec(ct string) (codec.Codec, error) { + if idx := strings.IndexRune(ct, ';'); idx >= 0 { + ct = ct[:idx] + } + + if cc, ok := c.opts.Codecs[ct]; ok { + return cc, nil + } + + return nil, codec.ErrUnknownContentType +} + +// an expectation interface +type expectation interface { + fulfilled() bool + Lock() + Unlock() + String() string +} + +// common expectation struct +// satisfies the expectation interface +type commonExpectation struct { + sync.Mutex + triggered bool + err error +} + +func (e *commonExpectation) fulfilled() bool { + return e.triggered +} + +// ExpectedRequest is used to manage client.Call expectations. +// Returned by *MockClient.ExpectRequest. +type ExpectedRequest struct { + commonExpectation + delay time.Duration + rsp interface{} + req client.Request +} + +// WillDelayFor allows to specify duration for which it will delay result. May +// be used together with Context. +func (e *ExpectedRequest) WillDelayFor(duration time.Duration) *ExpectedRequest { + e.delay = duration + return e +} + +// WillReturnError allows to set an error for expected client.Call +func (e *ExpectedRequest) WillReturnError(err error) *ExpectedRequest { + e.err = err + return e +} + +// WillReturnResponse allows to set a response for expected client.Call +func (e *ExpectedRequest) WillReturnResponse(rsp interface{}) *ExpectedRequest { + e.rsp = rsp + return e +} + +// String returns string representation +func (e *ExpectedRequest) String() string { + msg := "ExpectedRequest => expecting client.Call request" + if e.err != nil { + msg += fmt.Sprintf(", which should return error: %s", e.err) + } + if e.rsp != nil { + msg += fmt.Sprintf(", which should return rsp: %v", e.rsp) + } + return msg +} + +func (c *MockClient) ExpectationsWereMet() error { + for _, e := range c.expected { + e.Lock() + fulfilled := e.fulfilled() + e.Unlock() + + if !fulfilled { + return fmt.Errorf("there is a remaining expectation which was not matched: %s", e) + } + + } + + return nil +} + +func (c *MockClient) ExpectRequest(req client.Request) *ExpectedRequest { + e := &ExpectedRequest{req: req} + c.expected = append(c.expected, e) + return e +} + +func (c *MockClient) BatchPublish(ctx context.Context, msgs []client.Message, opts ...client.PublishOption) error { + return nil +} + +func (c *MockClient) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { + return nil // c.opts.Broker.Publish() +} + +func (c *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + c.mu.Lock() + defer c.mu.Unlock() + + options := client.NewCallOptions(opts...) + ct := req.ContentType() + if len(options.ContentType) > 0 { + ct = options.ContentType + } + + cf, err := c.newCodec(ct) + if err != nil { + return errors.BadRequest("go.micro.client", err.Error()) + } + + for _, e := range c.expected { + er, ok := e.(*ExpectedRequest) + if !ok { + continue + } + + if er.delay > 0 { + time.Sleep(er.delay) + } + + if er.req.Service() != req.Service() || + er.req.Method() != req.Method() { + continue + } + + er.triggered = true + + if er.err != nil { + return er.err + } + + if er.req == nil { + return errors.BadRequest("go.micro.client", "empty request passed") + } + + src := er.req.Body() + switch reqbody := er.req.Body().(type) { + case []byte: + src, err = rutil.Zero(req.Body()) + if err == nil { + err = cf.Unmarshal(reqbody, src) + } + if err != nil { + return errors.BadRequest("go.micro.client", err.Error()) + } + case client.Request: + break + default: + return errors.BadRequest("go.micro.client", "unknown request passed: %v", reqbody) + } + + if !reflect.DeepEqual(req.Body(), src) { + return errors.BadRequest("go.micro.client", "unexpected request %v != %v", req.Body(), src) + } + + if er.rsp == nil { + return nil + } + + switch rspbody := er.rsp.(type) { + case []byte: + if err = cf.Unmarshal(rspbody, rsp); err != nil { + return errors.BadRequest("go.micro.client", err.Error()) + } + return nil + } + + v := reflect.ValueOf(rsp) + + if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr { + v = reflect.Indirect(v) + } + response := er.rsp + if t := reflect.TypeOf(er.rsp); t.Kind() == reflect.Func { + response = reflect.ValueOf(er.rsp).Call([]reflect.Value{})[0].Interface() + } + + v.Set(reflect.ValueOf(response)) + + return nil + } + + return fmt.Errorf("can't find service %s", req.Method()) +} + +func (c *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { + return nil, nil +} + +func (c *MockClient) Init(opts ...client.Option) error { + for _, o := range opts { + o(&c.opts) + } + + return nil +} + +func (c *MockClient) String() string { + return "mock" +} + +func (c *MockClient) Name() string { + return c.opts.Name +} + +func (c *MockClient) Options() client.Options { + return c.opts +} + +func (c *MockClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { + return nil +} + +func (c *MockClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request { + return newRequest(service, method, req, c.opts.ContentType, opts...) +} + +func NewClient(opts ...client.Option) *MockClient { + options := client.NewOptions(opts...) + return &MockClient{opts: options} +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..24cd6d0 --- /dev/null +++ b/request.go @@ -0,0 +1,57 @@ +package mock + +import ( + "go.unistack.org/micro/v3/client" + "go.unistack.org/micro/v3/codec" +) + +type mockRequest struct { + service string + method string + contentType string + request interface{} + opts client.RequestOptions +} + +func newRequest(service, method string, request interface{}, contentType string, opts ...client.RequestOption) client.Request { + options := client.NewRequestOptions(opts...) + if len(options.ContentType) == 0 { + options.ContentType = contentType + } + + return &mockRequest{ + service: service, + method: method, + request: request, + contentType: options.ContentType, + opts: options, + } +} + +func (r *mockRequest) ContentType() string { + return r.contentType +} + +func (r *mockRequest) Service() string { + return r.service +} + +func (r *mockRequest) Method() string { + return r.method +} + +func (r *mockRequest) Endpoint() string { + return r.method +} + +func (r *mockRequest) Codec() codec.Codec { + return nil +} + +func (r *mockRequest) Body() interface{} { + return r.request +} + +func (r *mockRequest) Stream() bool { + return r.opts.Stream +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..554e903 --- /dev/null +++ b/stream.go @@ -0,0 +1,119 @@ +package mock + +/* +import ( + "bufio" + "context" + "fmt" + "net" + "net/http" + "sync" + + "go.unistack.org/micro/v3/client" + "go.unistack.org/micro/v3/codec" + "go.unistack.org/micro/v3/errors" + "go.unistack.org/micro/v3/logger" +) + +// Implements the streamer interface +type mockStream struct { + err error + conn net.Conn + cf codec.Codec + context context.Context + logger logger.Logger + request client.Request + closed chan bool + reader *bufio.Reader + address string + ct string + opts client.CallOptions + sync.RWMutex +} + +var errShutdown = fmt.Errorf("connection is shut down") + +func (s *mockStream) isClosed() bool { + select { + case <-s.closed: + return true + default: + return false + } +} + +func (s *mockStream) Context() context.Context { + return mock.context +} + +func (s *mockStream) Request() client.Request { + return s.request +} + +func (s *mockStream) Response() client.Response { + return nil +} + +func (s *mockStream) SendMsg(msg interface{}) error { + return s.Send(msg) +} + +func (s *mockStream) Send(msg interface{}) error { + h.Lock() + defer h.Unlock() + + if h.isClosed() { + h.err = errShutdown + return errShutdown + } + + hreq, err := newRequest(h.context, h.logger, h.address, h.request, h.ct, h.cf, msg, h.opts) + if err != nil { + return err + } + + return hreq.Write(h.conn) +} + +func (h *httpStream) RecvMsg(msg interface{}) error { + return h.Recv(msg) +} + +func (h *httpStream) Recv(msg interface{}) error { + h.Lock() + defer h.Unlock() + + if h.isClosed() { + h.err = errShutdown + return errShutdown + } + + hrsp, err := http.ReadResponse(h.reader, new(http.Request)) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + defer hrsp.Body.Close() + + return h.parseRsp(h.context, h.logger, hrsp, h.cf, msg, h.opts) +} + +func (h *httpStream) Error() error { + h.RLock() + defer h.RUnlock() + return h.err +} + +func (h *httpStream) CloseSend() error { + return h.Close() +} + +func (h *httpStream) Close() error { + select { + case <-h.closed: + return nil + default: + close(h.closed) + return h.conn.Close() + } +} +*/