From 42800fa2478026c838db600cac016591f44a2c9f Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 14 Jul 2021 17:12:54 +0300 Subject: [PATCH] flow: improve steps handling Signed-off-by: Vasiliy Tolstov --- flow/default.go | 179 +++++++++++++++++++++++++++++++++++------------- flow/flow.go | 2 +- 2 files changed, 133 insertions(+), 48 deletions(-) diff --git a/flow/default.go b/flow/default.go index e9c53a56..b1f98fb4 100644 --- a/flow/default.go +++ b/flow/default.go @@ -9,6 +9,8 @@ import ( "github.com/silas/dag" "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/store" ) type microFlow struct { @@ -28,18 +30,118 @@ func (w *microWorkflow) ID() string { return w.id } -func (w *microWorkflow) Steps() [][]Step { - return nil +func (w *microWorkflow) Steps() ([][]Step, error) { + return w.getSteps("", false) } func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error { + w.Lock() + + for _, s := range steps { + w.steps[s.String()] = s + w.g.Add(s) + } + + for _, dst := range steps { + for _, req := range dst.Requires() { + src, ok := w.steps[req] + if !ok { + return ErrStepNotExists + } + w.g.Connect(dag.BasicEdge(src, dst)) + } + } + + if err := w.g.Validate(); err != nil { + w.Unlock() + return err + } + + w.g.TransitiveReduction() + + w.Unlock() + return nil } func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error { + // TODO: handle case when some step requires or required by removed step + + w.Lock() + + for _, s := range steps { + delete(w.steps, s.String()) + w.g.Remove(s) + } + + for _, dst := range steps { + for _, req := range dst.Requires() { + src, ok := w.steps[req] + if !ok { + return ErrStepNotExists + } + w.g.Connect(dag.BasicEdge(src, dst)) + } + } + + if err := w.g.Validate(); err != nil { + w.Unlock() + return err + } + + w.g.TransitiveReduction() + + w.Unlock() + return nil } +func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) { + var steps [][]Step + var root dag.Vertex + var err error + + fn := func(n dag.Vertex, idx int) error { + if idx == 0 { + steps = make([][]Step, 1) + steps[0] = make([]Step, 0, 1) + } else if idx >= len(steps) { + tsteps := make([][]Step, idx+1) + copy(tsteps, steps) + steps = tsteps + steps[idx] = make([]Step, 0, 1) + } + steps[idx] = append(steps[idx], n.(Step)) + return nil + } + + if start != "" { + var ok bool + w.RLock() + root, ok = w.steps[start] + w.RUnlock() + if !ok { + return nil, ErrStepNotExists + } + } else { + root, err = w.g.Root() + if err != nil { + return nil, err + } + } + + if reverse { + err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn) + } else { + err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) + } + if err != nil { + return nil, err + } + + return steps, nil +} + func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) { w.Lock() if !w.init { @@ -58,72 +160,55 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex } options := NewExecuteOptions(opts...) - var steps [][]Step - fn := func(n dag.Vertex, idx int) error { - if idx == 0 { - steps = make([][]Step, 1) - steps[0] = make([]Step, 0, 1) - } else if idx >= len(steps) { - tsteps := make([][]Step, idx+1) - copy(tsteps, steps) - steps = tsteps - steps[idx] = make([]Step, 0, 1) - } - steps[idx] = append(steps[idx], n.(Step)) - return nil - } - - var root dag.Vertex - if options.Start != "" { - var ok bool - w.RLock() - root, ok = w.steps[options.Start] - w.RUnlock() - if !ok { - return "", ErrStepNotExists - } - } else { - root, err = w.g.Root() - if err != nil { - return "", err - } - } - if options.Reverse { - err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn) - } else { - err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) - } + steps, err := w.getSteps(options.Start, options.Reverse) if err != nil { return "", err } var wg sync.WaitGroup cherr := make(chan error, 1) - defer close(cherr) nctx, cancel := context.WithCancel(ctx) defer cancel() nopts := make([]ExecuteOption, 0, len(opts)+5) - nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store)) - + nopts = append(nopts, + ExecuteClient(w.opts.Client), + ExecuteTracer(w.opts.Tracer), + ExecuteLogger(w.opts.Logger), + ExecuteMeter(w.opts.Meter), + ExecuteStore(store.NewNamespaceStore(w.opts.Store, uid.String())), + ) + nopts = append(nopts, opts...) + done := make(chan struct{}) go func() { for idx := range steps { - wg.Add(len(steps[idx])) for nidx := range steps[idx] { + if w.opts.Logger.V(logger.TraceLevel) { + w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx]) + } + wg.Add(1) go func(step Step) { defer wg.Done() - if err = step.Execute(nctx, req, nopts...); err != nil { - cherr <- err + if serr := step.Execute(nctx, req, nopts...); serr != nil { + cherr <- serr cancel() } }(steps[idx][nidx]) } wg.Wait() } - cherr <- nil + close(done) }() - err = <-cherr + logger.Tracef(ctx, "wait for finish or error") + select { + case <-nctx.Done(): + err = nctx.Err() + case cerr := <-cherr: + err = cerr + case <-done: + close(cherr) + } return uid.String(), err } @@ -308,9 +393,9 @@ func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts .. return nil } -func NewCallStep(service string, method string, opts ...StepOption) Step { +func NewCallStep(service string, name string, method string, opts ...StepOption) Step { options := NewStepOptions(opts...) - return µCallStep{service: service, method: method, opts: options} + return µCallStep{service: service, method: name + "." + method, opts: options} } func NewPublishStep(topic string, opts ...StepOption) Step { diff --git a/flow/flow.go b/flow/flow.go index 54f052df..72773b5a 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -33,7 +33,7 @@ type Workflow interface { // ID returns id of the workflow ID() string // Steps returns steps slice where parallel steps returned on the same level - Steps() [][]Step + Steps() ([][]Step, error) // Execute workflow with args, return execution id and error Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) // RemoveSteps remove steps from workflow