589 lines
14 KiB
Go
589 lines
14 KiB
Go
package flow
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/silas/dag"
|
|
"github.com/unistack-org/micro/v3/client"
|
|
"github.com/unistack-org/micro/v3/codec"
|
|
"github.com/unistack-org/micro/v3/logger"
|
|
"github.com/unistack-org/micro/v3/metadata"
|
|
"github.com/unistack-org/micro/v3/store"
|
|
)
|
|
|
|
type microFlow struct {
|
|
opts Options
|
|
}
|
|
|
|
type microWorkflow struct {
|
|
id string
|
|
g *dag.AcyclicGraph
|
|
init bool
|
|
sync.RWMutex
|
|
opts Options
|
|
steps map[string]Step
|
|
status Status
|
|
}
|
|
|
|
func (w *microWorkflow) ID() string {
|
|
return w.id
|
|
}
|
|
|
|
func (w *microWorkflow) Steps() ([][]Step, error) {
|
|
return w.getSteps("", false)
|
|
}
|
|
|
|
func (w *microWorkflow) Status() Status {
|
|
return w.status
|
|
}
|
|
|
|
func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
|
w.Lock()
|
|
|
|
for _, s := range steps {
|
|
w.steps[s.String()] = s
|
|
w.g.Add(s)
|
|
}
|
|
|
|
for _, dst := range steps {
|
|
for _, req := range dst.Requires() {
|
|
src, ok := w.steps[req]
|
|
if !ok {
|
|
return ErrStepNotExists
|
|
}
|
|
w.g.Connect(dag.BasicEdge(src, dst))
|
|
}
|
|
}
|
|
|
|
if err := w.g.Validate(); err != nil {
|
|
w.Unlock()
|
|
return err
|
|
}
|
|
|
|
w.g.TransitiveReduction()
|
|
|
|
w.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *microWorkflow) RemoveSteps(steps ...Step) error {
|
|
// TODO: handle case when some step requires or required by removed step
|
|
|
|
w.Lock()
|
|
|
|
for _, s := range steps {
|
|
delete(w.steps, s.String())
|
|
w.g.Remove(s)
|
|
}
|
|
|
|
for _, dst := range steps {
|
|
for _, req := range dst.Requires() {
|
|
src, ok := w.steps[req]
|
|
if !ok {
|
|
return ErrStepNotExists
|
|
}
|
|
w.g.Connect(dag.BasicEdge(src, dst))
|
|
}
|
|
}
|
|
|
|
if err := w.g.Validate(); err != nil {
|
|
w.Unlock()
|
|
return err
|
|
}
|
|
|
|
w.g.TransitiveReduction()
|
|
|
|
w.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
|
|
var steps [][]Step
|
|
var root dag.Vertex
|
|
var err error
|
|
|
|
fn := func(n dag.Vertex, idx int) error {
|
|
if idx == 0 {
|
|
steps = make([][]Step, 1)
|
|
steps[0] = make([]Step, 0, 1)
|
|
} else if idx >= len(steps) {
|
|
tsteps := make([][]Step, idx+1)
|
|
copy(tsteps, steps)
|
|
steps = tsteps
|
|
steps[idx] = make([]Step, 0, 1)
|
|
}
|
|
steps[idx] = append(steps[idx], n.(Step))
|
|
return nil
|
|
}
|
|
|
|
if start != "" {
|
|
var ok bool
|
|
w.RLock()
|
|
root, ok = w.steps[start]
|
|
w.RUnlock()
|
|
if !ok {
|
|
return nil, ErrStepNotExists
|
|
}
|
|
} else {
|
|
root, err = w.g.Root()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if reverse {
|
|
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
|
|
} else {
|
|
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return steps, nil
|
|
}
|
|
|
|
func (w *microWorkflow) Abort(ctx context.Context, eid string) error {
|
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
|
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
|
|
}
|
|
|
|
func (w *microWorkflow) Suspend(ctx context.Context, eid string) error {
|
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
|
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
|
|
}
|
|
|
|
func (w *microWorkflow) Resume(ctx context.Context, eid string) error {
|
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
|
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
|
|
}
|
|
|
|
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
|
|
w.Lock()
|
|
if !w.init {
|
|
if err := w.g.Validate(); err != nil {
|
|
w.Unlock()
|
|
return "", err
|
|
}
|
|
w.g.TransitiveReduction()
|
|
w.init = true
|
|
}
|
|
w.Unlock()
|
|
|
|
uid, err := uuid.NewRandom()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
eid := uid.String()
|
|
|
|
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
|
|
|
options := NewExecuteOptions(opts...)
|
|
|
|
steps, err := w.getSteps(options.Start, options.Reverse)
|
|
if err != nil {
|
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
|
}
|
|
return "", err
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
cherr := make(chan error, 1)
|
|
chstatus := make(chan Status, 1)
|
|
|
|
nctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
nopts := make([]ExecuteOption, 0, len(opts)+5)
|
|
|
|
nopts = append(nopts,
|
|
ExecuteClient(w.opts.Client),
|
|
ExecuteTracer(w.opts.Tracer),
|
|
ExecuteLogger(w.opts.Logger),
|
|
ExecuteMeter(w.opts.Meter),
|
|
)
|
|
nopts = append(nopts, opts...)
|
|
done := make(chan struct{})
|
|
|
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
|
return eid, werr
|
|
}
|
|
for idx := range steps {
|
|
for nidx := range steps[idx] {
|
|
cstep := steps[idx][nidx]
|
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
|
return eid, werr
|
|
}
|
|
}
|
|
}
|
|
|
|
go func() {
|
|
for idx := range steps {
|
|
for nidx := range steps[idx] {
|
|
wStatus := &codec.Frame{}
|
|
if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
|
|
cherr <- werr
|
|
return
|
|
}
|
|
if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
|
|
chstatus <- status
|
|
return
|
|
}
|
|
if w.opts.Logger.V(logger.TraceLevel) {
|
|
w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
|
|
}
|
|
cstep := steps[idx][nidx]
|
|
if len(cstep.Requires()) == 0 {
|
|
wg.Add(1)
|
|
go func(step Step) {
|
|
defer wg.Done()
|
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
|
|
cherr <- werr
|
|
return
|
|
}
|
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
|
cherr <- werr
|
|
return
|
|
}
|
|
rsp, serr := step.Execute(nctx, req, nopts...)
|
|
if serr != nil {
|
|
step.SetStatus(StatusFailure)
|
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
|
}
|
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
|
}
|
|
cherr <- serr
|
|
return
|
|
} else {
|
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
|
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
|
cherr <- werr
|
|
return
|
|
}
|
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
|
cherr <- werr
|
|
return
|
|
}
|
|
}
|
|
}(cstep)
|
|
wg.Wait()
|
|
} else {
|
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
|
|
cherr <- werr
|
|
return
|
|
}
|
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
|
cherr <- werr
|
|
return
|
|
}
|
|
rsp, serr := cstep.Execute(nctx, req, nopts...)
|
|
if serr != nil {
|
|
cstep.SetStatus(StatusFailure)
|
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
|
}
|
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
|
}
|
|
cherr <- serr
|
|
return
|
|
} else {
|
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
|
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
|
cherr <- werr
|
|
return
|
|
}
|
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
|
cherr <- werr
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
close(done)
|
|
}()
|
|
|
|
if options.Async {
|
|
return eid, nil
|
|
}
|
|
|
|
logger.Tracef(ctx, "wait for finish or error")
|
|
select {
|
|
case <-nctx.Done():
|
|
err = nctx.Err()
|
|
case cerr := <-cherr:
|
|
err = cerr
|
|
case <-done:
|
|
close(cherr)
|
|
case <-chstatus:
|
|
close(chstatus)
|
|
return uid.String(), nil
|
|
}
|
|
|
|
switch {
|
|
case nctx.Err() != nil:
|
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
|
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
|
}
|
|
break
|
|
case err == nil:
|
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
|
}
|
|
break
|
|
case err != nil:
|
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
|
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
|
}
|
|
break
|
|
}
|
|
|
|
return uid.String(), err
|
|
}
|
|
|
|
func NewFlow(opts ...Option) Flow {
|
|
options := NewOptions(opts...)
|
|
return µFlow{opts: options}
|
|
}
|
|
|
|
func (f *microFlow) Options() Options {
|
|
return f.opts
|
|
}
|
|
|
|
func (f *microFlow) Init(opts ...Option) error {
|
|
for _, o := range opts {
|
|
o(&f.opts)
|
|
}
|
|
if err := f.opts.Client.Init(); err != nil {
|
|
return err
|
|
}
|
|
if err := f.opts.Tracer.Init(); err != nil {
|
|
return err
|
|
}
|
|
if err := f.opts.Logger.Init(); err != nil {
|
|
return err
|
|
}
|
|
if err := f.opts.Meter.Init(); err != nil {
|
|
return err
|
|
}
|
|
if err := f.opts.Store.Init(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) {
|
|
w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))}
|
|
|
|
for _, s := range steps {
|
|
w.steps[s.String()] = s
|
|
w.g.Add(s)
|
|
}
|
|
|
|
for _, dst := range steps {
|
|
for _, req := range dst.Requires() {
|
|
src, ok := w.steps[req]
|
|
if !ok {
|
|
return nil, ErrStepNotExists
|
|
}
|
|
w.g.Connect(dag.BasicEdge(src, dst))
|
|
}
|
|
}
|
|
|
|
if err := w.g.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
w.g.TransitiveReduction()
|
|
|
|
w.init = true
|
|
|
|
return w, nil
|
|
}
|
|
|
|
func (f *microFlow) WorkflowRemove(ctx context.Context, id string) error {
|
|
return nil
|
|
}
|
|
|
|
func (f *microFlow) WorkflowSave(ctx context.Context, w Workflow) error {
|
|
return nil
|
|
}
|
|
|
|
func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
type microCallStep struct {
|
|
opts StepOptions
|
|
service string
|
|
method string
|
|
rsp *Message
|
|
req *Message
|
|
status Status
|
|
}
|
|
|
|
func (s *microCallStep) Request() *Message {
|
|
return s.req
|
|
}
|
|
|
|
func (s *microCallStep) Response() *Message {
|
|
return s.rsp
|
|
}
|
|
|
|
func (s *microCallStep) ID() string {
|
|
return s.String()
|
|
}
|
|
|
|
func (s *microCallStep) Options() StepOptions {
|
|
return s.opts
|
|
}
|
|
|
|
func (s *microCallStep) Endpoint() string {
|
|
return s.method
|
|
}
|
|
|
|
func (s *microCallStep) Requires() []string {
|
|
return s.opts.Requires
|
|
}
|
|
|
|
func (s *microCallStep) Require(steps ...Step) error {
|
|
for _, step := range steps {
|
|
s.opts.Requires = append(s.opts.Requires, step.String())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *microCallStep) String() string {
|
|
if s.opts.ID != "" {
|
|
return s.opts.ID
|
|
}
|
|
return fmt.Sprintf("%s.%s", s.service, s.method)
|
|
}
|
|
|
|
func (s *microCallStep) Name() string {
|
|
return s.String()
|
|
}
|
|
|
|
func (s *microCallStep) Hashcode() interface{} {
|
|
return s.String()
|
|
}
|
|
|
|
func (s *microCallStep) GetStatus() Status {
|
|
return s.status
|
|
}
|
|
|
|
func (s *microCallStep) SetStatus(status Status) {
|
|
s.status = status
|
|
}
|
|
|
|
func (s *microCallStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
|
|
options := NewExecuteOptions(opts...)
|
|
if options.Client == nil {
|
|
return nil, ErrMissingClient
|
|
}
|
|
rsp := &codec.Frame{}
|
|
copts := []client.CallOption{client.WithRetries(0)}
|
|
if options.Timeout > 0 {
|
|
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
|
|
}
|
|
nctx := metadata.NewOutgoingContext(ctx, req.Header)
|
|
err := options.Client.Call(nctx, options.Client.NewRequest(s.service, s.method, &codec.Frame{Data: req.Body}), rsp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
md, _ := metadata.FromOutgoingContext(nctx)
|
|
return &Message{Header: md, Body: rsp.Data}, err
|
|
}
|
|
|
|
type microPublishStep struct {
|
|
opts StepOptions
|
|
topic string
|
|
req *Message
|
|
rsp *Message
|
|
status Status
|
|
}
|
|
|
|
func (s *microPublishStep) Request() *Message {
|
|
return s.req
|
|
}
|
|
|
|
func (s *microPublishStep) Response() *Message {
|
|
return s.rsp
|
|
}
|
|
|
|
func (s *microPublishStep) ID() string {
|
|
return s.String()
|
|
}
|
|
|
|
func (s *microPublishStep) Options() StepOptions {
|
|
return s.opts
|
|
}
|
|
|
|
func (s *microPublishStep) Endpoint() string {
|
|
return s.topic
|
|
}
|
|
|
|
func (s *microPublishStep) Requires() []string {
|
|
return s.opts.Requires
|
|
}
|
|
|
|
func (s *microPublishStep) Require(steps ...Step) error {
|
|
for _, step := range steps {
|
|
s.opts.Requires = append(s.opts.Requires, step.String())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *microPublishStep) String() string {
|
|
if s.opts.ID != "" {
|
|
return s.opts.ID
|
|
}
|
|
return fmt.Sprintf("%s", s.topic)
|
|
}
|
|
|
|
func (s *microPublishStep) Name() string {
|
|
return s.String()
|
|
}
|
|
|
|
func (s *microPublishStep) Hashcode() interface{} {
|
|
return s.String()
|
|
}
|
|
|
|
func (s *microPublishStep) GetStatus() Status {
|
|
return s.status
|
|
}
|
|
|
|
func (s *microPublishStep) SetStatus(status Status) {
|
|
s.status = status
|
|
}
|
|
|
|
func (s *microPublishStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func NewCallStep(service string, name string, method string, opts ...StepOption) Step {
|
|
options := NewStepOptions(opts...)
|
|
return µCallStep{service: service, method: name + "." + method, opts: options}
|
|
}
|
|
|
|
func NewPublishStep(topic string, opts ...StepOption) Step {
|
|
options := NewStepOptions(opts...)
|
|
return µPublishStep{topic: topic, opts: options}
|
|
}
|