flow: initial tests #18

Merged
vtolstov merged 5 commits from flow into master 2021-02-18 12:49:33 +03:00
19 changed files with 507 additions and 40 deletions

View File

@ -11,7 +11,7 @@ jobs:
- name: setup
uses: actions/setup-go@v2
with:
go-version: 1.15
go-version: 1.16
- name: cache
uses: actions/cache@v2
with:

View File

@ -11,7 +11,7 @@ jobs:
- name: setup
uses: actions/setup-go@v2
with:
go-version: 1.15
go-version: 1.16
- name: cache
uses: actions/cache@v2
with:

View File

@ -3,14 +3,13 @@ package broker
import (
"context"
"errors"
"math/rand"
"sync"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger"
maddr "github.com/unistack-org/micro/v3/util/addr"
mnet "github.com/unistack-org/micro/v3/util/net"
"github.com/unistack-org/micro/v3/util/rand"
)
type memoryBroker struct {
@ -59,7 +58,8 @@ func (m *memoryBroker) Connect(ctx context.Context) error {
if err != nil {
return err
}
i := rand.Intn(20000)
var rng rand.Rand
i := rng.Intn(20000)
// set addr with port
addr = mnet.HostPort(addr, 10000+i)
@ -237,8 +237,6 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
// NewBroker return new memory broker
func NewBroker(opts ...Option) Broker {
rand.Seed(time.Now().UnixNano())
return &memoryBroker{
opts: NewOptions(opts...),
Subscribers: make(map[string][]*memorySubscriber),

View File

@ -3,7 +3,6 @@ package codec
import (
"encoding/json"
"io"
"io/ioutil"
)
type noopCodec struct {
@ -20,7 +19,7 @@ func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error
func (c *noopCodec) ReadBody(conn io.Reader, b interface{}) error {
// read bytes
buf, err := ioutil.ReadAll(conn)
buf, err := io.ReadAll(conn)
if err != nil {
return err
}

21
flow/dag.go Normal file
View File

@ -0,0 +1,21 @@
package flow
type node struct {
name string
}
func (n *node) ID() string {
return n.name
}
func (n *node) Name() string {
return n.name
}
func (n *node) String() string {
return n.name
}
func (n *node) Hashcode() interface{} {
return n.name
}

68
flow/dag_test.go Normal file
View File

@ -0,0 +1,68 @@
package flow
import (
"fmt"
"testing"
"github.com/silas/dag"
)
func checkErr(t *testing.T, err error) {
if err != nil {
t.Fatal(err)
}
}
func TestDag(t *testing.T) {
d1 := &dag.AcyclicGraph{}
d2 := &dag.AcyclicGraph{}
d2v1 := d2.Add(&node{"Substep.Create"})
v1 := d1.Add(&node{"AccountService.Create"})
v2 := d1.Add(&node{"AuthzService.Create"})
v3 := d1.Add(&node{"AuthnService.Create"})
v4 := d1.Add(&node{"ProjectService.Create"})
v5 := d1.Add(&node{"ContactService.Create"})
v6 := d1.Add(&node{"NetworkService.Create"})
v7 := d1.Add(&node{"MailerService.Create"})
v8 := d1.Add(&node{"NestedService.Create"})
v9 := d1.Add(d2v1)
d1.Connect(dag.BasicEdge(v1, v2))
d1.Connect(dag.BasicEdge(v1, v3))
d1.Connect(dag.BasicEdge(v1, v4))
d1.Connect(dag.BasicEdge(v1, v5))
d1.Connect(dag.BasicEdge(v1, v6))
d1.Connect(dag.BasicEdge(v1, v7))
d1.Connect(dag.BasicEdge(v7, v8))
d1.Connect(dag.BasicEdge(v8, v9))
if err := d1.Validate(); err != nil {
t.Fatal(err)
}
d1.TransitiveReduction()
var steps [][]string
fn := func(n dag.Vertex, idx int) error {
if idx == 0 {
steps = make([][]string, 1, 1)
steps[0] = make([]string, 0, 1)
} else if idx >= len(steps) {
tsteps := make([][]string, idx+1, idx+1)
copy(tsteps, steps)
steps = tsteps
steps[idx] = make([]string, 0, 1)
}
steps[idx] = append(steps[idx], fmt.Sprintf("%s", n))
return nil
}
start := &node{"AccountService.Create"}
err := d1.SortedDepthFirstWalk([]dag.Vertex{start}, fn)
checkErr(t, err)
if len(steps) != 4 {
t.Fatalf("invalid steps: %#+v", steps)
}
if steps[3][0] != "Substep.Create" {
t.Fatalf("invalid last step: %#+v", steps)
}
}

7
flow/flow.go Normal file
View File

@ -0,0 +1,7 @@
// Package flow is an interface used for saga pattern messaging
package flow
type Step interface {
// Endpoint returns service_name.service_method
Endpoint() string
}

4
go.mod
View File

@ -1,15 +1,17 @@
module github.com/unistack-org/micro/v3
go 1.14
go 1.16
require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4
github.com/google/uuid v1.2.0
github.com/heimdalr/dag v1.0.1 // indirect
github.com/imdario/mergo v0.3.11
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 // indirect
golang.org/x/net v0.0.0-20210119194325-5f4716e94777
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
)

10
go.sum
View File

@ -3,8 +3,13 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M=
github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/heimdalr/dag v1.0.1 h1:iR2K3DSUFDYx0GeV7iXBnZkedWS1xePSGrylQ197uxg=
github.com/heimdalr/dag v1.0.1/go.mod h1:t+ZkR+sjKL4xhlE1B9rwpvwfo+x+2R0363efS+Oghns=
github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -15,13 +20,12 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/unistack-org/micro v1.18.0 h1:EbFiII0bKV0Xcua7o6J30MFmm4/g0Hv3ECOKzsUBihU=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 h1:003p0dJM77cxMSyCPFphvZf/Y5/NXf5fzg6ufd1/Oew=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -4,13 +4,13 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"
maddr "github.com/unistack-org/micro/v3/util/addr"
mnet "github.com/unistack-org/micro/v3/util/net"
"github.com/unistack-org/micro/v3/util/rand"
)
type memorySocket struct {
@ -207,7 +207,8 @@ func (m *memoryTransport) Listen(ctx context.Context, addr string, opts ...Liste
// if zero port then randomly assign one
if len(port) > 0 && port == "0" {
i := rand.Intn(20000)
var rng rand.Rand
i := rng.Intn(20000)
port = fmt.Sprintf("%d", 10000+i)
}
@ -255,8 +256,6 @@ func (m *memoryTransport) Name() string {
func NewTransport(opts ...Option) Transport {
options := NewOptions(opts...)
rand.Seed(time.Now().UnixNano())
return &memoryTransport{
opts: options,
listeners: make(map[string]*memoryListener),

View File

@ -4,7 +4,7 @@ package http
import (
"encoding/json"
"errors"
"io/ioutil"
"io"
"net/http"
"net/url"
@ -61,7 +61,7 @@ func (r *HTTPResolver) Resolve(name string) ([]*resolver.Record, error) {
if rsp.StatusCode != 200 {
return nil, errors.New("non 200 response")
}
b, err := ioutil.ReadAll(rsp.Body)
b, err := io.ReadAll(rsp.Body)
if err != nil {
return nil, err
}

View File

@ -1,9 +1,8 @@
package random
import (
"math/rand"
"github.com/unistack-org/micro/v3/selector"
"github.com/unistack-org/micro/v3/util/rand"
)
type random struct{}
@ -20,10 +19,9 @@ func (r *random) Select(routes []string, opts ...selector.SelectOption) (selecto
if len(routes) == 1 {
return routes[0]
}
var rng rand.Rand
// select a random route from the slice
//nolint:gosec
return routes[rand.Intn(len(routes))]
return routes[rng.Intn(len(routes))]
}, nil
}

View File

@ -1,9 +1,8 @@
package roundrobin
import (
"math/rand"
"github.com/unistack-org/micro/v3/selector"
"github.com/unistack-org/micro/v3/util/rand"
)
// NewSelector returns an initialised round robin selector
@ -18,8 +17,8 @@ func (r *roundrobin) Select(routes []string, opts ...selector.SelectOption) (sel
if len(routes) == 0 {
return nil, selector.ErrNoneAvailable
}
i := rand.Intn(len(routes))
var rng rand.Rand
i := rng.Intn(len(routes))
return func() string {
route := routes[i%len(routes)]

View File

@ -3,7 +3,6 @@
package http
import (
"io/ioutil"
"net"
"net/http"
"testing"
@ -52,7 +51,7 @@ func TestRoundTripper(t *testing.T) {
t.Fatal(err)
}
b, err := ioutil.ReadAll(w.Body)
b, err := io.ReadAll(w.Body)
if err != nil {
t.Fatal(err)
}
@ -72,7 +71,7 @@ func TestRoundTripper(t *testing.T) {
t.Fatal(err)
}
b, err = ioutil.ReadAll(rsp.Body)
b, err = io.ReadAll(rsp.Body)
if err != nil {
t.Fatal(err)
}

View File

@ -2,16 +2,14 @@
package jitter
import (
"math/rand"
"time"
)
var (
r = rand.New(rand.NewSource(time.Now().UnixNano()))
"github.com/unistack-org/micro/v3/util/rand"
)
// Do returns a random time to jitter with max cap specified
func Do(d time.Duration) time.Duration {
v := r.Float64() * float64(d.Nanoseconds())
var rng rand.Rand
v := rng.Float64() * float64(d.Nanoseconds())
return time.Duration(v)
}

84
util/rand/rand.go Normal file
View File

@ -0,0 +1,84 @@
package rand
import (
"crypto/rand"
"encoding/binary"
)
// Rand is a wrapper around crypto/rand that adds some convenience functions known from math/rand.
type Rand struct {
buf [8]byte
}
func (r *Rand) Int31() int32 {
rand.Read(r.buf[:4])
return int32(binary.BigEndian.Uint32(r.buf[:4]) & ^uint32(1<<31))
}
func (r *Rand) Int() int {
u := uint(r.Int63())
return int(u << 1 >> 1) // clear sign bit if int == int32
}
func (r *Rand) Float64() float64 {
again:
f := float64(r.Int63()) / (1 << 63)
if f == 1 {
goto again // resample; this branch is taken O(never)
}
return f
}
func (r *Rand) Float32() float32 {
again:
f := float32(r.Float64())
if f == 1 {
goto again // resample; this branch is taken O(very rarely)
}
return f
}
func (r *Rand) Uint32() uint32 {
return uint32(r.Int63() >> 31)
}
func (r *Rand) Uint64() uint64 {
return uint64(r.Int63())>>31 | uint64(r.Int63())<<32
}
func (r *Rand) Intn(n int) int {
if n <= 1<<31-1 {
return int(r.Int31n(int32(n)))
}
return int(r.Int63n(int64(n)))
}
func (r *Rand) Int63() int64 {
rand.Read(r.buf[:])
return int64(binary.BigEndian.Uint64(r.buf[:]) & ^uint64(1<<63))
}
// copied from the standard library math/rand implementation of Int63n
func (r *Rand) Int31n(n int32) int32 {
if n&(n-1) == 0 { // n is power of two, can mask
return r.Int31() & (n - 1)
}
max := int32((1 << 31) - 1 - (1<<31)%uint32(n))
v := r.Int31()
for v > max {
v = r.Int31()
}
return v % n
}
func (r *Rand) Int63n(n int64) int64 {
if n&(n-1) == 0 { // n is power of two, can mask
return r.Int63() & (n - 1)
}
max := int64((1 << 63) - 1 - (1<<63)%uint64(n))
v := r.Int63()
for v > max {
v = r.Int63()
}
return v % n
}

257
util/reflect/path.go Normal file
View File

@ -0,0 +1,257 @@
package reflect
import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"
)
const (
SplitToken = "."
IndexCloseChar = "]"
IndexOpenChar = "["
)
var (
ErrMalformedIndex = errors.New("Malformed index key")
ErrInvalidIndexUsage = errors.New("Invalid index key usage")
ErrKeyNotFound = errors.New("Unable to find the key")
ErrBadJSONPath = errors.New("Bad path: must start with $ and have more then 2 chars")
)
func Lookup(i interface{}, path string) (reflect.Value, error) {
if path == "" || path[0:1] != "$" {
return reflect.Value{}, ErrBadJSONPath
}
if path == "$" {
return reflect.ValueOf(i), nil
}
if len(path) < 2 {
return reflect.Value{}, ErrBadJSONPath
}
return lookup(i, strings.Split(path[2:], SplitToken)...)
}
// Lookup performs a lookup into a value, using a path of keys. The key should
// match with a Field or a MapIndex. For slice you can use the syntax key[index]
// to access a specific index. If one key owns to a slice and an index is not
// specificied the rest of the path will be apllied to evaley value of the
// slice, and the value will be merged into a slice.
func lookup(i interface{}, path ...string) (reflect.Value, error) {
value := reflect.ValueOf(i)
var parent reflect.Value
var err error
for i, part := range path {
parent = value
value, err = getValueByName(value, part)
if err == nil {
continue
}
if !isAggregable(parent) {
break
}
value, err = aggreateAggregableValue(parent, path[i:])
break
}
return value, err
}
func getValueByName(v reflect.Value, key string) (reflect.Value, error) {
var value reflect.Value
var index int
var err error
key, index, err = parseIndex(key)
if err != nil {
return value, err
}
switch v.Kind() {
case reflect.Ptr, reflect.Interface:
return getValueByName(v.Elem(), key)
case reflect.Struct:
value = v.FieldByName(key)
case reflect.Map:
kValue := reflect.Indirect(reflect.New(v.Type().Key()))
kValue.SetString(key)
value = v.MapIndex(kValue)
}
if !value.IsValid() {
return reflect.Value{}, ErrKeyNotFound
}
if index != -1 {
if value.Type().Kind() != reflect.Slice {
return reflect.Value{}, ErrInvalidIndexUsage
}
value = value.Index(index)
}
if value.Kind() == reflect.Ptr || value.Kind() == reflect.Interface {
value = value.Elem()
}
return value, nil
}
func aggreateAggregableValue(v reflect.Value, path []string) (reflect.Value, error) {
values := make([]reflect.Value, 0)
l := v.Len()
if l == 0 {
ty, ok := lookupType(v.Type(), path...)
if !ok {
return reflect.Value{}, ErrKeyNotFound
}
return reflect.MakeSlice(reflect.SliceOf(ty), 0, 0), nil
}
switch v.Kind() {
case reflect.Slice, reflect.Map:
break
default:
return reflect.Value{}, fmt.Errorf("unsuported kind for index")
}
index := indexFunction(v)
for i := 0; i < l; i++ {
value, err := lookup(index(i).Interface(), path...)
if err != nil {
return reflect.Value{}, err
}
values = append(values, value)
}
return mergeValue(values), nil
}
func indexFunction(v reflect.Value) func(i int) reflect.Value {
switch v.Kind() {
case reflect.Slice:
return v.Index
case reflect.Map:
keys := v.MapKeys()
return func(i int) reflect.Value {
return v.MapIndex(keys[i])
}
}
return func(i int) reflect.Value { return reflect.Value{} }
}
func mergeValue(values []reflect.Value) reflect.Value {
values = removeZeroValues(values)
l := len(values)
if l == 0 {
return reflect.Value{}
}
sample := values[0]
mergeable := isMergeable(sample)
t := sample.Type()
if mergeable {
t = t.Elem()
}
value := reflect.MakeSlice(reflect.SliceOf(t), 0, 0)
for i := 0; i < l; i++ {
if !values[i].IsValid() {
continue
}
if mergeable {
value = reflect.AppendSlice(value, values[i])
} else {
value = reflect.Append(value, values[i])
}
}
return value
}
func removeZeroValues(values []reflect.Value) []reflect.Value {
l := len(values)
var v []reflect.Value
for i := 0; i < l; i++ {
if values[i].IsValid() {
v = append(v, values[i])
}
}
return v
}
func isAggregable(v reflect.Value) bool {
k := v.Kind()
return k == reflect.Map || k == reflect.Slice
}
func isMergeable(v reflect.Value) bool {
k := v.Kind()
return k == reflect.Map || k == reflect.Slice
}
func hasIndex(s string) bool {
return strings.Contains(s, IndexOpenChar)
}
func parseIndex(s string) (string, int, error) {
start := strings.Index(s, IndexOpenChar)
end := strings.Index(s, IndexCloseChar)
if start == -1 && end == -1 {
return s, -1, nil
}
if (start != -1 && end == -1) || (start == -1 && end != -1) {
return "", -1, ErrMalformedIndex
}
index, err := strconv.Atoi(s[start+1 : end])
if err != nil {
return "", -1, ErrMalformedIndex
}
return s[:start], index, nil
}
func lookupType(ty reflect.Type, path ...string) (reflect.Type, bool) {
if len(path) == 0 {
return ty, true
}
switch ty.Kind() {
case reflect.Slice, reflect.Array, reflect.Map:
if hasIndex(path[0]) {
return lookupType(ty.Elem(), path[1:]...)
}
// Aggregate.
return lookupType(ty.Elem(), path...)
case reflect.Ptr:
return lookupType(ty.Elem(), path...)
case reflect.Interface:
// We can't know from here without a value. Let's just return this type.
return ty, true
case reflect.Struct:
f, ok := ty.FieldByName(path[0])
if ok {
return lookupType(f.Type, path[1:]...)
}
}
return nil, false
}

34
util/reflect/path_test.go Normal file
View File

@ -0,0 +1,34 @@
package reflect
import (
"testing"
)
func TestPath(t *testing.T) {
type Nested2 struct {
Name string
}
type Nested1 struct {
Nested2 Nested2
}
type Config struct {
Nested1 Nested1
}
cfg := &Config{
Nested1: Nested1{
Nested2: Nested2{
Name: "NAME",
},
},
}
v, err := Lookup(cfg, "$.Nested1.Nested2.Name")
if err != nil {
t.Fatal(err)
}
if v.String() != "NAME" {
t.Fatalf("lookup returns invalid value: %v", v)
}
}

View File

@ -1,7 +1,7 @@
package jwt
import (
"io/ioutil"
"os"
"testing"
"time"
@ -10,7 +10,7 @@ import (
)
func TestGenerate(t *testing.T) {
privKey, err := ioutil.ReadFile("test/sample_key")
privKey, err := os.ReadFile("test/sample_key")
if err != nil {
t.Fatalf("Unable to read private key: %v", err)
}
@ -26,11 +26,11 @@ func TestGenerate(t *testing.T) {
}
func TestInspect(t *testing.T) {
pubKey, err := ioutil.ReadFile("test/sample_key.pub")
pubKey, err := os.ReadFile("test/sample_key.pub")
if err != nil {
t.Fatalf("Unable to read public key: %v", err)
}
privKey, err := ioutil.ReadFile("test/sample_key")
privKey, err := os.ReadFile("test/sample_key")
if err != nil {
t.Fatalf("Unable to read private key: %v", err)
}