flow: improve store

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-07-15 22:56:26 +03:00
parent b4e5d9462a
commit 10a09a5c6f
3 changed files with 250 additions and 37 deletions

View File

@ -3,6 +3,7 @@ package flow
import ( import (
"context" "context"
"fmt" "fmt"
"path/filepath"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
@ -10,6 +11,7 @@ import (
"github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/store" "github.com/unistack-org/micro/v3/store"
) )
@ -22,8 +24,9 @@ type microWorkflow struct {
g *dag.AcyclicGraph g *dag.AcyclicGraph
init bool init bool
sync.RWMutex sync.RWMutex
opts Options opts Options
steps map[string]Step steps map[string]Step
status Status
} }
func (w *microWorkflow) ID() string { func (w *microWorkflow) ID() string {
@ -34,7 +37,11 @@ func (w *microWorkflow) Steps() ([][]Step, error) {
return w.getSteps("", false) return w.getSteps("", false)
} }
func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error { func (w *microWorkflow) Status() Status {
return w.status
}
func (w *microWorkflow) AppendSteps(steps ...Step) error {
w.Lock() w.Lock()
for _, s := range steps { for _, s := range steps {
@ -64,7 +71,7 @@ func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error {
return nil return nil
} }
func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error { func (w *microWorkflow) RemoveSteps(steps ...Step) error {
// TODO: handle case when some step requires or required by removed step // TODO: handle case when some step requires or required by removed step
w.Lock() w.Lock()
@ -142,7 +149,8 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
return steps, nil return steps, nil
} }
func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) { func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
w.Lock() w.Lock()
if !w.init { if !w.init {
if err := w.g.Validate(); err != nil { if err := w.g.Validate(); err != nil {
@ -158,10 +166,18 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex
if err != nil { if err != nil {
return "", err return "", err
} }
eid := uid.String()
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
options := NewExecuteOptions(opts...) options := NewExecuteOptions(opts...)
steps, err := w.getSteps(options.Start, options.Reverse) steps, err := w.getSteps(options.Start, options.Reverse)
if err != nil { if err != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusPending")}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
return "", err return "", err
} }
@ -169,33 +185,119 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex
cherr := make(chan error, 1) cherr := make(chan error, 1)
nctx, cancel := context.WithCancel(ctx) nctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
nopts := make([]ExecuteOption, 0, len(opts)+5) nopts := make([]ExecuteOption, 0, len(opts)+5)
nopts = append(nopts, nopts = append(nopts,
ExecuteClient(w.opts.Client), ExecuteClient(w.opts.Client),
ExecuteTracer(w.opts.Tracer), ExecuteTracer(w.opts.Tracer),
ExecuteLogger(w.opts.Logger), ExecuteLogger(w.opts.Logger),
ExecuteMeter(w.opts.Meter), ExecuteMeter(w.opts.Meter),
ExecuteStore(store.NewNamespaceStore(w.opts.Store, uid.String())),
) )
nopts = append(nopts, opts...) nopts = append(nopts, opts...)
done := make(chan struct{}) done := make(chan struct{})
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusRunning")}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
return eid, werr
}
for idx := range steps {
for nidx := range steps[idx] {
cstep := steps[idx][nidx]
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusPending")}); werr != nil {
return eid, werr
}
}
}
go func() { go func() {
for idx := range steps { for idx := range steps {
for nidx := range steps[idx] { for nidx := range steps[idx] {
if w.opts.Logger.V(logger.TraceLevel) { if w.opts.Logger.V(logger.TraceLevel) {
w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx]) w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
} }
wg.Add(1) cstep := steps[idx][nidx]
go func(step Step) { if len(cstep.Requires()) == 0 {
defer wg.Done() wg.Add(1)
if serr := step.Execute(nctx, req, nopts...); serr != nil { go func(step Step) {
defer wg.Done()
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
cherr <- werr
cancel()
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusRunning")}); werr != nil {
cherr <- werr
cancel()
return
}
rsp, serr := step.Execute(nctx, req, nopts...)
if serr != nil {
step.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusFailure")}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
cancel()
return
} else {
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
cancel()
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
cancel()
return
}
}
}(cstep)
wg.Wait()
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
cherr <- werr
cancel()
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusRunning")}); werr != nil {
cherr <- werr
cancel()
return
}
rsp, serr := cstep.Execute(nctx, req, nopts...)
if serr != nil {
cstep.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusFailure")}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr cherr <- serr
cancel() cancel()
return
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
cancel()
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil {
cherr <- werr
cancel()
return
}
} }
}(steps[idx][nidx]) }
} }
wg.Wait()
} }
close(done) close(done)
}() }()
@ -210,6 +312,24 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex
close(cherr) close(cherr)
} }
switch {
case nctx.Err() != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusAborted")}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
case err == nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
case err != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusFailure")}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
}
return uid.String(), err return uid.String(), err
} }
@ -292,6 +412,17 @@ type microCallStep struct {
opts StepOptions opts StepOptions
service string service string
method string method string
rsp *Message
req *Message
status Status
}
func (s *microCallStep) Request() *Message {
return s.req
}
func (s *microCallStep) Response() *Message {
return s.rsp
} }
func (s *microCallStep) ID() string { func (s *microCallStep) ID() string {
@ -332,23 +463,47 @@ func (s *microCallStep) Hashcode() interface{} {
return s.String() return s.String()
} }
func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { func (s *microCallStep) GetStatus() Status {
return s.status
}
func (s *microCallStep) SetStatus(status Status) {
s.status = status
}
func (s *microCallStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
options := NewExecuteOptions(opts...) options := NewExecuteOptions(opts...)
if options.Client == nil { if options.Client == nil {
return fmt.Errorf("client not set") return nil, ErrMissingClient
} }
rsp := &codec.Frame{} rsp := &codec.Frame{}
copts := []client.CallOption{client.WithRetries(0)} copts := []client.CallOption{client.WithRetries(0)}
if options.Timeout > 0 { if options.Timeout > 0 {
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout)) copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
} }
err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp) nctx := metadata.NewOutgoingContext(ctx, req.Header)
return err err := options.Client.Call(nctx, options.Client.NewRequest(s.service, s.method, &codec.Frame{Data: req.Body}), rsp)
if err != nil {
return nil, err
}
md, _ := metadata.FromOutgoingContext(nctx)
return &Message{Header: md, Body: rsp.Data}, err
} }
type microPublishStep struct { type microPublishStep struct {
opts StepOptions opts StepOptions
topic string topic string
req *Message
rsp *Message
status Status
}
func (s *microPublishStep) Request() *Message {
return s.req
}
func (s *microPublishStep) Response() *Message {
return s.rsp
} }
func (s *microPublishStep) ID() string { func (s *microPublishStep) ID() string {
@ -389,8 +544,16 @@ func (s *microPublishStep) Hashcode() interface{} {
return s.String() return s.String()
} }
func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { func (s *microPublishStep) GetStatus() Status {
return nil return s.status
}
func (s *microPublishStep) SetStatus(status Status) {
s.status = status
}
func (s *microPublishStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
return nil, nil
} }
func NewCallStep(service string, name string, method string, opts ...StepOption) Step { func NewCallStep(service string, name string, method string, opts ...StepOption) Step {

View File

@ -4,12 +4,41 @@ package flow
import ( import (
"context" "context"
"errors" "errors"
"github.com/unistack-org/micro/v3/metadata"
) )
var ( var (
ErrStepNotExists = errors.New("step not exists") ErrStepNotExists = errors.New("step not exists")
ErrMissingClient = errors.New("client not set")
) )
// RawMessage is a raw encoded JSON value.
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
type RawMessage []byte
// MarshalJSON returns m as the JSON encoding of m.
func (m *RawMessage) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
}
return *m, nil
}
// UnmarshalJSON sets *m to a copy of data.
func (m *RawMessage) UnmarshalJSON(data []byte) error {
if m == nil {
return errors.New("RawMessage UnmarshalJSON on nil pointer")
}
*m = append((*m)[0:0], data...)
return nil
}
type Message struct {
Header metadata.Metadata
Body RawMessage
}
// Step represents dedicated workflow step // Step represents dedicated workflow step
type Step interface { type Step interface {
// ID returns step id // ID returns step id
@ -17,7 +46,7 @@ type Step interface {
// Endpoint returns rpc endpoint service_name.service_method or broker topic // Endpoint returns rpc endpoint service_name.service_method or broker topic
Endpoint() string Endpoint() string
// Execute step run // Execute step run
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error)
// Requires returns dependent steps // Requires returns dependent steps
Requires() []string Requires() []string
// Options returns step options // Options returns step options
@ -26,20 +55,41 @@ type Step interface {
Require(steps ...Step) error Require(steps ...Step) error
// String // String
String() string String() string
// GetStatus returns step status
GetStatus() Status
// SetStatus sets the step status
SetStatus(Status)
// Request returns step request message
Request() *Message
// Response returns step response message
Response() *Message
} }
type Status int
const (
StatusPending Status = iota
StatusRunning
StatusFailure
StatusSuccess
StatusAborted
StatusSuspend
)
// Workflow contains all steps to execute // Workflow contains all steps to execute
type Workflow interface { type Workflow interface {
// ID returns id of the workflow // ID returns id of the workflow
ID() string ID() string
// Execute workflow with args, return execution id and error
Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error)
// RemoveSteps remove steps from workflow
RemoveSteps(steps ...Step) error
// AppendSteps append steps to workflow
AppendSteps(steps ...Step) error
// Status returns workflow status
Status() Status
// Steps returns steps slice where parallel steps returned on the same level // Steps returns steps slice where parallel steps returned on the same level
Steps() ([][]Step, error) Steps() ([][]Step, error)
// Execute workflow with args, return execution id and error
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error)
// RemoveSteps remove steps from workflow
RemoveSteps(ctx context.Context, steps ...Step) error
// AppendSteps append steps to workflow
AppendSteps(ctx context.Context, steps ...Step) error
} }
// Flow the base interface to interact with workflows // Flow the base interface to interact with workflows

View File

@ -116,8 +116,6 @@ type ExecuteOptions struct {
Logger logger.Logger Logger logger.Logger
// Meter holds the meter // Meter holds the meter
Meter meter.Meter Meter meter.Meter
// Store used for intermediate results
Store store.Store
// Context can be used to abort execution or pass additional opts // Context can be used to abort execution or pass additional opts
Context context.Context Context context.Context
// Start step // Start step
@ -154,12 +152,6 @@ func ExecuteMeter(m meter.Meter) ExecuteOption {
} }
} }
func ExecuteStore(s store.Store) ExecuteOption {
return func(o *ExecuteOptions) {
o.Store = s
}
}
func ExecuteContext(ctx context.Context) ExecuteOption { func ExecuteContext(ctx context.Context) ExecuteOption {
return func(o *ExecuteOptions) { return func(o *ExecuteOptions) {
o.Context = ctx o.Context = ctx
@ -179,7 +171,13 @@ func ExecuteTimeout(td time.Duration) ExecuteOption {
} }
func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions { func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions {
options := ExecuteOptions{} options := ExecuteOptions{
Client: client.DefaultClient,
Logger: logger.DefaultLogger,
Tracer: tracer.DefaultTracer,
Meter: meter.DefaultMeter,
Context: context.Background(),
}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
@ -196,7 +194,9 @@ type StepOptions struct {
type StepOption func(*StepOptions) type StepOption func(*StepOptions)
func NewStepOptions(opts ...StepOption) StepOptions { func NewStepOptions(opts ...StepOption) StepOptions {
options := StepOptions{Context: context.Background()} options := StepOptions{
Context: context.Background(),
}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }