config: add helper funcs
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
e3fee6f8a6
commit
3e40bac5f4
@ -25,7 +25,7 @@ var (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
// DefaultMaxMsgSize specifies how much data codec can handle
|
// DefaultMaxMsgSize specifies how much data codec can handle
|
||||||
DefaultMaxMsgSize int = 1024 * 1024 * 4 // 4Mb
|
DefaultMaxMsgSize = 1024 * 1024 * 4 // 4Mb
|
||||||
// DefaultCodec is the global default codec
|
// DefaultCodec is the global default codec
|
||||||
DefaultCodec Codec = NewCodec()
|
DefaultCodec Codec = NewCodec()
|
||||||
// DefaultTagName specifies struct tag name to control codec Marshal/Unmarshal
|
// DefaultTagName specifies struct tag name to control codec Marshal/Unmarshal
|
||||||
|
@ -64,3 +64,53 @@ func Load(ctx context.Context, cs []Config, opts ...LoadOption) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
DefaultAfterLoad = func(ctx context.Context, c Config) error {
|
||||||
|
for _, fn := range c.Options().AfterLoad {
|
||||||
|
if err := fn(ctx, c); err != nil {
|
||||||
|
c.Options().Logger.Errorf(ctx, "%s AfterLoad err: %v", c.String(), err)
|
||||||
|
if !c.Options().AllowFail {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultAfterSave = func(ctx context.Context, c Config) error {
|
||||||
|
for _, fn := range c.Options().AfterSave {
|
||||||
|
if err := fn(ctx, c); err != nil {
|
||||||
|
c.Options().Logger.Errorf(ctx, "%s AfterSave err: %v", c.String(), err)
|
||||||
|
if !c.Options().AllowFail {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultBeforeLoad = func(ctx context.Context, c Config) error {
|
||||||
|
for _, fn := range c.Options().BeforeLoad {
|
||||||
|
if err := fn(ctx, c); err != nil {
|
||||||
|
c.Options().Logger.Errorf(ctx, "%s BeforeLoad err: %v", c.String(), err)
|
||||||
|
if !c.Options().AllowFail {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultBeforeSave = func(ctx context.Context, c Config) error {
|
||||||
|
for _, fn := range c.Options().BeforeSave {
|
||||||
|
if err := fn(ctx, c); err != nil {
|
||||||
|
c.Options().Logger.Errorf(ctx, "%s BeforeSavec err: %v", c.String(), err)
|
||||||
|
if !c.Options().AllowFail {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
)
|
||||||
|
@ -27,11 +27,9 @@ func (c *defaultConfig) Init(opts ...Option) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
|
func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
|
||||||
for _, fn := range c.opts.BeforeLoad {
|
if err := DefaultBeforeLoad(ctx, c); err != nil {
|
||||||
if err := fn(ctx, c); err != nil && !c.opts.AllowFail {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
options := NewLoadOptions(opts...)
|
options := NewLoadOptions(opts...)
|
||||||
mopts := []func(*mergo.Config){mergo.WithTypeCheck}
|
mopts := []func(*mergo.Config){mergo.WithTypeCheck}
|
||||||
@ -48,20 +46,26 @@ func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
src, err := rutil.Zero(dst)
|
src, err := rutil.Zero(dst)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
|
if !c.opts.AllowFail {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return DefaultAfterLoad(ctx, c)
|
||||||
|
}
|
||||||
|
|
||||||
if err = fillValues(reflect.ValueOf(src), c.opts.StructTag); err == nil {
|
if err = fillValues(reflect.ValueOf(src), c.opts.StructTag); err == nil {
|
||||||
err = mergo.Merge(dst, src, mopts...)
|
err = mergo.Merge(dst, src, mopts...)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil && !c.opts.AllowFail {
|
if err != nil {
|
||||||
|
c.opts.Logger.Errorf(ctx, "default load error: %v", err)
|
||||||
|
if !c.opts.AllowFail {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, fn := range c.opts.AfterLoad {
|
|
||||||
if err := fn(ctx, c); err != nil && !c.opts.AllowFail {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := DefaultAfterLoad(ctx, c); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -247,17 +251,13 @@ func fillValues(valueOf reflect.Value, tname string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *defaultConfig) Save(ctx context.Context, opts ...SaveOption) error {
|
func (c *defaultConfig) Save(ctx context.Context, opts ...SaveOption) error {
|
||||||
for _, fn := range c.opts.BeforeSave {
|
if err := DefaultBeforeSave(ctx, c); err != nil {
|
||||||
if err := fn(ctx, c); err != nil && !c.opts.AllowFail {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for _, fn := range c.opts.AfterSave {
|
if err := DefaultAfterSave(ctx, c); err != nil {
|
||||||
if err := fn(ctx, c); err != nil && !c.opts.AllowFail {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -20,13 +20,13 @@ type microFlow struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type microWorkflow struct {
|
type microWorkflow struct {
|
||||||
id string
|
|
||||||
g *dag.AcyclicGraph
|
|
||||||
init bool
|
|
||||||
sync.RWMutex
|
|
||||||
opts Options
|
opts Options
|
||||||
|
g *dag.AcyclicGraph
|
||||||
steps map[string]Step
|
steps map[string]Step
|
||||||
|
id string
|
||||||
status Status
|
status Status
|
||||||
|
sync.RWMutex
|
||||||
|
init bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) ID() string {
|
func (w *microWorkflow) ID() string {
|
||||||
@ -424,11 +424,11 @@ func (f *microFlow) WorkflowLoad(ctx context.Context, id string) (Workflow, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
type microCallStep struct {
|
type microCallStep struct {
|
||||||
|
rsp *Message
|
||||||
|
req *Message
|
||||||
opts StepOptions
|
opts StepOptions
|
||||||
service string
|
service string
|
||||||
method string
|
method string
|
||||||
rsp *Message
|
|
||||||
req *Message
|
|
||||||
status Status
|
status Status
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -508,10 +508,10 @@ func (s *microCallStep) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
}
|
}
|
||||||
|
|
||||||
type microPublishStep struct {
|
type microPublishStep struct {
|
||||||
opts StepOptions
|
|
||||||
topic string
|
|
||||||
req *Message
|
req *Message
|
||||||
rsp *Message
|
rsp *Message
|
||||||
|
opts StepOptions
|
||||||
|
topic string
|
||||||
status Status
|
status Status
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,10 +120,10 @@ type ExecuteOptions struct {
|
|||||||
Context context.Context
|
Context context.Context
|
||||||
// Start step
|
// Start step
|
||||||
Start string
|
Start string
|
||||||
// Reverse execution
|
|
||||||
Reverse bool
|
|
||||||
// Timeout for execution
|
// Timeout for execution
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
|
// Reverse execution
|
||||||
|
Reverse bool
|
||||||
// Async enables async execution
|
// Async enables async execution
|
||||||
Async bool
|
Async bool
|
||||||
}
|
}
|
||||||
|
@ -34,13 +34,13 @@ type handler struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type subscriber struct {
|
type subscriber struct {
|
||||||
opts SubscriberOptions
|
|
||||||
typ reflect.Type
|
typ reflect.Type
|
||||||
subscriber interface{}
|
subscriber interface{}
|
||||||
rcvr reflect.Value
|
rcvr reflect.Value
|
||||||
topic string
|
topic string
|
||||||
handlers []*handler
|
|
||||||
endpoints []*register.Endpoint
|
endpoints []*register.Endpoint
|
||||||
|
handlers []*handler
|
||||||
|
opts SubscriberOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
// Is this an exported - upper case - name?
|
// Is this an exported - upper case - name?
|
||||||
|
Loading…
Reference in New Issue
Block a user