Compare commits
42 Commits
Author | SHA1 | Date | |
---|---|---|---|
be5f9ab77f | |||
144dca0cae | |||
75173560e3 | |||
9b3bccd1f1 | |||
ce125b77c1 | |||
2ee8d4ed46 | |||
f58781d076 | |||
e1af4aa3a4 | |||
1d5e795443 | |||
a3a434d923 | |||
bcc06054f1 | |||
270d26f1ae | |||
646212cc08 | |||
00c2c749db | |||
2dbada0e94 | |||
7b8f4410fb | |||
45ebef5544 | |||
cf4cac0733 | |||
50d60b5825 | |||
46ef491764 | |||
a51b8b8102 | |||
15aac48f1e | |||
078069b2d7 | |||
258812304a | |||
da5d50db5b | |||
384e4d113d | |||
dfd1da7f0d | |||
8e5015e580 | |||
bd0c309b71 | |||
b4f0c3e29a | |||
8fddaa0455 | |||
2710c269a8 | |||
70ea93e466 | |||
a87d0ab1c1 | |||
2e5e102719 | |||
36e492314d | |||
0c78873277 | |||
7f57dc09d3 | |||
447206d256 | |||
33a7feb970 | |||
3950f2ed86 | |||
68c1048a7d |
18
.gitea/ISSUE_TEMPLATE/bug_report.md
Normal file
18
.gitea/ISSUE_TEMPLATE/bug_report.md
Normal file
@@ -0,0 +1,18 @@
|
||||
---
|
||||
name: Bug report
|
||||
about: For reporting bugs in micro
|
||||
title: "[BUG]"
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**Describe the bug**
|
||||
|
||||
1. What are you trying to do?
|
||||
2. What did you expect to happen?
|
||||
3. What happens instead?
|
||||
|
||||
**How to reproduce the bug:**
|
||||
|
||||
If possible, please include a minimal code snippet here.
|
17
.gitea/ISSUE_TEMPLATE/feature-request---enhancement.md
Normal file
17
.gitea/ISSUE_TEMPLATE/feature-request---enhancement.md
Normal file
@@ -0,0 +1,17 @@
|
||||
---
|
||||
name: Feature request / Enhancement
|
||||
about: If you have a need not served by micro
|
||||
title: "[FEATURE]"
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**Is your feature request related to a problem? Please describe.**
|
||||
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
|
||||
|
||||
**Describe the solution you'd like**
|
||||
A clear and concise description of what you want to happen.
|
||||
|
||||
**Additional context**
|
||||
Add any other context or screenshots about the feature request here.
|
8
.gitea/ISSUE_TEMPLATE/question.md
Normal file
8
.gitea/ISSUE_TEMPLATE/question.md
Normal file
@@ -0,0 +1,8 @@
|
||||
---
|
||||
name: Question
|
||||
about: Ask a question about micro
|
||||
title: ''
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
28
.gitea/autoapprove.yml
Normal file
28
.gitea/autoapprove.yml
Normal file
@@ -0,0 +1,28 @@
|
||||
name: "autoapprove"
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [assigned, opened, synchronize, reopened]
|
||||
workflow_run:
|
||||
workflows: ["prbuild"]
|
||||
types:
|
||||
- completed
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
contents: write
|
||||
|
||||
jobs:
|
||||
autoapprove:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: approve
|
||||
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
|
||||
"chmod +x ./tea",
|
||||
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
|
||||
"./tea pr --repo ${{ github.event.repository.name }}"
|
||||
]
|
||||
if: github.actor == 'vtolstov'
|
||||
id: approve
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
24
.gitea/workflows/lint.yml
Normal file
24
.gitea/workflows/lint.yml
Normal file
@@ -0,0 +1,24 @@
|
||||
name: lint
|
||||
on:
|
||||
pull_request:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
jobs:
|
||||
lint:
|
||||
name: lint
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: setup-go
|
||||
uses: https://gitea.com/actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.18
|
||||
- name: checkout
|
||||
uses: https://gitea.com/actions/checkout@v3
|
||||
- name: deps
|
||||
run: go get -v -d ./...
|
||||
- name: lint
|
||||
uses: https://github.com/golangci/golangci-lint-action@v3.4.0
|
||||
continue-on-error: true
|
||||
with:
|
||||
version: v1.52
|
23
.gitea/workflows/pr.yml
Normal file
23
.gitea/workflows/pr.yml
Normal file
@@ -0,0 +1,23 @@
|
||||
name: pr
|
||||
on:
|
||||
pull_request:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
jobs:
|
||||
test:
|
||||
name: test
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout
|
||||
uses: https://gitea.com/actions/checkout@v3
|
||||
- name: setup-go
|
||||
uses: https://gitea.com/actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.18
|
||||
- name: deps
|
||||
run: go get -v -t -d ./...
|
||||
- name: test
|
||||
env:
|
||||
INTEGRATION_TESTS: yes
|
||||
run: go test -mod readonly -v ./...
|
9
.github.old/PULL_REQUEST_TEMPLATE.md
Normal file
9
.github.old/PULL_REQUEST_TEMPLATE.md
Normal file
@@ -0,0 +1,9 @@
|
||||
## Pull Request template
|
||||
Please, go through these steps before clicking submit on this PR.
|
||||
|
||||
1. Give a descriptive title to your PR.
|
||||
2. Provide a description of your changes.
|
||||
3. Make sure you have some relevant tests.
|
||||
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).
|
||||
|
||||
**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**
|
@@ -37,11 +37,4 @@ jobs:
|
||||
uses: golangci/golangci-lint-action@v3.4.0
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
version: v1.30
|
||||
# Optional: working directory, useful for monorepos
|
||||
# working-directory: somedir
|
||||
# Optional: golangci-lint command line arguments.
|
||||
# args: --issues-exit-code=0
|
||||
# Optional: show only new issues if it's a pull request. The default value is `false`.
|
||||
# only-new-issues: true
|
||||
version: v1.30
|
@@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
@@ -85,33 +86,12 @@ type Event interface {
|
||||
SetError(err error)
|
||||
}
|
||||
|
||||
// RawMessage is a raw encoded JSON value.
|
||||
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
|
||||
type RawMessage []byte
|
||||
|
||||
// MarshalJSON returns m as the JSON encoding of m.
|
||||
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
||||
if m == nil {
|
||||
return []byte("null"), nil
|
||||
}
|
||||
return *m, nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON sets *m to a copy of data.
|
||||
func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||
if m == nil {
|
||||
return errors.New("RawMessage UnmarshalJSON on nil pointer")
|
||||
}
|
||||
*m = append((*m)[0:0], data...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Message is used to transfer data
|
||||
type Message struct {
|
||||
// Header contains message metadata
|
||||
Header metadata.Metadata
|
||||
// Body contains message body
|
||||
Body RawMessage
|
||||
Body codec.RawMessage
|
||||
}
|
||||
|
||||
// NewMessage create broker message with topic filled
|
||||
|
@@ -84,3 +84,24 @@ func MarshalAppend(buf []byte, c Codec, v interface{}, opts ...Option) ([]byte,
|
||||
|
||||
return append(buf, mbuf...), nil
|
||||
}
|
||||
|
||||
// RawMessage is a raw encoded JSON value.
|
||||
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
|
||||
type RawMessage []byte
|
||||
|
||||
// MarshalJSON returns m as the JSON encoding of m.
|
||||
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
||||
if m == nil {
|
||||
return []byte("null"), nil
|
||||
}
|
||||
return *m, nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON sets *m to a copy of data.
|
||||
func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||
if m == nil {
|
||||
return errors.New("RawMessage UnmarshalJSON on nil pointer")
|
||||
}
|
||||
*m = append((*m)[0:0], data...)
|
||||
return nil
|
||||
}
|
||||
|
@@ -124,33 +124,12 @@ func Validate(ctx context.Context, cfg interface{}) error {
|
||||
}
|
||||
|
||||
var (
|
||||
// DefaultAfterLoad default func that runs after config load
|
||||
DefaultAfterLoad = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().AfterLoad {
|
||||
if err := fn(ctx, c); err != nil {
|
||||
c.Options().Logger.Errorf(ctx, "%s AfterLoad err: %v", c.String(), err)
|
||||
if !c.Options().AllowFail {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// DefaultAfterSave default func that runs after config save
|
||||
DefaultAfterSave = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().AfterSave {
|
||||
if err := fn(ctx, c); err != nil {
|
||||
c.Options().Logger.Errorf(ctx, "%s AfterSave err: %v", c.String(), err)
|
||||
if !c.Options().AllowFail {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// DefaultBeforeLoad default func that runs before config load
|
||||
// DefaultBeforeLoad default func that runs before config Load
|
||||
DefaultBeforeLoad = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().BeforeLoad {
|
||||
if fn == nil {
|
||||
return nil
|
||||
}
|
||||
if err := fn(ctx, c); err != nil {
|
||||
c.Options().Logger.Errorf(ctx, "%s BeforeLoad err: %v", c.String(), err)
|
||||
if !c.Options().AllowFail {
|
||||
@@ -160,9 +139,27 @@ var (
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// DefaultBeforeSave default func that runs befora config save
|
||||
// DefaultAfterLoad default func that runs after config Load
|
||||
DefaultAfterLoad = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().AfterLoad {
|
||||
if fn == nil {
|
||||
return nil
|
||||
}
|
||||
if err := fn(ctx, c); err != nil {
|
||||
c.Options().Logger.Errorf(ctx, "%s AfterLoad err: %v", c.String(), err)
|
||||
if !c.Options().AllowFail {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// DefaultBeforeSave default func that runs befora config Save
|
||||
DefaultBeforeSave = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().BeforeSave {
|
||||
if fn == nil {
|
||||
return nil
|
||||
}
|
||||
if err := fn(ctx, c); err != nil {
|
||||
c.Options().Logger.Errorf(ctx, "%s BeforeSave err: %v", c.String(), err)
|
||||
if !c.Options().AllowFail {
|
||||
@@ -172,4 +169,49 @@ var (
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// DefaultAfterSave default func that runs after config Save
|
||||
DefaultAfterSave = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().AfterSave {
|
||||
if fn == nil {
|
||||
return nil
|
||||
}
|
||||
if err := fn(ctx, c); err != nil {
|
||||
c.Options().Logger.Errorf(ctx, "%s AfterSave err: %v", c.String(), err)
|
||||
if !c.Options().AllowFail {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// DefaultBeforeInit default func that runs befora config Init
|
||||
DefaultBeforeInit = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().BeforeInit {
|
||||
if fn == nil {
|
||||
return nil
|
||||
}
|
||||
if err := fn(ctx, c); err != nil {
|
||||
c.Options().Logger.Errorf(ctx, "%s BeforeInit err: %v", c.String(), err)
|
||||
if !c.Options().AllowFail {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// DefaultAfterInit default func that runs after config Init
|
||||
DefaultAfterInit = func(ctx context.Context, c Config) error {
|
||||
for _, fn := range c.Options().AfterSave {
|
||||
if fn == nil {
|
||||
return nil
|
||||
}
|
||||
if err := fn(ctx, c); err != nil {
|
||||
c.Options().Logger.Errorf(ctx, "%s AfterInit err: %v", c.String(), err)
|
||||
if !c.Options().AllowFail {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
)
|
||||
|
@@ -24,11 +24,20 @@ func (c *defaultConfig) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&c.opts)
|
||||
}
|
||||
|
||||
if err := DefaultBeforeInit(c.opts.Context, c); err != nil && !c.opts.AllowFail {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := DefaultAfterInit(c.opts.Context, c); err != nil && !c.opts.AllowFail {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
|
||||
if err := DefaultBeforeLoad(ctx, c); err != nil {
|
||||
if err := DefaultBeforeLoad(ctx, c); err != nil && !c.opts.AllowFail {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -51,21 +60,20 @@ func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
|
||||
if !c.opts.AllowFail {
|
||||
return err
|
||||
}
|
||||
return DefaultAfterLoad(ctx, c)
|
||||
if err = DefaultAfterLoad(ctx, c); err != nil && !c.opts.AllowFail {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = fillValues(reflect.ValueOf(src), c.opts.StructTag); err == nil {
|
||||
err = mergo.Merge(dst, src, mopts...)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
c.opts.Logger.Errorf(ctx, "default load error: %v", err)
|
||||
if !c.opts.AllowFail {
|
||||
return err
|
||||
}
|
||||
if err != nil && !c.opts.AllowFail {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := DefaultAfterLoad(ctx, c); err != nil {
|
||||
if err := DefaultAfterLoad(ctx, c); err != nil && !c.opts.AllowFail {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@@ -28,14 +28,18 @@ type Options struct {
|
||||
Name string
|
||||
// StructTag name
|
||||
StructTag string
|
||||
// BeforeSave contains slice of funcs that runs before save
|
||||
// BeforeSave contains slice of funcs that runs before Save
|
||||
BeforeSave []func(context.Context, Config) error
|
||||
// AfterLoad contains slice of funcs that runs after load
|
||||
AfterLoad []func(context.Context, Config) error
|
||||
// BeforeLoad contains slice of funcs that runs before load
|
||||
BeforeLoad []func(context.Context, Config) error
|
||||
// AfterSave contains slice of funcs that runs after save
|
||||
// AfterSave contains slice of funcs that runs after Save
|
||||
AfterSave []func(context.Context, Config) error
|
||||
// BeforeLoad contains slice of funcs that runs before Load
|
||||
BeforeLoad []func(context.Context, Config) error
|
||||
// AfterLoad contains slice of funcs that runs after Load
|
||||
AfterLoad []func(context.Context, Config) error
|
||||
// BeforeInit contains slice of funcs that runs before Init
|
||||
BeforeInit []func(context.Context, Config) error
|
||||
// AfterInit contains slice of funcs that runs after Init
|
||||
AfterInit []func(context.Context, Config) error
|
||||
// AllowFail flag to allow fail in config source
|
||||
AllowFail bool
|
||||
}
|
||||
@@ -131,6 +135,20 @@ func AllowFail(b bool) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// BeforeInit run funcs before config Init
|
||||
func BeforeInit(fn ...func(context.Context, Config) error) Option {
|
||||
return func(o *Options) {
|
||||
o.BeforeInit = fn
|
||||
}
|
||||
}
|
||||
|
||||
// AfterInit run funcs after config Init
|
||||
func AfterInit(fn ...func(context.Context, Config) error) Option {
|
||||
return func(o *Options) {
|
||||
o.AfterInit = fn
|
||||
}
|
||||
}
|
||||
|
||||
// BeforeLoad run funcs before config load
|
||||
func BeforeLoad(fn ...func(context.Context, Config) error) Option {
|
||||
return func(o *Options) {
|
||||
|
5
go.mod
5
go.mod
@@ -1,10 +1,9 @@
|
||||
module go.unistack.org/micro/v3
|
||||
|
||||
go 1.16
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/imdario/mergo v0.3.13
|
||||
github.com/imdario/mergo v0.3.14
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
5
go.sum
5
go.sum
@@ -1,10 +1,9 @@
|
||||
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
|
||||
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
|
||||
github.com/imdario/mergo v0.3.14 h1:fOqeC1+nCuuk6PKQdg9YmosXX7Y7mHX6R/0ZldI9iHo=
|
||||
github.com/imdario/mergo v0.3.14/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ=
|
||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
@@ -39,8 +39,6 @@ func FromOutgoingContext(ctx context.Context) (Metadata, bool) {
|
||||
|
||||
// FromContext returns metadata from the given context
|
||||
// returned metadata shoud not be modified or race condition happens
|
||||
//
|
||||
// Deprecated: use FromIncomingContext or FromOutgoingContext
|
||||
func FromContext(ctx context.Context) (Metadata, bool) {
|
||||
if ctx == nil {
|
||||
return nil, false
|
||||
@@ -53,8 +51,6 @@ func FromContext(ctx context.Context) (Metadata, bool) {
|
||||
}
|
||||
|
||||
// NewContext creates a new context with the given metadata
|
||||
//
|
||||
// Deprecated: use NewIncomingContext or NewOutgoingContext
|
||||
func NewContext(ctx context.Context, md Metadata) context.Context {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
|
@@ -211,6 +211,7 @@ func Stores(s ...store.Store) Option {
|
||||
}
|
||||
|
||||
// Logger set the logger to use
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func Logger(l logger.Logger, opts ...LoggerOption) Option {
|
||||
return func(o *Options) error {
|
||||
@@ -329,6 +330,7 @@ func Meters(m ...meter.Meter) Option {
|
||||
|
||||
// Register sets the register for the service
|
||||
// and the underlying components
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func Register(r register.Register, opts ...RegisterOption) Option {
|
||||
return func(o *Options) error {
|
||||
@@ -403,6 +405,7 @@ func RegisterBroker(n string) RegisterOption {
|
||||
}
|
||||
|
||||
// Tracer sets the tracer
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func Tracer(t tracer.Tracer, opts ...TracerOption) Option {
|
||||
return func(o *Options) error {
|
||||
|
31
options/hooks.go
Normal file
31
options/hooks.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package options // import "go.unistack.org/micro/v3/options"
|
||||
|
||||
// Hook func interface
|
||||
type Hook interface{}
|
||||
|
||||
// Hooks func slice
|
||||
type Hooks []Hook
|
||||
|
||||
// Append is used to add hooks
|
||||
func (hs *Hooks) Append(h ...Hook) {
|
||||
*hs = append(*hs, h...)
|
||||
}
|
||||
|
||||
// Replace is used to set hooks
|
||||
func (hs *Hooks) Replace(h ...Hook) {
|
||||
*hs = h
|
||||
}
|
||||
|
||||
// EachNext is used to itearate over hooks forward
|
||||
func (hs *Hooks) EachNext(fn func(Hook)) {
|
||||
for idx := 0; idx < len(*hs); idx++ {
|
||||
fn((*hs)[idx])
|
||||
}
|
||||
}
|
||||
|
||||
// EachPrev is used to iterate over hooks backward
|
||||
func (hs *Hooks) EachPrev(fn func(Hook)) {
|
||||
for idx := len(*hs) - 1; idx >= 0; idx-- {
|
||||
fn((*hs)[idx])
|
||||
}
|
||||
}
|
65
options/hooks_test.go
Normal file
65
options/hooks_test.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package options
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestHooks_Append(t *testing.T) {
|
||||
fn1 := func() {}
|
||||
fn2 := func() {}
|
||||
hs := &Hooks{}
|
||||
hs.Append(fn1, fn2)
|
||||
if len(*hs) != 2 {
|
||||
t.Fatalf("unexpected Append error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHooks_Replace(t *testing.T) {
|
||||
fn1 := func() {}
|
||||
fn2 := func() {}
|
||||
hs := &Hooks{}
|
||||
hs.Append(fn1, fn2, fn1)
|
||||
if len(*hs) != 3 {
|
||||
t.Fatalf("unexpected Append error")
|
||||
}
|
||||
hs.Replace(fn1, fn2)
|
||||
if len(*hs) != 2 {
|
||||
t.Fatalf("unexpected Replace error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHooks_EachNext(t *testing.T) {
|
||||
n := 5
|
||||
fn1 := func() {
|
||||
n *= 2
|
||||
}
|
||||
fn2 := func() {
|
||||
n -= 10
|
||||
}
|
||||
hs := &Hooks{}
|
||||
hs.Append(fn1, fn2)
|
||||
|
||||
hs.EachNext(func(h Hook) {
|
||||
h.(func())()
|
||||
})
|
||||
if n != 0 {
|
||||
t.Fatalf("unexpected EachNext")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHooks_EachPrev(t *testing.T) {
|
||||
n := 5
|
||||
fn1 := func() {
|
||||
n *= 2
|
||||
}
|
||||
fn2 := func() {
|
||||
n -= 10
|
||||
}
|
||||
hs := &Hooks{}
|
||||
hs.Append(fn2, fn1)
|
||||
|
||||
hs.EachPrev(func(h Hook) {
|
||||
h.(func())()
|
||||
})
|
||||
if n != 0 {
|
||||
t.Fatalf("unexpected EachPrev")
|
||||
}
|
||||
}
|
@@ -202,39 +202,6 @@ func (n *noopServer) Register() error {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
cx := config.Context
|
||||
|
||||
var sub broker.Subscriber
|
||||
|
||||
for sb := range n.subscribers {
|
||||
if sb.Options().Context != nil {
|
||||
cx = sb.Options().Context
|
||||
}
|
||||
|
||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
if sb.Options().Batch {
|
||||
// batch processing handler
|
||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
|
||||
} else {
|
||||
// single processing handler
|
||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
||||
}
|
||||
|
||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
n.registered = true
|
||||
if cacheService {
|
||||
n.rsvc = service
|
||||
@@ -366,6 +333,10 @@ func (n *noopServer) Start() error {
|
||||
}
|
||||
}
|
||||
|
||||
if err := n.subscribe(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
t := new(time.Ticker)
|
||||
|
||||
@@ -449,6 +420,45 @@ func (n *noopServer) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *noopServer) subscribe() error {
|
||||
config := n.Options()
|
||||
|
||||
cx := config.Context
|
||||
var err error
|
||||
var sub broker.Subscriber
|
||||
|
||||
for sb := range n.subscribers {
|
||||
if sb.Options().Context != nil {
|
||||
cx = sb.Options().Context
|
||||
}
|
||||
|
||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
if sb.Options().Batch {
|
||||
// batch processing handler
|
||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
|
||||
} else {
|
||||
// single processing handler
|
||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
||||
}
|
||||
|
||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *noopServer) Stop() error {
|
||||
n.RLock()
|
||||
if !n.started {
|
||||
|
@@ -13,6 +13,7 @@ import (
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/meter"
|
||||
"go.unistack.org/micro/v3/network/transport"
|
||||
"go.unistack.org/micro/v3/options"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
"go.unistack.org/micro/v3/util/id"
|
||||
@@ -83,6 +84,8 @@ type Options struct {
|
||||
MaxConn int
|
||||
// DeregisterAttempts holds the number of deregister attempts before error
|
||||
DeregisterAttempts int
|
||||
// Hooks may contains SubscriberWrapper, HandlerWrapper or Server func wrapper
|
||||
Hooks options.Hooks
|
||||
}
|
||||
|
||||
// NewOptions returns new options struct with default or passed values
|
||||
|
@@ -191,7 +191,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||
return func(ps broker.Events) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
@@ -309,7 +309,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||
return func(p broker.Event) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
|
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
)
|
||||
|
||||
var _ Tracer = (*noopTracer)(nil)
|
||||
|
||||
type noopTracer struct {
|
||||
opts Options
|
||||
}
|
||||
@@ -21,6 +23,10 @@ func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption)
|
||||
return NewSpanContext(ctx, span), span
|
||||
}
|
||||
|
||||
func (t *noopTracer) Flush(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *noopTracer) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&t.opts)
|
||||
|
@@ -16,6 +16,8 @@ type Tracer interface {
|
||||
Init(...Option) error
|
||||
// Start a trace
|
||||
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
|
||||
// Flush flushes spans
|
||||
Flush(ctx context.Context) error
|
||||
}
|
||||
|
||||
type Span interface {
|
||||
|
@@ -2,7 +2,9 @@ package time
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -13,39 +15,42 @@ func ParseDuration(s string) (time.Duration, error) {
|
||||
return 0, fmt.Errorf(`time: invalid duration "` + s + `"`)
|
||||
}
|
||||
|
||||
//var sb strings.Builder
|
||||
/*
|
||||
for i, r := range s {
|
||||
switch r {
|
||||
case 'd':
|
||||
n, err := strconv.Atoi(s[idx:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
s[idx:i] = fmt.Sprintf("%d", n*24)
|
||||
default:
|
||||
sb.WriteRune(r)
|
||||
var p int
|
||||
var hours int
|
||||
loop:
|
||||
for i, r := range s {
|
||||
switch r {
|
||||
case 's', 'm':
|
||||
break loop
|
||||
case 'h':
|
||||
d, err := strconv.Atoi(s[p:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
}
|
||||
*/
|
||||
var td time.Duration
|
||||
var err error
|
||||
switch s[len(s)-1] {
|
||||
case 's', 'm', 'h':
|
||||
td, err = time.ParseDuration(s)
|
||||
case 'd':
|
||||
if td, err = time.ParseDuration(s[:len(s)-1] + "h"); err == nil {
|
||||
td *= 24
|
||||
}
|
||||
case 'y':
|
||||
if td, err = time.ParseDuration(s[:len(s)-1] + "h"); err == nil {
|
||||
year := time.Date(time.Now().Year(), time.December, 31, 0, 0, 0, 0, time.Local)
|
||||
days := year.YearDay()
|
||||
td *= 24 * time.Duration(days)
|
||||
hours += d
|
||||
p = i + 1
|
||||
case 'd':
|
||||
d, err := strconv.Atoi(s[p:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
hours += d * 24
|
||||
p = i + 1
|
||||
case 'y':
|
||||
n, err := strconv.Atoi(s[p:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
var d int
|
||||
for j := n - 1; j >= 0; j-- {
|
||||
d += time.Date(time.Now().Year()+j, time.December, 31, 0, 0, 0, 0, time.Local).YearDay()
|
||||
}
|
||||
hours += d * 24
|
||||
p = i + 1
|
||||
}
|
||||
}
|
||||
|
||||
return td, err
|
||||
return time.ParseDuration(fmt.Sprintf("%dh%s", hours, s[p:]))
|
||||
}
|
||||
|
||||
func (d Duration) MarshalJSON() ([]byte, error) {
|
||||
@@ -62,7 +67,7 @@ func (d *Duration) UnmarshalJSON(b []byte) error {
|
||||
*d = Duration(time.Duration(value))
|
||||
return nil
|
||||
case string:
|
||||
dv, err := time.ParseDuration(value)
|
||||
dv, err := ParseDuration(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -23,27 +23,34 @@ func TestUnmarshalJSON(t *testing.T) {
|
||||
TTL Duration `json:"ttl"`
|
||||
}
|
||||
v := &str{}
|
||||
var err error
|
||||
|
||||
err := json.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
|
||||
err = json.Unmarshal([]byte(`{"ttl":"10ms"}`), v)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if v.TTL != 10000000 {
|
||||
t.Fatalf("invalid duration %v != 10000000", v.TTL)
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if v.TTL != 31536000000000000 {
|
||||
t.Fatalf("invalid duration %v != 31536000000000000", v.TTL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseDuration(t *testing.T) {
|
||||
var td time.Duration
|
||||
var err error
|
||||
t.Skip()
|
||||
|
||||
td, err = ParseDuration("14d4h")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseDuration error: %v", err)
|
||||
}
|
||||
if td.String() != "336h0m0s" {
|
||||
t.Fatalf("ParseDuration 14d != 336h0m0s : %s", td.String())
|
||||
if td.String() != "340h0m0s" {
|
||||
t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String())
|
||||
}
|
||||
|
||||
td, err = ParseDuration("1y")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseDuration error: %v", err)
|
||||
|
Reference in New Issue
Block a user