From 09e6fa2fede7d8adb860334df52b4609610ea70c Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 16 Jul 2021 00:17:16 +0300 Subject: [PATCH] flow: implement new methods, add Async ExecutionOption Signed-off-by: Vasiliy Tolstov --- flow/default.go | 69 ++++++++++++++++++++++++++++++++----------------- flow/flow.go | 43 ++++++++++++++++++++++++++++++ flow/options.go | 8 ++++++ 3 files changed, 96 insertions(+), 24 deletions(-) diff --git a/flow/default.go b/flow/default.go index 7a9c77e5..5b20b2df 100644 --- a/flow/default.go +++ b/flow/default.go @@ -149,8 +149,22 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) { return steps, nil } -func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) { +func (w *microWorkflow) Abort(ctx context.Context, eid string) error { + workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) + return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())}) +} +func (w *microWorkflow) Suspend(ctx context.Context, eid string) error { + workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) + return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())}) +} + +func (w *microWorkflow) Resume(ctx context.Context, eid string) error { + workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) + return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())}) +} + +func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) { w.Lock() if !w.init { if err := w.g.Validate(); err != nil { @@ -175,7 +189,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu steps, err := w.getSteps(options.Start, options.Reverse) if err != nil { - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusPending")}); werr != nil { + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) } return "", err @@ -183,10 +197,11 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu var wg sync.WaitGroup cherr := make(chan error, 1) + chstatus := make(chan Status, 1) nctx, cancel := context.WithCancel(ctx) - defer cancel() + nopts := make([]ExecuteOption, 0, len(opts)+5) nopts = append(nopts, @@ -198,14 +213,14 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu nopts = append(nopts, opts...) done := make(chan struct{}) - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusRunning")}); werr != nil { + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) return eid, werr } for idx := range steps { for nidx := range steps[idx] { cstep := steps[idx][nidx] - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusPending")}); werr != nil { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { return eid, werr } } @@ -214,6 +229,15 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu go func() { for idx := range steps { for nidx := range steps[idx] { + wStatus := &codec.Frame{} + if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil { + cherr <- werr + return + } + if status := StringStatus[string(wStatus.Data)]; status != StatusRunning { + chstatus <- status + return + } if w.opts.Logger.V(logger.TraceLevel) { w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx]) } @@ -224,12 +248,10 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu defer wg.Done() if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil { cherr <- werr - cancel() return } - if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusRunning")}); werr != nil { + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { cherr <- werr - cancel() return } rsp, serr := step.Execute(nctx, req, nopts...) @@ -238,23 +260,20 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } - if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusFailure")}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } cherr <- serr - cancel() return } else { if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) cherr <- werr - cancel() return } - if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil { + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) cherr <- werr - cancel() return } } @@ -263,12 +282,10 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu } else { if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil { cherr <- werr - cancel() return } - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusRunning")}); werr != nil { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { cherr <- werr - cancel() return } rsp, serr := cstep.Execute(nctx, req, nopts...) @@ -277,22 +294,19 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusFailure")}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } cherr <- serr - cancel() return } else { if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) cherr <- werr - cancel() return } - if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { cherr <- werr - cancel() return } } @@ -302,6 +316,10 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu close(done) }() + if options.Async { + return eid, nil + } + logger.Tracef(ctx, "wait for finish or error") select { case <-nctx.Done(): @@ -310,21 +328,24 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu err = cerr case <-done: close(cherr) + case <-chstatus: + close(chstatus) + return uid.String(), nil } switch { case nctx.Err() != nil: - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusAborted")}); werr != nil { + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) } break case err == nil: - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil { + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) } break case err != nil: - if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusFailure")}); werr != nil { + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) } break diff --git a/flow/flow.go b/flow/flow.go index 6b8bb3cb..1dee697e 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -4,6 +4,8 @@ package flow import ( "context" "errors" + "sync" + "sync/atomic" "github.com/unistack-org/micro/v3/metadata" ) @@ -67,6 +69,10 @@ type Step interface { type Status int +func (status Status) String() string { + return StatusString[status] +} + const ( StatusPending Status = iota StatusRunning @@ -76,6 +82,25 @@ const ( StatusSuspend ) +var ( + StatusString = map[Status]string{ + StatusPending: "StatusPending", + StatusRunning: "StatusRunning", + StatusFailure: "StatusFailure", + StatusSuccess: "StatusSuccess", + StatusAborted: "StatusAborted", + StatusSuspend: "StatusSuspend", + } + StringStatus = map[string]Status{ + "StatusPending": StatusPending, + "StatusRunning": StatusRunning, + "StatusFailure": StatusFailure, + "StatusSuccess": StatusSuccess, + "StatusAborted": StatusAborted, + "StatusSuspend": StatusSuspend, + } +) + // Workflow contains all steps to execute type Workflow interface { // ID returns id of the workflow @@ -90,6 +115,12 @@ type Workflow interface { Status() Status // Steps returns steps slice where parallel steps returned on the same level Steps() ([][]Step, error) + // Suspend suspends execution + Suspend(ctx context.Context, eid string) error + // Resume resumes execution + Resume(ctx context.Context, eid string) error + // Abort abort execution + Abort(ctx context.Context, eid string) error } // Flow the base interface to interact with workflows @@ -107,3 +138,15 @@ type Flow interface { // WorkflowList lists all workflows WorkflowList(ctx context.Context) ([]Workflow, error) } + +var ( + flowMu sync.Mutex + atomicSteps atomic.Value +) + +func RegisterStep(step Step) { + flowMu.Lock() + steps, _ := atomicSteps.Load().([]Step) + atomicSteps.Store(append(steps, step)) + flowMu.Unlock() +} diff --git a/flow/options.go b/flow/options.go index f577fe41..5d7385bd 100644 --- a/flow/options.go +++ b/flow/options.go @@ -124,6 +124,8 @@ type ExecuteOptions struct { Reverse bool // Timeout for execution Timeout time.Duration + // Async enables async execution + Async bool } type ExecuteOption func(*ExecuteOptions) @@ -170,6 +172,12 @@ func ExecuteTimeout(td time.Duration) ExecuteOption { } } +func ExecuteAsync(b bool) ExecuteOption { + return func(o *ExecuteOptions) { + o.Async = b + } +} + func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions { options := ExecuteOptions{ Client: client.DefaultClient,