From 53c5455909dfae24bd84a44c8bcd6a9f8534d535 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Mon, 28 Jun 2021 22:39:50 +0300 Subject: [PATCH 1/4] new steps Signed-off-by: Vasiliy Tolstov --- flow/context.go | 34 ++++++++ flow/default.go | 222 ++++++++++++++++++++++++++++++++++++++++++++++++ flow/flow.go | 54 +++++++++++- flow/options.go | 121 ++++++++++++++++++++++++++ 4 files changed, 428 insertions(+), 3 deletions(-) create mode 100644 flow/context.go create mode 100644 flow/default.go create mode 100644 flow/options.go diff --git a/flow/context.go b/flow/context.go new file mode 100644 index 00000000..1387429f --- /dev/null +++ b/flow/context.go @@ -0,0 +1,34 @@ +package flow + +import ( + "context" +) + +type flowKey struct{} + +// FromContext returns Flow from context +func FromContext(ctx context.Context) (Flow, bool) { + if ctx == nil { + return nil, false + } + c, ok := ctx.Value(flowKey{}).(Flow) + return c, ok +} + +// NewContext stores Flow to context +func NewContext(ctx context.Context, f Flow) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, flowKey{}, f) +} + +// SetOption returns a function to setup a context with given value +func SetOption(k, v interface{}) Option { + return func(o *Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} diff --git a/flow/default.go b/flow/default.go new file mode 100644 index 00000000..66786adf --- /dev/null +++ b/flow/default.go @@ -0,0 +1,222 @@ +package flow + +import ( + "context" + "fmt" + "sync" + + "github.com/google/uuid" + "github.com/silas/dag" +) + +type microFlow struct { + opts Options +} + +type microWorkflow struct { + id string + g *dag.AcyclicGraph + init bool + sync.RWMutex +} + +func (w *microWorkflow) ID() string { + return w.id +} + +func (w *microWorkflow) Steps() [][]Step { + return nil +} + +func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error { + return nil +} + +func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error { + return nil +} + +func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) { + w.Lock() + if !w.init { + if err := w.g.Validate(); err != nil { + w.Unlock() + return "", err + } + w.g.TransitiveReduction() + w.init = true + } + w.Unlock() + + uid, err := uuid.NewRandom() + if err != nil { + return "", err + } + + var steps [][]string + fn := func(n dag.Vertex, idx int) error { + if idx == 0 { + steps = make([][]string, 1) + steps[0] = make([]string, 0, 1) + } else if idx >= len(steps) { + tsteps := make([][]string, idx+1) + copy(tsteps, steps) + steps = tsteps + steps[idx] = make([]string, 0, 1) + } + steps[idx] = append(steps[idx], fmt.Sprintf("%s", n)) + return nil + } + + w.RLock() + err = w.g.SortedDepthFirstWalk([]dag.Vertex{start}, fn) + w.RUnlock() + + if err != nil { + return "", err + } + + return uid.String(), nil +} + +func NewFlow(opts ...Option) Flow { + options := NewOptions(opts...) + return µFlow{opts: options} +} + +func (f *microFlow) Options() Options { + return f.opts +} + +func (f *microFlow) Init(opts ...Option) error { + for _, o := range opts { + o(&f.opts) + } + if err := f.opts.Client.Init(); err != nil { + return err + } + if err := f.opts.Tracer.Init(); err != nil { + return err + } + if err := f.opts.Logger.Init(); err != nil { + return err + } + if err := f.opts.Meter.Init(); err != nil { + return err + } + if err := f.opts.Store.Init(); err != nil { + return err + } + return nil +} + +func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) { + return nil, nil +} + +func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { + w := µWorkflow{id: id, g: &dag.AcyclicGraph{}} + + for _, s := range steps { + w.g.Add(s.Options().ID) + } + for _, s := range steps { + for _, req := range s.Requires() { + w.g.Connect(dag.BasicEdge(s, req)) + } + } + + if err := w.g.Validate(); err != nil { + return nil, err + } + w.g.TransitiveReduction() + + w.init = true + + return w, nil +} + +func (f *microFlow) WorkflowRemove(ctx context.Context, id string) error { + return nil +} + +func (f *microFlow) WorkflowSave(ctx context.Context, w Workflow) error { + return nil +} + +func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, error) { + return nil, nil +} + +type microCallStep struct { + opts StepOptions + service string + method string + requires []Step +} + +func (s *microCallStep) ID() string { + return s.opts.ID +} + +func (s *microCallStep) Options() StepOptions { + return s.opts +} + +func (s *microCallStep) Endpoint() string { + return s.method +} + +func (s *microCallStep) Requires() []Step { + return s.requires +} + +func (s *microCallStep) Require(steps ...Step) error { + s.requires = append(s.requires, steps...) + return nil +} + +func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { + return nil +} + +type microPublishStep struct { + opts StepOptions + topic string + requires []Step +} + +func (s *microPublishStep) ID() string { + return s.opts.ID +} + +func (s *microPublishStep) Options() StepOptions { + return s.opts +} + +func (s *microPublishStep) Endpoint() string { + return s.topic +} + +func (s *microPublishStep) Requires() []Step { + return s.requires +} + +func (s *microPublishStep) Require(steps ...Step) error { + s.requires = append(s.requires, steps...) + return nil +} + +func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { + return nil +} + +func NewCallStep(service string, method string, opts ...StepOption) Step { + options := NewStepOptions(opts...) + return µCallStep{service: service, method: method, opts: options} +} + +func NewPublishStep(topic string, opts ...StepOption) Step { + options := NewStepOptions(opts...) + return µPublishStep{topic: topic, opts: options} +} diff --git a/flow/flow.go b/flow/flow.go index 6f411989..6152d16c 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -1,17 +1,65 @@ // Package flow is an interface used for saga pattern microservice workflow package flow +import ( + "context" + + "github.com/google/uuid" +) + +// Step represents dedicated workflow step type Step interface { + // ID returns step id + ID() string // Endpoint returns rpc endpoint service_name.service_method or broker topic Endpoint() string + // Execute step run + Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error + // Requires returns dependent steps + Requires() []Step + // Options returns step options + Options() StepOptions + // Require add required steps + Require(steps ...Step) error } +func NewStepOptions(opts ...StepOption) StepOptions { + options := StepOptions{} + for _, o := range opts { + o(&options) + } + if options.ID == "" { + options.ID = uuid.New().String() + } + return options +} + +// Workflow contains all steps to execute type Workflow interface { + // ID returns id of the workflow + ID() string + // Steps returns steps slice where parallel steps returned on the same level Steps() [][]Step - Stop() error + // Execute workflow with args, return execution id and error + Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) + // RemoveSteps remove steps from workflow + RemoveSteps(ctx context.Context, steps ...Step) error + // AppendSteps append steps to workflow + AppendSteps(ctx context.Context, steps ...Step) error } +// Flow the base interface to interact with workflows type Flow interface { - Start(Workflow) error - Stop(Workflow) + // Options returns options + Options() Options + // Init initialize + Init(...Option) error + // WorkflowCreate creates new workflow with specific id and steps + WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) + // WorkflowSave saves workflow + WorkflowSave(ctx context.Context, w Workflow) error + // WorkflowLoad loads workflow with specific id + WorkflowLoad(ctx context.Context, id string) (Workflow, error) + // WorkflowList lists all workflows + WorkflowList(ctx context.Context) ([]Workflow, error) } diff --git a/flow/options.go b/flow/options.go new file mode 100644 index 00000000..e53561a0 --- /dev/null +++ b/flow/options.go @@ -0,0 +1,121 @@ +package flow + +import ( + "context" + + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/meter" + "github.com/unistack-org/micro/v3/store" + "github.com/unistack-org/micro/v3/tracer" +) + +// Option func +type Option func(*Options) + +// Options server struct +type Options struct { + // Context holds the external options and can be used for flow shutdown + Context context.Context + // Client holds the client.Client + Client client.Client + // Tracer holds the tracer + Tracer tracer.Tracer + // Logger holds the logger + Logger logger.Logger + // Meter holds the meter + Meter meter.Meter + // Store used for intermediate results + Store store.Store +} + +// NewOptions returns new options struct with default or passed values +func NewOptions(opts ...Option) Options { + options := Options{ + Context: context.Background(), + Logger: logger.DefaultLogger, + Meter: meter.DefaultMeter, + Tracer: tracer.DefaultTracer, + Client: client.DefaultClient, + } + + for _, o := range opts { + o(&options) + } + + return options +} + +// Logger sets the logger option +func Logger(l logger.Logger) Option { + return func(o *Options) { + o.Logger = l + } +} + +// Meter sets the meter option +func Meter(m meter.Meter) Option { + return func(o *Options) { + o.Meter = m + } +} + +// Client to use for sync/async communication +func Client(c client.Client) Option { + return func(o *Options) { + o.Client = c + } +} + +// Context specifies a context for the service. +// Can be used to signal shutdown of the flow +// Can be used for extra option values. +func Context(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + +// Tracer mechanism for distributed tracking +func Tracer(t tracer.Tracer) Option { + return func(o *Options) { + o.Tracer = t + } +} + +// Store used for intermediate results +func Store(s store.Store) Option { + return func(o *Options) { + o.Store = s + } +} + +// WorflowOption signature +type WorkflowOption func(*WorkflowOptions) + +// WorkflowOptions holds workflow options +type WorkflowOptions struct { + ID string + Context context.Context +} + +// WorkflowID set workflow id +func WorkflowID(id string) WorkflowOption { + return func(o *WorkflowOptions) { + o.ID = id + } +} + +type ExecuteOptions struct { + Context context.Context +} + +type ExecuteOption func(*ExecuteOptions) + +type StepOptions struct { + ID string + Name string + Context context.Context +} + +type StepOption func(*StepOptions) -- 2.45.2 From f22647d108c20b6740de68be346ac2c599ca212b Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 30 Jun 2021 05:03:06 +0300 Subject: [PATCH 2/4] add async Signed-off-by: Vasiliy Tolstov --- flow/default.go | 137 +++++++++++++++++++++++++++++++++++++----------- flow/flow.go | 20 +++---- flow/options.go | 45 ++++++++++++++-- 3 files changed, 156 insertions(+), 46 deletions(-) diff --git a/flow/default.go b/flow/default.go index 66786adf..ec526f76 100644 --- a/flow/default.go +++ b/flow/default.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/google/uuid" "github.com/silas/dag" @@ -18,6 +19,8 @@ type microWorkflow struct { g *dag.AcyclicGraph init bool sync.RWMutex + opts Options + steps map[string]Step } func (w *microWorkflow) ID() string { @@ -53,29 +56,63 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex return "", err } - var steps [][]string + options := NewExecuteOptions(opts...) + var steps [][]Step fn := func(n dag.Vertex, idx int) error { if idx == 0 { - steps = make([][]string, 1) - steps[0] = make([]string, 0, 1) + steps = make([][]Step, 1) + steps[0] = make([]Step, 0, 1) } else if idx >= len(steps) { - tsteps := make([][]string, idx+1) + tsteps := make([][]Step, idx+1) copy(tsteps, steps) steps = tsteps - steps[idx] = make([]string, 0, 1) + steps[idx] = make([]Step, 0, 1) } - steps[idx] = append(steps[idx], fmt.Sprintf("%s", n)) + steps[idx] = append(steps[idx], n.(Step)) return nil } - w.RLock() - err = w.g.SortedDepthFirstWalk([]dag.Vertex{start}, fn) - w.RUnlock() - + var root dag.Vertex + if options.Start != "" { + var ok bool + w.RLock() + root, ok = w.steps[options.Start] + w.RUnlock() + if !ok { + return "", ErrStepNotExists + } + } else { + root, err = w.g.Root() + if err != nil { + return "", err + } + } + err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) if err != nil { return "", err } + var wg sync.WaitGroup + cherr := make(chan error) + + select { + case err = <-cherr: + return "", err + default: + for idx := range steps { + wg.Add(len(steps[idx])) + for nidx := range steps[idx] { + go func(step Step) { + if err = step.Execute(ctx, req, opts...); err != nil { + cherr <- err + } + wg.Done() + }(steps[idx][nidx]) + } + wg.Wait() + } + } + return uid.String(), nil } @@ -115,14 +152,20 @@ func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) { } func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { - w := µWorkflow{id: id, g: &dag.AcyclicGraph{}} + w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))} for _, s := range steps { - w.g.Add(s.Options().ID) + w.steps[s.String()] = s + w.g.Add(s) } - for _, s := range steps { - for _, req := range s.Requires() { - w.g.Connect(dag.BasicEdge(s, req)) + + for _, dst := range steps { + for _, req := range dst.Requires() { + src, ok := w.steps[req] + if !ok { + return nil, ErrStepNotExists + } + w.g.Connect(dag.BasicEdge(src, dst)) } } @@ -149,14 +192,13 @@ func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, erro } type microCallStep struct { - opts StepOptions - service string - method string - requires []Step + opts StepOptions + service string + method string } func (s *microCallStep) ID() string { - return s.opts.ID + return s.String() } func (s *microCallStep) Options() StepOptions { @@ -167,27 +209,45 @@ func (s *microCallStep) Endpoint() string { return s.method } -func (s *microCallStep) Requires() []Step { - return s.requires +func (s *microCallStep) Requires() []string { + return s.opts.Requires } func (s *microCallStep) Require(steps ...Step) error { - s.requires = append(s.requires, steps...) + for _, step := range steps { + s.opts.Requires = append(s.opts.Requires, step.String()) + } return nil } +func (s *microCallStep) String() string { + if s.opts.ID != "" { + return s.opts.ID + } + return fmt.Sprintf("%s.%s", s.service, s.method) +} + +func (s *microCallStep) Name() string { + return s.String() +} + +func (s *microCallStep) Hashcode() interface{} { + return s.String() +} + func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { + fmt.Printf("execute %s with %#v\n", s.String(), req) + time.Sleep(1 * time.Second) return nil } type microPublishStep struct { - opts StepOptions - topic string - requires []Step + opts StepOptions + topic string } func (s *microPublishStep) ID() string { - return s.opts.ID + return s.String() } func (s *microPublishStep) Options() StepOptions { @@ -198,15 +258,32 @@ func (s *microPublishStep) Endpoint() string { return s.topic } -func (s *microPublishStep) Requires() []Step { - return s.requires +func (s *microPublishStep) Requires() []string { + return s.opts.Requires } func (s *microPublishStep) Require(steps ...Step) error { - s.requires = append(s.requires, steps...) + for _, step := range steps { + s.opts.Requires = append(s.opts.Requires, step.String()) + } return nil } +func (s *microPublishStep) String() string { + if s.opts.ID != "" { + return s.opts.ID + } + return fmt.Sprintf("%s", s.topic) +} + +func (s *microPublishStep) Name() string { + return s.String() +} + +func (s *microPublishStep) Hashcode() interface{} { + return s.String() +} + func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { return nil } diff --git a/flow/flow.go b/flow/flow.go index 6152d16c..54f052df 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -3,8 +3,11 @@ package flow import ( "context" + "errors" +) - "github.com/google/uuid" +var ( + ErrStepNotExists = errors.New("step not exists") ) // Step represents dedicated workflow step @@ -16,22 +19,13 @@ type Step interface { // Execute step run Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error // Requires returns dependent steps - Requires() []Step + Requires() []string // Options returns step options Options() StepOptions // Require add required steps Require(steps ...Step) error -} - -func NewStepOptions(opts ...StepOption) StepOptions { - options := StepOptions{} - for _, o := range opts { - o(&options) - } - if options.ID == "" { - options.ID = uuid.New().String() - } - return options + // String + String() string } // Workflow contains all steps to execute diff --git a/flow/options.go b/flow/options.go index e53561a0..8c7f4ccd 100644 --- a/flow/options.go +++ b/flow/options.go @@ -107,15 +107,54 @@ func WorkflowID(id string) WorkflowOption { } type ExecuteOptions struct { + // Client holds the client.Client + Client client.Client + // Tracer holds the tracer + Tracer tracer.Tracer + // Logger holds the logger + Logger logger.Logger + // Meter holds the meter + Meter meter.Meter + // Store used for intermediate results + Store store.Store Context context.Context + Start string } type ExecuteOption func(*ExecuteOptions) +func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions { + options := ExecuteOptions{} + for _, o := range opts { + o(&options) + } + return options +} + type StepOptions struct { - ID string - Name string - Context context.Context + ID string + Context context.Context + Requires []string } type StepOption func(*StepOptions) + +func NewStepOptions(opts ...StepOption) StepOptions { + options := StepOptions{Context: context.Background()} + for _, o := range opts { + o(&options) + } + return options +} + +func StepID(id string) StepOption { + return func(o *StepOptions) { + o.ID = id + } +} + +func StepRequires(steps ...string) StepOption { + return func(o *StepOptions) { + o.Requires = steps + } +} -- 2.45.2 From 9fd8f581ac84bf57d129a7677c0fb27d3fba2c80 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 30 Jun 2021 05:39:43 +0300 Subject: [PATCH 3/4] execute Signed-off-by: Vasiliy Tolstov --- flow/default.go | 48 ++++++++++++++++++++++++++++------------ flow/options.go | 59 +++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 91 insertions(+), 16 deletions(-) diff --git a/flow/default.go b/flow/default.go index ec526f76..e9c53a56 100644 --- a/flow/default.go +++ b/flow/default.go @@ -4,10 +4,11 @@ import ( "context" "fmt" "sync" - "time" "github.com/google/uuid" "github.com/silas/dag" + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/codec" ) type microFlow struct { @@ -87,33 +88,44 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex return "", err } } - err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) + if options.Reverse { + err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn) + } else { + err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) + } if err != nil { return "", err } var wg sync.WaitGroup - cherr := make(chan error) + cherr := make(chan error, 1) + defer close(cherr) - select { - case err = <-cherr: - return "", err - default: + nctx, cancel := context.WithCancel(ctx) + defer cancel() + nopts := make([]ExecuteOption, 0, len(opts)+5) + nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store)) + + go func() { for idx := range steps { wg.Add(len(steps[idx])) for nidx := range steps[idx] { go func(step Step) { - if err = step.Execute(ctx, req, opts...); err != nil { + defer wg.Done() + if err = step.Execute(nctx, req, nopts...); err != nil { cherr <- err + cancel() } - wg.Done() }(steps[idx][nidx]) } wg.Wait() } - } + cherr <- nil + }() - return uid.String(), nil + err = <-cherr + + return uid.String(), err } func NewFlow(opts ...Option) Flow { @@ -236,9 +248,17 @@ func (s *microCallStep) Hashcode() interface{} { } func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { - fmt.Printf("execute %s with %#v\n", s.String(), req) - time.Sleep(1 * time.Second) - return nil + options := NewExecuteOptions(opts...) + if options.Client == nil { + return fmt.Errorf("client not set") + } + rsp := &codec.Frame{} + copts := []client.CallOption{client.WithRetries(0)} + if options.Timeout > 0 { + copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout)) + } + err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp) + return err } type microPublishStep struct { diff --git a/flow/options.go b/flow/options.go index 8c7f4ccd..0db4d324 100644 --- a/flow/options.go +++ b/flow/options.go @@ -2,6 +2,7 @@ package flow import ( "context" + "time" "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/logger" @@ -116,13 +117,67 @@ type ExecuteOptions struct { // Meter holds the meter Meter meter.Meter // Store used for intermediate results - Store store.Store + Store store.Store + // Context can be used to abort execution or pass additional opts Context context.Context - Start string + // Start step + Start string + // Reverse execution + Reverse bool + // Timeout for execution + Timeout time.Duration } type ExecuteOption func(*ExecuteOptions) +func ExecuteClient(c client.Client) ExecuteOption { + return func(o *ExecuteOptions) { + o.Client = c + } +} + +func ExecuteTracer(t tracer.Tracer) ExecuteOption { + return func(o *ExecuteOptions) { + o.Tracer = t + } +} + +func ExecuteLogger(l logger.Logger) ExecuteOption { + return func(o *ExecuteOptions) { + o.Logger = l + } +} + +func ExecuteMeter(m meter.Meter) ExecuteOption { + return func(o *ExecuteOptions) { + o.Meter = m + } +} + +func ExecuteStore(s store.Store) ExecuteOption { + return func(o *ExecuteOptions) { + o.Store = s + } +} + +func ExecuteContext(ctx context.Context) ExecuteOption { + return func(o *ExecuteOptions) { + o.Context = ctx + } +} + +func ExecuteReverse(b bool) ExecuteOption { + return func(o *ExecuteOptions) { + o.Reverse = b + } +} + +func ExecuteTimeout(td time.Duration) ExecuteOption { + return func(o *ExecuteOptions) { + o.Timeout = td + } +} + func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions { options := ExecuteOptions{} for _, o := range opts { -- 2.45.2 From 7b1bfc80a43168ae2ca572445f50bb0595429ee3 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 30 Jun 2021 05:49:43 +0300 Subject: [PATCH 4/4] add fallback Signed-off-by: Vasiliy Tolstov --- flow/options.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flow/options.go b/flow/options.go index 0db4d324..f3066311 100644 --- a/flow/options.go +++ b/flow/options.go @@ -190,6 +190,7 @@ type StepOptions struct { ID string Context context.Context Requires []string + Fallback string } type StepOption func(*StepOptions) @@ -213,3 +214,9 @@ func StepRequires(steps ...string) StepOption { o.Requires = steps } } + +func StepFallback(step string) StepOption { + return func(o *StepOptions) { + o.Fallback = step + } +} -- 2.45.2