From 46da092899603ddb74746d7e1ae8e6ca26baa78e Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 16 Apr 2022 16:35:46 +0300 Subject: [PATCH] client: implement Call and Stream methods for noop Signed-off-by: Vasiliy Tolstov --- client/noop.go | 262 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 262 insertions(+) diff --git a/client/noop.go b/client/noop.go index c427f334..bdded98e 100644 --- a/client/noop.go +++ b/client/noop.go @@ -2,6 +2,8 @@ package client import ( "context" + "fmt" + "time" "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/codec" @@ -181,6 +183,133 @@ func (n *noopClient) String() string { } func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error { + // make a copy of call opts + callOpts := n.opts.CallOptions + for _, opt := range opts { + opt(&callOpts) + } + + // check if we already have a deadline + d, ok := ctx.Deadline() + if !ok { + var cancel context.CancelFunc + // no deadline so we create a new one + ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout) + defer cancel() + } else { + // got a deadline so no need to setup context + // but we need to set the timeout we pass along + opt := WithRequestTimeout(time.Until(d)) + opt(&callOpts) + } + + // should we noop right here? + select { + case <-ctx.Done(): + return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + default: + } + + // make copy of call method + hcall := n.call + + // wrap the call in reverse + for i := len(callOpts.CallWrappers); i > 0; i-- { + hcall = callOpts.CallWrappers[i-1](hcall) + } + + // use the router passed as a call option, or fallback to the rpc clients router + if callOpts.Router == nil { + callOpts.Router = n.opts.Router + } + + if callOpts.Selector == nil { + callOpts.Selector = n.opts.Selector + } + + // inject proxy address + // TODO: don't even bother using Lookup/Select in this case + if len(n.opts.Proxy) > 0 { + callOpts.Address = []string{n.opts.Proxy} + } + + // lookup the route to send the reques to + // TODO apply any filtering here + routes, err := n.opts.Lookup(ctx, req, callOpts) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // balance the list of nodes + next, err := callOpts.Selector.Select(routes) + if err != nil { + return err + } + + // return errors.New("go.micro.client", "request timeout", 408) + call := func(i int) error { + // call backoff first. Someone may want an initial start delay + t, err := callOpts.Backoff(ctx, req, i) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // only sleep if greater than 0 + if t.Seconds() > 0 { + time.Sleep(t) + } + + node := next() + + // make the call + err = hcall(ctx, node, req, rsp, callOpts) + // record the result of the call to inform future routing decisions + if verr := n.opts.Selector.Record(node, err); verr != nil { + return verr + } + + // try and transform the error to a go-micro error + if verr, ok := err.(*errors.Error); ok { + return verr + } + + return err + } + + ch := make(chan error, callOpts.Retries) + var gerr error + + for i := 0; i <= callOpts.Retries; i++ { + go func() { + ch <- call(i) + }() + + select { + case <-ctx.Done(): + return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + case err := <-ch: + // if the call succeeded lets bail early + if err == nil { + return nil + } + + retry, rerr := callOpts.Retry(ctx, req, i, err) + if rerr != nil { + return rerr + } + + if !retry { + return err + } + + gerr = err + } + } + + return gerr +} + +func (n *noopClient) call(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { return nil } @@ -194,6 +323,139 @@ func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOp } func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { + // make a copy of call opts + callOpts := n.opts.CallOptions + for _, o := range opts { + o(&callOpts) + } + + // check if we already have a deadline + d, ok := ctx.Deadline() + if !ok && callOpts.StreamTimeout > time.Duration(0) { + var cancel context.CancelFunc + // no deadline so we create a new one + ctx, cancel = context.WithTimeout(ctx, callOpts.StreamTimeout) + defer cancel() + } else { + // got a deadline so no need to setup context + // but we need to set the timeout we pass along + o := WithStreamTimeout(time.Until(d)) + o(&callOpts) + } + + // should we noop right here? + select { + case <-ctx.Done(): + return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + default: + } + + /* + // make copy of call method + hstream := h.stream + // wrap the call in reverse + for i := len(callOpts.CallWrappers); i > 0; i-- { + hstream = callOpts.CallWrappers[i-1](hstream) + } + */ + + // use the router passed as a call option, or fallback to the rpc clients router + if callOpts.Router == nil { + callOpts.Router = n.opts.Router + } + + if callOpts.Selector == nil { + callOpts.Selector = n.opts.Selector + } + + // inject proxy address + // TODO: don't even bother using Lookup/Select in this case + if len(n.opts.Proxy) > 0 { + callOpts.Address = []string{n.opts.Proxy} + } + + // lookup the route to send the reques to + // TODO apply any filtering here + routes, err := n.opts.Lookup(ctx, req, callOpts) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + // balance the list of nodes + next, err := callOpts.Selector.Select(routes) + if err != nil { + return nil, err + } + + call := func(i int) (Stream, error) { + // call backoff first. Someone may want an initial start delay + t, cerr := callOpts.Backoff(ctx, req, i) + if cerr != nil { + return nil, errors.InternalServerError("go.micro.client", cerr.Error()) + } + + // only sleep if greater than 0 + if t.Seconds() > 0 { + time.Sleep(t) + } + + node := next() + + stream, cerr := n.stream(ctx, node, req, callOpts) + + // record the result of the call to inform future routing decisions + if verr := n.opts.Selector.Record(node, cerr); verr != nil { + return nil, verr + } + + // try and transform the error to a go-micro error + if verr, ok := cerr.(*errors.Error); ok { + return nil, verr + } + + return stream, cerr + } + + type response struct { + stream Stream + err error + } + + ch := make(chan response, callOpts.Retries) + var grr error + + for i := 0; i <= callOpts.Retries; i++ { + go func() { + s, cerr := call(i) + ch <- response{s, cerr} + }() + + select { + case <-ctx.Done(): + return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + case rsp := <-ch: + // if the call succeeded lets bail early + if rsp.err == nil { + return rsp.stream, nil + } + + retry, rerr := callOpts.Retry(ctx, req, i, err) + if rerr != nil { + return nil, rerr + } + + if !retry { + return nil, rsp.err + } + + grr = rsp.err + } + } + + return nil, grr +} + +func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) { return &noopStream{}, nil }