Compare commits
2 Commits
v4.1.8
...
c5e6d4cddc
Author | SHA1 | Date | |
---|---|---|---|
c5e6d4cddc | |||
c9066e0455 |
@@ -3,9 +3,6 @@ name: coverage
|
||||
on:
|
||||
push:
|
||||
branches: [ main, v3, v4 ]
|
||||
paths-ignore:
|
||||
- '.github/**'
|
||||
- '.gitea/**'
|
||||
pull_request:
|
||||
branches: [ main, v3, v4 ]
|
||||
# Allows you to run this workflow manually from the Actions tab
|
||||
@@ -25,7 +22,7 @@ jobs:
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
cache-dependency-path: "**/*.sum"
|
||||
go-version: 'stable'
|
||||
go-version: 'stable'
|
||||
|
||||
- name: test coverage
|
||||
run: |
|
||||
@@ -42,8 +39,8 @@ jobs:
|
||||
name: autocommit
|
||||
with:
|
||||
commit_message: Apply Code Coverage Badge
|
||||
skip_fetch: false
|
||||
skip_checkout: false
|
||||
skip_fetch: true
|
||||
skip_checkout: true
|
||||
file_pattern: ./README.md
|
||||
|
||||
- name: push
|
||||
@@ -51,4 +48,4 @@ jobs:
|
||||
uses: ad-m/github-push-action@master
|
||||
with:
|
||||
github_token: ${{ github.token }}
|
||||
branch: ${{ github.ref }}
|
||||
branch: ${{ github.ref }}
|
@@ -3,10 +3,10 @@ name: lint
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches: [ master, v3, v4 ]
|
||||
paths-ignore:
|
||||
- '.github/**'
|
||||
- '.gitea/**'
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
@@ -24,6 +24,6 @@ jobs:
|
||||
- name: setup deps
|
||||
run: go get -v ./...
|
||||
- name: run lint
|
||||
uses: golangci/golangci-lint-action@v6
|
||||
uses: https://github.com/golangci/golangci-lint-action@v6
|
||||
with:
|
||||
version: 'latest'
|
@@ -3,12 +3,15 @@ name: test
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches: [ master, v3, v4 ]
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
push:
|
||||
branches: [ master, v3, v4 ]
|
||||
paths-ignore:
|
||||
- '.github/**'
|
||||
- '.gitea/**'
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
|
||||
jobs:
|
||||
test:
|
@@ -3,12 +3,15 @@ name: test
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches: [ master, v3, v4 ]
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
push:
|
||||
branches: [ master, v3, v4 ]
|
||||
paths-ignore:
|
||||
- '.github/**'
|
||||
- '.gitea/**'
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
|
||||
jobs:
|
||||
test:
|
||||
@@ -32,19 +35,19 @@ jobs:
|
||||
go-version: 'stable'
|
||||
- name: setup go work
|
||||
env:
|
||||
GOWORK: ${{ github.workspace }}/go.work
|
||||
GOWORK: /workspace/${{ github.repository_owner }}/go.work
|
||||
run: |
|
||||
go work init
|
||||
go work use .
|
||||
go work use micro-tests
|
||||
- name: setup deps
|
||||
env:
|
||||
GOWORK: ${{ github.workspace }}/go.work
|
||||
GOWORK: /workspace/${{ github.repository_owner }}/go.work
|
||||
run: go get -v ./...
|
||||
- name: run tests
|
||||
env:
|
||||
INTEGRATION_TESTS: yes
|
||||
GOWORK: ${{ github.workspace }}/go.work
|
||||
GOWORK: /workspace/${{ github.repository_owner }}/go.work
|
||||
run: |
|
||||
cd micro-tests
|
||||
go test -mod readonly -v ./... || true
|
57
.github/workflows/job_sync.yml
vendored
57
.github/workflows/job_sync.yml
vendored
@@ -1,57 +0,0 @@
|
||||
name: sync
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: '*/5 * * * *'
|
||||
push:
|
||||
branches: [ master, v3, v4 ]
|
||||
paths-ignore:
|
||||
- '.github/**'
|
||||
- '.gitea/**'
|
||||
# Allows you to run this workflow manually from the Actions tab
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
sync:
|
||||
if: github.server_url != 'https://github.com'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: init
|
||||
run: |
|
||||
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
|
||||
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
|
||||
|
||||
- name: sync master
|
||||
run: |
|
||||
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||
cd repo
|
||||
git remote add --no-tags --track master upstream https://github.com/${GITHUB_REPOSITORY}
|
||||
git pull --rebase upstream master
|
||||
git push upstream master --progress
|
||||
git push origin master --progress
|
||||
cd ../
|
||||
rm -rf repo
|
||||
|
||||
- name: sync v3
|
||||
run: |
|
||||
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||
cd repo
|
||||
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
|
||||
git pull --rebase upstream v3
|
||||
git push upstream v3
|
||||
git push origin v3 --progress
|
||||
cd ../
|
||||
rm -rf repo
|
||||
|
||||
- name: sync v4
|
||||
run: |
|
||||
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||
cd repo
|
||||
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
|
||||
git pull --rebase upstream v4
|
||||
git push upstream v4
|
||||
git push origin v4 --progress
|
||||
cd ../
|
||||
rm -rf repo
|
@@ -1,5 +1,5 @@
|
||||
run:
|
||||
concurrency: 8
|
||||
timeout: 5m
|
||||
deadline: 5m
|
||||
issues-exit-code: 1
|
||||
tests: true
|
||||
|
@@ -1,9 +1,9 @@
|
||||
# Micro
|
||||

|
||||

|
||||
[](https://opensource.org/licenses/Apache-2.0)
|
||||
[](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
|
||||
[](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)
|
||||
[](https://goreportcard.com/report/go.unistack.org/micro/v4)
|
||||
[](https://pkg.go.dev/go.unistack.org/micro/v3?tab=overview)
|
||||
[](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush)
|
||||
[](https://goreportcard.com/report/go.unistack.org/micro/v3)
|
||||
|
||||
Micro is a standard library for microservices.
|
||||
|
||||
|
15
SECURITY.md
Normal file
15
SECURITY.md
Normal file
@@ -0,0 +1,15 @@
|
||||
# Security Policy
|
||||
|
||||
## Supported Versions
|
||||
|
||||
Use this section to tell people about which versions of your project are
|
||||
currently being supported with security updates.
|
||||
|
||||
| Version | Supported |
|
||||
| ------- | ------------------ |
|
||||
| 3.7.x | :white_check_mark: |
|
||||
| < 3.7.0 | :x: |
|
||||
|
||||
## Reporting a Vulnerability
|
||||
|
||||
If you find any issue, please create github issue in this repo
|
@@ -21,7 +21,7 @@ var (
|
||||
// ErrInvalidMessage returns when invalid Message passed
|
||||
ErrInvalidMessage = errors.New("invalid message")
|
||||
// ErrInvalidHandler returns when subscriber passed to Subscribe
|
||||
ErrInvalidHandler = errors.New("invalid handler, ony func(Message) error and func([]Message) error supported")
|
||||
ErrInvalidHandler = errors.New("invalid handler")
|
||||
// DefaultGracefulTimeout
|
||||
DefaultGracefulTimeout = 5 * time.Second
|
||||
)
|
||||
|
@@ -42,16 +42,6 @@ func SetSubscribeOption(k, v interface{}) SubscribeOption {
|
||||
}
|
||||
}
|
||||
|
||||
// SetPublishOption returns a function to setup a context with given value
|
||||
func SetPublishOption(k, v interface{}) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// SetOption returns a function to setup a context with given value
|
||||
func SetOption(k, v interface{}) Option {
|
||||
return func(o *Options) {
|
||||
|
@@ -210,15 +210,8 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
||||
}
|
||||
case func(broker.Message) error:
|
||||
for _, message := range messages {
|
||||
msg, ok := message.(*memoryMessage)
|
||||
if !ok {
|
||||
if b.opts.Logger.V(logger.ErrorLevel) {
|
||||
b.opts.Logger.Error(ctx, "broker handler error", broker.ErrInvalidMessage)
|
||||
}
|
||||
}
|
||||
msg.topic = topic
|
||||
if err = s(msg); err == nil && sub.opts.AutoAck {
|
||||
err = msg.Ack()
|
||||
if err = s(message); err == nil && sub.opts.AutoAck {
|
||||
err = message.Ack()
|
||||
}
|
||||
if err != nil {
|
||||
if b.opts.Logger.V(logger.ErrorLevel) {
|
||||
|
@@ -79,15 +79,11 @@ type PublishOptions struct {
|
||||
// BodyOnly flag says the message contains raw body bytes and don't need
|
||||
// codec Marshal method
|
||||
BodyOnly bool
|
||||
// Context holds custom options
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
// NewPublishOptions creates PublishOptions struct
|
||||
func NewPublishOptions(opts ...PublishOption) PublishOptions {
|
||||
options := PublishOptions{
|
||||
Context: context.Background(),
|
||||
}
|
||||
options := PublishOptions{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
@@ -1,14 +1,87 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
const (
|
||||
messageSig = "func(broker.Message) error"
|
||||
messagesSig = "func([]broker.Message) error"
|
||||
)
|
||||
|
||||
// Precompute the reflect type for error. Can't use error directly
|
||||
// because Typeof takes an empty interface value. This is annoying.
|
||||
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
||||
|
||||
// Is this an exported - upper case - name?
|
||||
func isExported(name string) bool {
|
||||
r, _ := utf8.DecodeRuneInString(name)
|
||||
return unicode.IsUpper(r)
|
||||
}
|
||||
|
||||
// Is this type exported or a builtin?
|
||||
func isExportedOrBuiltinType(t reflect.Type) bool {
|
||||
for t.Kind() == reflect.Ptr {
|
||||
t = t.Elem()
|
||||
}
|
||||
// PkgPath will be non-empty even for an exported type,
|
||||
// so we need to check the type name as well.
|
||||
return isExported(t.Name()) || t.PkgPath() == ""
|
||||
}
|
||||
|
||||
// IsValidHandler func signature
|
||||
func IsValidHandler(sub interface{}) error {
|
||||
switch sub.(type) {
|
||||
typ := reflect.TypeOf(sub)
|
||||
var argType reflect.Type
|
||||
switch typ.Kind() {
|
||||
case reflect.Func:
|
||||
name := "Func"
|
||||
switch typ.NumIn() {
|
||||
case 1:
|
||||
argType = typ.In(0)
|
||||
default:
|
||||
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), messageSig)
|
||||
}
|
||||
if !isExportedOrBuiltinType(argType) {
|
||||
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
|
||||
}
|
||||
if typ.NumOut() != 1 {
|
||||
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
|
||||
name, typ.NumOut(), messageSig)
|
||||
}
|
||||
if returnType := typ.Out(0); returnType != typeOfError {
|
||||
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
|
||||
}
|
||||
default:
|
||||
return ErrInvalidHandler
|
||||
case func(Message) error:
|
||||
break
|
||||
case func([]Message) error:
|
||||
break
|
||||
hdlr := reflect.ValueOf(sub)
|
||||
name := reflect.Indirect(hdlr).Type().Name()
|
||||
|
||||
for m := 0; m < typ.NumMethod(); m++ {
|
||||
method := typ.Method(m)
|
||||
switch method.Type.NumIn() {
|
||||
case 3:
|
||||
argType = method.Type.In(2)
|
||||
default:
|
||||
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
|
||||
name, method.Name, method.Type.NumIn(), messageSig)
|
||||
}
|
||||
|
||||
if !isExportedOrBuiltinType(argType) {
|
||||
return fmt.Errorf("%v argument type not exported: %v", name, argType)
|
||||
}
|
||||
if method.Type.NumOut() != 1 {
|
||||
return fmt.Errorf(
|
||||
"subscriber %v.%v has wrong number of return values: %v require signature %s",
|
||||
name, method.Name, method.Type.NumOut(), messageSig)
|
||||
}
|
||||
if returnType := method.Type.Out(0); returnType != typeOfError {
|
||||
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
154
changes
Normal file
154
changes
Normal file
@@ -0,0 +1,154 @@
|
||||
broker/broker.go
|
||||
broker/context.go
|
||||
broker/context_test.go
|
||||
broker/memory.go
|
||||
broker/memory/memory.go
|
||||
broker/memory_test.go
|
||||
broker/noop.go
|
||||
broker/noop_test.go
|
||||
broker/options.go
|
||||
broker/subscriber.go
|
||||
client/backoff.go
|
||||
client/backoff_test.go
|
||||
client/client.go
|
||||
client/client_call_options.go
|
||||
client/client_call_options_test.go
|
||||
client/context.go
|
||||
client/context_test.go
|
||||
client/noop.go
|
||||
client/noop_test.go
|
||||
client/options.go
|
||||
codec/codec.go
|
||||
codec/context.go
|
||||
codec/frame.go
|
||||
codec/frame.proto
|
||||
codec/options.go
|
||||
config/config.go
|
||||
config/context.go
|
||||
config/context_test.go
|
||||
config/default.go
|
||||
config/default_test.go
|
||||
config/options.go
|
||||
database/dsn.go
|
||||
errors/errors.go
|
||||
errors/errors.proto
|
||||
errors/errors_test.go
|
||||
flow/context.go
|
||||
flow/context_test.go
|
||||
flow/default.go
|
||||
flow/flow.go
|
||||
flow/flow_test.go
|
||||
flow/options.go
|
||||
meter/context.go
|
||||
meter/context_test.go
|
||||
meter/meter.go
|
||||
meter/noop.go
|
||||
meter/options.go
|
||||
meter/wrapper/wrapper.go
|
||||
micro_test.go
|
||||
mtls/mtls.go
|
||||
mtls/options.go
|
||||
options.go
|
||||
options/hooks.go
|
||||
options/options.go
|
||||
options/options_test.go
|
||||
profiler/http/http.go
|
||||
profiler/noop.go
|
||||
profiler/pprof/pprof.go
|
||||
profiler/profile.go
|
||||
proxy/options.go
|
||||
proxy/proxy.go
|
||||
register/context.go
|
||||
register/extractor.go
|
||||
register/extractor_test.go
|
||||
register/memory/memory.go
|
||||
register/memory/memory_test.go
|
||||
register/options.go
|
||||
register/register.go
|
||||
register/watcher.go
|
||||
resolver/dns/dns.go
|
||||
resolver/dnssrv/dnssrv.go
|
||||
resolver/http/http.go
|
||||
resolver/noop/noop.go
|
||||
resolver/registry/registry.go
|
||||
resolver/resolver.go
|
||||
resolver/static/static.go
|
||||
router/context.go
|
||||
router/options.go
|
||||
router/router.go
|
||||
selector/random/random.go
|
||||
selector/roundrobin/roundrobin.go
|
||||
selector/selector.go
|
||||
semconv/broker.go
|
||||
semconv/cache.go
|
||||
semconv/client.go
|
||||
semconv/logger.go
|
||||
semconv/metadata.go
|
||||
semconv/pool.go
|
||||
semconv/server.go
|
||||
semconv/store.go
|
||||
server/context.go
|
||||
server/context_test.go
|
||||
server/noop.go
|
||||
server/noop_test.go
|
||||
server/options.go
|
||||
server/registry.go
|
||||
server/server.go
|
||||
server/wrapper.go
|
||||
service.go
|
||||
service_test.go
|
||||
store/context.go
|
||||
store/context_test.go
|
||||
store/memory.go
|
||||
store/memory/memory.go
|
||||
store/memory_test.go
|
||||
store/options.go
|
||||
store/store.go
|
||||
store/wrapper.go
|
||||
sync/memory.go
|
||||
sync/sync.go
|
||||
tools.go
|
||||
tracer/context.go
|
||||
tracer/memory/memory.go
|
||||
tracer/memory/memory_test.go
|
||||
tracer/noop.go
|
||||
tracer/options.go
|
||||
tracer/tracer.go
|
||||
tracer/tracer_test.go
|
||||
tracer/wrapper/wrapper.go
|
||||
util/addr/addr.go
|
||||
util/backoff/backoff.go
|
||||
util/buf/buf.go
|
||||
util/buffer/buffer.go
|
||||
util/buffer/buffer_test.go
|
||||
util/ctx/ctx.go
|
||||
util/dns/cache.go
|
||||
util/dns/cache_test.go
|
||||
util/dns/conn.go
|
||||
util/grpc/tracer.go
|
||||
util/http/clienttracer.go
|
||||
util/http/http.go
|
||||
util/http/trie.go
|
||||
util/http/trie_test.go
|
||||
util/io/redirect.go
|
||||
util/io/redirect_test.go
|
||||
util/io/redirect_unix.go
|
||||
util/io/redirect_windows.go
|
||||
util/net/net.go
|
||||
util/pki/pki.go
|
||||
util/rand/rand.go
|
||||
util/reflect/path.go
|
||||
util/reflect/reflect.go
|
||||
util/reflect/reflect_test.go
|
||||
util/reflect/struct.go
|
||||
util/reflect/struct_test.go
|
||||
util/register/util.go
|
||||
util/ring/buffer.go
|
||||
util/sort/sort.go
|
||||
util/sort/sort_test.go
|
||||
util/stream/stream.go
|
||||
util/structfs/metadata_digitalocean.go
|
||||
util/structfs/metadata_ec2.go
|
||||
util/structfs/structfs.go
|
||||
util/structfs/structfs_test.go
|
||||
util/test/test.go
|
@@ -17,7 +17,7 @@ syntax = "proto3";
|
||||
package micro.errors;
|
||||
|
||||
option cc_enable_arenas = true;
|
||||
option go_package = "go.unistack.org/micro/v4/errors;errors";
|
||||
option go_package = "go.unistack.org/micro/v3/errors;errors";
|
||||
option java_multiple_files = true;
|
||||
option java_outer_classname = "MicroErrors";
|
||||
option java_package = "micro.errors";
|
||||
|
@@ -15,6 +15,15 @@ func FromContext(ctx context.Context) (Flow, bool) {
|
||||
return c, ok
|
||||
}
|
||||
|
||||
// MustContext returns Flow from context
|
||||
func MustContext(ctx context.Context) Flow {
|
||||
f, ok := FromContext(ctx)
|
||||
if !ok {
|
||||
panic("missing flow")
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
||||
// NewContext stores Flow to context
|
||||
func NewContext(ctx context.Context, f Flow) context.Context {
|
||||
if ctx == nil {
|
||||
|
@@ -1,5 +1,3 @@
|
||||
//go:build ignore
|
||||
|
||||
package flow
|
||||
|
||||
import (
|
||||
|
472
flow/default.go
472
flow/default.go
@@ -1,16 +1,14 @@
|
||||
//go:build ignore
|
||||
|
||||
package flow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/heimdalr/dag"
|
||||
"github.com/silas/dag"
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/codec"
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
"go.unistack.org/micro/v4/store"
|
||||
"go.unistack.org/micro/v4/util/id"
|
||||
@@ -22,7 +20,7 @@ type microFlow struct {
|
||||
|
||||
type microWorkflow struct {
|
||||
opts Options
|
||||
g *dag.DAG
|
||||
g *dag.AcyclicGraph
|
||||
steps map[string]Step
|
||||
id string
|
||||
status Status
|
||||
@@ -34,20 +32,20 @@ func (w *microWorkflow) ID() string {
|
||||
return w.id
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Steps() ([][]Step, error) {
|
||||
return w.getSteps("", false)
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Status() Status {
|
||||
return w.status
|
||||
}
|
||||
|
||||
func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
||||
var err error
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
|
||||
for _, s := range steps {
|
||||
w.steps[s.String()] = s
|
||||
if _, err = w.g.AddVertex(s); err != nil {
|
||||
return err
|
||||
}
|
||||
w.g.Add(s)
|
||||
}
|
||||
|
||||
for _, dst := range steps {
|
||||
@@ -56,13 +54,18 @@ func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
||||
if !ok {
|
||||
return ErrStepNotExists
|
||||
}
|
||||
if err = w.g.AddEdge(src.String(), dst.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
w.g.Connect(dag.BasicEdge(src, dst))
|
||||
}
|
||||
}
|
||||
|
||||
w.g.ReduceTransitively()
|
||||
if err := w.g.Validate(); err != nil {
|
||||
w.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
w.g.TransitiveReduction()
|
||||
|
||||
w.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -71,11 +74,10 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error {
|
||||
// TODO: handle case when some step requires or required by removed step
|
||||
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
|
||||
for _, s := range steps {
|
||||
delete(w.steps, s.String())
|
||||
w.g.DeleteVertex(s.String())
|
||||
w.g.Remove(s)
|
||||
}
|
||||
|
||||
for _, dst := range steps {
|
||||
@@ -84,34 +86,91 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error {
|
||||
if !ok {
|
||||
return ErrStepNotExists
|
||||
}
|
||||
w.g.AddEdge(src.String(), dst.String())
|
||||
w.g.Connect(dag.BasicEdge(src, dst))
|
||||
}
|
||||
}
|
||||
|
||||
w.g.ReduceTransitively()
|
||||
if err := w.g.Validate(); err != nil {
|
||||
w.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
w.g.TransitiveReduction()
|
||||
|
||||
w.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
|
||||
var steps [][]Step
|
||||
var root dag.Vertex
|
||||
var err error
|
||||
|
||||
fn := func(n dag.Vertex, idx int) error {
|
||||
if idx == 0 {
|
||||
steps = make([][]Step, 1)
|
||||
steps[0] = make([]Step, 0, 1)
|
||||
} else if idx >= len(steps) {
|
||||
tsteps := make([][]Step, idx+1)
|
||||
copy(tsteps, steps)
|
||||
steps = tsteps
|
||||
steps[idx] = make([]Step, 0, 1)
|
||||
}
|
||||
steps[idx] = append(steps[idx], n.(Step))
|
||||
return nil
|
||||
}
|
||||
|
||||
if start != "" {
|
||||
var ok bool
|
||||
w.RLock()
|
||||
root, ok = w.steps[start]
|
||||
w.RUnlock()
|
||||
if !ok {
|
||||
return nil, ErrStepNotExists
|
||||
}
|
||||
} else {
|
||||
root, err = w.g.Root()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if reverse {
|
||||
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
|
||||
} else {
|
||||
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return steps, nil
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Abort(ctx context.Context, id string) error {
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
|
||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Suspend(ctx context.Context, id string) error {
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
|
||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Resume(ctx context.Context, id string) error {
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
|
||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
|
||||
w.Lock()
|
||||
if !w.init {
|
||||
w.g.ReduceTransitively()
|
||||
if err := w.g.Validate(); err != nil {
|
||||
w.Unlock()
|
||||
return "", err
|
||||
}
|
||||
w.g.TransitiveReduction()
|
||||
w.init = true
|
||||
}
|
||||
w.Unlock()
|
||||
@@ -121,11 +180,26 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
||||
return "", err
|
||||
}
|
||||
|
||||
// stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||
stepStore := store.NewNamespaceStore(w.opts.Store, "steps"+w.opts.Store.Options().Separator+eid)
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+eid)
|
||||
|
||||
options := NewExecuteOptions(opts...)
|
||||
|
||||
steps, err := w.getSteps(options.Start, options.Reverse)
|
||||
if err != nil {
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
||||
w.opts.Logger.Error(w.opts.Context, "store write error", werr)
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
cherr := make(chan error, 1)
|
||||
chstatus := make(chan Status, 1)
|
||||
|
||||
nctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
nopts := make([]ExecuteOption, 0, len(opts)+5)
|
||||
|
||||
nopts = append(nopts,
|
||||
@@ -135,274 +209,143 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
||||
ExecuteMeter(w.opts.Meter),
|
||||
)
|
||||
nopts = append(nopts, opts...)
|
||||
done := make(chan struct{})
|
||||
|
||||
if werr := workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||
w.opts.Logger.Error(ctx, "store error: %v", werr)
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||
w.opts.Logger.Error(w.opts.Context, "store write error", werr)
|
||||
return eid, werr
|
||||
}
|
||||
|
||||
var startID string
|
||||
if options.Start == "" {
|
||||
mp := w.g.GetRoots()
|
||||
if len(mp) != 1 {
|
||||
return eid, ErrStepNotExists
|
||||
}
|
||||
for k := range mp {
|
||||
startID = k
|
||||
}
|
||||
} else {
|
||||
for k, v := range w.g.GetVertices() {
|
||||
if v == options.Start {
|
||||
startID = k
|
||||
for idx := range steps {
|
||||
for nidx := range steps[idx] {
|
||||
cstep := steps[idx][nidx]
|
||||
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
||||
return eid, werr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if startID == "" {
|
||||
return eid, ErrStepNotExists
|
||||
}
|
||||
|
||||
if options.Async {
|
||||
go w.handleWorkflow(startID, nopts...)
|
||||
return eid, nil
|
||||
}
|
||||
|
||||
return eid, w.handleWorkflow(startID, nopts...)
|
||||
}
|
||||
|
||||
func (w *microWorkflow) handleWorkflow(startID string, opts ...ExecuteOption) error {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
|
||||
// stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
||||
// workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||
|
||||
// Get IDs of all descendant vertices.
|
||||
flowIDs, errDes := w.g.GetDescendants(startID)
|
||||
if errDes != nil {
|
||||
return errDes
|
||||
}
|
||||
|
||||
// inputChannels provides for input channels for each of the descendant vertices (+ the start-vertex).
|
||||
inputChannels := make(map[string]chan FlowResult, len(flowIDs)+1)
|
||||
|
||||
// Iterate vertex IDs and create an input channel for each of them and a single
|
||||
// output channel for leaves. Note, this "pre-flight" is needed to ensure we
|
||||
// really have an input channel regardless of how we traverse the tree and spawn
|
||||
// workers.
|
||||
leafCount := 0
|
||||
|
||||
for id := range flowIDs {
|
||||
|
||||
// Get all parents of this vertex.
|
||||
parents, errPar := w.g.GetParents(id)
|
||||
if errPar != nil {
|
||||
return errPar
|
||||
}
|
||||
|
||||
// Create a buffered input channel that has capacity for all parent results.
|
||||
inputChannels[id] = make(chan FlowResult, len(parents))
|
||||
|
||||
if ok, err := w.g.IsLeaf(id); ok && err == nil {
|
||||
leafCount += 1
|
||||
}
|
||||
}
|
||||
|
||||
// outputChannel caries the results of leaf vertices.
|
||||
outputChannel := make(chan FlowResult, leafCount)
|
||||
|
||||
// To also process the start vertex and to have its results being passed to its
|
||||
// children, add it to the vertex IDs. Also add an input channel for the start
|
||||
// vertex and feed the inputs to this channel.
|
||||
flowIDs[startID] = struct{}{}
|
||||
inputChannels[startID] = make(chan FlowResult, len(inputs))
|
||||
for _, i := range inputs {
|
||||
inputChannels[startID] <- i
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
// Iterate all vertex IDs (now incl. start vertex) and handle each worker (incl.
|
||||
// inputs and outputs) in a separate goroutine.
|
||||
for id := range flowIDs {
|
||||
|
||||
// Get all children of this vertex that later need to be notified. Note, we
|
||||
// collect all children before the goroutine to be able to release the read
|
||||
// lock as early as possible.
|
||||
children, errChildren := w.g.GetChildren(id)
|
||||
if errChildren != nil {
|
||||
return errChildren
|
||||
}
|
||||
|
||||
// Remember to wait for this goroutine.
|
||||
wg.Add(1)
|
||||
|
||||
go func(id string) {
|
||||
// Get this vertex's input channel.
|
||||
// Note, only concurrent read here, which is fine.
|
||||
c := inputChannels[id]
|
||||
|
||||
// Await all parent inputs and stuff them into a slice.
|
||||
parentCount := cap(c)
|
||||
parentResults := make([]FlowResult, parentCount)
|
||||
for i := 0; i < parentCount; i++ {
|
||||
parentResults[i] = <-c
|
||||
}
|
||||
|
||||
// Execute the worker.
|
||||
errWorker := callback(w.g, id, parentResults)
|
||||
if errWorker != nil {
|
||||
return errWorker
|
||||
}
|
||||
|
||||
// Send this worker's FlowResult onto all children's input channels or, if it is
|
||||
// a leaf (i.e. no children), send the result onto the output channel.
|
||||
if len(children) > 0 {
|
||||
for child := range children {
|
||||
inputChannels[child] <- flowResult
|
||||
go func() {
|
||||
for idx := range steps {
|
||||
for nidx := range steps[idx] {
|
||||
wStatus := &codec.Frame{}
|
||||
if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
} else {
|
||||
outputChannel <- flowResult
|
||||
}
|
||||
|
||||
// "Sign off".
|
||||
wg.Done()
|
||||
}(id)
|
||||
}
|
||||
|
||||
// Wait for all go routines to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Await all leaf vertex results and stuff them into a slice.
|
||||
resultCount := cap(outputChannel)
|
||||
results := make([]FlowResult, resultCount)
|
||||
for i := 0; i < resultCount; i++ {
|
||||
results[i] = <-outputChannel
|
||||
}
|
||||
|
||||
/*
|
||||
go func() {
|
||||
for idx := range steps {
|
||||
for nidx := range steps[idx] {
|
||||
wStatus := &codec.Frame{}
|
||||
if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
|
||||
chstatus <- status
|
||||
return
|
||||
}
|
||||
if w.opts.Logger.V(logger.TraceLevel) {
|
||||
w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
|
||||
}
|
||||
cstep := steps[idx][nidx]
|
||||
// nolint: nestif
|
||||
if len(cstep.Requires()) == 0 {
|
||||
wg.Add(1)
|
||||
go func(step Step) {
|
||||
defer wg.Done()
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
rsp, serr := step.Execute(nctx, req, nopts...)
|
||||
if serr != nil {
|
||||
step.SetStatus(StatusFailure)
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
}
|
||||
cherr <- serr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
}(cstep)
|
||||
wg.Wait()
|
||||
} else {
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
|
||||
if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
|
||||
chstatus <- status
|
||||
return
|
||||
}
|
||||
if w.opts.Logger.V(logger.TraceLevel) {
|
||||
w.opts.Logger.Trace(nctx, fmt.Sprintf("will be executed %v", steps[idx][nidx]))
|
||||
}
|
||||
cstep := steps[idx][nidx]
|
||||
// nolint: nestif
|
||||
if len(cstep.Requires()) == 0 {
|
||||
wg.Add(1)
|
||||
go func(step Step) {
|
||||
defer wg.Done()
|
||||
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
rsp, serr := cstep.Execute(nctx, req, nopts...)
|
||||
rsp, serr := step.Execute(nctx, req, nopts...)
|
||||
if serr != nil {
|
||||
cstep.SetStatus(StatusFailure)
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
step.SetStatus(StatusFailure)
|
||||
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Error(ctx, "store write error", werr)
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Error(ctx, "store write error", werr)
|
||||
}
|
||||
cherr <- serr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
|
||||
w.opts.Logger.Error(ctx, "store write error", werr)
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||
w.opts.Logger.Error(ctx, "store write error", werr)
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
}(cstep)
|
||||
wg.Wait()
|
||||
} else {
|
||||
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
rsp, serr := cstep.Execute(nctx, req, nopts...)
|
||||
if serr != nil {
|
||||
cstep.SetStatus(StatusFailure)
|
||||
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Error(ctx, "store write error", werr)
|
||||
}
|
||||
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Error(ctx, "store write error", werr)
|
||||
}
|
||||
cherr <- serr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
|
||||
w.opts.Logger.Error(ctx, "store write error", werr)
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
if options.Async {
|
||||
return eid, nil
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
logger.Tracef(ctx, "wait for finish or error")
|
||||
select {
|
||||
case <-nctx.Done():
|
||||
err = nctx.Err()
|
||||
case cerr := <-cherr:
|
||||
err = cerr
|
||||
case <-done:
|
||||
close(cherr)
|
||||
case <-chstatus:
|
||||
close(chstatus)
|
||||
return eid, nil
|
||||
}
|
||||
if options.Async {
|
||||
return eid, nil
|
||||
}
|
||||
|
||||
switch {
|
||||
case nctx.Err() != nil:
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
}
|
||||
case err == nil:
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
}
|
||||
case err != nil:
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
}
|
||||
logger.DefaultLogger.Trace(ctx, "wait for finish or error")
|
||||
select {
|
||||
case <-nctx.Done():
|
||||
err = nctx.Err()
|
||||
case cerr := <-cherr:
|
||||
err = cerr
|
||||
case <-done:
|
||||
close(cherr)
|
||||
case <-chstatus:
|
||||
close(chstatus)
|
||||
return eid, nil
|
||||
}
|
||||
|
||||
switch {
|
||||
case nctx.Err() != nil:
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
|
||||
w.opts.Logger.Error(w.opts.Context, "store write error", werr)
|
||||
}
|
||||
*/
|
||||
return err
|
||||
case err == nil:
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||
w.opts.Logger.Error(w.opts.Context, "store write error", werr)
|
||||
}
|
||||
case err != nil:
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
|
||||
w.opts.Logger.Error(w.opts.Context, "store write error", werr)
|
||||
}
|
||||
}
|
||||
|
||||
return eid, err
|
||||
}
|
||||
|
||||
// NewFlow create new flow
|
||||
@@ -442,11 +385,11 @@ func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) {
|
||||
}
|
||||
|
||||
func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) {
|
||||
w := µWorkflow{opts: f.opts, id: id, g: &dag.DAG{}, steps: make(map[string]Step, len(steps))}
|
||||
w := µWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))}
|
||||
|
||||
for _, s := range steps {
|
||||
w.steps[s.String()] = s
|
||||
w.g.AddVertex(s)
|
||||
w.g.Add(s)
|
||||
}
|
||||
|
||||
for _, dst := range steps {
|
||||
@@ -455,11 +398,14 @@ func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step
|
||||
if !ok {
|
||||
return nil, ErrStepNotExists
|
||||
}
|
||||
w.g.AddEdge(src.String(), dst.String())
|
||||
w.g.Connect(dag.BasicEdge(src, dst))
|
||||
}
|
||||
}
|
||||
|
||||
w.g.ReduceTransitively()
|
||||
if err := w.g.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
w.g.TransitiveReduction()
|
||||
|
||||
w.init = true
|
||||
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package flow is an interface used for saga pattern microservice workflow
|
||||
package flow // import "go.unistack.org/micro/v4/flow"
|
||||
package flow
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -125,6 +125,8 @@ type Workflow interface {
|
||||
AppendSteps(steps ...Step) error
|
||||
// Status returns workflow status
|
||||
Status() Status
|
||||
// Steps returns steps slice where parallel steps returned on the same level
|
||||
Steps() ([][]Step, error)
|
||||
// Suspend suspends execution
|
||||
Suspend(ctx context.Context, id string) error
|
||||
// Resume resumes execution
|
||||
|
@@ -123,6 +123,8 @@ type ExecuteOptions struct {
|
||||
Start string
|
||||
// Timeout for execution
|
||||
Timeout time.Duration
|
||||
// Reverse execution
|
||||
Reverse bool
|
||||
// Async enables async execution
|
||||
Async bool
|
||||
}
|
||||
@@ -165,6 +167,13 @@ func ExecuteContext(ctx context.Context) ExecuteOption {
|
||||
}
|
||||
}
|
||||
|
||||
// ExecuteReverse says that dag must be run in reverse order
|
||||
func ExecuteReverse(b bool) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Reverse = b
|
||||
}
|
||||
}
|
||||
|
||||
// ExecuteTimeout pass timeout time.Duration for execution
|
||||
func ExecuteTimeout(td time.Duration) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
|
5
go.mod
5
go.mod
@@ -11,9 +11,6 @@ require (
|
||||
github.com/matoous/go-nanoid v1.5.1
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
|
||||
github.com/spf13/cast v1.7.1
|
||||
github.com/stretchr/testify v1.10.0
|
||||
go.uber.org/atomic v1.11.0
|
||||
go.uber.org/automaxprocs v1.6.0
|
||||
go.unistack.org/micro-proto/v4 v4.1.0
|
||||
golang.org/x/sync v0.10.0
|
||||
@@ -24,9 +21,11 @@ require (
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/kr/pretty v0.3.1 // indirect
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/rogpeppe/go-internal v1.13.1 // indirect
|
||||
github.com/stretchr/testify v1.10.0 // indirect
|
||||
golang.org/x/net v0.34.0 // indirect
|
||||
golang.org/x/sys v0.29.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
|
||||
|
9
go.sum
9
go.sum
@@ -6,10 +6,9 @@ github.com/KimMachineGun/automemlimit v0.7.0 h1:7G06p/dMSf7G8E6oq+f2uOPuVncFyIlD
|
||||
github.com/KimMachineGun/automemlimit v0.7.0/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
|
||||
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
|
||||
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
||||
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
@@ -30,20 +29,18 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
|
||||
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
|
||||
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
|
||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
|
||||
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
|
||||
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
|
||||
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
|
||||
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
|
||||
|
@@ -1,117 +0,0 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
"go.unistack.org/micro/v4/server"
|
||||
)
|
||||
|
||||
type wrapper struct {
|
||||
keys []string
|
||||
|
||||
client.Client
|
||||
}
|
||||
|
||||
func NewClientWrapper(keys ...string) client.Wrapper {
|
||||
return func(c client.Client) client.Client {
|
||||
handler := &wrapper{
|
||||
Client: c,
|
||||
keys: keys,
|
||||
}
|
||||
return handler
|
||||
}
|
||||
}
|
||||
|
||||
func NewClientCallWrapper(keys ...string) client.CallWrapper {
|
||||
return func(fn client.CallFunc) client.CallFunc {
|
||||
return func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
if keys == nil {
|
||||
return fn(ctx, addr, req, rsp, opts)
|
||||
}
|
||||
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||
if !ook || omd == nil {
|
||||
omd = metadata.New(len(imd))
|
||||
}
|
||||
for _, k := range keys {
|
||||
if v := imd.Get(k); v != nil {
|
||||
omd.Add(k, v...)
|
||||
}
|
||||
}
|
||||
if !ook {
|
||||
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
}
|
||||
return fn(ctx, addr, req, rsp, opts)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
if w.keys == nil {
|
||||
return w.Client.Call(ctx, req, rsp, opts...)
|
||||
}
|
||||
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||
if !ook || omd == nil {
|
||||
omd = metadata.New(len(imd))
|
||||
}
|
||||
for _, k := range w.keys {
|
||||
if v := imd.Get(k); v != nil {
|
||||
omd.Add(k, v...)
|
||||
}
|
||||
}
|
||||
if !ook {
|
||||
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
}
|
||||
return w.Client.Call(ctx, req, rsp, opts...)
|
||||
}
|
||||
|
||||
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
if w.keys == nil {
|
||||
return w.Client.Stream(ctx, req, opts...)
|
||||
}
|
||||
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||
if !ook || omd == nil {
|
||||
omd = metadata.New(len(imd))
|
||||
}
|
||||
for _, k := range w.keys {
|
||||
if v := imd.Get(k); v != nil {
|
||||
omd.Add(k, v...)
|
||||
}
|
||||
}
|
||||
if !ook {
|
||||
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
}
|
||||
return w.Client.Stream(ctx, req, opts...)
|
||||
}
|
||||
|
||||
func NewServerHandlerWrapper(keys ...string) server.HandlerWrapper {
|
||||
return func(fn server.HandlerFunc) server.HandlerFunc {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
if keys == nil {
|
||||
return fn(ctx, req, rsp)
|
||||
}
|
||||
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||
if !ook || omd == nil {
|
||||
omd = metadata.New(len(imd))
|
||||
}
|
||||
for _, k := range keys {
|
||||
if v := imd.Get(k); v != nil {
|
||||
omd.Add(k, v...)
|
||||
}
|
||||
}
|
||||
if !ook {
|
||||
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
}
|
||||
return fn(ctx, req, rsp)
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,63 +0,0 @@
|
||||
package recovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.unistack.org/micro/v4/errors"
|
||||
"go.unistack.org/micro/v4/server"
|
||||
)
|
||||
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
ServerHandlerFn: DefaultServerHandlerFn,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
ServerHandlerFn func(context.Context, server.Request, interface{}, error) error
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
func ServerHandlerFunc(fn func(context.Context, server.Request, interface{}, error) error) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerHandlerFn = fn
|
||||
}
|
||||
}
|
||||
|
||||
var DefaultServerHandlerFn = func(ctx context.Context, req server.Request, rsp interface{}, err error) error {
|
||||
return errors.BadRequest("", "%v", err)
|
||||
}
|
||||
|
||||
var Hook = NewHook()
|
||||
|
||||
type hook struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
func NewHook(opts ...Option) *hook {
|
||||
return &hook{opts: NewOptions(opts...)}
|
||||
}
|
||||
|
||||
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
|
||||
defer func() {
|
||||
r := recover()
|
||||
switch verr := r.(type) {
|
||||
case nil:
|
||||
return
|
||||
case error:
|
||||
err = w.opts.ServerHandlerFn(ctx, req, rsp, verr)
|
||||
default:
|
||||
err = w.opts.ServerHandlerFn(ctx, req, rsp, fmt.Errorf("%v", r))
|
||||
}
|
||||
}()
|
||||
err = next(ctx, req, rsp)
|
||||
return err
|
||||
}
|
||||
}
|
@@ -1,112 +0,0 @@
|
||||
package requestid
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/textproto"
|
||||
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
"go.unistack.org/micro/v4/server"
|
||||
"go.unistack.org/micro/v4/util/id"
|
||||
)
|
||||
|
||||
type XRequestIDKey struct{}
|
||||
|
||||
// DefaultMetadataKey contains metadata key
|
||||
var DefaultMetadataKey = textproto.CanonicalMIMEHeaderKey("x-request-id")
|
||||
|
||||
// DefaultMetadataFunc wil be used if user not provide own func to fill metadata
|
||||
var DefaultMetadataFunc = func(ctx context.Context) (context.Context, error) {
|
||||
var xid string
|
||||
|
||||
cid, cok := ctx.Value(XRequestIDKey{}).(string)
|
||||
if cok && cid != "" {
|
||||
xid = cid
|
||||
}
|
||||
|
||||
imd, iok := metadata.FromIncomingContext(ctx)
|
||||
if !iok || imd == nil {
|
||||
imd = metadata.New(1)
|
||||
ctx = metadata.NewIncomingContext(ctx, imd)
|
||||
}
|
||||
|
||||
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||
if !ook || omd == nil {
|
||||
omd = metadata.New(1)
|
||||
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
|
||||
if xid == "" {
|
||||
var ids []string
|
||||
|
||||
for i := range imd.Get(DefaultMetadataKey) {
|
||||
if ids[i] != "" {
|
||||
xid = ids[i]
|
||||
}
|
||||
}
|
||||
|
||||
for i := range omd.Get(DefaultMetadataKey) {
|
||||
if ids[i] != "" {
|
||||
xid = ids[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if xid == "" {
|
||||
var err error
|
||||
xid, err = id.New()
|
||||
if err != nil {
|
||||
return ctx, err
|
||||
}
|
||||
}
|
||||
|
||||
if !cok {
|
||||
ctx = context.WithValue(ctx, XRequestIDKey{}, xid)
|
||||
}
|
||||
|
||||
if !iok {
|
||||
imd.Set(DefaultMetadataKey, xid)
|
||||
}
|
||||
|
||||
if !ook {
|
||||
omd.Set(DefaultMetadataKey, xid)
|
||||
}
|
||||
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
type hook struct{}
|
||||
|
||||
func NewHook() *hook {
|
||||
return &hook{}
|
||||
}
|
||||
|
||||
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
var err error
|
||||
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return next(ctx, req, rsp)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
|
||||
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
var err error
|
||||
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return next(ctx, req, rsp, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
|
||||
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
var err error
|
||||
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return next(ctx, req, opts...)
|
||||
}
|
||||
}
|
@@ -1,34 +0,0 @@
|
||||
package requestid
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"testing"
|
||||
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
)
|
||||
|
||||
func TestDefaultMetadataFunc(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
nctx, err := DefaultMetadataFunc(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
imd, ok := metadata.FromIncomingContext(nctx)
|
||||
if !ok {
|
||||
t.Fatalf("md missing in incoming context")
|
||||
}
|
||||
omd, ok := metadata.FromOutgoingContext(nctx)
|
||||
if !ok {
|
||||
t.Fatalf("md missing in outgoing context")
|
||||
}
|
||||
|
||||
iv := imd.Get(DefaultMetadataKey)
|
||||
ov := omd.Get(DefaultMetadataKey)
|
||||
|
||||
if !slices.Equal(iv, ov) {
|
||||
t.Fatalf("missing metadata key value %v != %v", iv, ov)
|
||||
}
|
||||
}
|
@@ -1,133 +0,0 @@
|
||||
package validator
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/errors"
|
||||
"go.unistack.org/micro/v4/server"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultClientErrorFunc = func(req client.Request, rsp interface{}, err error) error {
|
||||
if rsp != nil {
|
||||
return errors.BadGateway(req.Service(), "%v", err)
|
||||
}
|
||||
return errors.BadRequest(req.Service(), "%v", err)
|
||||
}
|
||||
|
||||
DefaultServerErrorFunc = func(req server.Request, rsp interface{}, err error) error {
|
||||
if rsp != nil {
|
||||
return errors.BadGateway(req.Service(), "%v", err)
|
||||
}
|
||||
return errors.BadRequest(req.Service(), "%v", err)
|
||||
}
|
||||
)
|
||||
|
||||
type (
|
||||
ClientErrorFunc func(client.Request, interface{}, error) error
|
||||
ServerErrorFunc func(server.Request, interface{}, error) error
|
||||
)
|
||||
|
||||
// Options struct holds wrapper options
|
||||
type Options struct {
|
||||
ClientErrorFn ClientErrorFunc
|
||||
ServerErrorFn ServerErrorFunc
|
||||
ClientValidateResponse bool
|
||||
ServerValidateResponse bool
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
type Option func(*Options)
|
||||
|
||||
func ClientValidateResponse(b bool) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientValidateResponse = b
|
||||
}
|
||||
}
|
||||
|
||||
func ServerValidateResponse(b bool) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientValidateResponse = b
|
||||
}
|
||||
}
|
||||
|
||||
func ClientReqErrorFn(fn ClientErrorFunc) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientErrorFn = fn
|
||||
}
|
||||
}
|
||||
|
||||
func ServerErrorFn(fn ServerErrorFunc) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerErrorFn = fn
|
||||
}
|
||||
}
|
||||
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
ClientErrorFn: DefaultClientErrorFunc,
|
||||
ServerErrorFn: DefaultServerErrorFunc,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
func NewHook(opts ...Option) *hook {
|
||||
return &hook{opts: NewOptions(opts...)}
|
||||
}
|
||||
|
||||
type validator interface {
|
||||
Validate() error
|
||||
}
|
||||
|
||||
type hook struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
|
||||
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
if v, ok := req.Body().(validator); ok {
|
||||
if err := v.Validate(); err != nil {
|
||||
return w.opts.ClientErrorFn(req, nil, err)
|
||||
}
|
||||
}
|
||||
err := next(ctx, req, rsp, opts...)
|
||||
if v, ok := rsp.(validator); ok && w.opts.ClientValidateResponse {
|
||||
if verr := v.Validate(); verr != nil {
|
||||
return w.opts.ClientErrorFn(req, rsp, verr)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
|
||||
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
if v, ok := req.Body().(validator); ok {
|
||||
if err := v.Validate(); err != nil {
|
||||
return nil, w.opts.ClientErrorFn(req, nil, err)
|
||||
}
|
||||
}
|
||||
return next(ctx, req, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
if v, ok := req.Body().(validator); ok {
|
||||
if err := v.Validate(); err != nil {
|
||||
return w.opts.ServerErrorFn(req, nil, err)
|
||||
}
|
||||
}
|
||||
err := next(ctx, req, rsp)
|
||||
if v, ok := rsp.(validator); ok && w.opts.ServerValidateResponse {
|
||||
if verr := v.Validate(); verr != nil {
|
||||
return w.opts.ServerErrorFn(req, rsp, verr)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
@@ -99,7 +99,6 @@ func WithAddFields(fields ...interface{}) Option {
|
||||
iv, iok := o.Fields[i].(string)
|
||||
jv, jok := fields[j].(string)
|
||||
if iok && jok && iv == jv {
|
||||
o.Fields[i+1] = fields[j+1]
|
||||
fields = slices.Delete(fields, j, j+2)
|
||||
}
|
||||
}
|
||||
|
@@ -278,7 +278,7 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
|
||||
}
|
||||
}
|
||||
|
||||
if s.opts.AddStacktrace && (lvl == logger.FatalLevel || lvl == logger.ErrorLevel) {
|
||||
if (s.opts.AddStacktrace || lvl == logger.FatalLevel) || (s.opts.AddStacktrace && lvl == logger.ErrorLevel) {
|
||||
stackInfo := make([]byte, 1024*1024)
|
||||
if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 {
|
||||
traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1)
|
||||
|
@@ -21,7 +21,7 @@ import (
|
||||
func TestStacktrace(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
buf := bytes.NewBuffer(nil)
|
||||
l := NewLogger(logger.WithLevel(logger.DebugLevel), logger.WithOutput(buf),
|
||||
l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf),
|
||||
WithHandlerFunc(slog.NewTextHandler),
|
||||
logger.WithAddStacktrace(true),
|
||||
)
|
||||
@@ -124,7 +124,7 @@ func TestWithDedupKeysWithAddFields(t *testing.T) {
|
||||
|
||||
l.Info(ctx, "msg3")
|
||||
|
||||
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg3 key1=val4 key2=val3`)) {
|
||||
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg3 key1=val1 key2=val2`)) {
|
||||
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||
}
|
||||
}
|
||||
@@ -406,7 +406,7 @@ func TestLogger(t *testing.T) {
|
||||
func Test_WithContextAttrFunc(t *testing.T) {
|
||||
loggerContextAttrFuncs := []logger.ContextAttrFunc{
|
||||
func(ctx context.Context) []interface{} {
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
@@ -425,7 +425,7 @@ func Test_WithContextAttrFunc(t *testing.T) {
|
||||
logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs, loggerContextAttrFuncs...)
|
||||
|
||||
ctx := context.TODO()
|
||||
ctx = metadata.AppendOutgoingContext(ctx, "X-Request-Id", uuid.New().String(),
|
||||
ctx = metadata.AppendIncomingContext(ctx, "X-Request-Id", uuid.New().String(),
|
||||
"Source-Service", "Test-System")
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
@@ -445,9 +445,9 @@ func Test_WithContextAttrFunc(t *testing.T) {
|
||||
t.Fatalf("logger info, buf %s", buf.Bytes())
|
||||
}
|
||||
buf.Reset()
|
||||
omd, _ := metadata.FromOutgoingContext(ctx)
|
||||
imd, _ := metadata.FromIncomingContext(ctx)
|
||||
l.Info(ctx, "test message1")
|
||||
omd.Set("Source-Service", "Test-System2")
|
||||
imd.Set("Source-Service", "Test-System2")
|
||||
l.Info(ctx, "test message2")
|
||||
|
||||
// t.Logf("xxx %s", buf.Bytes())
|
||||
|
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/textproto"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -69,15 +68,6 @@ func (md Metadata) Copy() Metadata {
|
||||
return out
|
||||
}
|
||||
|
||||
// AsMap returns a copy of Metadata with map[string]string.
|
||||
func (md Metadata) AsMap() map[string]string {
|
||||
out := make(map[string]string, len(md))
|
||||
for k, v := range md {
|
||||
out[k] = strings.Join(v, ",")
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// AsHTTP1 returns a copy of Metadata
|
||||
// with CanonicalMIMEHeaderKey.
|
||||
func (md Metadata) AsHTTP1() map[string][]string {
|
||||
@@ -106,7 +96,16 @@ func (md Metadata) CopyTo(out Metadata) {
|
||||
}
|
||||
|
||||
// Get obtains the values for a given key.
|
||||
func (md Metadata) Get(k string) []string {
|
||||
func (md Metadata) MustGet(k string) []string {
|
||||
v, ok := md.Get(k)
|
||||
if !ok {
|
||||
panic("missing metadata key")
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// Get obtains the values for a given key.
|
||||
func (md Metadata) Get(k string) ([]string, bool) {
|
||||
v, ok := md[k]
|
||||
if !ok {
|
||||
v, ok = md[strings.ToLower(k)]
|
||||
@@ -114,13 +113,27 @@ func (md Metadata) Get(k string) []string {
|
||||
if !ok {
|
||||
v, ok = md[textproto.CanonicalMIMEHeaderKey(k)]
|
||||
}
|
||||
return v, ok
|
||||
}
|
||||
|
||||
// MustGetJoined obtains the values for a given key
|
||||
// with joined values with "," symbol
|
||||
func (md Metadata) MustGetJoined(k string) string {
|
||||
v, ok := md.GetJoined(k)
|
||||
if !ok {
|
||||
panic("missing metadata key")
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// GetJoined obtains the values for a given key
|
||||
// with joined values with "," symbol
|
||||
func (md Metadata) GetJoined(k string) string {
|
||||
return strings.Join(md.Get(k), ",")
|
||||
func (md Metadata) GetJoined(k string) (string, bool) {
|
||||
v, ok := md.Get(k)
|
||||
if !ok {
|
||||
return "", ok
|
||||
}
|
||||
return strings.Join(v, ","), true
|
||||
}
|
||||
|
||||
// Set sets the value of a given key with a slice of values.
|
||||
@@ -218,6 +231,24 @@ func AppendContext(ctx context.Context, kv ...string) context.Context {
|
||||
return context.WithValue(ctx, metadataCurrentKey{}, rawMetadata{md: md.md, added: added})
|
||||
}
|
||||
|
||||
// AppendIncomingContext returns a new context with the provided kv merged
|
||||
// with any existing metadata in the context. Please refer to the documentation
|
||||
// of Pairs for a description of kv.
|
||||
func AppendIncomingContext(ctx context.Context, kv ...string) context.Context {
|
||||
if len(kv)%2 == 1 {
|
||||
panic(fmt.Sprintf("metadata: AppendIncomingContext got an odd number of input pairs for metadata: %d", len(kv)))
|
||||
}
|
||||
md, _ := ctx.Value(metadataIncomingKey{}).(rawMetadata)
|
||||
added := make([][]string, len(md.added)+1)
|
||||
copy(added, md.added)
|
||||
kvCopy := make([]string, 0, len(kv))
|
||||
for i := 0; i < len(kv); i += 2 {
|
||||
kvCopy = append(kvCopy, strings.ToLower(kv[i]), kv[i+1])
|
||||
}
|
||||
added[len(added)-1] = kvCopy
|
||||
return context.WithValue(ctx, metadataIncomingKey{}, rawMetadata{md: md.md, added: added})
|
||||
}
|
||||
|
||||
// AppendOutgoingContext returns a new context with the provided kv merged
|
||||
// with any existing metadata in the context. Please refer to the documentation
|
||||
// of Pairs for a description of kv.
|
||||
@@ -433,29 +464,27 @@ type Iterator struct {
|
||||
cnt int
|
||||
}
|
||||
|
||||
/*
|
||||
// Next advance iterator to next element
|
||||
func (iter *Iterator) Next(k *string, v *[]string) bool {
|
||||
func (iter *Iterator) Next(k, v *string) bool {
|
||||
if iter.cur+1 > iter.cnt {
|
||||
return false
|
||||
}
|
||||
|
||||
if k != nil && v != nil {
|
||||
*k = iter.keys[iter.cur]
|
||||
vv := iter.md[*k]
|
||||
*v = make([]string, len(vv))
|
||||
copy(*v, vv)
|
||||
iter.cur++
|
||||
}
|
||||
*k = iter.keys[iter.cur]
|
||||
*v = iter.Metadata[*k]
|
||||
iter.cur++
|
||||
return true
|
||||
}
|
||||
|
||||
// Iterator returns the itarator for metadata in sorted order
|
||||
func (md Metadata) Iterator() *Iterator {
|
||||
iter := &Iterator{md: md, cnt: len(md)}
|
||||
func (Metadata Metadata) Iterator() *Iterator {
|
||||
iter := &Iterator{Metadata: Metadata, cnt: len(Metadata)}
|
||||
iter.keys = make([]string, 0, iter.cnt)
|
||||
for k := range md {
|
||||
for k := range Metadata {
|
||||
iter.keys = append(iter.keys, k)
|
||||
}
|
||||
sort.Strings(iter.keys)
|
||||
return iter
|
||||
}
|
||||
*/
|
||||
|
@@ -19,8 +19,8 @@ func TestAppendOutgoingContextModify(t *testing.T) {
|
||||
func TestLowercase(t *testing.T) {
|
||||
md := New(1)
|
||||
md["x-request-id"] = []string{"12345"}
|
||||
v := md.GetJoined("X-Request-Id")
|
||||
if v == "" {
|
||||
v, ok := md.GetJoined("X-Request-Id")
|
||||
if !ok || v == "" {
|
||||
t.Fatalf("metadata invalid %#+v", md)
|
||||
}
|
||||
}
|
||||
@@ -51,17 +51,29 @@ func TestMetadataSetMultiple(t *testing.T) {
|
||||
md := New(4)
|
||||
md.Set("key1", "val1", "key2", "val2")
|
||||
|
||||
if v := md.GetJoined("key1"); v != "val1" {
|
||||
if v, ok := md.GetJoined("key1"); !ok || v != "val1" {
|
||||
t.Fatalf("invalid kv %#+v", md)
|
||||
}
|
||||
if v := md.GetJoined("key2"); v != "val2" {
|
||||
if v, ok := md.GetJoined("key2"); !ok || v != "val2" {
|
||||
t.Fatalf("invalid kv %#+v", md)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppend(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctx = AppendIncomingContext(ctx, "key1", "val1", "key2", "val2")
|
||||
md, ok := FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
t.Fatal("metadata empty")
|
||||
}
|
||||
if _, ok := md.Get("key1"); !ok {
|
||||
t.Fatal("key1 not found")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPairs(t *testing.T) {
|
||||
md := Pairs("key1", "val1", "key2", "val2")
|
||||
if v := md.Get("key1"); v == nil {
|
||||
if _, ok := md.Get("key1"); !ok {
|
||||
t.Fatal("key1 not found")
|
||||
}
|
||||
}
|
||||
@@ -85,51 +97,46 @@ func TestPassing(t *testing.T) {
|
||||
if !ok {
|
||||
t.Fatalf("missing metadata from outgoing context")
|
||||
}
|
||||
if v := md.Get("Key1"); v == nil || v[0] != "Val1" {
|
||||
if v, ok := md.Get("Key1"); !ok || v[0] != "Val1" {
|
||||
t.Fatalf("invalid metadata value %#+v", md)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIterator(t *testing.T) {
|
||||
md := Pairs(
|
||||
"1Last", "last",
|
||||
"2First", "first",
|
||||
"3Second", "second",
|
||||
)
|
||||
/*
|
||||
func TestIterator(_ *testing.T) {
|
||||
md := Metadata{
|
||||
"1Last": "last",
|
||||
"2First": "first",
|
||||
"3Second": "second",
|
||||
}
|
||||
|
||||
iter := md.Iterator()
|
||||
var k string
|
||||
var v []string
|
||||
chk := New(3)
|
||||
for iter.Next(&k, &v) {
|
||||
chk[k] = v
|
||||
}
|
||||
var k, v string
|
||||
|
||||
for k, v := range chk {
|
||||
if cv, ok := md[k]; !ok || len(cv) != len(v) || cv[0] != v[0] {
|
||||
t.Fatalf("XXXX %#+v %#+v", chk, md)
|
||||
}
|
||||
for iter.Next(&k, &v) {
|
||||
// fmt.Printf("k: %s, v: %s\n", k, v)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func TestMedataCanonicalKey(t *testing.T) {
|
||||
md := New(1)
|
||||
md.Set("x-request-id", "12345")
|
||||
v := md.GetJoined("x-request-id")
|
||||
if v == "" {
|
||||
v, ok := md.GetJoined("x-request-id")
|
||||
if !ok {
|
||||
t.Fatalf("failed to get x-request-id")
|
||||
} else if v != "12345" {
|
||||
t.Fatalf("invalid metadata value: %s != %s", "12345", v)
|
||||
}
|
||||
|
||||
v = md.GetJoined("X-Request-Id")
|
||||
if v == "" {
|
||||
v, ok = md.GetJoined("X-Request-Id")
|
||||
if !ok {
|
||||
t.Fatalf("failed to get x-request-id")
|
||||
} else if v != "12345" {
|
||||
t.Fatalf("invalid metadata value: %s != %s", "12345", v)
|
||||
}
|
||||
v = md.GetJoined("X-Request-ID")
|
||||
if v == "" {
|
||||
v, ok = md.GetJoined("X-Request-ID")
|
||||
if !ok {
|
||||
t.Fatalf("failed to get x-request-id")
|
||||
} else if v != "12345" {
|
||||
t.Fatalf("invalid metadata value: %s != %s", "12345", v)
|
||||
@@ -141,8 +148,8 @@ func TestMetadataSet(t *testing.T) {
|
||||
|
||||
md.Set("Key", "val")
|
||||
|
||||
val := md.GetJoined("Key")
|
||||
if val == "" {
|
||||
val, ok := md.GetJoined("Key")
|
||||
if !ok {
|
||||
t.Fatal("key Key not found")
|
||||
}
|
||||
if val != "val" {
|
||||
@@ -157,8 +164,8 @@ func TestMetadataDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
md.Del("Baz")
|
||||
v := md.Get("Baz")
|
||||
if v != nil {
|
||||
_, ok := md.Get("Baz")
|
||||
if ok {
|
||||
t.Fatal("key Baz not deleted")
|
||||
}
|
||||
}
|
||||
@@ -257,6 +264,20 @@ func TestNewOutgoingContext(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppendIncomingContext(t *testing.T) {
|
||||
md := New(1)
|
||||
md.Set("key1", "val1")
|
||||
ctx := AppendIncomingContext(context.TODO(), "key2", "val2")
|
||||
|
||||
nmd, ok := FromIncomingContext(ctx)
|
||||
if nmd == nil || !ok {
|
||||
t.Fatal("AppendIncomingContext not works")
|
||||
}
|
||||
if v, ok := nmd.GetJoined("key2"); !ok || v != "val2" {
|
||||
t.Fatal("AppendIncomingContext not works")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppendOutgoingContext(t *testing.T) {
|
||||
md := New(1)
|
||||
md.Set("key1", "val1")
|
||||
@@ -266,7 +287,7 @@ func TestAppendOutgoingContext(t *testing.T) {
|
||||
if nmd == nil || !ok {
|
||||
t.Fatal("AppendOutgoingContext not works")
|
||||
}
|
||||
if v := nmd.GetJoined("key2"); v != "val2" {
|
||||
if v, ok := nmd.GetJoined("key2"); !ok || v != "val2" {
|
||||
t.Fatal("AppendOutgoingContext not works")
|
||||
}
|
||||
}
|
||||
|
@@ -1,222 +0,0 @@
|
||||
package options
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cast"
|
||||
mreflect "go.unistack.org/micro/v4/util/reflect"
|
||||
)
|
||||
|
||||
// Options interface must be used by all options
|
||||
type Validator interface {
|
||||
// Validate returns nil, if all options are correct,
|
||||
// otherwise returns an error explaining the mistake
|
||||
Validate() error
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
type Option func(interface{}) error
|
||||
|
||||
// Apply assign options to struct src
|
||||
func Apply(src interface{}, opts ...Option) error {
|
||||
for _, opt := range opts {
|
||||
if err := opt(src); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetValueByPath set src struct field to val dst via path
|
||||
func SetValueByPath(src interface{}, dst interface{}, path string) error {
|
||||
var err error
|
||||
|
||||
switch v := dst.(type) {
|
||||
case []interface{}:
|
||||
if len(v) == 1 {
|
||||
dst = v[0]
|
||||
}
|
||||
}
|
||||
|
||||
var sv reflect.Value
|
||||
switch t := src.(type) {
|
||||
case reflect.Value:
|
||||
sv = t
|
||||
default:
|
||||
sv = reflect.ValueOf(src)
|
||||
}
|
||||
|
||||
parts := strings.Split(path, ".")
|
||||
|
||||
for _, p := range parts {
|
||||
|
||||
if sv.Kind() == reflect.Ptr {
|
||||
sv = sv.Elem()
|
||||
}
|
||||
if sv.Kind() != reflect.Struct {
|
||||
return mreflect.ErrInvalidStruct
|
||||
}
|
||||
|
||||
typ := sv.Type()
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
|
||||
/*
|
||||
if len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
*/
|
||||
|
||||
if fld.Anonymous {
|
||||
if len(parts) == 1 && val.Kind() == reflect.Struct {
|
||||
if err = SetValueByPath(val, dst, p); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if fld.Name != p && !strings.EqualFold(strings.ToLower(fld.Name), strings.ToLower(p)) {
|
||||
continue
|
||||
}
|
||||
|
||||
switch val.Interface().(type) {
|
||||
case []time.Duration:
|
||||
dst, err = cast.ToDurationSliceE(dst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reflect.Copy(val, reflect.ValueOf(dst))
|
||||
return nil
|
||||
case time.Duration:
|
||||
dst, err = cast.ToDurationE(dst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
val.Set(reflect.ValueOf(dst))
|
||||
return nil
|
||||
case time.Time:
|
||||
dst, err = cast.ToTimeE(dst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
val.Set(reflect.ValueOf(dst))
|
||||
return nil
|
||||
}
|
||||
|
||||
switch val.Kind() {
|
||||
case reflect.Map:
|
||||
if val.IsZero() {
|
||||
val.Set(reflect.MakeMap(val.Type()))
|
||||
}
|
||||
|
||||
return setMap(val.Interface(), dst)
|
||||
case reflect.Array, reflect.Slice:
|
||||
switch val.Type().Elem().Kind() {
|
||||
case reflect.Bool:
|
||||
dst, err = cast.ToBoolSliceE(dst)
|
||||
case reflect.String:
|
||||
dst, err = cast.ToStringSliceE(dst)
|
||||
case reflect.Float32:
|
||||
dst, err = toFloat32SliceE(dst)
|
||||
case reflect.Float64:
|
||||
dst, err = toFloat64SliceE(dst)
|
||||
case reflect.Int8:
|
||||
dst, err = toInt8SliceE(dst)
|
||||
case reflect.Int:
|
||||
dst, err = cast.ToIntSliceE(dst)
|
||||
case reflect.Int16:
|
||||
dst, err = toInt16SliceE(dst)
|
||||
case reflect.Int32:
|
||||
dst, err = toInt32SliceE(dst)
|
||||
case reflect.Int64:
|
||||
dst, err = toInt64SliceE(dst)
|
||||
case reflect.Uint8:
|
||||
dst, err = toUint8SliceE(dst)
|
||||
case reflect.Uint:
|
||||
dst, err = toUintSliceE(dst)
|
||||
case reflect.Uint16:
|
||||
dst, err = toUint16SliceE(dst)
|
||||
case reflect.Uint32:
|
||||
dst, err = toUint32SliceE(dst)
|
||||
case reflect.Uint64:
|
||||
dst, err = toUint64SliceE(dst)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if val.Kind() == reflect.Slice {
|
||||
val.Set(reflect.ValueOf(dst))
|
||||
} else {
|
||||
reflect.Copy(val, reflect.ValueOf(dst))
|
||||
}
|
||||
return nil
|
||||
case reflect.Float32:
|
||||
dst, err = toFloat32SliceE(dst)
|
||||
case reflect.Float64:
|
||||
dst, err = toFloat64SliceE(dst)
|
||||
case reflect.Bool:
|
||||
dst, err = cast.ToBoolE(dst)
|
||||
case reflect.String:
|
||||
dst, err = cast.ToStringE(dst)
|
||||
case reflect.Int8:
|
||||
dst, err = cast.ToInt8E(dst)
|
||||
case reflect.Int:
|
||||
dst, err = cast.ToIntE(dst)
|
||||
case reflect.Int16:
|
||||
dst, err = cast.ToInt16E(dst)
|
||||
case reflect.Int32:
|
||||
dst, err = cast.ToInt32E(dst)
|
||||
case reflect.Int64:
|
||||
dst, err = cast.ToInt64E(dst)
|
||||
case reflect.Uint8:
|
||||
dst, err = cast.ToUint8E(dst)
|
||||
case reflect.Uint:
|
||||
dst, err = cast.ToUintE(dst)
|
||||
case reflect.Uint16:
|
||||
dst, err = cast.ToUint16E(dst)
|
||||
case reflect.Uint32:
|
||||
dst, err = cast.ToUint32E(dst)
|
||||
case reflect.Uint64:
|
||||
dst, err = cast.ToUint64E(dst)
|
||||
default:
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
val.Set(reflect.ValueOf(dst))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewOption create new option with name
|
||||
func NewOption(name string) func(...interface{}) Option {
|
||||
return func(dst ...interface{}) Option {
|
||||
return func(src interface{}) error {
|
||||
return SetValueByPath(src, dst, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
Address = NewOption("Address")
|
||||
Name = NewOption("Name")
|
||||
Broker = NewOption("Broker")
|
||||
Logger = NewOption("Logger")
|
||||
Meter = NewOption("Meter")
|
||||
Tracer = NewOption("Tracer")
|
||||
Store = NewOption("Store")
|
||||
Register = NewOption("Register")
|
||||
Router = NewOption("Router")
|
||||
Codec = NewOption("Codec")
|
||||
Codecs = NewOption("Codecs")
|
||||
Client = NewOption("Client")
|
||||
Context = NewOption("Context")
|
||||
TLSConfig = NewOption("TLSConfig")
|
||||
Metadata = NewOption("Metadata")
|
||||
Timeout = NewOption("Timeout")
|
||||
)
|
@@ -1,181 +0,0 @@
|
||||
package options_test
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"go.unistack.org/micro/v4/options"
|
||||
)
|
||||
|
||||
type codec interface {
|
||||
Marshal(v interface{}, opts ...options.Option) ([]byte, error)
|
||||
Unmarshal(b []byte, v interface{}, opts ...options.Option) error
|
||||
String() string
|
||||
}
|
||||
|
||||
func TestCodecs(t *testing.T) {
|
||||
type s struct {
|
||||
Codecs map[string]codec
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
tc := &tls.Config{InsecureSkipVerify: true}
|
||||
opts := []options.Option{
|
||||
options.NewOption("Codecs")(wg),
|
||||
options.NewOption("TLSConfig")(tc),
|
||||
}
|
||||
|
||||
src := &s{}
|
||||
|
||||
if err := options.Apply(src, opts...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpecial(t *testing.T) {
|
||||
type s struct {
|
||||
Wait *sync.WaitGroup
|
||||
TLSConfig *tls.Config
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
tc := &tls.Config{InsecureSkipVerify: true}
|
||||
opts := []options.Option{
|
||||
options.NewOption("Wait")(wg),
|
||||
options.NewOption("TLSConfig")(tc),
|
||||
}
|
||||
|
||||
src := &s{}
|
||||
|
||||
if err := options.Apply(src, opts...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if src.Wait == nil {
|
||||
t.Fatalf("failed to set Wait %#+v", src)
|
||||
}
|
||||
|
||||
if src.TLSConfig == nil {
|
||||
t.Fatalf("failed to set TLSConfig %#+v", src)
|
||||
}
|
||||
|
||||
if src.TLSConfig.InsecureSkipVerify != true {
|
||||
t.Fatalf("failed to set TLSConfig %#+v", src)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNested(t *testing.T) {
|
||||
type server struct {
|
||||
Address []string
|
||||
}
|
||||
type ownserver struct {
|
||||
server
|
||||
OwnField string
|
||||
}
|
||||
|
||||
opts := []options.Option{
|
||||
options.Address("host:port"),
|
||||
options.NewOption("OwnField")("fieldval"),
|
||||
}
|
||||
|
||||
src := &ownserver{}
|
||||
|
||||
if err := options.Apply(src, opts...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if src.Address[0] != "host:port" {
|
||||
t.Fatalf("failed to set Address %#+v", src)
|
||||
}
|
||||
|
||||
if src.OwnField != "fieldval" {
|
||||
t.Fatalf("failed to set OwnField %#+v", src)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddress(t *testing.T) {
|
||||
type s struct {
|
||||
Address []string
|
||||
}
|
||||
|
||||
opts := []options.Option{options.Address("host:port")}
|
||||
|
||||
src := &s{}
|
||||
|
||||
if err := options.Apply(src, opts...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if src.Address[0] != "host:port" {
|
||||
t.Fatalf("failed to set Address %#+v", src)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewOption(t *testing.T) {
|
||||
type s struct {
|
||||
Address []string
|
||||
}
|
||||
|
||||
opts := []options.Option{options.NewOption("Address")("host1:port1", "host2:port2")}
|
||||
|
||||
src := &s{}
|
||||
|
||||
if err := options.Apply(src, opts...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if src.Address[0] != "host1:port1" {
|
||||
t.Fatalf("failed to set Address %#+v", src)
|
||||
}
|
||||
if src.Address[1] != "host2:port2" {
|
||||
t.Fatalf("failed to set Address %#+v", src)
|
||||
}
|
||||
}
|
||||
|
||||
func TestArray(t *testing.T) {
|
||||
type s struct {
|
||||
Address [1]string
|
||||
}
|
||||
|
||||
opts := []options.Option{options.NewOption("Address")("host:port", "host1:port1")}
|
||||
|
||||
src := &s{}
|
||||
|
||||
if err := options.Apply(src, opts...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if src.Address[0] != "host:port" {
|
||||
t.Fatalf("failed to set Address %#+v", src)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMap(t *testing.T) {
|
||||
type s struct {
|
||||
Metadata map[string]string
|
||||
}
|
||||
|
||||
opts := []options.Option{
|
||||
options.NewOption("Metadata")("key1", "val1"),
|
||||
options.NewOption("Metadata")(map[string]string{"key2": "val2"}),
|
||||
}
|
||||
|
||||
src := &s{}
|
||||
|
||||
if err := options.Apply(src, opts...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(src.Metadata) != 2 {
|
||||
t.Fatalf("failed to set Metadata %#+v", src)
|
||||
}
|
||||
|
||||
if src.Metadata["key1"] != "val1" {
|
||||
t.Fatalf("failed to set Metadata %#+v", src)
|
||||
}
|
||||
|
||||
if src.Metadata["key2"] != "val2" {
|
||||
t.Fatalf("failed to set Metadata %#+v", src)
|
||||
}
|
||||
}
|
577
options/util.go
577
options/util.go
@@ -1,577 +0,0 @@
|
||||
package options
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/spf13/cast"
|
||||
)
|
||||
|
||||
func toInt8SliceE(i interface{}) ([]int8, error) {
|
||||
if i == nil {
|
||||
return []int8{}, fmt.Errorf("unable to cast %#v of type %T to []int8", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []int8:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]int8, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToInt8E(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []int8{}, fmt.Errorf("unable to cast %#v of type %T to []int8", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []int8{}, fmt.Errorf("unable to cast %#v of type %T to []int8", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func toInt16SliceE(i interface{}) ([]int16, error) {
|
||||
if i == nil {
|
||||
return []int16{}, fmt.Errorf("unable to cast %#v of type %T to []int16", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []int16:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]int16, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToInt16E(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []int16{}, fmt.Errorf("unable to cast %#v of type %T to []int16", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []int16{}, fmt.Errorf("unable to cast %#v of type %T to []int16", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func toInt32SliceE(i interface{}) ([]int32, error) {
|
||||
if i == nil {
|
||||
return []int32{}, fmt.Errorf("unable to cast %#v of type %T to []int32", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []int32:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]int32, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToInt32E(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []int32{}, fmt.Errorf("unable to cast %#v of type %T to []int32", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []int32{}, fmt.Errorf("unable to cast %#v of type %T to []int32", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func toInt64SliceE(i interface{}) ([]int64, error) {
|
||||
if i == nil {
|
||||
return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []int64:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]int64, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToInt64E(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func toUintSliceE(i interface{}) ([]uint, error) {
|
||||
if i == nil {
|
||||
return []uint{}, fmt.Errorf("unable to cast %#v of type %T to []uint", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []uint:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]uint, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToUintE(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []uint{}, fmt.Errorf("unable to cast %#v of type %T to []uint", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []uint{}, fmt.Errorf("unable to cast %#v of type %T to []uint", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func toUint8SliceE(i interface{}) ([]uint8, error) {
|
||||
if i == nil {
|
||||
return []uint8{}, fmt.Errorf("unable to cast %#v of type %T to []uint8", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []uint8:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]uint8, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToUint8E(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []uint8{}, fmt.Errorf("unable to cast %#v of type %T to []uint8", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []uint8{}, fmt.Errorf("unable to cast %#v of type %T to []uint8", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func toUint16SliceE(i interface{}) ([]uint16, error) {
|
||||
if i == nil {
|
||||
return []uint16{}, fmt.Errorf("unable to cast %#v of type %T to []uint16", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []uint16:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]uint16, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToUint16E(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []uint16{}, fmt.Errorf("unable to cast %#v of type %T to []uint16", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []uint16{}, fmt.Errorf("unable to cast %#v of type %T to []uint16", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func toUint32SliceE(i interface{}) ([]uint32, error) {
|
||||
if i == nil {
|
||||
return []uint32{}, fmt.Errorf("unable to cast %#v of type %T to []uint32", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []uint32:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]uint32, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToUint32E(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []uint32{}, fmt.Errorf("unable to cast %#v of type %T to []uint32", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []uint32{}, fmt.Errorf("unable to cast %#v of type %T to []uint32", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func toUint64SliceE(i interface{}) ([]uint64, error) {
|
||||
if i == nil {
|
||||
return []uint64{}, fmt.Errorf("unable to cast %#v of type %T to []uint64", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []uint64:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]uint64, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToUint64E(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []uint64{}, fmt.Errorf("unable to cast %#v of type %T to []uint64", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []uint64{}, fmt.Errorf("unable to cast %#v of type %T to []uint64", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func toFloat32SliceE(i interface{}) ([]float32, error) {
|
||||
if i == nil {
|
||||
return []float32{}, fmt.Errorf("unable to cast %#v of type %T to []float32", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []float32:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]float32, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToFloat32E(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []float32{}, fmt.Errorf("unable to cast %#v of type %T to []float32", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []float32{}, fmt.Errorf("unable to cast %#v of type %T to []float32", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func toFloat64SliceE(i interface{}) ([]float64, error) {
|
||||
if i == nil {
|
||||
return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float64", i, i)
|
||||
}
|
||||
|
||||
switch v := i.(type) {
|
||||
case []float64:
|
||||
return v, nil
|
||||
}
|
||||
|
||||
kind := reflect.TypeOf(i).Kind()
|
||||
switch kind {
|
||||
case reflect.Slice, reflect.Array:
|
||||
s := reflect.ValueOf(i)
|
||||
a := make([]float64, s.Len())
|
||||
for j := 0; j < s.Len(); j++ {
|
||||
val, err := cast.ToFloat64E(s.Index(j).Interface())
|
||||
if err != nil {
|
||||
return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float64", i, i)
|
||||
}
|
||||
a[j] = val
|
||||
}
|
||||
return a, nil
|
||||
default:
|
||||
return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float32", i, i)
|
||||
}
|
||||
}
|
||||
|
||||
func setMap(src interface{}, dst interface{}) error {
|
||||
var err error
|
||||
|
||||
if src == nil {
|
||||
return fmt.Errorf("unable to cast %#v of type %T", src, src)
|
||||
}
|
||||
if dst == nil {
|
||||
return fmt.Errorf("unable to cast %#v of type %T", dst, dst)
|
||||
}
|
||||
|
||||
val := reflect.ValueOf(src)
|
||||
|
||||
keyKind := val.Type().Key().Kind()
|
||||
valKind := val.Type().Elem().Kind()
|
||||
|
||||
switch v := dst.(type) {
|
||||
case []interface{}:
|
||||
if len(v) == 1 {
|
||||
dstVal := reflect.ValueOf(v[0])
|
||||
if dstVal.Kind() != reflect.Map {
|
||||
return nil
|
||||
}
|
||||
mapIter := dstVal.MapRange()
|
||||
for mapIter.Next() {
|
||||
var (
|
||||
keyVal interface{}
|
||||
valVal interface{}
|
||||
)
|
||||
switch keyKind {
|
||||
case reflect.Bool:
|
||||
keyVal, err = cast.ToBoolE(mapIter.Key())
|
||||
case reflect.String:
|
||||
keyVal, err = cast.ToStringE(mapIter.Key())
|
||||
case reflect.Float32:
|
||||
keyVal, err = cast.ToFloat32E(mapIter.Key())
|
||||
case reflect.Float64:
|
||||
keyVal, err = cast.ToFloat64E(mapIter.Key())
|
||||
case reflect.Int8:
|
||||
keyVal, err = cast.ToInt8E(mapIter.Key())
|
||||
case reflect.Int:
|
||||
keyVal, err = cast.ToIntE(mapIter.Key())
|
||||
case reflect.Int16:
|
||||
keyVal, err = cast.ToInt16E(mapIter.Key())
|
||||
case reflect.Int32:
|
||||
keyVal, err = cast.ToInt32E(mapIter.Key())
|
||||
case reflect.Int64:
|
||||
keyVal, err = cast.ToInt64E(mapIter.Key())
|
||||
case reflect.Uint8:
|
||||
keyVal, err = cast.ToUint8E(mapIter.Key())
|
||||
case reflect.Uint:
|
||||
keyVal, err = cast.ToUintE(mapIter.Key())
|
||||
case reflect.Uint16:
|
||||
keyVal, err = cast.ToUint16E(mapIter.Key())
|
||||
case reflect.Uint32:
|
||||
keyVal, err = cast.ToUint32E(mapIter.Key())
|
||||
case reflect.Uint64:
|
||||
keyVal, err = cast.ToUint64E(mapIter.Key())
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch valKind {
|
||||
case reflect.Bool:
|
||||
valVal, err = cast.ToBoolE(mapIter.Value())
|
||||
case reflect.String:
|
||||
valVal, err = cast.ToStringE(mapIter.Value())
|
||||
case reflect.Float32:
|
||||
valVal, err = cast.ToFloat32E(mapIter.Value())
|
||||
case reflect.Float64:
|
||||
valVal, err = cast.ToFloat64E(mapIter.Value())
|
||||
case reflect.Int8:
|
||||
valVal, err = cast.ToInt8E(mapIter.Value())
|
||||
case reflect.Int:
|
||||
valVal, err = cast.ToIntE(mapIter.Value())
|
||||
case reflect.Int16:
|
||||
valVal, err = cast.ToInt16E(mapIter.Value())
|
||||
case reflect.Int32:
|
||||
valVal, err = cast.ToInt32E(mapIter.Value())
|
||||
case reflect.Int64:
|
||||
valVal, err = cast.ToInt64E(mapIter.Value())
|
||||
case reflect.Uint8:
|
||||
valVal, err = cast.ToUint8E(mapIter.Value())
|
||||
case reflect.Uint:
|
||||
valVal, err = cast.ToUintE(mapIter.Value())
|
||||
case reflect.Uint16:
|
||||
valVal, err = cast.ToUint16E(mapIter.Value())
|
||||
case reflect.Uint32:
|
||||
valVal, err = cast.ToUint32E(mapIter.Value())
|
||||
case reflect.Uint64:
|
||||
valVal, err = cast.ToUint64E(mapIter.Value())
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
val.SetMapIndex(reflect.ValueOf(keyVal), reflect.ValueOf(valVal))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if l := len(v) % 2; l == 1 {
|
||||
v = v[:len(v)-1]
|
||||
}
|
||||
var (
|
||||
keyVal interface{}
|
||||
valVal interface{}
|
||||
)
|
||||
for i := 0; i < len(v); i += 2 {
|
||||
switch keyKind {
|
||||
case reflect.Bool:
|
||||
keyVal, err = cast.ToBoolE(v[i])
|
||||
case reflect.String:
|
||||
keyVal, err = cast.ToStringE(v[i])
|
||||
case reflect.Float32:
|
||||
keyVal, err = cast.ToFloat32E(v[i])
|
||||
case reflect.Float64:
|
||||
keyVal, err = cast.ToFloat64E(v[i])
|
||||
case reflect.Int8:
|
||||
keyVal, err = cast.ToInt8E(v[i])
|
||||
case reflect.Int:
|
||||
keyVal, err = cast.ToIntE(v[i])
|
||||
case reflect.Int16:
|
||||
keyVal, err = cast.ToInt16E(v[i])
|
||||
case reflect.Int32:
|
||||
keyVal, err = cast.ToInt32E(v[i])
|
||||
case reflect.Int64:
|
||||
keyVal, err = cast.ToInt64E(v[i])
|
||||
case reflect.Uint8:
|
||||
keyVal, err = cast.ToUint8E(v[i])
|
||||
case reflect.Uint:
|
||||
keyVal, err = cast.ToUintE(v[i])
|
||||
case reflect.Uint16:
|
||||
keyVal, err = cast.ToUint16E(v[i])
|
||||
case reflect.Uint32:
|
||||
keyVal, err = cast.ToUint32E(v[i])
|
||||
case reflect.Uint64:
|
||||
keyVal, err = cast.ToUint64E(v[i])
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch valKind {
|
||||
case reflect.Bool:
|
||||
valVal, err = cast.ToBoolE(v[i+1])
|
||||
case reflect.String:
|
||||
valVal, err = cast.ToStringE(v[i+1])
|
||||
case reflect.Float32:
|
||||
valVal, err = cast.ToFloat32E(v[i+1])
|
||||
case reflect.Float64:
|
||||
valVal, err = cast.ToFloat64E(v[i+1])
|
||||
case reflect.Int8:
|
||||
valVal, err = cast.ToInt8E(v[i+1])
|
||||
case reflect.Int:
|
||||
valVal, err = cast.ToIntE(v[i+1])
|
||||
case reflect.Int16:
|
||||
valVal, err = cast.ToInt16E(v[i+1])
|
||||
case reflect.Int32:
|
||||
valVal, err = cast.ToInt32E(v[i+1])
|
||||
case reflect.Int64:
|
||||
valVal, err = cast.ToInt64E(v[i+1])
|
||||
case reflect.Uint8:
|
||||
valVal, err = cast.ToUint8E(v[i+1])
|
||||
case reflect.Uint:
|
||||
valVal, err = cast.ToUintE(v[i+1])
|
||||
case reflect.Uint16:
|
||||
valVal, err = cast.ToUint16E(v[i+1])
|
||||
case reflect.Uint32:
|
||||
valVal, err = cast.ToUint32E(v[i+1])
|
||||
case reflect.Uint64:
|
||||
valVal, err = cast.ToUint64E(v[i+1])
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
val.SetMapIndex(reflect.ValueOf(keyVal), reflect.ValueOf(valVal))
|
||||
}
|
||||
default:
|
||||
dstVal := reflect.ValueOf(dst)
|
||||
if dstVal.Kind() != reflect.Map {
|
||||
return nil
|
||||
}
|
||||
mapIter := dstVal.MapRange()
|
||||
for mapIter.Next() {
|
||||
var (
|
||||
keyVal interface{}
|
||||
valVal interface{}
|
||||
)
|
||||
switch keyKind {
|
||||
case reflect.Bool:
|
||||
keyVal, err = cast.ToBoolE(mapIter.Key())
|
||||
case reflect.String:
|
||||
keyVal, err = cast.ToStringE(mapIter.Key())
|
||||
case reflect.Float32:
|
||||
keyVal, err = cast.ToFloat32E(mapIter.Key())
|
||||
case reflect.Float64:
|
||||
keyVal, err = cast.ToFloat64E(mapIter.Key())
|
||||
case reflect.Int8:
|
||||
keyVal, err = cast.ToInt8E(mapIter.Key())
|
||||
case reflect.Int:
|
||||
keyVal, err = cast.ToIntE(mapIter.Key())
|
||||
case reflect.Int16:
|
||||
keyVal, err = cast.ToInt16E(mapIter.Key())
|
||||
case reflect.Int32:
|
||||
keyVal, err = cast.ToInt32E(mapIter.Key())
|
||||
case reflect.Int64:
|
||||
keyVal, err = cast.ToInt64E(mapIter.Key())
|
||||
case reflect.Uint8:
|
||||
keyVal, err = cast.ToUint8E(mapIter.Key())
|
||||
case reflect.Uint:
|
||||
keyVal, err = cast.ToUintE(mapIter.Key())
|
||||
case reflect.Uint16:
|
||||
keyVal, err = cast.ToUint16E(mapIter.Key())
|
||||
case reflect.Uint32:
|
||||
keyVal, err = cast.ToUint32E(mapIter.Key())
|
||||
case reflect.Uint64:
|
||||
keyVal, err = cast.ToUint64E(mapIter.Key())
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch valKind {
|
||||
case reflect.Bool:
|
||||
valVal, err = cast.ToBoolE(mapIter.Value())
|
||||
case reflect.String:
|
||||
valVal, err = cast.ToStringE(mapIter.Value())
|
||||
case reflect.Float32:
|
||||
valVal, err = cast.ToFloat32E(mapIter.Value())
|
||||
case reflect.Float64:
|
||||
valVal, err = cast.ToFloat64E(mapIter.Value())
|
||||
case reflect.Int8:
|
||||
valVal, err = cast.ToInt8E(mapIter.Value())
|
||||
case reflect.Int:
|
||||
valVal, err = cast.ToIntE(mapIter.Value())
|
||||
case reflect.Int16:
|
||||
valVal, err = cast.ToInt16E(mapIter.Value())
|
||||
case reflect.Int32:
|
||||
valVal, err = cast.ToInt32E(mapIter.Value())
|
||||
case reflect.Int64:
|
||||
valVal, err = cast.ToInt64E(mapIter.Value())
|
||||
case reflect.Uint8:
|
||||
valVal, err = cast.ToUint8E(mapIter.Value())
|
||||
case reflect.Uint:
|
||||
valVal, err = cast.ToUintE(mapIter.Value())
|
||||
case reflect.Uint16:
|
||||
valVal, err = cast.ToUint16E(mapIter.Value())
|
||||
case reflect.Uint32:
|
||||
valVal, err = cast.ToUint32E(mapIter.Value())
|
||||
case reflect.Uint64:
|
||||
valVal, err = cast.ToUint64E(mapIter.Value())
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
val.SetMapIndex(reflect.ValueOf(keyVal), reflect.ValueOf(valVal))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -6,10 +6,10 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
"go.unistack.org/micro/v4/register"
|
||||
"go.unistack.org/micro/v4/util/id"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
"go.unistack.org/micro/v3/util/id"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@@ -7,7 +7,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v4/register"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
)
|
||||
|
||||
var testData = map[string][]*register.Service{
|
||||
|
20
service.go
20
service.go
@@ -99,7 +99,6 @@ type service struct {
|
||||
done chan struct{}
|
||||
opts Options
|
||||
sync.RWMutex
|
||||
stopped bool
|
||||
}
|
||||
|
||||
// NewService creates and returns a new Service based on the packages within.
|
||||
@@ -425,7 +424,7 @@ func (s *service) Stop() error {
|
||||
}
|
||||
}
|
||||
|
||||
s.notifyShutdown()
|
||||
close(s.done)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -449,23 +448,10 @@ func (s *service) Run() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// wait on context cancel
|
||||
<-s.done
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// notifyShutdown marks the service as stopped and closes the done channel.
|
||||
// It ensures the channel is closed only once, preventing multiple closures.
|
||||
func (s *service) notifyShutdown() {
|
||||
s.Lock()
|
||||
if s.stopped {
|
||||
s.Unlock()
|
||||
return
|
||||
}
|
||||
s.stopped = true
|
||||
s.Unlock()
|
||||
|
||||
close(s.done)
|
||||
return s.Stop()
|
||||
}
|
||||
|
||||
type Namer interface {
|
||||
|
@@ -3,9 +3,7 @@ package micro
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.unistack.org/micro/v4/broker"
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/config"
|
||||
@@ -739,41 +737,3 @@ func Test_getNameIndex(t *testing.T) {
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func TestServiceShutdown(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
t.Fatalf("service shutdown failed: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
s, ok := NewService().(*service)
|
||||
require.NotNil(t, s)
|
||||
require.True(t, ok)
|
||||
|
||||
require.NoError(t, s.Start())
|
||||
require.False(t, s.stopped)
|
||||
|
||||
require.NoError(t, s.Stop())
|
||||
require.True(t, s.stopped)
|
||||
}
|
||||
|
||||
func TestServiceMultipleShutdowns(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
t.Fatalf("service shutdown failed: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
s := NewService()
|
||||
|
||||
go func() {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
// first call
|
||||
require.NoError(t, s.Stop())
|
||||
// duplicate call
|
||||
require.NoError(t, s.Stop())
|
||||
}()
|
||||
|
||||
require.NoError(t, s.Run())
|
||||
}
|
||||
|
@@ -89,10 +89,6 @@ func (s *Span) Tracer() tracer.Tracer {
|
||||
return s.tracer
|
||||
}
|
||||
|
||||
func (s *Span) IsRecording() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
name string
|
||||
labels []interface{}
|
||||
|
@@ -120,10 +120,6 @@ func (s *noopSpan) SetStatus(st SpanStatus, msg string) {
|
||||
s.statusMsg = msg
|
||||
}
|
||||
|
||||
func (s *noopSpan) IsRecording() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// NewTracer returns new memory tracer
|
||||
func NewTracer(opts ...Option) Tracer {
|
||||
return &noopTracer{
|
||||
|
@@ -78,6 +78,4 @@ type Span interface {
|
||||
TraceID() string
|
||||
// SpanID returns span id
|
||||
SpanID() string
|
||||
// IsRecording returns the recording state of the Span.
|
||||
IsRecording() bool
|
||||
}
|
||||
|
@@ -1,78 +0,0 @@
|
||||
package buffer
|
||||
|
||||
import "io"
|
||||
|
||||
var _ interface {
|
||||
io.ReadCloser
|
||||
io.ReadSeeker
|
||||
} = (*SeekerBuffer)(nil)
|
||||
|
||||
// Buffer is a ReadWriteCloser that supports seeking. It's intended to
|
||||
// replicate the functionality of bytes.Buffer that I use in my projects.
|
||||
//
|
||||
// Note that the seeking is limited to the read marker; all writes are
|
||||
// append-only.
|
||||
type SeekerBuffer struct {
|
||||
data []byte
|
||||
pos int64
|
||||
}
|
||||
|
||||
func NewSeekerBuffer(data []byte) *SeekerBuffer {
|
||||
return &SeekerBuffer{
|
||||
data: data,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *SeekerBuffer) Read(p []byte) (int, error) {
|
||||
if b.pos >= int64(len(b.data)) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
n := copy(p, b.data[b.pos:])
|
||||
b.pos += int64(n)
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (b *SeekerBuffer) Write(p []byte) (int, error) {
|
||||
b.data = append(b.data, p...)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Seek sets the read pointer to pos.
|
||||
func (b *SeekerBuffer) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
b.pos = offset
|
||||
case io.SeekEnd:
|
||||
b.pos = int64(len(b.data)) + offset
|
||||
case io.SeekCurrent:
|
||||
b.pos += offset
|
||||
}
|
||||
|
||||
return b.pos, nil
|
||||
}
|
||||
|
||||
// Rewind resets the read pointer to 0.
|
||||
func (b *SeekerBuffer) Rewind() error {
|
||||
if _, err := b.Seek(0, io.SeekStart); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close clears all the data out of the buffer and sets the read position to 0.
|
||||
func (b *SeekerBuffer) Close() error {
|
||||
b.data = nil
|
||||
b.pos = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
// Len returns the length of data remaining to be read.
|
||||
func (b *SeekerBuffer) Len() int {
|
||||
return len(b.data[b.pos:])
|
||||
}
|
||||
|
||||
// Bytes returns the underlying bytes from the current position.
|
||||
func (b *SeekerBuffer) Bytes() []byte {
|
||||
return b.data[b.pos:]
|
||||
}
|
@@ -1,55 +0,0 @@
|
||||
package buffer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func noErrorT(t *testing.T, err error) {
|
||||
if nil != err {
|
||||
t.Fatalf("%s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func boolT(t *testing.T, cond bool, s ...string) {
|
||||
if !cond {
|
||||
what := strings.Join(s, ", ")
|
||||
if len(what) > 0 {
|
||||
what = ": " + what
|
||||
}
|
||||
t.Fatalf("assert.Bool failed%s", what)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeeking(t *testing.T) {
|
||||
partA := []byte("hello, ")
|
||||
partB := []byte("world!")
|
||||
|
||||
buf := NewSeekerBuffer(partA)
|
||||
|
||||
boolT(t, buf.Len() == len(partA), fmt.Sprintf("on init: have length %d, want length %d", buf.Len(), len(partA)))
|
||||
|
||||
b := make([]byte, 32)
|
||||
|
||||
n, err := buf.Read(b)
|
||||
noErrorT(t, err)
|
||||
boolT(t, buf.Len() == 0, fmt.Sprintf("after reading 1: have length %d, want length 0", buf.Len()))
|
||||
boolT(t, n == len(partA), fmt.Sprintf("after reading 2: have length %d, want length %d", n, len(partA)))
|
||||
|
||||
n, err = buf.Write(partB)
|
||||
noErrorT(t, err)
|
||||
boolT(t, n == len(partB), fmt.Sprintf("after writing: have length %d, want length %d", n, len(partB)))
|
||||
|
||||
n, err = buf.Read(b)
|
||||
noErrorT(t, err)
|
||||
boolT(t, buf.Len() == 0, fmt.Sprintf("after rereading 1: have length %d, want length 0", buf.Len()))
|
||||
boolT(t, n == len(partB), fmt.Sprintf("after rereading 2: have length %d, want length %d", n, len(partB)))
|
||||
|
||||
partsLen := len(partA) + len(partB)
|
||||
_ = buf.Rewind()
|
||||
boolT(t, buf.Len() == partsLen, fmt.Sprintf("after rewinding: have length %d, want length %d", buf.Len(), partsLen))
|
||||
|
||||
buf.Close()
|
||||
boolT(t, buf.Len() == 0, fmt.Sprintf("after closing, have length %d, want length 0", buf.Len()))
|
||||
}
|
@@ -489,74 +489,35 @@ func URLMap(query string) (map[string]interface{}, error) {
|
||||
return mp.(map[string]interface{}), nil
|
||||
}
|
||||
|
||||
// FlattenMap flattens a nested map into a single-level map using dot notation for nested keys.
|
||||
// In case of key conflicts, all nested levels will be discarded in favor of the first-level key.
|
||||
//
|
||||
// Example #1:
|
||||
//
|
||||
// Input:
|
||||
// {
|
||||
// "user.name": "alex",
|
||||
// "user.document.id": "document_id"
|
||||
// "user.document.number": "document_number"
|
||||
// }
|
||||
// Output:
|
||||
// {
|
||||
// "user": {
|
||||
// "name": "alex",
|
||||
// "document": {
|
||||
// "id": "document_id"
|
||||
// "number": "document_number"
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// Example #2 (with conflicts):
|
||||
//
|
||||
// Input:
|
||||
// {
|
||||
// "user": "alex",
|
||||
// "user.document.id": "document_id"
|
||||
// "user.document.number": "document_number"
|
||||
// }
|
||||
// Output:
|
||||
// {
|
||||
// "user": "alex"
|
||||
// }
|
||||
func FlattenMap(input map[string]interface{}) map[string]interface{} {
|
||||
result := make(map[string]interface{})
|
||||
|
||||
for k, v := range input {
|
||||
parts := strings.Split(k, ".")
|
||||
|
||||
if len(parts) == 1 {
|
||||
result[k] = v
|
||||
// FlattenMap expand key.subkey to nested map
|
||||
func FlattenMap(a map[string]interface{}) map[string]interface{} {
|
||||
// preprocess map
|
||||
nb := make(map[string]interface{}, len(a))
|
||||
for k, v := range a {
|
||||
ps := strings.Split(k, ".")
|
||||
if len(ps) == 1 {
|
||||
nb[k] = v
|
||||
continue
|
||||
}
|
||||
|
||||
current := result
|
||||
|
||||
for i, part := range parts {
|
||||
// last element in the path
|
||||
if i == len(parts)-1 {
|
||||
current[part] = v
|
||||
break
|
||||
}
|
||||
|
||||
// initialize map for current level if not exist
|
||||
if _, ok := current[part]; !ok {
|
||||
current[part] = make(map[string]interface{})
|
||||
}
|
||||
|
||||
if nested, ok := current[part].(map[string]interface{}); ok {
|
||||
current = nested // continue to the nested map
|
||||
} else {
|
||||
break // if current element is not a map, ignore it
|
||||
em := make(map[string]interface{})
|
||||
em[ps[len(ps)-1]] = v
|
||||
for i := len(ps) - 2; i > 0; i-- {
|
||||
nm := make(map[string]interface{})
|
||||
nm[ps[i]] = em
|
||||
em = nm
|
||||
}
|
||||
if vm, ok := nb[ps[0]]; ok {
|
||||
// nested map
|
||||
nm := vm.(map[string]interface{})
|
||||
for vk, vv := range em {
|
||||
nm[vk] = vv
|
||||
}
|
||||
nb[ps[0]] = nm
|
||||
} else {
|
||||
nb[ps[0]] = em
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
return nb
|
||||
}
|
||||
|
||||
/*
|
||||
|
@@ -6,7 +6,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
rutil "go.unistack.org/micro/v4/util/reflect"
|
||||
)
|
||||
|
||||
@@ -320,140 +319,3 @@ func TestIsZero(t *testing.T) {
|
||||
|
||||
// t.Logf("XX %#+v\n", ok)
|
||||
}
|
||||
|
||||
func TestFlattenMap(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input map[string]interface{}
|
||||
expected map[string]interface{}
|
||||
}{
|
||||
{
|
||||
name: "empty map",
|
||||
input: map[string]interface{}{},
|
||||
expected: map[string]interface{}{},
|
||||
},
|
||||
{
|
||||
name: "nil map",
|
||||
input: nil,
|
||||
expected: map[string]interface{}{},
|
||||
},
|
||||
{
|
||||
name: "single level",
|
||||
input: map[string]interface{}{
|
||||
"username": "username",
|
||||
"password": "password",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"username": "username",
|
||||
"password": "password",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "two level",
|
||||
input: map[string]interface{}{
|
||||
"order_id": "order_id",
|
||||
"user.name": "username",
|
||||
"user.password": "password",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"order_id": "order_id",
|
||||
"user": map[string]interface{}{
|
||||
"name": "username",
|
||||
"password": "password",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "three level",
|
||||
input: map[string]interface{}{
|
||||
"order_id": "order_id",
|
||||
"user.name": "username",
|
||||
"user.password": "password",
|
||||
"user.document.id": "document_id",
|
||||
"user.document.number": "document_number",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"order_id": "order_id",
|
||||
"user": map[string]interface{}{
|
||||
"name": "username",
|
||||
"password": "password",
|
||||
"document": map[string]interface{}{
|
||||
"id": "document_id",
|
||||
"number": "document_number",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "four level",
|
||||
input: map[string]interface{}{
|
||||
"order_id": "order_id",
|
||||
"user.name": "username",
|
||||
"user.password": "password",
|
||||
"user.document.id": "document_id",
|
||||
"user.document.number": "document_number",
|
||||
"user.info.permissions.read": "available",
|
||||
"user.info.permissions.write": "available",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"order_id": "order_id",
|
||||
"user": map[string]interface{}{
|
||||
"name": "username",
|
||||
"password": "password",
|
||||
"document": map[string]interface{}{
|
||||
"id": "document_id",
|
||||
"number": "document_number",
|
||||
},
|
||||
"info": map[string]interface{}{
|
||||
"permissions": map[string]interface{}{
|
||||
"read": "available",
|
||||
"write": "available",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "key conflicts",
|
||||
input: map[string]interface{}{
|
||||
"user": "user",
|
||||
"user.name": "username",
|
||||
"user.password": "password",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"user": "user",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "overwriting conflicts",
|
||||
input: map[string]interface{}{
|
||||
"order_id": "order_id",
|
||||
"user.document.id": "document_id",
|
||||
"user.document.number": "document_number",
|
||||
"user.info.address": "address",
|
||||
"user.info.phone": "phone",
|
||||
},
|
||||
expected: map[string]interface{}{
|
||||
"order_id": "order_id",
|
||||
"user": map[string]interface{}{
|
||||
"document": map[string]interface{}{
|
||||
"id": "document_id",
|
||||
"number": "document_number",
|
||||
},
|
||||
"info": map[string]interface{}{
|
||||
"address": "address",
|
||||
"phone": "phone",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
for range 100 { // need to exclude the impact of key order in the map on the test.
|
||||
require.Equal(t, tt.expected, rutil.FlattenMap(tt.input))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user