diff --git a/flow/default.go b/flow/default.go index 57f0b8e9..67a4225a 100644 --- a/flow/default.go +++ b/flow/default.go @@ -6,7 +6,7 @@ import ( "path/filepath" "sync" - "github.com/silas/dag" + "github.com/heimdalr/dag" "go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" @@ -21,7 +21,7 @@ type microFlow struct { type microWorkflow struct { opts Options - g *dag.AcyclicGraph + g *dag.DAG steps map[string]Step id string status Status @@ -42,29 +42,32 @@ func (w *microWorkflow) Status() Status { } func (w *microWorkflow) AppendSteps(steps ...Step) error { + var err error w.Lock() for _, s := range steps { w.steps[s.String()] = s - w.g.Add(s) + if _, err = w.g.AddVertex(s); err != nil { + w.Unlock() + return err + } } for _, dst := range steps { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { + w.Unlock() return ErrStepNotExists } - w.g.Connect(dag.BasicEdge(src, dst)) + if err = w.g.AddEdge(src.String(), dst.String()); err != nil { + w.Unlock() + return err + } } } - if err := w.g.Validate(); err != nil { - w.Unlock() - return err - } - - w.g.TransitiveReduction() + w.g.ReduceTransitively() w.Unlock() @@ -78,32 +81,28 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error { for _, s := range steps { delete(w.steps, s.String()) - w.g.Remove(s) + w.g.DeleteVertex(s.String()) } for _, dst := range steps { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { + w.Unlock() return ErrStepNotExists } - w.g.Connect(dag.BasicEdge(src, dst)) + w.g.AddEdge(src.String(), dst.String()) } } - if err := w.g.Validate(); err != nil { - w.Unlock() - return err - } - - w.g.TransitiveReduction() + w.g.ReduceTransitively() w.Unlock() return nil } -func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) { +func (w *microWorkflow) getSteps(start string) ([][]Step, error) { var steps [][]Step var root dag.Vertex var err error @@ -137,11 +136,8 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) { } } - if reverse { - err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn) - } else { - err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) - } + err = w.g.SortedDepthFirstWalk(root, fn) + if err != nil { return nil, err } @@ -167,11 +163,7 @@ func (w *microWorkflow) Resume(ctx context.Context, id string) error { func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) { w.Lock() if !w.init { - if err := w.g.Validate(); err != nil { - w.Unlock() - return "", err - } - w.g.TransitiveReduction() + w.g.ReduceTransitively() w.init = true } w.Unlock() @@ -186,7 +178,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu options := NewExecuteOptions(opts...) - steps, err := w.getSteps(options.Start, options.Reverse) + steps, err := w.getSteps(options.Start) if err != nil { if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) @@ -386,11 +378,11 @@ func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) { } func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { - w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))} + w := µWorkflow{opts: f.opts, id: id, g: &dag.DAG{}, steps: make(map[string]Step, len(steps))} for _, s := range steps { w.steps[s.String()] = s - w.g.Add(s) + w.g.AddVertex(s) } for _, dst := range steps { @@ -399,14 +391,11 @@ func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step if !ok { return nil, ErrStepNotExists } - w.g.Connect(dag.BasicEdge(src, dst)) + w.g.AddEdge(src.String(), dst.String()) } } - if err := w.g.Validate(); err != nil { - return nil, err - } - w.g.TransitiveReduction() + w.g.ReduceTransitively() w.init = true diff --git a/flow/options.go b/flow/options.go index 2e03b561..a8901471 100644 --- a/flow/options.go +++ b/flow/options.go @@ -123,8 +123,6 @@ type ExecuteOptions struct { Start string // Timeout for execution Timeout time.Duration - // Reverse execution - Reverse bool // Async enables async execution Async bool } @@ -167,13 +165,6 @@ func ExecuteContext(ctx context.Context) ExecuteOption { } } -// ExecuteReverse says that dag must be run in reverse order -func ExecuteReverse(b bool) ExecuteOption { - return func(o *ExecuteOptions) { - o.Reverse = b - } -} - // ExecuteTimeout pass timeout time.Duration for execution func ExecuteTimeout(td time.Duration) ExecuteOption { return func(o *ExecuteOptions) { diff --git a/go.mod b/go.mod index b0b4ccf1..575137ce 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,15 @@ module go.unistack.org/micro/v3 go 1.16 require ( - github.com/google/gnostic v0.6.8 // indirect + github.com/google/gnostic v0.6.9 // indirect github.com/google/go-cmp v0.5.7 // indirect + github.com/heimdalr/dag v1.2.1 github.com/imdario/mergo v0.3.13 github.com/kr/pretty v0.2.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible - github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 + github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 go.unistack.org/micro-proto/v3 v3.2.7 google.golang.org/protobuf v1.28.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index c4a660fa..3e9f86c1 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -22,6 +24,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M= +github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -39,8 +43,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/gnostic v0.6.6/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= -github.com/google/gnostic v0.6.8 h1:bT56GPYBWh1tvBuBEd94qcS3+60b+y0HQur0ITkGuCk= -github.com/google/gnostic v0.6.8/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= +github.com/google/gnostic v0.6.9 h1:ZK/5VhkoX835RikCHpSUJV9a+S3e1zLh59YnyWeBW+0= +github.com/google/gnostic v0.6.9/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -49,8 +53,11 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/heimdalr/dag v1.2.1 h1:XJOMaoWqJK1UKdp+4zaO2uwav9GFbHMGCirdViKMRIQ= +github.com/heimdalr/dag v1.2.1/go.mod h1:Of/wUB7Yoj4dwiOcGOOYIq6MHlPF/8/QMBKFJpwg+yc= github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -65,8 +72,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTK github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= -github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ= -github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= +github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= +github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -155,7 +162,8 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=