diff --git a/flow/default.go b/flow/default.go index ec526f76..e9c53a56 100644 --- a/flow/default.go +++ b/flow/default.go @@ -4,10 +4,11 @@ import ( "context" "fmt" "sync" - "time" "github.com/google/uuid" "github.com/silas/dag" + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/codec" ) type microFlow struct { @@ -87,33 +88,44 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex return "", err } } - err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) + if options.Reverse { + err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn) + } else { + err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) + } if err != nil { return "", err } var wg sync.WaitGroup - cherr := make(chan error) + cherr := make(chan error, 1) + defer close(cherr) - select { - case err = <-cherr: - return "", err - default: + nctx, cancel := context.WithCancel(ctx) + defer cancel() + nopts := make([]ExecuteOption, 0, len(opts)+5) + nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store)) + + go func() { for idx := range steps { wg.Add(len(steps[idx])) for nidx := range steps[idx] { go func(step Step) { - if err = step.Execute(ctx, req, opts...); err != nil { + defer wg.Done() + if err = step.Execute(nctx, req, nopts...); err != nil { cherr <- err + cancel() } - wg.Done() }(steps[idx][nidx]) } wg.Wait() } - } + cherr <- nil + }() - return uid.String(), nil + err = <-cherr + + return uid.String(), err } func NewFlow(opts ...Option) Flow { @@ -236,9 +248,17 @@ func (s *microCallStep) Hashcode() interface{} { } func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { - fmt.Printf("execute %s with %#v\n", s.String(), req) - time.Sleep(1 * time.Second) - return nil + options := NewExecuteOptions(opts...) + if options.Client == nil { + return fmt.Errorf("client not set") + } + rsp := &codec.Frame{} + copts := []client.CallOption{client.WithRetries(0)} + if options.Timeout > 0 { + copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout)) + } + err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp) + return err } type microPublishStep struct { diff --git a/flow/options.go b/flow/options.go index 8c7f4ccd..0db4d324 100644 --- a/flow/options.go +++ b/flow/options.go @@ -2,6 +2,7 @@ package flow import ( "context" + "time" "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/logger" @@ -116,13 +117,67 @@ type ExecuteOptions struct { // Meter holds the meter Meter meter.Meter // Store used for intermediate results - Store store.Store + Store store.Store + // Context can be used to abort execution or pass additional opts Context context.Context - Start string + // Start step + Start string + // Reverse execution + Reverse bool + // Timeout for execution + Timeout time.Duration } type ExecuteOption func(*ExecuteOptions) +func ExecuteClient(c client.Client) ExecuteOption { + return func(o *ExecuteOptions) { + o.Client = c + } +} + +func ExecuteTracer(t tracer.Tracer) ExecuteOption { + return func(o *ExecuteOptions) { + o.Tracer = t + } +} + +func ExecuteLogger(l logger.Logger) ExecuteOption { + return func(o *ExecuteOptions) { + o.Logger = l + } +} + +func ExecuteMeter(m meter.Meter) ExecuteOption { + return func(o *ExecuteOptions) { + o.Meter = m + } +} + +func ExecuteStore(s store.Store) ExecuteOption { + return func(o *ExecuteOptions) { + o.Store = s + } +} + +func ExecuteContext(ctx context.Context) ExecuteOption { + return func(o *ExecuteOptions) { + o.Context = ctx + } +} + +func ExecuteReverse(b bool) ExecuteOption { + return func(o *ExecuteOptions) { + o.Reverse = b + } +} + +func ExecuteTimeout(td time.Duration) ExecuteOption { + return func(o *ExecuteOptions) { + o.Timeout = td + } +} + func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions { options := ExecuteOptions{} for _, o := range opts {