package flow import ( "context" "fmt" "path/filepath" "sync" "github.com/heimdalr/dag" "go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/store" "go.unistack.org/micro/v3/util/id" ) type microFlow struct { opts Options } type microWorkflow struct { opts Options g *dag.DAG steps map[string]Step id string status Status sync.RWMutex init bool } func (w *microWorkflow) ID() string { return w.id } func (w *microWorkflow) Status() Status { return w.status } func (w *microWorkflow) AppendSteps(steps ...Step) error { var err error w.Lock() defer w.Unlock() for _, s := range steps { w.steps[s.String()] = s if _, err = w.g.AddVertex(s); err != nil { return err } } for _, dst := range steps { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { return ErrStepNotExists } if err = w.g.AddEdge(src.String(), dst.String()); err != nil { return err } } } w.g.ReduceTransitively() return nil } func (w *microWorkflow) RemoveSteps(steps ...Step) error { // TODO: handle case when some step requires or required by removed step w.Lock() defer w.Unlock() for _, s := range steps { delete(w.steps, s.String()) w.g.DeleteVertex(s.String()) } for _, dst := range steps { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { return ErrStepNotExists } w.g.AddEdge(src.String(), dst.String()) } } w.g.ReduceTransitively() return nil } func (w *microWorkflow) Abort(ctx context.Context, id string) error { workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())}) } func (w *microWorkflow) Suspend(ctx context.Context, id string) error { workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())}) } func (w *microWorkflow) Resume(ctx context.Context, id string) error { workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())}) } func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) { w.Lock() if !w.init { w.g.ReduceTransitively() w.init = true } w.Unlock() eid, err := id.New() if err != nil { return "", err } // stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) options := NewExecuteOptions(opts...) nopts := make([]ExecuteOption, 0, len(opts)+5) nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ) nopts = append(nopts, opts...) if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) return eid, werr } var startID string if options.Start == "" { mp := w.g.GetRoots() if len(mp) != 1 { return eid, ErrStepNotExists } for k := range mp { startID = k } } else { for k, v := range w.g.GetVertices() { if v == options.Start { startID = k } } } if startID == "" { return eid, ErrStepNotExists } if options.Async { go w.handleWorkflow(startID, nopts...) return eid, nil } return eid, w.handleWorkflow(startID, nopts...) } func (w *microWorkflow) handleWorkflow(startID string, opts ...ExecuteOption) error { w.RLock() defer w.RUnlock() // stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) // workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) // Get IDs of all descendant vertices. flowIDs, errDes := w.g.GetDescendants(startID) if errDes != nil { return errDes } // inputChannels provides for input channels for each of the descendant vertices (+ the start-vertex). inputChannels := make(map[string]chan FlowResult, len(flowIDs)+1) // Iterate vertex IDs and create an input channel for each of them and a single // output channel for leaves. Note, this "pre-flight" is needed to ensure we // really have an input channel regardless of how we traverse the tree and spawn // workers. leafCount := 0 for id := range flowIDs { // Get all parents of this vertex. parents, errPar := w.g.GetParents(id) if errPar != nil { return errPar } // Create a buffered input channel that has capacity for all parent results. inputChannels[id] = make(chan FlowResult, len(parents)) if w.g.isLeaf(id) { leafCount += 1 } } // outputChannel caries the results of leaf vertices. outputChannel := make(chan FlowResult, leafCount) // To also process the start vertex and to have its results being passed to its // children, add it to the vertex IDs. Also add an input channel for the start // vertex and feed the inputs to this channel. flowIDs[startID] = struct{}{} inputChannels[startID] = make(chan FlowResult, len(inputs)) for _, i := range inputs { inputChannels[startID] <- i } wg := sync.WaitGroup{} // Iterate all vertex IDs (now incl. start vertex) and handle each worker (incl. // inputs and outputs) in a separate goroutine. for id := range flowIDs { // Get all children of this vertex that later need to be notified. Note, we // collect all children before the goroutine to be able to release the read // lock as early as possible. children, errChildren := w.g.GetChildren(id) if errChildren != nil { return errChildren } // Remember to wait for this goroutine. wg.Add(1) go func(id string) { // Get this vertex's input channel. // Note, only concurrent read here, which is fine. c := inputChannels[id] // Await all parent inputs and stuff them into a slice. parentCount := cap(c) parentResults := make([]FlowResult, parentCount) for i := 0; i < parentCount; i++ { parentResults[i] = <-c } // Execute the worker. errWorker := callback(w.g, id, parentResults) if errWorker != nil { return errWorker } // Send this worker's FlowResult onto all children's input channels or, if it is // a leaf (i.e. no children), send the result onto the output channel. if len(children) > 0 { for child := range children { inputChannels[child] <- flowResult } } else { outputChannel <- flowResult } // "Sign off". wg.Done() }(id) } // Wait for all go routines to finish. wg.Wait() // Await all leaf vertex results and stuff them into a slice. resultCount := cap(outputChannel) results := make([]FlowResult, resultCount) for i := 0; i < resultCount; i++ { results[i] = <-outputChannel } /* go func() { for idx := range steps { for nidx := range steps[idx] { wStatus := &codec.Frame{} if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil { cherr <- werr return } if status := StringStatus[string(wStatus.Data)]; status != StatusRunning { chstatus <- status return } if w.opts.Logger.V(logger.TraceLevel) { w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx]) } cstep := steps[idx][nidx] // nolint: nestif if len(cstep.Requires()) == 0 { wg.Add(1) go func(step Step) { defer wg.Done() if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil { cherr <- werr return } if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { cherr <- werr return } rsp, serr := step.Execute(nctx, req, nopts...) if serr != nil { step.SetStatus(StatusFailure) if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } cherr <- serr return } if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) cherr <- werr return } if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) cherr <- werr return } }(cstep) wg.Wait() } else { if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil { cherr <- werr return } if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { cherr <- werr return } rsp, serr := cstep.Execute(nctx, req, nopts...) if serr != nil { cstep.SetStatus(StatusFailure) if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) } cherr <- serr return } if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil { w.opts.Logger.Errorf(ctx, "store write error: %v", werr) cherr <- werr return } if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { cherr <- werr return } } } } close(done) }() if options.Async { return eid, nil } logger.Tracef(ctx, "wait for finish or error") select { case <-nctx.Done(): err = nctx.Err() case cerr := <-cherr: err = cerr case <-done: close(cherr) case <-chstatus: close(chstatus) return eid, nil } switch { case nctx.Err() != nil: if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) } case err == nil: if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) } case err != nil: if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil { w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) } } */ return err } // NewFlow create new flow func NewFlow(opts ...Option) Flow { options := NewOptions(opts...) return µFlow{opts: options} } func (f *microFlow) Options() Options { return f.opts } func (f *microFlow) Init(opts ...Option) error { for _, o := range opts { o(&f.opts) } if err := f.opts.Client.Init(); err != nil { return err } if err := f.opts.Tracer.Init(); err != nil { return err } if err := f.opts.Logger.Init(); err != nil { return err } if err := f.opts.Meter.Init(); err != nil { return err } if err := f.opts.Store.Init(); err != nil { return err } return nil } func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) { return nil, nil } func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) { w := µWorkflow{opts: f.opts, id: id, g: &dag.DAG{}, steps: make(map[string]Step, len(steps))} for _, s := range steps { w.steps[s.String()] = s w.g.AddVertex(s) } for _, dst := range steps { for _, req := range dst.Requires() { src, ok := w.steps[req] if !ok { return nil, ErrStepNotExists } w.g.AddEdge(src.String(), dst.String()) } } w.g.ReduceTransitively() w.init = true return w, nil } func (f *microFlow) WorkflowRemove(ctx context.Context, id string) error { return nil } func (f *microFlow) WorkflowSave(ctx context.Context, w Workflow) error { return nil } func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, error) { return nil, nil } type microCallStep struct { rsp *Message req *Message service string method string opts StepOptions status Status } func (s *microCallStep) Request() *Message { return s.req } func (s *microCallStep) Response() *Message { return s.rsp } func (s *microCallStep) ID() string { return s.String() } func (s *microCallStep) Options() StepOptions { return s.opts } func (s *microCallStep) Endpoint() string { return s.method } func (s *microCallStep) Requires() []string { return s.opts.Requires } func (s *microCallStep) Require(steps ...Step) error { for _, step := range steps { s.opts.Requires = append(s.opts.Requires, step.String()) } return nil } func (s *microCallStep) String() string { if s.opts.ID != "" { return s.opts.ID } return fmt.Sprintf("%s.%s", s.service, s.method) } func (s *microCallStep) Name() string { return s.String() } func (s *microCallStep) Hashcode() interface{} { return s.String() } func (s *microCallStep) GetStatus() Status { return s.status } func (s *microCallStep) SetStatus(status Status) { s.status = status } func (s *microCallStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) { options := NewExecuteOptions(opts...) if options.Client == nil { return nil, ErrMissingClient } rsp := &codec.Frame{} copts := []client.CallOption{client.WithRetries(0)} if options.Timeout > 0 { copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout)) } nctx := metadata.NewOutgoingContext(ctx, req.Header) err := options.Client.Call(nctx, options.Client.NewRequest(s.service, s.method, &codec.Frame{Data: req.Body}), rsp, copts...) if err != nil { return nil, err } md, _ := metadata.FromOutgoingContext(nctx) return &Message{Header: md, Body: rsp.Data}, err } type microPublishStep struct { req *Message rsp *Message topic string opts StepOptions status Status } func (s *microPublishStep) Request() *Message { return s.req } func (s *microPublishStep) Response() *Message { return s.rsp } func (s *microPublishStep) ID() string { return s.String() } func (s *microPublishStep) Options() StepOptions { return s.opts } func (s *microPublishStep) Endpoint() string { return s.topic } func (s *microPublishStep) Requires() []string { return s.opts.Requires } func (s *microPublishStep) Require(steps ...Step) error { for _, step := range steps { s.opts.Requires = append(s.opts.Requires, step.String()) } return nil } func (s *microPublishStep) String() string { if s.opts.ID != "" { return s.opts.ID } return s.topic } func (s *microPublishStep) Name() string { return s.String() } func (s *microPublishStep) Hashcode() interface{} { return s.String() } func (s *microPublishStep) GetStatus() Status { return s.status } func (s *microPublishStep) SetStatus(status Status) { s.status = status } func (s *microPublishStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) { return nil, nil } // NewCallStep create new step with client.Call func NewCallStep(service string, name string, method string, opts ...StepOption) Step { options := NewStepOptions(opts...) return µCallStep{service: service, method: name + "." + method, opts: options} } // NewPublishStep create new step with client.Publish func NewPublishStep(topic string, opts ...StepOption) Step { options := NewStepOptions(opts...) return µPublishStep{topic: topic, opts: options} }