workflow fix #371

Merged
vtolstov merged 5 commits from workflowfix into v3 2024-12-10 01:40:56 +03:00
67 changed files with 480 additions and 398 deletions
Showing only changes of commit e266683d96 - Show all commits

@ -74,7 +74,7 @@ func TestMemoryBroker(t *testing.T) {
topic := "test"
count := 10
fn := func(p broker.Event) error {
fn := func(_ broker.Event) error {
return nil
}

@ -16,6 +16,9 @@ import (
// Options struct
type Options struct {
// Name holds the broker name
Name string
// Tracer used for tracing
Tracer tracer.Tracer
// Register can be used for clustering
@ -28,23 +31,25 @@ type Options struct {
Meter meter.Meter
// Context holds external options
Context context.Context
// Wait waits for a collection of goroutines to finish
Wait *sync.WaitGroup
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler
// BatchErrorHandler used when broker can't unmashal incoming messages
BatchErrorHandler BatchHandler
// Name holds the broker name
Name string
// Addrs holds the broker address
Addrs []string
// Wait waits for a collection of goroutines to finish
Wait *sync.WaitGroup
// GracefulTimeout contains time to wait to finish in flight requests
GracefulTimeout time.Duration
// Hooks can be run before broker Publish/BatchPublish and
// Subscribe/BatchSubscribe methods
Hooks options.Hooks
// GracefulTimeout contains time to wait to finish in flight requests
GracefulTimeout time.Duration
}
// NewOptions create new Options

@ -17,13 +17,13 @@ func BackoffExp(_ context.Context, _ Request, attempts int) (time.Duration, erro
}
// BackoffInterval specifies randomization interval for backoff func
func BackoffInterval(min time.Duration, max time.Duration) BackoffFunc {
func BackoffInterval(minTime time.Duration, maxTime time.Duration) BackoffFunc {
return func(_ context.Context, _ Request, attempts int) (time.Duration, error) {
td := time.Duration(math.Pow(float64(attempts), math.E)) * time.Millisecond * 100
if td < min {
return min, nil
} else if td > max {
return max, nil
if td < minTime {
return minTime, nil
} else if td > maxTime {
return maxTime, nil
}
return td, nil
}

@ -34,23 +34,23 @@ func TestBackoffExp(t *testing.T) {
}
func TestBackoffInterval(t *testing.T) {
min := 100 * time.Millisecond
max := 300 * time.Millisecond
minTime := 100 * time.Millisecond
maxTime := 300 * time.Millisecond
r := &testRequest{
service: "test",
method: "test",
}
fn := BackoffInterval(min, max)
fn := BackoffInterval(minTime, maxTime)
for i := 0; i < 5; i++ {
d, err := fn(context.TODO(), r, i)
if err != nil {
t.Fatal(err)
}
if d < min || d > max {
t.Fatalf("Expected %v < %v < %v", min, d, max)
if d < minTime || d > maxTime {
t.Fatalf("Expected %v < %v < %v", minTime, d, maxTime)
}
}
}

@ -298,7 +298,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i)
if err != nil {
return errors.InternalServerError("go.micro.client", "%s", err.Error())
return errors.InternalServerError("go.micro.client", "%s", err)
}
// only sleep if greater than 0
@ -312,7 +312,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o
// TODO apply any filtering here
routes, err = n.opts.Lookup(ctx, req, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", "%s", err.Error())
return errors.InternalServerError("go.micro.client", "%s", err)
}
// balance the list of nodes
@ -372,7 +372,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o
return gerr
}
func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts ...RequestOption) Request {
func (n *noopClient) NewRequest(service, endpoint string, _ interface{}, _ ...RequestOption) Request {
return &noopRequest{service: service, endpoint: endpoint}
}
@ -466,7 +466,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
// call backoff first. Someone may want an initial start delay
t, cerr := callOpts.Backoff(ctx, req, i)
if cerr != nil {
return nil, errors.InternalServerError("go.micro.client", "%s", cerr.Error())
return nil, errors.InternalServerError("go.micro.client", "%s", cerr)
}
// only sleep if greater than 0
@ -480,7 +480,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
// TODO apply any filtering here
routes, err = n.opts.Lookup(ctx, req, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", "%s", err.Error())
return nil, errors.InternalServerError("go.micro.client", "%s", err)
}
// balance the list of nodes
@ -546,7 +546,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
return nil, grr
}
func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) {
func (n *noopClient) stream(ctx context.Context, _ string, _ Request, _ CallOptions) (Stream, error) {
return &noopStream{ctx: ctx}, nil
}
@ -609,13 +609,13 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
// use codec for payload
cf, err := n.newCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", "%s", err.Error())
return errors.InternalServerError("go.micro.client", "%s", err)
}
// set the body
b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", "%s", err.Error())
return errors.InternalServerError("go.micro.client", "%s", err)
}
body = b
}

@ -21,6 +21,16 @@ import (
// Options holds client options
type Options struct {
// Codecs map
Codecs map[string]codec.Codec
// Proxy is used for proxy requests
Proxy string
// ContentType is used to select codec
ContentType string
// Name is the client name
Name string
// Selector used to select needed address
Selector selector.Selector
// Logger used to log messages
@ -35,31 +45,28 @@ type Options struct {
Context context.Context
// Router used to get route
Router router.Router
// TLSConfig specifies tls.Config for secure connection
TLSConfig *tls.Config
// Codecs map
Codecs map[string]codec.Codec
// Lookup func used to get destination addr
Lookup LookupFunc
// Proxy is used for proxy requests
Proxy string
// ContentType is used to select codec
ContentType string
// Name is the client name
Name string
// ContextDialer used to connect
ContextDialer func(context.Context, string) (net.Conn, error)
// Wrappers contains wrappers
Wrappers []Wrapper
// Hooks can be run before broker Publish/BatchPublish and
// Subscribe/BatchSubscribe methods
Hooks options.Hooks
// CallOptions contains default CallOptions
CallOptions CallOptions
// PoolSize connection pool size
PoolSize int
// PoolTTL connection pool ttl
PoolTTL time.Duration
// ContextDialer used to connect
ContextDialer func(context.Context, string) (net.Conn, error)
// Hooks can be run before broker Publish/BatchPublish and
// Subscribe/BatchSubscribe methods
Hooks options.Hooks
}
// NewCallOptions creates new call options struct
@ -73,6 +80,16 @@ func NewCallOptions(opts ...CallOption) CallOptions {
// CallOptions holds client call options
type CallOptions struct {
// RequestMetadata holds additional metadata for call
RequestMetadata metadata.Metadata
// Network name
Network string
// Content-Type
ContentType string
// AuthToken string
AuthToken string
// Selector selects addr
Selector selector.Selector
// Context used for deadline
@ -80,33 +97,30 @@ type CallOptions struct {
// Router used for route
Router router.Router
// Retry func used for retries
// ResponseMetadata holds additional metadata from call
ResponseMetadata *metadata.Metadata
Retry RetryFunc
// Backoff func used for backoff when retry
Backoff BackoffFunc
// Network name
Network string
// Content-Type
ContentType string
// AuthToken string
AuthToken string
// ContextDialer used to connect
ContextDialer func(context.Context, string) (net.Conn, error)
// Address specifies static addr list
Address []string
// SelectOptions selector options
SelectOptions []selector.SelectOption
// StreamTimeout stream timeout
StreamTimeout time.Duration
// RequestTimeout request timeout
RequestTimeout time.Duration
// RequestMetadata holds additional metadata for call
RequestMetadata metadata.Metadata
// ResponseMetadata holds additional metadata from call
ResponseMetadata *metadata.Metadata
// DialTimeout dial timeout
DialTimeout time.Duration
// Retries specifies retries num
Retries int
// ContextDialer used to connect
ContextDialer func(context.Context, string) (net.Conn, error)
}
// ContextDialer pass ContextDialer to client

@ -13,7 +13,7 @@ type Validator interface {
}
// DefaultConfig default config
var DefaultConfig Config = NewConfig()
var DefaultConfig = NewConfig()
// DefaultWatcherMinInterval default min interval for poll changes
var DefaultWatcherMinInterval = 5 * time.Second

@ -3,6 +3,7 @@ package config_test
import (
"context"
"fmt"
"reflect"
"testing"
"time"
@ -12,15 +13,17 @@ import (
)
type cfg struct {
StringValue string `default:"string_value"`
IgnoreValue string `json:"-"`
StructValue *cfgStructValue
IntValue int `default:"99"`
DurationValue time.Duration `default:"10s"`
MDurationValue mtime.Duration `default:"10s"`
MapValue map[string]bool `default:"key1=true,key2=false"`
UUIDValue string `default:"micro:generate uuid"`
IDValue string `default:"micro:generate id"`
MapValue map[string]bool `default:"key1=true,key2=false"`
StructValue *cfgStructValue
StringValue string `default:"string_value"`
IgnoreValue string `json:"-"`
UUIDValue string `default:"micro:generate uuid"`
IDValue string `default:"micro:generate id"`
DurationValue time.Duration `default:"10s"`
MDurationValue mtime.Duration `default:"10s"`
IntValue int `default:"99"`
}
type cfgStructValue struct {
@ -134,3 +137,13 @@ func TestValidate(t *testing.T) {
t.Fatal(err)
}
}
func Test_SizeOf(t *testing.T) {
st := cfg{}
tVal := reflect.TypeOf(st)
for i := 0; i < tVal.NumField(); i++ {
field := tVal.Field(i)
fmt.Printf("Field: %s, Offset: %d, Size: %d\n", field.Name, field.Offset, field.Type.Size())
}
}

@ -41,14 +41,16 @@ type Options struct {
BeforeInit []func(context.Context, Config) error
// AfterInit contains slice of funcs that runs after Init
AfterInit []func(context.Context, Config) error
// AllowFail flag to allow fail in config source
AllowFail bool
// SkipLoad runs only if condition returns true
SkipLoad func(context.Context, Config) bool
// SkipSave runs only if condition returns true
SkipSave func(context.Context, Config) bool
// Hooks can be run before/after config Save/Load
Hooks options.Hooks
// AllowFail flag to allow fail in config source
AllowFail bool
}
// Option function signature
@ -278,10 +280,10 @@ func WatchCoalesce(b bool) WatchOption {
}
// WatchInterval specifies min and max time.Duration for pulling changes
func WatchInterval(min, max time.Duration) WatchOption {
func WatchInterval(minTime, maxTime time.Duration) WatchOption {
return func(o *WatchOptions) {
o.MinInterval = min
o.MaxInterval = max
o.MinInterval = minTime
o.MaxInterval = maxTime
}
}

@ -3,7 +3,6 @@ package errors
import (
"encoding/json"
"errors"
er "errors"
"fmt"
"net/http"
"testing"
@ -43,7 +42,7 @@ func TestFromError(t *testing.T) {
if merr.ID != "go.micro.test" || merr.Code != 404 {
t.Fatalf("invalid conversation %v != %v", err, merr)
}
err = er.New(err.Error())
err = errors.New(err.Error())
merr = FromError(err)
if merr.ID != "go.micro.test" || merr.Code != 404 {
t.Fatalf("invalid conversation %v != %v", err, merr)
@ -58,7 +57,7 @@ func TestEqual(t *testing.T) {
t.Fatal("errors must be equal")
}
err3 := er.New("my test err")
err3 := errors.New("my test err")
if Equal(err1, err3) {
t.Fatal("errors must be not equal")
}

@ -32,7 +32,7 @@ type fsm struct {
// NewFSM creates a new finite state machine having the specified initial state
// with specified options
func NewFSM(opts ...Option) *fsm {
func NewFSM(opts ...Option) FSM {
return &fsm{
statesMap: map[string]StateFunc{},
opts: NewOptions(opts...),

1
go.mod

@ -14,6 +14,7 @@ require (
golang.org/x/sync v0.3.0
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
)
require (

2
go.sum

@ -74,5 +74,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

@ -7,12 +7,12 @@ type loggerKey struct{}
// MustContext returns logger from passed context or DefaultLogger if empty
func MustContext(ctx context.Context) Logger {
if ctx == nil {
return DefaultLogger
return DefaultLogger.Clone()
}
if l, ok := ctx.Value(loggerKey{}).(Logger); ok && l != nil {
return l
}
return DefaultLogger
return DefaultLogger.Clone()
}
// FromContext returns logger from passed context

@ -11,7 +11,7 @@ var DefaultContextAttrFuncs []ContextAttrFunc
var (
// DefaultLogger variable
DefaultLogger Logger = NewLogger()
DefaultLogger = NewLogger()
// DefaultLevel used by logger
DefaultLevel = InfoLevel
)

@ -15,18 +15,6 @@ type Option func(*Options)
// Options holds logger options
type Options struct {
// Out holds the output writer
Out io.Writer
// Context holds exernal options
Context context.Context
// Name holds the logger name
Name string
// Fields holds additional metadata
Fields []interface{}
// callerSkipCount number of frmaes to skip
CallerSkipCount int
// ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc
// TimeKey is the key used for the time of the log call
TimeKey string
// LevelKey is the key used for the level of the log call
@ -39,16 +27,31 @@ type Options struct {
SourceKey string
// StacktraceKey is the key used for the stacktrace
StacktraceKey string
// AddStacktrace controls writing of stacktaces on error
AddStacktrace bool
// AddSource enabled writing source file and position in log
AddSource bool
// The logging level the logger should log
Level Level
// TimeFunc used to obtain current time
TimeFunc func() time.Time
// Name holds the logger name
Name string
// Out holds the output writer
Out io.Writer
// Context holds exernal options
Context context.Context
// Meter used to count logs for specific level
Meter meter.Meter
// TimeFunc used to obtain current time
TimeFunc func() time.Time
// Fields holds additional metadata
Fields []interface{}
// ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc
// callerSkipCount number of frmaes to skip
CallerSkipCount int
// The logging level the logger should log
Level Level
// AddSource enabled writing source file and position in log
AddSource bool
// AddStacktrace controls writing of stacktaces on error
AddStacktrace bool
}
// NewOptions creates new options struct
@ -153,8 +156,8 @@ func WithTimeFunc(fn func() time.Time) Option {
func WithZapKeys() Option {
return func(o *Options) {
o.TimeKey = "@timestamp"
o.LevelKey = "level"
o.MessageKey = "msg"
o.LevelKey = slog.LevelKey
o.MessageKey = slog.MessageKey
o.SourceKey = "caller"
o.StacktraceKey = "stacktrace"
o.ErrorKey = "error"
@ -163,8 +166,8 @@ func WithZapKeys() Option {
func WithZerologKeys() Option {
return func(o *Options) {
o.TimeKey = "time"
o.LevelKey = "level"
o.TimeKey = slog.TimeKey
o.LevelKey = slog.LevelKey
o.MessageKey = "message"
o.SourceKey = "caller"
o.StacktraceKey = "stacktrace"
@ -186,8 +189,8 @@ func WithSlogKeys() Option {
func WithMicroKeys() Option {
return func(o *Options) {
o.TimeKey = "timestamp"
o.LevelKey = "level"
o.MessageKey = "msg"
o.LevelKey = slog.LevelKey
o.MessageKey = slog.MessageKey
o.SourceKey = "caller"
o.StacktraceKey = "stacktrace"
o.ErrorKey = "error"

@ -340,7 +340,7 @@ func (s *slogLogger) argsAttrs(args []interface{}) ([]slog.Attr, error) {
case string:
if idx+1 < len(args) {
attrs = append(attrs, slog.Any(arg, args[idx+1]))
idx += 1
idx++
} else {
attrs = append(attrs, slog.String(badKey, arg))
}

@ -11,9 +11,8 @@ import (
"testing"
"github.com/google/uuid"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
)
func TestWithHandlerFunc(t *testing.T) {
@ -133,7 +132,7 @@ func TestErrorf(t *testing.T) {
buf := bytes.NewBuffer(nil)
l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf), logger.WithAddStacktrace(true))
if err := l.Init(logger.WithContextAttrFuncs(func(ctx context.Context) []interface{} {
if err := l.Init(logger.WithContextAttrFuncs(func(_ context.Context) []interface{} {
return nil
})); err != nil {
t.Fatal(err)

@ -52,13 +52,15 @@ type protoMessage interface {
}
type Wrapper struct {
val interface{}
s fmt.State
pointers map[uintptr]int
opts *Options
pointers map[uintptr]int
takeMap map[int]bool
val interface{}
s fmt.State
opts *Options
depth int
ignoreNextType bool
takeMap map[int]bool
protoWrapperType bool
sqlWrapperType bool
}

@ -88,15 +88,14 @@ func TestPassing(t *testing.T) {
ctx = NewIncomingContext(ctx, md1)
testCtx(ctx)
md, ok := FromOutgoingContext(ctx)
_, ok := FromOutgoingContext(ctx)
if ok {
t.Fatalf("create outgoing context")
}
_ = md
ctx = NewOutgoingContext(ctx, New(1))
testCtx(ctx)
md, ok = FromOutgoingContext(ctx)
md, ok := FromOutgoingContext(ctx)
if !ok {
t.Fatalf("missing metadata from outgoing context")
}
@ -119,7 +118,7 @@ func TestMerge(t *testing.T) {
}
}
func TestIterator(t *testing.T) {
func TestIterator(_ *testing.T) {
md := Metadata{
"1Last": "last",
"2First": "first",

@ -11,7 +11,7 @@ import (
var (
// DefaultMeter is the default meter
DefaultMeter Meter = NewMeter()
DefaultMeter = NewMeter()
// DefaultAddress data will be made available on this host:port
DefaultAddress = ":9090"
// DefaultPath the meter endpoint where the Meter data will be made available

@ -37,32 +37,32 @@ func (r *noopMeter) Init(opts ...Option) error {
}
// Counter implements the Meter interface
func (r *noopMeter) Counter(name string, labels ...string) Counter {
func (r *noopMeter) Counter(_ string, labels ...string) Counter {
return &noopCounter{labels: labels}
}
// FloatCounter implements the Meter interface
func (r *noopMeter) FloatCounter(name string, labels ...string) FloatCounter {
func (r *noopMeter) FloatCounter(_ string, labels ...string) FloatCounter {
return &noopFloatCounter{labels: labels}
}
// Gauge implements the Meter interface
func (r *noopMeter) Gauge(name string, f func() float64, labels ...string) Gauge {
func (r *noopMeter) Gauge(_ string, _ func() float64, labels ...string) Gauge {
return &noopGauge{labels: labels}
}
// Summary implements the Meter interface
func (r *noopMeter) Summary(name string, labels ...string) Summary {
func (r *noopMeter) Summary(_ string, labels ...string) Summary {
return &noopSummary{labels: labels}
}
// SummaryExt implements the Meter interface
func (r *noopMeter) SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) Summary {
func (r *noopMeter) SummaryExt(_ string, _ time.Duration, _ []float64, labels ...string) Summary {
return &noopSummary{labels: labels}
}
// Histogram implements the Meter interface
func (r *noopMeter) Histogram(name string, labels ...string) Histogram {
func (r *noopMeter) Histogram(_ string, labels ...string) Histogram {
return &noopHistogram{labels: labels}
}
@ -77,7 +77,7 @@ func (r *noopMeter) Set(opts ...Option) Meter {
return m
}
func (r *noopMeter) Write(w io.Writer, opts ...Option) error {
func (r *noopMeter) Write(_ io.Writer, _ ...Option) error {
return nil
}

@ -18,26 +18,27 @@ func TestAs(t *testing.T) {
testCases := []struct {
b any
target any
match bool
want any
match bool
}{
{
broTarget,
&b,
true,
broTarget,
b: broTarget,
target: &b,
match: true,
want: broTarget,
},
{
nil,
&b,
false,
nil,
b: nil,
target: &b,
match: false,
want: nil,
},
{
fsmTarget,
&b,
false,
nil,
b: fsmTarget,
target: &b,
match: false,
want: nil,
},
}
for i, tc := range testCases {
@ -72,7 +73,7 @@ func (p *bro) Ready() bool { return true }
func (p *bro) Health() bool { return true }
func (p *bro) Init(opts ...broker.Option) error { return nil }
func (p *bro) Init(_ ...broker.Option) error { return nil }
// Options returns broker options
func (p *bro) Options() broker.Options { return broker.Options{} }
@ -81,28 +82,28 @@ func (p *bro) Options() broker.Options { return broker.Options{} }
func (p *bro) Address() string { return "" }
// Connect connects to broker
func (p *bro) Connect(ctx context.Context) error { return nil }
func (p *bro) Connect(_ context.Context) error { return nil }
// Disconnect disconnect from broker
func (p *bro) Disconnect(ctx context.Context) error { return nil }
func (p *bro) Disconnect(_ context.Context) error { return nil }
// Publish message, msg can be single broker.Message or []broker.Message
func (p *bro) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
func (p *bro) Publish(_ context.Context, _ string, _ *broker.Message, _ ...broker.PublishOption) error {
return nil
}
// BatchPublish messages to broker with multiple topics
func (p *bro) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
func (p *bro) BatchPublish(_ context.Context, _ []*broker.Message, _ ...broker.PublishOption) error {
return nil
}
// BatchSubscribe subscribes to topic messages via handler
func (p *bro) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
func (p *bro) BatchSubscribe(_ context.Context, _ string, _ broker.BatchHandler, _ ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil
}
// Subscribe subscribes to topic message via handler
func (p *bro) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
func (p *bro) Subscribe(_ context.Context, _ string, _ broker.Handler, _ ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil
}
@ -113,9 +114,9 @@ type fsmT struct {
name string
}
func (f *fsmT) Start(ctx context.Context, a interface{}, o ...Option) (interface{}, error) {
func (f *fsmT) Start(_ context.Context, _ interface{}, _ ...Option) (interface{}, error) {
return nil, nil
}
func (f *fsmT) Current() string { return f.name }
func (f *fsmT) Reset() {}
func (f *fsmT) State(s string, sf fsm.StateFunc) {}
func (f *fsmT) Current() string { return f.name }
func (f *fsmT) Reset() {}
func (f *fsmT) State(_ string, _ fsm.StateFunc) {}

@ -8,19 +8,20 @@ import (
// CertificateOptions holds options for x509.CreateCertificate
type CertificateOptions struct {
Organization []string
OrganizationalUnit []string
CommonName string
OCSPServer []string
IssuingCertificateURL []string
SerialNumber *big.Int
NotAfter time.Time
NotBefore time.Time
SignatureAlgorithm x509.SignatureAlgorithm
PublicKeyAlgorithm x509.PublicKeyAlgorithm
CommonName string
Organization []string
OrganizationalUnit []string
OCSPServer []string
IssuingCertificateURL []string
ExtKeyUsage []x509.ExtKeyUsage
KeyUsage x509.KeyUsage
IsCA bool
SignatureAlgorithm x509.SignatureAlgorithm
PublicKeyAlgorithm x509.PublicKeyAlgorithm
KeyUsage x509.KeyUsage
IsCA bool
}
// CertificateOrganizationalUnit set OrganizationalUnit in certificate subject

@ -10,7 +10,7 @@ import (
var (
// DefaultTransport is the global default transport
DefaultTransport Transport = NewTransport()
DefaultTransport = NewTransport()
// DefaultDialTimeout the default dial timeout
DefaultDialTimeout = time.Second * 5
)

@ -84,7 +84,7 @@ func (t *tunBroker) Disconnect(ctx context.Context) error {
return t.tunnel.Close(ctx)
}
func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, _ ...broker.PublishOption) error {
// TODO: this is probably inefficient, we might want to just maintain an open connection
// it may be easier to add broadcast to the tunnel
topicMap := make(map[string]tunnel.Session)
@ -114,7 +114,7 @@ func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, op
return nil
}
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, _ ...broker.PublishOption) error {
// TODO: this is probably inefficient, we might want to just maintain an open connection
// it may be easier to add broadcast to the tunnel
c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))

@ -15,6 +15,6 @@ func (p *noopProfiler) String() string {
}
// NewProfiler returns new noop profiler
func NewProfiler(opts ...Option) Profiler {
func NewProfiler(_ ...Option) Profiler {
return &noopProfiler{}
}

@ -12,7 +12,7 @@ type Profiler interface {
}
// DefaultProfiler holds the default profiler
var DefaultProfiler Profiler = NewProfiler()
var DefaultProfiler = NewProfiler()
// Options holds the options for profiler
type Options struct {

@ -31,10 +31,10 @@ type record struct {
}
type memory struct {
sync.RWMutex
records map[string]services
watchers map[string]*watcher
opts register.Options
sync.RWMutex
}
// services is a KV map with service name as the key and a map of records as the value
@ -100,11 +100,11 @@ func (m *memory) sendEvent(r *register.Result) {
}
}
func (m *memory) Connect(ctx context.Context) error {
func (m *memory) Connect(_ context.Context) error {
return nil
}
func (m *memory) Disconnect(ctx context.Context) error {
func (m *memory) Disconnect(_ context.Context) error {
return nil
}
@ -124,7 +124,7 @@ func (m *memory) Options() register.Options {
return m.opts
}
func (m *memory) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error {
func (m *memory) Register(_ context.Context, s *register.Service, opts ...register.RegisterOption) error {
m.Lock()
defer m.Unlock()

@ -208,9 +208,9 @@ func TestMemoryRegistryTTLConcurrent(t *testing.T) {
}
}
//if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
// if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
// t.Logf("test will wait %v, then check TTL timeouts", waitTime)
//}
// }
errChan := make(chan error, concurrency)
syncChan := make(chan struct{})
@ -290,8 +290,12 @@ func TestWatcher(t *testing.T) {
ctx := context.TODO()
m := NewRegister()
_ = m.Init()
_ = m.Connect(ctx)
if err := m.Init(); err != nil {
t.Fatal(err)
}
if err := m.Connect(ctx); err != nil {
t.Fatal(err)
}
wc, err := m.Watch(ctx)
if err != nil {
t.Fatalf("cant watch: %v", err)

@ -18,7 +18,7 @@ var DefaultDomain = "micro"
var (
// DefaultRegister is the global default register
DefaultRegister Register = NewRegister()
DefaultRegister = NewRegister()
// ErrNotFound returned when LookupService is called and no services found
ErrNotFound = errors.New("service not found")
// ErrWatcherStopped returned when when watcher is stopped

@ -9,6 +9,6 @@ import (
type Resolver struct{}
// Resolve returns the list of nodes
func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) {
func (r *Resolver) Resolve(_ string) ([]*resolver.Record, error) {
return []*resolver.Record{}, nil
}

@ -7,7 +7,7 @@ import (
var (
// DefaultRouter is the global default router
DefaultRouter Router = NewRouter()
DefaultRouter = NewRouter()
// DefaultNetwork is default micro network
DefaultNetwork = "micro"
// ErrRouteNotFound is returned when no route was found in the routing table

@ -25,7 +25,7 @@ func (r *random) Select(routes []string, opts ...selector.SelectOption) (selecto
}, nil
}
func (r *random) Record(addr string, err error) error {
func (r *random) Record(_ string, _ error) error {
return nil
}

@ -6,14 +6,14 @@ import (
)
// NewSelector returns an initialised round robin selector
func NewSelector(opts ...selector.Option) selector.Selector {
func NewSelector(_ ...selector.Option) selector.Selector {
return new(roundrobin)
}
type roundrobin struct{}
// Select return routes based on algo
func (r *roundrobin) Select(routes []string, opts ...selector.SelectOption) (selector.Next, error) {
func (r *roundrobin) Select(routes []string, _ ...selector.SelectOption) (selector.Next, error) {
if len(routes) == 0 {
return nil, selector.ErrNoneAvailable
}
@ -28,7 +28,7 @@ func (r *roundrobin) Select(routes []string, opts ...selector.SelectOption) (sel
}, nil
}
func (r *roundrobin) Record(addr string, err error) error { return nil }
func (r *roundrobin) Record(_ string, _ error) error { return nil }
func (r *roundrobin) Reset() error { return nil }

@ -775,13 +775,16 @@ func (s *subscriber) Options() SubscriberOptions {
}
type subscriber struct {
topic string
typ reflect.Type
subscriber interface{}
topic string
endpoints []*register.Endpoint
handlers []*handler
opts SubscriberOptions
rcvr reflect.Value
endpoints []*register.Endpoint
handlers []*handler
rcvr reflect.Value
opts SubscriberOptions
}
type handler struct {

@ -38,7 +38,9 @@ func TestNoopSub(t *testing.T) {
t.Fatal(err)
}
_ = logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel))
if err := logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel)); err != nil {
t.Fatal(err)
}
s := server.NewServer(
server.Broker(b),
server.Codec("application/octet-stream", codec.NewCodec()),

@ -24,36 +24,11 @@ type Option func(*Options)
// Options server struct
type Options struct {
// Context holds the external options and can be used for server shutdown
Context context.Context
// Broker holds the server broker
Broker broker.Broker
// Register holds the register
Register register.Register
// Tracer holds the tracer
Tracer tracer.Tracer
// Logger holds the logger
Logger logger.Logger
// Meter holds the meter
Meter meter.Meter
/*
// Router for requests
Router Router
*/
// Listener may be passed if already created
Listener net.Listener
// Wait group
Wait *msync.WaitGroup
// TLSConfig specifies tls.Config for secure serving
TLSConfig *tls.Config
// Metadata holds the server metadata
Metadata metadata.Metadata
// RegisterCheck run before register server
RegisterCheck func(context.Context) error
// Codecs map to handle content-type
Codecs map[string]codec.Codec
// Metadata holds the server metadata
Metadata metadata.Metadata
// ID holds the id of the server
ID string
// Namespace for te server
@ -66,21 +41,46 @@ type Options struct {
Advertise string
// Version holds the server version
Version string
// RegisterAttempts holds the number of register attempts before error
RegisterAttempts int
// Context holds the external options and can be used for server shutdown
Context context.Context
// Broker holds the server broker
Broker broker.Broker
// Register holds the register
Register register.Register
// Tracer holds the tracer
Tracer tracer.Tracer
// Logger holds the logger
Logger logger.Logger
// Meter holds the meter
Meter meter.Meter
// Listener may be passed if already created
Listener net.Listener
// TLSConfig specifies tls.Config for secure serving
TLSConfig *tls.Config
// Wait group
Wait *msync.WaitGroup
// RegisterCheck run before register server
RegisterCheck func(context.Context) error
// Hooks may contains hook actions that performs before/after server handler
// or server subscriber handler
Hooks options.Hooks
// RegisterInterval holds he interval for re-register
RegisterInterval time.Duration
// RegisterTTL specifies TTL for register record
RegisterTTL time.Duration
// GracefulTimeout timeout for graceful stop server
GracefulTimeout time.Duration
// MaxConn limits number of connections
MaxConn int
// DeregisterAttempts holds the number of deregister attempts before error
DeregisterAttempts int
// Hooks may contains hook actions that performs before/after server handler
// or server subscriber handler
Hooks options.Hooks
// GracefulTimeout timeout for graceful stop server
GracefulTimeout time.Duration
// RegisterAttempts holds the number of register attempts before error
RegisterAttempts int
}
// NewOptions returns new options struct with default or passed values
@ -318,14 +318,14 @@ type SubscriberOptions struct {
Context context.Context
// Queue holds the subscription queue
Queue string
// BatchWait flag specifies max wait time for batch filling
BatchWait time.Duration
// BatchSize flag specifies max size of batch
BatchSize int
// AutoAck flag for auto ack messages after processing
AutoAck bool
// BodyOnly flag specifies that message without headers
BodyOnly bool
// BatchSize flag specifies max size of batch
BatchSize int
// BatchWait flag specifies max wait time for batch filling
BatchWait time.Duration
}
// NewSubscriberOptions create new SubscriberOptions

@ -12,7 +12,7 @@ import (
// DefaultServer default server
var (
DefaultServer Server = NewServer()
DefaultServer = NewServer()
)
var (

@ -121,8 +121,10 @@ func TestNewService(t *testing.T) {
}
tests := []struct {
name string
args args
want Service
args args
}{
{
name: "NewService",
@ -146,9 +148,10 @@ func Test_service_Name(t *testing.T) {
opts Options
}
tests := []struct {
name string
name string
want string
fields fields
want string
}{
{
name: "Test_service_Name",
@ -245,10 +248,12 @@ func Test_service_Broker(t *testing.T) {
names []string
}
tests := []struct {
name string
name string
want broker.Broker
fields fields
args args
want broker.Broker
}{
{
name: "service.Broker",
@ -301,10 +306,12 @@ func Test_service_Tracer(t *testing.T) {
names []string
}
tests := []struct {
name string
name string
want tracer.Tracer
fields fields
args args
want tracer.Tracer
}{
{
name: "service.Tracer",
@ -338,10 +345,11 @@ func Test_service_Config(t *testing.T) {
names []string
}
tests := []struct {
name string
name string
want config.Config
fields fields
args args
want config.Config
}{
{
name: "service.Config",
@ -375,10 +383,12 @@ func Test_service_Client(t *testing.T) {
names []string
}
tests := []struct {
name string
name string
want client.Client
fields fields
args args
want client.Client
}{
{
name: "service.Client",
@ -412,10 +422,12 @@ func Test_service_Server(t *testing.T) {
names []string
}
tests := []struct {
name string
name string
want server.Server
fields fields
args args
want server.Server
}{
{
name: "service.Server",
@ -449,10 +461,12 @@ func Test_service_Store(t *testing.T) {
names []string
}
tests := []struct {
name string
name string
want store.Store
fields fields
args args
want store.Store
}{
{
name: "service.Store",
@ -486,10 +500,12 @@ func Test_service_Register(t *testing.T) {
names []string
}
tests := []struct {
name string
name string
want register.Register
fields fields
args args
want register.Register
}{
{
name: "service.Register",
@ -523,10 +539,12 @@ func Test_service_Logger(t *testing.T) {
names []string
}
tests := []struct {
name string
name string
want logger.Logger
fields fields
args args
want logger.Logger
}{
{
name: "service.Logger",
@ -560,10 +578,12 @@ func Test_service_Router(t *testing.T) {
names []string
}
tests := []struct {
name string
name string
want router.Router
fields fields
args args
want router.Router
}{
{
name: "service.Router",
@ -597,10 +617,12 @@ func Test_service_Meter(t *testing.T) {
names []string
}
tests := []struct {
name string
name string
want meter.Meter
fields fields
args args
want meter.Meter
}{
{
name: "service.Meter",
@ -631,8 +653,8 @@ func Test_service_String(t *testing.T) {
}
tests := []struct {
name string
fields fields
want string
fields fields
}{
{
name: "service.String",

@ -96,15 +96,15 @@ func (m *memoryStore) list(prefix string, limit, offset uint) []string {
if limit != 0 || offset != 0 {
sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] })
sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] })
end := len(allKeys)
end := uint(len(allKeys))
if limit > 0 {
calcLimit := int(offset + limit)
calcLimit := offset + limit
if calcLimit < end {
end = calcLimit
}
}
if int(offset) >= end {
if offset >= end {
return nil
}
return allKeys[offset:end]

@ -12,15 +12,18 @@ import (
var _ Store = (*noopStore)(nil)
type noopStore struct {
mu sync.Mutex
watchers map[string]Watcher
funcRead FuncRead
funcWrite FuncWrite
funcExists FuncExists
funcList FuncList
funcDelete FuncDelete
watchers map[string]Watcher
funcRead FuncRead
funcWrite FuncWrite
funcExists FuncExists
funcList FuncList
funcDelete FuncDelete
opts Options
isConnected atomic.Int32
mu sync.Mutex
}
func (n *noopStore) Live() bool {
@ -35,7 +38,7 @@ func (n *noopStore) Health() bool {
return true
}
func NewStore(opts ...Option) *noopStore {
func NewStore(opts ...Option) Store {
options := NewOptions(opts...)
return &noopStore{opts: options}
}
@ -94,7 +97,7 @@ func (n *noopStore) Read(ctx context.Context, key string, val interface{}, opts
return n.funcRead(ctx, key, val, opts...)
}
func (n *noopStore) fnRead(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
func (n *noopStore) fnRead(ctx context.Context, _ string, _ interface{}, _ ...ReadOption) error {
select {
case <-ctx.Done():
return ctx.Err()
@ -112,7 +115,7 @@ func (n *noopStore) Delete(ctx context.Context, key string, opts ...DeleteOption
return n.funcDelete(ctx, key, opts...)
}
func (n *noopStore) fnDelete(ctx context.Context, key string, opts ...DeleteOption) error {
func (n *noopStore) fnDelete(ctx context.Context, _ string, _ ...DeleteOption) error {
select {
case <-ctx.Done():
return ctx.Err()
@ -130,7 +133,7 @@ func (n *noopStore) Exists(ctx context.Context, key string, opts ...ExistsOption
return n.funcExists(ctx, key, opts...)
}
func (n *noopStore) fnExists(ctx context.Context, key string, opts ...ExistsOption) error {
func (n *noopStore) fnExists(ctx context.Context, _ string, _ ...ExistsOption) error {
select {
case <-ctx.Done():
return ctx.Err()
@ -148,7 +151,7 @@ func (n *noopStore) Write(ctx context.Context, key string, val interface{}, opts
return n.funcWrite(ctx, key, val, opts...)
}
func (n *noopStore) fnWrite(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
func (n *noopStore) fnWrite(ctx context.Context, _ string, _ interface{}, _ ...WriteOption) error {
select {
case <-ctx.Done():
return ctx.Err()
@ -200,13 +203,13 @@ func (n *noopStore) connect(ctx context.Context) error {
}
type watcher struct {
opts WatchOptions
ch chan Event
exit chan bool
id string
ch chan Event
opts WatchOptions
}
func (m *noopStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
func (n *noopStore) Watch(_ context.Context, opts ...WatchOption) (Watcher, error) {
id, err := id.New()
if err != nil {
return nil, err
@ -223,9 +226,9 @@ func (m *noopStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, er
opts: wo,
}
m.mu.Lock()
m.watchers[w.id] = w
m.mu.Unlock()
n.mu.Lock()
n.watchers[w.id] = w
n.mu.Unlock()
return w, nil
}

@ -15,6 +15,13 @@ import (
// Options contains configuration for the Store
type Options struct {
// Name specifies store name
Name string
// Namespace of the records
Namespace string
// Separator used as key parts separator
Separator string
// Meter used for metrics
Meter meter.Meter
// Tracer used for tracing
@ -25,22 +32,17 @@ type Options struct {
Codec codec.Codec
// Logger used for logging
Logger logger.Logger
// TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config
// Name specifies store name
Name string
// Namespace of the records
Namespace string
// Separator used as key parts separator
Separator string
// Addrs contains store address
Addrs []string
// Wrappers store wrapper that called before actual functions
// Wrappers []Wrapper
// Timeout specifies timeout duration for all operations
Timeout time.Duration
// Hooks can be run before/after store Read/List/Write/Exists/Delete
Hooks options.Hooks
// Timeout specifies timeout duration for all operations
Timeout time.Duration
// LazyConnect creates a connection when using store
LazyConnect bool
}

@ -16,7 +16,7 @@ var (
// ErrInvalidKey is returned when a key has empty or have invalid format
ErrInvalidKey = errors.New("invalid key")
// DefaultStore is the global default store
DefaultStore Store = NewStore()
DefaultStore = NewStore()
// DefaultSeparator is the gloabal default key parts separator
DefaultSeparator = "/"
)

@ -6,9 +6,10 @@ import (
)
type memorySync struct {
mtx gosync.RWMutex
locks map[string]*memoryLock
options Options
mtx gosync.RWMutex
}
type memoryLock struct {

@ -43,7 +43,7 @@ func NewNetDialer(parent DialFunc, opts ...Option) DialFunc {
if cache.opts.MaxCacheEntries == 0 {
cache.opts.MaxCacheEntries = DefaultMaxCacheEntries
}
return func(ctx context.Context, network, address string) (net.Conn, error) {
return func(_ context.Context, network, address string) (net.Conn, error) {
conn := &dnsConn{}
conn.roundTrip = cachingRoundTrip(&cache, network, address)
return conn, nil
@ -132,12 +132,12 @@ func PreferIPV6(b bool) Option {
}
type cache struct {
sync.RWMutex
dial DialFunc
entries map[string]cacheEntry
dial DialFunc
opts Options
sync.RWMutex
}
type cacheEntry struct {
@ -306,7 +306,7 @@ func getNameLen(msg string) int {
for i < len(msg) {
if msg[i] == 0 {
// end of name
i += 1
i++
break
}
if msg[i] >= 0xc0 {
@ -336,8 +336,7 @@ func cachingRoundTrip(cache *cache, network, address string) roundTripper {
cache.opts.Meter.Counter(semconv.CacheRequestInflight, "type", "dns").Inc()
defer cache.opts.Meter.Counter(semconv.CacheRequestInflight, "type", "dns").Dec()
// check cache
if res := cache.get(req); res != "" {
cache.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "get", "status", "hit").Inc()
if res = cache.get(req); res != "" {
return res, nil
}
cache.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "get", "status", "miss").Inc()

@ -8,15 +8,13 @@ import (
func TestCache(t *testing.T) {
net.DefaultResolver = NewNetResolver(PreferIPV4(true))
addrs, err := net.LookupHost("unistack.org")
_, err := net.LookupHost("unistack.org")
if err != nil {
t.Fatal(err)
}
addrs, err = net.LookupHost("unistack.org")
_, err = net.LookupHost("unistack.org")
if err != nil {
t.Fatal(err)
}
_ = addrs
}

@ -11,12 +11,15 @@ import (
)
type dnsConn struct {
deadline time.Time
ctx context.Context
cancel context.CancelFunc
roundTrip roundTripper
ibuf bytes.Buffer
obuf bytes.Buffer
deadline time.Time
ibuf bytes.Buffer
obuf bytes.Buffer
sync.Mutex
}
@ -81,7 +84,7 @@ func (c *dnsConn) SetReadDeadline(t time.Time) error {
return nil
}
func (c *dnsConn) SetWriteDeadline(t time.Time) error {
func (c *dnsConn) SetWriteDeadline(_ time.Time) error {
// writes do not timeout
return nil
}
@ -159,23 +162,22 @@ func readMessage(c net.Conn) (string, error) {
return "", err
}
return string(b[:n]), nil
} else {
var sz [2]byte
_, err := io.ReadFull(c, sz[:])
if err != nil {
return "", err
}
size := int64(sz[0])<<8 | int64(sz[1])
var str strings.Builder
_, err = io.CopyN(&str, c, size)
if err == io.EOF {
return "", io.ErrUnexpectedEOF
}
if err != nil {
return "", err
}
return str.String(), nil
}
var sz [2]byte
_, err := io.ReadFull(c, sz[:])
if err != nil {
return "", err
}
size := int64(sz[0])<<8 | int64(sz[1])
var str strings.Builder
_, err = io.CopyN(&str, c, size)
if err == io.EOF {
return "", io.ErrUnexpectedEOF
}
if err != nil {
return "", err
}
return str.String(), nil
}

@ -71,7 +71,7 @@ func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
}
// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
func (h *serverHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
if span, ok := tracer.SpanFromContext(ctx); ok {
attrs := peerAttr(peerFromCtx(ctx))
span.AddLabels(attrs...)
@ -80,7 +80,7 @@ func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) co
}
// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
func (h *serverHandler) HandleConn(_ context.Context, _ stats.ConnStats) {
}
type clientHandler struct {

@ -665,12 +665,12 @@ func patParamKeys(pattern string) ([]string, error) {
// longestPrefix finds the length of the shared prefix
// of two strings
func longestPrefix(k1, k2 string) int {
max := len(k1)
if l := len(k2); l < max {
max = l
maxLen := len(k1)
if l := len(k2); l < maxLen {
maxLen = l
}
var i int
for i = 0; i < max; i++ {
for i = 0; i < maxLen; i++ {
if k1[i] != k2[i] {
break
}

@ -14,7 +14,7 @@ func Random(d time.Duration) time.Duration {
return time.Duration(v)
}
func RandomInterval(min, max time.Duration) time.Duration {
func RandomInterval(minTime, maxTime time.Duration) time.Duration {
var rng rand.Rand
return time.Duration(rng.Int63n(max.Nanoseconds()-min.Nanoseconds())+min.Nanoseconds()) * time.Nanosecond
return time.Duration(rng.Int63n(maxTime.Nanoseconds()-minTime.Nanoseconds())+minTime.Nanoseconds()) * time.Nanosecond
}

@ -23,12 +23,12 @@ type Ticker struct {
// NewTickerContext returns a pointer to an initialized instance of the Ticker.
// It works like NewTicker except that it has ability to close via context.
// Also it works fine with context.WithTimeout to handle max time to run ticker.
func NewTickerContext(ctx context.Context, min, max time.Duration) *Ticker {
func NewTickerContext(ctx context.Context, minTime, maxTime time.Duration) *Ticker {
ticker := &Ticker{
C: make(chan time.Time),
done: make(chan chan struct{}),
min: min.Nanoseconds(),
max: max.Nanoseconds(),
min: minTime.Nanoseconds(),
max: maxTime.Nanoseconds(),
ctx: ctx,
}
go ticker.run()
@ -38,12 +38,12 @@ func NewTickerContext(ctx context.Context, min, max time.Duration) *Ticker {
// NewTicker returns a pointer to an initialized instance of the Ticker.
// Min and max are durations of the shortest and longest allowed
// ticks. Ticker will run in a goroutine until explicitly stopped.
func NewTicker(min, max time.Duration) *Ticker {
func NewTicker(minTime, maxTime time.Duration) *Ticker {
ticker := &Ticker{
C: make(chan time.Time),
done: make(chan chan struct{}),
min: min.Nanoseconds(),
max: max.Nanoseconds(),
min: minTime.Nanoseconds(),
max: maxTime.Nanoseconds(),
ctx: context.Background(),
}
go ticker.run()

@ -31,26 +31,26 @@ loop:
func TestTicker(t *testing.T) {
t.Parallel()
min := time.Duration(10)
max := time.Duration(20)
minTime := time.Duration(10)
maxTime := time.Duration(20)
// tick can take a little longer since we're not adjusting it to account for
// processing.
precision := time.Duration(4)
rt := NewTicker(min*time.Millisecond, max*time.Millisecond)
rt := NewTicker(minTime*time.Millisecond, maxTime*time.Millisecond)
for i := 0; i < 5; i++ {
t0 := time.Now()
t1 := <-rt.C
td := t1.Sub(t0)
if td < min*time.Millisecond {
if td < minTime*time.Millisecond {
t.Fatalf("tick was shorter than expected: %s", td)
} else if td > (max+precision)*time.Millisecond {
} else if td > (maxTime+precision)*time.Millisecond {
t.Fatalf("tick was longer than expected: %s", td)
}
}
rt.Stop()
time.Sleep((max + precision) * time.Millisecond)
time.Sleep((maxTime + precision) * time.Millisecond)
select {
case v, ok := <-rt.C:
if ok || !v.IsZero() {

@ -48,19 +48,19 @@ func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, e
// we have a port range
// extract min port
min, err := strconv.Atoi(prange[0])
minPort, err := strconv.Atoi(prange[0])
if err != nil {
return nil, errors.New("unable to extract port range")
}
// extract max port
max, err := strconv.Atoi(prange[1])
maxPort, err := strconv.Atoi(prange[1])
if err != nil {
return nil, errors.New("unable to extract port range")
}
// range the ports
for port := min; port <= max; port++ {
for port := minPort; port <= maxPort; port++ {
// try bind to host:port
ln, err := fn(HostPort(host, port))
if err == nil {
@ -68,7 +68,7 @@ func Listen(addr string, fn func(string) (net.Listener, error)) (net.Listener, e
}
// hit max port
if port == max {
if port == maxPort {
return nil, err
}
}

@ -155,7 +155,7 @@ func indexFunction(v reflect.Value) func(i int) reflect.Value {
return v.MapIndex(keys[i])
}
}
return func(i int) reflect.Value { return reflect.Value{} }
return func(_ int) reflect.Value { return reflect.Value{} }
}
func mergeValue(values []reflect.Value) reflect.Value {

@ -130,16 +130,16 @@ func TestMergeString(t *testing.T) {
func TestMergeNested(t *testing.T) {
type CallReqNested struct {
Nested *CallReqNested `json:"nested2"`
StringArgs []string `json:"string_args"`
Uint64Args []uint64 `json:"uint64_args"`
Nested *CallReqNested `json:"nested2"`
}
type CallReq struct {
Nested *CallReqNested `json:"nested"`
Name string `json:"name"`
Req string `json:"req"`
Arg2 int `json:"arg2"`
Nested *CallReqNested `json:"nested"`
}
dst := &CallReq{

@ -109,12 +109,11 @@ func Merge(olist []*register.Service, nlist []*register.Service) []*register.Ser
seen = true
srv = append(srv, sp)
break
} else {
sp := &register.Service{}
// make copy
*sp = *o
srv = append(srv, sp)
}
sp := &register.Service{}
// make copy
*sp = *o
srv = append(srv, sp)
}
if !seen {
srv = append(srv, Copy([]*register.Service{n})...)
@ -153,14 +152,14 @@ func Remove(old, del []*register.Service) []*register.Service {
// WaitService using register wait for service to appear with min/max interval for check and optional timeout.
// Timeout can be 0 to wait infinitive.
func WaitService(ctx context.Context, reg register.Register, name string, min time.Duration, max time.Duration, timeout time.Duration, opts ...register.LookupOption) error {
func WaitService(ctx context.Context, reg register.Register, name string, minTime time.Duration, maxTime time.Duration, timeout time.Duration, opts ...register.LookupOption) error {
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
ticker := jitter.NewTickerContext(ctx, min, max)
ticker := jitter.NewTickerContext(ctx, minTime, maxTime)
defer ticker.Stop()
for {

@ -10,10 +10,11 @@ import (
// Buffer is ring buffer
type Buffer struct {
sync.RWMutex
streams map[string]*Stream
vals []*Entry
size int
sync.RWMutex
}
// Entry is ring buffer data entry

@ -6,8 +6,8 @@ import (
// Pool holds the socket pool
type Pool struct {
sync.RWMutex
pool map[string]*Socket
sync.RWMutex
}
// Get socket from pool

@ -20,10 +20,11 @@ type Stream interface {
}
type stream struct {
sync.RWMutex
Stream
err error
request *request
sync.RWMutex
}
type request struct {

@ -10,11 +10,18 @@ import (
type DigitalOceanMetadata struct {
Metadata struct {
V1 struct {
DropletID int64 `json:"droplet_id"`
Hostname string `json:"hostname"`
VendorData string `json:"vendor_data"`
Features map[string]interface{} `json:"features"`
Hostname string `json:"hostname"`
VendorData string `json:"vendor_data"`
Region string `json:"region"`
PublicKeys []string `json:"public_keys"`
Region string `json:"region"`
DNS struct {
Nameservers []string `json:"nameservers"`
} `json:"dns"`
Interfaces struct {
Private []struct {
IPv4 struct {
@ -31,24 +38,23 @@ type DigitalOceanMetadata struct {
Netmask string `json:"netmask"`
Gateway string `json:"gateway"`
} `json:"ipv4"`
IPv6 struct {
Address string `json:"ip_address"`
CIDR int `json:"cidr"`
Gateway string `json:"gateway"`
} `json:"ipv6"`
Mac string `json:"mac"`
Type string `json:"type"`
IPv6 struct {
Address string `json:"ip_address"`
Gateway string `json:"gateway"`
CIDR int `json:"cidr"`
} `json:"ipv6"`
} `json:"public"`
} `json:"interfaces"`
DropletID int64 `json:"droplet_id"`
FloatingIP struct {
IPv4 struct {
Active bool `json:"active"`
} `json:"ipv4"`
} `json:"floating_ip"`
DNS struct {
Nameservers []string `json:"nameservers"`
} `json:"dns"`
Features map[string]interface{} `json:"features"`
} `json:"v1"`
} `json:"metadata"`
}

@ -2,29 +2,29 @@ package structfs
type EC2Metadata struct {
Latest struct {
Userdata string `json:"user-data"`
Metadata struct {
AMIID int `json:"ami-id"`
AMILaunchIndex int `json:"ami-launch-index"`
AMIManifestPath string `json:"ami-manifest-path"`
AncestorAMIIDs []int `json:"ancestor-ami-ids"`
BlockDeviceMapping []string `json:"block-device-mapping"`
InstanceID int `json:"instance-id"`
InstanceType string `json:"instance-type"`
LocalHostname string `json:"local-hostname"`
LocalIPv4 string `json:"local-ipv4"`
KernelID int `json:"kernel-id"`
Placement string `json:"placement"`
AvailabilityZone string `json:"availability-zone"`
ProductCodes string `json:"product-codes"`
PublicHostname string `json:"public-hostname"`
PublicIPv4 string `json:"public-ipv4"`
PublicKeys []struct {
AMIManifestPath string `json:"ami-manifest-path"`
InstanceType string `json:"instance-type"`
LocalHostname string `json:"local-hostname"`
LocalIPv4 string `json:"local-ipv4"`
Placement string `json:"placement"`
AvailabilityZone string `json:"availability-zone"`
ProductCodes string `json:"product-codes"`
PublicHostname string `json:"public-hostname"`
PublicIPv4 string `json:"public-ipv4"`
PublicKeys []struct {
Key []string `json:"-"`
} `json:"public-keys"`
RamdiskID int `json:"ramdisk-id"`
ReservationID int `json:"reservation-id"`
SecurityGroups []string `json:"security-groups"`
AncestorAMIIDs []int `json:"ancestor-ami-ids"`
BlockDeviceMapping []string `json:"block-device-mapping"`
SecurityGroups []string `json:"security-groups"`
RamdiskID int `json:"ramdisk-id"`
ReservationID int `json:"reservation-id"`
AMIID int `json:"ami-id"`
AMILaunchIndex int `json:"ami-launch-index"`
KernelID int `json:"kernel-id"`
InstanceID int `json:"instance-id"`
} `json:"meta-data"`
Userdata string `json:"user-data"`
} `json:"latest"`
}

@ -35,22 +35,22 @@ func (fs *fs) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
type fs struct {
modtime time.Time
iface interface{}
tag string
modtime time.Time
}
type file struct {
name string
offset int64
data []byte
modtime time.Time
name string
data []byte
offset int64
}
type fileInfo struct {
modtime time.Time
name string
size int64
modtime time.Time
}
func (fi *fileInfo) Sys() interface{} {
@ -105,7 +105,7 @@ func (f *file) Read(b []byte) (int, error) {
return n, err
}
func (f *file) Readdir(count int) ([]os.FileInfo, error) {
func (f *file) Readdir(_ int) ([]os.FileInfo, error) {
return nil, nil
}

@ -93,7 +93,7 @@ func TestAll(t *testing.T) {
in string
out string
}{
{"http://127.0.0.1:8080/metadata/v1/", "droplet_id\nhostname\nvendor_data\npublic_keys\nregion\ninterfaces\nfloating_ip\ndns\nfeatures"},
{"http://127.0.0.1:8080/metadata/v1/", "features\nhostname\nvendor_data\nregion\npublic_keys\ndns\ninterfaces\ndroplet_id\nfloating_ip"},
{"http://127.0.0.1:8080/metadata/v1/droplet_id", "2756294"},
{"http://127.0.0.1:8080/metadata/v1/dns/", "nameservers"},
{"http://127.0.0.1:8080/metadata/v1/dns/nameservers", "2001:4860:4860::8844\n2001:4860:4860::8888\n8.8.8.8"},

@ -78,8 +78,8 @@ var (
for _, se := range st.Details() {
switch ne := se.(type) {
case proto.Message:
buf, err := testCodec.Marshal(ne)
if err != nil {
var buf []byte
if buf, err = testCodec.Marshal(ne); err != nil {
return fmt.Errorf("failed to marshal err: %w", err)
}
if err = testCodec.Unmarshal(buf, &testMap); err != nil {
@ -438,10 +438,10 @@ func Run(ctx context.Context, c client.Client, m sqlmock.Sqlmock, dir string, ex
}
type Case struct {
dbfiles []string
reqfile string
rspfile string
errfile string
dbfiles []string
}
func GetCases(dir string, exts []string) ([]Case, error) {

@ -1,6 +1,5 @@
package text
func DetectEncoding(text string) map[string]int {
charsets := map[string]int{
"UTF-8": 0,
@ -19,7 +18,7 @@ func DetectEncoding(text string) map[string]int {
utfupper := 5
lowercase := 3
uppercase := 1
last_simb := 0
lastSimb := 0
for a := 0; a < len(text); a++ {
char := int(text[a])
@ -30,10 +29,10 @@ func DetectEncoding(text string) map[string]int {
}
// UTF-8
if (last_simb == 208) && ((char > 143 && char < 176) || char == 129) {
if (lastSimb == 208) && ((char > 143 && char < 176) || char == 129) {
charsets["UTF-8"] += (utfupper * 2)
}
if ((last_simb == 208) && ((char > 175 && char < 192) || char == 145)) || (last_simb == 209 && char > 127 && char < 144) {
if ((lastSimb == 208) && ((char > 175 && char < 192) || char == 145)) || (lastSimb == 209 && char > 127 && char < 144) {
charsets["UTF-8"] += (utflower * 2)
}
@ -77,7 +76,7 @@ func DetectEncoding(text string) map[string]int {
charsets["MAC"] += uppercase
}
last_simb = char
lastSimb = char
}
return charsets

@ -2,7 +2,6 @@ package time
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
@ -14,7 +13,7 @@ type Duration int64
func ParseDuration(s string) (time.Duration, error) {
if s == "" {
return 0, errors.New(`time: invalid duration "` + s + `"`)
return 0, fmt.Errorf(`time: invalid duration "%s"`, s)
}
var p int
@ -27,21 +26,21 @@ loop:
case 'h':
d, err := strconv.Atoi(s[p:i])
if err != nil {
return 0, errors.New("time: invalid duration " + s)
return 0, fmt.Errorf(`time: invalid duration "%s"`, s)
}
hours += d
p = i + 1
case 'd':
d, err := strconv.Atoi(s[p:i])
if err != nil {
return 0, errors.New("time: invalid duration " + s)
return 0, fmt.Errorf(`time: invalid duration "%s"`, s)
}
hours += d * 24
p = i + 1
case 'y':
n, err := strconv.Atoi(s[p:i])
if err != nil {
return 0, errors.New("time: invalid duration " + s)
return 0, fmt.Errorf(`time: invalid duration "%s"`, s)
}
var d int
for j := n - 1; j >= 0; j-- {