Compare commits

..

32 Commits

Author SHA1 Message Date
3805d0f067 Merge pull request #107 from unistack-org/retries
client: determenistic retry backoff
2022-03-27 00:19:06 +03:00
680ac11ef9 client: determenistic retry backoff
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2022-03-27 00:16:22 +03:00
35ab6ae84e Merge pull request #106 from unistack-org/jitter
jitter: add NewTickerContext
2022-03-26 18:01:31 +03:00
c6c2b0884e jitter: add NewTickerContext
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2022-03-26 17:59:19 +03:00
297a80da84 Merge pull request #105 from unistack-org/improve
small fixes
2022-03-25 14:27:29 +03:00
2d292db7bd small fixes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2022-03-25 14:24:20 +03:00
54c4287fab Update README.md 2022-03-22 15:00:01 +03:00
9c074e5741 Merge pull request #103 from unistack-org/dependabot/github_actions/actions/cache-3
chore(deps): bump actions/cache from 2 to 3
2022-03-22 14:57:20 +03:00
290975eaf5 Merge pull request #104 from unistack-org/small_changes
config: add Validate func, small lint fixes
2022-03-22 14:57:03 +03:00
c64218d52c config: add Validate func, small lint fixes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2022-03-22 14:54:43 +03:00
dependabot[bot]
46c266a4a9 chore(deps): bump actions/cache from 2 to 3
Bumps [actions/cache](https://github.com/actions/cache) from 2 to 3.
- [Release notes](https://github.com/actions/cache/releases)
- [Commits](https://github.com/actions/cache/compare/v2...v3)

---
updated-dependencies:
- dependency-name: actions/cache
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-03-22 06:14:47 +00:00
5527b16cd8 Merge pull request #102 from unistack-org/cleanup
server: remove unparsed body from request and message
2022-03-21 15:26:20 +03:00
4904cad8ef server: remove unparsed body from request and message
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2022-03-21 15:23:41 +03:00
74633f4290 Merge pull request #101 from unistack-org/dependabot/go_modules/github.com/golang-jwt/jwt/v4-4.4.0
chore(deps): bump github.com/golang-jwt/jwt/v4 from 4.3.0 to 4.4.0
2022-03-18 16:29:26 +03:00
dependabot[bot]
c8ad4d772b chore(deps): bump github.com/golang-jwt/jwt/v4 from 4.3.0 to 4.4.0
Bumps [github.com/golang-jwt/jwt/v4](https://github.com/golang-jwt/jwt) from 4.3.0 to 4.4.0.
- [Release notes](https://github.com/golang-jwt/jwt/releases)
- [Changelog](https://github.com/golang-jwt/jwt/blob/main/VERSION_HISTORY.md)
- [Commits](https://github.com/golang-jwt/jwt/compare/v4.3.0...v4.4.0)

---
updated-dependencies:
- dependency-name: github.com/golang-jwt/jwt/v4
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-03-17 08:24:50 +00:00
91bd0f7efe Merge branch 'master' into v3 2022-03-17 11:23:08 +03:00
00dc7e1bb5 update go version
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2022-03-07 13:44:18 +03:00
5a5165a003 Merge pull request #99 from unistack-org/dependabot/go_modules/github.com/golang-jwt/jwt/v4-4.3.0
chore(deps): bump github.com/golang-jwt/jwt/v4 from 4.2.0 to 4.3.0
2022-03-07 13:16:48 +03:00
382e3d554b Merge pull request #98 from unistack-org/dependabot/github_actions/golangci/golangci-lint-action-3.1.0
chore(deps): bump golangci/golangci-lint-action from 2 to 3.1.0
2022-03-07 13:16:37 +03:00
05a0c97fc6 Merge pull request #100 from unistack-org/dependabot/go_modules/go.unistack.org/micro-proto/v3-3.2.7
chore(deps): bump go.unistack.org/micro-proto/v3 from 3.2.1 to 3.2.7
2022-03-07 13:14:31 +03:00
dependabot[bot]
5e06ae1a42 chore(deps): bump go.unistack.org/micro-proto/v3 from 3.2.1 to 3.2.7
Bumps [go.unistack.org/micro-proto/v3](https://github.com/unistack-org/micro-proto) from 3.2.1 to 3.2.7.
- [Release notes](https://github.com/unistack-org/micro-proto/releases)
- [Commits](https://github.com/unistack-org/micro-proto/compare/v3.2.1...v3.2.7)

---
updated-dependencies:
- dependency-name: go.unistack.org/micro-proto/v3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-03-07 10:11:59 +00:00
dependabot[bot]
7ac4ad4efa chore(deps): bump github.com/golang-jwt/jwt/v4 from 4.2.0 to 4.3.0
Bumps [github.com/golang-jwt/jwt/v4](https://github.com/golang-jwt/jwt) from 4.2.0 to 4.3.0.
- [Release notes](https://github.com/golang-jwt/jwt/releases)
- [Changelog](https://github.com/golang-jwt/jwt/blob/main/VERSION_HISTORY.md)
- [Commits](https://github.com/golang-jwt/jwt/compare/v4.2.0...v4.3.0)

---
updated-dependencies:
- dependency-name: github.com/golang-jwt/jwt/v4
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-03-07 10:11:54 +00:00
dependabot[bot]
01348bd9b2 chore(deps): bump golangci/golangci-lint-action from 2 to 3.1.0
Bumps [golangci/golangci-lint-action](https://github.com/golangci/golangci-lint-action) from 2 to 3.1.0.
- [Release notes](https://github.com/golangci/golangci-lint-action/releases)
- [Commits](https://github.com/golangci/golangci-lint-action/compare/v2...v3.1.0)

---
updated-dependencies:
- dependency-name: golangci/golangci-lint-action
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-03-07 10:11:45 +00:00
d0d04a840a Merge pull request #88 from unistack-org/master
merge master
2022-01-30 17:05:41 +03:00
f06610c9c2 Merge pull request #86 from unistack-org/master
update micro-proto
2022-01-26 00:48:11 +03:00
0257eae936 Merge pull request #85 from unistack-org/master
merge master
2022-01-25 00:41:46 +03:00
67d5dc7e28 Merge pull request #82 from unistack-org/master
errors: fix parsing
2022-01-21 19:14:08 +03:00
92b125c1ce Merge pull request #80 from unistack-org/master
merge master
2022-01-21 18:21:47 +03:00
927ca879b2 Merge pull request #78 from unistack-org/master
merge master
2022-01-21 00:51:14 +03:00
52182261af Merge pull request #74 from unistack-org/master
logger: fix Fields
2022-01-19 19:55:21 +03:00
8e5e2167cd Merge pull request #72 from unistack-org/master
lint fixes
2022-01-10 16:48:27 +03:00
6e6c31b5dd Merge pull request #69 from unistack-org/master
merge master
2021-12-28 09:30:34 +03:00
34 changed files with 399 additions and 108 deletions

View File

@@ -12,11 +12,11 @@ jobs:
- name: setup - name: setup
uses: actions/setup-go@v2 uses: actions/setup-go@v2
with: with:
go-version: 1.16 go-version: 1.17
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v3
- name: cache - name: cache
uses: actions/cache@v2 uses: actions/cache@v3
with: with:
path: ~/go/pkg/mod path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

View File

@@ -47,7 +47,7 @@ jobs:
- name: setup - name: setup
uses: actions/setup-go@v2 uses: actions/setup-go@v2
with: with:
go-version: 1.16 go-version: 1.17
# Initializes the CodeQL tools for scanning. # Initializes the CodeQL tools for scanning.
- name: init - name: init
uses: github/codeql-action/init@v1 uses: github/codeql-action/init@v1

View File

@@ -12,11 +12,11 @@ jobs:
- name: setup - name: setup
uses: actions/setup-go@v2 uses: actions/setup-go@v2
with: with:
go-version: 1.16 go-version: 1.17
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v3
- name: cache - name: cache
uses: actions/cache@v2 uses: actions/cache@v3
with: with:
path: ~/go/pkg/mod path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

View File

@@ -1,4 +1,4 @@
# Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/unistack-org/micro/v3?tab=overview) [![Status](https://github.com/unistack-org/micro/workflows/build/badge.svg?branch=master)](https://github.com/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Amaster+event%3Apush) [![Lint](https://goreportcard.com/report/go.unistack.org/micro/v3)](https://goreportcard.com/report/go.unistack.org/micro/v3) [![Slack](https://img.shields.io/static/v1?label=micro&message=slack&color=blueviolet)](https://unistack-org.slack.com/messages/default) # Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/unistack-org/micro/v3?tab=overview) [![Status](https://github.com/unistack-org/micro/workflows/build/badge.svg?branch=master)](https://github.com/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Amaster+event%3Apush) [![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v3)](https://goreportcard.com/report/go.unistack.org/micro/v3) [![Slack](https://img.shields.io/static/v1?label=micro&message=slack&color=blueviolet)](https://unistack-org.slack.com/messages/default)
Micro is a standard library for microservices. Micro is a standard library for microservices.

View File

@@ -50,6 +50,7 @@ type Handler func(Event) error
// Events contains multiple events // Events contains multiple events
type Events []Event type Events []Event
// Ack try to ack all events and return
func (evs Events) Ack() error { func (evs Events) Ack() error {
var err error var err error
for _, ev := range evs { for _, ev := range evs {
@@ -60,6 +61,7 @@ func (evs Events) Ack() error {
return nil return nil
} }
// SetError sets error on event
func (evs Events) SetError(err error) { func (evs Events) SetError(err error) {
for _, ev := range evs { for _, ev := range evs {
ev.SetError(err) ev.SetError(err)

View File

@@ -2,6 +2,7 @@ package client
import ( import (
"context" "context"
"math"
"time" "time"
"go.unistack.org/micro/v3/util/backoff" "go.unistack.org/micro/v3/util/backoff"
@@ -10,6 +11,20 @@ import (
// BackoffFunc is the backoff call func // BackoffFunc is the backoff call func
type BackoffFunc func(ctx context.Context, req Request, attempts int) (time.Duration, error) type BackoffFunc func(ctx context.Context, req Request, attempts int) (time.Duration, error)
func exponentialBackoff(ctx context.Context, req Request, attempts int) (time.Duration, error) { // BackoffExp using exponential backoff func
func BackoffExp(_ context.Context, _ Request, attempts int) (time.Duration, error) {
return backoff.Do(attempts), nil return backoff.Do(attempts), nil
} }
// BackoffInterval specifies randomization interval for backoff func
func BackoffInterval(min time.Duration, max time.Duration) BackoffFunc {
return func(_ context.Context, _ Request, attempts int) (time.Duration, error) {
td := time.Duration(time.Duration(math.Pow(float64(attempts), math.E)) * time.Millisecond * 100)
if td < min {
return min, nil
} else if td > max {
return max, nil
}
return td, nil
}
}

View File

@@ -22,7 +22,7 @@ func TestBackoff(t *testing.T) {
} }
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
d, err := exponentialBackoff(context.TODO(), r, i) d, err := BackoffExp(context.TODO(), r, i)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -14,8 +14,8 @@ var (
DefaultClient Client = NewClient() DefaultClient Client = NewClient()
// DefaultContentType is the default content-type if not specified // DefaultContentType is the default content-type if not specified
DefaultContentType = "application/json" DefaultContentType = "application/json"
// DefaultBackoff is the default backoff function for retries // DefaultBackoff is the default backoff function for retries (minimum 10 millisecond and maximum 5 second)
DefaultBackoff = exponentialBackoff DefaultBackoff = BackoffInterval(10*time.Millisecond, 5*time.Second)
// DefaultRetry is the default check-for-retry function for retries // DefaultRetry is the default check-for-retry function for retries
DefaultRetry = RetryNever DefaultRetry = RetryNever
// DefaultRetries is the default number of times a request is tried // DefaultRetries is the default number of times a request is tried

View File

@@ -19,18 +19,32 @@ func RetryNever(ctx context.Context, req Request, retryCount int, err error) (bo
return false, nil return false, nil
} }
// RetryOnError retries a request on a 500 or timeout error // RetryOnError retries a request on a 500 or 408 (timeout) error
func RetryOnError(_ context.Context, _ Request, _ int, err error) (bool, error) { func RetryOnError(_ context.Context, _ Request, _ int, err error) (bool, error) {
if err == nil { if err == nil {
return false, nil return false, nil
} }
me := errors.FromError(err) me := errors.FromError(err)
switch me.Code { switch me.Code {
// retry on timeout or internal server error // retry on timeout or internal server error
case 408, 500: case 408, 500:
return true, nil return true, nil
} }
return false, nil return false, nil
} }
// RetryOnErrors retries a request on specified error codes
func RetryOnErrors(codes ...int32) RetryFunc {
return func(_ context.Context, _ Request, _ int, err error) (bool, error) {
if err == nil {
return false, nil
}
me := errors.FromError(err)
for _, code := range codes {
if me.Code == code {
return true, nil
}
}
return false, nil
}
}

View File

@@ -4,11 +4,16 @@ package config // import "go.unistack.org/micro/v3/config"
import ( import (
"context" "context"
"errors" "errors"
"reflect"
"time" "time"
) )
type Validator interface {
Validate() error
}
// DefaultConfig default config // DefaultConfig default config
var DefaultConfig Config = NewConfig() var DefaultConfig = NewConfig()
// DefaultWatcherMinInterval default min interval for poll changes // DefaultWatcherMinInterval default min interval for poll changes
var DefaultWatcherMinInterval = 5 * time.Second var DefaultWatcherMinInterval = 5 * time.Second
@@ -67,7 +72,59 @@ func Load(ctx context.Context, cs []Config, opts ...LoadOption) error {
return nil return nil
} }
// Validate runs Validate() error func for each struct field
func Validate(ctx context.Context, cfg interface{}) error {
if cfg == nil {
return nil
}
if v, ok := cfg.(Validator); ok {
if err := v.Validate(); err != nil {
return err
}
}
sv := reflect.ValueOf(cfg)
if sv.Kind() == reflect.Ptr {
sv = sv.Elem()
}
if sv.Kind() != reflect.Struct {
return nil
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.IsValid() || len(fld.PkgPath) != 0 {
continue
}
if v, ok := val.Interface().(Validator); ok {
if err := v.Validate(); err != nil {
return err
}
}
switch val.Kind() {
case reflect.Ptr:
if reflect.Indirect(val).Kind() == reflect.Struct {
if err := Validate(ctx, val.Interface()); err != nil {
return err
}
}
case reflect.Struct:
if err := Validate(ctx, val.Interface()); err != nil {
return err
}
}
}
return nil
}
var ( var (
// DefaultAfterLoad default func that runs after config load
DefaultAfterLoad = func(ctx context.Context, c Config) error { DefaultAfterLoad = func(ctx context.Context, c Config) error {
for _, fn := range c.Options().AfterLoad { for _, fn := range c.Options().AfterLoad {
if err := fn(ctx, c); err != nil { if err := fn(ctx, c); err != nil {
@@ -79,7 +136,7 @@ var (
} }
return nil return nil
} }
// DefaultAfterSave default func that runs after config save
DefaultAfterSave = func(ctx context.Context, c Config) error { DefaultAfterSave = func(ctx context.Context, c Config) error {
for _, fn := range c.Options().AfterSave { for _, fn := range c.Options().AfterSave {
if err := fn(ctx, c); err != nil { if err := fn(ctx, c); err != nil {
@@ -91,7 +148,7 @@ var (
} }
return nil return nil
} }
// DefaultBeforeLoad default func that runs before config load
DefaultBeforeLoad = func(ctx context.Context, c Config) error { DefaultBeforeLoad = func(ctx context.Context, c Config) error {
for _, fn := range c.Options().BeforeLoad { for _, fn := range c.Options().BeforeLoad {
if err := fn(ctx, c); err != nil { if err := fn(ctx, c); err != nil {
@@ -103,11 +160,11 @@ var (
} }
return nil return nil
} }
// DefaultBeforeSave default func that runs befora config save
DefaultBeforeSave = func(ctx context.Context, c Config) error { DefaultBeforeSave = func(ctx context.Context, c Config) error {
for _, fn := range c.Options().BeforeSave { for _, fn := range c.Options().BeforeSave {
if err := fn(ctx, c); err != nil { if err := fn(ctx, c); err != nil {
c.Options().Logger.Errorf(ctx, "%s BeforeSavec err: %v", c.String(), err) c.Options().Logger.Errorf(ctx, "%s BeforeSave err: %v", c.String(), err)
if !c.Options().AllowFail { if !c.Options().AllowFail {
return err return err
} }

View File

@@ -8,30 +8,46 @@ import (
"go.unistack.org/micro/v3/config" "go.unistack.org/micro/v3/config"
) )
type Cfg struct { type cfg struct {
StringValue string `default:"string_value"` StringValue string `default:"string_value"`
IgnoreValue string `json:"-"` IgnoreValue string `json:"-"`
StructValue struct { StructValue *cfgStructValue
StringValue string `default:"string_value"` IntValue int `default:"99"`
}
type cfgStructValue struct {
StringValue string `default:"string_value"`
}
func (c *cfg) Validate() error {
if c.IntValue != 10 {
return fmt.Errorf("invalid IntValue %d != %d", 10, c.IntValue)
} }
IntValue int `default:"99"` return nil
}
func (c *cfgStructValue) Validate() error {
if c.StringValue != "string_value" {
return fmt.Errorf("invalid StringValue %s != %s", "string_value", c.StringValue)
}
return nil
} }
func TestDefault(t *testing.T) { func TestDefault(t *testing.T) {
ctx := context.Background() ctx := context.Background()
conf := &Cfg{IntValue: 10} conf := &cfg{IntValue: 10}
blfn := func(ctx context.Context, cfg config.Config) error { blfn := func(_ context.Context, c config.Config) error {
nconf, ok := cfg.Options().Struct.(*Cfg) nconf, ok := c.Options().Struct.(*cfg)
if !ok { if !ok {
return fmt.Errorf("failed to get Struct from options: %v", cfg.Options()) return fmt.Errorf("failed to get Struct from options: %v", c.Options())
} }
nconf.StringValue = "before_load" nconf.StringValue = "before_load"
return nil return nil
} }
alfn := func(ctx context.Context, cfg config.Config) error { alfn := func(_ context.Context, c config.Config) error {
nconf, ok := cfg.Options().Struct.(*Cfg) nconf, ok := c.Options().Struct.(*cfg)
if !ok { if !ok {
return fmt.Errorf("failed to get Struct from options: %v", cfg.Options()) return fmt.Errorf("failed to get Struct from options: %v", c.Options())
} }
nconf.StringValue = "after_load" nconf.StringValue = "after_load"
return nil return nil
@@ -50,3 +66,19 @@ func TestDefault(t *testing.T) {
_ = conf _ = conf
// t.Logf("%#+v\n", conf) // t.Logf("%#+v\n", conf)
} }
func TestValidate(t *testing.T) {
ctx := context.Background()
conf := &cfg{IntValue: 10}
cfg := config.NewConfig(config.Struct(conf))
if err := cfg.Init(); err != nil {
t.Fatal(err)
}
if err := cfg.Load(ctx); err != nil {
t.Fatal(err)
}
if err := config.Validate(ctx, conf); err != nil {
t.Fatal(err)
}
}

View File

@@ -69,6 +69,7 @@ type LoadOptions struct {
Context context.Context Context context.Context
} }
// NewLoadOptions create LoadOptions struct with provided opts
func NewLoadOptions(opts ...LoadOption) LoadOptions { func NewLoadOptions(opts ...LoadOption) LoadOptions {
options := LoadOptions{} options := LoadOptions{}
for _, o := range opts { for _, o := range opts {
@@ -221,8 +222,10 @@ type WatchOptions struct {
Coalesce bool Coalesce bool
} }
// WatchOption func signature
type WatchOption func(*WatchOptions) type WatchOption func(*WatchOptions)
// NewWatchOptions create WatchOptions struct with provided opts
func NewWatchOptions(opts ...WatchOption) WatchOptions { func NewWatchOptions(opts ...WatchOption) WatchOptions {
options := WatchOptions{ options := WatchOptions{
Context: context.Background(), Context: context.Background(),

View File

@@ -11,7 +11,9 @@ import (
) )
var ( var (
// ErrStepNotExists returns when step not found
ErrStepNotExists = errors.New("step not exists") ErrStepNotExists = errors.New("step not exists")
// ErrMissingClient returns when client.Client is missing
ErrMissingClient = errors.New("client not set") ErrMissingClient = errors.New("client not set")
) )
@@ -36,6 +38,7 @@ func (m *RawMessage) UnmarshalJSON(data []byte) error {
return nil return nil
} }
// Message used to transfer data between steps
type Message struct { type Message struct {
Header metadata.Metadata Header metadata.Metadata
Body RawMessage Body RawMessage
@@ -67,6 +70,7 @@ type Step interface {
Response() *Message Response() *Message
} }
// Status contains step current status
type Status int type Status int
func (status Status) String() string { func (status Status) String() string {
@@ -74,15 +78,22 @@ func (status Status) String() string {
} }
const ( const (
// StatusPending step waiting to start
StatusPending Status = iota StatusPending Status = iota
// StatusRunning step is running
StatusRunning StatusRunning
// StatusFailure step competed with error
StatusFailure StatusFailure
// StatusSuccess step completed without error
StatusSuccess StatusSuccess
// StatusAborted step aborted while it running
StatusAborted StatusAborted
// StatusSuspend step suspended
StatusSuspend StatusSuspend
) )
var ( var (
// StatusString contains map status => string
StatusString = map[Status]string{ StatusString = map[Status]string{
StatusPending: "StatusPending", StatusPending: "StatusPending",
StatusRunning: "StatusRunning", StatusRunning: "StatusRunning",
@@ -91,6 +102,7 @@ var (
StatusAborted: "StatusAborted", StatusAborted: "StatusAborted",
StatusSuspend: "StatusSuspend", StatusSuspend: "StatusSuspend",
} }
// StringStatus contains map string => status
StringStatus = map[string]Status{ StringStatus = map[string]Status{
"StatusPending": StatusPending, "StatusPending": StatusPending,
"StatusRunning": StatusRunning, "StatusRunning": StatusRunning,
@@ -144,6 +156,7 @@ var (
atomicSteps atomic.Value atomicSteps atomic.Value
) )
// RegisterStep register own step with workflow
func RegisterStep(step Step) { func RegisterStep(step Step) {
flowMu.Lock() flowMu.Lock()
steps, _ := atomicSteps.Load().([]Step) steps, _ := atomicSteps.Load().([]Step)

View File

@@ -91,7 +91,7 @@ func Store(s store.Store) Option {
} }
} }
// WorflowOption func signature // WorkflowOption func signature
type WorkflowOption func(*WorkflowOptions) type WorkflowOption func(*WorkflowOptions)
// WorkflowOptions holds workflow options // WorkflowOptions holds workflow options

2
go.mod
View File

@@ -4,7 +4,7 @@ go 1.16
require ( require (
github.com/ef-ds/deque v1.0.4 github.com/ef-ds/deque v1.0.4
github.com/golang-jwt/jwt/v4 v4.3.0 github.com/golang-jwt/jwt/v4 v4.4.0
github.com/imdario/mergo v0.3.12 github.com/imdario/mergo v0.3.12
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35

4
go.sum
View File

@@ -23,8 +23,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/golang-jwt/jwt/v4 v4.3.0 h1:kHL1vqdqWNfATmA0FNMdmZNMyZI1U6O31X4rlIPoBog= github.com/golang-jwt/jwt/v4 v4.4.0 h1:EmVIxB5jzbllGIjiCV5JG4VylbK3KE400tLGLI1cdfU=
github.com/golang-jwt/jwt/v4 v4.3.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.4.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=

View File

@@ -11,6 +11,7 @@ import (
) )
var ( var (
// DefaultClientCallObserver called by wrapper in client Call
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string { DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil { if err != nil {
@@ -19,6 +20,7 @@ var (
return labels return labels
} }
// DefaultClientStreamObserver called by wrapper in client Stream
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string { DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil { if err != nil {
@@ -27,6 +29,7 @@ var (
return labels return labels
} }
// DefaultClientPublishObserver called by wrapper in client Publish
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string { DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string {
labels := []string{"endpoint", msg.Topic()} labels := []string{"endpoint", msg.Topic()}
if err != nil { if err != nil {
@@ -35,6 +38,7 @@ var (
return labels return labels
} }
// DefaultServerHandlerObserver called by wrapper in server Handler
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, err error) []string { DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil { if err != nil {
@@ -43,6 +47,7 @@ var (
return labels return labels
} }
// DefaultServerSubscriberObserver called by wrapper in server Subscriber
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, err error) []string { DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, err error) []string {
labels := []string{"endpoint", msg.Topic()} labels := []string{"endpoint", msg.Topic()}
if err != nil { if err != nil {
@@ -51,6 +56,7 @@ var (
return labels return labels
} }
// DefaultClientCallFuncObserver called by wrapper in client CallFunc
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string { DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil { if err != nil {
@@ -59,6 +65,7 @@ var (
return labels return labels
} }
// DefaultSkipEndpoints wrapper not called for this endpoints
DefaultSkipEndpoints = []string{"Meter.Metrics"} DefaultSkipEndpoints = []string{"Meter.Metrics"}
) )
@@ -71,11 +78,17 @@ type lWrapper struct {
} }
type ( type (
ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string // ClientCallObserver func signature
ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string
ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string // ClientStreamObserver func signature
ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string
ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string // ClientPublishObserver func signature
ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string
// ClientCallFuncObserver func signature
ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string
// ServerHandlerObserver func signature
ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
// ServerSubscriberObserver func signature
ServerSubscriberObserver func(context.Context, server.Message, error) []string ServerSubscriberObserver func(context.Context, server.Message, error) []string
) )

View File

@@ -102,7 +102,7 @@ func (k byKey) Swap(i, j int) {
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1] k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
} }
// BuildLables used to sort labels and delete duplicates. // BuildLabels used to sort labels and delete duplicates.
// Last value wins in case of duplicate label keys. // Last value wins in case of duplicate label keys.
func BuildLabels(labels ...string) []string { func BuildLabels(labels ...string) []string {
if len(labels)%2 == 1 { if len(labels)%2 == 1 {

View File

@@ -104,7 +104,7 @@ func Meter(m meter.Meter) Option {
} }
} }
// SkipEndpoint add endpoint to skip // SkipEndoints add endpoint to skip
func SkipEndoints(eps ...string) Option { func SkipEndoints(eps ...string) Option {
return func(o *Options) { return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...) o.SkipEndpoints = append(o.SkipEndpoints, eps...)
@@ -294,7 +294,7 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
} }
} }
// NewSubscribeWrapper create server subscribe wrapper // NewSubscriberWrapper create server subscribe wrapper
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper { func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{ handler := &wrapper{
opts: NewOptions(opts...), opts: NewOptions(opts...),

View File

@@ -2,7 +2,6 @@ package register
import ( import (
"context" "context"
"errors"
"sync" "sync"
"time" "time"
@@ -438,7 +437,7 @@ func (m *watcher) Next() (*Result, error) {
return r, nil return r, nil
} }
case <-m.exit: case <-m.exit:
return nil, errors.New("watcher stopped") return nil, ErrWatcherStopped
} }
} }
} }

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"sync"
"testing" "testing"
"time" "time"
) )
@@ -284,29 +285,39 @@ func TestMemoryWildcard(t *testing.T) {
} }
func TestWatcher(t *testing.T) { func TestWatcher(t *testing.T) {
w := &watcher{ testSrv := &Service{Name: "foo", Version: "1.0.0"}
id: "test",
res: make(chan *Result),
exit: make(chan bool),
wo: WatchOptions{
Domain: WildcardDomain,
},
}
ctx := context.TODO()
m := NewRegister()
m.Init()
m.Connect(ctx)
wc, err := m.Watch(ctx)
if err != nil {
t.Fatalf("cant watch: %v", err)
}
defer wc.Stop()
var wg sync.WaitGroup
wg.Add(1)
go func() { go func() {
w.res <- &Result{ for {
Service: &Service{Name: "foo"}, ch, err := wc.Next()
if err != nil {
t.Fatal("unexpected err", err)
}
t.Logf("changes %#+v", ch.Service)
wc.Stop()
wg.Done()
return
} }
}() }()
_, err := w.Next() if err := m.Register(ctx, testSrv); err != nil {
if err != nil { t.Fatalf("Register err: %v", err)
t.Fatal("unexpected err", err)
} }
w.Stop() wg.Wait()
if _, err := wc.Next(); err == nil {
if _, err := w.Next(); err == nil {
t.Fatal("expected error on Next()") t.Fatal("expected error on Next()")
} }
} }

View File

@@ -44,9 +44,8 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
// nolint: golint,revive
// RegisterOptions holds options for register method // RegisterOptions holds options for register method
type RegisterOptions struct { type RegisterOptions struct { // nolint: golint,revive
Context context.Context Context context.Context
Domain string Domain string
TTL time.Duration TTL time.Duration
@@ -197,33 +196,29 @@ func TLSConfig(t *tls.Config) Option {
} }
} }
// nolint: golint,revive
// RegisterAttempts specifies register atempts count // RegisterAttempts specifies register atempts count
func RegisterAttempts(t int) RegisterOption { func RegisterAttempts(t int) RegisterOption { // nolint: golint,revive
return func(o *RegisterOptions) { return func(o *RegisterOptions) {
o.Attempts = t o.Attempts = t
} }
} }
// nolint: golint,revive
// RegisterTTL specifies register ttl // RegisterTTL specifies register ttl
func RegisterTTL(t time.Duration) RegisterOption { func RegisterTTL(t time.Duration) RegisterOption { // nolint: golint,revive
return func(o *RegisterOptions) { return func(o *RegisterOptions) {
o.TTL = t o.TTL = t
} }
} }
// nolint: golint,revive
// RegisterContext sets the register context // RegisterContext sets the register context
func RegisterContext(ctx context.Context) RegisterOption { func RegisterContext(ctx context.Context) RegisterOption { // nolint: golint,revive
return func(o *RegisterOptions) { return func(o *RegisterOptions) {
o.Context = ctx o.Context = ctx
} }
} }
// nolint: golint,revive
// RegisterDomain secifies register domain // RegisterDomain secifies register domain
func RegisterDomain(d string) RegisterOption { func RegisterDomain(d string) RegisterOption { // nolint: golint,revive
return func(o *RegisterOptions) { return func(o *RegisterOptions) {
o.Domain = d o.Domain = d
} }

View File

@@ -69,9 +69,8 @@ type Endpoint struct {
// Option func signature // Option func signature
type Option func(*Options) type Option func(*Options)
// nolint: golint,revive
// RegisterOption option is used to register service // RegisterOption option is used to register service
type RegisterOption func(*RegisterOptions) type RegisterOption func(*RegisterOptions) // nolint: golint,revive
// WatchOption option is used to watch service changes // WatchOption option is used to watch service changes
type WatchOption func(*WatchOptions) type WatchOption func(*WatchOptions)

View File

@@ -22,7 +22,7 @@ func (r *rpcMessage) Topic() string {
return r.topic return r.topic
} }
func (r *rpcMessage) Payload() interface{} { func (r *rpcMessage) Body() interface{} {
return r.payload return r.payload
} }
@@ -30,10 +30,6 @@ func (r *rpcMessage) Header() metadata.Metadata {
return r.header return r.header
} }
func (r *rpcMessage) Body() []byte {
return r.body
}
func (r *rpcMessage) Codec() codec.Codec { func (r *rpcMessage) Codec() codec.Codec {
return r.codec return r.codec
} }

View File

@@ -75,13 +75,11 @@ type Message interface {
// Topic of the message // Topic of the message
Topic() string Topic() string
// The decoded payload value // The decoded payload value
Payload() interface{} Body() interface{}
// The content type of the payload // The content type of the payload
ContentType() string ContentType() string
// The raw headers of the message // The raw headers of the message
Header() metadata.Metadata Header() metadata.Metadata
// The raw body of the message
Body() []byte
// Codec used to decode the message // Codec used to decode the message
Codec() codec.Codec Codec() codec.Codec
} }

View File

@@ -252,7 +252,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
return err return err
} }
rb := reflect.New(req.Type().Elem()) rb := reflect.New(req.Type().Elem())
if err = cf.ReadBody(bytes.NewReader(msg.Body()), rb.Interface()); err != nil { if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
return err return err
} }
msg.(*rpcMessage).codec = cf msg.(*rpcMessage).codec = cf
@@ -269,7 +269,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
} }
payloads := reflect.MakeSlice(reqType, 0, len(ms)) payloads := reflect.MakeSlice(reqType, 0, len(ms))
for _, m := range ms { for _, m := range ms {
payloads = reflect.Append(payloads, reflect.ValueOf(m.Payload())) payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
} }
vals = append(vals, payloads) vals = append(vals, payloads)
@@ -381,7 +381,7 @@ func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler
vals = append(vals, reflect.ValueOf(ctx)) vals = append(vals, reflect.ValueOf(ctx))
} }
vals = append(vals, reflect.ValueOf(msg.Payload())) vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals) returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil { if rerr := returnValues[0].Interface(); rerr != nil {
@@ -406,7 +406,6 @@ func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler
contentType: ct, contentType: ct,
payload: req.Interface(), payload: req.Interface(),
header: msg.Header, header: msg.Header,
body: msg.Body,
}) })
results <- cerr results <- cerr
}() }()

View File

@@ -54,8 +54,10 @@ type Service interface {
// Runtime(string) (runtime.Runtime, bool) // Runtime(string) (runtime.Runtime, bool)
// Profile // Profile
// Profile(string) (profile.Profile, bool) // Profile(string) (profile.Profile, bool)
// Run the service // Run the service and wait
Run() error Run() error
// Start the service
Start() error
// The service implementation // The service implementation
String() string String() string
} }
@@ -257,7 +259,7 @@ func (s *service) Start() error {
s.RUnlock() s.RUnlock()
if config.Loggers[0].V(logger.InfoLevel) { if config.Loggers[0].V(logger.InfoLevel) {
config.Loggers[0].Infof(s.opts.Context, "starting [service] %s", s.Name()) config.Loggers[0].Infof(s.opts.Context, "starting [service] %s version %s", s.Options().Name, s.Options().Version)
} }
for _, fn := range s.opts.BeforeStart { for _, fn := range s.opts.BeforeStart {

View File

@@ -2,12 +2,16 @@ package tracer
import "go.unistack.org/micro/v3/logger" import "go.unistack.org/micro/v3/logger"
// SpanOptions contains span option
type SpanOptions struct{} type SpanOptions struct{}
// SpanOption func signature
type SpanOption func(o *SpanOptions) type SpanOption func(o *SpanOptions)
// EventOptions contains event options
type EventOptions struct{} type EventOptions struct{}
// EventOption func signature
type EventOption func(o *EventOptions) type EventOption func(o *EventOptions)
// Options struct // Options struct
@@ -18,7 +22,7 @@ type Options struct {
Name string Name string
} }
// Option func // Option func signature
type Option func(o *Options) type Option func(o *Options)
// Logger sets the logger // Logger sets the logger

View File

@@ -1,6 +1,7 @@
package jitter package jitter // import "go.unistack.org/micro/v3/util/jitter"
import ( import (
"context"
"time" "time"
"go.unistack.org/micro/v3/util/rand" "go.unistack.org/micro/v3/util/rand"
@@ -10,13 +11,31 @@ import (
// the min and max duration values (stored internally as int64 nanosecond // the min and max duration values (stored internally as int64 nanosecond
// counts). // counts).
type Ticker struct { type Ticker struct {
C chan time.Time ctx context.Context
done chan chan struct{} done chan chan struct{}
C chan time.Time
min int64 min int64
max int64 max int64
exp int64
exit bool
rng rand.Rand rng rand.Rand
} }
// NewTickerContext returns a pointer to an initialized instance of the Ticker.
// It works like NewTicker except that it has ability to close via context.
// Also it works fine with context.WithTimeout to handle max time to run ticker.
func NewTickerContext(ctx context.Context, min, max time.Duration) *Ticker {
ticker := &Ticker{
C: make(chan time.Time),
done: make(chan chan struct{}),
min: min.Nanoseconds(),
max: max.Nanoseconds(),
ctx: ctx,
}
go ticker.run()
return ticker
}
// NewTicker returns a pointer to an initialized instance of the Ticker. // NewTicker returns a pointer to an initialized instance of the Ticker.
// Min and max are durations of the shortest and longest allowed // Min and max are durations of the shortest and longest allowed
// ticks. Ticker will run in a goroutine until explicitly stopped. // ticks. Ticker will run in a goroutine until explicitly stopped.
@@ -26,6 +45,7 @@ func NewTicker(min, max time.Duration) *Ticker {
done: make(chan chan struct{}), done: make(chan chan struct{}),
min: min.Nanoseconds(), min: min.Nanoseconds(),
max: max.Nanoseconds(), max: max.Nanoseconds(),
ctx: context.Background(),
} }
go ticker.run() go ticker.run()
return ticker return ticker
@@ -33,9 +53,14 @@ func NewTicker(min, max time.Duration) *Ticker {
// Stop terminates the ticker goroutine and closes the C channel. // Stop terminates the ticker goroutine and closes the C channel.
func (ticker *Ticker) Stop() { func (ticker *Ticker) Stop() {
if ticker.exit {
return
}
c := make(chan struct{}) c := make(chan struct{})
ticker.done <- c ticker.done <- c
<-c <-c
// close(ticker.C)
ticker.exit = true
} }
func (ticker *Ticker) run() { func (ticker *Ticker) run() {
@@ -44,6 +69,8 @@ func (ticker *Ticker) run() {
for { for {
// either a stop signal or a timeout // either a stop signal or a timeout
select { select {
case <-ticker.ctx.Done():
t.Stop()
case c := <-ticker.done: case c := <-ticker.done:
t.Stop() t.Stop()
close(c) close(c)

View File

@@ -0,0 +1,62 @@
package jitter
import (
"context"
"testing"
"time"
)
func TestNewTickerContext(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
ticker := NewTickerContext(ctx, 600*time.Millisecond, 1000*time.Millisecond)
defer ticker.Stop()
loop:
for {
select {
case <-ctx.Done():
ticker.Stop()
break loop
case v, ok := <-ticker.C:
if ok {
t.Fatalf("context must be closed %s", v)
}
break loop
}
}
}
func TestTicker(t *testing.T) {
t.Parallel()
min := time.Duration(10)
max := time.Duration(20)
// tick can take a little longer since we're not adjusting it to account for
// processing.
precision := time.Duration(4)
rt := NewTicker(min*time.Millisecond, max*time.Millisecond)
for i := 0; i < 5; i++ {
t0 := time.Now()
t1 := <-rt.C
td := t1.Sub(t0)
if td < min*time.Millisecond {
t.Fatalf("tick was shorter than expected: %s", td)
} else if td > (max+precision)*time.Millisecond {
t.Fatalf("tick was longer than expected: %s", td)
}
}
rt.Stop()
time.Sleep((max + precision) * time.Millisecond)
select {
case v, ok := <-rt.C:
if ok || !v.IsZero() {
t.Fatal("ticker did not shut down")
}
default:
t.Fatal("expected to receive close channel signal")
}
}

View File

@@ -9,18 +9,30 @@ import (
) )
const ( const (
SplitToken = "." // SplitToken used to detect path components
SplitToken = "."
// IndexCloseChar used to detect index end
IndexCloseChar = "]" IndexCloseChar = "]"
IndexOpenChar = "[" // IndexOpenChar used to detect index start
IndexOpenChar = "["
) )
var ( var (
ErrMalformedIndex = errors.New("Malformed index key") // ErrMalformedIndex returns when index key have invalid format
ErrInvalidIndexUsage = errors.New("Invalid index key usage") ErrMalformedIndex = errors.New("malformed index key")
ErrKeyNotFound = errors.New("Unable to find the key") // ErrInvalidIndexUsage returns when index key usage error
ErrBadJSONPath = errors.New("Bad path: must start with $ and have more then 2 chars") ErrInvalidIndexUsage = errors.New("invalid index key usage")
// ErrKeyNotFound returns when key not found
ErrKeyNotFound = errors.New("unable to find the key")
// ErrBadJSONPath returns when path have invalid syntax
ErrBadJSONPath = errors.New("bad path: must start with $ and have more then 2 chars")
) )
// Lookup performs a lookup into a value, using a path of keys. The key should
// match with a Field or a MapIndex. For slice you can use the syntax key[index]
// to access a specific index. If one key owns to a slice and an index is not
// specificied the rest of the path will be apllied to evaley value of the
// slice, and the value will be merged into a slice.
func Lookup(i interface{}, path string) (reflect.Value, error) { func Lookup(i interface{}, path string) (reflect.Value, error) {
if path == "" || path[0:1] != "$" { if path == "" || path[0:1] != "$" {
return reflect.Value{}, ErrBadJSONPath return reflect.Value{}, ErrBadJSONPath
@@ -37,11 +49,6 @@ func Lookup(i interface{}, path string) (reflect.Value, error) {
return lookup(i, strings.Split(path[2:], SplitToken)...) return lookup(i, strings.Split(path[2:], SplitToken)...)
} }
// Lookup performs a lookup into a value, using a path of keys. The key should
// match with a Field or a MapIndex. For slice you can use the syntax key[index]
// to access a specific index. If one key owns to a slice and an index is not
// specificied the rest of the path will be apllied to evaley value of the
// slice, and the value will be merged into a slice.
func lookup(i interface{}, path ...string) (reflect.Value, error) { func lookup(i interface{}, path ...string) (reflect.Value, error) {
value := reflect.ValueOf(i) value := reflect.ValueOf(i)
var parent reflect.Value var parent reflect.Value

View File

@@ -20,8 +20,8 @@ var bracketSplitter = regexp.MustCompile(`\[|\]`)
// StructField contains struct field path its value and field // StructField contains struct field path its value and field
type StructField struct { type StructField struct {
Path string
Value reflect.Value Value reflect.Value
Path string
Field reflect.StructField Field reflect.StructField
} }
@@ -245,8 +245,13 @@ func StructFields(src interface{}) ([]StructField, error) {
switch val.Kind() { switch val.Kind() {
case reflect.Ptr: case reflect.Ptr:
// if !val.IsValid() if val.CanSet() && fld.Type.Elem().Kind() == reflect.Struct {
if reflect.Indirect(val).Kind() == reflect.Struct { if val.IsNil() {
val.Set(reflect.New(fld.Type.Elem()))
}
}
switch reflect.Indirect(val).Kind() {
case reflect.Struct:
infields, err := StructFields(val.Interface()) infields, err := StructFields(val.Interface())
if err != nil { if err != nil {
return nil, err return nil, err
@@ -255,7 +260,7 @@ func StructFields(src interface{}) ([]StructField, error) {
infield.Path = fmt.Sprintf("%s.%s", fld.Name, infield.Path) infield.Path = fmt.Sprintf("%s.%s", fld.Name, infield.Path)
fields = append(fields, infield) fields = append(fields, infield)
} }
} else { default:
fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name}) fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
} }
case reflect.Struct: case reflect.Struct:
@@ -268,6 +273,7 @@ func StructFields(src interface{}) ([]StructField, error) {
fields = append(fields, infield) fields = append(fields, infield)
} }
default: default:
fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name}) fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
} }
} }

View File

@@ -10,22 +10,28 @@ import (
) )
func TestStructfields(t *testing.T) { func TestStructfields(t *testing.T) {
type NestedConfig struct {
Value string
}
type Config struct { type Config struct {
Wait time.Duration
Time time.Time Time time.Time
Nested *NestedConfig
Metadata map[string]int Metadata map[string]int
Broker string Broker string
Addr []string Addr []string
Wait time.Duration
Verbose bool Verbose bool
Nested *Config
} }
cfg := &Config{Nested: &Config{}} cfg := &Config{Nested: &NestedConfig{}}
fields, err := rutil.StructFields(cfg) fields, err := rutil.StructFields(cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(fields) != 13 { if len(fields) != 7 {
t.Fatalf("invalid fields number: %v", fields) for _, field := range fields {
t.Logf("field %#+v\n", field)
}
t.Fatalf("invalid fields number: %d != %d", 7, len(fields))
} }
} }

View File

@@ -1,7 +1,11 @@
package register // import "go.unistack.org/micro/v3/util/register" package register // import "go.unistack.org/micro/v3/util/register"
import ( import (
"context"
"time"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/register"
jitter "go.unistack.org/micro/v3/util/jitter"
) )
func addNodes(old, neu []*register.Node) []*register.Node { func addNodes(old, neu []*register.Node) []*register.Node {
@@ -146,3 +150,30 @@ func Remove(old, del []*register.Service) []*register.Service {
return services return services
} }
// WaitService using register wait for service to appear with min/max interval for check and optional timeout.
// Timeout can be 0 to wait infinitive.
func WaitService(ctx context.Context, reg register.Register, name string, min time.Duration, max time.Duration, timeout time.Duration, opts ...register.LookupOption) error {
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
ticker := jitter.NewTickerContext(ctx, min, max)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case _, ok := <-ticker.C:
if _, err := reg.LookupService(ctx, name, opts...); err == nil {
return nil
}
if ok {
return register.ErrNotFound
}
}
}
}