sync/waitgroup: initial sync.WaitGroup wrapper with context support #319

Merged
vtolstov merged 7 commits from waitgroup into master 2024-03-09 23:35:14 +03:00
19 changed files with 293 additions and 147 deletions

View File

@ -1,6 +1,5 @@
run: run:
concurrency: 4 concurrency: 4
deadline: 5m
issues-exit-code: 1 issues-exit-code: 1
tests: true tests: true
@ -13,15 +12,13 @@ linters-settings:
linters: linters:
enable: enable:
- govet - govet
- deadcode
- errcheck - errcheck
- govet - govet
- ineffassign - ineffassign
- staticcheck - staticcheck
- structcheck
- typecheck - typecheck
- unused - unused
- varcheck - spancheck
- bodyclose - bodyclose
- gci - gci
- goconst - goconst
@ -41,4 +38,5 @@ linters:
- prealloc - prealloc
- unconvert - unconvert
- unparam - unparam
- unused
disable-all: false disable-all: false

View File

@ -58,14 +58,14 @@ func NewOptions(opts ...options.Option) Options {
type PublishOptions struct { type PublishOptions struct {
// Context holds external options // Context holds external options
Context context.Context Context context.Context
// BodyOnly flag says the message contains raw body bytes
BodyOnly bool
// Message metadata usually passed as message headers // Message metadata usually passed as message headers
Metadata metadata.Metadata Metadata metadata.Metadata
// Content-Type of message for marshal // Content-Type of message for marshal
ContentType string ContentType string
// Topic destination // Topic destination
Topic string Topic string
// BodyOnly flag says the message contains raw body bytes
BodyOnly bool
} }
// NewPublishOptions creates PublishOptions struct // NewPublishOptions creates PublishOptions struct

View File

@ -19,8 +19,8 @@ var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// Is this an exported - upper case - name? // Is this an exported - upper case - name?
func isExported(name string) bool { func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name) r, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune) return unicode.IsUpper(r)
} }
// Is this type exported or a builtin? // Is this type exported or a builtin?

View File

@ -75,78 +75,80 @@ func ParseDSN(dsn string) (*Config, error) {
// Find last '/' that goes before dbname // Find last '/' that goes before dbname
foundSlash := false foundSlash := false
for i := len(dsn) - 1; i >= 0; i-- { for i := len(dsn) - 1; i >= 0; i-- {
if dsn[i] == '/' { if dsn[i] != '/' {
foundSlash = true continue
var j, k int }
// left part is empty if i <= 0 foundSlash = true
if i > 0 { var j, k int
// Find the first ':' in dsn
for j = i; j >= 0; j-- { // left part is empty if i <= 0
if dsn[j] == ':' { if i > 0 {
cfg.Scheme = dsn[0:j] // Find the first ':' in dsn
} for j = i; j >= 0; j-- {
if dsn[j] == ':' {
cfg.Scheme = dsn[0:j]
} }
// [username[:password]@][host]
// Find the last '@' in dsn[:i]
for j = i; j >= 0; j-- {
if dsn[j] == '@' {
// username[:password]
// Find the second ':' in dsn[:j]
for k = 0; k < j; k++ {
if dsn[k] == ':' {
if cfg.Scheme == dsn[:k] {
continue
}
var err error
cfg.Password, err = url.PathUnescape(dsn[k+1 : j])
if err != nil {
return nil, err
}
break
}
}
cfg.Username = dsn[len(cfg.Scheme)+3 : k]
break
}
}
for k = j + 1; k < i; k++ {
if dsn[k] == ':' {
cfg.Host = dsn[j+1 : k]
cfg.Port = dsn[k+1 : i]
break
}
}
} }
// dbname[?param1=value1&...&paramN=valueN] // [username[:password]@][host]
// Find the first '?' in dsn[i+1:] // Find the last '@' in dsn[:i]
for j = i + 1; j < len(dsn); j++ { for j = i; j >= 0; j-- {
if dsn[j] == '?' { if dsn[j] == '@' {
parts := strings.Split(dsn[j+1:], "&") // username[:password]
cfg.Params = make([]string, 0, len(parts)*2) // Find the second ':' in dsn[:j]
for _, p := range parts { for k = 0; k < j; k++ {
k, v, found := strings.Cut(p, "=") if dsn[k] == ':' {
if !found { if cfg.Scheme == dsn[:k] {
continue continue
}
var err error
cfg.Password, err = url.PathUnescape(dsn[k+1 : j])
if err != nil {
return nil, err
}
break
} }
cfg.Params = append(cfg.Params, k, v)
} }
cfg.Username = dsn[len(cfg.Scheme)+3 : k]
break break
} }
} }
var err error
dbname := dsn[i+1 : j] for k = j + 1; k < i; k++ {
if cfg.Database, err = url.PathUnescape(dbname); err != nil { if dsn[k] == ':' {
return nil, fmt.Errorf("invalid dbname %q: %w", dbname, err) cfg.Host = dsn[j+1 : k]
cfg.Port = dsn[k+1 : i]
break
}
} }
break
} }
// dbname[?param1=value1&...&paramN=valueN]
// Find the first '?' in dsn[i+1:]
for j = i + 1; j < len(dsn); j++ {
if dsn[j] == '?' {
parts := strings.Split(dsn[j+1:], "&")
cfg.Params = make([]string, 0, len(parts)*2)
for _, p := range parts {
k, v, found := strings.Cut(p, "=")
if !found {
continue
}
cfg.Params = append(cfg.Params, k, v)
}
break
}
}
var err error
dbname := dsn[i+1 : j]
if cfg.Database, err = url.PathUnescape(dbname); err != nil {
return nil, fmt.Errorf("invalid dbname %q: %w", dbname, err)
}
break
} }
if !foundSlash && len(dsn) > 0 { if !foundSlash && len(dsn) > 0 {

13
go.mod
View File

@ -5,17 +5,16 @@ go 1.20
require ( require (
dario.cat/mergo v1.0.0 dario.cat/mergo v1.0.0
github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/google/uuid v1.3.1 github.com/google/uuid v1.6.0
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
golang.org/x/sync v0.3.0 golang.org/x/sync v0.6.0
golang.org/x/sys v0.12.0 golang.org/x/sys v0.16.0
google.golang.org/grpc v1.58.2 google.golang.org/grpc v1.62.1
google.golang.org/protobuf v1.31.0 google.golang.org/protobuf v1.32.0
) )
require ( require (
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.15.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
) )

31
go.sum
View File

@ -6,29 +6,28 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -18,18 +18,12 @@ type Options struct {
Out io.Writer Out io.Writer
// Context holds exernal options // Context holds exernal options
Context context.Context Context context.Context
// Attrs holds additional attributes // TimeFunc used to obtain current time
Attrs []interface{} TimeFunc func() time.Time
// Name holds the logger name
Name string
// The logging level the logger should log
Level Level
// CallerSkipCount number of frmaes to skip
CallerSkipCount int
// ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc
// TimeKey is the key used for the time of the log call // TimeKey is the key used for the time of the log call
TimeKey string TimeKey string
// Name holds the logger name
Name string
// LevelKey is the key used for the level of the log call // LevelKey is the key used for the level of the log call
LevelKey string LevelKey string
// MessageKey is the key used for the message of the log call // MessageKey is the key used for the message of the log call
@ -40,12 +34,18 @@ type Options struct {
SourceKey string SourceKey string
// StacktraceKey is the key used for the stacktrace // StacktraceKey is the key used for the stacktrace
StacktraceKey string StacktraceKey string
// Attrs holds additional attributes
Attrs []interface{}
// ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc
// CallerSkipCount number of frmaes to skip
CallerSkipCount int
// The logging level the logger should log
Level Level
// AddStacktrace controls writing of stacktaces on error // AddStacktrace controls writing of stacktaces on error
AddStacktrace bool AddStacktrace bool
// AddSource enabled writing source file and position in log // AddSource enabled writing source file and position in log
AddSource bool AddSource bool
// TimeFunc used to obtain current time
TimeFunc func() time.Time
} }
// NewOptions creates new options struct // NewOptions creates new options struct

View File

@ -56,9 +56,9 @@ type Wrapper struct {
s fmt.State s fmt.State
pointers map[uintptr]int pointers map[uintptr]int
opts *Options opts *Options
takeMap map[int]bool
depth int depth int
ignoreNextType bool ignoreNextType bool
takeMap map[int]bool
protoWrapperType bool protoWrapperType bool
sqlWrapperType bool sqlWrapperType bool
} }

View File

@ -54,7 +54,24 @@ func testOutgoingCtx(ctx context.Context) {
} }
} }
func TestPassing(t *testing.T) { func TestIncoming(t *testing.T) {
ctx := context.TODO()
md1 := New(2)
md1.Set("Key1", "Val1")
md1.Set("Key2", "Val2")
ctx = NewIncomingContext(ctx, md1)
testIncomingCtx(ctx)
md, ok := FromIncomingContext(ctx)
if !ok {
t.Fatalf("missing metadata from incoming context")
}
if v, ok := md.Get("Key1"); !ok || v != "Val1_new" {
t.Fatalf("invalid metadata value %#+v", md)
}
}
func TestOutgoing(t *testing.T) {
ctx := context.TODO() ctx := context.TODO()
md1 := New(2) md1 := New(2)
md1.Set("Key1", "Val1") md1.Set("Key1", "Val1")

View File

@ -65,6 +65,8 @@ func As(b any, target any) bool {
break break
case targetType.Implements(routerType): case targetType.Implements(routerType):
break break
case targetType.Implements(tracerType):
break
default: default:
return false return false
} }
@ -76,19 +78,21 @@ func As(b any, target any) bool {
return false return false
} }
var brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem() var (
var loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem() brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem()
var clientType = reflect.TypeOf((*client.Client)(nil)).Elem() loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem()
var serverType = reflect.TypeOf((*server.Server)(nil)).Elem() clientType = reflect.TypeOf((*client.Client)(nil)).Elem()
var codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem() serverType = reflect.TypeOf((*server.Server)(nil)).Elem()
var flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem() codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem()
var fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem() flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem()
var meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem() fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem()
var registerType = reflect.TypeOf((*register.Register)(nil)).Elem() meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem()
var resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem() registerType = reflect.TypeOf((*register.Register)(nil)).Elem()
var routerType = reflect.TypeOf((*router.Router)(nil)).Elem() resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem()
var selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem() routerType = reflect.TypeOf((*router.Router)(nil)).Elem()
var storeType = reflect.TypeOf((*store.Store)(nil)).Elem() selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem()
var syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem() storeType = reflect.TypeOf((*store.Store)(nil)).Elem()
var tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem() syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem()
var serviceType = reflect.TypeOf((*Service)(nil)).Elem() tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem()
serviceType = reflect.TypeOf((*Service)(nil)).Elem()
)

View File

@ -1,7 +1,6 @@
package options_test package options_test
import ( import (
"fmt"
"testing" "testing"
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/codec"
@ -132,7 +131,6 @@ func TestMetadataAny(t *testing.T) {
var opts []options.Option var opts []options.Option
switch valData := tt.Data.(type) { switch valData := tt.Data.(type) {
case []any: case []any:
fmt.Printf("%s any %#+v\n", tt.Name, valData)
opts = append(opts, options.Metadata(valData...)) opts = append(opts, options.Metadata(valData...))
case map[string]string, map[string][]string, metadata.Metadata: case map[string]string, map[string][]string, metadata.Metadata:
opts = append(opts, options.Metadata(valData)) opts = append(opts, options.Metadata(valData))

View File

@ -32,10 +32,10 @@ type record struct {
} }
type memory struct { type memory struct {
sync.RWMutex
records map[string]services records map[string]services
watchers map[string]*watcher watchers map[string]*watcher
opts register.Options opts register.Options
sync.RWMutex
} }
// services is a KV map with service name as the key and a map of records as the value // services is a KV map with service name as the key and a map of records as the value
@ -102,10 +102,20 @@ func (m *memory) sendEvent(r *register.Result) {
} }
func (m *memory) Connect(ctx context.Context) error { func (m *memory) Connect(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil return nil
} }
func (m *memory) Disconnect(ctx context.Context) error { func (m *memory) Disconnect(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil return nil
} }
@ -126,6 +136,11 @@ func (m *memory) Options() register.Options {
} }
func (m *memory) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error { func (m *memory) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
@ -467,9 +482,7 @@ func serviceToRecord(s *register.Service, ttl time.Duration) *record {
} }
endpoints := make([]*register.Endpoint, len(s.Endpoints)) endpoints := make([]*register.Endpoint, len(s.Endpoints))
for i, e := range s.Endpoints { // TODO: vtolstov use copy copy(endpoints, s.Endpoints)
endpoints[i] = e
}
return &record{ return &record{
Name: s.Name, Name: s.Name,

View File

@ -3,12 +3,13 @@ package memory
import ( import (
"context" "context"
"fmt" "fmt"
"go.unistack.org/micro/v4"
"go.unistack.org/micro/v4/register"
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
"time" "time"
"go.unistack.org/micro/v4"
"go.unistack.org/micro/v4/register"
) )
var testData = map[string][]*register.Service{ var testData = map[string][]*register.Service{
@ -209,9 +210,9 @@ func TestMemoryRegistryTTLConcurrent(t *testing.T) {
} }
} }
//if len(os.Getenv("IN_TRAVIS_CI")) == 0 { // if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
// t.Logf("test will wait %v, then check TTL timeouts", waitTime) // t.Logf("test will wait %v, then check TTL timeouts", waitTime)
//} // }
errChan := make(chan error, concurrency) errChan := make(chan error, concurrency)
syncChan := make(chan struct{}) syncChan := make(chan struct{})
@ -252,6 +253,13 @@ func TestMemoryWildcard(t *testing.T) {
m := NewRegister() m := NewRegister()
ctx := context.TODO() ctx := context.TODO()
if err := m.Init(); err != nil {
t.Fatal(err)
}
if err := m.Connect(ctx); err != nil {
t.Fatal(err)
}
testSrv := &register.Service{Name: "foo", Version: "1.0.0"} testSrv := &register.Service{Name: "foo", Version: "1.0.0"}
if err := m.Register(ctx, testSrv, register.RegisterDomain("one")); err != nil { if err := m.Register(ctx, testSrv, register.RegisterDomain("one")); err != nil {
@ -291,8 +299,12 @@ func TestWatcher(t *testing.T) {
ctx := context.TODO() ctx := context.TODO()
m := NewRegister() m := NewRegister()
m.Init() if err := m.Init(); err != nil {
m.Connect(ctx) t.Fatal(err)
}
if err := m.Connect(ctx); err != nil {
t.Fatal(err)
}
wc, err := m.Watch(ctx) wc, err := m.Watch(ctx)
if err != nil { if err != nil {
t.Fatalf("cant watch: %v", err) t.Fatalf("cant watch: %v", err)

69
sync/waitgroup.go Normal file
View File

@ -0,0 +1,69 @@
package sync
import (
"context"
"sync"
)
type WaitGroup struct {
wg *sync.WaitGroup
c int
mu sync.Mutex
}
func WrapWaitGroup(wg *sync.WaitGroup) *WaitGroup {
g := &WaitGroup{
wg: wg,
}
return g
}
func NewWaitGroup() *WaitGroup {
var wg sync.WaitGroup
return WrapWaitGroup(&wg)
}
func (g *WaitGroup) Add(n int) {
g.mu.Lock()
g.c += n
g.wg.Add(n)
g.mu.Unlock()
}
func (g *WaitGroup) Done() {
g.mu.Lock()
g.c += -1
g.wg.Add(-1)
g.mu.Unlock()
}
func (g *WaitGroup) Wait() {
g.wg.Wait()
}
func (g *WaitGroup) WaitContext(ctx context.Context) {
done := make(chan struct{})
go func() {
g.wg.Wait()
close(done)
}()
select {
case <-ctx.Done():
g.mu.Lock()
g.wg.Add(-g.c)
<-done
g.wg.Add(g.c)
g.mu.Unlock()
return
case <-done:
return
}
}
func (g *WaitGroup) Waiters() int {
g.mu.Lock()
c := g.c
g.mu.Unlock()
return c
}

37
sync/waitgroup_test.go Normal file
View File

@ -0,0 +1,37 @@
package sync
import (
"context"
"testing"
"time"
)
func TestWaitGroupContext(t *testing.T) {
wg := NewWaitGroup()
_ = t
wg.Add(1)
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
wg.WaitContext(ctx)
}
func TestWaitGroupReuse(t *testing.T) {
wg := NewWaitGroup()
defer func() {
if wg.Waiters() != 0 {
t.Fatal("lost goroutines")
}
}()
wg.Add(1)
defer wg.Done()
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
wg.WaitContext(ctx)
wg.Add(1)
defer wg.Done()
ctx, cancel = context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
wg.WaitContext(ctx)
}

View File

@ -109,12 +109,11 @@ func Merge(olist []*register.Service, nlist []*register.Service) []*register.Ser
seen = true seen = true
srv = append(srv, sp) srv = append(srv, sp)
break break
} else {
sp := &register.Service{}
// make copy
*sp = *o
srv = append(srv, sp)
} }
sp := &register.Service{}
// make copy
*sp = *o
srv = append(srv, sp)
} }
if !seen { if !seen {
srv = append(srv, Copy([]*register.Service{n})...) srv = append(srv, Copy([]*register.Service{n})...)

View File

@ -12,7 +12,7 @@ type EC2Metadata struct {
InstanceType string `json:"instance-type"` InstanceType string `json:"instance-type"`
LocalHostname string `json:"local-hostname"` LocalHostname string `json:"local-hostname"`
LocalIPv4 string `json:"local-ipv4"` LocalIPv4 string `json:"local-ipv4"`
kernelID int `json:"kernel-id"` KernelID int `json:"kernel-id"`
Placement string `json:"placement"` Placement string `json:"placement"`
AvailabilityZone string `json:"availability-zone"` AvailabilityZone string `json:"availability-zone"`
ProductCodes string `json:"product-codes"` ProductCodes string `json:"product-codes"`

View File

@ -67,9 +67,9 @@ func (fi *fileInfo) Name() string {
func (fi *fileInfo) Mode() os.FileMode { func (fi *fileInfo) Mode() os.FileMode {
if strings.HasSuffix(fi.name, "/") { if strings.HasSuffix(fi.name, "/") {
return os.FileMode(0755) | os.ModeDir return os.FileMode(0o755) | os.ModeDir
} }
return os.FileMode(0644) return os.FileMode(0o644)
} }
func (fi *fileInfo) IsDir() bool { func (fi *fileInfo) IsDir() bool {
@ -112,15 +112,14 @@ func (f *file) Readdir(count int) ([]os.FileInfo, error) {
func (f *file) Seek(offset int64, whence int) (int64, error) { func (f *file) Seek(offset int64, whence int) (int64, error) {
// log.Printf("seek %d %d %s\n", offset, whence, f.name) // log.Printf("seek %d %d %s\n", offset, whence, f.name)
switch whence { switch whence {
case os.SEEK_SET: case io.SeekStart:
f.offset = offset f.offset = offset
case os.SEEK_CUR: case io.SeekCurrent:
f.offset += offset f.offset += offset
case os.SEEK_END: case io.SeekEnd:
f.offset = int64(len(f.data)) + offset f.offset = int64(len(f.data)) + offset
} }
return f.offset, nil return f.offset, nil
} }
func (f *file) Stat() (os.FileInfo, error) { func (f *file) Stat() (os.FileInfo, error) {

View File

@ -2,7 +2,7 @@ package structfs
import ( import (
"encoding/json" "encoding/json"
"io/ioutil" "io"
"net/http" "net/http"
"reflect" "reflect"
"testing" "testing"
@ -82,7 +82,7 @@ func get(path string) ([]byte, error) {
return nil, err return nil, err
} }
defer res.Body.Close() defer res.Body.Close()
return ioutil.ReadAll(res.Body) return io.ReadAll(res.Body)
} }
func TestAll(t *testing.T) { func TestAll(t *testing.T) {