diff --git a/flow/context_test.go b/flow/context_test.go index 03262697..8821df07 100644 --- a/flow/context_test.go +++ b/flow/context_test.go @@ -1,3 +1,5 @@ +//go:build ignore + package flow import ( diff --git a/flow/dag.go b/flow/dag.go deleted file mode 100644 index 4cc728ea..00000000 --- a/flow/dag.go +++ /dev/null @@ -1,21 +0,0 @@ -package flow - -type node struct { - name string -} - -func (n *node) ID() string { - return n.name -} - -func (n *node) Name() string { - return n.name -} - -func (n *node) String() string { - return n.name -} - -func (n *node) Hashcode() interface{} { - return n.name -} diff --git a/flow/dag_test.go b/flow/dag_test.go deleted file mode 100644 index 6563200c..00000000 --- a/flow/dag_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package flow - -import ( - "fmt" - "testing" - - "github.com/silas/dag" -) - -func checkErr(t *testing.T, err error) { - if err != nil { - t.Fatal(err) - } -} - -func TestDag(t *testing.T) { - d1 := &dag.AcyclicGraph{} - d2 := &dag.AcyclicGraph{} - d2v1 := d2.Add(&node{"Substep.Create"}) - v1 := d1.Add(&node{"AccountService.Create"}) - v2 := d1.Add(&node{"AuthzService.Create"}) - v3 := d1.Add(&node{"AuthnService.Create"}) - v4 := d1.Add(&node{"ProjectService.Create"}) - v5 := d1.Add(&node{"ContactService.Create"}) - v6 := d1.Add(&node{"NetworkService.Create"}) - v7 := d1.Add(&node{"MailerService.Create"}) - v8 := d1.Add(&node{"NestedService.Create"}) - v9 := d1.Add(d2v1) - d1.Connect(dag.BasicEdge(v1, v2)) - d1.Connect(dag.BasicEdge(v1, v3)) - d1.Connect(dag.BasicEdge(v1, v4)) - d1.Connect(dag.BasicEdge(v1, v5)) - d1.Connect(dag.BasicEdge(v1, v6)) - d1.Connect(dag.BasicEdge(v1, v7)) - d1.Connect(dag.BasicEdge(v7, v8)) - d1.Connect(dag.BasicEdge(v8, v9)) - - if err := d1.Validate(); err != nil { - t.Fatal(err) - } - - d1.TransitiveReduction() - - var steps [][]string - fn := func(n dag.Vertex, idx int) error { - if idx == 0 { - steps = make([][]string, 1) - steps[0] = make([]string, 0, 1) - } else if idx >= len(steps) { - tsteps := make([][]string, idx+1) - copy(tsteps, steps) - steps = tsteps - steps[idx] = make([]string, 0, 1) - } - steps[idx] = append(steps[idx], fmt.Sprintf("%s", n)) - return nil - } - - start := &node{"AccountService.Create"} - err := d1.SortedDepthFirstWalk([]dag.Vertex{start}, fn) - checkErr(t, err) - if len(steps) != 4 { - t.Fatalf("invalid steps: %#+v", steps) - } - if steps[3][0] != "Substep.Create" { - t.Fatalf("invalid last step: %#+v", steps) - } -} diff --git a/flow/default.go b/flow/default.go index 67a4225a..4d741abb 100644 --- a/flow/default.go +++ b/flow/default.go @@ -33,10 +33,6 @@ func (w *microWorkflow) ID() string { return w.id } -func (w *microWorkflow) Steps() ([][]Step, error) { - return w.getSteps("", false) -} - func (w *microWorkflow) Status() Status { return w.status } @@ -44,11 +40,11 @@ func (w *microWorkflow) Status() Status { func (w *microWorkflow) AppendSteps(steps ...Step) error { var err error w.Lock() + defer w.Unlock() for _, s := range steps { w.steps[s.String()] = s if _, err = w.g.AddVertex(s); err != nil { - w.Unlock() return err } } @@ -57,11 +53,9 @@ func (w *microWorkflow) AppendSteps(steps ...Step) error { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { - w.Unlock() return ErrStepNotExists } if err = w.g.AddEdge(src.String(), dst.String()); err != nil { - w.Unlock() return err } } @@ -69,8 +63,6 @@ func (w *microWorkflow) AppendSteps(steps ...Step) error { w.g.ReduceTransitively() - w.Unlock() - return nil } @@ -78,6 +70,7 @@ func (w *microWorkflow) 
RemoveSteps(steps ...Step) error { // TODO: handle case when some step requires or required by removed step w.Lock() + defer w.Unlock() for _, s := range steps { delete(w.steps, s.String()) @@ -88,7 +81,6 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { - w.Unlock() return ErrStepNotExists } w.g.AddEdge(src.String(), dst.String()) @@ -97,54 +89,9 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error { w.g.ReduceTransitively() - w.Unlock() - return nil } -func (w *microWorkflow) getSteps(start string) ([][]Step, error) { - var steps [][]Step - var root dag.Vertex - var err error - - fn := func(n dag.Vertex, idx int) error { - if idx == 0 { - steps = make([][]Step, 1) - steps[0] = make([]Step, 0, 1) - } else if idx >= len(steps) { - tsteps := make([][]Step, idx+1) - copy(tsteps, steps) - steps = tsteps - steps[idx] = make([]Step, 0, 1) - } - steps[idx] = append(steps[idx], n.(Step)) - return nil - } - - if start != "" { - var ok bool - w.RLock() - root, ok = w.steps[start] - w.RUnlock() - if !ok { - return nil, ErrStepNotExists - } - } else { - root, err = w.g.Root() - if err != nil { - return nil, err - } - } - - err = w.g.SortedDepthFirstWalk(root, fn) - - if err != nil { - return nil, err - } - - return steps, nil -} - func (w *microWorkflow) Abort(ctx context.Context, id string) error { workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())}) @@ -173,26 +120,11 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu return "", err } - stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) + // stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) options := NewExecuteOptions(opts...) - steps, err := w.getSteps(options.Start) - if err != nil { - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { - w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) - } - return "", err - } - - var wg sync.WaitGroup - cherr := make(chan error, 1) - chstatus := make(chan Status, 1) - - nctx, cancel := context.WithCancel(ctx) - defer cancel() - nopts := make([]ExecuteOption, 0, len(opts)+5) nopts = append(nopts, @@ -202,143 +134,274 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu ExecuteMeter(w.opts.Meter), ) nopts = append(nopts, opts...) 
-	done := make(chan struct{})
-
 	if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
 		w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
 		return eid, werr
 	}
-	for idx := range steps {
-		for nidx := range steps[idx] {
-			cstep := steps[idx][nidx]
-			if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
-				return eid, werr
-			}
-		}
-	}
-	go func() {
-		for idx := range steps {
-			for nidx := range steps[idx] {
-				wStatus := &codec.Frame{}
-				if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
-					cherr <- werr
-					return
-				}
-				if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
-					chstatus <- status
-					return
-				}
-				if w.opts.Logger.V(logger.TraceLevel) {
-					w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
-				}
-				cstep := steps[idx][nidx]
-				// nolint: nestif
-				if len(cstep.Requires()) == 0 {
-					wg.Add(1)
-					go func(step Step) {
-						defer wg.Done()
-						if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
-							cherr <- werr
-							return
-						}
-						if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
-							cherr <- werr
-							return
-						}
-						rsp, serr := step.Execute(nctx, req, nopts...)
-						if serr != nil {
-							step.SetStatus(StatusFailure)
-							if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
-								w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
-							}
-							if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
-								w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
-							}
-							cherr <- serr
-							return
-						}
-						if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
-							w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
-							cherr <- werr
-							return
-						}
-						if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
-							w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
-							cherr <- werr
-							return
-						}
-					}(cstep)
-					wg.Wait()
-				} else {
-					if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
-						cherr <- werr
-						return
-					}
-					if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
-						cherr <- werr
-						return
-					}
-					rsp, serr := cstep.Execute(nctx, req, nopts...)
-					if serr != nil {
-						cstep.SetStatus(StatusFailure)
-						if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
-							w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
-						}
-						if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
-							w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
-						}
-						cherr <- serr
-						return
-					}
-					if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
-						w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
-						cherr <- werr
-						return
-					}
-					if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
-						cherr <- werr
-						return
-					}
-				}
-			}
-		}
-		close(done)
-	}()
-
-	if options.Async {
-		return eid, nil
-	}
-
-	logger.Tracef(ctx, "wait for finish or error")
-	select {
-	case <-nctx.Done():
-		err = nctx.Err()
-	case cerr := <-cherr:
-		err = cerr
-	case <-done:
-		close(cherr)
-	case <-chstatus:
-		close(chstatus)
-		return eid, nil
-	}
-
-	switch {
-	case nctx.Err() != nil:
-		if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
-			w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
-		}
-	case err == nil:
-		if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
-			w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
-		}
-	case err != nil:
-		if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
-			w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
-		}
-	}
-
-	return eid, err
+	// Resolve the vertex to start from: either the single root of the graph or
+	// the vertex whose step name equals options.Start.
+	var startID string
+	if options.Start == "" {
+		mp := w.g.GetRoots()
+		if len(mp) != 1 {
+			return eid, ErrStepNotExists
+		}
+		for k := range mp {
+			startID = k
+		}
+	} else {
+		// Vertices hold Steps; compare the step name, not the vertex value itself.
+		for k, v := range w.g.GetVertices() {
+			if s, ok := v.(Step); ok && s.String() == options.Start {
+				startID = k
+			}
+		}
+	}
+
+	if startID == "" {
+		return eid, ErrStepNotExists
+	}
+
+	if options.Async {
+		// The error of an async run cannot be returned to the caller, so log it.
+		go func() {
+			if werr := w.handleWorkflow(ctx, req, startID, nopts...); werr != nil {
+				w.opts.Logger.Errorf(w.opts.Context, "workflow %s error: %v", eid, werr)
+			}
+		}()
+		return eid, nil
+	}
+
+	// TODO: persist the final workflow status (success/failure/aborted) as the
+	// old implementation did.
+	return eid, w.handleWorkflow(ctx, req, startID, nopts...)
+}
+
+// handleWorkflow walks the DAG from startID and executes every step in its own
+// goroutine once all of the step's parents have delivered their results.
+func (w *microWorkflow) handleWorkflow(ctx context.Context, req *Message, startID string, opts ...ExecuteOption) error {
+	w.RLock()
+	defer w.RUnlock()
+
+	// TODO: persist per-step req/rsp/status via a namespaced step store as the
+	// old implementation did.
+
+	// Get IDs of all descendant vertices.
+	flowIDs, errDes := w.g.GetDescendants(startID)
+	if errDes != nil {
+		return errDes
+	}
+
+	// inputChannels provides an input channel for each of the descendant
+	// vertices (+ the start vertex).
+	inputChannels := make(map[string]chan *Message, len(flowIDs)+1)
+
+	// Iterate vertex IDs and create an input channel for each of them and a single
+	// output channel for leaves. Note, this "pre-flight" is needed to ensure we
+	// really have an input channel regardless of how we traverse the tree and spawn
+	// workers. It assumes every parent of a descendant is itself reachable from
+	// startID, which holds when starting from the root of the validated graph.
+	leafCount := 0
+
+	for id := range flowIDs {
+		// Get all parents of this vertex.
+		parents, errPar := w.g.GetParents(id)
+		if errPar != nil {
+			return errPar
+		}
+
+		// Create a buffered input channel that has capacity for all parent results.
+		inputChannels[id] = make(chan *Message, len(parents))
+
+		isLeaf, errLeaf := w.g.IsLeaf(id)
+		if errLeaf != nil {
+			return errLeaf
+		}
+		if isLeaf {
+			leafCount++
+		}
+	}
+
+	// To also process the start vertex and to have its result passed to its
+	// children, add it to the vertex IDs and seed its input channel with the
+	// workflow request.
+	flowIDs[startID] = struct{}{}
+	inputChannels[startID] = make(chan *Message, 1)
+	inputChannels[startID] <- req
+
+	// The start vertex itself may be a leaf (a single-step workflow).
+	if isLeaf, errLeaf := w.g.IsLeaf(startID); errLeaf != nil {
+		return errLeaf
+	} else if isLeaf {
+		leafCount++
+	}
+
+	// outputChannel carries the results of leaf vertices.
+	outputChannel := make(chan *Message, leafCount)
+
+	var wg sync.WaitGroup
+	cherr := make(chan error, len(flowIDs))
+
+	// nctx lets a failed step unblock workers that still wait for parent results.
+	nctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	// Iterate all vertex IDs (now incl. start vertex) and handle each worker (incl.
+	// inputs and outputs) in a separate goroutine.
+	for id := range flowIDs {
+		// Get all children of this vertex that later need to be notified. Note, we
+		// collect the children before spawning so the goroutine itself does not
+		// touch the graph.
+		children, errChildren := w.g.GetChildren(id)
+		if errChildren != nil {
+			return errChildren
+		}
+
+		// Remember to wait for this goroutine.
+		wg.Add(1)
+
+		go func(id string, children map[string]interface{}) {
+			defer wg.Done()
+
+			// Get this vertex's input channel.
+			// Note, only concurrent reads here, which is fine.
+			c := inputChannels[id]
+
+			// Await all parent results; the channel capacity equals the number of
+			// parents (one seeded request for the start vertex). Bail out if the
+			// workflow was canceled in the meantime.
+			msg := req
+			for i := 0; i < cap(c); i++ {
+				select {
+				case m := <-c:
+					msg = m
+				case <-nctx.Done():
+					return
+				}
+			}
+
+			// Vertex IDs equal Step.String() (see AddEdge in AppendSteps), so the
+			// step can be looked up directly.
+			step, ok := w.steps[id]
+			if !ok {
+				cherr <- ErrStepNotExists
+				cancel()
+				return
+			}
+
+			// Execute the step. Note, when a step has several parents only the last
+			// received response is passed on; the old implementation always passed
+			// the original request.
+			rsp, serr := step.Execute(nctx, msg, opts...)
+			if serr != nil {
+				step.SetStatus(StatusFailure)
+				cherr <- serr
+				// Unblock workers that are still waiting for our result.
+				cancel()
+				return
+			}
+
+			// Send this worker's result onto all children's input channels or, if it
+			// is a leaf (i.e. no children), onto the output channel. All channels are
+			// buffered, so these sends never block.
+			if len(children) > 0 {
+				for child := range children {
+					inputChannels[child] <- rsp
+				}
+			} else {
+				outputChannel <- rsp
+			}
+		}(id, children)
+	}
+
+	// Wait for all goroutines to finish.
+	wg.Wait()
+
+	// Report the first step error, if any.
+	select {
+	case serr := <-cherr:
+		return serr
+	default:
+	}
+
+	// Drain the leaf results. They are currently discarded; persisting them is
+	// part of the step-store TODO above.
+	for i := 0; i < cap(outputChannel); i++ {
+		<-outputChannel
+	}
+
+	return nil
 }
 
 // NewFlow create new flow
diff --git a/flow/flow.go b/flow/flow.go
index 0124db3b..2930701b 100644
--- a/flow/flow.go
+++ b/flow/flow.go
@@ -125,8 +125,6 @@ type Workflow interface {
 	AppendSteps(steps ...Step) error
 	// Status returns workflow status
 	Status() Status
-	// Steps returns steps slice where parallel steps returned on the same level
-	Steps() ([][]Step, error)
 	// Suspend suspends execution
 	Suspend(ctx context.Context, id string) error
 	// Resume resumes execution
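
Note: the rewritten handleWorkflow replaces the old level-by-level walk with a fan-in/fan-out channel scheme: one buffered input channel per vertex (capacity = number of parents) and one output channel sized for the leaves. The following self-contained sketch distills that pattern; the names task and runDAG are invented for this illustration and are not part of the package. Which parent result "wins" at a fan-in point is nondeterministic, exactly as in handleWorkflow.

package main

import (
	"fmt"
	"sync"
)

// task is an illustrative stand-in for a workflow Step.
type task struct {
	id       string
	parents  []string
	children []string
}

// runDAG executes every task once all of its parents have delivered a result.
func runDAG(tasks map[string]*task, rootID, input string) []string {
	inputs := make(map[string]chan string, len(tasks))
	leafCount := 0
	for id, t := range tasks {
		n := len(t.parents)
		if id == rootID {
			n = 1 // the root is fed the initial input instead of parent results
		}
		inputs[id] = make(chan string, n)
		if len(t.children) == 0 {
			leafCount++
		}
	}
	output := make(chan string, leafCount)
	inputs[rootID] <- input

	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		go func(t *task) {
			defer wg.Done()
			var msg string
			for i := 0; i < cap(inputs[t.id]); i++ {
				msg = <-inputs[t.id] // fan-in: wait for every parent
			}
			res := msg + "->" + t.id // "execute" the task
			if len(t.children) == 0 {
				output <- res // leaf: deliver the final result
				return
			}
			for _, c := range t.children { // fan-out: buffered, never blocks
				inputs[c] <- res
			}
		}(t)
	}
	wg.Wait()

	results := make([]string, 0, leafCount)
	for i := 0; i < leafCount; i++ {
		results = append(results, <-output)
	}
	return results
}

func main() {
	// Diamond: a -> b, a -> c, b -> d, c -> d.
	tasks := map[string]*task{
		"a": {id: "a", children: []string{"b", "c"}},
		"b": {id: "b", parents: []string{"a"}, children: []string{"d"}},
		"c": {id: "c", parents: []string{"a"}, children: []string{"d"}},
		"d": {id: "d", parents: []string{"b", "c"}},
	}
	fmt.Println(runDAG(tasks, "a", "req"))
}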
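The graph methods the new code relies on (GetRoots, GetVertices, GetDescendants, GetParents, GetChildren, IsLeaf, ReduceTransitively, AddVertex, AddEdge) match the API of github.com/heimdalr/dag. Assuming that is the library behind w.g, this standalone snippet shows the "pre-flight" queries in isolation:

package main

import (
	"fmt"

	"github.com/heimdalr/dag"
)

func main() {
	d := dag.NewDAG()
	// Vertex IDs are returned by AddVertex; handleWorkflow assumes they equal
	// the step name.
	a, _ := d.AddVertex("AccountService.Create")
	b, _ := d.AddVertex("AuthzService.Create")
	c, _ := d.AddVertex("MailerService.Create")
	_ = d.AddEdge(a, b)
	_ = d.AddEdge(b, c)

	fmt.Println(d.GetRoots()) // single root -> valid start vertex

	// Every vertex reachable from the root, with the per-vertex queries used
	// to size the input channels and count the leaves.
	desc, _ := d.GetDescendants(a)
	for id := range desc {
		parents, _ := d.GetParents(id)
		isLeaf, _ := d.IsLeaf(id)
		fmt.Println(id, "parents:", len(parents), "leaf:", isLeaf)
	}
}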