fix pipeline #365

Merged
vtolstov merged 14 commits from :atolstikhin-v3 into v3 2024-12-06 19:05:28 +03:00
23 changed files with 157 additions and 115 deletions
Showing only changes of commit 2bbbaf8963 - Show all commits

View File

@ -298,7 +298,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i) t, err := callOpts.Backoff(ctx, req, i)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", "%s", err.Error())
} }
// only sleep if greater than 0 // only sleep if greater than 0
@ -312,7 +312,7 @@ func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, o
// TODO apply any filtering here // TODO apply any filtering here
routes, err = n.opts.Lookup(ctx, req, callOpts) routes, err = n.opts.Lookup(ctx, req, callOpts)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", "%s", err.Error())
} }
// balance the list of nodes // balance the list of nodes
@ -466,7 +466,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
t, cerr := callOpts.Backoff(ctx, req, i) t, cerr := callOpts.Backoff(ctx, req, i)
if cerr != nil { if cerr != nil {
return nil, errors.InternalServerError("go.micro.client", cerr.Error()) return nil, errors.InternalServerError("go.micro.client", "%s", cerr.Error())
} }
// only sleep if greater than 0 // only sleep if greater than 0
@ -480,7 +480,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
// TODO apply any filtering here // TODO apply any filtering here
routes, err = n.opts.Lookup(ctx, req, callOpts) routes, err = n.opts.Lookup(ctx, req, callOpts)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error()) return nil, errors.InternalServerError("go.micro.client", "%s", err.Error())
} }
// balance the list of nodes // balance the list of nodes
@ -609,13 +609,13 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
// use codec for payload // use codec for payload
cf, err := n.newCodec(p.ContentType()) cf, err := n.newCodec(p.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", "%s", err.Error())
} }
// set the body // set the body
b, err := cf.Marshal(p.Payload()) b, err := cf.Marshal(p.Payload())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", "%s", err.Error())
} }
body = b body = b
} }

View File

@ -2,6 +2,7 @@ package errors
import ( import (
"encoding/json" "encoding/json"
"errors"
er "errors" er "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -26,7 +27,7 @@ func TestMarshalJSON(t *testing.T) {
func TestEmpty(t *testing.T) { func TestEmpty(t *testing.T) {
msg := "test" msg := "test"
var err *Error var err *Error
err = FromError(fmt.Errorf(msg)) err = FromError(errors.New(msg))
if err.Detail != msg { if err.Detail != msg {
t.Fatalf("invalid error %v", err) t.Fatalf("invalid error %v", err)
} }

View File

@ -46,11 +46,11 @@ func (h *wrapper) Handle(ctx context.Context, rec slog.Record) error {
} }
func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler { func (h *wrapper) WithAttrs(attrs []slog.Attr) slog.Handler {
return h.WithAttrs(attrs) return h.h.WithAttrs(attrs)
} }
func (h *wrapper) WithGroup(name string) slog.Handler { func (h *wrapper) WithGroup(name string) slog.Handler {
return h.WithGroup(name) return h.h.WithGroup(name)
} }
func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr { func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
@ -89,7 +89,6 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
} }
type slogLogger struct { type slogLogger struct {
leveler *slog.LevelVar
handler *wrapper handler *wrapper
opts logger.Options opts logger.Options
mu sync.RWMutex mu sync.RWMutex

View File

@ -36,8 +36,8 @@ var (
circularShortBytes = []byte("<shown>") circularShortBytes = []byte("<shown>")
invalidAngleBytes = []byte("<invalid>") invalidAngleBytes = []byte("<invalid>")
filteredBytes = []byte("<filtered>") filteredBytes = []byte("<filtered>")
openBracketBytes = []byte("[") // openBracketBytes = []byte("[")
closeBracketBytes = []byte("]") // closeBracketBytes = []byte("]")
percentBytes = []byte("%") percentBytes = []byte("%")
precisionBytes = []byte(".") precisionBytes = []byte(".")
openAngleBytes = []byte("<") openAngleBytes = []byte("<")

View File

@ -82,11 +82,11 @@ func TestTagged(t *testing.T) {
func TestTaggedNested(t *testing.T) { func TestTaggedNested(t *testing.T) {
type val struct { type val struct {
key string `logger:"take"` key string `logger:"take"`
val string `logger:"omit"` // val string `logger:"omit"`
unk string unk string
} }
type str struct { type str struct {
key string `logger:"omit"` // key string `logger:"omit"`
val *val `logger:"take"` val *val `logger:"take"`
} }

View File

@ -83,6 +83,7 @@ func TestPassing(t *testing.T) {
if ok { if ok {
t.Fatalf("create outgoing context") t.Fatalf("create outgoing context")
} }
_ = md
ctx = NewOutgoingContext(ctx, New(1)) ctx = NewOutgoingContext(ctx, New(1))
testCtx(ctx) testCtx(ctx)

View File

@ -65,6 +65,8 @@ func As(b any, target any) bool {
break break
case targetType.Implements(routerType): case targetType.Implements(routerType):
break break
case targetType.Implements(tracerType):
break
default: default:
return false return false
} }
@ -76,19 +78,21 @@ func As(b any, target any) bool {
return false return false
} }
var brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem() var (
var loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem() brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem()
var clientType = reflect.TypeOf((*client.Client)(nil)).Elem() loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem()
var serverType = reflect.TypeOf((*server.Server)(nil)).Elem() clientType = reflect.TypeOf((*client.Client)(nil)).Elem()
var codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem() serverType = reflect.TypeOf((*server.Server)(nil)).Elem()
var flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem() codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem()
var fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem() flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem()
var meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem() fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem()
var registerType = reflect.TypeOf((*register.Register)(nil)).Elem() meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem()
var resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem() registerType = reflect.TypeOf((*register.Register)(nil)).Elem()
var routerType = reflect.TypeOf((*router.Router)(nil)).Elem() resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem()
var selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem() routerType = reflect.TypeOf((*router.Router)(nil)).Elem()
var storeType = reflect.TypeOf((*store.Store)(nil)).Elem() selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem()
var syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem() storeType = reflect.TypeOf((*store.Store)(nil)).Elem()
var tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem() syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem()
var serviceType = reflect.TypeOf((*Service)(nil)).Elem() tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem()
serviceType = reflect.TypeOf((*Service)(nil)).Elem()
)

View File

@ -294,7 +294,7 @@ type loggerOptions struct {
brokers []string brokers []string
registers []string registers []string
stores []string stores []string
meters []string // meters []string
tracers []string tracers []string
} }

View File

@ -469,9 +469,7 @@ func serviceToRecord(s *register.Service, ttl time.Duration) *record {
} }
endpoints := make([]*register.Endpoint, len(s.Endpoints)) endpoints := make([]*register.Endpoint, len(s.Endpoints))
for i, e := range s.Endpoints { copy(endpoints, s.Endpoints)
endpoints[i] = e
}
return &record{ return &record{
Name: s.Name, Name: s.Name,

View File

@ -290,27 +290,25 @@ func TestWatcher(t *testing.T) {
ctx := context.TODO() ctx := context.TODO()
m := NewRegister() m := NewRegister()
m.Init() _ = m.Init()
m.Connect(ctx) _ = m.Connect(ctx)
wc, err := m.Watch(ctx) wc, err := m.Watch(ctx)
if err != nil { if err != nil {
t.Fatalf("cant watch: %v", err) t.Fatalf("cant watch: %v", err)
} }
defer wc.Stop() defer wc.Stop()
cherr := make(chan error, 10)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { go func() {
for {
_, err := wc.Next() _, err := wc.Next()
if err != nil { if err != nil {
t.Fatal("unexpected err", err) cherr <- fmt.Errorf("unexpected err %v", err)
} }
// t.Logf("changes %#+v", ch.Service) // t.Logf("changes %#+v", ch.Service)
wc.Stop() wc.Stop()
wg.Done() wg.Done()
return
}
}() }()
if err := m.Register(ctx, testSrv); err != nil { if err := m.Register(ctx, testSrv); err != nil {

View File

@ -171,7 +171,6 @@ type rpcMessage struct {
header metadata.Metadata header metadata.Metadata
topic string topic string
contentType string contentType string
body []byte
} }
func (r *rpcMessage) ContentType() string { func (r *rpcMessage) ContentType() string {

View File

@ -38,7 +38,7 @@ func TestNoopSub(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel)) _ = logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel))
s := server.NewServer( s := server.NewServer(
server.Broker(b), server.Broker(b),
server.Codec("application/octet-stream", codec.NewCodec()), server.Codec("application/octet-stream", codec.NewCodec()),

View File

@ -23,8 +23,8 @@ import (
) )
func init() { func init() {
maxprocs.Set() _, _ = maxprocs.Set()
memlimit.SetGoMemLimitWithOpts( _, _ = memlimit.SetGoMemLimitWithOpts(
memlimit.WithRatio(0.9), memlimit.WithRatio(0.9),
memlimit.WithProvider( memlimit.WithProvider(
memlimit.ApplyFallback( memlimit.ApplyFallback(

View File

@ -66,8 +66,13 @@ func (c *dnsConn) RemoteAddr() net.Addr {
} }
func (c *dnsConn) SetDeadline(t time.Time) error { func (c *dnsConn) SetDeadline(t time.Time) error {
c.SetReadDeadline(t) var err error
c.SetWriteDeadline(t) if err = c.SetReadDeadline(t); err != nil {
return err
}
if err = c.SetWriteDeadline(t); err != nil {
return err
}
return nil return nil
} }

View File

@ -16,7 +16,6 @@ type Ticker struct {
C chan time.Time C chan time.Time
min int64 min int64
max int64 max int64
exp int64
exit bool exit bool
rng rand.Rand rng rand.Rand
} }

View File

@ -91,7 +91,7 @@ func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
} }
if mapper, ok := dst.(map[string]interface{}); ok { if mapper, ok := dst.(map[string]interface{}); ok {
dst = mergeMap(mapper, mp, 0) mergeMap(mapper, mp, 0)
return nil return nil
} }

View File

@ -1,9 +1,38 @@
package reflect package reflect
import ( import (
"fmt"
"testing" "testing"
) )
func TestMergeMapStringInterface(t *testing.T) {
var dst interface{} //nolint:gosimple
dst = map[string]interface{}{
"xx": 11,
}
src := map[string]interface{}{
"zz": "aa",
}
if err := Merge(dst, src); err != nil {
t.Fatal(err)
}
mp, ok := dst.(map[string]interface{})
if !ok || mp == nil {
t.Fatalf("xxx %#+v\n", dst)
}
if fmt.Sprintf("%v", mp["xx"]) != "11" {
t.Fatalf("xxx zzzz %#+v", mp)
}
if fmt.Sprintf("%v", mp["zz"]) != "aa" {
t.Fatalf("xxx zzzz %#+v", mp)
}
}
func TestMergeMap(t *testing.T) { func TestMergeMap(t *testing.T) {
src := map[string]interface{}{ src := map[string]interface{}{
"skey1": "sval1", "skey1": "sval1",

View File

@ -56,7 +56,7 @@ type DigitalOceanMetadata struct {
func (stfs *DigitalOceanMetadata) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (stfs *DigitalOceanMetadata) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path { switch r.URL.Path {
case "/metadata/v1.json": case "/metadata/v1.json":
json.NewEncoder(w).Encode(stfs.Metadata.V1) _ = json.NewEncoder(w).Encode(stfs.Metadata.V1)
default: default:
fs := FileServer(stfs, "json", time.Now()) fs := FileServer(stfs, "json", time.Now())
idx := strings.Index(r.URL.Path[1:], "/") idx := strings.Index(r.URL.Path[1:], "/")

View File

@ -12,7 +12,7 @@ type EC2Metadata struct {
InstanceType string `json:"instance-type"` InstanceType string `json:"instance-type"`
LocalHostname string `json:"local-hostname"` LocalHostname string `json:"local-hostname"`
LocalIPv4 string `json:"local-ipv4"` LocalIPv4 string `json:"local-ipv4"`
kernelID int `json:"kernel-id"` KernelID int `json:"kernel-id"`
Placement string `json:"placement"` Placement string `json:"placement"`
AvailabilityZone string `json:"availability-zone"` AvailabilityZone string `json:"availability-zone"`
ProductCodes string `json:"product-codes"` ProductCodes string `json:"product-codes"`

View File

@ -27,7 +27,7 @@ func (fs *fs) ServeHTTP(w http.ResponseWriter, r *http.Request) {
f, err := fs.Open(r.URL.Path) f, err := fs.Open(r.URL.Path)
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error())) _, _ = w.Write([]byte(err.Error()))
return return
} }
w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("Content-Type", "application/octet-stream")
@ -67,9 +67,9 @@ func (fi *fileInfo) Name() string {
func (fi *fileInfo) Mode() os.FileMode { func (fi *fileInfo) Mode() os.FileMode {
if strings.HasSuffix(fi.name, "/") { if strings.HasSuffix(fi.name, "/") {
return os.FileMode(0755) | os.ModeDir return os.FileMode(0o755) | os.ModeDir
} }
return os.FileMode(0644) return os.FileMode(0o644)
} }
func (fi *fileInfo) IsDir() bool { func (fi *fileInfo) IsDir() bool {
@ -112,15 +112,14 @@ func (f *file) Readdir(count int) ([]os.FileInfo, error) {
func (f *file) Seek(offset int64, whence int) (int64, error) { func (f *file) Seek(offset int64, whence int) (int64, error) {
// log.Printf("seek %d %d %s\n", offset, whence, f.name) // log.Printf("seek %d %d %s\n", offset, whence, f.name)
switch whence { switch whence {
case os.SEEK_SET: case io.SeekStart:
f.offset = offset f.offset = offset
case os.SEEK_CUR: case io.SeekCurrent:
f.offset += offset f.offset += offset
case os.SEEK_END: case io.SeekEnd:
f.offset = int64(len(f.data)) + offset f.offset = int64(len(f.data)) + offset
} }
return f.offset, nil return f.offset, nil
} }
func (f *file) Stat() (os.FileInfo, error) { func (f *file) Stat() (os.FileInfo, error) {
@ -222,6 +221,7 @@ func getValue(name string, iface interface{}, tag string) ([]byte, error) {
return nil, fmt.Errorf("failed to find %s in interface %T", name, iface) return nil, fmt.Errorf("failed to find %s in interface %T", name, iface)
} }
/*
func hasValidType(obj interface{}, types []reflect.Kind) bool { func hasValidType(obj interface{}, types []reflect.Kind) bool {
for _, t := range types { for _, t := range types {
if reflect.TypeOf(obj).Kind() == t { if reflect.TypeOf(obj).Kind() == t {
@ -231,6 +231,7 @@ func hasValidType(obj interface{}, types []reflect.Kind) bool {
return false return false
} }
*/
func reflectValue(obj interface{}) reflect.Value { func reflectValue(obj interface{}) reflect.Value {
var val reflect.Value var val reflect.Value

View File

@ -2,7 +2,7 @@ package structfs
import ( import (
"encoding/json" "encoding/json"
"io/ioutil" "io"
"net/http" "net/http"
"reflect" "reflect"
"testing" "testing"
@ -61,7 +61,7 @@ var doOrig = []byte(`{
} }
`) `)
func server(t *testing.T) { func server(t *testing.T, ch chan error) {
stfs := DigitalOceanMetadata{} stfs := DigitalOceanMetadata{}
err := json.Unmarshal(doOrig, &stfs.Metadata.V1) err := json.Unmarshal(doOrig, &stfs.Metadata.V1)
if err != nil { if err != nil {
@ -71,7 +71,7 @@ func server(t *testing.T) {
http.Handle("/metadata/v1/", FileServer(&stfs, "json", time.Now())) http.Handle("/metadata/v1/", FileServer(&stfs, "json", time.Now()))
http.Handle("/metadata/v1.json", &stfs) http.Handle("/metadata/v1.json", &stfs)
go func() { go func() {
t.Fatal(http.ListenAndServe("127.0.0.1:8080", nil)) ch <- http.ListenAndServe("127.0.0.1:8080", nil)
}() }()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
} }
@ -82,13 +82,14 @@ func get(path string) ([]byte, error) {
return nil, err return nil, err
} }
defer res.Body.Close() defer res.Body.Close()
return ioutil.ReadAll(res.Body) return io.ReadAll(res.Body)
} }
func TestAll(t *testing.T) { func TestAll(t *testing.T) {
server(t) ch := make(chan error)
server(t, ch)
var tests = []struct { tests := []struct {
in string in string
out string out string
}{ }{
@ -100,6 +101,10 @@ func TestAll(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
select {
case err := <-ch:
t.Fatal(err)
default:
buf, err := get(tt.in) buf, err := get(tt.in)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -108,7 +113,12 @@ func TestAll(t *testing.T) {
t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out) t.Errorf("req %s output %s not match requested %s", tt.in, string(buf), tt.out)
} }
} }
}
select {
case err := <-ch:
t.Fatal(err)
default:
doTest, err := get("http://127.0.0.1:8080/metadata/v1.json") doTest, err := get("http://127.0.0.1:8080/metadata/v1.json")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -130,4 +140,5 @@ func TestAll(t *testing.T) {
if !reflect.DeepEqual(oSt, nSt) { if !reflect.DeepEqual(oSt, nSt) {
t.Fatalf("%v not match %v", oSt, nSt) t.Fatalf("%v not match %v", oSt, nSt)
} }
}
} }

View File

@ -14,7 +14,7 @@ type Duration int64
func ParseDuration(s string) (time.Duration, error) { func ParseDuration(s string) (time.Duration, error) {
if s == "" { if s == "" {
return 0, fmt.Errorf(`time: invalid duration "` + s + `"`) return 0, errors.New(`time: invalid duration "` + s + `"`)
} }
var p int var p int

View File

@ -39,9 +39,7 @@ func newStatsMeter() {
ticker := time.NewTicker(meter.DefaultMeterStatsInterval) ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
defer ticker.Stop() defer ticker.Stop()
for { for range ticker.C {
select {
case <-ticker.C:
poolsMu.Lock() poolsMu.Lock()
for _, st := range pools { for _, st := range pools {
stats := st.Stats() stats := st.Stats()
@ -52,7 +50,6 @@ func newStatsMeter() {
} }
poolsMu.Unlock() poolsMu.Unlock()
} }
}
} }
var ( var (