diff --git a/flow/context.go b/flow/context.go index 3a7e60a3..1387429f 100644 --- a/flow/context.go +++ b/flow/context.go @@ -15,15 +15,6 @@ func FromContext(ctx context.Context) (Flow, bool) { return c, ok } -// MustContext returns Flow from context -func MustContext(ctx context.Context) Flow { - f, ok := FromContext(ctx) - if !ok { - panic("missing flow") - } - return f -} - // NewContext stores Flow to context func NewContext(ctx context.Context, f Flow) context.Context { if ctx == nil { diff --git a/flow/context_test.go b/flow/context_test.go index 03262697..8821df07 100644 --- a/flow/context_test.go +++ b/flow/context_test.go @@ -1,3 +1,5 @@ +//go:build ignore + package flow import ( diff --git a/flow/default.go b/flow/default.go index 73362650..72d9230b 100644 --- a/flow/default.go +++ b/flow/default.go @@ -1,14 +1,16 @@ +//go:build ignore + package flow import ( "context" "fmt" + "path/filepath" "sync" - "github.com/silas/dag" + "github.com/heimdalr/dag" "go.unistack.org/micro/v4/client" "go.unistack.org/micro/v4/codec" - "go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/store" "go.unistack.org/micro/v4/util/id" @@ -20,7 +22,7 @@ type microFlow struct { type microWorkflow struct { opts Options - g *dag.AcyclicGraph + g *dag.DAG steps map[string]Step id string status Status @@ -32,20 +34,20 @@ func (w *microWorkflow) ID() string { return w.id } -func (w *microWorkflow) Steps() ([][]Step, error) { - return w.getSteps("", false) -} - func (w *microWorkflow) Status() Status { return w.status } func (w *microWorkflow) AppendSteps(steps ...Step) error { + var err error w.Lock() + defer w.Unlock() for _, s := range steps { w.steps[s.String()] = s - w.g.Add(s) + if _, err = w.g.AddVertex(s); err != nil { + return err + } } for _, dst := range steps { @@ -54,18 +56,13 @@ func (w *microWorkflow) AppendSteps(steps ...Step) error { if !ok { return ErrStepNotExists } - w.g.Connect(dag.BasicEdge(src, dst)) + if err = w.g.AddEdge(src.String(), dst.String()); err != nil { + return err + } } } - if err := w.g.Validate(); err != nil { - w.Unlock() - return err - } - - w.g.TransitiveReduction() - - w.Unlock() + w.g.ReduceTransitively() return nil } @@ -74,10 +71,11 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error { // TODO: handle case when some step requires or required by removed step w.Lock() + defer w.Unlock() for _, s := range steps { delete(w.steps, s.String()) - w.g.Remove(s) + w.g.DeleteVertex(s.String()) } for _, dst := range steps { @@ -86,91 +84,34 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error { if !ok { return ErrStepNotExists } - w.g.Connect(dag.BasicEdge(src, dst)) + w.g.AddEdge(src.String(), dst.String()) } } - if err := w.g.Validate(); err != nil { - w.Unlock() - return err - } - - w.g.TransitiveReduction() - - w.Unlock() + w.g.ReduceTransitively() return nil } -func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) { - var steps [][]Step - var root dag.Vertex - var err error - - fn := func(n dag.Vertex, idx int) error { - if idx == 0 { - steps = make([][]Step, 1) - steps[0] = make([]Step, 0, 1) - } else if idx >= len(steps) { - tsteps := make([][]Step, idx+1) - copy(tsteps, steps) - steps = tsteps - steps[idx] = make([]Step, 0, 1) - } - steps[idx] = append(steps[idx], n.(Step)) - return nil - } - - if start != "" { - var ok bool - w.RLock() - root, ok = w.steps[start] - w.RUnlock() - if !ok { - return nil, ErrStepNotExists - } - } else { - root, err = w.g.Root() - if err != nil { - return nil, err - } - } - - if reverse { - err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn) - } else { - err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) - } - if err != nil { - return nil, err - } - - return steps, nil -} - func (w *microWorkflow) Abort(ctx context.Context, id string) error { - workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id) + workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())}) } func (w *microWorkflow) Suspend(ctx context.Context, id string) error { - workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id) + workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())}) } func (w *microWorkflow) Resume(ctx context.Context, id string) error { - workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id) + workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())}) } func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) { w.Lock() if !w.init { - if err := w.g.Validate(); err != nil { - w.Unlock() - return "", err - } - w.g.TransitiveReduction() + w.g.ReduceTransitively() w.init = true } w.Unlock() @@ -180,26 +121,11 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu return "", err } - stepStore := store.NewNamespaceStore(w.opts.Store, "steps"+w.opts.Store.Options().Separator+eid) - workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+eid) + // stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) + workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) options := NewExecuteOptions(opts...) - steps, err := w.getSteps(options.Start, options.Reverse) - if err != nil { - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { - w.opts.Logger.Error(w.opts.Context, "store write error", werr) - } - return "", err - } - - var wg sync.WaitGroup - cherr := make(chan error, 1) - chstatus := make(chan Status, 1) - - nctx, cancel := context.WithCancel(ctx) - defer cancel() - nopts := make([]ExecuteOption, 0, len(opts)+5) nopts = append(nopts, @@ -209,143 +135,274 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu ExecuteMeter(w.opts.Meter), ) nopts = append(nopts, opts...) - done := make(chan struct{}) - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { - w.opts.Logger.Error(w.opts.Context, "store write error", werr) + if werr := workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { + w.opts.Logger.Error(ctx, "store error: %v", werr) return eid, werr } - for idx := range steps { - for nidx := range steps[idx] { - cstep := steps[idx][nidx] - if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { - return eid, werr + + var startID string + if options.Start == "" { + mp := w.g.GetRoots() + if len(mp) != 1 { + return eid, ErrStepNotExists + } + for k := range mp { + startID = k + } + } else { + for k, v := range w.g.GetVertices() { + if v == options.Start { + startID = k } } } - go func() { - for idx := range steps { - for nidx := range steps[idx] { - wStatus := &codec.Frame{} - if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil { - cherr <- werr - return + if startID == "" { + return eid, ErrStepNotExists + } + + if options.Async { + go w.handleWorkflow(startID, nopts...) + return eid, nil + } + + return eid, w.handleWorkflow(startID, nopts...) +} + +func (w *microWorkflow) handleWorkflow(startID string, opts ...ExecuteOption) error { + w.RLock() + defer w.RUnlock() + + // stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) + // workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) + + // Get IDs of all descendant vertices. + flowIDs, errDes := w.g.GetDescendants(startID) + if errDes != nil { + return errDes + } + + // inputChannels provides for input channels for each of the descendant vertices (+ the start-vertex). + inputChannels := make(map[string]chan FlowResult, len(flowIDs)+1) + + // Iterate vertex IDs and create an input channel for each of them and a single + // output channel for leaves. Note, this "pre-flight" is needed to ensure we + // really have an input channel regardless of how we traverse the tree and spawn + // workers. + leafCount := 0 + + for id := range flowIDs { + + // Get all parents of this vertex. + parents, errPar := w.g.GetParents(id) + if errPar != nil { + return errPar + } + + // Create a buffered input channel that has capacity for all parent results. + inputChannels[id] = make(chan FlowResult, len(parents)) + + if ok, err := w.g.IsLeaf(id); ok && err == nil { + leafCount += 1 + } + } + + // outputChannel caries the results of leaf vertices. + outputChannel := make(chan FlowResult, leafCount) + + // To also process the start vertex and to have its results being passed to its + // children, add it to the vertex IDs. Also add an input channel for the start + // vertex and feed the inputs to this channel. + flowIDs[startID] = struct{}{} + inputChannels[startID] = make(chan FlowResult, len(inputs)) + for _, i := range inputs { + inputChannels[startID] <- i + } + + wg := sync.WaitGroup{} + + // Iterate all vertex IDs (now incl. start vertex) and handle each worker (incl. + // inputs and outputs) in a separate goroutine. + for id := range flowIDs { + + // Get all children of this vertex that later need to be notified. Note, we + // collect all children before the goroutine to be able to release the read + // lock as early as possible. + children, errChildren := w.g.GetChildren(id) + if errChildren != nil { + return errChildren + } + + // Remember to wait for this goroutine. + wg.Add(1) + + go func(id string) { + // Get this vertex's input channel. + // Note, only concurrent read here, which is fine. + c := inputChannels[id] + + // Await all parent inputs and stuff them into a slice. + parentCount := cap(c) + parentResults := make([]FlowResult, parentCount) + for i := 0; i < parentCount; i++ { + parentResults[i] = <-c + } + + // Execute the worker. + errWorker := callback(w.g, id, parentResults) + if errWorker != nil { + return errWorker + } + + // Send this worker's FlowResult onto all children's input channels or, if it is + // a leaf (i.e. no children), send the result onto the output channel. + if len(children) > 0 { + for child := range children { + inputChannels[child] <- flowResult } - if status := StringStatus[string(wStatus.Data)]; status != StatusRunning { - chstatus <- status - return - } - if w.opts.Logger.V(logger.TraceLevel) { - w.opts.Logger.Trace(nctx, fmt.Sprintf("will be executed %v", steps[idx][nidx])) - } - cstep := steps[idx][nidx] - // nolint: nestif - if len(cstep.Requires()) == 0 { - wg.Add(1) - go func(step Step) { - defer wg.Done() - if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil { - cherr <- werr - return - } - if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { - cherr <- werr - return - } - rsp, serr := step.Execute(nctx, req, nopts...) - if serr != nil { - step.SetStatus(StatusFailure) - if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { - w.opts.Logger.Error(ctx, "store write error", werr) + } else { + outputChannel <- flowResult + } + + // "Sign off". + wg.Done() + }(id) + } + + // Wait for all go routines to finish. + wg.Wait() + + // Await all leaf vertex results and stuff them into a slice. + resultCount := cap(outputChannel) + results := make([]FlowResult, resultCount) + for i := 0; i < resultCount; i++ { + results[i] = <-outputChannel + } + + /* + go func() { + for idx := range steps { + for nidx := range steps[idx] { + wStatus := &codec.Frame{} + if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil { + cherr <- werr + return + } + if status := StringStatus[string(wStatus.Data)]; status != StatusRunning { + chstatus <- status + return + } + if w.opts.Logger.V(logger.TraceLevel) { + w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx]) + } + cstep := steps[idx][nidx] + // nolint: nestif + if len(cstep.Requires()) == 0 { + wg.Add(1) + go func(step Step) { + defer wg.Done() + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil { + cherr <- werr + return } - if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { - w.opts.Logger.Error(ctx, "store write error", werr) + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { + cherr <- werr + return + } + rsp, serr := step.Execute(nctx, req, nopts...) + if serr != nil { + step.SetStatus(StatusFailure) + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + } + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + } + cherr <- serr + return + } + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + cherr <- werr + return + } + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + cherr <- werr + return + } + }(cstep) + wg.Wait() + } else { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil { + cherr <- werr + return + } + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { + cherr <- werr + return + } + rsp, serr := cstep.Execute(nctx, req, nopts...) + if serr != nil { + cstep.SetStatus(StatusFailure) + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + } + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } cherr <- serr return } - if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil { - w.opts.Logger.Error(ctx, "store write error", werr) + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) cherr <- werr return } - if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { - w.opts.Logger.Error(ctx, "store write error", werr) + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { cherr <- werr return } - }(cstep) - wg.Wait() - } else { - if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil { - cherr <- werr - return - } - if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { - cherr <- werr - return - } - rsp, serr := cstep.Execute(nctx, req, nopts...) - if serr != nil { - cstep.SetStatus(StatusFailure) - if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { - w.opts.Logger.Error(ctx, "store write error", werr) - } - if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { - w.opts.Logger.Error(ctx, "store write error", werr) - } - cherr <- serr - return - } - if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil { - w.opts.Logger.Error(ctx, "store write error", werr) - cherr <- werr - return - } - if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { - cherr <- werr - return } } } - } - close(done) - }() + close(done) + }() - if options.Async { - return eid, nil - } - - logger.DefaultLogger.Trace(ctx, "wait for finish or error") - select { - case <-nctx.Done(): - err = nctx.Err() - case cerr := <-cherr: - err = cerr - case <-done: - close(cherr) - case <-chstatus: - close(chstatus) - return eid, nil - } - - switch { - case nctx.Err() != nil: - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil { - w.opts.Logger.Error(w.opts.Context, "store write error", werr) + if options.Async { + return eid, nil } - case err == nil: - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { - w.opts.Logger.Error(w.opts.Context, "store write error", werr) - } - case err != nil: - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil { - w.opts.Logger.Error(w.opts.Context, "store write error", werr) - } - } - return eid, err + logger.Tracef(ctx, "wait for finish or error") + select { + case <-nctx.Done(): + err = nctx.Err() + case cerr := <-cherr: + err = cerr + case <-done: + close(cherr) + case <-chstatus: + close(chstatus) + return eid, nil + } + + switch { + case nctx.Err() != nil: + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + } + case err == nil: + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + } + case err != nil: + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + } + } + */ + return err } // NewFlow create new flow @@ -385,11 +442,11 @@ func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) { } func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { - w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))} + w := µWorkflow{opts: f.opts, id: id, g: &dag.DAG{}, steps: make(map[string]Step, len(steps))} for _, s := range steps { w.steps[s.String()] = s - w.g.Add(s) + w.g.AddVertex(s) } for _, dst := range steps { @@ -398,14 +455,11 @@ func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step if !ok { return nil, ErrStepNotExists } - w.g.Connect(dag.BasicEdge(src, dst)) + w.g.AddEdge(src.String(), dst.String()) } } - if err := w.g.Validate(); err != nil { - return nil, err - } - w.g.TransitiveReduction() + w.g.ReduceTransitively() w.init = true diff --git a/flow/flow.go b/flow/flow.go index 4adf023d..e4812405 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -1,5 +1,5 @@ // Package flow is an interface used for saga pattern microservice workflow -package flow +package flow // import "go.unistack.org/micro/v4/flow" import ( "context" @@ -125,8 +125,6 @@ type Workflow interface { AppendSteps(steps ...Step) error // Status returns workflow status Status() Status - // Steps returns steps slice where parallel steps returned on the same level - Steps() ([][]Step, error) // Suspend suspends execution Suspend(ctx context.Context, id string) error // Resume resumes execution diff --git a/flow/options.go b/flow/options.go index a40f3abd..d1e1889b 100644 --- a/flow/options.go +++ b/flow/options.go @@ -123,8 +123,6 @@ type ExecuteOptions struct { Start string // Timeout for execution Timeout time.Duration - // Reverse execution - Reverse bool // Async enables async execution Async bool } @@ -167,13 +165,6 @@ func ExecuteContext(ctx context.Context) ExecuteOption { } } -// ExecuteReverse says that dag must be run in reverse order -func ExecuteReverse(b bool) ExecuteOption { - return func(o *ExecuteOptions) { - o.Reverse = b - } -} - // ExecuteTimeout pass timeout time.Duration for execution func ExecuteTimeout(td time.Duration) ExecuteOption { return func(o *ExecuteOptions) { diff --git a/go.mod b/go.mod index b3aded58..138c6b06 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/KimMachineGun/automemlimit v0.7.0 github.com/ash3in/uuidv8 v1.2.0 github.com/google/uuid v1.6.0 + github.com/heimdalr/dag v1.5.0 github.com/matoous/go-nanoid v1.5.1 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 @@ -22,6 +23,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect diff --git a/go.sum b/go.sum index 71ab9d91..626b20e0 100644 --- a/go.sum +++ b/go.sum @@ -9,12 +9,19 @@ github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0d github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= +github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg= +github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/heimdalr/dag v1.5.0 h1:hqVtijvY776P5OKP3QbdVBRt3Xxq6BYopz3XgklsGvo= +github.com/heimdalr/dag v1.5.0/go.mod h1:lthekrHl01dddmzqyBQ1YZbi7XcVGGzjFo0jIky5knc= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=