flow improvements #52
34
flow/context.go
Normal file
34
flow/context.go
Normal file
@ -0,0 +1,34 @@
|
||||
package flow
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type flowKey struct{}
|
||||
|
||||
// FromContext returns Flow from context
|
||||
func FromContext(ctx context.Context) (Flow, bool) {
|
||||
if ctx == nil {
|
||||
return nil, false
|
||||
}
|
||||
c, ok := ctx.Value(flowKey{}).(Flow)
|
||||
return c, ok
|
||||
}
|
||||
|
||||
// NewContext stores Flow to context
|
||||
func NewContext(ctx context.Context, f Flow) context.Context {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
return context.WithValue(ctx, flowKey{}, f)
|
||||
}
|
||||
|
||||
// SetOption returns a function to setup a context with given value
|
||||
func SetOption(k, v interface{}) Option {
|
||||
return func(o *Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, k, v)
|
||||
}
|
||||
}
|
319
flow/default.go
Normal file
319
flow/default.go
Normal file
@ -0,0 +1,319 @@
|
||||
package flow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/silas/dag"
|
||||
"github.com/unistack-org/micro/v3/client"
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
)
|
||||
|
||||
type microFlow struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
type microWorkflow struct {
|
||||
id string
|
||||
g *dag.AcyclicGraph
|
||||
init bool
|
||||
sync.RWMutex
|
||||
opts Options
|
||||
steps map[string]Step
|
||||
}
|
||||
|
||||
func (w *microWorkflow) ID() string {
|
||||
return w.id
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Steps() [][]Step {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) {
|
||||
w.Lock()
|
||||
if !w.init {
|
||||
if err := w.g.Validate(); err != nil {
|
||||
w.Unlock()
|
||||
return "", err
|
||||
}
|
||||
w.g.TransitiveReduction()
|
||||
w.init = true
|
||||
}
|
||||
w.Unlock()
|
||||
|
||||
uid, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
options := NewExecuteOptions(opts...)
|
||||
var steps [][]Step
|
||||
fn := func(n dag.Vertex, idx int) error {
|
||||
if idx == 0 {
|
||||
steps = make([][]Step, 1)
|
||||
steps[0] = make([]Step, 0, 1)
|
||||
} else if idx >= len(steps) {
|
||||
tsteps := make([][]Step, idx+1)
|
||||
copy(tsteps, steps)
|
||||
steps = tsteps
|
||||
steps[idx] = make([]Step, 0, 1)
|
||||
}
|
||||
steps[idx] = append(steps[idx], n.(Step))
|
||||
return nil
|
||||
}
|
||||
|
||||
var root dag.Vertex
|
||||
if options.Start != "" {
|
||||
var ok bool
|
||||
w.RLock()
|
||||
root, ok = w.steps[options.Start]
|
||||
w.RUnlock()
|
||||
if !ok {
|
||||
return "", ErrStepNotExists
|
||||
}
|
||||
} else {
|
||||
root, err = w.g.Root()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
if options.Reverse {
|
||||
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
|
||||
} else {
|
||||
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
|
||||
}
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
cherr := make(chan error, 1)
|
||||
defer close(cherr)
|
||||
|
||||
nctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
nopts := make([]ExecuteOption, 0, len(opts)+5)
|
||||
nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store))
|
||||
|
||||
go func() {
|
||||
for idx := range steps {
|
||||
wg.Add(len(steps[idx]))
|
||||
for nidx := range steps[idx] {
|
||||
go func(step Step) {
|
||||
defer wg.Done()
|
||||
if err = step.Execute(nctx, req, nopts...); err != nil {
|
||||
cherr <- err
|
||||
cancel()
|
||||
}
|
||||
}(steps[idx][nidx])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
cherr <- nil
|
||||
}()
|
||||
|
||||
err = <-cherr
|
||||
|
||||
return uid.String(), err
|
||||
}
|
||||
|
||||
func NewFlow(opts ...Option) Flow {
|
||||
options := NewOptions(opts...)
|
||||
return µFlow{opts: options}
|
||||
}
|
||||
|
||||
func (f *microFlow) Options() Options {
|
||||
return f.opts
|
||||
}
|
||||
|
||||
func (f *microFlow) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&f.opts)
|
||||
}
|
||||
if err := f.opts.Client.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := f.opts.Tracer.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := f.opts.Logger.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := f.opts.Meter.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := f.opts.Store.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) {
|
||||
w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))}
|
||||
|
||||
for _, s := range steps {
|
||||
w.steps[s.String()] = s
|
||||
w.g.Add(s)
|
||||
}
|
||||
|
||||
for _, dst := range steps {
|
||||
for _, req := range dst.Requires() {
|
||||
src, ok := w.steps[req]
|
||||
if !ok {
|
||||
return nil, ErrStepNotExists
|
||||
}
|
||||
w.g.Connect(dag.BasicEdge(src, dst))
|
||||
}
|
||||
}
|
||||
|
||||
if err := w.g.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
w.g.TransitiveReduction()
|
||||
|
||||
w.init = true
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func (f *microFlow) WorkflowRemove(ctx context.Context, id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *microFlow) WorkflowSave(ctx context.Context, w Workflow) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type microCallStep struct {
|
||||
opts StepOptions
|
||||
service string
|
||||
method string
|
||||
}
|
||||
|
||||
func (s *microCallStep) ID() string {
|
||||
return s.String()
|
||||
}
|
||||
|
||||
func (s *microCallStep) Options() StepOptions {
|
||||
return s.opts
|
||||
}
|
||||
|
||||
func (s *microCallStep) Endpoint() string {
|
||||
return s.method
|
||||
}
|
||||
|
||||
func (s *microCallStep) Requires() []string {
|
||||
return s.opts.Requires
|
||||
}
|
||||
|
||||
func (s *microCallStep) Require(steps ...Step) error {
|
||||
for _, step := range steps {
|
||||
s.opts.Requires = append(s.opts.Requires, step.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *microCallStep) String() string {
|
||||
if s.opts.ID != "" {
|
||||
return s.opts.ID
|
||||
}
|
||||
return fmt.Sprintf("%s.%s", s.service, s.method)
|
||||
}
|
||||
|
||||
func (s *microCallStep) Name() string {
|
||||
return s.String()
|
||||
}
|
||||
|
||||
func (s *microCallStep) Hashcode() interface{} {
|
||||
return s.String()
|
||||
}
|
||||
|
||||
func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
|
||||
options := NewExecuteOptions(opts...)
|
||||
if options.Client == nil {
|
||||
return fmt.Errorf("client not set")
|
||||
}
|
||||
rsp := &codec.Frame{}
|
||||
copts := []client.CallOption{client.WithRetries(0)}
|
||||
if options.Timeout > 0 {
|
||||
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
|
||||
}
|
||||
err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp)
|
||||
return err
|
||||
}
|
||||
|
||||
type microPublishStep struct {
|
||||
opts StepOptions
|
||||
topic string
|
||||
}
|
||||
|
||||
func (s *microPublishStep) ID() string {
|
||||
return s.String()
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Options() StepOptions {
|
||||
return s.opts
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Endpoint() string {
|
||||
return s.topic
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Requires() []string {
|
||||
return s.opts.Requires
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Require(steps ...Step) error {
|
||||
for _, step := range steps {
|
||||
s.opts.Requires = append(s.opts.Requires, step.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *microPublishStep) String() string {
|
||||
if s.opts.ID != "" {
|
||||
return s.opts.ID
|
||||
}
|
||||
return fmt.Sprintf("%s", s.topic)
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Name() string {
|
||||
return s.String()
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Hashcode() interface{} {
|
||||
return s.String()
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewCallStep(service string, method string, opts ...StepOption) Step {
|
||||
options := NewStepOptions(opts...)
|
||||
return µCallStep{service: service, method: method, opts: options}
|
||||
}
|
||||
|
||||
func NewPublishStep(topic string, opts ...StepOption) Step {
|
||||
options := NewStepOptions(opts...)
|
||||
return µPublishStep{topic: topic, opts: options}
|
||||
}
|
48
flow/flow.go
48
flow/flow.go
@ -1,17 +1,59 @@
|
||||
// Package flow is an interface used for saga pattern microservice workflow
|
||||
package flow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrStepNotExists = errors.New("step not exists")
|
||||
)
|
||||
|
||||
// Step represents dedicated workflow step
|
||||
type Step interface {
|
||||
// ID returns step id
|
||||
ID() string
|
||||
// Endpoint returns rpc endpoint service_name.service_method or broker topic
|
||||
Endpoint() string
|
||||
// Execute step run
|
||||
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error
|
||||
// Requires returns dependent steps
|
||||
Requires() []string
|
||||
// Options returns step options
|
||||
Options() StepOptions
|
||||
// Require add required steps
|
||||
Require(steps ...Step) error
|
||||
// String
|
||||
String() string
|
||||
}
|
||||
|
||||
// Workflow contains all steps to execute
|
||||
type Workflow interface {
|
||||
// ID returns id of the workflow
|
||||
ID() string
|
||||
// Steps returns steps slice where parallel steps returned on the same level
|
||||
Steps() [][]Step
|
||||
Stop() error
|
||||
// Execute workflow with args, return execution id and error
|
||||
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error)
|
||||
// RemoveSteps remove steps from workflow
|
||||
RemoveSteps(ctx context.Context, steps ...Step) error
|
||||
// AppendSteps append steps to workflow
|
||||
AppendSteps(ctx context.Context, steps ...Step) error
|
||||
}
|
||||
|
||||
// Flow the base interface to interact with workflows
|
||||
type Flow interface {
|
||||
Start(Workflow) error
|
||||
Stop(Workflow)
|
||||
// Options returns options
|
||||
Options() Options
|
||||
// Init initialize
|
||||
Init(...Option) error
|
||||
// WorkflowCreate creates new workflow with specific id and steps
|
||||
WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error)
|
||||
// WorkflowSave saves workflow
|
||||
WorkflowSave(ctx context.Context, w Workflow) error
|
||||
// WorkflowLoad loads workflow with specific id
|
||||
WorkflowLoad(ctx context.Context, id string) (Workflow, error)
|
||||
// WorkflowList lists all workflows
|
||||
WorkflowList(ctx context.Context) ([]Workflow, error)
|
||||
}
|
||||
|
222
flow/options.go
Normal file
222
flow/options.go
Normal file
@ -0,0 +1,222 @@
|
||||
package flow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/unistack-org/micro/v3/client"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/meter"
|
||||
"github.com/unistack-org/micro/v3/store"
|
||||
"github.com/unistack-org/micro/v3/tracer"
|
||||
)
|
||||
|
||||
// Option func
|
||||
type Option func(*Options)
|
||||
|
||||
// Options server struct
|
||||
type Options struct {
|
||||
// Context holds the external options and can be used for flow shutdown
|
||||
Context context.Context
|
||||
// Client holds the client.Client
|
||||
Client client.Client
|
||||
// Tracer holds the tracer
|
||||
Tracer tracer.Tracer
|
||||
// Logger holds the logger
|
||||
Logger logger.Logger
|
||||
// Meter holds the meter
|
||||
Meter meter.Meter
|
||||
// Store used for intermediate results
|
||||
Store store.Store
|
||||
}
|
||||
|
||||
// NewOptions returns new options struct with default or passed values
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Context: context.Background(),
|
||||
Logger: logger.DefaultLogger,
|
||||
Meter: meter.DefaultMeter,
|
||||
Tracer: tracer.DefaultTracer,
|
||||
Client: client.DefaultClient,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
// Logger sets the logger option
|
||||
func Logger(l logger.Logger) Option {
|
||||
return func(o *Options) {
|
||||
o.Logger = l
|
||||
}
|
||||
}
|
||||
|
||||
// Meter sets the meter option
|
||||
func Meter(m meter.Meter) Option {
|
||||
return func(o *Options) {
|
||||
o.Meter = m
|
||||
}
|
||||
}
|
||||
|
||||
// Client to use for sync/async communication
|
||||
func Client(c client.Client) Option {
|
||||
return func(o *Options) {
|
||||
o.Client = c
|
||||
}
|
||||
}
|
||||
|
||||
// Context specifies a context for the service.
|
||||
// Can be used to signal shutdown of the flow
|
||||
// Can be used for extra option values.
|
||||
func Context(ctx context.Context) Option {
|
||||
return func(o *Options) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// Tracer mechanism for distributed tracking
|
||||
func Tracer(t tracer.Tracer) Option {
|
||||
return func(o *Options) {
|
||||
o.Tracer = t
|
||||
}
|
||||
}
|
||||
|
||||
// Store used for intermediate results
|
||||
func Store(s store.Store) Option {
|
||||
return func(o *Options) {
|
||||
o.Store = s
|
||||
}
|
||||
}
|
||||
|
||||
// WorflowOption signature
|
||||
type WorkflowOption func(*WorkflowOptions)
|
||||
|
||||
// WorkflowOptions holds workflow options
|
||||
type WorkflowOptions struct {
|
||||
ID string
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
// WorkflowID set workflow id
|
||||
func WorkflowID(id string) WorkflowOption {
|
||||
return func(o *WorkflowOptions) {
|
||||
o.ID = id
|
||||
}
|
||||
}
|
||||
|
||||
type ExecuteOptions struct {
|
||||
// Client holds the client.Client
|
||||
Client client.Client
|
||||
// Tracer holds the tracer
|
||||
Tracer tracer.Tracer
|
||||
// Logger holds the logger
|
||||
Logger logger.Logger
|
||||
// Meter holds the meter
|
||||
Meter meter.Meter
|
||||
// Store used for intermediate results
|
||||
Store store.Store
|
||||
// Context can be used to abort execution or pass additional opts
|
||||
Context context.Context
|
||||
// Start step
|
||||
Start string
|
||||
// Reverse execution
|
||||
Reverse bool
|
||||
// Timeout for execution
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
type ExecuteOption func(*ExecuteOptions)
|
||||
|
||||
func ExecuteClient(c client.Client) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Client = c
|
||||
}
|
||||
}
|
||||
|
||||
func ExecuteTracer(t tracer.Tracer) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Tracer = t
|
||||
}
|
||||
}
|
||||
|
||||
func ExecuteLogger(l logger.Logger) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Logger = l
|
||||
}
|
||||
}
|
||||
|
||||
func ExecuteMeter(m meter.Meter) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Meter = m
|
||||
}
|
||||
}
|
||||
|
||||
func ExecuteStore(s store.Store) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Store = s
|
||||
}
|
||||
}
|
||||
|
||||
func ExecuteContext(ctx context.Context) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
func ExecuteReverse(b bool) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Reverse = b
|
||||
}
|
||||
}
|
||||
|
||||
func ExecuteTimeout(td time.Duration) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Timeout = td
|
||||
}
|
||||
}
|
||||
|
||||
func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions {
|
||||
options := ExecuteOptions{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
type StepOptions struct {
|
||||
ID string
|
||||
Context context.Context
|
||||
Requires []string
|
||||
Fallback string
|
||||
}
|
||||
|
||||
type StepOption func(*StepOptions)
|
||||
|
||||
func NewStepOptions(opts ...StepOption) StepOptions {
|
||||
options := StepOptions{Context: context.Background()}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
func StepID(id string) StepOption {
|
||||
return func(o *StepOptions) {
|
||||
o.ID = id
|
||||
}
|
||||
}
|
||||
|
||||
func StepRequires(steps ...string) StepOption {
|
||||
return func(o *StepOptions) {
|
||||
o.Requires = steps
|
||||
}
|
||||
}
|
||||
|
||||
func StepFallback(step string) StepOption {
|
||||
return func(o *StepOptions) {
|
||||
o.Fallback = step
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user