use backoff for lookup too

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2022-05-01 23:40:27 +03:00
parent e41b3225dc
commit 4a22454ba1
3 changed files with 46 additions and 27 deletions

61
drpc.go
View File

@@ -15,6 +15,7 @@ import (
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/selector"
"storj.io/drpc/drpcconn"
dmetadata "storj.io/drpc/drpcmetadata"
)
@@ -401,18 +402,7 @@ func (d *drpcClient) Call(ctx context.Context, req client.Request, rsp interface
callOpts.Address = []string{d.opts.Proxy}
}
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err := d.opts.Lookup(ctx, req, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return err
}
var next selector.Next
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
@@ -427,6 +417,23 @@ func (d *drpcClient) Call(ctx context.Context, req client.Request, rsp interface
time.Sleep(t)
}
if next == nil {
var routes []string
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err = d.opts.Lookup(ctx, req, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err = callOpts.Selector.Select(routes)
if err != nil {
return err
}
}
// get the next node
node := next()
@@ -520,18 +527,7 @@ func (g *drpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
callOpts.Address = []string{g.opts.Proxy}
}
// lookup the route to send the reques to
// TODO: move to internal lookup func
routes, err := g.opts.Lookup(ctx, req, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return nil, err
}
var next selector.Next
call := func(i int) (client.Stream, error) {
// call backoff first. Someone may want an initial start delay
@@ -545,6 +541,23 @@ func (g *drpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
time.Sleep(t)
}
if next == nil {
var routes []string
// lookup the route to send the reques to
// TODO apply any filtering here
routes, err = d.opts.Lookup(ctx, req, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// balance the list of nodes
next, err := callOpts.Selector.Select(routes)
if err != nil {
return nil, err
}
}
// get the next node
node := next()