Compare commits

..

2 Commits

Author SHA1 Message Date
c5e6d4cddc broker refactor
Some checks failed
coverage / build (pull_request) Failing after 36s
lint / lint (pull_request) Failing after 2m55s
test / test (pull_request) Successful in 4m28s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-01-30 01:44:11 +03:00
c9066e0455 intermediate
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-01-29 01:47:58 +03:00
53 changed files with 667 additions and 2304 deletions

View File

@@ -3,9 +3,6 @@ name: coverage
on:
push:
branches: [ main, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
pull_request:
branches: [ main, v3, v4 ]
# Allows you to run this workflow manually from the Actions tab
@@ -25,7 +22,7 @@ jobs:
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
go-version: 'stable'
- name: test coverage
run: |
@@ -42,8 +39,8 @@ jobs:
name: autocommit
with:
commit_message: Apply Code Coverage Badge
skip_fetch: false
skip_checkout: false
skip_fetch: true
skip_checkout: true
file_pattern: ./README.md
- name: push
@@ -51,4 +48,4 @@ jobs:
uses: ad-m/github-push-action@master
with:
github_token: ${{ github.token }}
branch: ${{ github.ref }}
branch: ${{ github.ref }}

View File

@@ -3,10 +3,10 @@ name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
branches:
- master
- v3
- v4
jobs:
lint:
@@ -24,6 +24,6 @@ jobs:
- name: setup deps
run: go get -v ./...
- name: run lint
uses: golangci/golangci-lint-action@v6
uses: https://github.com/golangci/golangci-lint-action@v6
with:
version: 'latest'

View File

@@ -3,12 +3,15 @@ name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ]
branches:
- master
- v3
- v4
push:
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
branches:
- master
- v3
- v4
jobs:
test:

View File

@@ -3,12 +3,15 @@ name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ]
branches:
- master
- v3
- v4
push:
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
branches:
- master
- v3
- v4
jobs:
test:
@@ -32,19 +35,19 @@ jobs:
go-version: 'stable'
- name: setup go work
env:
GOWORK: ${{ github.workspace }}/go.work
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
go work init
go work use .
go work use micro-tests
- name: setup deps
env:
GOWORK: ${{ github.workspace }}/go.work
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: go get -v ./...
- name: run tests
env:
INTEGRATION_TESTS: yes
GOWORK: ${{ github.workspace }}/go.work
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
cd micro-tests
go test -mod readonly -v ./... || true

View File

@@ -1,57 +0,0 @@
name: sync
on:
schedule:
- cron: '*/5 * * * *'
push:
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
sync:
if: github.server_url != 'https://github.com'
runs-on: ubuntu-latest
steps:
- name: init
run: |
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
git config --global user.name "github-actions[bot]"
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
- name: sync master
run: |
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --track master upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream master
git push upstream master --progress
git push origin master --progress
cd ../
rm -rf repo
- name: sync v3
run: |
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v3
git push upstream v3
git push origin v3 --progress
cd ../
rm -rf repo
- name: sync v4
run: |
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v4
git push upstream v4
git push origin v4 --progress
cd ../
rm -rf repo

View File

@@ -1,5 +1,5 @@
run:
concurrency: 8
timeout: 5m
deadline: 5m
issues-exit-code: 1
tests: true

View File

@@ -1,9 +1,9 @@
# Micro
![Coverage](https://img.shields.io/badge/Coverage-45.6%25-yellow)
![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow)
[![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v4)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)
[![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v4)](https://goreportcard.com/report/go.unistack.org/micro/v4)
[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v3?tab=overview)
[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v3)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush)
[![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v3)](https://goreportcard.com/report/go.unistack.org/micro/v3)
Micro is a standard library for microservices.

15
SECURITY.md Normal file
View File

@@ -0,0 +1,15 @@
# Security Policy
## Supported Versions
Use this section to tell people about which versions of your project are
currently being supported with security updates.
| Version | Supported |
| ------- | ------------------ |
| 3.7.x | :white_check_mark: |
| < 3.7.0 | :x: |
## Reporting a Vulnerability
If you find any issue, please create github issue in this repo

View File

@@ -21,7 +21,7 @@ var (
// ErrInvalidMessage returns when invalid Message passed
ErrInvalidMessage = errors.New("invalid message")
// ErrInvalidHandler returns when subscriber passed to Subscribe
ErrInvalidHandler = errors.New("invalid handler, ony func(Message) error and func([]Message) error supported")
ErrInvalidHandler = errors.New("invalid handler")
// DefaultGracefulTimeout
DefaultGracefulTimeout = 5 * time.Second
)

View File

@@ -42,16 +42,6 @@ func SetSubscribeOption(k, v interface{}) SubscribeOption {
}
}
// SetPublishOption returns a function to setup a context with given value
func SetPublishOption(k, v interface{}) PublishOption {
return func(o *PublishOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
// SetOption returns a function to setup a context with given value
func SetOption(k, v interface{}) Option {
return func(o *Options) {

View File

@@ -210,15 +210,8 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
}
case func(broker.Message) error:
for _, message := range messages {
msg, ok := message.(*memoryMessage)
if !ok {
if b.opts.Logger.V(logger.ErrorLevel) {
b.opts.Logger.Error(ctx, "broker handler error", broker.ErrInvalidMessage)
}
}
msg.topic = topic
if err = s(msg); err == nil && sub.opts.AutoAck {
err = msg.Ack()
if err = s(message); err == nil && sub.opts.AutoAck {
err = message.Ack()
}
if err != nil {
if b.opts.Logger.V(logger.ErrorLevel) {

View File

@@ -79,15 +79,11 @@ type PublishOptions struct {
// BodyOnly flag says the message contains raw body bytes and don't need
// codec Marshal method
BodyOnly bool
// Context holds custom options
Context context.Context
}
// NewPublishOptions creates PublishOptions struct
func NewPublishOptions(opts ...PublishOption) PublishOptions {
options := PublishOptions{
Context: context.Background(),
}
options := PublishOptions{}
for _, o := range opts {
o(&options)
}

View File

@@ -1,14 +1,87 @@
package broker
import (
"fmt"
"reflect"
"unicode"
"unicode/utf8"
)
const (
messageSig = "func(broker.Message) error"
messagesSig = "func([]broker.Message) error"
)
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// Is this an exported - upper case - name?
func isExported(name string) bool {
r, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(r)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// IsValidHandler func signature
func IsValidHandler(sub interface{}) error {
switch sub.(type) {
typ := reflect.TypeOf(sub)
var argType reflect.Type
switch typ.Kind() {
case reflect.Func:
name := "Func"
switch typ.NumIn() {
case 1:
argType = typ.In(0)
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), messageSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
name, typ.NumOut(), messageSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
default:
return ErrInvalidHandler
case func(Message) error:
break
case func([]Message) error:
break
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), messageSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s",
name, method.Name, method.Type.NumOut(), messageSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
}
return nil
}

154
changes Normal file
View File

@@ -0,0 +1,154 @@
broker/broker.go
broker/context.go
broker/context_test.go
broker/memory.go
broker/memory/memory.go
broker/memory_test.go
broker/noop.go
broker/noop_test.go
broker/options.go
broker/subscriber.go
client/backoff.go
client/backoff_test.go
client/client.go
client/client_call_options.go
client/client_call_options_test.go
client/context.go
client/context_test.go
client/noop.go
client/noop_test.go
client/options.go
codec/codec.go
codec/context.go
codec/frame.go
codec/frame.proto
codec/options.go
config/config.go
config/context.go
config/context_test.go
config/default.go
config/default_test.go
config/options.go
database/dsn.go
errors/errors.go
errors/errors.proto
errors/errors_test.go
flow/context.go
flow/context_test.go
flow/default.go
flow/flow.go
flow/flow_test.go
flow/options.go
meter/context.go
meter/context_test.go
meter/meter.go
meter/noop.go
meter/options.go
meter/wrapper/wrapper.go
micro_test.go
mtls/mtls.go
mtls/options.go
options.go
options/hooks.go
options/options.go
options/options_test.go
profiler/http/http.go
profiler/noop.go
profiler/pprof/pprof.go
profiler/profile.go
proxy/options.go
proxy/proxy.go
register/context.go
register/extractor.go
register/extractor_test.go
register/memory/memory.go
register/memory/memory_test.go
register/options.go
register/register.go
register/watcher.go
resolver/dns/dns.go
resolver/dnssrv/dnssrv.go
resolver/http/http.go
resolver/noop/noop.go
resolver/registry/registry.go
resolver/resolver.go
resolver/static/static.go
router/context.go
router/options.go
router/router.go
selector/random/random.go
selector/roundrobin/roundrobin.go
selector/selector.go
semconv/broker.go
semconv/cache.go
semconv/client.go
semconv/logger.go
semconv/metadata.go
semconv/pool.go
semconv/server.go
semconv/store.go
server/context.go
server/context_test.go
server/noop.go
server/noop_test.go
server/options.go
server/registry.go
server/server.go
server/wrapper.go
service.go
service_test.go
store/context.go
store/context_test.go
store/memory.go
store/memory/memory.go
store/memory_test.go
store/options.go
store/store.go
store/wrapper.go
sync/memory.go
sync/sync.go
tools.go
tracer/context.go
tracer/memory/memory.go
tracer/memory/memory_test.go
tracer/noop.go
tracer/options.go
tracer/tracer.go
tracer/tracer_test.go
tracer/wrapper/wrapper.go
util/addr/addr.go
util/backoff/backoff.go
util/buf/buf.go
util/buffer/buffer.go
util/buffer/buffer_test.go
util/ctx/ctx.go
util/dns/cache.go
util/dns/cache_test.go
util/dns/conn.go
util/grpc/tracer.go
util/http/clienttracer.go
util/http/http.go
util/http/trie.go
util/http/trie_test.go
util/io/redirect.go
util/io/redirect_test.go
util/io/redirect_unix.go
util/io/redirect_windows.go
util/net/net.go
util/pki/pki.go
util/rand/rand.go
util/reflect/path.go
util/reflect/reflect.go
util/reflect/reflect_test.go
util/reflect/struct.go
util/reflect/struct_test.go
util/register/util.go
util/ring/buffer.go
util/sort/sort.go
util/sort/sort_test.go
util/stream/stream.go
util/structfs/metadata_digitalocean.go
util/structfs/metadata_ec2.go
util/structfs/structfs.go
util/structfs/structfs_test.go
util/test/test.go

View File

@@ -17,7 +17,7 @@ syntax = "proto3";
package micro.errors;
option cc_enable_arenas = true;
option go_package = "go.unistack.org/micro/v4/errors;errors";
option go_package = "go.unistack.org/micro/v3/errors;errors";
option java_multiple_files = true;
option java_outer_classname = "MicroErrors";
option java_package = "micro.errors";

View File

@@ -15,6 +15,15 @@ func FromContext(ctx context.Context) (Flow, bool) {
return c, ok
}
// MustContext returns Flow from context
func MustContext(ctx context.Context) Flow {
f, ok := FromContext(ctx)
if !ok {
panic("missing flow")
}
return f
}
// NewContext stores Flow to context
func NewContext(ctx context.Context, f Flow) context.Context {
if ctx == nil {

View File

@@ -1,5 +1,3 @@
//go:build ignore
package flow
import (

View File

@@ -1,16 +1,14 @@
//go:build ignore
package flow
import (
"context"
"fmt"
"path/filepath"
"sync"
"github.com/heimdalr/dag"
"github.com/silas/dag"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/store"
"go.unistack.org/micro/v4/util/id"
@@ -22,7 +20,7 @@ type microFlow struct {
type microWorkflow struct {
opts Options
g *dag.DAG
g *dag.AcyclicGraph
steps map[string]Step
id string
status Status
@@ -34,20 +32,20 @@ func (w *microWorkflow) ID() string {
return w.id
}
func (w *microWorkflow) Steps() ([][]Step, error) {
return w.getSteps("", false)
}
func (w *microWorkflow) Status() Status {
return w.status
}
func (w *microWorkflow) AppendSteps(steps ...Step) error {
var err error
w.Lock()
defer w.Unlock()
for _, s := range steps {
w.steps[s.String()] = s
if _, err = w.g.AddVertex(s); err != nil {
return err
}
w.g.Add(s)
}
for _, dst := range steps {
@@ -56,13 +54,18 @@ func (w *microWorkflow) AppendSteps(steps ...Step) error {
if !ok {
return ErrStepNotExists
}
if err = w.g.AddEdge(src.String(), dst.String()); err != nil {
return err
}
w.g.Connect(dag.BasicEdge(src, dst))
}
}
w.g.ReduceTransitively()
if err := w.g.Validate(); err != nil {
w.Unlock()
return err
}
w.g.TransitiveReduction()
w.Unlock()
return nil
}
@@ -71,11 +74,10 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error {
// TODO: handle case when some step requires or required by removed step
w.Lock()
defer w.Unlock()
for _, s := range steps {
delete(w.steps, s.String())
w.g.DeleteVertex(s.String())
w.g.Remove(s)
}
for _, dst := range steps {
@@ -84,34 +86,91 @@ func (w *microWorkflow) RemoveSteps(steps ...Step) error {
if !ok {
return ErrStepNotExists
}
w.g.AddEdge(src.String(), dst.String())
w.g.Connect(dag.BasicEdge(src, dst))
}
}
w.g.ReduceTransitively()
if err := w.g.Validate(); err != nil {
w.Unlock()
return err
}
w.g.TransitiveReduction()
w.Unlock()
return nil
}
func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
var steps [][]Step
var root dag.Vertex
var err error
fn := func(n dag.Vertex, idx int) error {
if idx == 0 {
steps = make([][]Step, 1)
steps[0] = make([]Step, 0, 1)
} else if idx >= len(steps) {
tsteps := make([][]Step, idx+1)
copy(tsteps, steps)
steps = tsteps
steps[idx] = make([]Step, 0, 1)
}
steps[idx] = append(steps[idx], n.(Step))
return nil
}
if start != "" {
var ok bool
w.RLock()
root, ok = w.steps[start]
w.RUnlock()
if !ok {
return nil, ErrStepNotExists
}
} else {
root, err = w.g.Root()
if err != nil {
return nil, err
}
}
if reverse {
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
} else {
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
}
if err != nil {
return nil, err
}
return steps, nil
}
func (w *microWorkflow) Abort(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
}
func (w *microWorkflow) Suspend(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
}
func (w *microWorkflow) Resume(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
}
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
w.Lock()
if !w.init {
w.g.ReduceTransitively()
if err := w.g.Validate(); err != nil {
w.Unlock()
return "", err
}
w.g.TransitiveReduction()
w.init = true
}
w.Unlock()
@@ -121,11 +180,26 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
return "", err
}
// stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
stepStore := store.NewNamespaceStore(w.opts.Store, "steps"+w.opts.Store.Options().Separator+eid)
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+eid)
options := NewExecuteOptions(opts...)
steps, err := w.getSteps(options.Start, options.Reverse)
if err != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
w.opts.Logger.Error(w.opts.Context, "store write error", werr)
}
return "", err
}
var wg sync.WaitGroup
cherr := make(chan error, 1)
chstatus := make(chan Status, 1)
nctx, cancel := context.WithCancel(ctx)
defer cancel()
nopts := make([]ExecuteOption, 0, len(opts)+5)
nopts = append(nopts,
@@ -135,274 +209,143 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
ExecuteMeter(w.opts.Meter),
)
nopts = append(nopts, opts...)
done := make(chan struct{})
if werr := workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
w.opts.Logger.Error(ctx, "store error: %v", werr)
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
w.opts.Logger.Error(w.opts.Context, "store write error", werr)
return eid, werr
}
var startID string
if options.Start == "" {
mp := w.g.GetRoots()
if len(mp) != 1 {
return eid, ErrStepNotExists
}
for k := range mp {
startID = k
}
} else {
for k, v := range w.g.GetVertices() {
if v == options.Start {
startID = k
for idx := range steps {
for nidx := range steps[idx] {
cstep := steps[idx][nidx]
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
return eid, werr
}
}
}
if startID == "" {
return eid, ErrStepNotExists
}
if options.Async {
go w.handleWorkflow(startID, nopts...)
return eid, nil
}
return eid, w.handleWorkflow(startID, nopts...)
}
func (w *microWorkflow) handleWorkflow(startID string, opts ...ExecuteOption) error {
w.RLock()
defer w.RUnlock()
// stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
// workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
// Get IDs of all descendant vertices.
flowIDs, errDes := w.g.GetDescendants(startID)
if errDes != nil {
return errDes
}
// inputChannels provides for input channels for each of the descendant vertices (+ the start-vertex).
inputChannels := make(map[string]chan FlowResult, len(flowIDs)+1)
// Iterate vertex IDs and create an input channel for each of them and a single
// output channel for leaves. Note, this "pre-flight" is needed to ensure we
// really have an input channel regardless of how we traverse the tree and spawn
// workers.
leafCount := 0
for id := range flowIDs {
// Get all parents of this vertex.
parents, errPar := w.g.GetParents(id)
if errPar != nil {
return errPar
}
// Create a buffered input channel that has capacity for all parent results.
inputChannels[id] = make(chan FlowResult, len(parents))
if ok, err := w.g.IsLeaf(id); ok && err == nil {
leafCount += 1
}
}
// outputChannel caries the results of leaf vertices.
outputChannel := make(chan FlowResult, leafCount)
// To also process the start vertex and to have its results being passed to its
// children, add it to the vertex IDs. Also add an input channel for the start
// vertex and feed the inputs to this channel.
flowIDs[startID] = struct{}{}
inputChannels[startID] = make(chan FlowResult, len(inputs))
for _, i := range inputs {
inputChannels[startID] <- i
}
wg := sync.WaitGroup{}
// Iterate all vertex IDs (now incl. start vertex) and handle each worker (incl.
// inputs and outputs) in a separate goroutine.
for id := range flowIDs {
// Get all children of this vertex that later need to be notified. Note, we
// collect all children before the goroutine to be able to release the read
// lock as early as possible.
children, errChildren := w.g.GetChildren(id)
if errChildren != nil {
return errChildren
}
// Remember to wait for this goroutine.
wg.Add(1)
go func(id string) {
// Get this vertex's input channel.
// Note, only concurrent read here, which is fine.
c := inputChannels[id]
// Await all parent inputs and stuff them into a slice.
parentCount := cap(c)
parentResults := make([]FlowResult, parentCount)
for i := 0; i < parentCount; i++ {
parentResults[i] = <-c
}
// Execute the worker.
errWorker := callback(w.g, id, parentResults)
if errWorker != nil {
return errWorker
}
// Send this worker's FlowResult onto all children's input channels or, if it is
// a leaf (i.e. no children), send the result onto the output channel.
if len(children) > 0 {
for child := range children {
inputChannels[child] <- flowResult
go func() {
for idx := range steps {
for nidx := range steps[idx] {
wStatus := &codec.Frame{}
if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
cherr <- werr
return
}
} else {
outputChannel <- flowResult
}
// "Sign off".
wg.Done()
}(id)
}
// Wait for all go routines to finish.
wg.Wait()
// Await all leaf vertex results and stuff them into a slice.
resultCount := cap(outputChannel)
results := make([]FlowResult, resultCount)
for i := 0; i < resultCount; i++ {
results[i] = <-outputChannel
}
/*
go func() {
for idx := range steps {
for nidx := range steps[idx] {
wStatus := &codec.Frame{}
if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
cherr <- werr
return
}
if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
chstatus <- status
return
}
if w.opts.Logger.V(logger.TraceLevel) {
w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
}
cstep := steps[idx][nidx]
// nolint: nestif
if len(cstep.Requires()) == 0 {
wg.Add(1)
go func(step Step) {
defer wg.Done()
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := step.Execute(nctx, req, nopts...)
if serr != nil {
step.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
}(cstep)
wg.Wait()
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
chstatus <- status
return
}
if w.opts.Logger.V(logger.TraceLevel) {
w.opts.Logger.Trace(nctx, fmt.Sprintf("will be executed %v", steps[idx][nidx]))
}
cstep := steps[idx][nidx]
// nolint: nestif
if len(cstep.Requires()) == 0 {
wg.Add(1)
go func(step Step) {
defer wg.Done()
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := cstep.Execute(nctx, req, nopts...)
rsp, serr := step.Execute(nctx, req, nopts...)
if serr != nil {
cstep.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
step.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Error(ctx, "store write error", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Error(ctx, "store write error", werr)
}
cherr <- serr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
w.opts.Logger.Error(ctx, "store write error", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Error(ctx, "store write error", werr)
cherr <- werr
return
}
}(cstep)
wg.Wait()
} else {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
cherr <- werr
return
}
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := cstep.Execute(nctx, req, nopts...)
if serr != nil {
cstep.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Error(ctx, "store write error", werr)
}
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Error(ctx, "store write error", werr)
}
cherr <- serr
return
}
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
w.opts.Logger.Error(ctx, "store write error", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
cherr <- werr
return
}
}
}
close(done)
}()
if options.Async {
return eid, nil
}
close(done)
}()
logger.Tracef(ctx, "wait for finish or error")
select {
case <-nctx.Done():
err = nctx.Err()
case cerr := <-cherr:
err = cerr
case <-done:
close(cherr)
case <-chstatus:
close(chstatus)
return eid, nil
}
if options.Async {
return eid, nil
}
switch {
case nctx.Err() != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
case err == nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
case err != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
logger.DefaultLogger.Trace(ctx, "wait for finish or error")
select {
case <-nctx.Done():
err = nctx.Err()
case cerr := <-cherr:
err = cerr
case <-done:
close(cherr)
case <-chstatus:
close(chstatus)
return eid, nil
}
switch {
case nctx.Err() != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
w.opts.Logger.Error(w.opts.Context, "store write error", werr)
}
*/
return err
case err == nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Error(w.opts.Context, "store write error", werr)
}
case err != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
w.opts.Logger.Error(w.opts.Context, "store write error", werr)
}
}
return eid, err
}
// NewFlow create new flow
@@ -442,11 +385,11 @@ func (f *microFlow) WorkflowList(ctx context.Context) ([]Workflow, error) {
}
func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) {
w := &microWorkflow{opts: f.opts, id: id, g: &dag.DAG{}, steps: make(map[string]Step, len(steps))}
w := &microWorkflow{opts: f.opts, id: id, g: &dag.AcyclicGraph{}, steps: make(map[string]Step, len(steps))}
for _, s := range steps {
w.steps[s.String()] = s
w.g.AddVertex(s)
w.g.Add(s)
}
for _, dst := range steps {
@@ -455,11 +398,14 @@ func (f *microFlow) WorkflowCreate(ctx context.Context, id string, steps ...Step
if !ok {
return nil, ErrStepNotExists
}
w.g.AddEdge(src.String(), dst.String())
w.g.Connect(dag.BasicEdge(src, dst))
}
}
w.g.ReduceTransitively()
if err := w.g.Validate(); err != nil {
return nil, err
}
w.g.TransitiveReduction()
w.init = true

View File

@@ -1,5 +1,5 @@
// Package flow is an interface used for saga pattern microservice workflow
package flow // import "go.unistack.org/micro/v4/flow"
package flow
import (
"context"
@@ -125,6 +125,8 @@ type Workflow interface {
AppendSteps(steps ...Step) error
// Status returns workflow status
Status() Status
// Steps returns steps slice where parallel steps returned on the same level
Steps() ([][]Step, error)
// Suspend suspends execution
Suspend(ctx context.Context, id string) error
// Resume resumes execution

View File

@@ -123,6 +123,8 @@ type ExecuteOptions struct {
Start string
// Timeout for execution
Timeout time.Duration
// Reverse execution
Reverse bool
// Async enables async execution
Async bool
}
@@ -165,6 +167,13 @@ func ExecuteContext(ctx context.Context) ExecuteOption {
}
}
// ExecuteReverse says that dag must be run in reverse order
func ExecuteReverse(b bool) ExecuteOption {
return func(o *ExecuteOptions) {
o.Reverse = b
}
}
// ExecuteTimeout pass timeout time.Duration for execution
func ExecuteTimeout(td time.Duration) ExecuteOption {
return func(o *ExecuteOptions) {

5
go.mod
View File

@@ -11,9 +11,6 @@ require (
github.com/matoous/go-nanoid v1.5.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
github.com/spf13/cast v1.7.1
github.com/stretchr/testify v1.10.0
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.6.0
go.unistack.org/micro-proto/v4 v4.1.0
golang.org/x/sync v0.10.0
@@ -24,9 +21,11 @@ require (
require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/stretchr/testify v1.10.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect

9
go.sum
View File

@@ -6,10 +6,9 @@ github.com/KimMachineGun/automemlimit v0.7.0 h1:7G06p/dMSf7G8E6oq+f2uOPuVncFyIlD
github.com/KimMachineGun/automemlimit v0.7.0/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
@@ -30,20 +29,18 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=

View File

@@ -1,117 +0,0 @@
package metadata
import (
"context"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/server"
)
type wrapper struct {
keys []string
client.Client
}
func NewClientWrapper(keys ...string) client.Wrapper {
return func(c client.Client) client.Client {
handler := &wrapper{
Client: c,
keys: keys,
}
return handler
}
}
func NewClientCallWrapper(keys ...string) client.CallWrapper {
return func(fn client.CallFunc) client.CallFunc {
return func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
if keys == nil {
return fn(ctx, addr, req, rsp, opts)
}
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(len(imd))
}
for _, k := range keys {
if v := imd.Get(k); v != nil {
omd.Add(k, v...)
}
}
if !ook {
ctx = metadata.NewOutgoingContext(ctx, omd)
}
}
return fn(ctx, addr, req, rsp, opts)
}
}
}
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
if w.keys == nil {
return w.Client.Call(ctx, req, rsp, opts...)
}
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(len(imd))
}
for _, k := range w.keys {
if v := imd.Get(k); v != nil {
omd.Add(k, v...)
}
}
if !ook {
ctx = metadata.NewOutgoingContext(ctx, omd)
}
}
return w.Client.Call(ctx, req, rsp, opts...)
}
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
if w.keys == nil {
return w.Client.Stream(ctx, req, opts...)
}
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(len(imd))
}
for _, k := range w.keys {
if v := imd.Get(k); v != nil {
omd.Add(k, v...)
}
}
if !ook {
ctx = metadata.NewOutgoingContext(ctx, omd)
}
}
return w.Client.Stream(ctx, req, opts...)
}
func NewServerHandlerWrapper(keys ...string) server.HandlerWrapper {
return func(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
if keys == nil {
return fn(ctx, req, rsp)
}
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(len(imd))
}
for _, k := range keys {
if v := imd.Get(k); v != nil {
omd.Add(k, v...)
}
}
if !ook {
ctx = metadata.NewOutgoingContext(ctx, omd)
}
}
return fn(ctx, req, rsp)
}
}
}

View File

@@ -1,63 +0,0 @@
package recovery
import (
"context"
"fmt"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/server"
)
func NewOptions(opts ...Option) Options {
options := Options{
ServerHandlerFn: DefaultServerHandlerFn,
}
for _, o := range opts {
o(&options)
}
return options
}
type Options struct {
ServerHandlerFn func(context.Context, server.Request, interface{}, error) error
}
type Option func(*Options)
func ServerHandlerFunc(fn func(context.Context, server.Request, interface{}, error) error) Option {
return func(o *Options) {
o.ServerHandlerFn = fn
}
}
var DefaultServerHandlerFn = func(ctx context.Context, req server.Request, rsp interface{}, err error) error {
return errors.BadRequest("", "%v", err)
}
var Hook = NewHook()
type hook struct {
opts Options
}
func NewHook(opts ...Option) *hook {
return &hook{opts: NewOptions(opts...)}
}
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
return func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
defer func() {
r := recover()
switch verr := r.(type) {
case nil:
return
case error:
err = w.opts.ServerHandlerFn(ctx, req, rsp, verr)
default:
err = w.opts.ServerHandlerFn(ctx, req, rsp, fmt.Errorf("%v", r))
}
}()
err = next(ctx, req, rsp)
return err
}
}

View File

@@ -1,112 +0,0 @@
package requestid
import (
"context"
"net/textproto"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v4/util/id"
)
type XRequestIDKey struct{}
// DefaultMetadataKey contains metadata key
var DefaultMetadataKey = textproto.CanonicalMIMEHeaderKey("x-request-id")
// DefaultMetadataFunc wil be used if user not provide own func to fill metadata
var DefaultMetadataFunc = func(ctx context.Context) (context.Context, error) {
var xid string
cid, cok := ctx.Value(XRequestIDKey{}).(string)
if cok && cid != "" {
xid = cid
}
imd, iok := metadata.FromIncomingContext(ctx)
if !iok || imd == nil {
imd = metadata.New(1)
ctx = metadata.NewIncomingContext(ctx, imd)
}
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(1)
ctx = metadata.NewOutgoingContext(ctx, omd)
}
if xid == "" {
var ids []string
for i := range imd.Get(DefaultMetadataKey) {
if ids[i] != "" {
xid = ids[i]
}
}
for i := range omd.Get(DefaultMetadataKey) {
if ids[i] != "" {
xid = ids[i]
}
}
}
if xid == "" {
var err error
xid, err = id.New()
if err != nil {
return ctx, err
}
}
if !cok {
ctx = context.WithValue(ctx, XRequestIDKey{}, xid)
}
if !iok {
imd.Set(DefaultMetadataKey, xid)
}
if !ook {
omd.Set(DefaultMetadataKey, xid)
}
return ctx, nil
}
type hook struct{}
func NewHook() *hook {
return &hook{}
}
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return err
}
return next(ctx, req, rsp)
}
}
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return err
}
return next(ctx, req, rsp, opts...)
}
}
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return nil, err
}
return next(ctx, req, opts...)
}
}

View File

@@ -1,34 +0,0 @@
package requestid
import (
"context"
"slices"
"testing"
"go.unistack.org/micro/v4/metadata"
)
func TestDefaultMetadataFunc(t *testing.T) {
ctx := context.TODO()
nctx, err := DefaultMetadataFunc(ctx)
if err != nil {
t.Fatalf("%v", err)
}
imd, ok := metadata.FromIncomingContext(nctx)
if !ok {
t.Fatalf("md missing in incoming context")
}
omd, ok := metadata.FromOutgoingContext(nctx)
if !ok {
t.Fatalf("md missing in outgoing context")
}
iv := imd.Get(DefaultMetadataKey)
ov := omd.Get(DefaultMetadataKey)
if !slices.Equal(iv, ov) {
t.Fatalf("missing metadata key value %v != %v", iv, ov)
}
}

View File

@@ -1,133 +0,0 @@
package validator
import (
"context"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/server"
)
var (
DefaultClientErrorFunc = func(req client.Request, rsp interface{}, err error) error {
if rsp != nil {
return errors.BadGateway(req.Service(), "%v", err)
}
return errors.BadRequest(req.Service(), "%v", err)
}
DefaultServerErrorFunc = func(req server.Request, rsp interface{}, err error) error {
if rsp != nil {
return errors.BadGateway(req.Service(), "%v", err)
}
return errors.BadRequest(req.Service(), "%v", err)
}
)
type (
ClientErrorFunc func(client.Request, interface{}, error) error
ServerErrorFunc func(server.Request, interface{}, error) error
)
// Options struct holds wrapper options
type Options struct {
ClientErrorFn ClientErrorFunc
ServerErrorFn ServerErrorFunc
ClientValidateResponse bool
ServerValidateResponse bool
}
// Option func signature
type Option func(*Options)
func ClientValidateResponse(b bool) Option {
return func(o *Options) {
o.ClientValidateResponse = b
}
}
func ServerValidateResponse(b bool) Option {
return func(o *Options) {
o.ClientValidateResponse = b
}
}
func ClientReqErrorFn(fn ClientErrorFunc) Option {
return func(o *Options) {
o.ClientErrorFn = fn
}
}
func ServerErrorFn(fn ServerErrorFunc) Option {
return func(o *Options) {
o.ServerErrorFn = fn
}
}
func NewOptions(opts ...Option) Options {
options := Options{
ClientErrorFn: DefaultClientErrorFunc,
ServerErrorFn: DefaultServerErrorFunc,
}
for _, o := range opts {
o(&options)
}
return options
}
func NewHook(opts ...Option) *hook {
return &hook{opts: NewOptions(opts...)}
}
type validator interface {
Validate() error
}
type hook struct {
opts Options
}
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
if v, ok := req.Body().(validator); ok {
if err := v.Validate(); err != nil {
return w.opts.ClientErrorFn(req, nil, err)
}
}
err := next(ctx, req, rsp, opts...)
if v, ok := rsp.(validator); ok && w.opts.ClientValidateResponse {
if verr := v.Validate(); verr != nil {
return w.opts.ClientErrorFn(req, rsp, verr)
}
}
return err
}
}
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
if v, ok := req.Body().(validator); ok {
if err := v.Validate(); err != nil {
return nil, w.opts.ClientErrorFn(req, nil, err)
}
}
return next(ctx, req, opts...)
}
}
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
if v, ok := req.Body().(validator); ok {
if err := v.Validate(); err != nil {
return w.opts.ServerErrorFn(req, nil, err)
}
}
err := next(ctx, req, rsp)
if v, ok := rsp.(validator); ok && w.opts.ServerValidateResponse {
if verr := v.Validate(); verr != nil {
return w.opts.ServerErrorFn(req, rsp, verr)
}
}
return err
}
}

View File

@@ -99,7 +99,6 @@ func WithAddFields(fields ...interface{}) Option {
iv, iok := o.Fields[i].(string)
jv, jok := fields[j].(string)
if iok && jok && iv == jv {
o.Fields[i+1] = fields[j+1]
fields = slices.Delete(fields, j, j+2)
}
}

View File

@@ -278,7 +278,7 @@ func (s *slogLogger) printLog(ctx context.Context, lvl logger.Level, msg string,
}
}
if s.opts.AddStacktrace && (lvl == logger.FatalLevel || lvl == logger.ErrorLevel) {
if (s.opts.AddStacktrace || lvl == logger.FatalLevel) || (s.opts.AddStacktrace && lvl == logger.ErrorLevel) {
stackInfo := make([]byte, 1024*1024)
if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 {
traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1)

View File

@@ -21,7 +21,7 @@ import (
func TestStacktrace(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(logger.WithLevel(logger.DebugLevel), logger.WithOutput(buf),
l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf),
WithHandlerFunc(slog.NewTextHandler),
logger.WithAddStacktrace(true),
)
@@ -124,7 +124,7 @@ func TestWithDedupKeysWithAddFields(t *testing.T) {
l.Info(ctx, "msg3")
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg3 key1=val4 key2=val3`)) {
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg3 key1=val1 key2=val2`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
}
}
@@ -406,7 +406,7 @@ func TestLogger(t *testing.T) {
func Test_WithContextAttrFunc(t *testing.T) {
loggerContextAttrFuncs := []logger.ContextAttrFunc{
func(ctx context.Context) []interface{} {
md, ok := metadata.FromOutgoingContext(ctx)
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil
}
@@ -425,7 +425,7 @@ func Test_WithContextAttrFunc(t *testing.T) {
logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs, loggerContextAttrFuncs...)
ctx := context.TODO()
ctx = metadata.AppendOutgoingContext(ctx, "X-Request-Id", uuid.New().String(),
ctx = metadata.AppendIncomingContext(ctx, "X-Request-Id", uuid.New().String(),
"Source-Service", "Test-System")
buf := bytes.NewBuffer(nil)
@@ -445,9 +445,9 @@ func Test_WithContextAttrFunc(t *testing.T) {
t.Fatalf("logger info, buf %s", buf.Bytes())
}
buf.Reset()
omd, _ := metadata.FromOutgoingContext(ctx)
imd, _ := metadata.FromIncomingContext(ctx)
l.Info(ctx, "test message1")
omd.Set("Source-Service", "Test-System2")
imd.Set("Source-Service", "Test-System2")
l.Info(ctx, "test message2")
// t.Logf("xxx %s", buf.Bytes())

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net/textproto"
"sort"
"strings"
)
@@ -69,15 +68,6 @@ func (md Metadata) Copy() Metadata {
return out
}
// AsMap returns a copy of Metadata with map[string]string.
func (md Metadata) AsMap() map[string]string {
out := make(map[string]string, len(md))
for k, v := range md {
out[k] = strings.Join(v, ",")
}
return out
}
// AsHTTP1 returns a copy of Metadata
// with CanonicalMIMEHeaderKey.
func (md Metadata) AsHTTP1() map[string][]string {
@@ -106,7 +96,16 @@ func (md Metadata) CopyTo(out Metadata) {
}
// Get obtains the values for a given key.
func (md Metadata) Get(k string) []string {
func (md Metadata) MustGet(k string) []string {
v, ok := md.Get(k)
if !ok {
panic("missing metadata key")
}
return v
}
// Get obtains the values for a given key.
func (md Metadata) Get(k string) ([]string, bool) {
v, ok := md[k]
if !ok {
v, ok = md[strings.ToLower(k)]
@@ -114,13 +113,27 @@ func (md Metadata) Get(k string) []string {
if !ok {
v, ok = md[textproto.CanonicalMIMEHeaderKey(k)]
}
return v, ok
}
// MustGetJoined obtains the values for a given key
// with joined values with "," symbol
func (md Metadata) MustGetJoined(k string) string {
v, ok := md.GetJoined(k)
if !ok {
panic("missing metadata key")
}
return v
}
// GetJoined obtains the values for a given key
// with joined values with "," symbol
func (md Metadata) GetJoined(k string) string {
return strings.Join(md.Get(k), ",")
func (md Metadata) GetJoined(k string) (string, bool) {
v, ok := md.Get(k)
if !ok {
return "", ok
}
return strings.Join(v, ","), true
}
// Set sets the value of a given key with a slice of values.
@@ -218,6 +231,24 @@ func AppendContext(ctx context.Context, kv ...string) context.Context {
return context.WithValue(ctx, metadataCurrentKey{}, rawMetadata{md: md.md, added: added})
}
// AppendIncomingContext returns a new context with the provided kv merged
// with any existing metadata in the context. Please refer to the documentation
// of Pairs for a description of kv.
func AppendIncomingContext(ctx context.Context, kv ...string) context.Context {
if len(kv)%2 == 1 {
panic(fmt.Sprintf("metadata: AppendIncomingContext got an odd number of input pairs for metadata: %d", len(kv)))
}
md, _ := ctx.Value(metadataIncomingKey{}).(rawMetadata)
added := make([][]string, len(md.added)+1)
copy(added, md.added)
kvCopy := make([]string, 0, len(kv))
for i := 0; i < len(kv); i += 2 {
kvCopy = append(kvCopy, strings.ToLower(kv[i]), kv[i+1])
}
added[len(added)-1] = kvCopy
return context.WithValue(ctx, metadataIncomingKey{}, rawMetadata{md: md.md, added: added})
}
// AppendOutgoingContext returns a new context with the provided kv merged
// with any existing metadata in the context. Please refer to the documentation
// of Pairs for a description of kv.
@@ -433,29 +464,27 @@ type Iterator struct {
cnt int
}
/*
// Next advance iterator to next element
func (iter *Iterator) Next(k *string, v *[]string) bool {
func (iter *Iterator) Next(k, v *string) bool {
if iter.cur+1 > iter.cnt {
return false
}
if k != nil && v != nil {
*k = iter.keys[iter.cur]
vv := iter.md[*k]
*v = make([]string, len(vv))
copy(*v, vv)
iter.cur++
}
*k = iter.keys[iter.cur]
*v = iter.Metadata[*k]
iter.cur++
return true
}
// Iterator returns the itarator for metadata in sorted order
func (md Metadata) Iterator() *Iterator {
iter := &Iterator{md: md, cnt: len(md)}
func (Metadata Metadata) Iterator() *Iterator {
iter := &Iterator{Metadata: Metadata, cnt: len(Metadata)}
iter.keys = make([]string, 0, iter.cnt)
for k := range md {
for k := range Metadata {
iter.keys = append(iter.keys, k)
}
sort.Strings(iter.keys)
return iter
}
*/

View File

@@ -19,8 +19,8 @@ func TestAppendOutgoingContextModify(t *testing.T) {
func TestLowercase(t *testing.T) {
md := New(1)
md["x-request-id"] = []string{"12345"}
v := md.GetJoined("X-Request-Id")
if v == "" {
v, ok := md.GetJoined("X-Request-Id")
if !ok || v == "" {
t.Fatalf("metadata invalid %#+v", md)
}
}
@@ -51,17 +51,29 @@ func TestMetadataSetMultiple(t *testing.T) {
md := New(4)
md.Set("key1", "val1", "key2", "val2")
if v := md.GetJoined("key1"); v != "val1" {
if v, ok := md.GetJoined("key1"); !ok || v != "val1" {
t.Fatalf("invalid kv %#+v", md)
}
if v := md.GetJoined("key2"); v != "val2" {
if v, ok := md.GetJoined("key2"); !ok || v != "val2" {
t.Fatalf("invalid kv %#+v", md)
}
}
func TestAppend(t *testing.T) {
ctx := context.Background()
ctx = AppendIncomingContext(ctx, "key1", "val1", "key2", "val2")
md, ok := FromIncomingContext(ctx)
if !ok {
t.Fatal("metadata empty")
}
if _, ok := md.Get("key1"); !ok {
t.Fatal("key1 not found")
}
}
func TestPairs(t *testing.T) {
md := Pairs("key1", "val1", "key2", "val2")
if v := md.Get("key1"); v == nil {
if _, ok := md.Get("key1"); !ok {
t.Fatal("key1 not found")
}
}
@@ -85,51 +97,46 @@ func TestPassing(t *testing.T) {
if !ok {
t.Fatalf("missing metadata from outgoing context")
}
if v := md.Get("Key1"); v == nil || v[0] != "Val1" {
if v, ok := md.Get("Key1"); !ok || v[0] != "Val1" {
t.Fatalf("invalid metadata value %#+v", md)
}
}
func TestIterator(t *testing.T) {
md := Pairs(
"1Last", "last",
"2First", "first",
"3Second", "second",
)
/*
func TestIterator(_ *testing.T) {
md := Metadata{
"1Last": "last",
"2First": "first",
"3Second": "second",
}
iter := md.Iterator()
var k string
var v []string
chk := New(3)
for iter.Next(&k, &v) {
chk[k] = v
}
var k, v string
for k, v := range chk {
if cv, ok := md[k]; !ok || len(cv) != len(v) || cv[0] != v[0] {
t.Fatalf("XXXX %#+v %#+v", chk, md)
}
for iter.Next(&k, &v) {
// fmt.Printf("k: %s, v: %s\n", k, v)
}
}
*/
func TestMedataCanonicalKey(t *testing.T) {
md := New(1)
md.Set("x-request-id", "12345")
v := md.GetJoined("x-request-id")
if v == "" {
v, ok := md.GetJoined("x-request-id")
if !ok {
t.Fatalf("failed to get x-request-id")
} else if v != "12345" {
t.Fatalf("invalid metadata value: %s != %s", "12345", v)
}
v = md.GetJoined("X-Request-Id")
if v == "" {
v, ok = md.GetJoined("X-Request-Id")
if !ok {
t.Fatalf("failed to get x-request-id")
} else if v != "12345" {
t.Fatalf("invalid metadata value: %s != %s", "12345", v)
}
v = md.GetJoined("X-Request-ID")
if v == "" {
v, ok = md.GetJoined("X-Request-ID")
if !ok {
t.Fatalf("failed to get x-request-id")
} else if v != "12345" {
t.Fatalf("invalid metadata value: %s != %s", "12345", v)
@@ -141,8 +148,8 @@ func TestMetadataSet(t *testing.T) {
md.Set("Key", "val")
val := md.GetJoined("Key")
if val == "" {
val, ok := md.GetJoined("Key")
if !ok {
t.Fatal("key Key not found")
}
if val != "val" {
@@ -157,8 +164,8 @@ func TestMetadataDelete(t *testing.T) {
}
md.Del("Baz")
v := md.Get("Baz")
if v != nil {
_, ok := md.Get("Baz")
if ok {
t.Fatal("key Baz not deleted")
}
}
@@ -257,6 +264,20 @@ func TestNewOutgoingContext(t *testing.T) {
}
}
func TestAppendIncomingContext(t *testing.T) {
md := New(1)
md.Set("key1", "val1")
ctx := AppendIncomingContext(context.TODO(), "key2", "val2")
nmd, ok := FromIncomingContext(ctx)
if nmd == nil || !ok {
t.Fatal("AppendIncomingContext not works")
}
if v, ok := nmd.GetJoined("key2"); !ok || v != "val2" {
t.Fatal("AppendIncomingContext not works")
}
}
func TestAppendOutgoingContext(t *testing.T) {
md := New(1)
md.Set("key1", "val1")
@@ -266,7 +287,7 @@ func TestAppendOutgoingContext(t *testing.T) {
if nmd == nil || !ok {
t.Fatal("AppendOutgoingContext not works")
}
if v := nmd.GetJoined("key2"); v != "val2" {
if v, ok := nmd.GetJoined("key2"); !ok || v != "val2" {
t.Fatal("AppendOutgoingContext not works")
}
}

View File

@@ -1,222 +0,0 @@
package options
import (
"reflect"
"strings"
"time"
"github.com/spf13/cast"
mreflect "go.unistack.org/micro/v4/util/reflect"
)
// Options interface must be used by all options
type Validator interface {
// Validate returns nil, if all options are correct,
// otherwise returns an error explaining the mistake
Validate() error
}
// Option func signature
type Option func(interface{}) error
// Apply assign options to struct src
func Apply(src interface{}, opts ...Option) error {
for _, opt := range opts {
if err := opt(src); err != nil {
return err
}
}
return nil
}
// SetValueByPath set src struct field to val dst via path
func SetValueByPath(src interface{}, dst interface{}, path string) error {
var err error
switch v := dst.(type) {
case []interface{}:
if len(v) == 1 {
dst = v[0]
}
}
var sv reflect.Value
switch t := src.(type) {
case reflect.Value:
sv = t
default:
sv = reflect.ValueOf(src)
}
parts := strings.Split(path, ".")
for _, p := range parts {
if sv.Kind() == reflect.Ptr {
sv = sv.Elem()
}
if sv.Kind() != reflect.Struct {
return mreflect.ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
/*
if len(fld.PkgPath) != 0 {
continue
}
*/
if fld.Anonymous {
if len(parts) == 1 && val.Kind() == reflect.Struct {
if err = SetValueByPath(val, dst, p); err != nil {
return err
}
}
}
if fld.Name != p && !strings.EqualFold(strings.ToLower(fld.Name), strings.ToLower(p)) {
continue
}
switch val.Interface().(type) {
case []time.Duration:
dst, err = cast.ToDurationSliceE(dst)
if err != nil {
return err
}
reflect.Copy(val, reflect.ValueOf(dst))
return nil
case time.Duration:
dst, err = cast.ToDurationE(dst)
if err != nil {
return err
}
val.Set(reflect.ValueOf(dst))
return nil
case time.Time:
dst, err = cast.ToTimeE(dst)
if err != nil {
return err
}
val.Set(reflect.ValueOf(dst))
return nil
}
switch val.Kind() {
case reflect.Map:
if val.IsZero() {
val.Set(reflect.MakeMap(val.Type()))
}
return setMap(val.Interface(), dst)
case reflect.Array, reflect.Slice:
switch val.Type().Elem().Kind() {
case reflect.Bool:
dst, err = cast.ToBoolSliceE(dst)
case reflect.String:
dst, err = cast.ToStringSliceE(dst)
case reflect.Float32:
dst, err = toFloat32SliceE(dst)
case reflect.Float64:
dst, err = toFloat64SliceE(dst)
case reflect.Int8:
dst, err = toInt8SliceE(dst)
case reflect.Int:
dst, err = cast.ToIntSliceE(dst)
case reflect.Int16:
dst, err = toInt16SliceE(dst)
case reflect.Int32:
dst, err = toInt32SliceE(dst)
case reflect.Int64:
dst, err = toInt64SliceE(dst)
case reflect.Uint8:
dst, err = toUint8SliceE(dst)
case reflect.Uint:
dst, err = toUintSliceE(dst)
case reflect.Uint16:
dst, err = toUint16SliceE(dst)
case reflect.Uint32:
dst, err = toUint32SliceE(dst)
case reflect.Uint64:
dst, err = toUint64SliceE(dst)
}
if err != nil {
return err
}
if val.Kind() == reflect.Slice {
val.Set(reflect.ValueOf(dst))
} else {
reflect.Copy(val, reflect.ValueOf(dst))
}
return nil
case reflect.Float32:
dst, err = toFloat32SliceE(dst)
case reflect.Float64:
dst, err = toFloat64SliceE(dst)
case reflect.Bool:
dst, err = cast.ToBoolE(dst)
case reflect.String:
dst, err = cast.ToStringE(dst)
case reflect.Int8:
dst, err = cast.ToInt8E(dst)
case reflect.Int:
dst, err = cast.ToIntE(dst)
case reflect.Int16:
dst, err = cast.ToInt16E(dst)
case reflect.Int32:
dst, err = cast.ToInt32E(dst)
case reflect.Int64:
dst, err = cast.ToInt64E(dst)
case reflect.Uint8:
dst, err = cast.ToUint8E(dst)
case reflect.Uint:
dst, err = cast.ToUintE(dst)
case reflect.Uint16:
dst, err = cast.ToUint16E(dst)
case reflect.Uint32:
dst, err = cast.ToUint32E(dst)
case reflect.Uint64:
dst, err = cast.ToUint64E(dst)
default:
}
if err != nil {
return err
}
val.Set(reflect.ValueOf(dst))
}
}
return nil
}
// NewOption create new option with name
func NewOption(name string) func(...interface{}) Option {
return func(dst ...interface{}) Option {
return func(src interface{}) error {
return SetValueByPath(src, dst, name)
}
}
}
var (
Address = NewOption("Address")
Name = NewOption("Name")
Broker = NewOption("Broker")
Logger = NewOption("Logger")
Meter = NewOption("Meter")
Tracer = NewOption("Tracer")
Store = NewOption("Store")
Register = NewOption("Register")
Router = NewOption("Router")
Codec = NewOption("Codec")
Codecs = NewOption("Codecs")
Client = NewOption("Client")
Context = NewOption("Context")
TLSConfig = NewOption("TLSConfig")
Metadata = NewOption("Metadata")
Timeout = NewOption("Timeout")
)

View File

@@ -1,181 +0,0 @@
package options_test
import (
"crypto/tls"
"sync"
"testing"
"go.unistack.org/micro/v4/options"
)
type codec interface {
Marshal(v interface{}, opts ...options.Option) ([]byte, error)
Unmarshal(b []byte, v interface{}, opts ...options.Option) error
String() string
}
func TestCodecs(t *testing.T) {
type s struct {
Codecs map[string]codec
}
wg := &sync.WaitGroup{}
tc := &tls.Config{InsecureSkipVerify: true}
opts := []options.Option{
options.NewOption("Codecs")(wg),
options.NewOption("TLSConfig")(tc),
}
src := &s{}
if err := options.Apply(src, opts...); err != nil {
t.Fatal(err)
}
}
func TestSpecial(t *testing.T) {
type s struct {
Wait *sync.WaitGroup
TLSConfig *tls.Config
}
wg := &sync.WaitGroup{}
tc := &tls.Config{InsecureSkipVerify: true}
opts := []options.Option{
options.NewOption("Wait")(wg),
options.NewOption("TLSConfig")(tc),
}
src := &s{}
if err := options.Apply(src, opts...); err != nil {
t.Fatal(err)
}
if src.Wait == nil {
t.Fatalf("failed to set Wait %#+v", src)
}
if src.TLSConfig == nil {
t.Fatalf("failed to set TLSConfig %#+v", src)
}
if src.TLSConfig.InsecureSkipVerify != true {
t.Fatalf("failed to set TLSConfig %#+v", src)
}
}
func TestNested(t *testing.T) {
type server struct {
Address []string
}
type ownserver struct {
server
OwnField string
}
opts := []options.Option{
options.Address("host:port"),
options.NewOption("OwnField")("fieldval"),
}
src := &ownserver{}
if err := options.Apply(src, opts...); err != nil {
t.Fatal(err)
}
if src.Address[0] != "host:port" {
t.Fatalf("failed to set Address %#+v", src)
}
if src.OwnField != "fieldval" {
t.Fatalf("failed to set OwnField %#+v", src)
}
}
func TestAddress(t *testing.T) {
type s struct {
Address []string
}
opts := []options.Option{options.Address("host:port")}
src := &s{}
if err := options.Apply(src, opts...); err != nil {
t.Fatal(err)
}
if src.Address[0] != "host:port" {
t.Fatalf("failed to set Address %#+v", src)
}
}
func TestNewOption(t *testing.T) {
type s struct {
Address []string
}
opts := []options.Option{options.NewOption("Address")("host1:port1", "host2:port2")}
src := &s{}
if err := options.Apply(src, opts...); err != nil {
t.Fatal(err)
}
if src.Address[0] != "host1:port1" {
t.Fatalf("failed to set Address %#+v", src)
}
if src.Address[1] != "host2:port2" {
t.Fatalf("failed to set Address %#+v", src)
}
}
func TestArray(t *testing.T) {
type s struct {
Address [1]string
}
opts := []options.Option{options.NewOption("Address")("host:port", "host1:port1")}
src := &s{}
if err := options.Apply(src, opts...); err != nil {
t.Fatal(err)
}
if src.Address[0] != "host:port" {
t.Fatalf("failed to set Address %#+v", src)
}
}
func TestMap(t *testing.T) {
type s struct {
Metadata map[string]string
}
opts := []options.Option{
options.NewOption("Metadata")("key1", "val1"),
options.NewOption("Metadata")(map[string]string{"key2": "val2"}),
}
src := &s{}
if err := options.Apply(src, opts...); err != nil {
t.Fatal(err)
}
if len(src.Metadata) != 2 {
t.Fatalf("failed to set Metadata %#+v", src)
}
if src.Metadata["key1"] != "val1" {
t.Fatalf("failed to set Metadata %#+v", src)
}
if src.Metadata["key2"] != "val2" {
t.Fatalf("failed to set Metadata %#+v", src)
}
}

View File

@@ -1,577 +0,0 @@
package options
import (
"fmt"
"reflect"
"github.com/spf13/cast"
)
func toInt8SliceE(i interface{}) ([]int8, error) {
if i == nil {
return []int8{}, fmt.Errorf("unable to cast %#v of type %T to []int8", i, i)
}
switch v := i.(type) {
case []int8:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]int8, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToInt8E(s.Index(j).Interface())
if err != nil {
return []int8{}, fmt.Errorf("unable to cast %#v of type %T to []int8", i, i)
}
a[j] = val
}
return a, nil
default:
return []int8{}, fmt.Errorf("unable to cast %#v of type %T to []int8", i, i)
}
}
func toInt16SliceE(i interface{}) ([]int16, error) {
if i == nil {
return []int16{}, fmt.Errorf("unable to cast %#v of type %T to []int16", i, i)
}
switch v := i.(type) {
case []int16:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]int16, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToInt16E(s.Index(j).Interface())
if err != nil {
return []int16{}, fmt.Errorf("unable to cast %#v of type %T to []int16", i, i)
}
a[j] = val
}
return a, nil
default:
return []int16{}, fmt.Errorf("unable to cast %#v of type %T to []int16", i, i)
}
}
func toInt32SliceE(i interface{}) ([]int32, error) {
if i == nil {
return []int32{}, fmt.Errorf("unable to cast %#v of type %T to []int32", i, i)
}
switch v := i.(type) {
case []int32:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]int32, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToInt32E(s.Index(j).Interface())
if err != nil {
return []int32{}, fmt.Errorf("unable to cast %#v of type %T to []int32", i, i)
}
a[j] = val
}
return a, nil
default:
return []int32{}, fmt.Errorf("unable to cast %#v of type %T to []int32", i, i)
}
}
func toInt64SliceE(i interface{}) ([]int64, error) {
if i == nil {
return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i)
}
switch v := i.(type) {
case []int64:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]int64, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToInt64E(s.Index(j).Interface())
if err != nil {
return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i)
}
a[j] = val
}
return a, nil
default:
return []int64{}, fmt.Errorf("unable to cast %#v of type %T to []int64", i, i)
}
}
func toUintSliceE(i interface{}) ([]uint, error) {
if i == nil {
return []uint{}, fmt.Errorf("unable to cast %#v of type %T to []uint", i, i)
}
switch v := i.(type) {
case []uint:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]uint, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToUintE(s.Index(j).Interface())
if err != nil {
return []uint{}, fmt.Errorf("unable to cast %#v of type %T to []uint", i, i)
}
a[j] = val
}
return a, nil
default:
return []uint{}, fmt.Errorf("unable to cast %#v of type %T to []uint", i, i)
}
}
func toUint8SliceE(i interface{}) ([]uint8, error) {
if i == nil {
return []uint8{}, fmt.Errorf("unable to cast %#v of type %T to []uint8", i, i)
}
switch v := i.(type) {
case []uint8:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]uint8, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToUint8E(s.Index(j).Interface())
if err != nil {
return []uint8{}, fmt.Errorf("unable to cast %#v of type %T to []uint8", i, i)
}
a[j] = val
}
return a, nil
default:
return []uint8{}, fmt.Errorf("unable to cast %#v of type %T to []uint8", i, i)
}
}
func toUint16SliceE(i interface{}) ([]uint16, error) {
if i == nil {
return []uint16{}, fmt.Errorf("unable to cast %#v of type %T to []uint16", i, i)
}
switch v := i.(type) {
case []uint16:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]uint16, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToUint16E(s.Index(j).Interface())
if err != nil {
return []uint16{}, fmt.Errorf("unable to cast %#v of type %T to []uint16", i, i)
}
a[j] = val
}
return a, nil
default:
return []uint16{}, fmt.Errorf("unable to cast %#v of type %T to []uint16", i, i)
}
}
func toUint32SliceE(i interface{}) ([]uint32, error) {
if i == nil {
return []uint32{}, fmt.Errorf("unable to cast %#v of type %T to []uint32", i, i)
}
switch v := i.(type) {
case []uint32:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]uint32, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToUint32E(s.Index(j).Interface())
if err != nil {
return []uint32{}, fmt.Errorf("unable to cast %#v of type %T to []uint32", i, i)
}
a[j] = val
}
return a, nil
default:
return []uint32{}, fmt.Errorf("unable to cast %#v of type %T to []uint32", i, i)
}
}
func toUint64SliceE(i interface{}) ([]uint64, error) {
if i == nil {
return []uint64{}, fmt.Errorf("unable to cast %#v of type %T to []uint64", i, i)
}
switch v := i.(type) {
case []uint64:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]uint64, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToUint64E(s.Index(j).Interface())
if err != nil {
return []uint64{}, fmt.Errorf("unable to cast %#v of type %T to []uint64", i, i)
}
a[j] = val
}
return a, nil
default:
return []uint64{}, fmt.Errorf("unable to cast %#v of type %T to []uint64", i, i)
}
}
func toFloat32SliceE(i interface{}) ([]float32, error) {
if i == nil {
return []float32{}, fmt.Errorf("unable to cast %#v of type %T to []float32", i, i)
}
switch v := i.(type) {
case []float32:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]float32, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToFloat32E(s.Index(j).Interface())
if err != nil {
return []float32{}, fmt.Errorf("unable to cast %#v of type %T to []float32", i, i)
}
a[j] = val
}
return a, nil
default:
return []float32{}, fmt.Errorf("unable to cast %#v of type %T to []float32", i, i)
}
}
func toFloat64SliceE(i interface{}) ([]float64, error) {
if i == nil {
return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float64", i, i)
}
switch v := i.(type) {
case []float64:
return v, nil
}
kind := reflect.TypeOf(i).Kind()
switch kind {
case reflect.Slice, reflect.Array:
s := reflect.ValueOf(i)
a := make([]float64, s.Len())
for j := 0; j < s.Len(); j++ {
val, err := cast.ToFloat64E(s.Index(j).Interface())
if err != nil {
return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float64", i, i)
}
a[j] = val
}
return a, nil
default:
return []float64{}, fmt.Errorf("unable to cast %#v of type %T to []float32", i, i)
}
}
func setMap(src interface{}, dst interface{}) error {
var err error
if src == nil {
return fmt.Errorf("unable to cast %#v of type %T", src, src)
}
if dst == nil {
return fmt.Errorf("unable to cast %#v of type %T", dst, dst)
}
val := reflect.ValueOf(src)
keyKind := val.Type().Key().Kind()
valKind := val.Type().Elem().Kind()
switch v := dst.(type) {
case []interface{}:
if len(v) == 1 {
dstVal := reflect.ValueOf(v[0])
if dstVal.Kind() != reflect.Map {
return nil
}
mapIter := dstVal.MapRange()
for mapIter.Next() {
var (
keyVal interface{}
valVal interface{}
)
switch keyKind {
case reflect.Bool:
keyVal, err = cast.ToBoolE(mapIter.Key())
case reflect.String:
keyVal, err = cast.ToStringE(mapIter.Key())
case reflect.Float32:
keyVal, err = cast.ToFloat32E(mapIter.Key())
case reflect.Float64:
keyVal, err = cast.ToFloat64E(mapIter.Key())
case reflect.Int8:
keyVal, err = cast.ToInt8E(mapIter.Key())
case reflect.Int:
keyVal, err = cast.ToIntE(mapIter.Key())
case reflect.Int16:
keyVal, err = cast.ToInt16E(mapIter.Key())
case reflect.Int32:
keyVal, err = cast.ToInt32E(mapIter.Key())
case reflect.Int64:
keyVal, err = cast.ToInt64E(mapIter.Key())
case reflect.Uint8:
keyVal, err = cast.ToUint8E(mapIter.Key())
case reflect.Uint:
keyVal, err = cast.ToUintE(mapIter.Key())
case reflect.Uint16:
keyVal, err = cast.ToUint16E(mapIter.Key())
case reflect.Uint32:
keyVal, err = cast.ToUint32E(mapIter.Key())
case reflect.Uint64:
keyVal, err = cast.ToUint64E(mapIter.Key())
}
if err != nil {
return err
}
switch valKind {
case reflect.Bool:
valVal, err = cast.ToBoolE(mapIter.Value())
case reflect.String:
valVal, err = cast.ToStringE(mapIter.Value())
case reflect.Float32:
valVal, err = cast.ToFloat32E(mapIter.Value())
case reflect.Float64:
valVal, err = cast.ToFloat64E(mapIter.Value())
case reflect.Int8:
valVal, err = cast.ToInt8E(mapIter.Value())
case reflect.Int:
valVal, err = cast.ToIntE(mapIter.Value())
case reflect.Int16:
valVal, err = cast.ToInt16E(mapIter.Value())
case reflect.Int32:
valVal, err = cast.ToInt32E(mapIter.Value())
case reflect.Int64:
valVal, err = cast.ToInt64E(mapIter.Value())
case reflect.Uint8:
valVal, err = cast.ToUint8E(mapIter.Value())
case reflect.Uint:
valVal, err = cast.ToUintE(mapIter.Value())
case reflect.Uint16:
valVal, err = cast.ToUint16E(mapIter.Value())
case reflect.Uint32:
valVal, err = cast.ToUint32E(mapIter.Value())
case reflect.Uint64:
valVal, err = cast.ToUint64E(mapIter.Value())
}
if err != nil {
return err
}
val.SetMapIndex(reflect.ValueOf(keyVal), reflect.ValueOf(valVal))
}
return nil
}
if l := len(v) % 2; l == 1 {
v = v[:len(v)-1]
}
var (
keyVal interface{}
valVal interface{}
)
for i := 0; i < len(v); i += 2 {
switch keyKind {
case reflect.Bool:
keyVal, err = cast.ToBoolE(v[i])
case reflect.String:
keyVal, err = cast.ToStringE(v[i])
case reflect.Float32:
keyVal, err = cast.ToFloat32E(v[i])
case reflect.Float64:
keyVal, err = cast.ToFloat64E(v[i])
case reflect.Int8:
keyVal, err = cast.ToInt8E(v[i])
case reflect.Int:
keyVal, err = cast.ToIntE(v[i])
case reflect.Int16:
keyVal, err = cast.ToInt16E(v[i])
case reflect.Int32:
keyVal, err = cast.ToInt32E(v[i])
case reflect.Int64:
keyVal, err = cast.ToInt64E(v[i])
case reflect.Uint8:
keyVal, err = cast.ToUint8E(v[i])
case reflect.Uint:
keyVal, err = cast.ToUintE(v[i])
case reflect.Uint16:
keyVal, err = cast.ToUint16E(v[i])
case reflect.Uint32:
keyVal, err = cast.ToUint32E(v[i])
case reflect.Uint64:
keyVal, err = cast.ToUint64E(v[i])
}
if err != nil {
return err
}
switch valKind {
case reflect.Bool:
valVal, err = cast.ToBoolE(v[i+1])
case reflect.String:
valVal, err = cast.ToStringE(v[i+1])
case reflect.Float32:
valVal, err = cast.ToFloat32E(v[i+1])
case reflect.Float64:
valVal, err = cast.ToFloat64E(v[i+1])
case reflect.Int8:
valVal, err = cast.ToInt8E(v[i+1])
case reflect.Int:
valVal, err = cast.ToIntE(v[i+1])
case reflect.Int16:
valVal, err = cast.ToInt16E(v[i+1])
case reflect.Int32:
valVal, err = cast.ToInt32E(v[i+1])
case reflect.Int64:
valVal, err = cast.ToInt64E(v[i+1])
case reflect.Uint8:
valVal, err = cast.ToUint8E(v[i+1])
case reflect.Uint:
valVal, err = cast.ToUintE(v[i+1])
case reflect.Uint16:
valVal, err = cast.ToUint16E(v[i+1])
case reflect.Uint32:
valVal, err = cast.ToUint32E(v[i+1])
case reflect.Uint64:
valVal, err = cast.ToUint64E(v[i+1])
}
if err != nil {
return err
}
val.SetMapIndex(reflect.ValueOf(keyVal), reflect.ValueOf(valVal))
}
default:
dstVal := reflect.ValueOf(dst)
if dstVal.Kind() != reflect.Map {
return nil
}
mapIter := dstVal.MapRange()
for mapIter.Next() {
var (
keyVal interface{}
valVal interface{}
)
switch keyKind {
case reflect.Bool:
keyVal, err = cast.ToBoolE(mapIter.Key())
case reflect.String:
keyVal, err = cast.ToStringE(mapIter.Key())
case reflect.Float32:
keyVal, err = cast.ToFloat32E(mapIter.Key())
case reflect.Float64:
keyVal, err = cast.ToFloat64E(mapIter.Key())
case reflect.Int8:
keyVal, err = cast.ToInt8E(mapIter.Key())
case reflect.Int:
keyVal, err = cast.ToIntE(mapIter.Key())
case reflect.Int16:
keyVal, err = cast.ToInt16E(mapIter.Key())
case reflect.Int32:
keyVal, err = cast.ToInt32E(mapIter.Key())
case reflect.Int64:
keyVal, err = cast.ToInt64E(mapIter.Key())
case reflect.Uint8:
keyVal, err = cast.ToUint8E(mapIter.Key())
case reflect.Uint:
keyVal, err = cast.ToUintE(mapIter.Key())
case reflect.Uint16:
keyVal, err = cast.ToUint16E(mapIter.Key())
case reflect.Uint32:
keyVal, err = cast.ToUint32E(mapIter.Key())
case reflect.Uint64:
keyVal, err = cast.ToUint64E(mapIter.Key())
}
if err != nil {
return err
}
switch valKind {
case reflect.Bool:
valVal, err = cast.ToBoolE(mapIter.Value())
case reflect.String:
valVal, err = cast.ToStringE(mapIter.Value())
case reflect.Float32:
valVal, err = cast.ToFloat32E(mapIter.Value())
case reflect.Float64:
valVal, err = cast.ToFloat64E(mapIter.Value())
case reflect.Int8:
valVal, err = cast.ToInt8E(mapIter.Value())
case reflect.Int:
valVal, err = cast.ToIntE(mapIter.Value())
case reflect.Int16:
valVal, err = cast.ToInt16E(mapIter.Value())
case reflect.Int32:
valVal, err = cast.ToInt32E(mapIter.Value())
case reflect.Int64:
valVal, err = cast.ToInt64E(mapIter.Value())
case reflect.Uint8:
valVal, err = cast.ToUint8E(mapIter.Value())
case reflect.Uint:
valVal, err = cast.ToUintE(mapIter.Value())
case reflect.Uint16:
valVal, err = cast.ToUint16E(mapIter.Value())
case reflect.Uint32:
valVal, err = cast.ToUint32E(mapIter.Value())
case reflect.Uint64:
valVal, err = cast.ToUint64E(mapIter.Value())
}
if err != nil {
return err
}
val.SetMapIndex(reflect.ValueOf(keyVal), reflect.ValueOf(valVal))
}
return nil
}
return nil
}

View File

@@ -6,10 +6,10 @@ import (
"sync"
"time"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/util/id"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/util/id"
)
var (

View File

@@ -7,7 +7,7 @@ import (
"testing"
"time"
"go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/register"
)
var testData = map[string][]*register.Service{

View File

@@ -99,7 +99,6 @@ type service struct {
done chan struct{}
opts Options
sync.RWMutex
stopped bool
}
// NewService creates and returns a new Service based on the packages within.
@@ -425,7 +424,7 @@ func (s *service) Stop() error {
}
}
s.notifyShutdown()
close(s.done)
return nil
}
@@ -449,23 +448,10 @@ func (s *service) Run() error {
return err
}
// wait on context cancel
<-s.done
return nil
}
// notifyShutdown marks the service as stopped and closes the done channel.
// It ensures the channel is closed only once, preventing multiple closures.
func (s *service) notifyShutdown() {
s.Lock()
if s.stopped {
s.Unlock()
return
}
s.stopped = true
s.Unlock()
close(s.done)
return s.Stop()
}
type Namer interface {

View File

@@ -3,9 +3,7 @@ package micro
import (
"reflect"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/config"
@@ -739,41 +737,3 @@ func Test_getNameIndex(t *testing.T) {
}
}
*/
func TestServiceShutdown(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("service shutdown failed: %v", r)
}
}()
s, ok := NewService().(*service)
require.NotNil(t, s)
require.True(t, ok)
require.NoError(t, s.Start())
require.False(t, s.stopped)
require.NoError(t, s.Stop())
require.True(t, s.stopped)
}
func TestServiceMultipleShutdowns(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("service shutdown failed: %v", r)
}
}()
s := NewService()
go func() {
time.Sleep(10 * time.Millisecond)
// first call
require.NoError(t, s.Stop())
// duplicate call
require.NoError(t, s.Stop())
}()
require.NoError(t, s.Run())
}

View File

@@ -89,10 +89,6 @@ func (s *Span) Tracer() tracer.Tracer {
return s.tracer
}
func (s *Span) IsRecording() bool {
return true
}
type Event struct {
name string
labels []interface{}

View File

@@ -120,10 +120,6 @@ func (s *noopSpan) SetStatus(st SpanStatus, msg string) {
s.statusMsg = msg
}
func (s *noopSpan) IsRecording() bool {
return false
}
// NewTracer returns new memory tracer
func NewTracer(opts ...Option) Tracer {
return &noopTracer{

View File

@@ -78,6 +78,4 @@ type Span interface {
TraceID() string
// SpanID returns span id
SpanID() string
// IsRecording returns the recording state of the Span.
IsRecording() bool
}

View File

@@ -1,78 +0,0 @@
package buffer
import "io"
var _ interface {
io.ReadCloser
io.ReadSeeker
} = (*SeekerBuffer)(nil)
// Buffer is a ReadWriteCloser that supports seeking. It's intended to
// replicate the functionality of bytes.Buffer that I use in my projects.
//
// Note that the seeking is limited to the read marker; all writes are
// append-only.
type SeekerBuffer struct {
data []byte
pos int64
}
func NewSeekerBuffer(data []byte) *SeekerBuffer {
return &SeekerBuffer{
data: data,
}
}
func (b *SeekerBuffer) Read(p []byte) (int, error) {
if b.pos >= int64(len(b.data)) {
return 0, io.EOF
}
n := copy(p, b.data[b.pos:])
b.pos += int64(n)
return n, nil
}
func (b *SeekerBuffer) Write(p []byte) (int, error) {
b.data = append(b.data, p...)
return len(p), nil
}
// Seek sets the read pointer to pos.
func (b *SeekerBuffer) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
b.pos = offset
case io.SeekEnd:
b.pos = int64(len(b.data)) + offset
case io.SeekCurrent:
b.pos += offset
}
return b.pos, nil
}
// Rewind resets the read pointer to 0.
func (b *SeekerBuffer) Rewind() error {
if _, err := b.Seek(0, io.SeekStart); err != nil {
return err
}
return nil
}
// Close clears all the data out of the buffer and sets the read position to 0.
func (b *SeekerBuffer) Close() error {
b.data = nil
b.pos = 0
return nil
}
// Len returns the length of data remaining to be read.
func (b *SeekerBuffer) Len() int {
return len(b.data[b.pos:])
}
// Bytes returns the underlying bytes from the current position.
func (b *SeekerBuffer) Bytes() []byte {
return b.data[b.pos:]
}

View File

@@ -1,55 +0,0 @@
package buffer
import (
"fmt"
"strings"
"testing"
)
func noErrorT(t *testing.T, err error) {
if nil != err {
t.Fatalf("%s", err)
}
}
func boolT(t *testing.T, cond bool, s ...string) {
if !cond {
what := strings.Join(s, ", ")
if len(what) > 0 {
what = ": " + what
}
t.Fatalf("assert.Bool failed%s", what)
}
}
func TestSeeking(t *testing.T) {
partA := []byte("hello, ")
partB := []byte("world!")
buf := NewSeekerBuffer(partA)
boolT(t, buf.Len() == len(partA), fmt.Sprintf("on init: have length %d, want length %d", buf.Len(), len(partA)))
b := make([]byte, 32)
n, err := buf.Read(b)
noErrorT(t, err)
boolT(t, buf.Len() == 0, fmt.Sprintf("after reading 1: have length %d, want length 0", buf.Len()))
boolT(t, n == len(partA), fmt.Sprintf("after reading 2: have length %d, want length %d", n, len(partA)))
n, err = buf.Write(partB)
noErrorT(t, err)
boolT(t, n == len(partB), fmt.Sprintf("after writing: have length %d, want length %d", n, len(partB)))
n, err = buf.Read(b)
noErrorT(t, err)
boolT(t, buf.Len() == 0, fmt.Sprintf("after rereading 1: have length %d, want length 0", buf.Len()))
boolT(t, n == len(partB), fmt.Sprintf("after rereading 2: have length %d, want length %d", n, len(partB)))
partsLen := len(partA) + len(partB)
_ = buf.Rewind()
boolT(t, buf.Len() == partsLen, fmt.Sprintf("after rewinding: have length %d, want length %d", buf.Len(), partsLen))
buf.Close()
boolT(t, buf.Len() == 0, fmt.Sprintf("after closing, have length %d, want length 0", buf.Len()))
}

View File

@@ -489,74 +489,35 @@ func URLMap(query string) (map[string]interface{}, error) {
return mp.(map[string]interface{}), nil
}
// FlattenMap flattens a nested map into a single-level map using dot notation for nested keys.
// In case of key conflicts, all nested levels will be discarded in favor of the first-level key.
//
// Example #1:
//
// Input:
// {
// "user.name": "alex",
// "user.document.id": "document_id"
// "user.document.number": "document_number"
// }
// Output:
// {
// "user": {
// "name": "alex",
// "document": {
// "id": "document_id"
// "number": "document_number"
// }
// }
// }
//
// Example #2 (with conflicts):
//
// Input:
// {
// "user": "alex",
// "user.document.id": "document_id"
// "user.document.number": "document_number"
// }
// Output:
// {
// "user": "alex"
// }
func FlattenMap(input map[string]interface{}) map[string]interface{} {
result := make(map[string]interface{})
for k, v := range input {
parts := strings.Split(k, ".")
if len(parts) == 1 {
result[k] = v
// FlattenMap expand key.subkey to nested map
func FlattenMap(a map[string]interface{}) map[string]interface{} {
// preprocess map
nb := make(map[string]interface{}, len(a))
for k, v := range a {
ps := strings.Split(k, ".")
if len(ps) == 1 {
nb[k] = v
continue
}
current := result
for i, part := range parts {
// last element in the path
if i == len(parts)-1 {
current[part] = v
break
}
// initialize map for current level if not exist
if _, ok := current[part]; !ok {
current[part] = make(map[string]interface{})
}
if nested, ok := current[part].(map[string]interface{}); ok {
current = nested // continue to the nested map
} else {
break // if current element is not a map, ignore it
em := make(map[string]interface{})
em[ps[len(ps)-1]] = v
for i := len(ps) - 2; i > 0; i-- {
nm := make(map[string]interface{})
nm[ps[i]] = em
em = nm
}
if vm, ok := nb[ps[0]]; ok {
// nested map
nm := vm.(map[string]interface{})
for vk, vv := range em {
nm[vk] = vv
}
nb[ps[0]] = nm
} else {
nb[ps[0]] = em
}
}
return result
return nb
}
/*

View File

@@ -6,7 +6,6 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
rutil "go.unistack.org/micro/v4/util/reflect"
)
@@ -320,140 +319,3 @@ func TestIsZero(t *testing.T) {
// t.Logf("XX %#+v\n", ok)
}
func TestFlattenMap(t *testing.T) {
tests := []struct {
name string
input map[string]interface{}
expected map[string]interface{}
}{
{
name: "empty map",
input: map[string]interface{}{},
expected: map[string]interface{}{},
},
{
name: "nil map",
input: nil,
expected: map[string]interface{}{},
},
{
name: "single level",
input: map[string]interface{}{
"username": "username",
"password": "password",
},
expected: map[string]interface{}{
"username": "username",
"password": "password",
},
},
{
name: "two level",
input: map[string]interface{}{
"order_id": "order_id",
"user.name": "username",
"user.password": "password",
},
expected: map[string]interface{}{
"order_id": "order_id",
"user": map[string]interface{}{
"name": "username",
"password": "password",
},
},
},
{
name: "three level",
input: map[string]interface{}{
"order_id": "order_id",
"user.name": "username",
"user.password": "password",
"user.document.id": "document_id",
"user.document.number": "document_number",
},
expected: map[string]interface{}{
"order_id": "order_id",
"user": map[string]interface{}{
"name": "username",
"password": "password",
"document": map[string]interface{}{
"id": "document_id",
"number": "document_number",
},
},
},
},
{
name: "four level",
input: map[string]interface{}{
"order_id": "order_id",
"user.name": "username",
"user.password": "password",
"user.document.id": "document_id",
"user.document.number": "document_number",
"user.info.permissions.read": "available",
"user.info.permissions.write": "available",
},
expected: map[string]interface{}{
"order_id": "order_id",
"user": map[string]interface{}{
"name": "username",
"password": "password",
"document": map[string]interface{}{
"id": "document_id",
"number": "document_number",
},
"info": map[string]interface{}{
"permissions": map[string]interface{}{
"read": "available",
"write": "available",
},
},
},
},
},
{
name: "key conflicts",
input: map[string]interface{}{
"user": "user",
"user.name": "username",
"user.password": "password",
},
expected: map[string]interface{}{
"user": "user",
},
},
{
name: "overwriting conflicts",
input: map[string]interface{}{
"order_id": "order_id",
"user.document.id": "document_id",
"user.document.number": "document_number",
"user.info.address": "address",
"user.info.phone": "phone",
},
expected: map[string]interface{}{
"order_id": "order_id",
"user": map[string]interface{}{
"document": map[string]interface{}{
"id": "document_id",
"number": "document_number",
},
"info": map[string]interface{}{
"address": "address",
"phone": "phone",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for range 100 { // need to exclude the impact of key order in the map on the test.
require.Equal(t, tt.expected, rutil.FlattenMap(tt.input))
}
})
}
}