flow improvements #52

Merged
vtolstov merged 4 commits from flow into master 2021-06-30 17:50:58 +03:00
2 changed files with 91 additions and 16 deletions
Showing only changes of commit 9fd8f581ac - Show all commits

View File

@ -4,10 +4,11 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"github.com/silas/dag"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
)
type microFlow struct {
@ -87,33 +88,44 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex
return "", err
}
}
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
if options.Reverse {
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
} else {
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
}
if err != nil {
return "", err
}
var wg sync.WaitGroup
cherr := make(chan error)
cherr := make(chan error, 1)
defer close(cherr)
select {
case err = <-cherr:
return "", err
default:
nctx, cancel := context.WithCancel(ctx)
defer cancel()
nopts := make([]ExecuteOption, 0, len(opts)+5)
nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store))
go func() {
for idx := range steps {
wg.Add(len(steps[idx]))
for nidx := range steps[idx] {
go func(step Step) {
if err = step.Execute(ctx, req, opts...); err != nil {
defer wg.Done()
if err = step.Execute(nctx, req, nopts...); err != nil {
cherr <- err
cancel()
}
wg.Done()
}(steps[idx][nidx])
}
wg.Wait()
}
}
cherr <- nil
}()
return uid.String(), nil
err = <-cherr
return uid.String(), err
}
func NewFlow(opts ...Option) Flow {
@ -236,9 +248,17 @@ func (s *microCallStep) Hashcode() interface{} {
}
func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
fmt.Printf("execute %s with %#v\n", s.String(), req)
time.Sleep(1 * time.Second)
return nil
options := NewExecuteOptions(opts...)
if options.Client == nil {
return fmt.Errorf("client not set")
}
rsp := &codec.Frame{}
copts := []client.CallOption{client.WithRetries(0)}
if options.Timeout > 0 {
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
}
err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp)
return err
}
type microPublishStep struct {

View File

@ -2,6 +2,7 @@ package flow
import (
"context"
"time"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/logger"
@ -116,13 +117,67 @@ type ExecuteOptions struct {
// Meter holds the meter
Meter meter.Meter
// Store used for intermediate results
Store store.Store
Store store.Store
// Context can be used to abort execution or pass additional opts
Context context.Context
Start string
// Start step
Start string
// Reverse execution
Reverse bool
// Timeout for execution
Timeout time.Duration
}
type ExecuteOption func(*ExecuteOptions)
func ExecuteClient(c client.Client) ExecuteOption {
return func(o *ExecuteOptions) {
o.Client = c
}
}
func ExecuteTracer(t tracer.Tracer) ExecuteOption {
return func(o *ExecuteOptions) {
o.Tracer = t
}
}
func ExecuteLogger(l logger.Logger) ExecuteOption {
return func(o *ExecuteOptions) {
o.Logger = l
}
}
func ExecuteMeter(m meter.Meter) ExecuteOption {
return func(o *ExecuteOptions) {
o.Meter = m
}
}
func ExecuteStore(s store.Store) ExecuteOption {
return func(o *ExecuteOptions) {
o.Store = s
}
}
func ExecuteContext(ctx context.Context) ExecuteOption {
return func(o *ExecuteOptions) {
o.Context = ctx
}
}
func ExecuteReverse(b bool) ExecuteOption {
return func(o *ExecuteOptions) {
o.Reverse = b
}
}
func ExecuteTimeout(td time.Duration) ExecuteOption {
return func(o *ExecuteOptions) {
o.Timeout = td
}
}
func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions {
options := ExecuteOptions{}
for _, o := range opts {