flow improvements #52

Merged
vtolstov merged 4 commits from flow into master 2021-06-30 17:50:58 +03:00
3 changed files with 156 additions and 46 deletions
Showing only changes of commit f22647d108 - Show all commits

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/silas/dag" "github.com/silas/dag"
@ -18,6 +19,8 @@ type microWorkflow struct {
g *dag.AcyclicGraph g *dag.AcyclicGraph
init bool init bool
sync.RWMutex sync.RWMutex
opts Options
steps map[string]Step
} }
func (w *microWorkflow) ID() string { func (w *microWorkflow) ID() string {
@ -53,29 +56,63 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex
return "", err return "", err
} }
var steps [][]string options := NewExecuteOptions(opts...)
var steps [][]Step
fn := func(n dag.Vertex, idx int) error { fn := func(n dag.Vertex, idx int) error {
if idx == 0 { if idx == 0 {
steps = make([][]string, 1) steps = make([][]Step, 1)
steps[0] = make([]string, 0, 1) steps[0] = make([]Step, 0, 1)
} else if idx >= len(steps) { } else if idx >= len(steps) {
tsteps := make([][]string, idx+1) tsteps := make([][]Step, idx+1)
copy(tsteps, steps) copy(tsteps, steps)
steps = tsteps steps = tsteps
steps[idx] = make([]string, 0, 1) steps[idx] = make([]Step, 0, 1)
} }
steps[idx] = append(steps[idx], fmt.Sprintf("%s", n)) steps[idx] = append(steps[idx], n.(Step))
return nil return nil
} }
w.RLock() var root dag.Vertex
err = w.g.SortedDepthFirstWalk([]dag.Vertex{start}, fn) if options.Start != "" {
w.RUnlock() var ok bool
w.RLock()
root, ok = w.steps[options.Start]
w.RUnlock()
if !ok {
return "", ErrStepNotExists
}
} else {
root, err = w.g.Root()
if err != nil {
return "", err
}
}
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
if err != nil { if err != nil {
return "", err return "", err
} }
var wg sync.WaitGroup
cherr := make(chan error)
select {
case err = <-cherr:
return "", err
default:
for idx := range steps {
wg.Add(len(steps[idx]))
for nidx := range steps[idx] {
go func(step Step) {
if err = step.Execute(ctx, req, opts...); err != nil {
cherr <- err
}
wg.Done()
}(steps[idx][nidx])
}
wg.Wait()
}
}
return uid.String(), nil return uid.String(), nil
} }
@ -115,14 +152,20 @@ func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) {
} }
func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) {
w := &microWorkflow{id: id, g: &dag.AcyclicGraph{}} w := &microWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))}
for _, s := range steps { for _, s := range steps {
w.g.Add(s.Options().ID) w.steps[s.String()] = s
w.g.Add(s)
} }
for _, s := range steps {
for _, req := range s.Requires() { for _, dst := range steps {
w.g.Connect(dag.BasicEdge(s, req)) for _, req := range dst.Requires() {
src, ok := w.steps[req]
if !ok {
return nil, ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
} }
} }
@ -149,14 +192,13 @@ func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, erro
} }
type microCallStep struct { type microCallStep struct {
opts StepOptions opts StepOptions
service string service string
method string method string
requires []Step
} }
func (s *microCallStep) ID() string { func (s *microCallStep) ID() string {
return s.opts.ID return s.String()
} }
func (s *microCallStep) Options() StepOptions { func (s *microCallStep) Options() StepOptions {
@ -167,27 +209,45 @@ func (s *microCallStep) Endpoint() string {
return s.method return s.method
} }
func (s *microCallStep) Requires() []Step { func (s *microCallStep) Requires() []string {
return s.requires return s.opts.Requires
} }
func (s *microCallStep) Require(steps ...Step) error { func (s *microCallStep) Require(steps ...Step) error {
s.requires = append(s.requires, steps...) for _, step := range steps {
s.opts.Requires = append(s.opts.Requires, step.String())
}
return nil return nil
} }
func (s *microCallStep) String() string {
if s.opts.ID != "" {
return s.opts.ID
}
return fmt.Sprintf("%s.%s", s.service, s.method)
}
func (s *microCallStep) Name() string {
return s.String()
}
func (s *microCallStep) Hashcode() interface{} {
return s.String()
}
func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
fmt.Printf("execute %s with %#v\n", s.String(), req)
time.Sleep(1 * time.Second)
return nil return nil
} }
type microPublishStep struct { type microPublishStep struct {
opts StepOptions opts StepOptions
topic string topic string
requires []Step
} }
func (s *microPublishStep) ID() string { func (s *microPublishStep) ID() string {
return s.opts.ID return s.String()
} }
func (s *microPublishStep) Options() StepOptions { func (s *microPublishStep) Options() StepOptions {
@ -198,15 +258,32 @@ func (s *microPublishStep) Endpoint() string {
return s.topic return s.topic
} }
func (s *microPublishStep) Requires() []Step { func (s *microPublishStep) Requires() []string {
return s.requires return s.opts.Requires
} }
func (s *microPublishStep) Require(steps ...Step) error { func (s *microPublishStep) Require(steps ...Step) error {
s.requires = append(s.requires, steps...) for _, step := range steps {
s.opts.Requires = append(s.opts.Requires, step.String())
}
return nil return nil
} }
func (s *microPublishStep) String() string {
if s.opts.ID != "" {
return s.opts.ID
}
return fmt.Sprintf("%s", s.topic)
}
func (s *microPublishStep) Name() string {
return s.String()
}
func (s *microPublishStep) Hashcode() interface{} {
return s.String()
}
func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
return nil return nil
} }

View File

@ -3,8 +3,11 @@ package flow
import ( import (
"context" "context"
"errors"
)
"github.com/google/uuid" var (
ErrStepNotExists = errors.New("step not exists")
) )
// Step represents dedicated workflow step // Step represents dedicated workflow step
@ -16,22 +19,13 @@ type Step interface {
// Execute step run // Execute step run
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error
// Requires returns dependent steps // Requires returns dependent steps
Requires() []Step Requires() []string
// Options returns step options // Options returns step options
Options() StepOptions Options() StepOptions
// Require add required steps // Require add required steps
Require(steps ...Step) error Require(steps ...Step) error
} // String
String() string
func NewStepOptions(opts ...StepOption) StepOptions {
options := StepOptions{}
for _, o := range opts {
o(&options)
}
if options.ID == "" {
options.ID = uuid.New().String()
}
return options
} }
// Workflow contains all steps to execute // Workflow contains all steps to execute

View File

@ -107,15 +107,54 @@ func WorkflowID(id string) WorkflowOption {
} }
type ExecuteOptions struct { type ExecuteOptions struct {
// Client holds the client.Client
Client client.Client
// Tracer holds the tracer
Tracer tracer.Tracer
// Logger holds the logger
Logger logger.Logger
// Meter holds the meter
Meter meter.Meter
// Store used for intermediate results
Store store.Store
Context context.Context Context context.Context
Start string
} }
type ExecuteOption func(*ExecuteOptions) type ExecuteOption func(*ExecuteOptions)
func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions {
options := ExecuteOptions{}
for _, o := range opts {
o(&options)
}
return options
}
type StepOptions struct { type StepOptions struct {
ID string ID string
Name string Context context.Context
Context context.Context Requires []string
} }
type StepOption func(*StepOptions) type StepOption func(*StepOptions)
func NewStepOptions(opts ...StepOption) StepOptions {
options := StepOptions{Context: context.Background()}
for _, o := range opts {
o(&options)
}
return options
}
func StepID(id string) StepOption {
return func(o *StepOptions) {
o.ID = id
}
}
func StepRequires(steps ...string) StepOption {
return func(o *StepOptions) {
o.Requires = steps
}
}