flow: initial tests #18

Merged
vtolstov merged 5 commits from flow into master 2021-02-18 12:49:33 +03:00
6 changed files with 99 additions and 23 deletions
Showing only changes of commit 5596345382 - Show all commits

View File

@ -3,14 +3,13 @@ package broker
import ( import (
"context" "context"
"errors" "errors"
"math/rand"
"sync" "sync"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
maddr "github.com/unistack-org/micro/v3/util/addr" maddr "github.com/unistack-org/micro/v3/util/addr"
mnet "github.com/unistack-org/micro/v3/util/net" mnet "github.com/unistack-org/micro/v3/util/net"
"github.com/unistack-org/micro/v3/util/rand"
) )
type memoryBroker struct { type memoryBroker struct {
@ -59,7 +58,8 @@ func (m *memoryBroker) Connect(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
i := rand.Intn(20000) var rng rand.Rand
i := rng.Intn(20000)
// set addr with port // set addr with port
addr = mnet.HostPort(addr, 10000+i) addr = mnet.HostPort(addr, 10000+i)
@ -237,8 +237,6 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
// NewBroker return new memory broker // NewBroker return new memory broker
func NewBroker(opts ...Option) Broker { func NewBroker(opts ...Option) Broker {
rand.Seed(time.Now().UnixNano())
return &memoryBroker{ return &memoryBroker{
opts: NewOptions(opts...), opts: NewOptions(opts...),
Subscribers: make(map[string][]*memorySubscriber), Subscribers: make(map[string][]*memorySubscriber),

View File

@ -4,13 +4,13 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math/rand"
"net" "net"
"sync" "sync"
"time" "time"
maddr "github.com/unistack-org/micro/v3/util/addr" maddr "github.com/unistack-org/micro/v3/util/addr"
mnet "github.com/unistack-org/micro/v3/util/net" mnet "github.com/unistack-org/micro/v3/util/net"
"github.com/unistack-org/micro/v3/util/rand"
) )
type memorySocket struct { type memorySocket struct {
@ -207,7 +207,8 @@ func (m *memoryTransport) Listen(ctx context.Context, addr string, opts ...Liste
// if zero port then randomly assign one // if zero port then randomly assign one
if len(port) > 0 && port == "0" { if len(port) > 0 && port == "0" {
i := rand.Intn(20000) var rng rand.Rand
i := rng.Intn(20000)
port = fmt.Sprintf("%d", 10000+i) port = fmt.Sprintf("%d", 10000+i)
} }
@ -255,8 +256,6 @@ func (m *memoryTransport) Name() string {
func NewTransport(opts ...Option) Transport { func NewTransport(opts ...Option) Transport {
options := NewOptions(opts...) options := NewOptions(opts...)
rand.Seed(time.Now().UnixNano())
return &memoryTransport{ return &memoryTransport{
opts: options, opts: options,
listeners: make(map[string]*memoryListener), listeners: make(map[string]*memoryListener),

View File

@ -1,9 +1,8 @@
package random package random
import ( import (
"math/rand"
"github.com/unistack-org/micro/v3/selector" "github.com/unistack-org/micro/v3/selector"
"github.com/unistack-org/micro/v3/util/rand"
) )
type random struct{} type random struct{}
@ -20,10 +19,9 @@ func (r *random) Select(routes []string, opts ...selector.SelectOption) (selecto
if len(routes) == 1 { if len(routes) == 1 {
return routes[0] return routes[0]
} }
var rng rand.Rand
// select a random route from the slice // select a random route from the slice
//nolint:gosec return routes[rng.Intn(len(routes))]
return routes[rand.Intn(len(routes))]
}, nil }, nil
} }

View File

@ -1,9 +1,8 @@
package roundrobin package roundrobin
import ( import (
"math/rand"
"github.com/unistack-org/micro/v3/selector" "github.com/unistack-org/micro/v3/selector"
"github.com/unistack-org/micro/v3/util/rand"
) )
// NewSelector returns an initialised round robin selector // NewSelector returns an initialised round robin selector
@ -18,8 +17,8 @@ func (r *roundrobin) Select(routes []string, opts ...selector.SelectOption) (sel
if len(routes) == 0 { if len(routes) == 0 {
return nil, selector.ErrNoneAvailable return nil, selector.ErrNoneAvailable
} }
var rng rand.Rand
i := rand.Intn(len(routes)) i := rng.Intn(len(routes))
return func() string { return func() string {
route := routes[i%len(routes)] route := routes[i%len(routes)]

View File

@ -2,16 +2,14 @@
package jitter package jitter
import ( import (
"math/rand"
"time" "time"
)
var ( "github.com/unistack-org/micro/v3/util/rand"
r = rand.New(rand.NewSource(time.Now().UnixNano()))
) )
// Do returns a random time to jitter with max cap specified // Do returns a random time to jitter with max cap specified
func Do(d time.Duration) time.Duration { func Do(d time.Duration) time.Duration {
v := r.Float64() * float64(d.Nanoseconds()) var rng rand.Rand
v := rng.Float64() * float64(d.Nanoseconds())
return time.Duration(v) return time.Duration(v)
} }

84
util/rand/rand.go Normal file
View File

@ -0,0 +1,84 @@
package rand
import (
"crypto/rand"
"encoding/binary"
)
// Rand is a wrapper around crypto/rand that adds some convenience functions known from math/rand.
type Rand struct {
buf [8]byte
}
func (r *Rand) Int31() int32 {
rand.Read(r.buf[:4])
return int32(binary.BigEndian.Uint32(r.buf[:4]) & ^uint32(1<<31))
}
func (r *Rand) Int() int {
u := uint(r.Int63())
return int(u << 1 >> 1) // clear sign bit if int == int32
}
func (r *Rand) Float64() float64 {
again:
f := float64(r.Int63()) / (1 << 63)
if f == 1 {
goto again // resample; this branch is taken O(never)
}
return f
}
func (r *Rand) Float32() float32 {
again:
f := float32(r.Float64())
if f == 1 {
goto again // resample; this branch is taken O(very rarely)
}
return f
}
func (r *Rand) Uint32() uint32 {
return uint32(r.Int63() >> 31)
}
func (r *Rand) Uint64() uint64 {
return uint64(r.Int63())>>31 | uint64(r.Int63())<<32
}
func (r *Rand) Intn(n int) int {
if n <= 1<<31-1 {
return int(r.Int31n(int32(n)))
}
return int(r.Int63n(int64(n)))
}
func (r *Rand) Int63() int64 {
rand.Read(r.buf[:])
return int64(binary.BigEndian.Uint64(r.buf[:]) & ^uint64(1<<63))
}
// copied from the standard library math/rand implementation of Int63n
func (r *Rand) Int31n(n int32) int32 {
if n&(n-1) == 0 { // n is power of two, can mask
return r.Int31() & (n - 1)
}
max := int32((1 << 31) - 1 - (1<<31)%uint32(n))
v := r.Int31()
for v > max {
v = r.Int31()
}
return v % n
}
func (r *Rand) Int63n(n int64) int64 {
if n&(n-1) == 0 { // n is power of two, can mask
return r.Int63() & (n - 1)
}
max := int64((1 << 63) - 1 - (1<<63)%uint64(n))
v := r.Int63()
for v > max {
v = r.Int63()
}
return v % n
}