flow: implement new methods, add Async ExecutionOption

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-07-16 00:17:16 +03:00
parent 10a09a5c6f
commit 09e6fa2fed
3 changed files with 96 additions and 24 deletions

View File

@@ -149,8 +149,22 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
return steps, nil
}
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
func (w *microWorkflow) Abort(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
}
func (w *microWorkflow) Suspend(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
}
func (w *microWorkflow) Resume(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
}
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
w.Lock()
if !w.init {
if err := w.g.Validate(); err != nil {
@@ -175,7 +189,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
steps, err := w.getSteps(options.Start, options.Reverse)
if err != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusPending")}); werr != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
return "", err
@@ -183,10 +197,11 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
var wg sync.WaitGroup
cherr := make(chan error, 1)
chstatus := make(chan Status, 1)
nctx, cancel := context.WithCancel(ctx)
defer cancel()
nopts := make([]ExecuteOption, 0, len(opts)+5)
nopts = append(nopts,
@@ -198,14 +213,14 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
nopts = append(nopts, opts...)
done := make(chan struct{})
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusRunning")}); werr != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
return eid, werr
}
for idx := range steps {
for nidx := range steps[idx] {
cstep := steps[idx][nidx]
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusPending")}); werr != nil {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
return eid, werr
}
}
@@ -214,6 +229,15 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
go func() {
for idx := range steps {
for nidx := range steps[idx] {
wStatus := &codec.Frame{}
if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
cherr <- werr
return
}
if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
chstatus <- status
return
}
if w.opts.Logger.V(logger.TraceLevel) {
w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
}
@@ -224,12 +248,10 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
defer wg.Done()
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
cherr <- werr
cancel()
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusRunning")}); werr != nil {
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
cancel()
return
}
rsp, serr := step.Execute(nctx, req, nopts...)
@@ -238,23 +260,20 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusFailure")}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
cancel()
return
} else {
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
cancel()
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil {
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
cancel()
return
}
}
@@ -263,12 +282,10 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
cherr <- werr
cancel()
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusRunning")}); werr != nil {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
cancel()
return
}
rsp, serr := cstep.Execute(nctx, req, nopts...)
@@ -277,22 +294,19 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusFailure")}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
cancel()
return
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
cancel()
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
cherr <- werr
cancel()
return
}
}
@@ -302,6 +316,10 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
close(done)
}()
if options.Async {
return eid, nil
}
logger.Tracef(ctx, "wait for finish or error")
select {
case <-nctx.Done():
@@ -310,21 +328,24 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
err = cerr
case <-done:
close(cherr)
case <-chstatus:
close(chstatus)
return uid.String(), nil
}
switch {
case nctx.Err() != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusAborted")}); werr != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
case err == nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
case err != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusFailure")}); werr != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break