flow: improve steps handling

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-07-14 17:12:54 +03:00
parent 5b9c810653
commit 42800fa247
2 changed files with 133 additions and 48 deletions

View File

@ -9,6 +9,8 @@ import (
"github.com/silas/dag"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/store"
)
type microFlow struct {
@ -28,18 +30,118 @@ func (w *microWorkflow) ID() string {
return w.id
}
func (w *microWorkflow) Steps() [][]Step {
return nil
func (w *microWorkflow) Steps() ([][]Step, error) {
return w.getSteps("", false)
}
func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error {
w.Lock()
for _, s := range steps {
w.steps[s.String()] = s
w.g.Add(s)
}
for _, dst := range steps {
for _, req := range dst.Requires() {
src, ok := w.steps[req]
if !ok {
return ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
}
}
if err := w.g.Validate(); err != nil {
w.Unlock()
return err
}
w.g.TransitiveReduction()
w.Unlock()
return nil
}
func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error {
// TODO: handle case when some step requires or required by removed step
w.Lock()
for _, s := range steps {
delete(w.steps, s.String())
w.g.Remove(s)
}
for _, dst := range steps {
for _, req := range dst.Requires() {
src, ok := w.steps[req]
if !ok {
return ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
}
}
if err := w.g.Validate(); err != nil {
w.Unlock()
return err
}
w.g.TransitiveReduction()
w.Unlock()
return nil
}
func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
var steps [][]Step
var root dag.Vertex
var err error
fn := func(n dag.Vertex, idx int) error {
if idx == 0 {
steps = make([][]Step, 1)
steps[0] = make([]Step, 0, 1)
} else if idx >= len(steps) {
tsteps := make([][]Step, idx+1)
copy(tsteps, steps)
steps = tsteps
steps[idx] = make([]Step, 0, 1)
}
steps[idx] = append(steps[idx], n.(Step))
return nil
}
if start != "" {
var ok bool
w.RLock()
root, ok = w.steps[start]
w.RUnlock()
if !ok {
return nil, ErrStepNotExists
}
} else {
root, err = w.g.Root()
if err != nil {
return nil, err
}
}
if reverse {
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
} else {
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
}
if err != nil {
return nil, err
}
return steps, nil
}
func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) {
w.Lock()
if !w.init {
@ -58,72 +160,55 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex
}
options := NewExecuteOptions(opts...)
var steps [][]Step
fn := func(n dag.Vertex, idx int) error {
if idx == 0 {
steps = make([][]Step, 1)
steps[0] = make([]Step, 0, 1)
} else if idx >= len(steps) {
tsteps := make([][]Step, idx+1)
copy(tsteps, steps)
steps = tsteps
steps[idx] = make([]Step, 0, 1)
}
steps[idx] = append(steps[idx], n.(Step))
return nil
}
var root dag.Vertex
if options.Start != "" {
var ok bool
w.RLock()
root, ok = w.steps[options.Start]
w.RUnlock()
if !ok {
return "", ErrStepNotExists
}
} else {
root, err = w.g.Root()
if err != nil {
return "", err
}
}
if options.Reverse {
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
} else {
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
}
steps, err := w.getSteps(options.Start, options.Reverse)
if err != nil {
return "", err
}
var wg sync.WaitGroup
cherr := make(chan error, 1)
defer close(cherr)
nctx, cancel := context.WithCancel(ctx)
defer cancel()
nopts := make([]ExecuteOption, 0, len(opts)+5)
nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store))
nopts = append(nopts,
ExecuteClient(w.opts.Client),
ExecuteTracer(w.opts.Tracer),
ExecuteLogger(w.opts.Logger),
ExecuteMeter(w.opts.Meter),
ExecuteStore(store.NewNamespaceStore(w.opts.Store, uid.String())),
)
nopts = append(nopts, opts...)
done := make(chan struct{})
go func() {
for idx := range steps {
wg.Add(len(steps[idx]))
for nidx := range steps[idx] {
if w.opts.Logger.V(logger.TraceLevel) {
w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
}
wg.Add(1)
go func(step Step) {
defer wg.Done()
if err = step.Execute(nctx, req, nopts...); err != nil {
cherr <- err
if serr := step.Execute(nctx, req, nopts...); serr != nil {
cherr <- serr
cancel()
}
}(steps[idx][nidx])
}
wg.Wait()
}
cherr <- nil
close(done)
}()
err = <-cherr
logger.Tracef(ctx, "wait for finish or error")
select {
case <-nctx.Done():
err = nctx.Err()
case cerr := <-cherr:
err = cerr
case <-done:
close(cherr)
}
return uid.String(), err
}
@ -308,9 +393,9 @@ func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ..
return nil
}
func NewCallStep(service string, method string, opts ...StepOption) Step {
func NewCallStep(service string, name string, method string, opts ...StepOption) Step {
options := NewStepOptions(opts...)
return &microCallStep{service: service, method: method, opts: options}
return &microCallStep{service: service, method: name + "." + method, opts: options}
}
func NewPublishStep(topic string, opts ...StepOption) Step {

View File

@ -33,7 +33,7 @@ type Workflow interface {
// ID returns id of the workflow
ID() string
// Steps returns steps slice where parallel steps returned on the same level
Steps() [][]Step
Steps() ([][]Step, error)
// Execute workflow with args, return execution id and error
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error)
// RemoveSteps remove steps from workflow