From bd4d4c363eebe268f5a0ac7358b473d3498cacad Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 30 Jun 2021 17:50:58 +0300 Subject: [PATCH] flow improvements (#52) * flow improvements Signed-off-by: Vasiliy Tolstov --- flow/context.go | 34 ++++++ flow/default.go | 319 ++++++++++++++++++++++++++++++++++++++++++++++++ flow/flow.go | 48 +++++++- flow/options.go | 222 +++++++++++++++++++++++++++++++++ 4 files changed, 620 insertions(+), 3 deletions(-) create mode 100644 flow/context.go create mode 100644 flow/default.go create mode 100644 flow/options.go diff --git a/flow/context.go b/flow/context.go new file mode 100644 index 00000000..1387429f --- /dev/null +++ b/flow/context.go @@ -0,0 +1,34 @@ +package flow + +import ( + "context" +) + +type flowKey struct{} + +// FromContext returns Flow from context +func FromContext(ctx context.Context) (Flow, bool) { + if ctx == nil { + return nil, false + } + c, ok := ctx.Value(flowKey{}).(Flow) + return c, ok +} + +// NewContext stores Flow to context +func NewContext(ctx context.Context, f Flow) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, flowKey{}, f) +} + +// SetOption returns a function to setup a context with given value +func SetOption(k, v interface{}) Option { + return func(o *Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} diff --git a/flow/default.go b/flow/default.go new file mode 100644 index 00000000..e9c53a56 --- /dev/null +++ b/flow/default.go @@ -0,0 +1,319 @@ +package flow + +import ( + "context" + "fmt" + "sync" + + "github.com/google/uuid" + "github.com/silas/dag" + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/codec" +) + +type microFlow struct { + opts Options +} + +type microWorkflow struct { + id string + g *dag.AcyclicGraph + init bool + sync.RWMutex + opts Options + steps map[string]Step +} + +func (w *microWorkflow) ID() string { + return w.id +} + +func (w *microWorkflow) Steps() [][]Step { + return nil +} + +func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error { + return nil +} + +func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error { + return nil +} + +func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) { + w.Lock() + if !w.init { + if err := w.g.Validate(); err != nil { + w.Unlock() + return "", err + } + w.g.TransitiveReduction() + w.init = true + } + w.Unlock() + + uid, err := uuid.NewRandom() + if err != nil { + return "", err + } + + options := NewExecuteOptions(opts...) + var steps [][]Step + fn := func(n dag.Vertex, idx int) error { + if idx == 0 { + steps = make([][]Step, 1) + steps[0] = make([]Step, 0, 1) + } else if idx >= len(steps) { + tsteps := make([][]Step, idx+1) + copy(tsteps, steps) + steps = tsteps + steps[idx] = make([]Step, 0, 1) + } + steps[idx] = append(steps[idx], n.(Step)) + return nil + } + + var root dag.Vertex + if options.Start != "" { + var ok bool + w.RLock() + root, ok = w.steps[options.Start] + w.RUnlock() + if !ok { + return "", ErrStepNotExists + } + } else { + root, err = w.g.Root() + if err != nil { + return "", err + } + } + if options.Reverse { + err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn) + } else { + err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn) + } + if err != nil { + return "", err + } + + var wg sync.WaitGroup + cherr := make(chan error, 1) + defer close(cherr) + + nctx, cancel := context.WithCancel(ctx) + defer cancel() + nopts := make([]ExecuteOption, 0, len(opts)+5) + nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store)) + + go func() { + for idx := range steps { + wg.Add(len(steps[idx])) + for nidx := range steps[idx] { + go func(step Step) { + defer wg.Done() + if err = step.Execute(nctx, req, nopts...); err != nil { + cherr <- err + cancel() + } + }(steps[idx][nidx]) + } + wg.Wait() + } + cherr <- nil + }() + + err = <-cherr + + return uid.String(), err +} + +func NewFlow(opts ...Option) Flow { + options := NewOptions(opts...) + return µFlow{opts: options} +} + +func (f *microFlow) Options() Options { + return f.opts +} + +func (f *microFlow) Init(opts ...Option) error { + for _, o := range opts { + o(&f.opts) + } + if err := f.opts.Client.Init(); err != nil { + return err + } + if err := f.opts.Tracer.Init(); err != nil { + return err + } + if err := f.opts.Logger.Init(); err != nil { + return err + } + if err := f.opts.Meter.Init(); err != nil { + return err + } + if err := f.opts.Store.Init(); err != nil { + return err + } + return nil +} + +func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) { + return nil, nil +} + +func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { + w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))} + + for _, s := range steps { + w.steps[s.String()] = s + w.g.Add(s) + } + + for _, dst := range steps { + for _, req := range dst.Requires() { + src, ok := w.steps[req] + if !ok { + return nil, ErrStepNotExists + } + w.g.Connect(dag.BasicEdge(src, dst)) + } + } + + if err := w.g.Validate(); err != nil { + return nil, err + } + w.g.TransitiveReduction() + + w.init = true + + return w, nil +} + +func (f *microFlow) WorkflowRemove(ctx context.Context, id string) error { + return nil +} + +func (f *microFlow) WorkflowSave(ctx context.Context, w Workflow) error { + return nil +} + +func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, error) { + return nil, nil +} + +type microCallStep struct { + opts StepOptions + service string + method string +} + +func (s *microCallStep) ID() string { + return s.String() +} + +func (s *microCallStep) Options() StepOptions { + return s.opts +} + +func (s *microCallStep) Endpoint() string { + return s.method +} + +func (s *microCallStep) Requires() []string { + return s.opts.Requires +} + +func (s *microCallStep) Require(steps ...Step) error { + for _, step := range steps { + s.opts.Requires = append(s.opts.Requires, step.String()) + } + return nil +} + +func (s *microCallStep) String() string { + if s.opts.ID != "" { + return s.opts.ID + } + return fmt.Sprintf("%s.%s", s.service, s.method) +} + +func (s *microCallStep) Name() string { + return s.String() +} + +func (s *microCallStep) Hashcode() interface{} { + return s.String() +} + +func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { + options := NewExecuteOptions(opts...) + if options.Client == nil { + return fmt.Errorf("client not set") + } + rsp := &codec.Frame{} + copts := []client.CallOption{client.WithRetries(0)} + if options.Timeout > 0 { + copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout)) + } + err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp) + return err +} + +type microPublishStep struct { + opts StepOptions + topic string +} + +func (s *microPublishStep) ID() string { + return s.String() +} + +func (s *microPublishStep) Options() StepOptions { + return s.opts +} + +func (s *microPublishStep) Endpoint() string { + return s.topic +} + +func (s *microPublishStep) Requires() []string { + return s.opts.Requires +} + +func (s *microPublishStep) Require(steps ...Step) error { + for _, step := range steps { + s.opts.Requires = append(s.opts.Requires, step.String()) + } + return nil +} + +func (s *microPublishStep) String() string { + if s.opts.ID != "" { + return s.opts.ID + } + return fmt.Sprintf("%s", s.topic) +} + +func (s *microPublishStep) Name() string { + return s.String() +} + +func (s *microPublishStep) Hashcode() interface{} { + return s.String() +} + +func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { + return nil +} + +func NewCallStep(service string, method string, opts ...StepOption) Step { + options := NewStepOptions(opts...) + return µCallStep{service: service, method: method, opts: options} +} + +func NewPublishStep(topic string, opts ...StepOption) Step { + options := NewStepOptions(opts...) + return µPublishStep{topic: topic, opts: options} +} diff --git a/flow/flow.go b/flow/flow.go index 6f411989..54f052df 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -1,17 +1,59 @@ // Package flow is an interface used for saga pattern microservice workflow package flow +import ( + "context" + "errors" +) + +var ( + ErrStepNotExists = errors.New("step not exists") +) + +// Step represents dedicated workflow step type Step interface { + // ID returns step id + ID() string // Endpoint returns rpc endpoint service_name.service_method or broker topic Endpoint() string + // Execute step run + Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error + // Requires returns dependent steps + Requires() []string + // Options returns step options + Options() StepOptions + // Require add required steps + Require(steps ...Step) error + // String + String() string } +// Workflow contains all steps to execute type Workflow interface { + // ID returns id of the workflow + ID() string + // Steps returns steps slice where parallel steps returned on the same level Steps() [][]Step - Stop() error + // Execute workflow with args, return execution id and error + Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) + // RemoveSteps remove steps from workflow + RemoveSteps(ctx context.Context, steps ...Step) error + // AppendSteps append steps to workflow + AppendSteps(ctx context.Context, steps ...Step) error } +// Flow the base interface to interact with workflows type Flow interface { - Start(Workflow) error - Stop(Workflow) + // Options returns options + Options() Options + // Init initialize + Init(...Option) error + // WorkflowCreate creates new workflow with specific id and steps + WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) + // WorkflowSave saves workflow + WorkflowSave(ctx context.Context, w Workflow) error + // WorkflowLoad loads workflow with specific id + WorkflowLoad(ctx context.Context, id string) (Workflow, error) + // WorkflowList lists all workflows + WorkflowList(ctx context.Context) ([]Workflow, error) } diff --git a/flow/options.go b/flow/options.go new file mode 100644 index 00000000..f3066311 --- /dev/null +++ b/flow/options.go @@ -0,0 +1,222 @@ +package flow + +import ( + "context" + "time" + + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/meter" + "github.com/unistack-org/micro/v3/store" + "github.com/unistack-org/micro/v3/tracer" +) + +// Option func +type Option func(*Options) + +// Options server struct +type Options struct { + // Context holds the external options and can be used for flow shutdown + Context context.Context + // Client holds the client.Client + Client client.Client + // Tracer holds the tracer + Tracer tracer.Tracer + // Logger holds the logger + Logger logger.Logger + // Meter holds the meter + Meter meter.Meter + // Store used for intermediate results + Store store.Store +} + +// NewOptions returns new options struct with default or passed values +func NewOptions(opts ...Option) Options { + options := Options{ + Context: context.Background(), + Logger: logger.DefaultLogger, + Meter: meter.DefaultMeter, + Tracer: tracer.DefaultTracer, + Client: client.DefaultClient, + } + + for _, o := range opts { + o(&options) + } + + return options +} + +// Logger sets the logger option +func Logger(l logger.Logger) Option { + return func(o *Options) { + o.Logger = l + } +} + +// Meter sets the meter option +func Meter(m meter.Meter) Option { + return func(o *Options) { + o.Meter = m + } +} + +// Client to use for sync/async communication +func Client(c client.Client) Option { + return func(o *Options) { + o.Client = c + } +} + +// Context specifies a context for the service. +// Can be used to signal shutdown of the flow +// Can be used for extra option values. +func Context(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + +// Tracer mechanism for distributed tracking +func Tracer(t tracer.Tracer) Option { + return func(o *Options) { + o.Tracer = t + } +} + +// Store used for intermediate results +func Store(s store.Store) Option { + return func(o *Options) { + o.Store = s + } +} + +// WorflowOption signature +type WorkflowOption func(*WorkflowOptions) + +// WorkflowOptions holds workflow options +type WorkflowOptions struct { + ID string + Context context.Context +} + +// WorkflowID set workflow id +func WorkflowID(id string) WorkflowOption { + return func(o *WorkflowOptions) { + o.ID = id + } +} + +type ExecuteOptions struct { + // Client holds the client.Client + Client client.Client + // Tracer holds the tracer + Tracer tracer.Tracer + // Logger holds the logger + Logger logger.Logger + // Meter holds the meter + Meter meter.Meter + // Store used for intermediate results + Store store.Store + // Context can be used to abort execution or pass additional opts + Context context.Context + // Start step + Start string + // Reverse execution + Reverse bool + // Timeout for execution + Timeout time.Duration +} + +type ExecuteOption func(*ExecuteOptions) + +func ExecuteClient(c client.Client) ExecuteOption { + return func(o *ExecuteOptions) { + o.Client = c + } +} + +func ExecuteTracer(t tracer.Tracer) ExecuteOption { + return func(o *ExecuteOptions) { + o.Tracer = t + } +} + +func ExecuteLogger(l logger.Logger) ExecuteOption { + return func(o *ExecuteOptions) { + o.Logger = l + } +} + +func ExecuteMeter(m meter.Meter) ExecuteOption { + return func(o *ExecuteOptions) { + o.Meter = m + } +} + +func ExecuteStore(s store.Store) ExecuteOption { + return func(o *ExecuteOptions) { + o.Store = s + } +} + +func ExecuteContext(ctx context.Context) ExecuteOption { + return func(o *ExecuteOptions) { + o.Context = ctx + } +} + +func ExecuteReverse(b bool) ExecuteOption { + return func(o *ExecuteOptions) { + o.Reverse = b + } +} + +func ExecuteTimeout(td time.Duration) ExecuteOption { + return func(o *ExecuteOptions) { + o.Timeout = td + } +} + +func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions { + options := ExecuteOptions{} + for _, o := range opts { + o(&options) + } + return options +} + +type StepOptions struct { + ID string + Context context.Context + Requires []string + Fallback string +} + +type StepOption func(*StepOptions) + +func NewStepOptions(opts ...StepOption) StepOptions { + options := StepOptions{Context: context.Background()} + for _, o := range opts { + o(&options) + } + return options +} + +func StepID(id string) StepOption { + return func(o *StepOptions) { + o.ID = id + } +} + +func StepRequires(steps ...string) StepOption { + return func(o *StepOptions) { + o.Requires = steps + } +} + +func StepFallback(step string) StepOption { + return func(o *StepOptions) { + o.Fallback = step + } +}