fix flow
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
a45b672c98
commit
70adfeab0d
@ -176,13 +176,13 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
}
|
}
|
||||||
w.Unlock()
|
w.Unlock()
|
||||||
|
|
||||||
id, err := id.New()
|
eid, err := id.New()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", id))
|
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
||||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||||
|
|
||||||
options := NewExecuteOptions(opts...)
|
options := NewExecuteOptions(opts...)
|
||||||
|
|
||||||
@ -214,13 +214,13 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
|
|
||||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||||
return id, werr
|
return eid, werr
|
||||||
}
|
}
|
||||||
for idx := range steps {
|
for idx := range steps {
|
||||||
for nidx := range steps[idx] {
|
for nidx := range steps[idx] {
|
||||||
cstep := steps[idx][nidx]
|
cstep := steps[idx][nidx]
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
||||||
return id, werr
|
return eid, werr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -316,7 +316,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
if options.Async {
|
if options.Async {
|
||||||
return id, nil
|
return eid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Tracef(ctx, "wait for finish or error")
|
logger.Tracef(ctx, "wait for finish or error")
|
||||||
@ -329,7 +329,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
close(cherr)
|
close(cherr)
|
||||||
case <-chstatus:
|
case <-chstatus:
|
||||||
close(chstatus)
|
close(chstatus)
|
||||||
return uid.String(), nil
|
return eid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
@ -350,7 +350,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
return uid.String(), err
|
return eid, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFlow(opts ...Option) Flow {
|
func NewFlow(opts ...Option) Flow {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user