From 53c5455909dfae24bd84a44c8bcd6a9f8534d535 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Mon, 28 Jun 2021 22:39:50 +0300 Subject: [PATCH] new steps Signed-off-by: Vasiliy Tolstov --- flow/context.go | 34 ++++++++ flow/default.go | 222 ++++++++++++++++++++++++++++++++++++++++++++++++ flow/flow.go | 54 +++++++++++- flow/options.go | 121 ++++++++++++++++++++++++++ 4 files changed, 428 insertions(+), 3 deletions(-) create mode 100644 flow/context.go create mode 100644 flow/default.go create mode 100644 flow/options.go diff --git a/flow/context.go b/flow/context.go new file mode 100644 index 00000000..1387429f --- /dev/null +++ b/flow/context.go @@ -0,0 +1,34 @@ +package flow + +import ( + "context" +) + +type flowKey struct{} + +// FromContext returns Flow from context +func FromContext(ctx context.Context) (Flow, bool) { + if ctx == nil { + return nil, false + } + c, ok := ctx.Value(flowKey{}).(Flow) + return c, ok +} + +// NewContext stores Flow to context +func NewContext(ctx context.Context, f Flow) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, flowKey{}, f) +} + +// SetOption returns a function to setup a context with given value +func SetOption(k, v interface{}) Option { + return func(o *Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} diff --git a/flow/default.go b/flow/default.go new file mode 100644 index 00000000..66786adf --- /dev/null +++ b/flow/default.go @@ -0,0 +1,222 @@ +package flow + +import ( + "context" + "fmt" + "sync" + + "github.com/google/uuid" + "github.com/silas/dag" +) + +type microFlow struct { + opts Options +} + +type microWorkflow struct { + id string + g *dag.AcyclicGraph + init bool + sync.RWMutex +} + +func (w *microWorkflow) ID() string { + return w.id +} + +func (w *microWorkflow) Steps() [][]Step { + return nil +} + +func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error { + return nil +} + +func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error { + return nil +} + +func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) { + w.Lock() + if !w.init { + if err := w.g.Validate(); err != nil { + w.Unlock() + return "", err + } + w.g.TransitiveReduction() + w.init = true + } + w.Unlock() + + uid, err := uuid.NewRandom() + if err != nil { + return "", err + } + + var steps [][]string + fn := func(n dag.Vertex, idx int) error { + if idx == 0 { + steps = make([][]string, 1) + steps[0] = make([]string, 0, 1) + } else if idx >= len(steps) { + tsteps := make([][]string, idx+1) + copy(tsteps, steps) + steps = tsteps + steps[idx] = make([]string, 0, 1) + } + steps[idx] = append(steps[idx], fmt.Sprintf("%s", n)) + return nil + } + + w.RLock() + err = w.g.SortedDepthFirstWalk([]dag.Vertex{start}, fn) + w.RUnlock() + + if err != nil { + return "", err + } + + return uid.String(), nil +} + +func NewFlow(opts ...Option) Flow { + options := NewOptions(opts...) + return µFlow{opts: options} +} + +func (f *microFlow) Options() Options { + return f.opts +} + +func (f *microFlow) Init(opts ...Option) error { + for _, o := range opts { + o(&f.opts) + } + if err := f.opts.Client.Init(); err != nil { + return err + } + if err := f.opts.Tracer.Init(); err != nil { + return err + } + if err := f.opts.Logger.Init(); err != nil { + return err + } + if err := f.opts.Meter.Init(); err != nil { + return err + } + if err := f.opts.Store.Init(); err != nil { + return err + } + return nil +} + +func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) { + return nil, nil +} + +func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { + w := µWorkflow{id: id, g: &dag.AcyclicGraph{}} + + for _, s := range steps { + w.g.Add(s.Options().ID) + } + for _, s := range steps { + for _, req := range s.Requires() { + w.g.Connect(dag.BasicEdge(s, req)) + } + } + + if err := w.g.Validate(); err != nil { + return nil, err + } + w.g.TransitiveReduction() + + w.init = true + + return w, nil +} + +func (f *microFlow) WorkflowRemove(ctx context.Context, id string) error { + return nil +} + +func (f *microFlow) WorkflowSave(ctx context.Context, w Workflow) error { + return nil +} + +func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, error) { + return nil, nil +} + +type microCallStep struct { + opts StepOptions + service string + method string + requires []Step +} + +func (s *microCallStep) ID() string { + return s.opts.ID +} + +func (s *microCallStep) Options() StepOptions { + return s.opts +} + +func (s *microCallStep) Endpoint() string { + return s.method +} + +func (s *microCallStep) Requires() []Step { + return s.requires +} + +func (s *microCallStep) Require(steps ...Step) error { + s.requires = append(s.requires, steps...) + return nil +} + +func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { + return nil +} + +type microPublishStep struct { + opts StepOptions + topic string + requires []Step +} + +func (s *microPublishStep) ID() string { + return s.opts.ID +} + +func (s *microPublishStep) Options() StepOptions { + return s.opts +} + +func (s *microPublishStep) Endpoint() string { + return s.topic +} + +func (s *microPublishStep) Requires() []Step { + return s.requires +} + +func (s *microPublishStep) Require(steps ...Step) error { + s.requires = append(s.requires, steps...) + return nil +} + +func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { + return nil +} + +func NewCallStep(service string, method string, opts ...StepOption) Step { + options := NewStepOptions(opts...) + return µCallStep{service: service, method: method, opts: options} +} + +func NewPublishStep(topic string, opts ...StepOption) Step { + options := NewStepOptions(opts...) + return µPublishStep{topic: topic, opts: options} +} diff --git a/flow/flow.go b/flow/flow.go index 6f411989..6152d16c 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -1,17 +1,65 @@ // Package flow is an interface used for saga pattern microservice workflow package flow +import ( + "context" + + "github.com/google/uuid" +) + +// Step represents dedicated workflow step type Step interface { + // ID returns step id + ID() string // Endpoint returns rpc endpoint service_name.service_method or broker topic Endpoint() string + // Execute step run + Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error + // Requires returns dependent steps + Requires() []Step + // Options returns step options + Options() StepOptions + // Require add required steps + Require(steps ...Step) error } +func NewStepOptions(opts ...StepOption) StepOptions { + options := StepOptions{} + for _, o := range opts { + o(&options) + } + if options.ID == "" { + options.ID = uuid.New().String() + } + return options +} + +// Workflow contains all steps to execute type Workflow interface { + // ID returns id of the workflow + ID() string + // Steps returns steps slice where parallel steps returned on the same level Steps() [][]Step - Stop() error + // Execute workflow with args, return execution id and error + Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) + // RemoveSteps remove steps from workflow + RemoveSteps(ctx context.Context, steps ...Step) error + // AppendSteps append steps to workflow + AppendSteps(ctx context.Context, steps ...Step) error } +// Flow the base interface to interact with workflows type Flow interface { - Start(Workflow) error - Stop(Workflow) + // Options returns options + Options() Options + // Init initialize + Init(...Option) error + // WorkflowCreate creates new workflow with specific id and steps + WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) + // WorkflowSave saves workflow + WorkflowSave(ctx context.Context, w Workflow) error + // WorkflowLoad loads workflow with specific id + WorkflowLoad(ctx context.Context, id string) (Workflow, error) + // WorkflowList lists all workflows + WorkflowList(ctx context.Context) ([]Workflow, error) } diff --git a/flow/options.go b/flow/options.go new file mode 100644 index 00000000..e53561a0 --- /dev/null +++ b/flow/options.go @@ -0,0 +1,121 @@ +package flow + +import ( + "context" + + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/meter" + "github.com/unistack-org/micro/v3/store" + "github.com/unistack-org/micro/v3/tracer" +) + +// Option func +type Option func(*Options) + +// Options server struct +type Options struct { + // Context holds the external options and can be used for flow shutdown + Context context.Context + // Client holds the client.Client + Client client.Client + // Tracer holds the tracer + Tracer tracer.Tracer + // Logger holds the logger + Logger logger.Logger + // Meter holds the meter + Meter meter.Meter + // Store used for intermediate results + Store store.Store +} + +// NewOptions returns new options struct with default or passed values +func NewOptions(opts ...Option) Options { + options := Options{ + Context: context.Background(), + Logger: logger.DefaultLogger, + Meter: meter.DefaultMeter, + Tracer: tracer.DefaultTracer, + Client: client.DefaultClient, + } + + for _, o := range opts { + o(&options) + } + + return options +} + +// Logger sets the logger option +func Logger(l logger.Logger) Option { + return func(o *Options) { + o.Logger = l + } +} + +// Meter sets the meter option +func Meter(m meter.Meter) Option { + return func(o *Options) { + o.Meter = m + } +} + +// Client to use for sync/async communication +func Client(c client.Client) Option { + return func(o *Options) { + o.Client = c + } +} + +// Context specifies a context for the service. +// Can be used to signal shutdown of the flow +// Can be used for extra option values. +func Context(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + +// Tracer mechanism for distributed tracking +func Tracer(t tracer.Tracer) Option { + return func(o *Options) { + o.Tracer = t + } +} + +// Store used for intermediate results +func Store(s store.Store) Option { + return func(o *Options) { + o.Store = s + } +} + +// WorflowOption signature +type WorkflowOption func(*WorkflowOptions) + +// WorkflowOptions holds workflow options +type WorkflowOptions struct { + ID string + Context context.Context +} + +// WorkflowID set workflow id +func WorkflowID(id string) WorkflowOption { + return func(o *WorkflowOptions) { + o.ID = id + } +} + +type ExecuteOptions struct { + Context context.Context +} + +type ExecuteOption func(*ExecuteOptions) + +type StepOptions struct { + ID string + Name string + Context context.Context +} + +type StepOption func(*StepOptions)