client: implement Call and Stream methods for noop
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
b871f64ba6
commit
46da092899
262
client/noop.go
262
client/noop.go
@ -2,6 +2,8 @@ package client
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v3/broker"
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
@ -181,6 +183,133 @@ func (n *noopClient) String() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
||||||
|
// make a copy of call opts
|
||||||
|
callOpts := n.opts.CallOptions
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(&callOpts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if we already have a deadline
|
||||||
|
d, ok := ctx.Deadline()
|
||||||
|
if !ok {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
// no deadline so we create a new one
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
} else {
|
||||||
|
// got a deadline so no need to setup context
|
||||||
|
// but we need to set the timeout we pass along
|
||||||
|
opt := WithRequestTimeout(time.Until(d))
|
||||||
|
opt(&callOpts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// should we noop right here?
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// make copy of call method
|
||||||
|
hcall := n.call
|
||||||
|
|
||||||
|
// wrap the call in reverse
|
||||||
|
for i := len(callOpts.CallWrappers); i > 0; i-- {
|
||||||
|
hcall = callOpts.CallWrappers[i-1](hcall)
|
||||||
|
}
|
||||||
|
|
||||||
|
// use the router passed as a call option, or fallback to the rpc clients router
|
||||||
|
if callOpts.Router == nil {
|
||||||
|
callOpts.Router = n.opts.Router
|
||||||
|
}
|
||||||
|
|
||||||
|
if callOpts.Selector == nil {
|
||||||
|
callOpts.Selector = n.opts.Selector
|
||||||
|
}
|
||||||
|
|
||||||
|
// inject proxy address
|
||||||
|
// TODO: don't even bother using Lookup/Select in this case
|
||||||
|
if len(n.opts.Proxy) > 0 {
|
||||||
|
callOpts.Address = []string{n.opts.Proxy}
|
||||||
|
}
|
||||||
|
|
||||||
|
// lookup the route to send the reques to
|
||||||
|
// TODO apply any filtering here
|
||||||
|
routes, err := n.opts.Lookup(ctx, req, callOpts)
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// balance the list of nodes
|
||||||
|
next, err := callOpts.Selector.Select(routes)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// return errors.New("go.micro.client", "request timeout", 408)
|
||||||
|
call := func(i int) error {
|
||||||
|
// call backoff first. Someone may want an initial start delay
|
||||||
|
t, err := callOpts.Backoff(ctx, req, i)
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// only sleep if greater than 0
|
||||||
|
if t.Seconds() > 0 {
|
||||||
|
time.Sleep(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
node := next()
|
||||||
|
|
||||||
|
// make the call
|
||||||
|
err = hcall(ctx, node, req, rsp, callOpts)
|
||||||
|
// record the result of the call to inform future routing decisions
|
||||||
|
if verr := n.opts.Selector.Record(node, err); verr != nil {
|
||||||
|
return verr
|
||||||
|
}
|
||||||
|
|
||||||
|
// try and transform the error to a go-micro error
|
||||||
|
if verr, ok := err.(*errors.Error); ok {
|
||||||
|
return verr
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan error, callOpts.Retries)
|
||||||
|
var gerr error
|
||||||
|
|
||||||
|
for i := 0; i <= callOpts.Retries; i++ {
|
||||||
|
go func() {
|
||||||
|
ch <- call(i)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
|
||||||
|
case err := <-ch:
|
||||||
|
// if the call succeeded lets bail early
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
retry, rerr := callOpts.Retry(ctx, req, i, err)
|
||||||
|
if rerr != nil {
|
||||||
|
return rerr
|
||||||
|
}
|
||||||
|
|
||||||
|
if !retry {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
gerr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return gerr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClient) call(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,6 +323,139 @@ func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOp
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
||||||
|
// make a copy of call opts
|
||||||
|
callOpts := n.opts.CallOptions
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&callOpts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if we already have a deadline
|
||||||
|
d, ok := ctx.Deadline()
|
||||||
|
if !ok && callOpts.StreamTimeout > time.Duration(0) {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
// no deadline so we create a new one
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, callOpts.StreamTimeout)
|
||||||
|
defer cancel()
|
||||||
|
} else {
|
||||||
|
// got a deadline so no need to setup context
|
||||||
|
// but we need to set the timeout we pass along
|
||||||
|
o := WithStreamTimeout(time.Until(d))
|
||||||
|
o(&callOpts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// should we noop right here?
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
// make copy of call method
|
||||||
|
hstream := h.stream
|
||||||
|
// wrap the call in reverse
|
||||||
|
for i := len(callOpts.CallWrappers); i > 0; i-- {
|
||||||
|
hstream = callOpts.CallWrappers[i-1](hstream)
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
// use the router passed as a call option, or fallback to the rpc clients router
|
||||||
|
if callOpts.Router == nil {
|
||||||
|
callOpts.Router = n.opts.Router
|
||||||
|
}
|
||||||
|
|
||||||
|
if callOpts.Selector == nil {
|
||||||
|
callOpts.Selector = n.opts.Selector
|
||||||
|
}
|
||||||
|
|
||||||
|
// inject proxy address
|
||||||
|
// TODO: don't even bother using Lookup/Select in this case
|
||||||
|
if len(n.opts.Proxy) > 0 {
|
||||||
|
callOpts.Address = []string{n.opts.Proxy}
|
||||||
|
}
|
||||||
|
|
||||||
|
// lookup the route to send the reques to
|
||||||
|
// TODO apply any filtering here
|
||||||
|
routes, err := n.opts.Lookup(ctx, req, callOpts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// balance the list of nodes
|
||||||
|
next, err := callOpts.Selector.Select(routes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
call := func(i int) (Stream, error) {
|
||||||
|
// call backoff first. Someone may want an initial start delay
|
||||||
|
t, cerr := callOpts.Backoff(ctx, req, i)
|
||||||
|
if cerr != nil {
|
||||||
|
return nil, errors.InternalServerError("go.micro.client", cerr.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// only sleep if greater than 0
|
||||||
|
if t.Seconds() > 0 {
|
||||||
|
time.Sleep(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
node := next()
|
||||||
|
|
||||||
|
stream, cerr := n.stream(ctx, node, req, callOpts)
|
||||||
|
|
||||||
|
// record the result of the call to inform future routing decisions
|
||||||
|
if verr := n.opts.Selector.Record(node, cerr); verr != nil {
|
||||||
|
return nil, verr
|
||||||
|
}
|
||||||
|
|
||||||
|
// try and transform the error to a go-micro error
|
||||||
|
if verr, ok := cerr.(*errors.Error); ok {
|
||||||
|
return nil, verr
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream, cerr
|
||||||
|
}
|
||||||
|
|
||||||
|
type response struct {
|
||||||
|
stream Stream
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan response, callOpts.Retries)
|
||||||
|
var grr error
|
||||||
|
|
||||||
|
for i := 0; i <= callOpts.Retries; i++ {
|
||||||
|
go func() {
|
||||||
|
s, cerr := call(i)
|
||||||
|
ch <- response{s, cerr}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
|
||||||
|
case rsp := <-ch:
|
||||||
|
// if the call succeeded lets bail early
|
||||||
|
if rsp.err == nil {
|
||||||
|
return rsp.stream, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
retry, rerr := callOpts.Retry(ctx, req, i, err)
|
||||||
|
if rerr != nil {
|
||||||
|
return nil, rerr
|
||||||
|
}
|
||||||
|
|
||||||
|
if !retry {
|
||||||
|
return nil, rsp.err
|
||||||
|
}
|
||||||
|
|
||||||
|
grr = rsp.err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, grr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) {
|
||||||
return &noopStream{}, nil
|
return &noopStream{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user