diff --git a/drpc.go b/drpc.go index 32c02d4..cc09f4f 100644 --- a/drpc.go +++ b/drpc.go @@ -15,6 +15,7 @@ import ( "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/selector" "storj.io/drpc/drpcconn" dmetadata "storj.io/drpc/drpcmetadata" ) @@ -401,18 +402,7 @@ func (d *drpcClient) Call(ctx context.Context, req client.Request, rsp interface callOpts.Address = []string{d.opts.Proxy} } - // lookup the route to send the reques to - // TODO apply any filtering here - routes, err := d.opts.Lookup(ctx, req, callOpts) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - - // balance the list of nodes - next, err := callOpts.Selector.Select(routes) - if err != nil { - return err - } + var next selector.Next // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { @@ -427,6 +417,23 @@ func (d *drpcClient) Call(ctx context.Context, req client.Request, rsp interface time.Sleep(t) } + if next == nil { + var routes []string + + // lookup the route to send the reques to + // TODO apply any filtering here + routes, err = d.opts.Lookup(ctx, req, callOpts) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // balance the list of nodes + next, err = callOpts.Selector.Select(routes) + if err != nil { + return err + } + } + // get the next node node := next() @@ -520,18 +527,7 @@ func (g *drpcClient) Stream(ctx context.Context, req client.Request, opts ...cli callOpts.Address = []string{g.opts.Proxy} } - // lookup the route to send the reques to - // TODO: move to internal lookup func - routes, err := g.opts.Lookup(ctx, req, callOpts) - if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) - } - - // balance the list of nodes - next, err := callOpts.Selector.Select(routes) - if err != nil { - return nil, err - } + var next selector.Next call := func(i int) (client.Stream, error) { // call backoff first. Someone may want an initial start delay @@ -545,6 +541,23 @@ func (g *drpcClient) Stream(ctx context.Context, req client.Request, opts ...cli time.Sleep(t) } + if next == nil { + var routes []string + + // lookup the route to send the reques to + // TODO apply any filtering here + routes, err = d.opts.Lookup(ctx, req, callOpts) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + // balance the list of nodes + next, err := callOpts.Selector.Select(routes) + if err != nil { + return nil, err + } + } + // get the next node node := next() diff --git a/go.mod b/go.mod index e008a67..417af0b 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,9 @@ module go.unistack.org/micro-client-drpc/v3 go 1.16 require ( + github.com/google/gnostic v0.6.9 // indirect + github.com/zeebo/errs v1.3.0 // indirect go.unistack.org/micro/v3 v3.9.7 + google.golang.org/protobuf v1.28.0 // indirect storj.io/drpc v0.0.30 ) diff --git a/go.sum b/go.sum index e2258f1..b403bee 100644 --- a/go.sum +++ b/go.sum @@ -39,8 +39,9 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/gnostic v0.6.6 h1:MVSM2r2j9aRUvYNym66JGW96Ddd5MN4sTi59yktb6yk= github.com/google/gnostic v0.6.6/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= +github.com/google/gnostic v0.6.9 h1:ZK/5VhkoX835RikCHpSUJV9a+S3e1zLh59YnyWeBW+0= +github.com/google/gnostic v0.6.9/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -71,8 +72,9 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1: github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= -github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g= github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= +github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs= +github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.unistack.org/micro-proto/v3 v3.2.7 h1:zG6d69kHc+oij2lwQ3AfrCgdjiEVRG2A7TlsxjusWs4= go.unistack.org/micro-proto/v3 v3.2.7/go.mod h1:ZltVWNECD5yK+40+OCONzGw4OtmSdTpVi8/KFgo9dqM= @@ -146,8 +148,9 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=