diff --git a/config/config.go b/config/config.go index 6cd4457a..d2a0262d 100644 --- a/config/config.go +++ b/config/config.go @@ -4,11 +4,15 @@ package config import ( "context" "errors" + "time" ) // DefaultConfig default config var DefaultConfig Config = NewConfig() +// DefaultWatcherInterval default interval for poll changes +var DefaultWatcherInterval = 5 * time.Second + var ( // ErrCodecMissing is returned when codec needed and not specified ErrCodecMissing = errors.New("codec missing") @@ -30,15 +34,17 @@ type Config interface { Load(context.Context, ...LoadOption) error // Save config to sources Save(context.Context, ...SaveOption) error - // Watch a value for changes - //Watch(context.Context) (Watcher, error) + // Watch a config for changes + Watch(context.Context, ...WatchOption) (Watcher, error) // String returns config type name String() string } // Watcher is the config watcher type Watcher interface { - // Next() (, error) + // Next blocks until update happens or error returned + Next() (map[string]interface{}, error) + // Stop stops watcher Stop() error } diff --git a/config/default.go b/config/default.go index 58b2098a..610abb5b 100644 --- a/config/default.go +++ b/config/default.go @@ -5,6 +5,7 @@ import ( "reflect" "strconv" "strings" + "time" "github.com/imdario/mergo" rutil "github.com/unistack-org/micro/v3/util/reflect" @@ -41,11 +42,15 @@ func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error { mopts = append(mopts, mergo.WithAppendSlice) } - src, err := rutil.Zero(c.opts.Struct) + dst := c.opts.Struct + if options.Struct != nil { + dst = options.Struct + } + + src, err := rutil.Zero(dst) if err == nil { - valueOf := reflect.ValueOf(src) - if err = c.fillValues(valueOf); err == nil { - err = mergo.Merge(c.opts.Struct, src, mopts...) + if err = fillValues(reflect.ValueOf(src), c.opts.StructTag); err == nil { + err = mergo.Merge(dst, src, mopts...) } } @@ -63,7 +68,7 @@ func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error { } //nolint:gocyclo -func (c *defaultConfig) fillValue(value reflect.Value, val string) error { +func fillValue(value reflect.Value, val string) error { if !rutil.IsEmpty(value) { return nil } @@ -80,10 +85,10 @@ func (c *defaultConfig) fillValue(value reflect.Value, val string) error { kv := strings.FieldsFunc(nval, func(c rune) bool { return c == '=' }) mkey := reflect.Indirect(reflect.New(kt)) mval := reflect.Indirect(reflect.New(et)) - if err := c.fillValue(mkey, kv[0]); err != nil { + if err := fillValue(mkey, kv[0]); err != nil { return err } - if err := c.fillValue(mval, kv[1]); err != nil { + if err := fillValue(mval, kv[1]); err != nil { return err } value.SetMapIndex(mkey, mval) @@ -93,7 +98,7 @@ func (c *defaultConfig) fillValue(value reflect.Value, val string) error { value.Set(reflect.MakeSlice(reflect.SliceOf(value.Type().Elem()), len(nvals), len(nvals))) for idx, nval := range nvals { nvalue := reflect.Indirect(reflect.New(value.Type().Elem())) - if err := c.fillValue(nvalue, nval); err != nil { + if err := fillValue(nvalue, nval); err != nil { return err } value.Index(idx).Set(nvalue) @@ -182,7 +187,7 @@ func (c *defaultConfig) fillValue(value reflect.Value, val string) error { return nil } -func (c *defaultConfig) fillValues(valueOf reflect.Value) error { +func fillValues(valueOf reflect.Value, tname string) error { var values reflect.Value if valueOf.Kind() == reflect.Ptr { @@ -209,7 +214,7 @@ func (c *defaultConfig) fillValues(valueOf reflect.Value) error { switch value.Kind() { case reflect.Struct: value.Set(reflect.Indirect(reflect.New(value.Type()))) - if err := c.fillValues(value); err != nil { + if err := fillValues(value, tname); err != nil { return err } continue @@ -223,17 +228,17 @@ func (c *defaultConfig) fillValues(valueOf reflect.Value) error { value.Set(reflect.New(value.Type().Elem())) } value = value.Elem() - if err := c.fillValues(value); err != nil { + if err := fillValues(value, tname); err != nil { return err } continue } - tag, ok := field.Tag.Lookup(c.opts.StructTag) + tag, ok := field.Tag.Lookup(tname) if !ok { continue } - if err := c.fillValue(value, tag); err != nil { + if err := fillValue(value, tag); err != nil { return err } } @@ -265,6 +270,20 @@ func (c *defaultConfig) Name() string { return c.opts.Name } +func (c *defaultConfig) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { + w := &defaultWatcher{ + opts: c.opts, + wopts: NewWatchOptions(opts...), + done: make(chan bool), + vchan: make(chan map[string]interface{}), + echan: make(chan error), + } + + go w.run() + + return w, nil +} + // NewConfig returns new default config source func NewConfig(opts ...Option) Config { options := NewOptions(opts...) @@ -273,3 +292,73 @@ func NewConfig(opts ...Option) Config { } return &defaultConfig{opts: options} } + +type defaultWatcher struct { + opts Options + wopts WatchOptions + done chan bool + ticker *time.Ticker + vchan chan map[string]interface{} + echan chan error +} + +func (w *defaultWatcher) run() { + ticker := time.NewTicker(w.wopts.Interval) + defer ticker.Stop() + + src := w.opts.Struct + if w.wopts.Struct != nil { + src = w.wopts.Struct + } + + for { + select { + case <-w.done: + return + case <-ticker.C: + dst, err := rutil.Zero(src) + if err == nil { + err = fillValues(reflect.ValueOf(dst), w.opts.StructTag) + } + if err != nil { + w.echan <- err + return + } + srcmp, err := rutil.StructFieldsMap(src) + if err != nil { + w.echan <- err + return + } + dstmp, err := rutil.StructFieldsMap(dst) + if err != nil { + w.echan <- err + return + } + for sk, sv := range srcmp { + if reflect.DeepEqual(dstmp[sk], sv) { + delete(dstmp, sk) + } + } + w.vchan <- dstmp + src = dst + } + } +} + +func (w *defaultWatcher) Next() (map[string]interface{}, error) { + select { + case <-w.done: + break + case v, ok := <-w.vchan: + if !ok { + break + } + return v, nil + } + return nil, ErrWatcherStopped +} + +func (w *defaultWatcher) Stop() error { + close(w.done) + return nil +} diff --git a/config/default_test.go b/config/default_test.go index b5b89376..66e3b193 100644 --- a/config/default_test.go +++ b/config/default_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/unistack-org/micro/v3/config" ) @@ -17,6 +18,57 @@ type Cfg struct { IntValue int `default:"99"` } +func TestWatch(t *testing.T) { + ctx := context.Background() + + conf := &Cfg{IntValue: 10} + + cfg := config.NewConfig(config.Struct(conf)) + if err := cfg.Init(); err != nil { + t.Fatal(err) + } + if err := cfg.Load(ctx); err != nil { + t.Fatal(err) + } + + w, err := cfg.Watch(ctx, config.WatchInterval(500*time.Millisecond)) + if err != nil { + t.Fatal(err) + } + + defer func() { + _ = w.Stop() + }() + + done := make(chan struct{}) + + go func() { + for { + mp, err := w.Next() + if err != nil && err != config.ErrWatcherStopped { + t.Fatal(err) + } else if err == config.ErrWatcherStopped { + return + } + if len(mp) != 1 { + t.Fatal(fmt.Errorf("default watcher err: %v", mp)) + } + + v, ok := mp["IntValue"] + if !ok { + t.Fatal(fmt.Errorf("default watcher err: %v", v)) + } + if nv, ok := v.(int); !ok || nv != 99 { + t.Fatal(fmt.Errorf("default watcher err: %v", v)) + } + close(done) + return + } + }() + + <-done +} + func TestDefault(t *testing.T) { ctx := context.Background() conf := &Cfg{IntValue: 10} @@ -47,6 +99,6 @@ func TestDefault(t *testing.T) { if conf.StringValue != "after_load" { t.Fatal("AfterLoad option not working") } - - t.Logf("%#+v\n", conf) + _ = conf + //t.Logf("%#+v\n", conf) } diff --git a/config/options.go b/config/options.go index d16e81e2..a417f571 100644 --- a/config/options.go +++ b/config/options.go @@ -2,6 +2,7 @@ package config import ( "context" + "time" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/logger" @@ -62,6 +63,7 @@ type LoadOption func(o *LoadOptions) // LoadOptions struct type LoadOptions struct { + Struct interface{} Override bool Append bool } @@ -88,13 +90,29 @@ func LoadAppend(b bool) LoadOption { } } +// LoadStruct override struct for loading +func LoadStruct(src interface{}) LoadOption { + return func(o *LoadOptions) { + o.Struct = src + } +} + // SaveOption function signature type SaveOption func(o *SaveOptions) // SaveOptions struct type SaveOptions struct { + Struct interface{} } +// SaveStruct override struct for save to config +func SaveStruct(src interface{}) SaveOption { + return func(o *SaveOptions) { + o.Struct = src + } +} + +// NewSaveOptions fill SaveOptions struct func NewSaveOptions(opts ...SaveOption) SaveOptions { options := SaveOptions{} for _, o := range opts { @@ -186,3 +204,56 @@ func Name(n string) Option { o.Name = n } } + +// WatchOptions struuct +type WatchOptions struct { + // Context used by non default options + Context context.Context + // Coalesce multiple events to one + Coalesce bool + // Interval to periodically pull changes if config source not supports async notify + Interval time.Duration + // Struct for filling + Struct interface{} +} + +type WatchOption func(*WatchOptions) + +func NewWatchOptions(opts ...WatchOption) WatchOptions { + options := WatchOptions{ + Context: context.Background(), + Interval: DefaultWatcherInterval, + } + for _, o := range opts { + o(&options) + } + return options +} + +// WatchContext pass context +func WatchContext(ctx context.Context) WatchOption { + return func(o *WatchOptions) { + o.Context = ctx + } +} + +// WatchCoalesce controls watch event combining +func WatchCoalesce(b bool) WatchOption { + return func(o *WatchOptions) { + o.Coalesce = b + } +} + +// WatchInterval specifies time.Duration for pulling changes +func WatchInterval(td time.Duration) WatchOption { + return func(o *WatchOptions) { + o.Interval = td + } +} + +// WatchStruct overrides struct for fill +func WatchStruct(src interface{}) WatchOption { + return func(o *WatchOptions) { + o.Struct = src + } +} diff --git a/util/router/compile.go b/util/router/compile.go index 876aaabd..22f7d874 100644 --- a/util/router/compile.go +++ b/util/router/compile.go @@ -8,18 +8,18 @@ const ( // Template is a compiled representation of path templates. type Template struct { - // Verb is a VERB part in the template - Verb string - // Original template (example: /v1/a_bit_of_everything) - Template string - // OpCodes is a sequence of operations + // Version is the version number of the format. + Version int + // OpCodes is a sequence of operations. OpCodes []int // Pool is a constant pool Pool []string - // Fields is a list of field paths bound in this template + // Verb is a VERB part in the template. + Verb string + // Fields is a list of field paths bound in this template. Fields []string - // Version is the version number of the format - Version int + // Original template (example: /v1/a_bit_of_everything) + Template string } // Compiler compiles utilities representation of path templates into marshallable operations. @@ -29,9 +29,15 @@ type Compiler interface { } type op struct { - str string - code OpCode - operand int + // code is the opcode of the operation + code OpCode + + // str is a string operand of the code. + // num is ignored if str is not empty. + str string + + // num is a numeric operand of the code. + num int } func (w wildcard) compile() []op { @@ -61,8 +67,8 @@ func (v variable) compile() []op { ops = append(ops, s.compile()...) } ops = append(ops, op{ - code: OpConcatN, - operand: len(v.segments), + code: OpConcatN, + num: len(v.segments), }, op{ code: OpCapture, str: v.path, @@ -77,7 +83,6 @@ func (t template) Compile() Template { rawOps = append(rawOps, s.compile()...) } - // ops := make([]int, 0, len(rawOps)) var ( ops []int pool []string @@ -87,8 +92,12 @@ func (t template) Compile() Template { for _, op := range rawOps { ops = append(ops, int(op.code)) if op.str == "" { - ops = append(ops, op.operand) + ops = append(ops, op.num) } else { + // eof segment literal represents the "/" path pattern + if op.str == eof { + op.str = "" + } if _, ok := consts[op.str]; !ok { consts[op.str] = len(pool) pool = append(pool, op.str) diff --git a/util/router/compile_test.go b/util/router/compile_test.go index 0d655b90..63afe625 100644 --- a/util/router/compile_test.go +++ b/util/router/compile_test.go @@ -21,6 +21,13 @@ func TestCompile(t *testing.T) { fields []string }{ {}, + { + segs: []segment{ + literal(eof), + }, + ops: []int{int(OpLitPush), 0}, + pool: []string{""}, + }, { segs: []segment{ wildcard{}, diff --git a/util/router/parse.go b/util/router/parse.go index 9da8ea04..846ba0d1 100644 --- a/util/router/parse.go +++ b/util/router/parse.go @@ -3,11 +3,8 @@ package router // download from https://raw.githubusercontent.com/grpc-ecosystem/grpc-gateway/master/protoc-gen-grpc-gateway/httprule/parse.go import ( - "context" "fmt" "strings" - - "github.com/unistack-org/micro/v3/logger" ) // InvalidTemplateError indicates that the path template is not valid. @@ -83,8 +80,30 @@ func tokenize(path string) (tokens []string, verb string) { } l := len(tokens) + // See + // https://github.com/grpc-ecosystem/grpc-gateway/pull/1947#issuecomment-774523693 ; + // although normal and backwards-compat logic here is to use the last index + // of a colon, if the final segment is a variable followed by a colon, the + // part following the colon must be a verb. Hence if the previous token is + // an end var marker, we switch the index we're looking for to Index instead + // of LastIndex, so that we correctly grab the remaining part of the path as + // the verb. + var penultimateTokenIsEndVar bool + switch l { + case 0, 1: + // Not enough to be variable so skip this logic and don't result in an + // invalid index + default: + penultimateTokenIsEndVar = tokens[l-2] == "}" + } t := tokens[l-1] - if idx := strings.LastIndex(t, ":"); idx == 0 { + var idx int + if penultimateTokenIsEndVar { + idx = strings.Index(t, ":") + } else { + idx = strings.LastIndex(t, ":") + } + if idx == 0 { tokens, verb = tokens[:l-1], t[1:] } else if idx > 0 { tokens[l-1], verb = t[:idx], t[idx+1:] @@ -101,22 +120,17 @@ type parser struct { // topLevelSegments is the target of this parser. func (p *parser) topLevelSegments() ([]segment, error) { - if logger.V(logger.TraceLevel) { - logger.Trace(context.TODO(), "Parsing %q", p.tokens) + if _, err := p.accept(typeEOF); err == nil { + p.tokens = p.tokens[:0] + return []segment{literal(eof)}, nil } segs, err := p.segments() if err != nil { return nil, err } - if logger.V(logger.TraceLevel) { - logger.Trace(context.TODO(), "accept segments: %q; %q", p.accepted, p.tokens) - } if _, err := p.accept(typeEOF); err != nil { return nil, fmt.Errorf("unexpected token %q after segments %q", p.tokens[0], strings.Join(p.accepted, "")) } - if logger.V(logger.TraceLevel) { - logger.Trace(context.TODO(), "accept eof: %q; %q", p.accepted, p.tokens) - } return segs, nil } @@ -126,9 +140,6 @@ func (p *parser) segments() ([]segment, error) { return nil, err } - if logger.V(logger.TraceLevel) { - logger.Trace(context.TODO(), "accept segment: %q; %q", p.accepted, p.tokens) - } segs := []segment{s} for { if _, err := p.accept("/"); err != nil { @@ -139,9 +150,6 @@ func (p *parser) segments() ([]segment, error) { return segs, err } segs = append(segs, s) - if logger.V(logger.TraceLevel) { - logger.Trace(context.TODO(), "accept segment: %q; %q", p.accepted, p.tokens) - } } } diff --git a/util/router/parse_test.go b/util/router/parse_test.go index c6726419..fb13b444 100644 --- a/util/router/parse_test.go +++ b/util/router/parse_test.go @@ -4,7 +4,6 @@ package router import ( "context" - "flag" "fmt" "reflect" "testing" @@ -16,6 +15,7 @@ func TestTokenize(t *testing.T) { for _, spec := range []struct { src string tokens []string + verb string }{ { src: "", @@ -84,32 +84,74 @@ func TestTokenize(t *testing.T) { eof, }, }, + { + src: "v1/a/{endpoint}:a", + tokens: []string{ + "v1", "/", + "a", "/", + "{", "endpoint", "}", + eof, + }, + verb: "a", + }, + { + src: "v1/a/{endpoint}:b:c", + tokens: []string{ + "v1", "/", + "a", "/", + "{", "endpoint", "}", + eof, + }, + verb: "b:c", + }, } { tokens, verb := tokenize(spec.src) if got, want := tokens, spec.tokens; !reflect.DeepEqual(got, want) { t.Errorf("tokenize(%q) = %q, _; want %q, _", spec.src, got, want) } - if got, want := verb, ""; got != want { - t.Errorf("tokenize(%q) = _, %q; want _, %q", spec.src, got, want) - } - src := fmt.Sprintf("%s:%s", spec.src, "LOCK") - tokens, verb = tokenize(src) - if got, want := tokens, spec.tokens; !reflect.DeepEqual(got, want) { - t.Errorf("tokenize(%q) = %q, _; want %q, _", src, got, want) - } - if got, want := verb, "LOCK"; got != want { - t.Errorf("tokenize(%q) = _, %q; want _, %q", src, got, want) + switch { + case spec.verb != "": + if got, want := verb, spec.verb; !reflect.DeepEqual(got, want) { + t.Errorf("tokenize(%q) = %q, _; want %q, _", spec.src, got, want) + } + + default: + if got, want := verb, ""; got != want { + t.Errorf("tokenize(%q) = _, %q; want _, %q", spec.src, got, want) + } + + src := fmt.Sprintf("%s:%s", spec.src, "LOCK") + tokens, verb = tokenize(src) + if got, want := tokens, spec.tokens; !reflect.DeepEqual(got, want) { + t.Errorf("tokenize(%q) = %q, _; want %q, _", src, got, want) + } + if got, want := verb, "LOCK"; got != want { + t.Errorf("tokenize(%q) = _, %q; want _, %q", src, got, want) + } } } } func TestParseSegments(t *testing.T) { - flag.Set("v", "3") for _, spec := range []struct { tokens []string want []segment }{ + { + tokens: []string{eof}, + want: []segment{ + literal(eof), + }, + }, + { + // Note: this case will never arise as tokenize() will never return such a sequence of tokens + // and even if it does it will be treated as [eof] + tokens: []string{eof, "v1", eof}, + want: []segment{ + literal(eof), + }, + }, { tokens: []string{"v1", eof}, want: []segment{ @@ -251,7 +293,6 @@ func TestParseSegments(t *testing.T) { } func TestParseSegmentsWithErrors(t *testing.T) { - flag.Set("v", "3") for _, spec := range []struct { tokens []string }{ @@ -275,10 +316,6 @@ func TestParseSegmentsWithErrors(t *testing.T) { // invalid percent-encoding tokens: []string{"a%2z", eof}, }, - { - // empty segments - tokens: []string{eof}, - }, { // unterminated variable tokens: []string{"{", "name", eof}, diff --git a/util/router/runtime.go b/util/router/runtime.go index 62a26a1b..54d15f71 100644 --- a/util/router/runtime.go +++ b/util/router/runtime.go @@ -23,9 +23,9 @@ type rop struct { operand int } -// Pattern is a template pattern of http request paths defined in github.com/googleapis/googleapis/google/api/http.proto. +// Pattern is a template pattern of http request paths defined in +// https://github.com/googleapis/googleapis/blob/master/google/api/http.proto type Pattern struct { - verb string // ops is a list of operations ops []rop // pool is a constant pool indexed by the operands or vars @@ -36,32 +36,16 @@ type Pattern struct { stacksize int // tailLen is the length of the fixed-size segments after a deep wildcard tailLen int - // assumeColonVerb indicates whether a path suffix after a final - // colon may only be interpreted as a verb. - assumeColonVerb bool + // verb is the VERB part of the path pattern. It is empty if the pattern does not have VERB part. + verb string } -type patternOptions struct { - assumeColonVerb bool -} - -// PatternOpt is an option for creating Patterns. -type PatternOpt func(*patternOptions) - // NewPattern returns a new Pattern from the given definition values. // "ops" is a sequence of op codes. "pool" is a constant pool. // "verb" is the verb part of the pattern. It is empty if the pattern does not have the part. // "version" must be 1 for now. // It returns an error if the given definition is invalid. -//nolint:gocyclo -func NewPattern(version int, ops []int, pool []string, verb string, opts ...PatternOpt) (Pattern, error) { - options := patternOptions{ - assumeColonVerb: true, - } - for _, o := range opts { - o(&options) - } - +func NewPattern(version int, ops []int, pool []string, verb string) (Pattern, error) { if version != 1 { if logger.V(logger.TraceLevel) { logger.Trace(context.TODO(), "unsupported version: %d", version) @@ -159,13 +143,12 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt typedOps = append(typedOps, op) } return Pattern{ - ops: typedOps, - pool: pool, - vars: vars, - stacksize: maxstack, - tailLen: tailLen, - verb: verb, - assumeColonVerb: options.assumeColonVerb, + ops: typedOps, + pool: pool, + vars: vars, + stacksize: maxstack, + tailLen: tailLen, + verb: verb, }, nil } @@ -185,7 +168,7 @@ func MustPattern(p Pattern, err error) Pattern { //nolint:gocyclo func (p Pattern) Match(components []string, verb string) (map[string]string, error) { if p.verb != verb { - if p.assumeColonVerb || p.verb != "" { + if p.verb != "" { return nil, ErrNotMatch } if len(components) == 0 { @@ -274,11 +257,3 @@ func (p Pattern) String() string { } return "/" + segs } - -// AssumeColonVerbOpt indicates whether a path suffix after a final -// colon may only be interpreted as a verb. -func AssumeColonVerbOpt(val bool) PatternOpt { - return PatternOpt(func(o *patternOptions) { - o.assumeColonVerb = val - }) -} diff --git a/util/router/types.go b/util/router/types.go index 82d5f57f..183eeb7c 100644 --- a/util/router/types.go +++ b/util/router/types.go @@ -8,9 +8,9 @@ import ( ) type template struct { + segments []segment verb string template string - segments []segment } type segment interface {