diff --git a/client/lookup.go b/client/lookup.go index f452cda1..fcc85896 100644 --- a/client/lookup.go +++ b/client/lookup.go @@ -2,6 +2,7 @@ package client import ( "context" + "fmt" "sort" "github.com/unistack-org/micro/v3/errors" @@ -18,6 +19,10 @@ func LookupRoute(ctx context.Context, req Request, opts CallOptions) ([]string, return opts.Address, nil } + if opts.Router == nil { + return nil, router.ErrRouteNotFound + } + // construct the router query query := []router.QueryOption{router.QueryService(req.Service())} diff --git a/config/config.go b/config/config.go index 4f67c74b..9531c875 100644 --- a/config/config.go +++ b/config/config.go @@ -3,52 +3,35 @@ package config import ( "context" - - "github.com/unistack-org/micro/v3/config/loader" - "github.com/unistack-org/micro/v3/config/reader" - "github.com/unistack-org/micro/v3/config/source" + "errors" ) var ( - DefaultConfig Config + DefaultConfig Config = NewConfig() +) + +var ( + // ErrWatcherStopped is returned when source watcher has been stopped + ErrWatcherStopped = errors.New("watcher stopped") ) // Config is an interface abstraction for dynamic configuration type Config interface { - // provide the reader.Values interface - reader.Values // Init the config Init(opts ...Option) error // Options in the config Options() Options - // Stop the config loader/watcher - Close() error - // Load config sources - Load(source ...source.Source) error - // Force a source changeset sync - Sync() error + // Load config from sources + Load(context.Context) error + // Save config to sources + Save(context.Context) error // Watch a value for changes - Watch(path ...string) (Watcher, error) + // Watch(interface{}) (Watcher, error) + String() string } // Watcher is the config watcher -type Watcher interface { - Next() (reader.Value, error) - Stop() error -} - -type Options struct { - Loader loader.Loader - Reader reader.Reader - Source []source.Source - - // for alternative data - Context context.Context -} - -type Option func(o *Options) - -// NewConfig returns new config -func NewConfig(opts ...Option) (Config, error) { - return newConfig(opts...) -} +//type Watcher interface { +// Next() (, error) +// Stop() error +//} diff --git a/config/default.go b/config/default.go deleted file mode 100644 index 1ed08e47..00000000 --- a/config/default.go +++ /dev/null @@ -1,294 +0,0 @@ -package config - -import ( - "bytes" - "fmt" - "sync" - "time" - - "github.com/unistack-org/micro/v3/config/loader" - "github.com/unistack-org/micro/v3/config/reader" - "github.com/unistack-org/micro/v3/config/source" -) - -type config struct { - exit chan bool - opts Options - - sync.RWMutex - // the current snapshot - snap *loader.Snapshot - // the current values - vals reader.Values -} - -type watcher struct { - lw loader.Watcher - rd reader.Reader - path []string - value reader.Value -} - -func newConfig(opts ...Option) (Config, error) { - var c config - - if err := c.Init(opts...); err != nil { - return nil, err - } - - go c.run() - return &c, nil -} - -func (c *config) Init(opts ...Option) error { - c.opts = Options{} - c.exit = make(chan bool) - for _, o := range opts { - o(&c.opts) - } - - err := c.opts.Loader.Load(c.opts.Source...) - if err != nil { - return err - } - - c.snap, err = c.opts.Loader.Snapshot() - if err != nil { - return err - } - - c.vals, err = c.opts.Reader.Values(c.snap.ChangeSet) - if err != nil { - return err - } - - return nil -} - -func (c *config) Options() Options { - return c.opts -} - -func (c *config) run() { - watch := func(w loader.Watcher) error { - for { - // get changeset - snap, err := w.Next() - if err != nil { - return err - } - - c.Lock() - - if c.snap != nil && c.snap.Version >= snap.Version { - c.Unlock() - continue - } - - // save - c.snap = snap - - // set values - c.vals, _ = c.opts.Reader.Values(snap.ChangeSet) - - c.Unlock() - } - } - - for { - w, err := c.opts.Loader.Watch() - if err != nil { - time.Sleep(time.Second) - continue - } - - done := make(chan bool) - - // the stop watch func - go func() { - select { - case <-done: - case <-c.exit: - } - w.Stop() - }() - - // block watch - if err := watch(w); err != nil { - // do something better - time.Sleep(time.Second) - } - - // close done chan - close(done) - - // if the config is closed exit - select { - case <-c.exit: - return - default: - } - } -} - -func (c *config) Map() map[string]interface{} { - c.RLock() - defer c.RUnlock() - return c.vals.Map() -} - -func (c *config) Scan(v interface{}) error { - c.RLock() - defer c.RUnlock() - return c.vals.Scan(v) -} - -// sync loads all the sources, calls the parser and updates the config -func (c *config) Sync() error { - if err := c.opts.Loader.Sync(); err != nil { - return err - } - - snap, err := c.opts.Loader.Snapshot() - if err != nil { - return err - } - - c.Lock() - defer c.Unlock() - - c.snap = snap - vals, err := c.opts.Reader.Values(snap.ChangeSet) - if err != nil { - return err - } - c.vals = vals - - return nil -} - -func (c *config) Close() error { - select { - case <-c.exit: - return nil - default: - close(c.exit) - } - return nil -} - -func (c *config) Get(path ...string) (reader.Value, error) { - c.RLock() - defer c.RUnlock() - - // did sync actually work? - if c.vals != nil { - return c.vals.Get(path...) - } - - // no value - return nil, fmt.Errorf("no value") -} - -func (c *config) Set(val interface{}, path ...string) error { - c.Lock() - defer c.Unlock() - - if c.vals != nil { - c.vals.Set(val, path...) - } - return nil -} - -func (c *config) Del(path ...string) error { - c.Lock() - defer c.Unlock() - - if c.vals != nil { - c.vals.Del(path...) - } - - return nil -} - -func (c *config) Bytes() []byte { - c.RLock() - defer c.RUnlock() - - if c.vals == nil { - return []byte{} - } - - return c.vals.Bytes() -} - -func (c *config) Load(sources ...source.Source) error { - if err := c.opts.Loader.Load(sources...); err != nil { - return err - } - - snap, err := c.opts.Loader.Snapshot() - if err != nil { - return err - } - - c.Lock() - defer c.Unlock() - - c.snap = snap - vals, err := c.opts.Reader.Values(snap.ChangeSet) - if err != nil { - return err - } - c.vals = vals - - return nil -} - -func (c *config) Watch(path ...string) (Watcher, error) { - value, err := c.Get(path...) - if err != nil { - return nil, err - } - - w, err := c.opts.Loader.Watch(path...) - if err != nil { - return nil, err - } - - return &watcher{ - lw: w, - rd: c.opts.Reader, - path: path, - value: value, - }, nil -} - -func (c *config) String() string { - return "config" -} - -func (w *watcher) Next() (reader.Value, error) { - for { - s, err := w.lw.Next() - if err != nil { - return nil, err - } - - // only process changes - if bytes.Equal(w.value.Bytes(), s.ChangeSet.Data) { - continue - } - - v, err := w.rd.Values(s.ChangeSet) - if err != nil { - return nil, err - } - - return v.Get() - } -} - -func (w *watcher) Stop() error { - return w.lw.Stop() -} diff --git a/config/default_test.go b/config/default_test.go deleted file mode 100644 index 0714b367..00000000 --- a/config/default_test.go +++ /dev/null @@ -1,168 +0,0 @@ -// +build ignore - -package config - -import ( - "fmt" - "os" - "path/filepath" - "runtime" - "strings" - "testing" - "time" - - "github.com/unistack-org/micro/v3/config/source" - "github.com/unistack-org/micro/v3/config/source/env" - "github.com/unistack-org/micro/v3/config/source/file" - "github.com/unistack-org/micro/v3/config/source/memory" -) - -func createFileForIssue18(t *testing.T, content string) *os.File { - data := []byte(content) - path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano())) - fh, err := os.Create(path) - if err != nil { - t.Error(err) - } - _, err = fh.Write(data) - if err != nil { - t.Error(err) - } - - return fh -} - -func createFileForTest(t *testing.T) *os.File { - data := []byte(`{"foo": "bar"}`) - path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano())) - fh, err := os.Create(path) - if err != nil { - t.Error(err) - } - _, err = fh.Write(data) - if err != nil { - t.Error(err) - } - - return fh -} - -func TestConfigLoadWithGoodFile(t *testing.T) { - fh := createFileForTest(t) - path := fh.Name() - defer func() { - fh.Close() - os.Remove(path) - }() - - // Create new config - conf, err := NewConfig() - if err != nil { - t.Fatalf("Expected no error but got %v", err) - } - // Load file source - if err := conf.Load(file.NewSource( - file.WithPath(path), - )); err != nil { - t.Fatalf("Expected no error but got %v", err) - } -} - -func TestConfigLoadWithInvalidFile(t *testing.T) { - fh := createFileForTest(t) - path := fh.Name() - defer func() { - fh.Close() - os.Remove(path) - }() - - // Create new config - conf, err := NewConfig() - if err != nil { - t.Fatalf("Expected no error but got %v", err) - } - // Load file source - err = conf.Load(file.NewSource( - file.WithPath(path), - file.WithPath("/i/do/not/exists.json"), - )) - - if err == nil { - t.Fatal("Expected error but none !") - } - if !strings.Contains(fmt.Sprintf("%v", err), "/i/do/not/exists.json") { - t.Fatalf("Expected error to contain the unexisting file but got %v", err) - } -} - -func TestConfigMerge(t *testing.T) { - fh := createFileForIssue18(t, `{ - "amqp": { - "host": "rabbit.platform", - "port": 80 - }, - "handler": { - "exchange": "springCloudBus" - } -}`) - path := fh.Name() - defer func() { - fh.Close() - os.Remove(path) - }() - os.Setenv("AMQP_HOST", "rabbit.testing.com") - - conf, err := NewConfig() - if err != nil { - t.Fatalf("Expected no error but got %v", err) - } - if err := conf.Load( - file.NewSource( - file.WithPath(path), - ), - env.NewSource(), - ); err != nil { - t.Fatalf("Expected no error but got %v", err) - } - - actualHost := conf.Get("amqp", "host").String("backup") - if actualHost != "rabbit.testing.com" { - t.Fatalf("Expected %v but got %v", - "rabbit.testing.com", - actualHost) - } -} - -func equalS(t *testing.T, actual, expect string) { - if actual != expect { - t.Errorf("Expected %s but got %s", actual, expect) - } -} - -func TestConfigWatcherDirtyOverrite(t *testing.T) { - n := runtime.GOMAXPROCS(0) - defer runtime.GOMAXPROCS(n) - - runtime.GOMAXPROCS(1) - - l := 100 - - ss := make([]source.Source, l) - - for i := 0; i < l; i++ { - ss[i] = memory.NewSource(memory.WithJSON([]byte(fmt.Sprintf(`{"key%d": "val%d"}`, i, i)))) - } - - conf, _ := NewConfig() - - for _, s := range ss { - _ = conf.Load(s) - } - runtime.Gosched() - - for i := range ss { - k := fmt.Sprintf("key%d", i) - v := fmt.Sprintf("val%d", i) - equalS(t, conf.Get(k).String(""), v) - } -} diff --git a/config/encoder/encoder.go b/config/encoder/encoder.go deleted file mode 100644 index 0ef0654a..00000000 --- a/config/encoder/encoder.go +++ /dev/null @@ -1,8 +0,0 @@ -// Package encoder handles source encoding formats -package encoder - -type Encoder interface { - Encode(interface{}) ([]byte, error) - Decode([]byte, interface{}) error - String() string -} diff --git a/config/encoder/hcl/hcl.go b/config/encoder/hcl/hcl.go deleted file mode 100644 index 0e1801f4..00000000 --- a/config/encoder/hcl/hcl.go +++ /dev/null @@ -1,26 +0,0 @@ -package hcl - -import ( - "encoding/json" - - "github.com/hashicorp/hcl" - "github.com/unistack-org/micro/v3/config/encoder" -) - -type hclEncoder struct{} - -func (h hclEncoder) Encode(v interface{}) ([]byte, error) { - return json.Marshal(v) -} - -func (h hclEncoder) Decode(d []byte, v interface{}) error { - return hcl.Unmarshal(d, v) -} - -func (h hclEncoder) String() string { - return "hcl" -} - -func NewEncoder() encoder.Encoder { - return hclEncoder{} -} diff --git a/config/encoder/json/json.go b/config/encoder/json/json.go deleted file mode 100644 index a63e3af9..00000000 --- a/config/encoder/json/json.go +++ /dev/null @@ -1,25 +0,0 @@ -package json - -import ( - "encoding/json" - - "github.com/unistack-org/micro/v3/config/encoder" -) - -type jsonEncoder struct{} - -func (j jsonEncoder) Encode(v interface{}) ([]byte, error) { - return json.Marshal(v) -} - -func (j jsonEncoder) Decode(d []byte, v interface{}) error { - return json.Unmarshal(d, v) -} - -func (j jsonEncoder) String() string { - return "json" -} - -func NewEncoder() encoder.Encoder { - return jsonEncoder{} -} diff --git a/config/encoder/toml/toml.go b/config/encoder/toml/toml.go deleted file mode 100644 index c4b3c620..00000000 --- a/config/encoder/toml/toml.go +++ /dev/null @@ -1,32 +0,0 @@ -package toml - -import ( - "bytes" - - "github.com/BurntSushi/toml" - "github.com/unistack-org/micro/v3/config/encoder" -) - -type tomlEncoder struct{} - -func (t tomlEncoder) Encode(v interface{}) ([]byte, error) { - b := bytes.NewBuffer(nil) - defer b.Reset() - err := toml.NewEncoder(b).Encode(v) - if err != nil { - return nil, err - } - return b.Bytes(), nil -} - -func (t tomlEncoder) Decode(d []byte, v interface{}) error { - return toml.Unmarshal(d, v) -} - -func (t tomlEncoder) String() string { - return "toml" -} - -func NewEncoder() encoder.Encoder { - return tomlEncoder{} -} diff --git a/config/encoder/xml/xml.go b/config/encoder/xml/xml.go deleted file mode 100644 index d8329944..00000000 --- a/config/encoder/xml/xml.go +++ /dev/null @@ -1,25 +0,0 @@ -package xml - -import ( - "encoding/xml" - - "github.com/unistack-org/micro/v3/config/encoder" -) - -type xmlEncoder struct{} - -func (x xmlEncoder) Encode(v interface{}) ([]byte, error) { - return xml.Marshal(v) -} - -func (x xmlEncoder) Decode(d []byte, v interface{}) error { - return xml.Unmarshal(d, v) -} - -func (x xmlEncoder) String() string { - return "xml" -} - -func NewEncoder() encoder.Encoder { - return xmlEncoder{} -} diff --git a/config/encoder/yaml/yaml.go b/config/encoder/yaml/yaml.go deleted file mode 100644 index 9374b8a5..00000000 --- a/config/encoder/yaml/yaml.go +++ /dev/null @@ -1,24 +0,0 @@ -package yaml - -import ( - "github.com/ghodss/yaml" - "github.com/unistack-org/micro/v3/config/encoder" -) - -type yamlEncoder struct{} - -func (y yamlEncoder) Encode(v interface{}) ([]byte, error) { - return yaml.Marshal(v) -} - -func (y yamlEncoder) Decode(d []byte, v interface{}) error { - return yaml.Unmarshal(d, v) -} - -func (y yamlEncoder) String() string { - return "yaml" -} - -func NewEncoder() encoder.Encoder { - return yamlEncoder{} -} diff --git a/config/loader/loader.go b/config/loader/loader.go deleted file mode 100644 index 7221f77c..00000000 --- a/config/loader/loader.go +++ /dev/null @@ -1,63 +0,0 @@ -// package loader manages loading from multiple sources -package loader - -import ( - "context" - - "github.com/unistack-org/micro/v3/config/reader" - "github.com/unistack-org/micro/v3/config/source" -) - -// Loader manages loading sources -type Loader interface { - // Stop the loader - Close() error - // Load the sources - Load(...source.Source) error - // A Snapshot of loaded config - Snapshot() (*Snapshot, error) - // Force sync of sources - Sync() error - // Watch for changes - Watch(...string) (Watcher, error) - // Name of loader - String() string -} - -// Watcher lets you watch sources and returns a merged ChangeSet -type Watcher interface { - // First call to next may return the current Snapshot - // If you are watching a path then only the data from - // that path is returned. - Next() (*Snapshot, error) - // Stop watching for changes - Stop() error -} - -// Snapshot is a merged ChangeSet -type Snapshot struct { - // The merged ChangeSet - ChangeSet *source.ChangeSet - // Deterministic and comparable version of the snapshot - Version string -} - -type Options struct { - Reader reader.Reader - Source []source.Source - - // for alternative data - Context context.Context -} - -type Option func(o *Options) - -// Copy snapshot -func Copy(s *Snapshot) *Snapshot { - cs := *(s.ChangeSet) - - return &Snapshot{ - ChangeSet: &cs, - Version: s.Version, - } -} diff --git a/config/noop.go b/config/noop.go new file mode 100644 index 00000000..508421b3 --- /dev/null +++ b/config/noop.go @@ -0,0 +1,36 @@ +// Package config is an interface for dynamic configuration. +package config + +import "context" + +type noopConfig struct { + opts Options +} + +func (c *noopConfig) Init(opts ...Option) error { + for _, o := range opts { + o(&c.opts) + } + return nil +} + +func (c *noopConfig) Load(ctx context.Context) error { + return nil +} + +func (c *noopConfig) Save(ctx context.Context) error { + return nil +} + +func (c *noopConfig) Options() Options { + return c.opts +} + +func (c *noopConfig) String() string { + return "noop" +} + +// NewConfig returns new noop config +func NewConfig(opts ...Option) Config { + return &noopConfig{opts: NewOptions(opts...)} +} diff --git a/config/noop_test.go b/config/noop_test.go new file mode 100644 index 00000000..9b58926d --- /dev/null +++ b/config/noop_test.go @@ -0,0 +1,61 @@ +package config_test + +import "testing" +import "context" +import "fmt" +import "github.com/unistack-org/micro/v3/config" + +type Cfg struct { + Value string +} + +func TestNoop(t *testing.T) { + ctx := context.Background() + conf := &Cfg{} + blfn := func(ctx context.Context, cfg config.Config) error { + conf, ok := cfg.Options().Struct.(*Cfg) + if !ok { + return fmt.Errorf("failed to get Struct from options: %v", cfg.Options()) + } + conf.Value = "before_load" + return nil + } + alfn := func(ctx context.Context, cfg config.Config) error { + conf, ok := cfg.Options().Struct.(*Cfg) + if !ok { + return fmt.Errorf("failed to get Struct from options: %v", cfg.Options()) + } + conf.Value = "after_load" + return nil + } + + + cfg := config.NewConfig(config.Struct(conf),config.BeforeLoad(blfn),config.AfterLoad(alfn)) + if err := cfg.Init(); err != nil { + t.Fatal(err) + } + for _, fn := range cfg.Options().BeforeLoad { + if err := fn(ctx, cfg); err != nil { + t.Fatal(err) + } + } + if conf.Value != "before_load" { + t.Fatal("BeforeLoad option not working") + } + + if err := cfg.Load(ctx); err != nil { + t.Fatal(err) + } + + for _, fn := range cfg.Options().AfterLoad { + if err := fn(ctx, cfg); err != nil { + t.Fatal(err) + } + } + if conf.Value != "after_load" { + t.Fatal("AfterLoad option not working") + } + + + +} diff --git a/config/options.go b/config/options.go index 20fcb9de..e6d050cd 100644 --- a/config/options.go +++ b/config/options.go @@ -1,28 +1,88 @@ package config import ( - "github.com/unistack-org/micro/v3/config/loader" - "github.com/unistack-org/micro/v3/config/reader" - "github.com/unistack-org/micro/v3/config/source" + "context" + + "github.com/unistack-org/micro/v3/codec" ) -// WithLoader sets the loader for manager config -func WithLoader(l loader.Loader) Option { +type Options struct { + BeforeLoad []func(context.Context, Config) error + AfterLoad []func(context.Context, Config) error + BeforeSave []func(context.Context, Config) error + AfterSave []func(context.Context, Config) error + // Struct that holds config data + Struct interface{} + // struct tag name + StructTag string + // codec that used for load/save + Codec codec.Codec + // for alternative data + Context context.Context +} + +type Option func(o *Options) + +func NewOptions(opts ...Option) Options { + options := Options{ + Context: context.Background(), + } + for _, o := range opts { + o(&options) + } + + return options +} + +func BeforeLoad(fn ...func(context.Context, Config) error) Option { return func(o *Options) { - o.Loader = l + o.BeforeLoad = fn } } -// WithSource appends a source to list of sources -func WithSource(s source.Source) Option { +func AfterLoad(fn ...func(context.Context, Config) error) Option { return func(o *Options) { - o.Source = append(o.Source, s) + o.AfterLoad = fn } } -// WithReader sets the config reader -func WithReader(r reader.Reader) Option { +func BeforeSave(fn ...func(context.Context, Config) error) Option { return func(o *Options) { - o.Reader = r + o.BeforeSave = fn + } +} + +func AfterSave(fn ...func(context.Context, Config) error) Option { + return func(o *Options) { + o.AfterSave= fn + } +} + + + +func Context(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + +// Codec sets the source codec +func Codec(c codec.Codec) Option { + return func(o *Options) { + o.Codec = c + } +} + +// Struct +func Struct(v interface{}) Option { + return func(o *Options) { + o.Struct = v + } +} + +// StructTag +func StructTag(name string) Option { + return func(o *Options) { + o.StructTag = name } } diff --git a/config/reader/options.go b/config/reader/options.go deleted file mode 100644 index d96938ce..00000000 --- a/config/reader/options.go +++ /dev/null @@ -1,50 +0,0 @@ -package reader - -import ( - "github.com/unistack-org/micro/v3/config/encoder" - "github.com/unistack-org/micro/v3/config/encoder/hcl" - "github.com/unistack-org/micro/v3/config/encoder/json" - "github.com/unistack-org/micro/v3/config/encoder/toml" - "github.com/unistack-org/micro/v3/config/encoder/xml" - "github.com/unistack-org/micro/v3/config/encoder/yaml" -) - -type Options struct { - Encoding map[string]encoder.Encoder - DisableReplaceEnvVars bool -} - -type Option func(o *Options) - -func NewOptions(opts ...Option) Options { - options := Options{ - Encoding: map[string]encoder.Encoder{ - "json": json.NewEncoder(), - "yaml": yaml.NewEncoder(), - "toml": toml.NewEncoder(), - "xml": xml.NewEncoder(), - "hcl": hcl.NewEncoder(), - "yml": yaml.NewEncoder(), - }, - } - for _, o := range opts { - o(&options) - } - return options -} - -func WithEncoder(e encoder.Encoder) Option { - return func(o *Options) { - if o.Encoding == nil { - o.Encoding = make(map[string]encoder.Encoder) - } - o.Encoding[e.String()] = e - } -} - -// WithDisableReplaceEnvVars disables the environment variable interpolation preprocessor -func WithDisableReplaceEnvVars() Option { - return func(o *Options) { - o.DisableReplaceEnvVars = true - } -} diff --git a/config/reader/preprocessor.go b/config/reader/preprocessor.go deleted file mode 100644 index 2895be4f..00000000 --- a/config/reader/preprocessor.go +++ /dev/null @@ -1,23 +0,0 @@ -package reader - -import ( - "os" - "regexp" -) - -func ReplaceEnvVars(raw []byte) ([]byte, error) { - re := regexp.MustCompile(`\$\{([A-Za-z0-9_]+)\}`) - if re.Match(raw) { - dataS := string(raw) - res := re.ReplaceAllStringFunc(dataS, replaceEnvVars) - return []byte(res), nil - } else { - return raw, nil - } -} - -func replaceEnvVars(element string) string { - v := element[2 : len(element)-1] - el := os.Getenv(v) - return el -} diff --git a/config/reader/preprocessor_test.go b/config/reader/preprocessor_test.go deleted file mode 100644 index ba5485fe..00000000 --- a/config/reader/preprocessor_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package reader - -import ( - "os" - "strings" - "testing" -) - -func TestReplaceEnvVars(t *testing.T) { - os.Setenv("myBar", "cat") - os.Setenv("MYBAR", "cat") - os.Setenv("my_Bar", "cat") - os.Setenv("myBar_", "cat") - - testData := []struct { - expected string - data []byte - }{ - // Right use cases - { - `{"foo": "bar", "baz": {"bar": "cat"}}`, - []byte(`{"foo": "bar", "baz": {"bar": "${myBar}"}}`), - }, - { - `{"foo": "bar", "baz": {"bar": "cat"}}`, - []byte(`{"foo": "bar", "baz": {"bar": "${MYBAR}"}}`), - }, - { - `{"foo": "bar", "baz": {"bar": "cat"}}`, - []byte(`{"foo": "bar", "baz": {"bar": "${my_Bar}"}}`), - }, - { - `{"foo": "bar", "baz": {"bar": "cat"}}`, - []byte(`{"foo": "bar", "baz": {"bar": "${myBar_}"}}`), - }, - // Wrong use cases - { - `{"foo": "bar", "baz": {"bar": "${myBar-}"}}`, - []byte(`{"foo": "bar", "baz": {"bar": "${myBar-}"}}`), - }, - { - `{"foo": "bar", "baz": {"bar": "${}"}}`, - []byte(`{"foo": "bar", "baz": {"bar": "${}"}}`), - }, - { - `{"foo": "bar", "baz": {"bar": "$sss}"}}`, - []byte(`{"foo": "bar", "baz": {"bar": "$sss}"}}`), - }, - { - `{"foo": "bar", "baz": {"bar": "${sss"}}`, - []byte(`{"foo": "bar", "baz": {"bar": "${sss"}}`), - }, - { - `{"foo": "bar", "baz": {"bar": "{something}"}}`, - []byte(`{"foo": "bar", "baz": {"bar": "{something}"}}`), - }, - // Use cases without replace env vars - { - `{"foo": "bar", "baz": {"bar": "cat"}}`, - []byte(`{"foo": "bar", "baz": {"bar": "cat"}}`), - }, - } - - for _, test := range testData { - res, err := ReplaceEnvVars(test.data) - if err != nil { - t.Fatal(err) - } - if strings.Compare(test.expected, string(res)) != 0 { - t.Fatalf("Expected %s got %s", test.expected, res) - } - } -} diff --git a/config/reader/reader.go b/config/reader/reader.go deleted file mode 100644 index a44732cb..00000000 --- a/config/reader/reader.go +++ /dev/null @@ -1,38 +0,0 @@ -// Package reader parses change sets and provides config values -package reader - -import ( - "time" - - "github.com/unistack-org/micro/v3/config/source" -) - -// Reader is an interface for merging changesets -type Reader interface { - Merge(...*source.ChangeSet) (*source.ChangeSet, error) - Values(*source.ChangeSet) (Values, error) - String() string -} - -// Values is returned by the reader -type Values interface { - Bytes() []byte - Get(path ...string) (Value, error) - Set(val interface{}, path ...string) error - Del(path ...string) error - Map() map[string]interface{} - Scan(v interface{}) error -} - -// Value represents a value of any type -type Value interface { - Bool(def bool) bool - Int(def int) int - String(def string) string - Float64(def float64) float64 - Duration(def time.Duration) time.Duration - StringSlice(def []string) []string - StringMap(def map[string]string) map[string]string - Scan(val interface{}) error - Bytes() []byte -} diff --git a/config/source/changeset.go b/config/source/changeset.go deleted file mode 100644 index 9958f61d..00000000 --- a/config/source/changeset.go +++ /dev/null @@ -1,13 +0,0 @@ -package source - -import ( - "crypto/md5" - "fmt" -) - -// Sum returns the md5 checksum of the ChangeSet data -func (c *ChangeSet) Sum() string { - h := md5.New() - h.Write(c.Data) - return fmt.Sprintf("%x", h.Sum(nil)) -} diff --git a/config/source/noop.go b/config/source/noop.go deleted file mode 100644 index fc444411..00000000 --- a/config/source/noop.go +++ /dev/null @@ -1,25 +0,0 @@ -package source - -import ( - "errors" -) - -type noopWatcher struct { - exit chan struct{} -} - -func (w *noopWatcher) Next() (*ChangeSet, error) { - <-w.exit - - return nil, errors.New("noopWatcher stopped") -} - -func (w *noopWatcher) Stop() error { - close(w.exit) - return nil -} - -// NewNoopWatcher returns a watcher that blocks on Next() until Stop() is called. -func NewNoopWatcher() (Watcher, error) { - return &noopWatcher{exit: make(chan struct{})}, nil -} diff --git a/config/source/options.go b/config/source/options.go deleted file mode 100644 index bf4b352b..00000000 --- a/config/source/options.go +++ /dev/null @@ -1,49 +0,0 @@ -package source - -import ( - "context" - - "github.com/unistack-org/micro/v3/client" - "github.com/unistack-org/micro/v3/config/encoder" - "github.com/unistack-org/micro/v3/config/encoder/json" -) - -type Options struct { - // Encoder - Encoder encoder.Encoder - - // for alternative data - Context context.Context - - // Client to use for RPC - Client client.Client -} - -type Option func(o *Options) - -func NewOptions(opts ...Option) Options { - options := Options{ - Encoder: json.NewEncoder(), - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - return options -} - -// WithEncoder sets the source encoder -func WithEncoder(e encoder.Encoder) Option { - return func(o *Options) { - o.Encoder = e - } -} - -// WithClient sets the source client -func WithClient(c client.Client) Option { - return func(o *Options) { - o.Client = c - } -} diff --git a/config/source/source.go b/config/source/source.go deleted file mode 100644 index 0cf8b9fb..00000000 --- a/config/source/source.go +++ /dev/null @@ -1,35 +0,0 @@ -// Package source is the interface for sources -package source - -import ( - "errors" - "time" -) - -var ( - // ErrWatcherStopped is returned when source watcher has been stopped - ErrWatcherStopped = errors.New("watcher stopped") -) - -// Source is the source from which config is loaded -type Source interface { - Read() (*ChangeSet, error) - Write(*ChangeSet) error - Watch() (Watcher, error) - String() string -} - -// ChangeSet represents a set of changes from a source -type ChangeSet struct { - Data []byte - Checksum string - Format string - Source string - Timestamp time.Time -} - -// Watcher watches a source for changes -type Watcher interface { - Next() (*ChangeSet, error) - Stop() error -} diff --git a/config/value.go b/config/value.go deleted file mode 100644 index d74d5753..00000000 --- a/config/value.go +++ /dev/null @@ -1,49 +0,0 @@ -package config - -import ( - "time" - - "github.com/unistack-org/micro/v3/config/reader" -) - -type value struct{} - -func newValue() reader.Value { - return new(value) -} - -func (v *value) Bool(def bool) bool { - return false -} - -func (v *value) Int(def int) int { - return 0 -} - -func (v *value) String(def string) string { - return "" -} - -func (v *value) Float64(def float64) float64 { - return 0.0 -} - -func (v *value) Duration(def time.Duration) time.Duration { - return time.Duration(0) -} - -func (v *value) StringSlice(def []string) []string { - return nil -} - -func (v *value) StringMap(def map[string]string) map[string]string { - return map[string]string{} -} - -func (v *value) Scan(val interface{}) error { - return nil -} - -func (v *value) Bytes() []byte { - return nil -} diff --git a/go.mod b/go.mod index 282b9abb..d5c9bd1c 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,12 @@ module github.com/unistack-org/micro/v3 go 1.14 require ( - github.com/BurntSushi/toml v0.3.1 github.com/caddyserver/certmagic v0.10.6 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1 - github.com/ghodss/yaml v1.0.0 github.com/go-acme/lego/v3 v3.4.0 github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.1.2 - github.com/hashicorp/hcl v1.0.0 github.com/micro/cli/v2 v2.1.2 github.com/miekg/dns v1.1.31 github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c diff --git a/mapping.txt b/mapping.txt new file mode 100644 index 00000000..e4c0086e --- /dev/null +++ b/mapping.txt @@ -0,0 +1,2 @@ +client/grpc/* micro-client-grpc +#server/grpc/* micro-server-grpc diff --git a/options.go b/options.go index 446dcad0..9faaf94d 100644 --- a/options.go +++ b/options.go @@ -4,8 +4,6 @@ import ( "context" "time" - "github.com/micro/cli/v2" - cmd "github.com/unistack-org/micro-config-cmd" "github.com/unistack-org/micro/v3/auth" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/client" @@ -13,7 +11,6 @@ import ( "github.com/unistack-org/micro/v3/debug/profile" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/metadata" - "github.com/unistack-org/micro/v3/network/transport" "github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/router" "github.com/unistack-org/micro/v3/runtime" @@ -25,25 +22,23 @@ import ( // Options for micro service type Options struct { - Auth auth.Auth - Broker broker.Broker - Logger logger.Logger - Cmd cmd.Cmd - Config config.Config - Client client.Client - Server server.Server - Store store.Store - Registry registry.Registry - Router router.Router - Runtime runtime.Runtime - Transport transport.Transport - Profile profile.Profile + Auth auth.Auth + Broker broker.Broker + Logger logger.Logger + Configs []config.Config + Client client.Client + Server server.Server + Store store.Store + Registry registry.Registry + Router router.Router + Runtime runtime.Runtime + Profile profile.Profile // Before and After funcs - BeforeStart []func() error - BeforeStop []func() error - AfterStart []func() error - AfterStop []func() error + BeforeStart []func(context.Context) error + BeforeStop []func(context.Context) error + AfterStart []func(context.Context) error + AfterStop []func(context.Context) error // Other options for implementations of the interface // can be stored in a context @@ -53,17 +48,16 @@ type Options struct { // NewOptions returns new Options filled with defaults and overrided by provided opts func NewOptions(opts ...Option) Options { options := Options{ - Context: context.Background(), - Server: server.DefaultServer, - Client: client.DefaultClient, - Broker: broker.DefaultBroker, - Registry: registry.DefaultRegistry, - Router: router.DefaultRouter, - Auth: auth.DefaultAuth, - Logger: logger.DefaultLogger, - Config: config.DefaultConfig, - Store: store.DefaultStore, - Transport: transport.DefaultTransport, + Context: context.Background(), + Server: server.DefaultServer, + Client: client.DefaultClient, + Broker: broker.DefaultBroker, + Registry: registry.DefaultRegistry, + Router: router.DefaultRouter, + Auth: auth.DefaultAuth, + Logger: logger.DefaultLogger, + Configs: []config.Config{config.DefaultConfig}, + Store: store.DefaultStore, //Runtime runtime.Runtime //Profile profile.Profile } @@ -92,13 +86,6 @@ func Broker(b broker.Broker) Option { } } -// Cmd to be used for service -func Cmd(c cmd.Cmd) Option { - return func(o *Options) { - o.Cmd = c - } -} - // Client to be used for service func Client(c client.Client) Option { return func(o *Options) { @@ -182,10 +169,10 @@ func Auth(a auth.Auth) Option { } } -// Config sets the config for the service -func Config(c config.Config) Option { +// Configs sets the configs for the service +func Configs(c []config.Config) Option { return func(o *Options) { - o.Config = c + o.Configs = c } } @@ -198,21 +185,6 @@ func Selector(s selector.Selector) Option { } } -// Transport sets the transport for the service -// and the underlying components -func Transport(t transport.Transport) Option { - return func(o *Options) { - o.Transport = t - // Update Client and Server - if o.Client != nil { - o.Client.Init(client.Transport(t)) - } - if o.Server != nil { - o.Server.Init(server.Transport(t)) - } - } -} - // Runtime sets the runtime func Runtime(r runtime.Runtime) Option { return func(o *Options) { @@ -231,8 +203,6 @@ func Router(r router.Router) Option { } } -// Convenience options - // Address sets the address of the server func Address(addr string) Option { return func(o *Options) { @@ -269,24 +239,6 @@ func Metadata(md metadata.Metadata) Option { } } -// Flags that can be passed to service -func Flags(flags ...cli.Flag) Option { - return func(o *Options) { - if o.Cmd != nil { - o.Cmd.App().Flags = append(o.Cmd.App().Flags, flags...) - } - } -} - -// Action can be used to parse user provided cli options -func Action(a func(*cli.Context) error) Option { - return func(o *Options) { - if o.Cmd != nil { - o.Cmd.App().Action = a - } - } -} - // RegisterTTL specifies the TTL to use when registering the service func RegisterTTL(t time.Duration) Option { return func(o *Options) { @@ -352,31 +304,29 @@ func WrapSubscriber(w ...server.SubscriberWrapper) Option { } } -// Before and Afters - // BeforeStart run funcs before service starts -func BeforeStart(fn func() error) Option { +func BeforeStart(fn func(context.Context) error) Option { return func(o *Options) { o.BeforeStart = append(o.BeforeStart, fn) } } // BeforeStop run funcs before service stops -func BeforeStop(fn func() error) Option { +func BeforeStop(fn func(context.Context) error) Option { return func(o *Options) { o.BeforeStop = append(o.BeforeStop, fn) } } // AfterStart run funcs after service starts -func AfterStart(fn func() error) Option { +func AfterStart(fn func(context.Context) error) Option { return func(o *Options) { o.AfterStart = append(o.AfterStart, fn) } } // AfterStop run funcs after service stops -func AfterStop(fn func() error) Option { +func AfterStop(fn func(context.Context) error) Option { return func(o *Options) { o.AfterStop = append(o.AfterStop, fn) } diff --git a/pull.sh b/pull.sh new file mode 100755 index 00000000..c1981d92 --- /dev/null +++ b/pull.sh @@ -0,0 +1,41 @@ +#!/bin/bash -ex + +if [ "$1" == "--force" ]; then + force="yes" +fi + +srcsha="--root" +dstsha="HEAD" +commitrange="${srcsha} ${dstsha}" + +while read srcpath dstpath; do + if [ "${srcpath::1}" == "#" ] ; then + continue + fi + + relpath="${srcpath//\*}" + + rm -rf patches/ + + dstsha=$(git rev-parse HEAD) + if [ -f "../${dstpath}/.synced" ]; then + srcsha=$(cat "../${dstpath}/.synced" | tr -d '\n') + commitrange="${srcsha}..${dstsha}" + fi + + git format-patch --find-copies --break-rewrites --find-renames=100% --relative="${relpath}" --no-stat --minimal --minimal --no-cover-letter --no-signature "${commitrange}" -o patches/ -- "${srcpath}" + + for p in $(ls patches/); do + grep -q 'From: Vasiliy Tolstov /dev/null + git am --rerere-autoupdate --3way ../micro/patches/*.patch + popd >/dev/null + fi + + echo -n "${dstsha}" > ../${dstpath}/.synced + +done < mapping.txt + diff --git a/service.go b/service.go index 075e1103..dac93600 100644 --- a/service.go +++ b/service.go @@ -10,7 +10,6 @@ import ( "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/config" "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/network/transport" "github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/router" "github.com/unistack-org/micro/v3/server" @@ -41,18 +40,6 @@ func (s *service) Init(opts ...Option) error { o(&s.opts) } - if s.opts.Cmd != nil { - // set cmd name - if len(s.opts.Cmd.App().Name) == 0 { - s.opts.Cmd.App().Name = s.Server().Options().Name - } - - // Initialise the command options - if err := s.opts.Cmd.Init(); err != nil { - return err - } - } - if s.opts.Logger != nil { if err := s.opts.Logger.Init( logger.WithContext(s.opts.Context), @@ -61,6 +48,14 @@ func (s *service) Init(opts ...Option) error { } } + if s.opts.Configs != nil { + for _, c := range s.opts.Configs { + if err := c.Init(config.Context(s.opts.Context) ); err != nil { + return err + } + } + } + if s.opts.Registry != nil { if err := s.opts.Registry.Init( registry.Context(s.opts.Context), @@ -77,14 +72,6 @@ func (s *service) Init(opts ...Option) error { } } - if s.opts.Transport != nil { - if err := s.opts.Transport.Init( - transport.Context(s.opts.Context), - ); err != nil { - return err - } - } - if s.opts.Store != nil { if err := s.opts.Store.Init( store.Context(s.opts.Context), @@ -140,14 +127,6 @@ func (s *service) Logger() logger.Logger { return s.opts.Logger } -func (s *service) Transport() transport.Transport { - return s.opts.Transport -} - -func (s *service) Config() config.Config { - return s.opts.Config -} - func (s *service) Auth() auth.Auth { return s.opts.Auth } @@ -172,11 +151,27 @@ func (s *service) Start() error { } for _, fn := range s.opts.BeforeStart { - if err = fn(); err != nil { + if err = fn(s.opts.Context); err != nil { return err } } + for _, cfg := range s.opts.Configs { + for _, fn := range cfg.Options().BeforeLoad { + if err := fn(s.opts.Context, cfg); err != nil { + return err + } + } + if err := cfg.Load(s.opts.Context); err != nil { + return err + } + for _, fn := range cfg.Options().AfterLoad { + if err := fn(s.opts.Context, cfg); err != nil { + return err + } + } + } + if s.opts.Server == nil { return fmt.Errorf("cant start nil server") } @@ -204,7 +199,7 @@ func (s *service) Start() error { } for _, fn := range s.opts.AfterStart { - if err = fn(); err != nil { + if err = fn(s.opts.Context); err != nil { return err } } @@ -223,7 +218,7 @@ func (s *service) Stop() error { var err error for _, fn := range s.opts.BeforeStop { - if err = fn(); err != nil { + if err = fn(s.opts.Context); err != nil { return err } } @@ -233,7 +228,7 @@ func (s *service) Stop() error { } for _, fn := range s.opts.AfterStop { - if err = fn(); err != nil { + if err = fn(s.opts.Context); err != nil { return err } } @@ -273,12 +268,6 @@ func (s *service) Run() error { defer s.opts.Profile.Stop() } - if s.opts.Cmd != nil { - if err := s.opts.Cmd.Run(); err != nil { - return err - } - } - if err := s.Start(); err != nil { return err }