WIP: flow rewrite #138
@ -1,3 +1,5 @@
|
||||
//go:build ignore
|
||||
|
||||
package flow
|
||||
|
||||
import (
|
||||
|
21
flow/dag.go
21
flow/dag.go
@ -1,21 +0,0 @@
|
||||
package flow
|
||||
|
||||
type node struct {
|
||||
name string
|
||||
}
|
||||
|
||||
func (n *node) ID() string {
|
||||
return n.name
|
||||
}
|
||||
|
||||
func (n *node) Name() string {
|
||||
return n.name
|
||||
}
|
||||
|
||||
func (n *node) String() string {
|
||||
return n.name
|
||||
}
|
||||
|
||||
func (n *node) Hashcode() interface{} {
|
||||
return n.name
|
||||
}
|
@ -1,68 +0,0 @@
|
||||
package flow
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/silas/dag"
|
||||
)
|
||||
|
||||
func checkErr(t *testing.T, err error) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDag(t *testing.T) {
|
||||
d1 := &dag.AcyclicGraph{}
|
||||
d2 := &dag.AcyclicGraph{}
|
||||
d2v1 := d2.Add(&node{"Substep.Create"})
|
||||
v1 := d1.Add(&node{"AccountService.Create"})
|
||||
v2 := d1.Add(&node{"AuthzService.Create"})
|
||||
v3 := d1.Add(&node{"AuthnService.Create"})
|
||||
v4 := d1.Add(&node{"ProjectService.Create"})
|
||||
v5 := d1.Add(&node{"ContactService.Create"})
|
||||
v6 := d1.Add(&node{"NetworkService.Create"})
|
||||
v7 := d1.Add(&node{"MailerService.Create"})
|
||||
v8 := d1.Add(&node{"NestedService.Create"})
|
||||
v9 := d1.Add(d2v1)
|
||||
d1.Connect(dag.BasicEdge(v1, v2))
|
||||
d1.Connect(dag.BasicEdge(v1, v3))
|
||||
d1.Connect(dag.BasicEdge(v1, v4))
|
||||
d1.Connect(dag.BasicEdge(v1, v5))
|
||||
d1.Connect(dag.BasicEdge(v1, v6))
|
||||
d1.Connect(dag.BasicEdge(v1, v7))
|
||||
d1.Connect(dag.BasicEdge(v7, v8))
|
||||
d1.Connect(dag.BasicEdge(v8, v9))
|
||||
|
||||
if err := d1.Validate(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
d1.TransitiveReduction()
|
||||
|
||||
var steps [][]string
|
||||
fn := func(n dag.Vertex, idx int) error {
|
||||
if idx == 0 {
|
||||
steps = make([][]string, 1)
|
||||
steps[0] = make([]string, 0, 1)
|
||||
} else if idx >= len(steps) {
|
||||
tsteps := make([][]string, idx+1)
|
||||
copy(tsteps, steps)
|
||||
steps = tsteps
|
||||
steps[idx] = make([]string, 0, 1)
|
||||
}
|
||||
steps[idx] = append(steps[idx], fmt.Sprintf("%s", n))
|
||||
return nil
|
||||
}
|
||||
|
||||
start := &node{"AccountService.Create"}
|
||||
err := d1.SortedDepthFirstWalk([]dag.Vertex{start}, fn)
|
||||
checkErr(t, err)
|
||||
if len(steps) != 4 {
|
||||
t.Fatalf("invalid steps: %#+v", steps)
|
||||
}
|
||||
if steps[3][0] != "Substep.Create" {
|
||||
t.Fatalf("invalid last step: %#+v", steps)
|
||||
}
|
||||
}
|
221
flow/default.go
221
flow/default.go
@ -33,10 +33,6 @@ func (w *microWorkflow) ID() string {
|
||||
return w.id
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Steps() ([][]Step, error) {
|
||||
return w.getSteps("", false)
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Status() Status {
|
||||
return w.status
|
||||
}
|
||||
@ -44,11 +40,11 @@ func (w *microWorkflow) Status() Status {
|
||||
func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
||||
var err error
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
|
||||
for _, s := range steps {
|
||||
w.steps[s.String()] = s
|
||||
if _, err = w.g.AddVertex(s); err != nil {
|
||||
w.Unlock()
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -57,11 +53,9 @@ func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
||||
for _, req := range dst.Requires() {
|
||||
src, ok := w.steps[req]
|
||||
if !ok {
|
||||
w.Unlock()
|
||||
return ErrStepNotExists
|
||||
}
|
||||
if err = w.g.AddEdge(src.String(), dst.String()); err != nil {
|
||||
w.Unlock()
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -69,8 +63,6 @@ func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
||||
|
||||
w.g.ReduceTransitively()
|
||||
|
||||
w.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -78,6 +70,7 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error {
|
||||
// TODO: handle case when some step requires or required by removed step
|
||||
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
|
||||
for _, s := range steps {
|
||||
delete(w.steps, s.String())
|
||||
@ -88,7 +81,6 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error {
|
||||
for _, req := range dst.Requires() {
|
||||
src, ok := w.steps[req]
|
||||
if !ok {
|
||||
w.Unlock()
|
||||
return ErrStepNotExists
|
||||
}
|
||||
w.g.AddEdge(src.String(), dst.String())
|
||||
@ -97,54 +89,9 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error {
|
||||
|
||||
w.g.ReduceTransitively()
|
||||
|
||||
w.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *microWorkflow) getSteps(start string) ([][]Step, error) {
|
||||
var steps [][]Step
|
||||
var root dag.Vertex
|
||||
var err error
|
||||
|
||||
fn := func(n dag.Vertex, idx int) error {
|
||||
if idx == 0 {
|
||||
steps = make([][]Step, 1)
|
||||
steps[0] = make([]Step, 0, 1)
|
||||
} else if idx >= len(steps) {
|
||||
tsteps := make([][]Step, idx+1)
|
||||
copy(tsteps, steps)
|
||||
steps = tsteps
|
||||
steps[idx] = make([]Step, 0, 1)
|
||||
}
|
||||
steps[idx] = append(steps[idx], n.(Step))
|
||||
return nil
|
||||
}
|
||||
|
||||
if start != "" {
|
||||
var ok bool
|
||||
w.RLock()
|
||||
root, ok = w.steps[start]
|
||||
w.RUnlock()
|
||||
if !ok {
|
||||
return nil, ErrStepNotExists
|
||||
}
|
||||
} else {
|
||||
root, err = w.g.Root()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
err = w.g.SortedDepthFirstWalk(root, fn)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return steps, nil
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Abort(ctx context.Context, id string) error {
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
|
||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
|
||||
@ -173,26 +120,11 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
||||
return "", err
|
||||
}
|
||||
|
||||
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
||||
// stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||
|
||||
options := NewExecuteOptions(opts...)
|
||||
|
||||
steps, err := w.getSteps(options.Start)
|
||||
if err != nil {
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
cherr := make(chan error, 1)
|
||||
chstatus := make(chan Status, 1)
|
||||
|
||||
nctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
nopts := make([]ExecuteOption, 0, len(opts)+5)
|
||||
|
||||
nopts = append(nopts,
|
||||
@ -202,21 +134,152 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
||||
ExecuteMeter(w.opts.Meter),
|
||||
)
|
||||
nopts = append(nopts, opts...)
|
||||
done := make(chan struct{})
|
||||
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
return eid, werr
|
||||
}
|
||||
for idx := range steps {
|
||||
for nidx := range steps[idx] {
|
||||
cstep := steps[idx][nidx]
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
||||
return eid, werr
|
||||
|
||||
var startID string
|
||||
if options.Start == "" {
|
||||
mp := w.g.GetRoots()
|
||||
if len(mp) != 1 {
|
||||
return eid, ErrStepNotExists
|
||||
}
|
||||
for k := range mp {
|
||||
startID = k
|
||||
}
|
||||
} else {
|
||||
for k, v := range w.g.GetVertices() {
|
||||
if v == options.Start {
|
||||
startID = k
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if startID == "" {
|
||||
return eid, ErrStepNotExists
|
||||
}
|
||||
|
||||
if options.Async {
|
||||
go w.handleWorkflow(startID, nopts...)
|
||||
return eid, nil
|
||||
}
|
||||
|
||||
return eid, w.handleWorkflow(startID, nopts...)
|
||||
}
|
||||
|
||||
func (w *microWorkflow) handleWorkflow(startID string, opts ...ExecuteOption) error {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
|
||||
// stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
||||
// workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||
|
||||
// Get IDs of all descendant vertices.
|
||||
flowIDs, errDes := w.g.GetDescendants(startID)
|
||||
if errDes != nil {
|
||||
return errDes
|
||||
}
|
||||
|
||||
// inputChannels provides for input channels for each of the descendant vertices (+ the start-vertex).
|
||||
inputChannels := make(map[string]chan FlowResult, len(flowIDs)+1)
|
||||
|
||||
// Iterate vertex IDs and create an input channel for each of them and a single
|
||||
// output channel for leaves. Note, this "pre-flight" is needed to ensure we
|
||||
// really have an input channel regardless of how we traverse the tree and spawn
|
||||
// workers.
|
||||
leafCount := 0
|
||||
|
||||
for id := range flowIDs {
|
||||
|
||||
// Get all parents of this vertex.
|
||||
parents, errPar := w.g.GetParents(id)
|
||||
if errPar != nil {
|
||||
return errPar
|
||||
}
|
||||
|
||||
// Create a buffered input channel that has capacity for all parent results.
|
||||
inputChannels[id] = make(chan FlowResult, len(parents))
|
||||
|
||||
if w.g.isLeaf(id) {
|
||||
leafCount += 1
|
||||
}
|
||||
}
|
||||
|
||||
// outputChannel caries the results of leaf vertices.
|
||||
outputChannel := make(chan FlowResult, leafCount)
|
||||
|
||||
// To also process the start vertex and to have its results being passed to its
|
||||
// children, add it to the vertex IDs. Also add an input channel for the start
|
||||
// vertex and feed the inputs to this channel.
|
||||
flowIDs[startID] = struct{}{}
|
||||
inputChannels[startID] = make(chan FlowResult, len(inputs))
|
||||
for _, i := range inputs {
|
||||
inputChannels[startID] <- i
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
// Iterate all vertex IDs (now incl. start vertex) and handle each worker (incl.
|
||||
// inputs and outputs) in a separate goroutine.
|
||||
for id := range flowIDs {
|
||||
|
||||
// Get all children of this vertex that later need to be notified. Note, we
|
||||
// collect all children before the goroutine to be able to release the read
|
||||
// lock as early as possible.
|
||||
children, errChildren := w.g.GetChildren(id)
|
||||
if errChildren != nil {
|
||||
return errChildren
|
||||
}
|
||||
|
||||
// Remember to wait for this goroutine.
|
||||
wg.Add(1)
|
||||
|
||||
go func(id string) {
|
||||
// Get this vertex's input channel.
|
||||
// Note, only concurrent read here, which is fine.
|
||||
c := inputChannels[id]
|
||||
|
||||
// Await all parent inputs and stuff them into a slice.
|
||||
parentCount := cap(c)
|
||||
parentResults := make([]FlowResult, parentCount)
|
||||
for i := 0; i < parentCount; i++ {
|
||||
parentResults[i] = <-c
|
||||
}
|
||||
|
||||
// Execute the worker.
|
||||
errWorker := callback(w.g, id, parentResults)
|
||||
if errWorker != nil {
|
||||
return errWorker
|
||||
}
|
||||
|
||||
// Send this worker's FlowResult onto all children's input channels or, if it is
|
||||
// a leaf (i.e. no children), send the result onto the output channel.
|
||||
if len(children) > 0 {
|
||||
for child := range children {
|
||||
inputChannels[child] <- flowResult
|
||||
}
|
||||
} else {
|
||||
outputChannel <- flowResult
|
||||
}
|
||||
|
||||
// "Sign off".
|
||||
wg.Done()
|
||||
}(id)
|
||||
}
|
||||
|
||||
// Wait for all go routines to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Await all leaf vertex results and stuff them into a slice.
|
||||
resultCount := cap(outputChannel)
|
||||
results := make([]FlowResult, resultCount)
|
||||
for i := 0; i < resultCount; i++ {
|
||||
results[i] = <-outputChannel
|
||||
}
|
||||
|
||||
/*
|
||||
go func() {
|
||||
for idx := range steps {
|
||||
for nidx := range steps[idx] {
|
||||
@ -337,8 +400,8 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
}
|
||||
}
|
||||
|
||||
return eid, err
|
||||
*/
|
||||
return err
|
||||
}
|
||||
|
||||
// NewFlow create new flow
|
||||
|
@ -125,8 +125,6 @@ type Workflow interface {
|
||||
AppendSteps(steps ...Step) error
|
||||
// Status returns workflow status
|
||||
Status() Status
|
||||
// Steps returns steps slice where parallel steps returned on the same level
|
||||
Steps() ([][]Step, error)
|
||||
// Suspend suspends execution
|
||||
Suspend(ctx context.Context, id string) error
|
||||
// Resume resumes execution
|
||||
|
Loading…
Reference in New Issue
Block a user