From f22647d108c20b6740de68be346ac2c599ca212b Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 30 Jun 2021 05:03:06 +0300 Subject: [PATCH] add async Signed-off-by: Vasiliy Tolstov --- flow/default.go | 137 +++++++++++++++++++++++++++++++++++++----------- flow/flow.go | 20 +++---- flow/options.go | 45 ++++++++++++++-- 3 files changed, 156 insertions(+), 46 deletions(-) diff --git a/flow/default.go b/flow/default.go index 66786adf..ec526f76 100644 --- a/flow/default.go +++ b/flow/default.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/google/uuid" "github.com/silas/dag" @@ -18,6 +19,8 @@ type microWorkflow struct { g *dag.AcyclicGraph init bool sync.RWMutex + opts Options + steps map[string]Step } func (w *microWorkflow) ID() string { @@ -53,29 +56,63 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex return "", err } - var steps [][]string + options := NewExecuteOptions(opts...) + var steps [][]Step fn := func(n dag.Vertex, idx int) error { if idx == 0 { - steps = make([][]string, 1) - steps[0] = make([]string, 0, 1) + steps = make([][]Step, 1) + steps[0] = make([]Step, 0, 1) } else if idx >= len(steps) { - tsteps := make([][]string, idx+1) + tsteps := make([][]Step, idx+1) copy(tsteps, steps) steps = tsteps - steps[idx] = make([]string, 0, 1) + steps[idx] = make([]Step, 0, 1) } - steps[idx] = append(steps[idx], fmt.Sprintf("%s", n)) + steps[idx] = append(steps[idx], n.(Step)) return nil } - w.RLock() - err = w.g.SortedDepthFirstWalk([]dag.Vertex{start}, fn) - w.RUnlock() - + var root dag.Vertex + if options.Start != "" { + var ok bool + w.RLock() + root, ok = w.steps[options.Start] + w.RUnlock() + if !ok { + return "", ErrStepNotExists + } + } else { + root, err = w.g.Root() + if err != nil { + return "", err + } + } + err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) if err != nil { return "", err } + var wg sync.WaitGroup + cherr := make(chan error) + + select { + case err = <-cherr: + return "", err + default: + for idx := range steps { + wg.Add(len(steps[idx])) + for nidx := range steps[idx] { + go func(step Step) { + if err = step.Execute(ctx, req, opts...); err != nil { + cherr <- err + } + wg.Done() + }(steps[idx][nidx]) + } + wg.Wait() + } + } + return uid.String(), nil } @@ -115,14 +152,20 @@ func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) { } func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { - w := µWorkflow{id: id, g: &dag.AcyclicGraph{}} + w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))} for _, s := range steps { - w.g.Add(s.Options().ID) + w.steps[s.String()] = s + w.g.Add(s) } - for _, s := range steps { - for _, req := range s.Requires() { - w.g.Connect(dag.BasicEdge(s, req)) + + for _, dst := range steps { + for _, req := range dst.Requires() { + src, ok := w.steps[req] + if !ok { + return nil, ErrStepNotExists + } + w.g.Connect(dag.BasicEdge(src, dst)) } } @@ -149,14 +192,13 @@ func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, erro } type microCallStep struct { - opts StepOptions - service string - method string - requires []Step + opts StepOptions + service string + method string } func (s *microCallStep) ID() string { - return s.opts.ID + return s.String() } func (s *microCallStep) Options() StepOptions { @@ -167,27 +209,45 @@ func (s *microCallStep) Endpoint() string { return s.method } -func (s *microCallStep) Requires() []Step { - return s.requires +func (s *microCallStep) Requires() []string { + return s.opts.Requires } func (s *microCallStep) Require(steps ...Step) error { - s.requires = append(s.requires, steps...) + for _, step := range steps { + s.opts.Requires = append(s.opts.Requires, step.String()) + } return nil } +func (s *microCallStep) String() string { + if s.opts.ID != "" { + return s.opts.ID + } + return fmt.Sprintf("%s.%s", s.service, s.method) +} + +func (s *microCallStep) Name() string { + return s.String() +} + +func (s *microCallStep) Hashcode() interface{} { + return s.String() +} + func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { + fmt.Printf("execute %s with %#v\n", s.String(), req) + time.Sleep(1 * time.Second) return nil } type microPublishStep struct { - opts StepOptions - topic string - requires []Step + opts StepOptions + topic string } func (s *microPublishStep) ID() string { - return s.opts.ID + return s.String() } func (s *microPublishStep) Options() StepOptions { @@ -198,15 +258,32 @@ func (s *microPublishStep) Endpoint() string { return s.topic } -func (s *microPublishStep) Requires() []Step { - return s.requires +func (s *microPublishStep) Requires() []string { + return s.opts.Requires } func (s *microPublishStep) Require(steps ...Step) error { - s.requires = append(s.requires, steps...) + for _, step := range steps { + s.opts.Requires = append(s.opts.Requires, step.String()) + } return nil } +func (s *microPublishStep) String() string { + if s.opts.ID != "" { + return s.opts.ID + } + return fmt.Sprintf("%s", s.topic) +} + +func (s *microPublishStep) Name() string { + return s.String() +} + +func (s *microPublishStep) Hashcode() interface{} { + return s.String() +} + func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { return nil } diff --git a/flow/flow.go b/flow/flow.go index 6152d16c..54f052df 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -3,8 +3,11 @@ package flow import ( "context" + "errors" +) - "github.com/google/uuid" +var ( + ErrStepNotExists = errors.New("step not exists") ) // Step represents dedicated workflow step @@ -16,22 +19,13 @@ type Step interface { // Execute step run Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error // Requires returns dependent steps - Requires() []Step + Requires() []string // Options returns step options Options() StepOptions // Require add required steps Require(steps ...Step) error -} - -func NewStepOptions(opts ...StepOption) StepOptions { - options := StepOptions{} - for _, o := range opts { - o(&options) - } - if options.ID == "" { - options.ID = uuid.New().String() - } - return options + // String + String() string } // Workflow contains all steps to execute diff --git a/flow/options.go b/flow/options.go index e53561a0..8c7f4ccd 100644 --- a/flow/options.go +++ b/flow/options.go @@ -107,15 +107,54 @@ func WorkflowID(id string) WorkflowOption { } type ExecuteOptions struct { + // Client holds the client.Client + Client client.Client + // Tracer holds the tracer + Tracer tracer.Tracer + // Logger holds the logger + Logger logger.Logger + // Meter holds the meter + Meter meter.Meter + // Store used for intermediate results + Store store.Store Context context.Context + Start string } type ExecuteOption func(*ExecuteOptions) +func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions { + options := ExecuteOptions{} + for _, o := range opts { + o(&options) + } + return options +} + type StepOptions struct { - ID string - Name string - Context context.Context + ID string + Context context.Context + Requires []string } type StepOption func(*StepOptions) + +func NewStepOptions(opts ...StepOption) StepOptions { + options := StepOptions{Context: context.Background()} + for _, o := range opts { + o(&options) + } + return options +} + +func StepID(id string) StepOption { + return func(o *StepOptions) { + o.ID = id + } +} + +func StepRequires(steps ...string) StepOption { + return func(o *StepOptions) { + o.Requires = steps + } +}