WIP: flow rewrite #138
@ -1,3 +1,5 @@
|
|||||||
|
//go:build ignore
|
||||||
|
|
||||||
package flow
|
package flow
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
21
flow/dag.go
21
flow/dag.go
@ -1,21 +0,0 @@
|
|||||||
package flow
|
|
||||||
|
|
||||||
type node struct {
|
|
||||||
name string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *node) ID() string {
|
|
||||||
return n.name
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *node) Name() string {
|
|
||||||
return n.name
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *node) String() string {
|
|
||||||
return n.name
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *node) Hashcode() interface{} {
|
|
||||||
return n.name
|
|
||||||
}
|
|
117
flow/dag_test.go
117
flow/dag_test.go
@ -1,117 +0,0 @@
|
|||||||
package flow
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/silas/dag"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestDeps(t *testing.T) {
|
|
||||||
d := &dag.AcyclicGraph{}
|
|
||||||
|
|
||||||
v0 := d.Add(&node{"v0"})
|
|
||||||
v1 := d.Add(&node{"v1"})
|
|
||||||
v2 := d.Add(&node{"v2"})
|
|
||||||
v3 := d.Add(&node{"v3"})
|
|
||||||
v4 := d.Add(&node{"v4"})
|
|
||||||
|
|
||||||
d.Connect(dag.BasicEdge(v0, v1))
|
|
||||||
d.Connect(dag.BasicEdge(v1, v2))
|
|
||||||
d.Connect(dag.BasicEdge(v2, v4))
|
|
||||||
d.Connect(dag.BasicEdge(v0, v3))
|
|
||||||
d.Connect(dag.BasicEdge(v3, v4))
|
|
||||||
|
|
||||||
if err := d.Validate(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
d.TransitiveReduction()
|
|
||||||
|
|
||||||
var steps [][]string
|
|
||||||
fn := func(n dag.Vertex, idx int) error {
|
|
||||||
if idx == 0 {
|
|
||||||
steps = make([][]string, 1)
|
|
||||||
steps[0] = make([]string, 0, 1)
|
|
||||||
} else if idx >= len(steps) {
|
|
||||||
tsteps := make([][]string, idx+1)
|
|
||||||
copy(tsteps, steps)
|
|
||||||
steps = tsteps
|
|
||||||
steps[idx] = make([]string, 0, 1)
|
|
||||||
}
|
|
||||||
steps[idx] = append(steps[idx], fmt.Sprintf("%s", n))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
start := &node{"v0"}
|
|
||||||
err := d.SortedDepthFirstWalk([]dag.Vertex{start}, fn)
|
|
||||||
checkErr(t, err)
|
|
||||||
|
|
||||||
for idx, steps := range steps {
|
|
||||||
fmt.Printf("level %d steps %#+v\n", idx, steps)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(steps[2]) != 1 {
|
|
||||||
t.Logf("invalid steps %#+v", steps[2])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkErr(t *testing.T, err error) {
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDag(t *testing.T) {
|
|
||||||
d1 := &dag.AcyclicGraph{}
|
|
||||||
d2 := &dag.AcyclicGraph{}
|
|
||||||
d2v1 := d2.Add(&node{"Substep.Create"})
|
|
||||||
v1 := d1.Add(&node{"AccountService.Create"})
|
|
||||||
v2 := d1.Add(&node{"AuthzService.Create"})
|
|
||||||
v3 := d1.Add(&node{"AuthnService.Create"})
|
|
||||||
v4 := d1.Add(&node{"ProjectService.Create"})
|
|
||||||
v5 := d1.Add(&node{"ContactService.Create"})
|
|
||||||
v6 := d1.Add(&node{"NetworkService.Create"})
|
|
||||||
v7 := d1.Add(&node{"MailerService.Create"})
|
|
||||||
v8 := d1.Add(&node{"NestedService.Create"})
|
|
||||||
v9 := d1.Add(d2v1)
|
|
||||||
d1.Connect(dag.BasicEdge(v1, v2))
|
|
||||||
d1.Connect(dag.BasicEdge(v1, v3))
|
|
||||||
d1.Connect(dag.BasicEdge(v1, v4))
|
|
||||||
d1.Connect(dag.BasicEdge(v1, v5))
|
|
||||||
d1.Connect(dag.BasicEdge(v1, v6))
|
|
||||||
d1.Connect(dag.BasicEdge(v1, v7))
|
|
||||||
d1.Connect(dag.BasicEdge(v7, v8))
|
|
||||||
d1.Connect(dag.BasicEdge(v8, v9))
|
|
||||||
|
|
||||||
if err := d1.Validate(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
d1.TransitiveReduction()
|
|
||||||
|
|
||||||
var steps [][]string
|
|
||||||
fn := func(n dag.Vertex, idx int) error {
|
|
||||||
if idx == 0 {
|
|
||||||
steps = make([][]string, 1)
|
|
||||||
steps[0] = make([]string, 0, 1)
|
|
||||||
} else if idx >= len(steps) {
|
|
||||||
tsteps := make([][]string, idx+1)
|
|
||||||
copy(tsteps, steps)
|
|
||||||
steps = tsteps
|
|
||||||
steps[idx] = make([]string, 0, 1)
|
|
||||||
}
|
|
||||||
steps[idx] = append(steps[idx], fmt.Sprintf("%s", n))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
start := &node{"AccountService.Create"}
|
|
||||||
err := d1.SortedDepthFirstWalk([]dag.Vertex{start}, fn)
|
|
||||||
checkErr(t, err)
|
|
||||||
if len(steps) != 4 {
|
|
||||||
t.Fatalf("invalid steps: %#+v", steps)
|
|
||||||
}
|
|
||||||
if steps[3][0] != "Substep.Create" {
|
|
||||||
t.Fatalf("invalid last step: %#+v", steps)
|
|
||||||
}
|
|
||||||
}
|
|
268
flow/default.go
268
flow/default.go
@ -6,7 +6,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/silas/dag"
|
"github.com/heimdalr/dag"
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v3/client"
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
@ -21,7 +21,7 @@ type microFlow struct {
|
|||||||
|
|
||||||
type microWorkflow struct {
|
type microWorkflow struct {
|
||||||
opts Options
|
opts Options
|
||||||
g *dag.AcyclicGraph
|
g *dag.DAG
|
||||||
steps map[string]Step
|
steps map[string]Step
|
||||||
id string
|
id string
|
||||||
status Status
|
status Status
|
||||||
@ -33,20 +33,20 @@ func (w *microWorkflow) ID() string {
|
|||||||
return w.id
|
return w.id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) Steps() ([][]Step, error) {
|
|
||||||
return w.getSteps("", false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *microWorkflow) Status() Status {
|
func (w *microWorkflow) Status() Status {
|
||||||
return w.status
|
return w.status
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
||||||
|
var err error
|
||||||
w.Lock()
|
w.Lock()
|
||||||
|
defer w.Unlock()
|
||||||
|
|
||||||
for _, s := range steps {
|
for _, s := range steps {
|
||||||
w.steps[s.String()] = s
|
w.steps[s.String()] = s
|
||||||
w.g.Add(s)
|
if _, err = w.g.AddVertex(s); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, dst := range steps {
|
for _, dst := range steps {
|
||||||
@ -55,18 +55,13 @@ func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
|||||||
if !ok {
|
if !ok {
|
||||||
return ErrStepNotExists
|
return ErrStepNotExists
|
||||||
}
|
}
|
||||||
w.g.Connect(dag.BasicEdge(src, dst))
|
if err = w.g.AddEdge(src.String(), dst.String()); err != nil {
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := w.g.Validate(); err != nil {
|
|
||||||
w.Unlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
w.g.TransitiveReduction()
|
w.g.ReduceTransitively()
|
||||||
|
|
||||||
w.Unlock()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -75,10 +70,11 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error {
|
|||||||
// TODO: handle case when some step requires or required by removed step
|
// TODO: handle case when some step requires or required by removed step
|
||||||
|
|
||||||
w.Lock()
|
w.Lock()
|
||||||
|
defer w.Unlock()
|
||||||
|
|
||||||
for _, s := range steps {
|
for _, s := range steps {
|
||||||
delete(w.steps, s.String())
|
delete(w.steps, s.String())
|
||||||
w.g.Remove(s)
|
w.g.DeleteVertex(s.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, dst := range steps {
|
for _, dst := range steps {
|
||||||
@ -87,68 +83,15 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error {
|
|||||||
if !ok {
|
if !ok {
|
||||||
return ErrStepNotExists
|
return ErrStepNotExists
|
||||||
}
|
}
|
||||||
w.g.Connect(dag.BasicEdge(src, dst))
|
w.g.AddEdge(src.String(), dst.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := w.g.Validate(); err != nil {
|
w.g.ReduceTransitively()
|
||||||
w.Unlock()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
w.g.TransitiveReduction()
|
|
||||||
|
|
||||||
w.Unlock()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
|
|
||||||
var steps [][]Step
|
|
||||||
var root dag.Vertex
|
|
||||||
var err error
|
|
||||||
|
|
||||||
fn := func(n dag.Vertex, idx int) error {
|
|
||||||
if idx == 0 {
|
|
||||||
steps = make([][]Step, 1)
|
|
||||||
steps[0] = make([]Step, 0, 1)
|
|
||||||
} else if idx >= len(steps) {
|
|
||||||
tsteps := make([][]Step, idx+1)
|
|
||||||
copy(tsteps, steps)
|
|
||||||
steps = tsteps
|
|
||||||
steps[idx] = make([]Step, 0, 1)
|
|
||||||
}
|
|
||||||
steps[idx] = append(steps[idx], n.(Step))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if start != "" {
|
|
||||||
var ok bool
|
|
||||||
w.RLock()
|
|
||||||
root, ok = w.steps[start]
|
|
||||||
w.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
return nil, ErrStepNotExists
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
root, err = w.g.Root()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if reverse {
|
|
||||||
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
|
|
||||||
} else {
|
|
||||||
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return steps, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *microWorkflow) Abort(ctx context.Context, id string) error {
|
func (w *microWorkflow) Abort(ctx context.Context, id string) error {
|
||||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
|
||||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
|
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
|
||||||
@ -167,11 +110,7 @@ func (w *microWorkflow) Resume(ctx context.Context, id string) error {
|
|||||||
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
|
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
|
||||||
w.Lock()
|
w.Lock()
|
||||||
if !w.init {
|
if !w.init {
|
||||||
if err := w.g.Validate(); err != nil {
|
w.g.ReduceTransitively()
|
||||||
w.Unlock()
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
w.g.TransitiveReduction()
|
|
||||||
w.init = true
|
w.init = true
|
||||||
}
|
}
|
||||||
w.Unlock()
|
w.Unlock()
|
||||||
@ -181,26 +120,11 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
// stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
||||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||||
|
|
||||||
options := NewExecuteOptions(opts...)
|
options := NewExecuteOptions(opts...)
|
||||||
|
|
||||||
steps, err := w.getSteps(options.Start, options.Reverse)
|
|
||||||
if err != nil {
|
|
||||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
|
||||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
|
||||||
}
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
cherr := make(chan error, 1)
|
|
||||||
chstatus := make(chan Status, 1)
|
|
||||||
|
|
||||||
nctx, cancel := context.WithCancel(ctx)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
nopts := make([]ExecuteOption, 0, len(opts)+5)
|
nopts := make([]ExecuteOption, 0, len(opts)+5)
|
||||||
|
|
||||||
nopts = append(nopts,
|
nopts = append(nopts,
|
||||||
@ -210,21 +134,152 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
ExecuteMeter(w.opts.Meter),
|
ExecuteMeter(w.opts.Meter),
|
||||||
)
|
)
|
||||||
nopts = append(nopts, opts...)
|
nopts = append(nopts, opts...)
|
||||||
done := make(chan struct{})
|
|
||||||
|
|
||||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||||
return eid, werr
|
return eid, werr
|
||||||
}
|
}
|
||||||
for idx := range steps {
|
|
||||||
for nidx := range steps[idx] {
|
var startID string
|
||||||
cstep := steps[idx][nidx]
|
if options.Start == "" {
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
mp := w.g.GetRoots()
|
||||||
return eid, werr
|
if len(mp) != 1 {
|
||||||
|
return eid, ErrStepNotExists
|
||||||
|
}
|
||||||
|
for k := range mp {
|
||||||
|
startID = k
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for k, v := range w.g.GetVertices() {
|
||||||
|
if v == options.Start {
|
||||||
|
startID = k
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if startID == "" {
|
||||||
|
return eid, ErrStepNotExists
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.Async {
|
||||||
|
go w.handleWorkflow(startID, nopts...)
|
||||||
|
return eid, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return eid, w.handleWorkflow(startID, nopts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *microWorkflow) handleWorkflow(startID string, opts ...ExecuteOption) error {
|
||||||
|
w.RLock()
|
||||||
|
defer w.RUnlock()
|
||||||
|
|
||||||
|
// stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
||||||
|
// workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||||
|
|
||||||
|
// Get IDs of all descendant vertices.
|
||||||
|
flowIDs, errDes := w.g.GetDescendants(startID)
|
||||||
|
if errDes != nil {
|
||||||
|
return errDes
|
||||||
|
}
|
||||||
|
|
||||||
|
// inputChannels provides for input channels for each of the descendant vertices (+ the start-vertex).
|
||||||
|
inputChannels := make(map[string]chan FlowResult, len(flowIDs)+1)
|
||||||
|
|
||||||
|
// Iterate vertex IDs and create an input channel for each of them and a single
|
||||||
|
// output channel for leaves. Note, this "pre-flight" is needed to ensure we
|
||||||
|
// really have an input channel regardless of how we traverse the tree and spawn
|
||||||
|
// workers.
|
||||||
|
leafCount := 0
|
||||||
|
|
||||||
|
for id := range flowIDs {
|
||||||
|
|
||||||
|
// Get all parents of this vertex.
|
||||||
|
parents, errPar := w.g.GetParents(id)
|
||||||
|
if errPar != nil {
|
||||||
|
return errPar
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a buffered input channel that has capacity for all parent results.
|
||||||
|
inputChannels[id] = make(chan FlowResult, len(parents))
|
||||||
|
|
||||||
|
if w.g.isLeaf(id) {
|
||||||
|
leafCount += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// outputChannel caries the results of leaf vertices.
|
||||||
|
outputChannel := make(chan FlowResult, leafCount)
|
||||||
|
|
||||||
|
// To also process the start vertex and to have its results being passed to its
|
||||||
|
// children, add it to the vertex IDs. Also add an input channel for the start
|
||||||
|
// vertex and feed the inputs to this channel.
|
||||||
|
flowIDs[startID] = struct{}{}
|
||||||
|
inputChannels[startID] = make(chan FlowResult, len(inputs))
|
||||||
|
for _, i := range inputs {
|
||||||
|
inputChannels[startID] <- i
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
|
||||||
|
// Iterate all vertex IDs (now incl. start vertex) and handle each worker (incl.
|
||||||
|
// inputs and outputs) in a separate goroutine.
|
||||||
|
for id := range flowIDs {
|
||||||
|
|
||||||
|
// Get all children of this vertex that later need to be notified. Note, we
|
||||||
|
// collect all children before the goroutine to be able to release the read
|
||||||
|
// lock as early as possible.
|
||||||
|
children, errChildren := w.g.GetChildren(id)
|
||||||
|
if errChildren != nil {
|
||||||
|
return errChildren
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remember to wait for this goroutine.
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go func(id string) {
|
||||||
|
// Get this vertex's input channel.
|
||||||
|
// Note, only concurrent read here, which is fine.
|
||||||
|
c := inputChannels[id]
|
||||||
|
|
||||||
|
// Await all parent inputs and stuff them into a slice.
|
||||||
|
parentCount := cap(c)
|
||||||
|
parentResults := make([]FlowResult, parentCount)
|
||||||
|
for i := 0; i < parentCount; i++ {
|
||||||
|
parentResults[i] = <-c
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute the worker.
|
||||||
|
errWorker := callback(w.g, id, parentResults)
|
||||||
|
if errWorker != nil {
|
||||||
|
return errWorker
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send this worker's FlowResult onto all children's input channels or, if it is
|
||||||
|
// a leaf (i.e. no children), send the result onto the output channel.
|
||||||
|
if len(children) > 0 {
|
||||||
|
for child := range children {
|
||||||
|
inputChannels[child] <- flowResult
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
outputChannel <- flowResult
|
||||||
|
}
|
||||||
|
|
||||||
|
// "Sign off".
|
||||||
|
wg.Done()
|
||||||
|
}(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all go routines to finish.
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Await all leaf vertex results and stuff them into a slice.
|
||||||
|
resultCount := cap(outputChannel)
|
||||||
|
results := make([]FlowResult, resultCount)
|
||||||
|
for i := 0; i < resultCount; i++ {
|
||||||
|
results[i] = <-outputChannel
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
go func() {
|
go func() {
|
||||||
for idx := range steps {
|
for idx := range steps {
|
||||||
for nidx := range steps[idx] {
|
for nidx := range steps[idx] {
|
||||||
@ -345,8 +400,8 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
return eid, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFlow create new flow
|
// NewFlow create new flow
|
||||||
@ -386,11 +441,11 @@ func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) {
|
func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) {
|
||||||
w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))}
|
w := µWorkflow{opts: f.opts, id: id, g: &dag.DAG{}, steps: make(map[string]Step, len(steps))}
|
||||||
|
|
||||||
for _, s := range steps {
|
for _, s := range steps {
|
||||||
w.steps[s.String()] = s
|
w.steps[s.String()] = s
|
||||||
w.g.Add(s)
|
w.g.AddVertex(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, dst := range steps {
|
for _, dst := range steps {
|
||||||
@ -399,14 +454,11 @@ func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step
|
|||||||
if !ok {
|
if !ok {
|
||||||
return nil, ErrStepNotExists
|
return nil, ErrStepNotExists
|
||||||
}
|
}
|
||||||
w.g.Connect(dag.BasicEdge(src, dst))
|
w.g.AddEdge(src.String(), dst.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := w.g.Validate(); err != nil {
|
w.g.ReduceTransitively()
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
w.g.TransitiveReduction()
|
|
||||||
|
|
||||||
w.init = true
|
w.init = true
|
||||||
|
|
||||||
|
@ -125,8 +125,6 @@ type Workflow interface {
|
|||||||
AppendSteps(steps ...Step) error
|
AppendSteps(steps ...Step) error
|
||||||
// Status returns workflow status
|
// Status returns workflow status
|
||||||
Status() Status
|
Status() Status
|
||||||
// Steps returns steps slice where parallel steps returned on the same level
|
|
||||||
Steps() ([][]Step, error)
|
|
||||||
// Suspend suspends execution
|
// Suspend suspends execution
|
||||||
Suspend(ctx context.Context, id string) error
|
Suspend(ctx context.Context, id string) error
|
||||||
// Resume resumes execution
|
// Resume resumes execution
|
||||||
|
@ -123,8 +123,6 @@ type ExecuteOptions struct {
|
|||||||
Start string
|
Start string
|
||||||
// Timeout for execution
|
// Timeout for execution
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
// Reverse execution
|
|
||||||
Reverse bool
|
|
||||||
// Async enables async execution
|
// Async enables async execution
|
||||||
Async bool
|
Async bool
|
||||||
}
|
}
|
||||||
@ -167,13 +165,6 @@ func ExecuteContext(ctx context.Context) ExecuteOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteReverse says that dag must be run in reverse order
|
|
||||||
func ExecuteReverse(b bool) ExecuteOption {
|
|
||||||
return func(o *ExecuteOptions) {
|
|
||||||
o.Reverse = b
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ExecuteTimeout pass timeout time.Duration for execution
|
// ExecuteTimeout pass timeout time.Duration for execution
|
||||||
func ExecuteTimeout(td time.Duration) ExecuteOption {
|
func ExecuteTimeout(td time.Duration) ExecuteOption {
|
||||||
return func(o *ExecuteOptions) {
|
return func(o *ExecuteOptions) {
|
||||||
|
6
go.mod
6
go.mod
@ -3,13 +3,15 @@ module go.unistack.org/micro/v3
|
|||||||
go 1.16
|
go 1.16
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/google/gnostic v0.6.8 // indirect
|
github.com/google/gnostic v0.6.9 // indirect
|
||||||
github.com/google/go-cmp v0.5.7 // indirect
|
github.com/google/go-cmp v0.5.7 // indirect
|
||||||
|
github.com/heimdalr/dag v1.2.1
|
||||||
github.com/imdario/mergo v0.3.13
|
github.com/imdario/mergo v0.3.13
|
||||||
github.com/kr/pretty v0.2.1 // indirect
|
github.com/kr/pretty v0.2.1 // indirect
|
||||||
github.com/kr/text v0.2.0 // indirect
|
github.com/kr/text v0.2.0 // indirect
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
|
||||||
go.unistack.org/micro-proto/v3 v3.2.7
|
go.unistack.org/micro-proto/v3 v3.2.7
|
||||||
google.golang.org/protobuf v1.28.0 // indirect
|
google.golang.org/protobuf v1.28.0 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
18
go.sum
18
go.sum
@ -14,6 +14,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
|
|||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
|
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
|
||||||
|
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
|
||||||
|
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
|
||||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||||
@ -22,6 +24,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m
|
|||||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||||
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
|
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
|
||||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||||
|
github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M=
|
||||||
|
github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8=
|
||||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||||
@ -39,8 +43,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
|
|||||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||||
github.com/google/gnostic v0.6.6/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E=
|
github.com/google/gnostic v0.6.6/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E=
|
||||||
github.com/google/gnostic v0.6.8 h1:bT56GPYBWh1tvBuBEd94qcS3+60b+y0HQur0ITkGuCk=
|
github.com/google/gnostic v0.6.9 h1:ZK/5VhkoX835RikCHpSUJV9a+S3e1zLh59YnyWeBW+0=
|
||||||
github.com/google/gnostic v0.6.8/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E=
|
github.com/google/gnostic v0.6.9/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E=
|
||||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
@ -49,8 +53,11 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
|
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
|
||||||
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
|
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
|
||||||
|
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
|
||||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
|
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
|
||||||
|
github.com/heimdalr/dag v1.2.1 h1:XJOMaoWqJK1UKdp+4zaO2uwav9GFbHMGCirdViKMRIQ=
|
||||||
|
github.com/heimdalr/dag v1.2.1/go.mod h1:Of/wUB7Yoj4dwiOcGOOYIq6MHlPF/8/QMBKFJpwg+yc=
|
||||||
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
|
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
|
||||||
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
|
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
|
||||||
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||||
@ -65,8 +72,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTK
|
|||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ=
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||||
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
|
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
@ -155,7 +162,8 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
|||||||
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
|
|
||||||
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
|
Loading…
Reference in New Issue
Block a user