From 10a09a5c6fa39a5729f60f8f7296e9264fa62e86 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 15 Jul 2021 22:56:26 +0300 Subject: [PATCH] flow: improve store Signed-off-by: Vasiliy Tolstov --- flow/default.go | 203 +++++++++++++++++++++++++++++++++++++++++++----- flow/flow.go | 64 +++++++++++++-- flow/options.go | 20 ++--- 3 files changed, 250 insertions(+), 37 deletions(-) diff --git a/flow/default.go b/flow/default.go index b1f98fb4..7a9c77e5 100644 --- a/flow/default.go +++ b/flow/default.go @@ -3,6 +3,7 @@ package flow import ( "context" "fmt" + "path/filepath" "sync" "github.com/google/uuid" @@ -10,6 +11,7 @@ import ( "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/store" ) @@ -22,8 +24,9 @@ type microWorkflow struct { g *dag.AcyclicGraph init bool sync.RWMutex - opts Options - steps map[string]Step + opts Options + steps map[string]Step + status Status } func (w *microWorkflow) ID() string { @@ -34,7 +37,11 @@ func (w *microWorkflow) Steps() ([][]Step, error) { return w.getSteps("", false) } -func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error { +func (w *microWorkflow) Status() Status { + return w.status +} + +func (w *microWorkflow) AppendSteps(steps ...Step) error { w.Lock() for _, s := range steps { @@ -64,7 +71,7 @@ func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error { return nil } -func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error { +func (w *microWorkflow) RemoveSteps(steps ...Step) error { // TODO: handle case when some step requires or required by removed step w.Lock() @@ -142,7 +149,8 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) { return steps, nil } -func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) { +func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) { + w.Lock() if !w.init { if err := w.g.Validate(); err != nil { @@ -158,10 +166,18 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex if err != nil { return "", err } + eid := uid.String() + + stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) + workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) options := NewExecuteOptions(opts...) + steps, err := w.getSteps(options.Start, options.Reverse) if err != nil { + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusPending")}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + } return "", err } @@ -169,33 +185,119 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex cherr := make(chan error, 1) nctx, cancel := context.WithCancel(ctx) + defer cancel() nopts := make([]ExecuteOption, 0, len(opts)+5) + nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), - ExecuteStore(store.NewNamespaceStore(w.opts.Store, uid.String())), ) nopts = append(nopts, opts...) done := make(chan struct{}) + + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusRunning")}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + return eid, werr + } + for idx := range steps { + for nidx := range steps[idx] { + cstep := steps[idx][nidx] + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusPending")}); werr != nil { + return eid, werr + } + } + } + go func() { for idx := range steps { for nidx := range steps[idx] { if w.opts.Logger.V(logger.TraceLevel) { w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx]) } - wg.Add(1) - go func(step Step) { - defer wg.Done() - if serr := step.Execute(nctx, req, nopts...); serr != nil { + cstep := steps[idx][nidx] + if len(cstep.Requires()) == 0 { + wg.Add(1) + go func(step Step) { + defer wg.Done() + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil { + cherr <- werr + cancel() + return + } + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusRunning")}); werr != nil { + cherr <- werr + cancel() + return + } + rsp, serr := step.Execute(nctx, req, nopts...) + if serr != nil { + step.SetStatus(StatusFailure) + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + } + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusFailure")}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + } + cherr <- serr + cancel() + return + } else { + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + cherr <- werr + cancel() + return + } + if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + cherr <- werr + cancel() + return + } + } + }(cstep) + wg.Wait() + } else { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil { + cherr <- werr + cancel() + return + } + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusRunning")}); werr != nil { + cherr <- werr + cancel() + return + } + rsp, serr := cstep.Execute(nctx, req, nopts...) + if serr != nil { + cstep.SetStatus(StatusFailure) + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + } + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusFailure")}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + } cherr <- serr cancel() + return + } else { + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil { + w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + cherr <- werr + cancel() + return + } + if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil { + cherr <- werr + cancel() + return + } } - }(steps[idx][nidx]) + } } - wg.Wait() } close(done) }() @@ -210,6 +312,24 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex close(cherr) } + switch { + case nctx.Err() != nil: + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusAborted")}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + } + break + case err == nil: + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusSuccess")}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + } + break + case err != nil: + if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte("StatusFailure")}); werr != nil { + w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + } + break + } + return uid.String(), err } @@ -292,6 +412,17 @@ type microCallStep struct { opts StepOptions service string method string + rsp *Message + req *Message + status Status +} + +func (s *microCallStep) Request() *Message { + return s.req +} + +func (s *microCallStep) Response() *Message { + return s.rsp } func (s *microCallStep) ID() string { @@ -332,23 +463,47 @@ func (s *microCallStep) Hashcode() interface{} { return s.String() } -func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { +func (s *microCallStep) GetStatus() Status { + return s.status +} + +func (s *microCallStep) SetStatus(status Status) { + s.status = status +} + +func (s *microCallStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) { options := NewExecuteOptions(opts...) if options.Client == nil { - return fmt.Errorf("client not set") + return nil, ErrMissingClient } rsp := &codec.Frame{} copts := []client.CallOption{client.WithRetries(0)} if options.Timeout > 0 { copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout)) } - err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp) - return err + nctx := metadata.NewOutgoingContext(ctx, req.Header) + err := options.Client.Call(nctx, options.Client.NewRequest(s.service, s.method, &codec.Frame{Data: req.Body}), rsp) + if err != nil { + return nil, err + } + md, _ := metadata.FromOutgoingContext(nctx) + return &Message{Header: md, Body: rsp.Data}, err } type microPublishStep struct { - opts StepOptions - topic string + opts StepOptions + topic string + req *Message + rsp *Message + status Status +} + +func (s *microPublishStep) Request() *Message { + return s.req +} + +func (s *microPublishStep) Response() *Message { + return s.rsp } func (s *microPublishStep) ID() string { @@ -389,8 +544,16 @@ func (s *microPublishStep) Hashcode() interface{} { return s.String() } -func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { - return nil +func (s *microPublishStep) GetStatus() Status { + return s.status +} + +func (s *microPublishStep) SetStatus(status Status) { + s.status = status +} + +func (s *microPublishStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) { + return nil, nil } func NewCallStep(service string, name string, method string, opts ...StepOption) Step { diff --git a/flow/flow.go b/flow/flow.go index 72773b5a..6b8bb3cb 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -4,12 +4,41 @@ package flow import ( "context" "errors" + + "github.com/unistack-org/micro/v3/metadata" ) var ( ErrStepNotExists = errors.New("step not exists") + ErrMissingClient = errors.New("client not set") ) +// RawMessage is a raw encoded JSON value. +// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding. +type RawMessage []byte + +// MarshalJSON returns m as the JSON encoding of m. +func (m *RawMessage) MarshalJSON() ([]byte, error) { + if m == nil { + return []byte("null"), nil + } + return *m, nil +} + +// UnmarshalJSON sets *m to a copy of data. +func (m *RawMessage) UnmarshalJSON(data []byte) error { + if m == nil { + return errors.New("RawMessage UnmarshalJSON on nil pointer") + } + *m = append((*m)[0:0], data...) + return nil +} + +type Message struct { + Header metadata.Metadata + Body RawMessage +} + // Step represents dedicated workflow step type Step interface { // ID returns step id @@ -17,7 +46,7 @@ type Step interface { // Endpoint returns rpc endpoint service_name.service_method or broker topic Endpoint() string // Execute step run - Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error + Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) // Requires returns dependent steps Requires() []string // Options returns step options @@ -26,20 +55,41 @@ type Step interface { Require(steps ...Step) error // String String() string + // GetStatus returns step status + GetStatus() Status + // SetStatus sets the step status + SetStatus(Status) + // Request returns step request message + Request() *Message + // Response returns step response message + Response() *Message } +type Status int + +const ( + StatusPending Status = iota + StatusRunning + StatusFailure + StatusSuccess + StatusAborted + StatusSuspend +) + // Workflow contains all steps to execute type Workflow interface { // ID returns id of the workflow ID() string + // Execute workflow with args, return execution id and error + Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) + // RemoveSteps remove steps from workflow + RemoveSteps(steps ...Step) error + // AppendSteps append steps to workflow + AppendSteps(steps ...Step) error + // Status returns workflow status + Status() Status // Steps returns steps slice where parallel steps returned on the same level Steps() ([][]Step, error) - // Execute workflow with args, return execution id and error - Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) - // RemoveSteps remove steps from workflow - RemoveSteps(ctx context.Context, steps ...Step) error - // AppendSteps append steps to workflow - AppendSteps(ctx context.Context, steps ...Step) error } // Flow the base interface to interact with workflows diff --git a/flow/options.go b/flow/options.go index f3066311..f577fe41 100644 --- a/flow/options.go +++ b/flow/options.go @@ -116,8 +116,6 @@ type ExecuteOptions struct { Logger logger.Logger // Meter holds the meter Meter meter.Meter - // Store used for intermediate results - Store store.Store // Context can be used to abort execution or pass additional opts Context context.Context // Start step @@ -154,12 +152,6 @@ func ExecuteMeter(m meter.Meter) ExecuteOption { } } -func ExecuteStore(s store.Store) ExecuteOption { - return func(o *ExecuteOptions) { - o.Store = s - } -} - func ExecuteContext(ctx context.Context) ExecuteOption { return func(o *ExecuteOptions) { o.Context = ctx @@ -179,7 +171,13 @@ func ExecuteTimeout(td time.Duration) ExecuteOption { } func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions { - options := ExecuteOptions{} + options := ExecuteOptions{ + Client: client.DefaultClient, + Logger: logger.DefaultLogger, + Tracer: tracer.DefaultTracer, + Meter: meter.DefaultMeter, + Context: context.Background(), + } for _, o := range opts { o(&options) } @@ -196,7 +194,9 @@ type StepOptions struct { type StepOption func(*StepOptions) func NewStepOptions(opts ...StepOption) StepOptions { - options := StepOptions{Context: context.Background()} + options := StepOptions{ + Context: context.Background(), + } for _, o := range opts { o(&options) }