initial rewrite

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2022-06-29 22:54:06 +03:00
parent 5953b5aae6
commit a9d9daa331
4 changed files with 43 additions and 53 deletions

View File

@@ -6,7 +6,7 @@ import (
"path/filepath"
"sync"
"github.com/silas/dag"
"github.com/heimdalr/dag"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
@@ -21,7 +21,7 @@ type microFlow struct {
type microWorkflow struct {
opts Options
g *dag.AcyclicGraph
g *dag.DAG
steps map[string]Step
id string
status Status
@@ -42,29 +42,32 @@ func (w *microWorkflow) Status() Status {
}
func (w *microWorkflow) AppendSteps(steps ...Step) error {
var err error
w.Lock()
for _, s := range steps {
w.steps[s.String()] = s
w.g.Add(s)
if _, err = w.g.AddVertex(s); err != nil {
w.Unlock()
return err
}
}
for _, dst := range steps {
for _, req := range dst.Requires() {
src, ok := w.steps[req]
if !ok {
w.Unlock()
return ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
if err = w.g.AddEdge(src.String(), dst.String()); err != nil {
w.Unlock()
return err
}
}
}
if err := w.g.Validate(); err != nil {
w.Unlock()
return err
}
w.g.TransitiveReduction()
w.g.ReduceTransitively()
w.Unlock()
@@ -78,32 +81,28 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error {
for _, s := range steps {
delete(w.steps, s.String())
w.g.Remove(s)
w.g.DeleteVertex(s.String())
}
for _, dst := range steps {
for _, req := range dst.Requires() {
src, ok := w.steps[req]
if !ok {
w.Unlock()
return ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
w.g.AddEdge(src.String(), dst.String())
}
}
if err := w.g.Validate(); err != nil {
w.Unlock()
return err
}
w.g.TransitiveReduction()
w.g.ReduceTransitively()
w.Unlock()
return nil
}
func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
func (w *microWorkflow) getSteps(start string) ([][]Step, error) {
var steps [][]Step
var root dag.Vertex
var err error
@@ -137,11 +136,8 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
}
}
if reverse {
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
} else {
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
}
err = w.g.SortedDepthFirstWalk(root, fn)
if err != nil {
return nil, err
}
@@ -167,11 +163,7 @@ func (w *microWorkflow) Resume(ctx context.Context, id string) error {
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
w.Lock()
if !w.init {
if err := w.g.Validate(); err != nil {
w.Unlock()
return "", err
}
w.g.TransitiveReduction()
w.g.ReduceTransitively()
w.init = true
}
w.Unlock()
@@ -186,7 +178,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
options := NewExecuteOptions(opts...)
steps, err := w.getSteps(options.Start, options.Reverse)
steps, err := w.getSteps(options.Start)
if err != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
@@ -386,11 +378,11 @@ func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) {
}
func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) {
w := &microWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))}
w := &microWorkflow{opts: f.opts, id: id, g: &dag.DAG{}, steps: make(map[string]Step, len(steps))}
for _, s := range steps {
w.steps[s.String()] = s
w.g.Add(s)
w.g.AddVertex(s)
}
for _, dst := range steps {
@@ -399,14 +391,11 @@ func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step
if !ok {
return nil, ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
w.g.AddEdge(src.String(), dst.String())
}
}
if err := w.g.Validate(); err != nil {
return nil, err
}
w.g.TransitiveReduction()
w.g.ReduceTransitively()
w.init = true

View File

@@ -123,8 +123,6 @@ type ExecuteOptions struct {
Start string
// Timeout for execution
Timeout time.Duration
// Reverse execution
Reverse bool
// Async enables async execution
Async bool
}
@@ -167,13 +165,6 @@ func ExecuteContext(ctx context.Context) ExecuteOption {
}
}
// ExecuteReverse says that dag must be run in reverse order
func ExecuteReverse(b bool) ExecuteOption {
return func(o *ExecuteOptions) {
o.Reverse = b
}
}
// ExecuteTimeout pass timeout time.Duration for execution
func ExecuteTimeout(td time.Duration) ExecuteOption {
return func(o *ExecuteOptions) {