Compare commits

...

13 Commits

Author SHA1 Message Date
a2a383606d Merge pull request 'util/test: update test cases code' (#216) from testcase into master
Reviewed-on: #216
2023-04-28 07:10:40 +03:00
55ce58617b util/test: update test cases code
Some checks failed
lint / lint (pull_request) Successful in 51s
pr / test (pull_request) Failing after 52s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-28 07:10:19 +03:00
0e587d923e Merge pull request 'meter: move metrics handling in broker implementations' (#215) from metrics into master
Reviewed-on: #215
2023-04-27 15:32:56 +03:00
fa0248c80c cleanup
All checks were successful
pr / test (pull_request) Successful in 50s
lint / lint (pull_request) Successful in 49s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-27 15:31:59 +03:00
054bd02b59 meter: move metrics handling in broker implementations
All checks were successful
lint / lint (pull_request) Successful in 1m4s
pr / test (pull_request) Successful in 50s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-27 15:30:55 +03:00
0cf246d2d6 Merge pull request 'util/io: add RedirectStderr' (#214) from io-redirect into master
Reviewed-on: #214
2023-04-24 12:59:31 +03:00
af278bd7d3 util/io: add RedirectStderr
All checks were successful
lint / lint (pull_request) Successful in 46s
pr / test (pull_request) Successful in 50s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-24 12:58:05 +03:00
814b90efe5 Merge pull request 'util/test: export GetCases func' (#213) from GetCases into master
Reviewed-on: #213
2023-04-19 01:23:53 +03:00
e403ae3d8e util/test: export GetCases func
All checks were successful
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-19 01:23:34 +03:00
c9816a3957 Merge pull request 'util/test: add helper funcs' (#212) from test into master
Reviewed-on: #212
2023-04-19 00:33:28 +03:00
5691238a6a util/test: add helper funcs
All checks were successful
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-18 23:47:12 +03:00
963a0fa7b7 Merge pull request 'gofmt -s code' (#209) from gofmt into master
Reviewed-on: #209
2023-04-11 23:34:41 +03:00
485257035c gofmt -s code
Some checks failed
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-11 23:32:58 +03:00
20 changed files with 797 additions and 183 deletions

View File

@@ -3,6 +3,7 @@ package broker
import (
"context"
"sync"
"time"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
@@ -134,13 +135,22 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
eh := m.opts.ErrorHandler
for t, ms := range msgTopicMap {
ts := time.Now()
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(len(ms))
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(len(ms))
m.RLock()
subs, ok := m.subscribers[t]
m.RUnlock()
if !ok {
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "failure").Add(len(ms))
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
continue
}
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms))
for _, sub := range subs {
if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler
@@ -152,37 +162,65 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
switch {
// batch processing
case sub.batchhandler != nil:
if err = sub.batchhandler(ms); err != nil {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
ms.SetError(err)
if beh != nil {
_ = beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
} else {
if sub.opts.AutoAck {
if err = ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
}
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
}
}
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
// single processing
case sub.handler != nil:
for _, p := range ms {
if err = sub.handler(p); err != nil {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
p.SetError(err)
if eh != nil {
_ = eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = p.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
} else {
if sub.opts.AutoAck {
if err = p.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
}
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
}
}
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1)
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1)
}
}
}
te := time.Since(ts)
m.opts.Meter.Summary(PublishMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
m.opts.Meter.Histogram(PublishMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
m.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
m.opts.Meter.Histogram(SubscribeMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
}
}
return nil

View File

@@ -12,6 +12,25 @@ import (
"go.unistack.org/micro/v4/tracer"
)
var (
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
)
// Options struct
type Options struct {
// Tracer used for tracing

13
go.mod
View File

@@ -4,7 +4,16 @@ go 1.20
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/imdario/mergo v0.3.14
github.com/imdario/mergo v0.3.15
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
golang.org/x/sync v0.1.0
golang.org/x/sys v0.7.0
google.golang.org/grpc v1.54.0
google.golang.org/protobuf v1.30.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
)

28
go.sum
View File

@@ -1,11 +1,31 @@
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/imdario/mergo v0.3.14 h1:fOqeC1+nCuuk6PKQdg9YmosXX7Y7mHX6R/0ZldI9iHo=
github.com/imdario/mergo v0.3.14/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ=
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -27,22 +27,6 @@ var (
ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
labelSuccess = "success"
labelFailure = "failure"
@@ -230,37 +214,7 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
}
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
ts := time.Now()
err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
return err
}
// NewHandlerWrapper create new server handler wrapper
// deprecated
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.HandlerFunc
return w.Client.Publish(ctx, p, opts...)
}
// NewServerHandlerWrapper create new server handler wrapper
@@ -302,46 +256,3 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return err
}
}
// NewSubscriberWrapper create server subscribe wrapper
// deprecated
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
ts := time.Now()
err := fn(ctx, msg)
te := time.Since(ts)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
return err
}
}

View File

@@ -145,12 +145,11 @@ type Stream interface {
//
// Example:
//
// type Greeter struct {}
//
// func (g *Greeter) Hello(context, request, response) error {
// return nil
// }
// type Greeter struct {}
//
// func (g *Greeter) Hello(context, request, response) error {
// return nil
// }
type Handler interface {
Name() string
Handler() interface{}

View File

@@ -88,6 +88,7 @@ func (s *service) Name() string {
// Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called
// on first Init.
//
//nolint:gocyclo
func (s *service) Init(opts ...Option) error {
var err error

View File

@@ -58,6 +58,7 @@ func IsLocal(addr string) bool {
}
// Extract returns a real ip
//
//nolint:gocyclo
func Extract(addr string) (string, error) {
// if addr specified then its returned

17
util/io/redirect.go Normal file
View File

@@ -0,0 +1,17 @@
package io
import (
"os"
"sync"
)
var osStderrMu sync.Mutex
var OrigStderr = func() *os.File {
fd, err := dupFD(os.Stderr.Fd())
if err != nil {
panic(err)
}
return os.NewFile(fd, os.Stderr.Name())
}()

40
util/io/redirect_test.go Normal file
View File

@@ -0,0 +1,40 @@
package io
import (
"bytes"
"fmt"
"io"
"os"
"regexp"
"testing"
"time"
)
var ErrorPattern = regexp.MustCompile(`"error"`)
func TestRedirect(t *testing.T) {
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
ch := make(chan string)
go func() {
buf := bytes.NewBuffer(nil)
_, _ = io.Copy(buf, r)
ch <- buf.String()
}()
if err = RedirectStderr(w); err != nil {
t.Fatal(err)
}
os.Stderr.Write([]byte(`test redirect`))
time.Sleep(1 * time.Millisecond)
r.Close()
str := <-ch
if ErrorPattern.MatchString(str) {
t.Fatal(fmt.Errorf(str))
}
}

48
util/io/redirect_unix.go Normal file
View File

@@ -0,0 +1,48 @@
//go:build !windows
// +build !windows
package io
import (
"os"
"golang.org/x/sys/unix"
)
// dupFD is used to initialize OrigStderr (see stderr_redirect.go).
func dupFD(fd uintptr) (uintptr, error) {
// Warning: failing to set FD_CLOEXEC causes the duplicated file descriptor
// to leak into subprocesses created by exec.Command. If the file descriptor
// is a pipe, these subprocesses will hold the pipe open (i.e., prevent
// EOF), potentially beyond the lifetime of this process.
//
// This can break go test's timeouts. go test usually spawns a test process
// with its stdin and stderr streams hooked up to pipes; if the test process
// times out, it sends a SIGKILL and attempts to read stdin and stderr to
// completion. If the test process has itself spawned long-lived
// subprocesses that hold references to the stdin or stderr pipes, go test
// will hang until the subprocesses exit, rather defeating the purpose of
// a timeout.
nfd, err := unix.FcntlInt(fd, unix.F_DUPFD_CLOEXEC, 0)
if err != nil {
return 0, err
}
return uintptr(nfd), nil
}
// RedirectStderr is used to redirect internal writes to fd 2 to the
// specified file. This is needed to ensure that harcoded writes to fd
// 2 by e.g. the Go runtime are redirected to a log file of our
// choosing.
//
// We also override os.Stderr for those other parts of Go which use
// that and not fd 2 directly.
func RedirectStderr(f *os.File) error {
osStderrMu.Lock()
defer osStderrMu.Unlock()
if err := unix.Dup2(int(f.Fd()), unix.Stderr); err != nil {
return err
}
os.Stderr = f
return nil
}

View File

@@ -0,0 +1,32 @@
package io
import (
"os"
"golang.org/x/sys/windows"
)
// dupFD is used to initialize OrigStderr (see stderr_redirect.go).
func dupFD(fd uintptr) (uintptr, error) {
// Adapted from https://github.com/golang/go/blob/go1.8/src/syscall/exec_windows.go#L303.
p := windows.CurrentProcess()
var h windows.Handle
return uintptr(h), windows.DuplicateHandle(p, windows.Handle(fd), p, &h, 0, true, windows.DUPLICATE_SAME_ACCESS)
}
// RedirectStderr is used to redirect internal writes to the error
// handle to the specified file. This is needed to ensure that
// harcoded writes to the error handle by e.g. the Go runtime are
// redirected to a log file of our choosing.
//
// We also override os.Stderr for those other parts of Go which use
// that and not fd 2 directly.
func RedirectStderr(f *os.File) error {
osStderrMu.Lock()
defer osStderrMu.Unlock()
if err := windows.SetStdHandle(windows.STD_ERROR_HANDLE, windows.Handle(f.Fd())); err != nil {
return err
}
os.Stderr = f
return nil
}

View File

@@ -493,13 +493,14 @@ func btSplitter(str string) []string {
}
// queryToMap turns something like a[b][c]=4 into
// map[string]interface{}{
// "a": map[string]interface{}{
// "b": map[string]interface{}{
// "c": 4,
// },
// },
// }
//
// map[string]interface{}{
// "a": map[string]interface{}{
// "b": map[string]interface{}{
// "c": 4,
// },
// },
// }
func queryToMap(param string) (map[string]interface{}, error) {
rawKey, rawValue, err := splitKeyAndValue(param)
if err != nil {

View File

@@ -1,25 +0,0 @@
package test
import (
"fmt"
"strings"
"testing"
"github.com/DATA-DOG/go-sqlmock"
)
func Test_NewSQLRowsFromFile(t *testing.T) {
db, c, err := sqlmock.New()
if err != nil {
t.Fatal(err)
}
defer db.Close()
rows, err := NewSQLRowsFromFile(c, "testdata/Call.csv")
if err != nil {
t.Fatal(err)
}
if !strings.Contains(fmt.Sprintf("%#+v", rows), `cols:[]string{"DepAgrId", "DepAgrNum", "DepAgrDate", "DepAgrCloseDate", "AccCur", "MainFinaccNum", "MainFinaccName", "MainFinaccId", "MainFinaccOpenDt", "DepAgrStatus", "MainFinaccBal", "DepartCode", "CardAccId"}`) {
t.Fatal("invalid cols after import csv")
}
}

View File

@@ -1,28 +1,212 @@
package test
import (
"bufio"
"bytes"
"context"
"database/sql/driver"
"encoding/csv"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"reflect"
"strings"
"time"
"github.com/DATA-DOG/go-sqlmock"
sqlmock "github.com/DATA-DOG/go-sqlmock"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/metadata"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
var ErrUnknownContentType = errors.New("unknown content type")
var ErrUnknownContentType = fmt.Errorf("unknown content type")
type Extension struct {
Ext []string
}
var ExtToTypes = map[string][]string{
"json": {"application/json", "application/grpc+json"},
"yaml": {"application/yaml", "application/yml", "text/yaml", "text/yml"},
"yml": {"application/yaml", "application/yml", "text/yaml", "text/yml"},
"proto": {"application/grpc", "application/grpc+proto", "application/proto"},
var (
// ExtToTypes map file extension to content type
ExtToTypes = map[string][]string{
"json": {"application/json", "application/grpc+json"},
"yaml": {"application/yaml", "application/yml", "text/yaml", "text/yml"},
"yml": {"application/yaml", "application/yml", "text/yaml", "text/yml"},
"proto": {"application/grpc", "application/grpc+proto", "application/proto"},
}
// DefaultExts specifies default file extensions to load data
DefaultExts = []string{"csv", "json", "yaml", "yml", "proto"}
// Codecs map to detect codec for test file or request content type
Codecs map[string]codec.Codec
// ResponseCompareFunc used to compare actual response with test case data
ResponseCompareFunc = func(expectRsp []byte, testRsp interface{}, expectCodec codec.Codec, testCodec codec.Codec) error {
var err error
expectMap := make(map[string]interface{})
if err = expectCodec.Unmarshal(expectRsp, &expectMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
testMap := make(map[string]interface{})
switch v := testRsp.(type) {
case *codec.Frame:
if err = testCodec.Unmarshal(v.Data, &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
case *errors.Error:
if err = expectCodec.Unmarshal([]byte(v.Error()), &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
case error:
st, ok := status.FromError(v)
if !ok {
return v
}
me := errors.Parse(st.Message())
if me.Code != 0 {
if err = expectCodec.Unmarshal([]byte(me.Error()), &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
break
}
for _, se := range st.Details() {
switch ne := se.(type) {
case proto.Message:
buf, err := testCodec.Marshal(ne)
if err != nil {
return fmt.Errorf("failed to marshal err: %w", err)
}
if err = testCodec.Unmarshal(buf, &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
default:
return st.Err()
}
}
case interface{ GRPCStatus() *status.Status }:
st := v.GRPCStatus()
me := errors.Parse(st.Message())
if me.Code != 0 {
if err = expectCodec.Unmarshal([]byte(me.Error()), &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
break
}
case *status.Status:
me := errors.Parse(v.Message())
if me.Code != 0 {
if err = expectCodec.Unmarshal([]byte(me.Error()), &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
break
}
for _, se := range v.Details() {
switch ne := se.(type) {
case proto.Message:
buf, err := testCodec.Marshal(ne)
if err != nil {
return fmt.Errorf("failed to marshal err: %w", err)
}
if err = testCodec.Unmarshal(buf, &testMap); err != nil {
return fmt.Errorf("failed to unmarshal err: %w", err)
}
default:
return v.Err()
}
}
}
if !reflect.DeepEqual(expectMap, testMap) {
return fmt.Errorf("test: %s != rsp: %s", expectMap, testMap)
}
return nil
}
)
func FromCSVString(columns []*sqlmock.Column, rows *sqlmock.Rows, s string) *sqlmock.Rows {
res := strings.NewReader(strings.TrimSpace(s))
csvReader := csv.NewReader(res)
for {
res, err := csvReader.Read()
if err != nil || res == nil {
break
}
var row []driver.Value
for i, v := range res {
item := CSVColumnParser(strings.TrimSpace(v))
if null, nullOk := columns[i].IsNullable(); null && nullOk && item == nil {
row = append(row, nil)
} else {
row = append(row, item)
}
}
rows = rows.AddRow(row...)
}
return rows
}
func CSVColumnParser(s string) []byte {
switch {
case strings.ToLower(s) == "null":
return nil
case s == "":
return nil
}
return []byte(s)
}
func NewResponseFromFile(rspfile string) (*codec.Frame, error) {
rspbuf, err := os.ReadFile(rspfile)
if err != nil {
return nil, err
}
return &codec.Frame{Data: rspbuf}, nil
}
func getCodec(codecs map[string]codec.Codec, ext string) (codec.Codec, error) {
var c codec.Codec
if cts, ok := ExtToTypes[ext]; ok {
for _, t := range cts {
if c, ok = codecs[t]; ok {
return c, nil
}
}
}
return nil, ErrUnknownContentType
}
func getContentType(codecs map[string]codec.Codec, ext string) (string, error) {
if cts, ok := ExtToTypes[ext]; ok {
for _, t := range cts {
if _, ok = codecs[t]; ok {
return t, nil
}
}
}
return "", ErrUnknownContentType
}
func getExt(name string) string {
ext := filepath.Ext(name)
if len(ext) > 0 && ext[0] == '.' {
ext = ext[1:]
}
return ext
}
func getNameWithoutExt(name string) string {
return strings.TrimSuffix(name, filepath.Ext(name))
}
func NewRequestFromFile(c client.Client, reqfile string) (client.Request, error) {
@@ -32,23 +216,14 @@ func NewRequestFromFile(c client.Client, reqfile string) (client.Request, error)
}
endpoint := path.Base(path.Dir(reqfile))
ext := path.Ext(reqfile)
if len(ext) > 0 && ext[0] == '.' {
ext = ext[1:]
if idx := strings.Index(endpoint, "_"); idx > 0 {
endpoint = endpoint[idx+1:]
}
ext := getExt(reqfile)
var ct string
if cts, ok := ExtToTypes[ext]; ok {
for _, t := range cts {
if _, ok = c.Options().Codecs[t]; ok {
ct = t
break
}
}
}
if ct == "" {
return nil, ErrUnknownContentType
ct, err := getContentType(c.Options().Codecs, ext)
if err != nil {
return nil, err
}
req := c.NewRequest("test", endpoint, &codec.Frame{Data: reqbuf}, client.RequestContentType(ct))
@@ -56,26 +231,275 @@ func NewRequestFromFile(c client.Client, reqfile string) (client.Request, error)
return req, nil
}
func NewSQLRowsFromFile(c sqlmock.Sqlmock, file string) (*sqlmock.Rows, error) {
fp, err := os.Open(file)
func SQLFromFile(m sqlmock.Sqlmock, name string) error {
fp, err := os.Open(name)
if err != nil {
return nil, err
return err
}
defer fp.Close()
r := csv.NewReader(fp)
r.Comma = '\t'
r.Comment = '#'
records, err := r.ReadAll()
if err != nil {
return nil, err
}
rows := c.NewRows(records[0])
for idx := 1; idx < len(records); idx++ {
rows.FromCSVString(strings.Join(records[idx], ";"))
}
return rows, nil
return SQLFromReader(m, fp)
}
func SQLFromBytes(m sqlmock.Sqlmock, buf []byte) error {
return SQLFromReader(m, bytes.NewReader(buf))
}
func SQLFromString(m sqlmock.Sqlmock, buf string) error {
return SQLFromReader(m, strings.NewReader(buf))
}
func SQLFromReader(m sqlmock.Sqlmock, r io.Reader) error {
var rows *sqlmock.Rows
var exp *sqlmock.ExpectedQuery
var columns []*sqlmock.Column
br := bufio.NewReader(r)
for {
s, err := br.ReadString('\n')
if err != nil && err != io.EOF {
return err
} else if err == io.EOF && len(s) == 0 {
if rows != nil && exp != nil {
exp.WillReturnRows(rows)
}
return nil
}
if s[0] != '#' {
r := csv.NewReader(strings.NewReader(s))
r.Comma = ','
var records [][]string
records, err = r.ReadAll()
if err != nil {
return err
}
if rows == nil && len(columns) > 0 {
rows = m.NewRowsWithColumnDefinition(columns...)
} else {
for idx := 0; idx < len(records); idx++ {
if len(columns) == 0 {
return fmt.Errorf("csv file not valid, does not have %q line", "# columns ")
}
rows = FromCSVString(columns, rows, strings.Join(records[idx], ","))
}
}
continue
}
if rows != nil {
exp.WillReturnRows(rows)
rows = nil
}
switch {
case strings.HasPrefix(strings.ToLower(s[2:]), "columns"):
for _, field := range strings.Split(s[2+len("columns")+1:], ",") {
args := strings.Split(field, "|")
column := sqlmock.NewColumn(args[0]).Nullable(false)
if len(args) > 1 {
for _, arg := range args {
switch arg {
case "BOOLEAN", "BOOL":
column = column.OfType("BOOL", false)
case "NUMBER", "DECIMAL":
column = column.OfType("DECIMAL", float64(0.0)).WithPrecisionAndScale(10, 4)
case "VARCHAR":
column = column.OfType("VARCHAR", nil)
case "NULL":
column = column.Nullable(true)
}
}
}
columns = append(columns, column)
}
case strings.HasPrefix(strings.ToLower(s[2:]), "begin"):
m.ExpectBegin()
case strings.HasPrefix(strings.ToLower(s[2:]), "commit"):
m.ExpectCommit()
case strings.HasPrefix(strings.ToLower(s[2:]), "rollback"):
m.ExpectRollback()
case strings.HasPrefix(strings.ToLower(s[2:]), "exec "):
m.ExpectExec(s[2+len("exec "):])
case strings.HasPrefix(strings.ToLower(s[2:]), "query "):
exp = m.ExpectQuery(s[2+len("query "):])
}
}
}
func Run(ctx context.Context, c client.Client, m sqlmock.Sqlmock, dir string, exts []string) error {
tcases, err := GetCases(dir, exts)
if err != nil {
return err
}
g, gctx := errgroup.WithContext(ctx)
if !strings.Contains(dir, "parallel") {
g.SetLimit(1)
}
for _, tcase := range tcases {
for _, dbfile := range tcase.dbfiles {
if err = SQLFromFile(m, dbfile); err != nil {
return err
}
}
tc := tcase
g.Go(func() error {
var xrid string
var gerr error
treq, err := NewRequestFromFile(c, tc.reqfile)
if err != nil {
gerr = fmt.Errorf("failed to read request from file %s err: %w", tc.reqfile, err)
return gerr
}
xrid = fmt.Sprintf("%s-%d", treq.Endpoint(), time.Now().Unix())
defer func() {
if gerr == nil {
fmt.Printf("test %s xrid: %s status: success\n", filepath.Dir(tc.reqfile), xrid)
} else {
fmt.Printf("test %s xrid: %s status: failure error: %v\n", filepath.Dir(tc.reqfile), xrid, err)
}
}()
data := &codec.Frame{}
md := metadata.New(1)
md.Set("X-Request-Id", xrid)
cerr := c.Call(metadata.NewOutgoingContext(gctx, md), treq, data, client.WithContentType(treq.ContentType()))
var rspfile string
if tc.errfile != "" {
rspfile = tc.errfile
} else if tc.rspfile != "" {
rspfile = tc.rspfile
} else {
gerr = fmt.Errorf("errfile and rspfile is empty")
return gerr
}
expectRsp, err := NewResponseFromFile(rspfile)
if err != nil {
gerr = fmt.Errorf("failed to read response from file %s err: %w", rspfile, err)
return gerr
}
testCodec, err := getCodec(Codecs, getExt(tc.reqfile))
if err != nil {
gerr = fmt.Errorf("failed to get response file codec err: %w", err)
return gerr
}
expectCodec, err := getCodec(Codecs, getExt(rspfile))
if err != nil {
gerr = fmt.Errorf("failed to get response file codec err: %w", err)
return gerr
}
if cerr == nil && tc.errfile != "" {
gerr = fmt.Errorf("expected err %s not happened", expectRsp.Data)
return gerr
} else if cerr != nil && tc.errfile != "" {
if err = ResponseCompareFunc(expectRsp.Data, cerr, expectCodec, testCodec); err != nil {
gerr = err
return gerr
}
} else if cerr != nil && tc.errfile == "" {
gerr = cerr
return gerr
} else if cerr == nil && tc.errfile == "" {
if err = ResponseCompareFunc(expectRsp.Data, data, expectCodec, testCodec); err != nil {
gerr = err
return gerr
}
}
/*
cf, err := getCodec(c.Options().Codecs, getExt(tc.rspfile))
if err != nil {
return err
}
*/
return nil
})
}
return g.Wait()
}
type Case struct {
dbfiles []string
reqfile string
rspfile string
errfile string
}
func GetCases(dir string, exts []string) ([]Case, error) {
var tcases []Case
entries, err := os.ReadDir(dir)
if len(entries) == 0 && err != nil {
return tcases, err
}
if exts == nil {
exts = DefaultExts
}
var dirs []string
var dbfiles []string
var reqfile, rspfile, errfile string
for _, entry := range entries {
if entry.IsDir() {
dirs = append(dirs, filepath.Join(dir, entry.Name()))
continue
}
if info, err := entry.Info(); err != nil {
return tcases, err
} else if !info.Mode().IsRegular() {
continue
}
for _, ext := range exts {
if getExt(entry.Name()) == ext {
name := getNameWithoutExt(entry.Name())
switch {
case strings.HasSuffix(name, "_db"):
dbfiles = append(dbfiles, filepath.Join(dir, entry.Name()))
case strings.HasSuffix(name, "_req"):
reqfile = filepath.Join(dir, entry.Name())
case strings.HasSuffix(name, "_rsp"):
rspfile = filepath.Join(dir, entry.Name())
case strings.HasSuffix(name, "_err"):
errfile = filepath.Join(dir, entry.Name())
}
}
}
}
if reqfile != "" && (rspfile != "" || errfile != "") {
tcases = append(tcases, Case{dbfiles: dbfiles, reqfile: reqfile, rspfile: rspfile, errfile: errfile})
}
for _, dir = range dirs {
ntcases, err := GetCases(dir, exts)
if len(ntcases) == 0 && err != nil {
return tcases, err
} else if len(ntcases) == 0 {
continue
}
tcases = append(tcases, ntcases...)
}
return tcases, nil
}

72
util/test/test_test.go Normal file
View File

@@ -0,0 +1,72 @@
package test
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
)
func Test_SQLFromFile(t *testing.T) {
ctx := context.TODO()
db, c, err := sqlmock.New()
if err != nil {
t.Fatal(err)
}
defer db.Close()
if err = SQLFromFile(c, "testdata/result/01_firstcase/Call_db.csv"); err != nil {
t.Fatal(err)
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
t.Fatal(err)
}
rows, err := tx.QueryContext(ctx, "select * from test;")
if err != nil {
t.Fatal(err)
}
for rows.Next() {
var id int64
var name string
err = rows.Scan(&id, &name)
if err != nil {
t.Fatal(err)
}
if id != 1 || name != "test" {
t.Fatalf("invalid rows %v %v", id, name)
}
}
if err = rows.Close(); err != nil {
t.Fatal(err)
}
if err = rows.Err(); err != nil {
t.Fatal(err)
}
if err = tx.Commit(); err != nil {
t.Fatal(err)
}
if err = c.ExpectationsWereMet(); err != nil {
t.Fatal(err)
}
}
func Test_GetCases(t *testing.T) {
files, err := GetCases("testdata/", nil)
if err != nil {
t.Fatal(err)
}
if len(files) == 0 {
t.Fatalf("no files matching")
}
if n := len(files); n != 1 {
t.Fatalf("invalid number of test cases %d", n)
}
}

View File

@@ -0,0 +1,6 @@
# begin
# query select \* from test;
# columns id|VARCHAR,name|VARCHAR
id,name
1,test
# commit
1 # begin
2 # query select \* from test;
3 # columns id|VARCHAR,name|VARCHAR
4 id,name
5 1,test
6 # commit

View File

@@ -0,0 +1 @@
{}

View File

@@ -0,0 +1 @@
{}

View File

@@ -1,6 +1,5 @@
package text
func DetectEncoding(text string) map[string]int {
charsets := map[string]int{
"UTF-8": 0,
@@ -77,7 +76,7 @@ func DetectEncoding(text string) map[string]int {
charsets["MAC"] += uppercase
}
last_simb = char
last_simb = char
}
return charsets