config package rework (#9)

* change config interface
* reuse codec
* allow to modify next config based on values in current config via BeforeLoad

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-12-04 02:28:45 +03:00 committed by GitHub
parent 0ddc8de00b
commit 5279c2aa0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 294 additions and 1190 deletions

View File

@ -2,6 +2,7 @@ package client
import (
"context"
"fmt"
"sort"
"github.com/unistack-org/micro/v3/errors"
@ -18,6 +19,10 @@ func LookupRoute(ctx context.Context, req Request, opts CallOptions) ([]string,
return opts.Address, nil
}
if opts.Router == nil {
return nil, router.ErrRouteNotFound
}
// construct the router query
query := []router.QueryOption{router.QueryService(req.Service())}

View File

@ -3,52 +3,35 @@ package config
import (
"context"
"github.com/unistack-org/micro/v3/config/loader"
"github.com/unistack-org/micro/v3/config/reader"
"github.com/unistack-org/micro/v3/config/source"
"errors"
)
var (
DefaultConfig Config
DefaultConfig Config = NewConfig()
)
var (
// ErrWatcherStopped is returned when source watcher has been stopped
ErrWatcherStopped = errors.New("watcher stopped")
)
// Config is an interface abstraction for dynamic configuration
type Config interface {
// provide the reader.Values interface
reader.Values
// Init the config
Init(opts ...Option) error
// Options in the config
Options() Options
// Stop the config loader/watcher
Close() error
// Load config sources
Load(source ...source.Source) error
// Force a source changeset sync
Sync() error
// Load config from sources
Load(context.Context) error
// Save config to sources
Save(context.Context) error
// Watch a value for changes
Watch(path ...string) (Watcher, error)
// Watch(interface{}) (Watcher, error)
String() string
}
// Watcher is the config watcher
type Watcher interface {
Next() (reader.Value, error)
Stop() error
}
type Options struct {
Loader loader.Loader
Reader reader.Reader
Source []source.Source
// for alternative data
Context context.Context
}
type Option func(o *Options)
// NewConfig returns new config
func NewConfig(opts ...Option) (Config, error) {
return newConfig(opts...)
}
//type Watcher interface {
// Next() (, error)
// Stop() error
//}

View File

@ -1,294 +0,0 @@
package config
import (
"bytes"
"fmt"
"sync"
"time"
"github.com/unistack-org/micro/v3/config/loader"
"github.com/unistack-org/micro/v3/config/reader"
"github.com/unistack-org/micro/v3/config/source"
)
type config struct {
exit chan bool
opts Options
sync.RWMutex
// the current snapshot
snap *loader.Snapshot
// the current values
vals reader.Values
}
type watcher struct {
lw loader.Watcher
rd reader.Reader
path []string
value reader.Value
}
func newConfig(opts ...Option) (Config, error) {
var c config
if err := c.Init(opts...); err != nil {
return nil, err
}
go c.run()
return &c, nil
}
func (c *config) Init(opts ...Option) error {
c.opts = Options{}
c.exit = make(chan bool)
for _, o := range opts {
o(&c.opts)
}
err := c.opts.Loader.Load(c.opts.Source...)
if err != nil {
return err
}
c.snap, err = c.opts.Loader.Snapshot()
if err != nil {
return err
}
c.vals, err = c.opts.Reader.Values(c.snap.ChangeSet)
if err != nil {
return err
}
return nil
}
func (c *config) Options() Options {
return c.opts
}
func (c *config) run() {
watch := func(w loader.Watcher) error {
for {
// get changeset
snap, err := w.Next()
if err != nil {
return err
}
c.Lock()
if c.snap != nil && c.snap.Version >= snap.Version {
c.Unlock()
continue
}
// save
c.snap = snap
// set values
c.vals, _ = c.opts.Reader.Values(snap.ChangeSet)
c.Unlock()
}
}
for {
w, err := c.opts.Loader.Watch()
if err != nil {
time.Sleep(time.Second)
continue
}
done := make(chan bool)
// the stop watch func
go func() {
select {
case <-done:
case <-c.exit:
}
w.Stop()
}()
// block watch
if err := watch(w); err != nil {
// do something better
time.Sleep(time.Second)
}
// close done chan
close(done)
// if the config is closed exit
select {
case <-c.exit:
return
default:
}
}
}
func (c *config) Map() map[string]interface{} {
c.RLock()
defer c.RUnlock()
return c.vals.Map()
}
func (c *config) Scan(v interface{}) error {
c.RLock()
defer c.RUnlock()
return c.vals.Scan(v)
}
// sync loads all the sources, calls the parser and updates the config
func (c *config) Sync() error {
if err := c.opts.Loader.Sync(); err != nil {
return err
}
snap, err := c.opts.Loader.Snapshot()
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.snap = snap
vals, err := c.opts.Reader.Values(snap.ChangeSet)
if err != nil {
return err
}
c.vals = vals
return nil
}
func (c *config) Close() error {
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil
}
func (c *config) Get(path ...string) (reader.Value, error) {
c.RLock()
defer c.RUnlock()
// did sync actually work?
if c.vals != nil {
return c.vals.Get(path...)
}
// no value
return nil, fmt.Errorf("no value")
}
func (c *config) Set(val interface{}, path ...string) error {
c.Lock()
defer c.Unlock()
if c.vals != nil {
c.vals.Set(val, path...)
}
return nil
}
func (c *config) Del(path ...string) error {
c.Lock()
defer c.Unlock()
if c.vals != nil {
c.vals.Del(path...)
}
return nil
}
func (c *config) Bytes() []byte {
c.RLock()
defer c.RUnlock()
if c.vals == nil {
return []byte{}
}
return c.vals.Bytes()
}
func (c *config) Load(sources ...source.Source) error {
if err := c.opts.Loader.Load(sources...); err != nil {
return err
}
snap, err := c.opts.Loader.Snapshot()
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
c.snap = snap
vals, err := c.opts.Reader.Values(snap.ChangeSet)
if err != nil {
return err
}
c.vals = vals
return nil
}
func (c *config) Watch(path ...string) (Watcher, error) {
value, err := c.Get(path...)
if err != nil {
return nil, err
}
w, err := c.opts.Loader.Watch(path...)
if err != nil {
return nil, err
}
return &watcher{
lw: w,
rd: c.opts.Reader,
path: path,
value: value,
}, nil
}
func (c *config) String() string {
return "config"
}
func (w *watcher) Next() (reader.Value, error) {
for {
s, err := w.lw.Next()
if err != nil {
return nil, err
}
// only process changes
if bytes.Equal(w.value.Bytes(), s.ChangeSet.Data) {
continue
}
v, err := w.rd.Values(s.ChangeSet)
if err != nil {
return nil, err
}
return v.Get()
}
}
func (w *watcher) Stop() error {
return w.lw.Stop()
}

View File

@ -1,168 +0,0 @@
// +build ignore
package config
import (
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
"github.com/unistack-org/micro/v3/config/source"
"github.com/unistack-org/micro/v3/config/source/env"
"github.com/unistack-org/micro/v3/config/source/file"
"github.com/unistack-org/micro/v3/config/source/memory"
)
func createFileForIssue18(t *testing.T, content string) *os.File {
data := []byte(content)
path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano()))
fh, err := os.Create(path)
if err != nil {
t.Error(err)
}
_, err = fh.Write(data)
if err != nil {
t.Error(err)
}
return fh
}
func createFileForTest(t *testing.T) *os.File {
data := []byte(`{"foo": "bar"}`)
path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano()))
fh, err := os.Create(path)
if err != nil {
t.Error(err)
}
_, err = fh.Write(data)
if err != nil {
t.Error(err)
}
return fh
}
func TestConfigLoadWithGoodFile(t *testing.T) {
fh := createFileForTest(t)
path := fh.Name()
defer func() {
fh.Close()
os.Remove(path)
}()
// Create new config
conf, err := NewConfig()
if err != nil {
t.Fatalf("Expected no error but got %v", err)
}
// Load file source
if err := conf.Load(file.NewSource(
file.WithPath(path),
)); err != nil {
t.Fatalf("Expected no error but got %v", err)
}
}
func TestConfigLoadWithInvalidFile(t *testing.T) {
fh := createFileForTest(t)
path := fh.Name()
defer func() {
fh.Close()
os.Remove(path)
}()
// Create new config
conf, err := NewConfig()
if err != nil {
t.Fatalf("Expected no error but got %v", err)
}
// Load file source
err = conf.Load(file.NewSource(
file.WithPath(path),
file.WithPath("/i/do/not/exists.json"),
))
if err == nil {
t.Fatal("Expected error but none !")
}
if !strings.Contains(fmt.Sprintf("%v", err), "/i/do/not/exists.json") {
t.Fatalf("Expected error to contain the unexisting file but got %v", err)
}
}
func TestConfigMerge(t *testing.T) {
fh := createFileForIssue18(t, `{
"amqp": {
"host": "rabbit.platform",
"port": 80
},
"handler": {
"exchange": "springCloudBus"
}
}`)
path := fh.Name()
defer func() {
fh.Close()
os.Remove(path)
}()
os.Setenv("AMQP_HOST", "rabbit.testing.com")
conf, err := NewConfig()
if err != nil {
t.Fatalf("Expected no error but got %v", err)
}
if err := conf.Load(
file.NewSource(
file.WithPath(path),
),
env.NewSource(),
); err != nil {
t.Fatalf("Expected no error but got %v", err)
}
actualHost := conf.Get("amqp", "host").String("backup")
if actualHost != "rabbit.testing.com" {
t.Fatalf("Expected %v but got %v",
"rabbit.testing.com",
actualHost)
}
}
func equalS(t *testing.T, actual, expect string) {
if actual != expect {
t.Errorf("Expected %s but got %s", actual, expect)
}
}
func TestConfigWatcherDirtyOverrite(t *testing.T) {
n := runtime.GOMAXPROCS(0)
defer runtime.GOMAXPROCS(n)
runtime.GOMAXPROCS(1)
l := 100
ss := make([]source.Source, l)
for i := 0; i < l; i++ {
ss[i] = memory.NewSource(memory.WithJSON([]byte(fmt.Sprintf(`{"key%d": "val%d"}`, i, i))))
}
conf, _ := NewConfig()
for _, s := range ss {
_ = conf.Load(s)
}
runtime.Gosched()
for i := range ss {
k := fmt.Sprintf("key%d", i)
v := fmt.Sprintf("val%d", i)
equalS(t, conf.Get(k).String(""), v)
}
}

View File

@ -1,8 +0,0 @@
// Package encoder handles source encoding formats
package encoder
type Encoder interface {
Encode(interface{}) ([]byte, error)
Decode([]byte, interface{}) error
String() string
}

View File

@ -1,26 +0,0 @@
package hcl
import (
"encoding/json"
"github.com/hashicorp/hcl"
"github.com/unistack-org/micro/v3/config/encoder"
)
type hclEncoder struct{}
func (h hclEncoder) Encode(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (h hclEncoder) Decode(d []byte, v interface{}) error {
return hcl.Unmarshal(d, v)
}
func (h hclEncoder) String() string {
return "hcl"
}
func NewEncoder() encoder.Encoder {
return hclEncoder{}
}

View File

@ -1,25 +0,0 @@
package json
import (
"encoding/json"
"github.com/unistack-org/micro/v3/config/encoder"
)
type jsonEncoder struct{}
func (j jsonEncoder) Encode(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (j jsonEncoder) Decode(d []byte, v interface{}) error {
return json.Unmarshal(d, v)
}
func (j jsonEncoder) String() string {
return "json"
}
func NewEncoder() encoder.Encoder {
return jsonEncoder{}
}

View File

@ -1,32 +0,0 @@
package toml
import (
"bytes"
"github.com/BurntSushi/toml"
"github.com/unistack-org/micro/v3/config/encoder"
)
type tomlEncoder struct{}
func (t tomlEncoder) Encode(v interface{}) ([]byte, error) {
b := bytes.NewBuffer(nil)
defer b.Reset()
err := toml.NewEncoder(b).Encode(v)
if err != nil {
return nil, err
}
return b.Bytes(), nil
}
func (t tomlEncoder) Decode(d []byte, v interface{}) error {
return toml.Unmarshal(d, v)
}
func (t tomlEncoder) String() string {
return "toml"
}
func NewEncoder() encoder.Encoder {
return tomlEncoder{}
}

View File

@ -1,25 +0,0 @@
package xml
import (
"encoding/xml"
"github.com/unistack-org/micro/v3/config/encoder"
)
type xmlEncoder struct{}
func (x xmlEncoder) Encode(v interface{}) ([]byte, error) {
return xml.Marshal(v)
}
func (x xmlEncoder) Decode(d []byte, v interface{}) error {
return xml.Unmarshal(d, v)
}
func (x xmlEncoder) String() string {
return "xml"
}
func NewEncoder() encoder.Encoder {
return xmlEncoder{}
}

View File

@ -1,24 +0,0 @@
package yaml
import (
"github.com/ghodss/yaml"
"github.com/unistack-org/micro/v3/config/encoder"
)
type yamlEncoder struct{}
func (y yamlEncoder) Encode(v interface{}) ([]byte, error) {
return yaml.Marshal(v)
}
func (y yamlEncoder) Decode(d []byte, v interface{}) error {
return yaml.Unmarshal(d, v)
}
func (y yamlEncoder) String() string {
return "yaml"
}
func NewEncoder() encoder.Encoder {
return yamlEncoder{}
}

View File

@ -1,63 +0,0 @@
// package loader manages loading from multiple sources
package loader
import (
"context"
"github.com/unistack-org/micro/v3/config/reader"
"github.com/unistack-org/micro/v3/config/source"
)
// Loader manages loading sources
type Loader interface {
// Stop the loader
Close() error
// Load the sources
Load(...source.Source) error
// A Snapshot of loaded config
Snapshot() (*Snapshot, error)
// Force sync of sources
Sync() error
// Watch for changes
Watch(...string) (Watcher, error)
// Name of loader
String() string
}
// Watcher lets you watch sources and returns a merged ChangeSet
type Watcher interface {
// First call to next may return the current Snapshot
// If you are watching a path then only the data from
// that path is returned.
Next() (*Snapshot, error)
// Stop watching for changes
Stop() error
}
// Snapshot is a merged ChangeSet
type Snapshot struct {
// The merged ChangeSet
ChangeSet *source.ChangeSet
// Deterministic and comparable version of the snapshot
Version string
}
type Options struct {
Reader reader.Reader
Source []source.Source
// for alternative data
Context context.Context
}
type Option func(o *Options)
// Copy snapshot
func Copy(s *Snapshot) *Snapshot {
cs := *(s.ChangeSet)
return &Snapshot{
ChangeSet: &cs,
Version: s.Version,
}
}

36
config/noop.go Normal file
View File

@ -0,0 +1,36 @@
// Package config is an interface for dynamic configuration.
package config
import "context"
type noopConfig struct {
opts Options
}
func (c *noopConfig) Init(opts ...Option) error {
for _, o := range opts {
o(&c.opts)
}
return nil
}
func (c *noopConfig) Load(ctx context.Context) error {
return nil
}
func (c *noopConfig) Save(ctx context.Context) error {
return nil
}
func (c *noopConfig) Options() Options {
return c.opts
}
func (c *noopConfig) String() string {
return "noop"
}
// NewConfig returns new noop config
func NewConfig(opts ...Option) Config {
return &noopConfig{opts: NewOptions(opts...)}
}

61
config/noop_test.go Normal file
View File

@ -0,0 +1,61 @@
package config_test
import "testing"
import "context"
import "fmt"
import "github.com/unistack-org/micro/v3/config"
type Cfg struct {
Value string
}
func TestNoop(t *testing.T) {
ctx := context.Background()
conf := &Cfg{}
blfn := func(ctx context.Context, cfg config.Config) error {
conf, ok := cfg.Options().Struct.(*Cfg)
if !ok {
return fmt.Errorf("failed to get Struct from options: %v", cfg.Options())
}
conf.Value = "before_load"
return nil
}
alfn := func(ctx context.Context, cfg config.Config) error {
conf, ok := cfg.Options().Struct.(*Cfg)
if !ok {
return fmt.Errorf("failed to get Struct from options: %v", cfg.Options())
}
conf.Value = "after_load"
return nil
}
cfg := config.NewConfig(config.Struct(conf),config.BeforeLoad(blfn),config.AfterLoad(alfn))
if err := cfg.Init(); err != nil {
t.Fatal(err)
}
for _, fn := range cfg.Options().BeforeLoad {
if err := fn(ctx, cfg); err != nil {
t.Fatal(err)
}
}
if conf.Value != "before_load" {
t.Fatal("BeforeLoad option not working")
}
if err := cfg.Load(ctx); err != nil {
t.Fatal(err)
}
for _, fn := range cfg.Options().AfterLoad {
if err := fn(ctx, cfg); err != nil {
t.Fatal(err)
}
}
if conf.Value != "after_load" {
t.Fatal("AfterLoad option not working")
}
}

View File

@ -1,28 +1,88 @@
package config
import (
"github.com/unistack-org/micro/v3/config/loader"
"github.com/unistack-org/micro/v3/config/reader"
"github.com/unistack-org/micro/v3/config/source"
"context"
"github.com/unistack-org/micro/v3/codec"
)
// WithLoader sets the loader for manager config
func WithLoader(l loader.Loader) Option {
type Options struct {
BeforeLoad []func(context.Context, Config) error
AfterLoad []func(context.Context, Config) error
BeforeSave []func(context.Context, Config) error
AfterSave []func(context.Context, Config) error
// Struct that holds config data
Struct interface{}
// struct tag name
StructTag string
// codec that used for load/save
Codec codec.Codec
// for alternative data
Context context.Context
}
type Option func(o *Options)
func NewOptions(opts ...Option) Options {
options := Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
func BeforeLoad(fn ...func(context.Context, Config) error) Option {
return func(o *Options) {
o.Loader = l
o.BeforeLoad = fn
}
}
// WithSource appends a source to list of sources
func WithSource(s source.Source) Option {
func AfterLoad(fn ...func(context.Context, Config) error) Option {
return func(o *Options) {
o.Source = append(o.Source, s)
o.AfterLoad = fn
}
}
// WithReader sets the config reader
func WithReader(r reader.Reader) Option {
func BeforeSave(fn ...func(context.Context, Config) error) Option {
return func(o *Options) {
o.Reader = r
o.BeforeSave = fn
}
}
func AfterSave(fn ...func(context.Context, Config) error) Option {
return func(o *Options) {
o.AfterSave= fn
}
}
func Context(ctx context.Context) Option {
return func(o *Options) {
o.Context = ctx
}
}
// Codec sets the source codec
func Codec(c codec.Codec) Option {
return func(o *Options) {
o.Codec = c
}
}
// Struct
func Struct(v interface{}) Option {
return func(o *Options) {
o.Struct = v
}
}
// StructTag
func StructTag(name string) Option {
return func(o *Options) {
o.StructTag = name
}
}

View File

@ -1,50 +0,0 @@
package reader
import (
"github.com/unistack-org/micro/v3/config/encoder"
"github.com/unistack-org/micro/v3/config/encoder/hcl"
"github.com/unistack-org/micro/v3/config/encoder/json"
"github.com/unistack-org/micro/v3/config/encoder/toml"
"github.com/unistack-org/micro/v3/config/encoder/xml"
"github.com/unistack-org/micro/v3/config/encoder/yaml"
)
type Options struct {
Encoding map[string]encoder.Encoder
DisableReplaceEnvVars bool
}
type Option func(o *Options)
func NewOptions(opts ...Option) Options {
options := Options{
Encoding: map[string]encoder.Encoder{
"json": json.NewEncoder(),
"yaml": yaml.NewEncoder(),
"toml": toml.NewEncoder(),
"xml": xml.NewEncoder(),
"hcl": hcl.NewEncoder(),
"yml": yaml.NewEncoder(),
},
}
for _, o := range opts {
o(&options)
}
return options
}
func WithEncoder(e encoder.Encoder) Option {
return func(o *Options) {
if o.Encoding == nil {
o.Encoding = make(map[string]encoder.Encoder)
}
o.Encoding[e.String()] = e
}
}
// WithDisableReplaceEnvVars disables the environment variable interpolation preprocessor
func WithDisableReplaceEnvVars() Option {
return func(o *Options) {
o.DisableReplaceEnvVars = true
}
}

View File

@ -1,23 +0,0 @@
package reader
import (
"os"
"regexp"
)
func ReplaceEnvVars(raw []byte) ([]byte, error) {
re := regexp.MustCompile(`\$\{([A-Za-z0-9_]+)\}`)
if re.Match(raw) {
dataS := string(raw)
res := re.ReplaceAllStringFunc(dataS, replaceEnvVars)
return []byte(res), nil
} else {
return raw, nil
}
}
func replaceEnvVars(element string) string {
v := element[2 : len(element)-1]
el := os.Getenv(v)
return el
}

View File

@ -1,73 +0,0 @@
package reader
import (
"os"
"strings"
"testing"
)
func TestReplaceEnvVars(t *testing.T) {
os.Setenv("myBar", "cat")
os.Setenv("MYBAR", "cat")
os.Setenv("my_Bar", "cat")
os.Setenv("myBar_", "cat")
testData := []struct {
expected string
data []byte
}{
// Right use cases
{
`{"foo": "bar", "baz": {"bar": "cat"}}`,
[]byte(`{"foo": "bar", "baz": {"bar": "${myBar}"}}`),
},
{
`{"foo": "bar", "baz": {"bar": "cat"}}`,
[]byte(`{"foo": "bar", "baz": {"bar": "${MYBAR}"}}`),
},
{
`{"foo": "bar", "baz": {"bar": "cat"}}`,
[]byte(`{"foo": "bar", "baz": {"bar": "${my_Bar}"}}`),
},
{
`{"foo": "bar", "baz": {"bar": "cat"}}`,
[]byte(`{"foo": "bar", "baz": {"bar": "${myBar_}"}}`),
},
// Wrong use cases
{
`{"foo": "bar", "baz": {"bar": "${myBar-}"}}`,
[]byte(`{"foo": "bar", "baz": {"bar": "${myBar-}"}}`),
},
{
`{"foo": "bar", "baz": {"bar": "${}"}}`,
[]byte(`{"foo": "bar", "baz": {"bar": "${}"}}`),
},
{
`{"foo": "bar", "baz": {"bar": "$sss}"}}`,
[]byte(`{"foo": "bar", "baz": {"bar": "$sss}"}}`),
},
{
`{"foo": "bar", "baz": {"bar": "${sss"}}`,
[]byte(`{"foo": "bar", "baz": {"bar": "${sss"}}`),
},
{
`{"foo": "bar", "baz": {"bar": "{something}"}}`,
[]byte(`{"foo": "bar", "baz": {"bar": "{something}"}}`),
},
// Use cases without replace env vars
{
`{"foo": "bar", "baz": {"bar": "cat"}}`,
[]byte(`{"foo": "bar", "baz": {"bar": "cat"}}`),
},
}
for _, test := range testData {
res, err := ReplaceEnvVars(test.data)
if err != nil {
t.Fatal(err)
}
if strings.Compare(test.expected, string(res)) != 0 {
t.Fatalf("Expected %s got %s", test.expected, res)
}
}
}

View File

@ -1,38 +0,0 @@
// Package reader parses change sets and provides config values
package reader
import (
"time"
"github.com/unistack-org/micro/v3/config/source"
)
// Reader is an interface for merging changesets
type Reader interface {
Merge(...*source.ChangeSet) (*source.ChangeSet, error)
Values(*source.ChangeSet) (Values, error)
String() string
}
// Values is returned by the reader
type Values interface {
Bytes() []byte
Get(path ...string) (Value, error)
Set(val interface{}, path ...string) error
Del(path ...string) error
Map() map[string]interface{}
Scan(v interface{}) error
}
// Value represents a value of any type
type Value interface {
Bool(def bool) bool
Int(def int) int
String(def string) string
Float64(def float64) float64
Duration(def time.Duration) time.Duration
StringSlice(def []string) []string
StringMap(def map[string]string) map[string]string
Scan(val interface{}) error
Bytes() []byte
}

View File

@ -1,13 +0,0 @@
package source
import (
"crypto/md5"
"fmt"
)
// Sum returns the md5 checksum of the ChangeSet data
func (c *ChangeSet) Sum() string {
h := md5.New()
h.Write(c.Data)
return fmt.Sprintf("%x", h.Sum(nil))
}

View File

@ -1,25 +0,0 @@
package source
import (
"errors"
)
type noopWatcher struct {
exit chan struct{}
}
func (w *noopWatcher) Next() (*ChangeSet, error) {
<-w.exit
return nil, errors.New("noopWatcher stopped")
}
func (w *noopWatcher) Stop() error {
close(w.exit)
return nil
}
// NewNoopWatcher returns a watcher that blocks on Next() until Stop() is called.
func NewNoopWatcher() (Watcher, error) {
return &noopWatcher{exit: make(chan struct{})}, nil
}

View File

@ -1,49 +0,0 @@
package source
import (
"context"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/config/encoder"
"github.com/unistack-org/micro/v3/config/encoder/json"
)
type Options struct {
// Encoder
Encoder encoder.Encoder
// for alternative data
Context context.Context
// Client to use for RPC
Client client.Client
}
type Option func(o *Options)
func NewOptions(opts ...Option) Options {
options := Options{
Encoder: json.NewEncoder(),
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
// WithEncoder sets the source encoder
func WithEncoder(e encoder.Encoder) Option {
return func(o *Options) {
o.Encoder = e
}
}
// WithClient sets the source client
func WithClient(c client.Client) Option {
return func(o *Options) {
o.Client = c
}
}

View File

@ -1,35 +0,0 @@
// Package source is the interface for sources
package source
import (
"errors"
"time"
)
var (
// ErrWatcherStopped is returned when source watcher has been stopped
ErrWatcherStopped = errors.New("watcher stopped")
)
// Source is the source from which config is loaded
type Source interface {
Read() (*ChangeSet, error)
Write(*ChangeSet) error
Watch() (Watcher, error)
String() string
}
// ChangeSet represents a set of changes from a source
type ChangeSet struct {
Data []byte
Checksum string
Format string
Source string
Timestamp time.Time
}
// Watcher watches a source for changes
type Watcher interface {
Next() (*ChangeSet, error)
Stop() error
}

View File

@ -1,49 +0,0 @@
package config
import (
"time"
"github.com/unistack-org/micro/v3/config/reader"
)
type value struct{}
func newValue() reader.Value {
return new(value)
}
func (v *value) Bool(def bool) bool {
return false
}
func (v *value) Int(def int) int {
return 0
}
func (v *value) String(def string) string {
return ""
}
func (v *value) Float64(def float64) float64 {
return 0.0
}
func (v *value) Duration(def time.Duration) time.Duration {
return time.Duration(0)
}
func (v *value) StringSlice(def []string) []string {
return nil
}
func (v *value) StringMap(def map[string]string) map[string]string {
return map[string]string{}
}
func (v *value) Scan(val interface{}) error {
return nil
}
func (v *value) Bytes() []byte {
return nil
}

3
go.mod
View File

@ -3,15 +3,12 @@ module github.com/unistack-org/micro/v3
go 1.14
require (
github.com/BurntSushi/toml v0.3.1
github.com/caddyserver/certmagic v0.10.6
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1
github.com/ghodss/yaml v1.0.0
github.com/go-acme/lego/v3 v3.4.0
github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.1.2
github.com/hashicorp/hcl v1.0.0
github.com/micro/cli/v2 v2.1.2
github.com/miekg/dns v1.1.31
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c

2
mapping.txt Normal file
View File

@ -0,0 +1,2 @@
client/grpc/* micro-client-grpc
#server/grpc/* micro-server-grpc

View File

@ -4,8 +4,6 @@ import (
"context"
"time"
"github.com/micro/cli/v2"
cmd "github.com/unistack-org/micro-config-cmd"
"github.com/unistack-org/micro/v3/auth"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/client"
@ -13,7 +11,6 @@ import (
"github.com/unistack-org/micro/v3/debug/profile"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/registry"
"github.com/unistack-org/micro/v3/router"
"github.com/unistack-org/micro/v3/runtime"
@ -25,25 +22,23 @@ import (
// Options for micro service
type Options struct {
Auth auth.Auth
Broker broker.Broker
Logger logger.Logger
Cmd cmd.Cmd
Config config.Config
Client client.Client
Server server.Server
Store store.Store
Registry registry.Registry
Router router.Router
Runtime runtime.Runtime
Transport transport.Transport
Profile profile.Profile
Auth auth.Auth
Broker broker.Broker
Logger logger.Logger
Configs []config.Config
Client client.Client
Server server.Server
Store store.Store
Registry registry.Registry
Router router.Router
Runtime runtime.Runtime
Profile profile.Profile
// Before and After funcs
BeforeStart []func() error
BeforeStop []func() error
AfterStart []func() error
AfterStop []func() error
BeforeStart []func(context.Context) error
BeforeStop []func(context.Context) error
AfterStart []func(context.Context) error
AfterStop []func(context.Context) error
// Other options for implementations of the interface
// can be stored in a context
@ -53,17 +48,16 @@ type Options struct {
// NewOptions returns new Options filled with defaults and overrided by provided opts
func NewOptions(opts ...Option) Options {
options := Options{
Context: context.Background(),
Server: server.DefaultServer,
Client: client.DefaultClient,
Broker: broker.DefaultBroker,
Registry: registry.DefaultRegistry,
Router: router.DefaultRouter,
Auth: auth.DefaultAuth,
Logger: logger.DefaultLogger,
Config: config.DefaultConfig,
Store: store.DefaultStore,
Transport: transport.DefaultTransport,
Context: context.Background(),
Server: server.DefaultServer,
Client: client.DefaultClient,
Broker: broker.DefaultBroker,
Registry: registry.DefaultRegistry,
Router: router.DefaultRouter,
Auth: auth.DefaultAuth,
Logger: logger.DefaultLogger,
Configs: []config.Config{config.DefaultConfig},
Store: store.DefaultStore,
//Runtime runtime.Runtime
//Profile profile.Profile
}
@ -92,13 +86,6 @@ func Broker(b broker.Broker) Option {
}
}
// Cmd to be used for service
func Cmd(c cmd.Cmd) Option {
return func(o *Options) {
o.Cmd = c
}
}
// Client to be used for service
func Client(c client.Client) Option {
return func(o *Options) {
@ -182,10 +169,10 @@ func Auth(a auth.Auth) Option {
}
}
// Config sets the config for the service
func Config(c config.Config) Option {
// Configs sets the configs for the service
func Configs(c []config.Config) Option {
return func(o *Options) {
o.Config = c
o.Configs = c
}
}
@ -198,21 +185,6 @@ func Selector(s selector.Selector) Option {
}
}
// Transport sets the transport for the service
// and the underlying components
func Transport(t transport.Transport) Option {
return func(o *Options) {
o.Transport = t
// Update Client and Server
if o.Client != nil {
o.Client.Init(client.Transport(t))
}
if o.Server != nil {
o.Server.Init(server.Transport(t))
}
}
}
// Runtime sets the runtime
func Runtime(r runtime.Runtime) Option {
return func(o *Options) {
@ -231,8 +203,6 @@ func Router(r router.Router) Option {
}
}
// Convenience options
// Address sets the address of the server
func Address(addr string) Option {
return func(o *Options) {
@ -269,24 +239,6 @@ func Metadata(md metadata.Metadata) Option {
}
}
// Flags that can be passed to service
func Flags(flags ...cli.Flag) Option {
return func(o *Options) {
if o.Cmd != nil {
o.Cmd.App().Flags = append(o.Cmd.App().Flags, flags...)
}
}
}
// Action can be used to parse user provided cli options
func Action(a func(*cli.Context) error) Option {
return func(o *Options) {
if o.Cmd != nil {
o.Cmd.App().Action = a
}
}
}
// RegisterTTL specifies the TTL to use when registering the service
func RegisterTTL(t time.Duration) Option {
return func(o *Options) {
@ -352,31 +304,29 @@ func WrapSubscriber(w ...server.SubscriberWrapper) Option {
}
}
// Before and Afters
// BeforeStart run funcs before service starts
func BeforeStart(fn func() error) Option {
func BeforeStart(fn func(context.Context) error) Option {
return func(o *Options) {
o.BeforeStart = append(o.BeforeStart, fn)
}
}
// BeforeStop run funcs before service stops
func BeforeStop(fn func() error) Option {
func BeforeStop(fn func(context.Context) error) Option {
return func(o *Options) {
o.BeforeStop = append(o.BeforeStop, fn)
}
}
// AfterStart run funcs after service starts
func AfterStart(fn func() error) Option {
func AfterStart(fn func(context.Context) error) Option {
return func(o *Options) {
o.AfterStart = append(o.AfterStart, fn)
}
}
// AfterStop run funcs after service stops
func AfterStop(fn func() error) Option {
func AfterStop(fn func(context.Context) error) Option {
return func(o *Options) {
o.AfterStop = append(o.AfterStop, fn)
}

41
pull.sh Executable file
View File

@ -0,0 +1,41 @@
#!/bin/bash -ex
if [ "$1" == "--force" ]; then
force="yes"
fi
srcsha="--root"
dstsha="HEAD"
commitrange="${srcsha} ${dstsha}"
while read srcpath dstpath; do
if [ "${srcpath::1}" == "#" ] ; then
continue
fi
relpath="${srcpath//\*}"
rm -rf patches/
dstsha=$(git rev-parse HEAD)
if [ -f "../${dstpath}/.synced" ]; then
srcsha=$(cat "../${dstpath}/.synced" | tr -d '\n')
commitrange="${srcsha}..${dstsha}"
fi
git format-patch --find-copies --break-rewrites --find-renames=100% --relative="${relpath}" --no-stat --minimal --minimal --no-cover-letter --no-signature "${commitrange}" -o patches/ -- "${srcpath}"
for p in $(ls patches/); do
grep -q 'From: Vasiliy Tolstov <v.tolstov' "patches/${p}" || sed -i '/Signed-off-by: Vasiliy Tolstov/d' "patches/${p}"
done
if [ "x$(find patches/ -type f -name '*.patch' | wc -l)" != "x0" ]; then
pushd ../${dstpath} >/dev/null
git am --rerere-autoupdate --3way ../micro/patches/*.patch
popd >/dev/null
fi
echo -n "${dstsha}" > ../${dstpath}/.synced
done < mapping.txt

View File

@ -10,7 +10,6 @@ import (
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/config"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/registry"
"github.com/unistack-org/micro/v3/router"
"github.com/unistack-org/micro/v3/server"
@ -41,18 +40,6 @@ func (s *service) Init(opts ...Option) error {
o(&s.opts)
}
if s.opts.Cmd != nil {
// set cmd name
if len(s.opts.Cmd.App().Name) == 0 {
s.opts.Cmd.App().Name = s.Server().Options().Name
}
// Initialise the command options
if err := s.opts.Cmd.Init(); err != nil {
return err
}
}
if s.opts.Logger != nil {
if err := s.opts.Logger.Init(
logger.WithContext(s.opts.Context),
@ -61,6 +48,14 @@ func (s *service) Init(opts ...Option) error {
}
}
if s.opts.Configs != nil {
for _, c := range s.opts.Configs {
if err := c.Init(config.Context(s.opts.Context) ); err != nil {
return err
}
}
}
if s.opts.Registry != nil {
if err := s.opts.Registry.Init(
registry.Context(s.opts.Context),
@ -77,14 +72,6 @@ func (s *service) Init(opts ...Option) error {
}
}
if s.opts.Transport != nil {
if err := s.opts.Transport.Init(
transport.Context(s.opts.Context),
); err != nil {
return err
}
}
if s.opts.Store != nil {
if err := s.opts.Store.Init(
store.Context(s.opts.Context),
@ -140,14 +127,6 @@ func (s *service) Logger() logger.Logger {
return s.opts.Logger
}
func (s *service) Transport() transport.Transport {
return s.opts.Transport
}
func (s *service) Config() config.Config {
return s.opts.Config
}
func (s *service) Auth() auth.Auth {
return s.opts.Auth
}
@ -172,11 +151,27 @@ func (s *service) Start() error {
}
for _, fn := range s.opts.BeforeStart {
if err = fn(); err != nil {
if err = fn(s.opts.Context); err != nil {
return err
}
}
for _, cfg := range s.opts.Configs {
for _, fn := range cfg.Options().BeforeLoad {
if err := fn(s.opts.Context, cfg); err != nil {
return err
}
}
if err := cfg.Load(s.opts.Context); err != nil {
return err
}
for _, fn := range cfg.Options().AfterLoad {
if err := fn(s.opts.Context, cfg); err != nil {
return err
}
}
}
if s.opts.Server == nil {
return fmt.Errorf("cant start nil server")
}
@ -204,7 +199,7 @@ func (s *service) Start() error {
}
for _, fn := range s.opts.AfterStart {
if err = fn(); err != nil {
if err = fn(s.opts.Context); err != nil {
return err
}
}
@ -223,7 +218,7 @@ func (s *service) Stop() error {
var err error
for _, fn := range s.opts.BeforeStop {
if err = fn(); err != nil {
if err = fn(s.opts.Context); err != nil {
return err
}
}
@ -233,7 +228,7 @@ func (s *service) Stop() error {
}
for _, fn := range s.opts.AfterStop {
if err = fn(); err != nil {
if err = fn(s.opts.Context); err != nil {
return err
}
}
@ -273,12 +268,6 @@ func (s *service) Run() error {
defer s.opts.Profile.Stop()
}
if s.opts.Cmd != nil {
if err := s.opts.Cmd.Run(); err != nil {
return err
}
}
if err := s.Start(); err != nil {
return err
}