flow improvements (#52)
* flow improvements Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit was merged in pull request #52.
	This commit is contained in:
		
							
								
								
									
										34
									
								
								flow/context.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										34
									
								
								flow/context.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,34 @@ | |||||||
|  | package flow | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type flowKey struct{} | ||||||
|  |  | ||||||
|  | // FromContext returns Flow from context | ||||||
|  | func FromContext(ctx context.Context) (Flow, bool) { | ||||||
|  | 	if ctx == nil { | ||||||
|  | 		return nil, false | ||||||
|  | 	} | ||||||
|  | 	c, ok := ctx.Value(flowKey{}).(Flow) | ||||||
|  | 	return c, ok | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // NewContext stores Flow to context | ||||||
|  | func NewContext(ctx context.Context, f Flow) context.Context { | ||||||
|  | 	if ctx == nil { | ||||||
|  | 		ctx = context.Background() | ||||||
|  | 	} | ||||||
|  | 	return context.WithValue(ctx, flowKey{}, f) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // SetOption returns a function to setup a context with given value | ||||||
|  | func SetOption(k, v interface{}) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		if o.Context == nil { | ||||||
|  | 			o.Context = context.Background() | ||||||
|  | 		} | ||||||
|  | 		o.Context = context.WithValue(o.Context, k, v) | ||||||
|  | 	} | ||||||
|  | } | ||||||
							
								
								
									
										319
									
								
								flow/default.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										319
									
								
								flow/default.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,319 @@ | |||||||
|  | package flow | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"fmt" | ||||||
|  | 	"sync" | ||||||
|  |  | ||||||
|  | 	"github.com/google/uuid" | ||||||
|  | 	"github.com/silas/dag" | ||||||
|  | 	"github.com/unistack-org/micro/v3/client" | ||||||
|  | 	"github.com/unistack-org/micro/v3/codec" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type microFlow struct { | ||||||
|  | 	opts Options | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type microWorkflow struct { | ||||||
|  | 	id   string | ||||||
|  | 	g    *dag.AcyclicGraph | ||||||
|  | 	init bool | ||||||
|  | 	sync.RWMutex | ||||||
|  | 	opts  Options | ||||||
|  | 	steps map[string]Step | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (w *microWorkflow) ID() string { | ||||||
|  | 	return w.id | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (w *microWorkflow) Steps() [][]Step { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) { | ||||||
|  | 	w.Lock() | ||||||
|  | 	if !w.init { | ||||||
|  | 		if err := w.g.Validate(); err != nil { | ||||||
|  | 			w.Unlock() | ||||||
|  | 			return "", err | ||||||
|  | 		} | ||||||
|  | 		w.g.TransitiveReduction() | ||||||
|  | 		w.init = true | ||||||
|  | 	} | ||||||
|  | 	w.Unlock() | ||||||
|  |  | ||||||
|  | 	uid, err := uuid.NewRandom() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	options := NewExecuteOptions(opts...) | ||||||
|  | 	var steps [][]Step | ||||||
|  | 	fn := func(n dag.Vertex, idx int) error { | ||||||
|  | 		if idx == 0 { | ||||||
|  | 			steps = make([][]Step, 1) | ||||||
|  | 			steps[0] = make([]Step, 0, 1) | ||||||
|  | 		} else if idx >= len(steps) { | ||||||
|  | 			tsteps := make([][]Step, idx+1) | ||||||
|  | 			copy(tsteps, steps) | ||||||
|  | 			steps = tsteps | ||||||
|  | 			steps[idx] = make([]Step, 0, 1) | ||||||
|  | 		} | ||||||
|  | 		steps[idx] = append(steps[idx], n.(Step)) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var root dag.Vertex | ||||||
|  | 	if options.Start != "" { | ||||||
|  | 		var ok bool | ||||||
|  | 		w.RLock() | ||||||
|  | 		root, ok = w.steps[options.Start] | ||||||
|  | 		w.RUnlock() | ||||||
|  | 		if !ok { | ||||||
|  | 			return "", ErrStepNotExists | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		root, err = w.g.Root() | ||||||
|  | 		if err != nil { | ||||||
|  | 			return "", err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if options.Reverse { | ||||||
|  | 		err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn) | ||||||
|  | 	} else { | ||||||
|  | 		err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) | ||||||
|  | 	} | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var wg sync.WaitGroup | ||||||
|  | 	cherr := make(chan error, 1) | ||||||
|  | 	defer close(cherr) | ||||||
|  |  | ||||||
|  | 	nctx, cancel := context.WithCancel(ctx) | ||||||
|  | 	defer cancel() | ||||||
|  | 	nopts := make([]ExecuteOption, 0, len(opts)+5) | ||||||
|  | 	nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store)) | ||||||
|  |  | ||||||
|  | 	go func() { | ||||||
|  | 		for idx := range steps { | ||||||
|  | 			wg.Add(len(steps[idx])) | ||||||
|  | 			for nidx := range steps[idx] { | ||||||
|  | 				go func(step Step) { | ||||||
|  | 					defer wg.Done() | ||||||
|  | 					if err = step.Execute(nctx, req, nopts...); err != nil { | ||||||
|  | 						cherr <- err | ||||||
|  | 						cancel() | ||||||
|  | 					} | ||||||
|  | 				}(steps[idx][nidx]) | ||||||
|  | 			} | ||||||
|  | 			wg.Wait() | ||||||
|  | 		} | ||||||
|  | 		cherr <- nil | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	err = <-cherr | ||||||
|  |  | ||||||
|  | 	return uid.String(), err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewFlow(opts ...Option) Flow { | ||||||
|  | 	options := NewOptions(opts...) | ||||||
|  | 	return µFlow{opts: options} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *microFlow) Options() Options { | ||||||
|  | 	return f.opts | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *microFlow) Init(opts ...Option) error { | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&f.opts) | ||||||
|  | 	} | ||||||
|  | 	if err := f.opts.Client.Init(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if err := f.opts.Tracer.Init(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if err := f.opts.Logger.Init(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if err := f.opts.Meter.Init(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if err := f.opts.Store.Init(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) { | ||||||
|  | 	return nil, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { | ||||||
|  | 	w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))} | ||||||
|  |  | ||||||
|  | 	for _, s := range steps { | ||||||
|  | 		w.steps[s.String()] = s | ||||||
|  | 		w.g.Add(s) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, dst := range steps { | ||||||
|  | 		for _, req := range dst.Requires() { | ||||||
|  | 			src, ok := w.steps[req] | ||||||
|  | 			if !ok { | ||||||
|  | 				return nil, ErrStepNotExists | ||||||
|  | 			} | ||||||
|  | 			w.g.Connect(dag.BasicEdge(src, dst)) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := w.g.Validate(); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	w.g.TransitiveReduction() | ||||||
|  |  | ||||||
|  | 	w.init = true | ||||||
|  |  | ||||||
|  | 	return w, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *microFlow) WorkflowRemove(ctx context.Context, id string) error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *microFlow) WorkflowSave(ctx context.Context, w Workflow) error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, error) { | ||||||
|  | 	return nil, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type microCallStep struct { | ||||||
|  | 	opts    StepOptions | ||||||
|  | 	service string | ||||||
|  | 	method  string | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microCallStep) ID() string { | ||||||
|  | 	return s.String() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microCallStep) Options() StepOptions { | ||||||
|  | 	return s.opts | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microCallStep) Endpoint() string { | ||||||
|  | 	return s.method | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microCallStep) Requires() []string { | ||||||
|  | 	return s.opts.Requires | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microCallStep) Require(steps ...Step) error { | ||||||
|  | 	for _, step := range steps { | ||||||
|  | 		s.opts.Requires = append(s.opts.Requires, step.String()) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microCallStep) String() string { | ||||||
|  | 	if s.opts.ID != "" { | ||||||
|  | 		return s.opts.ID | ||||||
|  | 	} | ||||||
|  | 	return fmt.Sprintf("%s.%s", s.service, s.method) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microCallStep) Name() string { | ||||||
|  | 	return s.String() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microCallStep) Hashcode() interface{} { | ||||||
|  | 	return s.String() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { | ||||||
|  | 	options := NewExecuteOptions(opts...) | ||||||
|  | 	if options.Client == nil { | ||||||
|  | 		return fmt.Errorf("client not set") | ||||||
|  | 	} | ||||||
|  | 	rsp := &codec.Frame{} | ||||||
|  | 	copts := []client.CallOption{client.WithRetries(0)} | ||||||
|  | 	if options.Timeout > 0 { | ||||||
|  | 		copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout)) | ||||||
|  | 	} | ||||||
|  | 	err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type microPublishStep struct { | ||||||
|  | 	opts  StepOptions | ||||||
|  | 	topic string | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microPublishStep) ID() string { | ||||||
|  | 	return s.String() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microPublishStep) Options() StepOptions { | ||||||
|  | 	return s.opts | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microPublishStep) Endpoint() string { | ||||||
|  | 	return s.topic | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microPublishStep) Requires() []string { | ||||||
|  | 	return s.opts.Requires | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microPublishStep) Require(steps ...Step) error { | ||||||
|  | 	for _, step := range steps { | ||||||
|  | 		s.opts.Requires = append(s.opts.Requires, step.String()) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microPublishStep) String() string { | ||||||
|  | 	if s.opts.ID != "" { | ||||||
|  | 		return s.opts.ID | ||||||
|  | 	} | ||||||
|  | 	return fmt.Sprintf("%s", s.topic) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microPublishStep) Name() string { | ||||||
|  | 	return s.String() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microPublishStep) Hashcode() interface{} { | ||||||
|  | 	return s.String() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewCallStep(service string, method string, opts ...StepOption) Step { | ||||||
|  | 	options := NewStepOptions(opts...) | ||||||
|  | 	return µCallStep{service: service, method: method, opts: options} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewPublishStep(topic string, opts ...StepOption) Step { | ||||||
|  | 	options := NewStepOptions(opts...) | ||||||
|  | 	return µPublishStep{topic: topic, opts: options} | ||||||
|  | } | ||||||
							
								
								
									
										48
									
								
								flow/flow.go
									
									
									
									
									
								
							
							
						
						
									
										48
									
								
								flow/flow.go
									
									
									
									
									
								
							| @@ -1,17 +1,59 @@ | |||||||
| // Package flow is an interface used for saga pattern microservice workflow | // Package flow is an interface used for saga pattern microservice workflow | ||||||
| package flow | package flow | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"errors" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	ErrStepNotExists = errors.New("step not exists") | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Step represents dedicated workflow step | ||||||
| type Step interface { | type Step interface { | ||||||
|  | 	// ID returns step id | ||||||
|  | 	ID() string | ||||||
| 	// Endpoint returns rpc endpoint service_name.service_method or broker topic | 	// Endpoint returns rpc endpoint service_name.service_method or broker topic | ||||||
| 	Endpoint() string | 	Endpoint() string | ||||||
|  | 	// Execute step run | ||||||
|  | 	Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error | ||||||
|  | 	// Requires returns dependent steps | ||||||
|  | 	Requires() []string | ||||||
|  | 	// Options returns step options | ||||||
|  | 	Options() StepOptions | ||||||
|  | 	// Require add required steps | ||||||
|  | 	Require(steps ...Step) error | ||||||
|  | 	// String | ||||||
|  | 	String() string | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Workflow contains all steps to execute | ||||||
| type Workflow interface { | type Workflow interface { | ||||||
|  | 	// ID returns id of the workflow | ||||||
|  | 	ID() string | ||||||
|  | 	// Steps returns steps slice where parallel steps returned on the same level | ||||||
| 	Steps() [][]Step | 	Steps() [][]Step | ||||||
| 	Stop() error | 	// Execute workflow with args, return execution id and error | ||||||
|  | 	Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) | ||||||
|  | 	// RemoveSteps remove steps from workflow | ||||||
|  | 	RemoveSteps(ctx context.Context, steps ...Step) error | ||||||
|  | 	// AppendSteps append steps to workflow | ||||||
|  | 	AppendSteps(ctx context.Context, steps ...Step) error | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Flow the base interface to interact with workflows | ||||||
| type Flow interface { | type Flow interface { | ||||||
| 	Start(Workflow) error | 	// Options returns options | ||||||
| 	Stop(Workflow) | 	Options() Options | ||||||
|  | 	// Init initialize | ||||||
|  | 	Init(...Option) error | ||||||
|  | 	// WorkflowCreate creates new workflow with specific id and steps | ||||||
|  | 	WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) | ||||||
|  | 	// WorkflowSave saves workflow | ||||||
|  | 	WorkflowSave(ctx context.Context, w Workflow) error | ||||||
|  | 	// WorkflowLoad loads workflow with specific id | ||||||
|  | 	WorkflowLoad(ctx context.Context, id string) (Workflow, error) | ||||||
|  | 	// WorkflowList lists all workflows | ||||||
|  | 	WorkflowList(ctx context.Context) ([]Workflow, error) | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										222
									
								
								flow/options.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										222
									
								
								flow/options.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,222 @@ | |||||||
|  | package flow | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/unistack-org/micro/v3/client" | ||||||
|  | 	"github.com/unistack-org/micro/v3/logger" | ||||||
|  | 	"github.com/unistack-org/micro/v3/meter" | ||||||
|  | 	"github.com/unistack-org/micro/v3/store" | ||||||
|  | 	"github.com/unistack-org/micro/v3/tracer" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Option func | ||||||
|  | type Option func(*Options) | ||||||
|  |  | ||||||
|  | // Options server struct | ||||||
|  | type Options struct { | ||||||
|  | 	// Context holds the external options and can be used for flow shutdown | ||||||
|  | 	Context context.Context | ||||||
|  | 	// Client holds the client.Client | ||||||
|  | 	Client client.Client | ||||||
|  | 	// Tracer holds the tracer | ||||||
|  | 	Tracer tracer.Tracer | ||||||
|  | 	// Logger holds the logger | ||||||
|  | 	Logger logger.Logger | ||||||
|  | 	// Meter holds the meter | ||||||
|  | 	Meter meter.Meter | ||||||
|  | 	// Store used for intermediate results | ||||||
|  | 	Store store.Store | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // NewOptions returns new options struct with default or passed values | ||||||
|  | func NewOptions(opts ...Option) Options { | ||||||
|  | 	options := Options{ | ||||||
|  | 		Context: context.Background(), | ||||||
|  | 		Logger:  logger.DefaultLogger, | ||||||
|  | 		Meter:   meter.DefaultMeter, | ||||||
|  | 		Tracer:  tracer.DefaultTracer, | ||||||
|  | 		Client:  client.DefaultClient, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return options | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Logger sets the logger option | ||||||
|  | func Logger(l logger.Logger) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.Logger = l | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Meter sets the meter option | ||||||
|  | func Meter(m meter.Meter) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.Meter = m | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Client to use for sync/async communication | ||||||
|  | func Client(c client.Client) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.Client = c | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Context specifies a context for the service. | ||||||
|  | // Can be used to signal shutdown of the flow | ||||||
|  | // Can be used for extra option values. | ||||||
|  | func Context(ctx context.Context) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.Context = ctx | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Tracer mechanism for distributed tracking | ||||||
|  | func Tracer(t tracer.Tracer) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.Tracer = t | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Store used for intermediate results | ||||||
|  | func Store(s store.Store) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.Store = s | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // WorflowOption signature | ||||||
|  | type WorkflowOption func(*WorkflowOptions) | ||||||
|  |  | ||||||
|  | // WorkflowOptions holds workflow options | ||||||
|  | type WorkflowOptions struct { | ||||||
|  | 	ID      string | ||||||
|  | 	Context context.Context | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // WorkflowID set workflow id | ||||||
|  | func WorkflowID(id string) WorkflowOption { | ||||||
|  | 	return func(o *WorkflowOptions) { | ||||||
|  | 		o.ID = id | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type ExecuteOptions struct { | ||||||
|  | 	// Client holds the client.Client | ||||||
|  | 	Client client.Client | ||||||
|  | 	// Tracer holds the tracer | ||||||
|  | 	Tracer tracer.Tracer | ||||||
|  | 	// Logger holds the logger | ||||||
|  | 	Logger logger.Logger | ||||||
|  | 	// Meter holds the meter | ||||||
|  | 	Meter meter.Meter | ||||||
|  | 	// Store used for intermediate results | ||||||
|  | 	Store store.Store | ||||||
|  | 	// Context can be used to abort execution or pass additional opts | ||||||
|  | 	Context context.Context | ||||||
|  | 	// Start step | ||||||
|  | 	Start string | ||||||
|  | 	// Reverse execution | ||||||
|  | 	Reverse bool | ||||||
|  | 	// Timeout for execution | ||||||
|  | 	Timeout time.Duration | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type ExecuteOption func(*ExecuteOptions) | ||||||
|  |  | ||||||
|  | func ExecuteClient(c client.Client) ExecuteOption { | ||||||
|  | 	return func(o *ExecuteOptions) { | ||||||
|  | 		o.Client = c | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func ExecuteTracer(t tracer.Tracer) ExecuteOption { | ||||||
|  | 	return func(o *ExecuteOptions) { | ||||||
|  | 		o.Tracer = t | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func ExecuteLogger(l logger.Logger) ExecuteOption { | ||||||
|  | 	return func(o *ExecuteOptions) { | ||||||
|  | 		o.Logger = l | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func ExecuteMeter(m meter.Meter) ExecuteOption { | ||||||
|  | 	return func(o *ExecuteOptions) { | ||||||
|  | 		o.Meter = m | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func ExecuteStore(s store.Store) ExecuteOption { | ||||||
|  | 	return func(o *ExecuteOptions) { | ||||||
|  | 		o.Store = s | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func ExecuteContext(ctx context.Context) ExecuteOption { | ||||||
|  | 	return func(o *ExecuteOptions) { | ||||||
|  | 		o.Context = ctx | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func ExecuteReverse(b bool) ExecuteOption { | ||||||
|  | 	return func(o *ExecuteOptions) { | ||||||
|  | 		o.Reverse = b | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func ExecuteTimeout(td time.Duration) ExecuteOption { | ||||||
|  | 	return func(o *ExecuteOptions) { | ||||||
|  | 		o.Timeout = td | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions { | ||||||
|  | 	options := ExecuteOptions{} | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  | 	return options | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type StepOptions struct { | ||||||
|  | 	ID       string | ||||||
|  | 	Context  context.Context | ||||||
|  | 	Requires []string | ||||||
|  | 	Fallback string | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type StepOption func(*StepOptions) | ||||||
|  |  | ||||||
|  | func NewStepOptions(opts ...StepOption) StepOptions { | ||||||
|  | 	options := StepOptions{Context: context.Background()} | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  | 	return options | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func StepID(id string) StepOption { | ||||||
|  | 	return func(o *StepOptions) { | ||||||
|  | 		o.ID = id | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func StepRequires(steps ...string) StepOption { | ||||||
|  | 	return func(o *StepOptions) { | ||||||
|  | 		o.Requires = steps | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func StepFallback(step string) StepOption { | ||||||
|  | 	return func(o *StepOptions) { | ||||||
|  | 		o.Fallback = step | ||||||
|  | 	} | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user