config: add Validate func, small lint fixes #104
@ -50,6 +50,7 @@ type Handler func(Event) error
|
||||
// Events contains multiple events
|
||||
type Events []Event
|
||||
|
||||
// Ack try to ack all events and return
|
||||
func (evs Events) Ack() error {
|
||||
var err error
|
||||
for _, ev := range evs {
|
||||
@ -60,6 +61,7 @@ func (evs Events) Ack() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetError sets error on event
|
||||
func (evs Events) SetError(err error) {
|
||||
for _, ev := range evs {
|
||||
ev.SetError(err)
|
||||
|
@ -4,11 +4,16 @@ package config // import "go.unistack.org/micro/v3/config"
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Validator interface {
|
||||
Validate() error
|
||||
}
|
||||
|
||||
// DefaultConfig default config
|
||||
var DefaultConfig Config = NewConfig()
|
||||
var DefaultConfig = NewConfig()
|
||||
|
||||
// DefaultWatcherMinInterval default min interval for poll changes
|
||||
var DefaultWatcherMinInterval = 5 * time.Second
|
||||
@ -67,7 +72,59 @@ func Load(ctx context.Context, cs []Config, opts ...LoadOption) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate runs Validate() error func for each struct field
|
||||
func Validate(ctx context.Context, cfg interface{}) error {
|
||||
if cfg == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if v, ok := cfg.(Validator); ok {
|
||||
if err := v.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
sv := reflect.ValueOf(cfg)
|
||||
if sv.Kind() == reflect.Ptr {
|
||||
sv = sv.Elem()
|
||||
}
|
||||
if sv.Kind() != reflect.Struct {
|
||||
return nil
|
||||
}
|
||||
|
||||
typ := sv.Type()
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.IsValid() || len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if v, ok := val.Interface().(Validator); ok {
|
||||
if err := v.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
switch val.Kind() {
|
||||
case reflect.Ptr:
|
||||
if reflect.Indirect(val).Kind() == reflect.Struct {
|
||||
if err := Validate(ctx, val.Interface()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case reflect.Struct:
|
||||
if err := Validate(ctx, val.Interface()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
// DefaultAfterLoad default func that runs after config load
|
||||
DefaultAfterLoad = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().AfterLoad {
|
||||
if err := fn(ctx, c); err != nil {
|
||||
@ -79,7 +136,7 @@ var (
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DefaultAfterSave default func that runs after config save
|
||||
DefaultAfterSave = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().AfterSave {
|
||||
if err := fn(ctx, c); err != nil {
|
||||
@ -91,7 +148,7 @@ var (
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DefaultBeforeLoad default func that runs before config load
|
||||
DefaultBeforeLoad = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().BeforeLoad {
|
||||
if err := fn(ctx, c); err != nil {
|
||||
@ -103,11 +160,11 @@ var (
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DefaultBeforeSave default func that runs befora config save
|
||||
DefaultBeforeSave = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().BeforeSave {
|
||||
if err := fn(ctx, c); err != nil {
|
||||
c.Options().Logger.Errorf(ctx, "%s BeforeSavec err: %v", c.String(), err)
|
||||
c.Options().Logger.Errorf(ctx, "%s BeforeSave err: %v", c.String(), err)
|
||||
if !c.Options().AllowFail {
|
||||
return err
|
||||
}
|
||||
|
@ -8,30 +8,46 @@ import (
|
||||
"go.unistack.org/micro/v3/config"
|
||||
)
|
||||
|
||||
type Cfg struct {
|
||||
type cfg struct {
|
||||
StringValue string `default:"string_value"`
|
||||
IgnoreValue string `json:"-"`
|
||||
StructValue struct {
|
||||
StructValue *cfgStructValue
|
||||
IntValue int `default:"99"`
|
||||
}
|
||||
|
||||
type cfgStructValue struct {
|
||||
StringValue string `default:"string_value"`
|
||||
}
|
||||
IntValue int `default:"99"`
|
||||
|
||||
func (c *cfg) Validate() error {
|
||||
if c.IntValue != 10 {
|
||||
return fmt.Errorf("invalid IntValue %d != %d", 10, c.IntValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cfgStructValue) Validate() error {
|
||||
if c.StringValue != "string_value" {
|
||||
return fmt.Errorf("invalid StringValue %s != %s", "string_value", c.StringValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestDefault(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
conf := &Cfg{IntValue: 10}
|
||||
blfn := func(ctx context.Context, cfg config.Config) error {
|
||||
nconf, ok := cfg.Options().Struct.(*Cfg)
|
||||
conf := &cfg{IntValue: 10}
|
||||
blfn := func(_ context.Context, c config.Config) error {
|
||||
nconf, ok := c.Options().Struct.(*cfg)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to get Struct from options: %v", cfg.Options())
|
||||
return fmt.Errorf("failed to get Struct from options: %v", c.Options())
|
||||
}
|
||||
nconf.StringValue = "before_load"
|
||||
return nil
|
||||
}
|
||||
alfn := func(ctx context.Context, cfg config.Config) error {
|
||||
nconf, ok := cfg.Options().Struct.(*Cfg)
|
||||
alfn := func(_ context.Context, c config.Config) error {
|
||||
nconf, ok := c.Options().Struct.(*cfg)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to get Struct from options: %v", cfg.Options())
|
||||
return fmt.Errorf("failed to get Struct from options: %v", c.Options())
|
||||
}
|
||||
nconf.StringValue = "after_load"
|
||||
return nil
|
||||
@ -50,3 +66,19 @@ func TestDefault(t *testing.T) {
|
||||
_ = conf
|
||||
// t.Logf("%#+v\n", conf)
|
||||
}
|
||||
|
||||
func TestValidate(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
conf := &cfg{IntValue: 10}
|
||||
cfg := config.NewConfig(config.Struct(conf))
|
||||
if err := cfg.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := cfg.Load(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := config.Validate(ctx, conf); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -69,6 +69,7 @@ type LoadOptions struct {
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
// NewLoadOptions create LoadOptions struct with provided opts
|
||||
func NewLoadOptions(opts ...LoadOption) LoadOptions {
|
||||
options := LoadOptions{}
|
||||
for _, o := range opts {
|
||||
@ -221,8 +222,10 @@ type WatchOptions struct {
|
||||
Coalesce bool
|
||||
}
|
||||
|
||||
// WatchOption func signature
|
||||
type WatchOption func(*WatchOptions)
|
||||
|
||||
// NewWatchOptions create WatchOptions struct with provided opts
|
||||
func NewWatchOptions(opts ...WatchOption) WatchOptions {
|
||||
options := WatchOptions{
|
||||
Context: context.Background(),
|
||||
|
13
flow/flow.go
13
flow/flow.go
@ -11,7 +11,9 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrStepNotExists returns when step not found
|
||||
ErrStepNotExists = errors.New("step not exists")
|
||||
// ErrMissingClient returns when client.Client is missing
|
||||
ErrMissingClient = errors.New("client not set")
|
||||
)
|
||||
|
||||
@ -36,6 +38,7 @@ func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Message used to transfer data between steps
|
||||
type Message struct {
|
||||
Header metadata.Metadata
|
||||
Body RawMessage
|
||||
@ -67,6 +70,7 @@ type Step interface {
|
||||
Response() *Message
|
||||
}
|
||||
|
||||
// Status contains step current status
|
||||
type Status int
|
||||
|
||||
func (status Status) String() string {
|
||||
@ -74,15 +78,22 @@ func (status Status) String() string {
|
||||
}
|
||||
|
||||
const (
|
||||
// StatusPending step waiting to start
|
||||
StatusPending Status = iota
|
||||
// StatusRunning step is running
|
||||
StatusRunning
|
||||
// StatusFailure step competed with error
|
||||
StatusFailure
|
||||
// StatusSuccess step completed without error
|
||||
StatusSuccess
|
||||
// StatusAborted step aborted while it running
|
||||
StatusAborted
|
||||
// StatusSuspend step suspended
|
||||
StatusSuspend
|
||||
)
|
||||
|
||||
var (
|
||||
// StatusString contains map status => string
|
||||
StatusString = map[Status]string{
|
||||
StatusPending: "StatusPending",
|
||||
StatusRunning: "StatusRunning",
|
||||
@ -91,6 +102,7 @@ var (
|
||||
StatusAborted: "StatusAborted",
|
||||
StatusSuspend: "StatusSuspend",
|
||||
}
|
||||
// StringStatus contains map string => status
|
||||
StringStatus = map[string]Status{
|
||||
"StatusPending": StatusPending,
|
||||
"StatusRunning": StatusRunning,
|
||||
@ -144,6 +156,7 @@ var (
|
||||
atomicSteps atomic.Value
|
||||
)
|
||||
|
||||
// RegisterStep register own step with workflow
|
||||
func RegisterStep(step Step) {
|
||||
flowMu.Lock()
|
||||
steps, _ := atomicSteps.Load().([]Step)
|
||||
|
@ -91,7 +91,7 @@ func Store(s store.Store) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WorflowOption func signature
|
||||
// WorkflowOption func signature
|
||||
type WorkflowOption func(*WorkflowOptions)
|
||||
|
||||
// WorkflowOptions holds workflow options
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultClientCallObserver called by wrapper in client Call
|
||||
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
@ -19,6 +20,7 @@ var (
|
||||
return labels
|
||||
}
|
||||
|
||||
// DefaultClientStreamObserver called by wrapper in client Stream
|
||||
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
@ -27,6 +29,7 @@ var (
|
||||
return labels
|
||||
}
|
||||
|
||||
// DefaultClientPublishObserver called by wrapper in client Publish
|
||||
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string {
|
||||
labels := []string{"endpoint", msg.Topic()}
|
||||
if err != nil {
|
||||
@ -35,6 +38,7 @@ var (
|
||||
return labels
|
||||
}
|
||||
|
||||
// DefaultServerHandlerObserver called by wrapper in server Handler
|
||||
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
@ -43,6 +47,7 @@ var (
|
||||
return labels
|
||||
}
|
||||
|
||||
// DefaultServerSubscriberObserver called by wrapper in server Subscriber
|
||||
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, err error) []string {
|
||||
labels := []string{"endpoint", msg.Topic()}
|
||||
if err != nil {
|
||||
@ -51,6 +56,7 @@ var (
|
||||
return labels
|
||||
}
|
||||
|
||||
// DefaultClientCallFuncObserver called by wrapper in client CallFunc
|
||||
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
@ -59,6 +65,7 @@ var (
|
||||
return labels
|
||||
}
|
||||
|
||||
// DefaultSkipEndpoints wrapper not called for this endpoints
|
||||
DefaultSkipEndpoints = []string{"Meter.Metrics"}
|
||||
)
|
||||
|
||||
@ -71,11 +78,17 @@ type lWrapper struct {
|
||||
}
|
||||
|
||||
type (
|
||||
// ClientCallObserver func signature
|
||||
ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string
|
||||
// ClientStreamObserver func signature
|
||||
ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string
|
||||
// ClientPublishObserver func signature
|
||||
ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string
|
||||
// ClientCallFuncObserver func signature
|
||||
ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string
|
||||
// ServerHandlerObserver func signature
|
||||
ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
|
||||
// ServerSubscriberObserver func signature
|
||||
ServerSubscriberObserver func(context.Context, server.Message, error) []string
|
||||
)
|
||||
|
||||
|
@ -102,7 +102,7 @@ func (k byKey) Swap(i, j int) {
|
||||
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
|
||||
}
|
||||
|
||||
// BuildLables used to sort labels and delete duplicates.
|
||||
// BuildLabels used to sort labels and delete duplicates.
|
||||
// Last value wins in case of duplicate label keys.
|
||||
func BuildLabels(labels ...string) []string {
|
||||
if len(labels)%2 == 1 {
|
||||
|
@ -104,7 +104,7 @@ func Meter(m meter.Meter) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// SkipEndpoint add endpoint to skip
|
||||
// SkipEndoints add endpoint to skip
|
||||
func SkipEndoints(eps ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
|
||||
@ -294,7 +294,7 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
// NewSubscribeWrapper create server subscribe wrapper
|
||||
// NewSubscriberWrapper create server subscribe wrapper
|
||||
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
||||
handler := &wrapper{
|
||||
opts: NewOptions(opts...),
|
||||
|
@ -44,9 +44,8 @@ func NewOptions(opts ...Option) Options {
|
||||
return options
|
||||
}
|
||||
|
||||
// nolint: golint,revive
|
||||
// RegisterOptions holds options for register method
|
||||
type RegisterOptions struct {
|
||||
type RegisterOptions struct { // nolint: golint,revive
|
||||
Context context.Context
|
||||
Domain string
|
||||
TTL time.Duration
|
||||
@ -197,33 +196,29 @@ func TLSConfig(t *tls.Config) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// nolint: golint,revive
|
||||
// RegisterAttempts specifies register atempts count
|
||||
func RegisterAttempts(t int) RegisterOption {
|
||||
func RegisterAttempts(t int) RegisterOption { // nolint: golint,revive
|
||||
return func(o *RegisterOptions) {
|
||||
o.Attempts = t
|
||||
}
|
||||
}
|
||||
|
||||
// nolint: golint,revive
|
||||
// RegisterTTL specifies register ttl
|
||||
func RegisterTTL(t time.Duration) RegisterOption {
|
||||
func RegisterTTL(t time.Duration) RegisterOption { // nolint: golint,revive
|
||||
return func(o *RegisterOptions) {
|
||||
o.TTL = t
|
||||
}
|
||||
}
|
||||
|
||||
// nolint: golint,revive
|
||||
// RegisterContext sets the register context
|
||||
func RegisterContext(ctx context.Context) RegisterOption {
|
||||
func RegisterContext(ctx context.Context) RegisterOption { // nolint: golint,revive
|
||||
return func(o *RegisterOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// nolint: golint,revive
|
||||
// RegisterDomain secifies register domain
|
||||
func RegisterDomain(d string) RegisterOption {
|
||||
func RegisterDomain(d string) RegisterOption { // nolint: golint,revive
|
||||
return func(o *RegisterOptions) {
|
||||
o.Domain = d
|
||||
}
|
||||
|
@ -69,9 +69,8 @@ type Endpoint struct {
|
||||
// Option func signature
|
||||
type Option func(*Options)
|
||||
|
||||
// nolint: golint,revive
|
||||
// RegisterOption option is used to register service
|
||||
type RegisterOption func(*RegisterOptions)
|
||||
type RegisterOption func(*RegisterOptions) // nolint: golint,revive
|
||||
|
||||
// WatchOption option is used to watch service changes
|
||||
type WatchOption func(*WatchOptions)
|
||||
|
@ -2,12 +2,16 @@ package tracer
|
||||
|
||||
import "go.unistack.org/micro/v3/logger"
|
||||
|
||||
// SpanOptions contains span option
|
||||
type SpanOptions struct{}
|
||||
|
||||
// SpanOption func signature
|
||||
type SpanOption func(o *SpanOptions)
|
||||
|
||||
// EventOptions contains event options
|
||||
type EventOptions struct{}
|
||||
|
||||
// EventOption func signature
|
||||
type EventOption func(o *EventOptions)
|
||||
|
||||
// Options struct
|
||||
@ -18,7 +22,7 @@ type Options struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
// Option func
|
||||
// Option func signature
|
||||
type Option func(o *Options)
|
||||
|
||||
// Logger sets the logger
|
||||
|
@ -9,18 +9,30 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// SplitToken used to detect path components
|
||||
SplitToken = "."
|
||||
// IndexCloseChar used to detect index end
|
||||
IndexCloseChar = "]"
|
||||
// IndexOpenChar used to detect index start
|
||||
IndexOpenChar = "["
|
||||
)
|
||||
|
||||
var (
|
||||
ErrMalformedIndex = errors.New("Malformed index key")
|
||||
ErrInvalidIndexUsage = errors.New("Invalid index key usage")
|
||||
ErrKeyNotFound = errors.New("Unable to find the key")
|
||||
ErrBadJSONPath = errors.New("Bad path: must start with $ and have more then 2 chars")
|
||||
// ErrMalformedIndex returns when index key have invalid format
|
||||
ErrMalformedIndex = errors.New("malformed index key")
|
||||
// ErrInvalidIndexUsage returns when index key usage error
|
||||
ErrInvalidIndexUsage = errors.New("invalid index key usage")
|
||||
// ErrKeyNotFound returns when key not found
|
||||
ErrKeyNotFound = errors.New("unable to find the key")
|
||||
// ErrBadJSONPath returns when path have invalid syntax
|
||||
ErrBadJSONPath = errors.New("bad path: must start with $ and have more then 2 chars")
|
||||
)
|
||||
|
||||
// Lookup performs a lookup into a value, using a path of keys. The key should
|
||||
// match with a Field or a MapIndex. For slice you can use the syntax key[index]
|
||||
// to access a specific index. If one key owns to a slice and an index is not
|
||||
// specificied the rest of the path will be apllied to evaley value of the
|
||||
// slice, and the value will be merged into a slice.
|
||||
func Lookup(i interface{}, path string) (reflect.Value, error) {
|
||||
if path == "" || path[0:1] != "$" {
|
||||
return reflect.Value{}, ErrBadJSONPath
|
||||
@ -37,11 +49,6 @@ func Lookup(i interface{}, path string) (reflect.Value, error) {
|
||||
return lookup(i, strings.Split(path[2:], SplitToken)...)
|
||||
}
|
||||
|
||||
// Lookup performs a lookup into a value, using a path of keys. The key should
|
||||
// match with a Field or a MapIndex. For slice you can use the syntax key[index]
|
||||
// to access a specific index. If one key owns to a slice and an index is not
|
||||
// specificied the rest of the path will be apllied to evaley value of the
|
||||
// slice, and the value will be merged into a slice.
|
||||
func lookup(i interface{}, path ...string) (reflect.Value, error) {
|
||||
value := reflect.ValueOf(i)
|
||||
var parent reflect.Value
|
||||
|
@ -20,8 +20,8 @@ var bracketSplitter = regexp.MustCompile(`\[|\]`)
|
||||
|
||||
// StructField contains struct field path its value and field
|
||||
type StructField struct {
|
||||
Path string
|
||||
Value reflect.Value
|
||||
Path string
|
||||
Field reflect.StructField
|
||||
}
|
||||
|
||||
|
@ -11,13 +11,13 @@ import (
|
||||
|
||||
func TestStructfields(t *testing.T) {
|
||||
type Config struct {
|
||||
Wait time.Duration
|
||||
Time time.Time
|
||||
Nested *Config
|
||||
Metadata map[string]int
|
||||
Broker string
|
||||
Addr []string
|
||||
Wait time.Duration
|
||||
Verbose bool
|
||||
Nested *Config
|
||||
}
|
||||
cfg := &Config{Nested: &Config{}}
|
||||
fields, err := rutil.StructFields(cfg)
|
||||
|
Loading…
Reference in New Issue
Block a user