Compare commits
1 Commits
v4.0.1
...
d660c085b2
Author | SHA1 | Date | |
---|---|---|---|
d660c085b2 |
@@ -3,7 +3,6 @@ package broker
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.unistack.org/micro/v4/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v4/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
@@ -135,22 +134,13 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
|
|||||||
eh := m.opts.ErrorHandler
|
eh := m.opts.ErrorHandler
|
||||||
|
|
||||||
for t, ms := range msgTopicMap {
|
for t, ms := range msgTopicMap {
|
||||||
ts := time.Now()
|
|
||||||
|
|
||||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(len(ms))
|
|
||||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(len(ms))
|
|
||||||
|
|
||||||
m.RLock()
|
m.RLock()
|
||||||
subs, ok := m.subscribers[t]
|
subs, ok := m.subscribers[t]
|
||||||
m.RUnlock()
|
m.RUnlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "failure").Add(len(ms))
|
|
||||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
|
|
||||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms))
|
|
||||||
for _, sub := range subs {
|
for _, sub := range subs {
|
||||||
if sub.opts.BatchErrorHandler != nil {
|
if sub.opts.BatchErrorHandler != nil {
|
||||||
beh = sub.opts.BatchErrorHandler
|
beh = sub.opts.BatchErrorHandler
|
||||||
@@ -162,65 +152,37 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
|
|||||||
switch {
|
switch {
|
||||||
// batch processing
|
// batch processing
|
||||||
case sub.batchhandler != nil:
|
case sub.batchhandler != nil:
|
||||||
|
|
||||||
if err = sub.batchhandler(ms); err != nil {
|
if err = sub.batchhandler(ms); err != nil {
|
||||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
|
||||||
ms.SetError(err)
|
ms.SetError(err)
|
||||||
if beh != nil {
|
if beh != nil {
|
||||||
_ = beh(ms)
|
_ = beh(ms)
|
||||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||||
}
|
}
|
||||||
} else {
|
} else if sub.opts.AutoAck {
|
||||||
if sub.opts.AutoAck {
|
|
||||||
if err = ms.Ack(); err != nil {
|
if err = ms.Ack(); err != nil {
|
||||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
|
||||||
} else {
|
|
||||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
|
|
||||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
|
|
||||||
// single processing
|
// single processing
|
||||||
case sub.handler != nil:
|
case sub.handler != nil:
|
||||||
for _, p := range ms {
|
for _, p := range ms {
|
||||||
if err = sub.handler(p); err != nil {
|
if err = sub.handler(p); err != nil {
|
||||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
|
||||||
p.SetError(err)
|
p.SetError(err)
|
||||||
if eh != nil {
|
if eh != nil {
|
||||||
_ = eh(p)
|
_ = eh(p)
|
||||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||||
}
|
}
|
||||||
} else {
|
} else if sub.opts.AutoAck {
|
||||||
if sub.opts.AutoAck {
|
|
||||||
if err = p.Ack(); err != nil {
|
if err = p.Ack(); err != nil {
|
||||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
|
||||||
} else {
|
|
||||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1)
|
|
||||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
te := time.Since(ts)
|
|
||||||
m.opts.Meter.Summary(PublishMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
|
|
||||||
m.opts.Meter.Histogram(PublishMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
|
|
||||||
m.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
|
|
||||||
m.opts.Meter.Histogram(SubscribeMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@@ -12,25 +12,6 @@ import (
|
|||||||
"go.unistack.org/micro/v4/tracer"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
// PublishMessageDurationSeconds specifies meter metric name
|
|
||||||
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
|
||||||
// PublishMessageLatencyMicroseconds specifies meter metric name
|
|
||||||
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
|
||||||
// PublishMessageTotal specifies meter metric name
|
|
||||||
PublishMessageTotal = "publish_message_total"
|
|
||||||
// PublishMessageInflight specifies meter metric name
|
|
||||||
PublishMessageInflight = "publish_message_inflight"
|
|
||||||
// SubscribeMessageDurationSeconds specifies meter metric name
|
|
||||||
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
|
|
||||||
// SubscribeMessageLatencyMicroseconds specifies meter metric name
|
|
||||||
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
|
|
||||||
// SubscribeMessageTotal specifies meter metric name
|
|
||||||
SubscribeMessageTotal = "subscribe_message_total"
|
|
||||||
// SubscribeMessageInflight specifies meter metric name
|
|
||||||
SubscribeMessageInflight = "subscribe_message_inflight"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Options struct
|
// Options struct
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// Tracer used for tracing
|
// Tracer used for tracing
|
||||||
|
@@ -27,6 +27,22 @@ var (
|
|||||||
ServerRequestTotal = "server_request_total"
|
ServerRequestTotal = "server_request_total"
|
||||||
// ServerRequestInflight specifies meter metric name
|
// ServerRequestInflight specifies meter metric name
|
||||||
ServerRequestInflight = "server_request_inflight"
|
ServerRequestInflight = "server_request_inflight"
|
||||||
|
// PublishMessageDurationSeconds specifies meter metric name
|
||||||
|
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
||||||
|
// PublishMessageLatencyMicroseconds specifies meter metric name
|
||||||
|
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
||||||
|
// PublishMessageTotal specifies meter metric name
|
||||||
|
PublishMessageTotal = "publish_message_total"
|
||||||
|
// PublishMessageInflight specifies meter metric name
|
||||||
|
PublishMessageInflight = "publish_message_inflight"
|
||||||
|
// SubscribeMessageDurationSeconds specifies meter metric name
|
||||||
|
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
|
||||||
|
// SubscribeMessageLatencyMicroseconds specifies meter metric name
|
||||||
|
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
|
||||||
|
// SubscribeMessageTotal specifies meter metric name
|
||||||
|
SubscribeMessageTotal = "subscribe_message_total"
|
||||||
|
// SubscribeMessageInflight specifies meter metric name
|
||||||
|
SubscribeMessageInflight = "subscribe_message_inflight"
|
||||||
|
|
||||||
labelSuccess = "success"
|
labelSuccess = "success"
|
||||||
labelFailure = "failure"
|
labelFailure = "failure"
|
||||||
@@ -214,7 +230,37 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||||
return w.Client.Publish(ctx, p, opts...)
|
endpoint := p.Topic()
|
||||||
|
|
||||||
|
labels := make([]string, 0, 4)
|
||||||
|
labels = append(labels, labelEndpoint, endpoint)
|
||||||
|
|
||||||
|
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
|
||||||
|
ts := time.Now()
|
||||||
|
err := w.Client.Publish(ctx, p, opts...)
|
||||||
|
te := time.Since(ts)
|
||||||
|
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
|
||||||
|
|
||||||
|
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||||
|
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
labels = append(labels, labelStatus, labelSuccess)
|
||||||
|
} else {
|
||||||
|
labels = append(labels, labelStatus, labelFailure)
|
||||||
|
}
|
||||||
|
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHandlerWrapper create new server handler wrapper
|
||||||
|
// deprecated
|
||||||
|
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
|
||||||
|
handler := &wrapper{
|
||||||
|
opts: NewOptions(opts...),
|
||||||
|
}
|
||||||
|
return handler.HandlerFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServerHandlerWrapper create new server handler wrapper
|
// NewServerHandlerWrapper create new server handler wrapper
|
||||||
@@ -256,3 +302,46 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewSubscriberWrapper create server subscribe wrapper
|
||||||
|
// deprecated
|
||||||
|
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
||||||
|
handler := &wrapper{
|
||||||
|
opts: NewOptions(opts...),
|
||||||
|
}
|
||||||
|
return handler.SubscriberFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
||||||
|
handler := &wrapper{
|
||||||
|
opts: NewOptions(opts...),
|
||||||
|
}
|
||||||
|
return handler.SubscriberFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
|
||||||
|
return func(ctx context.Context, msg server.Message) error {
|
||||||
|
endpoint := msg.Topic()
|
||||||
|
|
||||||
|
labels := make([]string, 0, 4)
|
||||||
|
labels = append(labels, labelEndpoint, endpoint)
|
||||||
|
|
||||||
|
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
|
||||||
|
ts := time.Now()
|
||||||
|
err := fn(ctx, msg)
|
||||||
|
te := time.Since(ts)
|
||||||
|
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
|
||||||
|
|
||||||
|
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||||
|
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
labels = append(labels, labelStatus, labelSuccess)
|
||||||
|
} else {
|
||||||
|
labels = append(labels, labelStatus, labelFailure)
|
||||||
|
}
|
||||||
|
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -150,6 +150,7 @@ type Stream interface {
|
|||||||
// func (g *Greeter) Hello(context, request, response) error {
|
// func (g *Greeter) Hello(context, request, response) error {
|
||||||
// return nil
|
// return nil
|
||||||
// }
|
// }
|
||||||
|
//
|
||||||
type Handler interface {
|
type Handler interface {
|
||||||
Name() string
|
Name() string
|
||||||
Handler() interface{}
|
Handler() interface{}
|
||||||
|
@@ -88,7 +88,6 @@ func (s *service) Name() string {
|
|||||||
// Init initialises options. Additionally it calls cmd.Init
|
// Init initialises options. Additionally it calls cmd.Init
|
||||||
// which parses command line flags. cmd.Init is only called
|
// which parses command line flags. cmd.Init is only called
|
||||||
// on first Init.
|
// on first Init.
|
||||||
//
|
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func (s *service) Init(opts ...Option) error {
|
func (s *service) Init(opts ...Option) error {
|
||||||
var err error
|
var err error
|
||||||
|
@@ -58,7 +58,6 @@ func IsLocal(addr string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Extract returns a real ip
|
// Extract returns a real ip
|
||||||
//
|
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func Extract(addr string) (string, error) {
|
func Extract(addr string) (string, error) {
|
||||||
// if addr specified then its returned
|
// if addr specified then its returned
|
||||||
|
@@ -1,17 +0,0 @@
|
|||||||
package io
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
var osStderrMu sync.Mutex
|
|
||||||
|
|
||||||
var OrigStderr = func() *os.File {
|
|
||||||
fd, err := dupFD(os.Stderr.Fd())
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return os.NewFile(fd, os.Stderr.Name())
|
|
||||||
}()
|
|
@@ -1,40 +0,0 @@
|
|||||||
package io
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
"regexp"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var ErrorPattern = regexp.MustCompile(`"error"`)
|
|
||||||
|
|
||||||
func TestRedirect(t *testing.T) {
|
|
||||||
r, w, err := os.Pipe()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ch := make(chan string)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
buf := bytes.NewBuffer(nil)
|
|
||||||
_, _ = io.Copy(buf, r)
|
|
||||||
ch <- buf.String()
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err = RedirectStderr(w); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
os.Stderr.Write([]byte(`test redirect`))
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
r.Close()
|
|
||||||
str := <-ch
|
|
||||||
if ErrorPattern.MatchString(str) {
|
|
||||||
t.Fatal(fmt.Errorf(str))
|
|
||||||
}
|
|
||||||
}
|
|
@@ -1,48 +0,0 @@
|
|||||||
//go:build !windows
|
|
||||||
// +build !windows
|
|
||||||
|
|
||||||
package io
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"golang.org/x/sys/unix"
|
|
||||||
)
|
|
||||||
|
|
||||||
// dupFD is used to initialize OrigStderr (see stderr_redirect.go).
|
|
||||||
func dupFD(fd uintptr) (uintptr, error) {
|
|
||||||
// Warning: failing to set FD_CLOEXEC causes the duplicated file descriptor
|
|
||||||
// to leak into subprocesses created by exec.Command. If the file descriptor
|
|
||||||
// is a pipe, these subprocesses will hold the pipe open (i.e., prevent
|
|
||||||
// EOF), potentially beyond the lifetime of this process.
|
|
||||||
//
|
|
||||||
// This can break go test's timeouts. go test usually spawns a test process
|
|
||||||
// with its stdin and stderr streams hooked up to pipes; if the test process
|
|
||||||
// times out, it sends a SIGKILL and attempts to read stdin and stderr to
|
|
||||||
// completion. If the test process has itself spawned long-lived
|
|
||||||
// subprocesses that hold references to the stdin or stderr pipes, go test
|
|
||||||
// will hang until the subprocesses exit, rather defeating the purpose of
|
|
||||||
// a timeout.
|
|
||||||
nfd, err := unix.FcntlInt(fd, unix.F_DUPFD_CLOEXEC, 0)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return uintptr(nfd), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RedirectStderr is used to redirect internal writes to fd 2 to the
|
|
||||||
// specified file. This is needed to ensure that harcoded writes to fd
|
|
||||||
// 2 by e.g. the Go runtime are redirected to a log file of our
|
|
||||||
// choosing.
|
|
||||||
//
|
|
||||||
// We also override os.Stderr for those other parts of Go which use
|
|
||||||
// that and not fd 2 directly.
|
|
||||||
func RedirectStderr(f *os.File) error {
|
|
||||||
osStderrMu.Lock()
|
|
||||||
defer osStderrMu.Unlock()
|
|
||||||
if err := unix.Dup2(int(f.Fd()), unix.Stderr); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
os.Stderr = f
|
|
||||||
return nil
|
|
||||||
}
|
|
@@ -1,32 +0,0 @@
|
|||||||
package io
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"golang.org/x/sys/windows"
|
|
||||||
)
|
|
||||||
|
|
||||||
// dupFD is used to initialize OrigStderr (see stderr_redirect.go).
|
|
||||||
func dupFD(fd uintptr) (uintptr, error) {
|
|
||||||
// Adapted from https://github.com/golang/go/blob/go1.8/src/syscall/exec_windows.go#L303.
|
|
||||||
p := windows.CurrentProcess()
|
|
||||||
var h windows.Handle
|
|
||||||
return uintptr(h), windows.DuplicateHandle(p, windows.Handle(fd), p, &h, 0, true, windows.DUPLICATE_SAME_ACCESS)
|
|
||||||
}
|
|
||||||
|
|
||||||
// RedirectStderr is used to redirect internal writes to the error
|
|
||||||
// handle to the specified file. This is needed to ensure that
|
|
||||||
// harcoded writes to the error handle by e.g. the Go runtime are
|
|
||||||
// redirected to a log file of our choosing.
|
|
||||||
//
|
|
||||||
// We also override os.Stderr for those other parts of Go which use
|
|
||||||
// that and not fd 2 directly.
|
|
||||||
func RedirectStderr(f *os.File) error {
|
|
||||||
osStderrMu.Lock()
|
|
||||||
defer osStderrMu.Unlock()
|
|
||||||
if err := windows.SetStdHandle(windows.STD_ERROR_HANDLE, windows.Handle(f.Fd())); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
os.Stderr = f
|
|
||||||
return nil
|
|
||||||
}
|
|
@@ -493,7 +493,6 @@ func btSplitter(str string) []string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// queryToMap turns something like a[b][c]=4 into
|
// queryToMap turns something like a[b][c]=4 into
|
||||||
//
|
|
||||||
// map[string]interface{}{
|
// map[string]interface{}{
|
||||||
// "a": map[string]interface{}{
|
// "a": map[string]interface{}{
|
||||||
// "b": map[string]interface{}{
|
// "b": map[string]interface{}{
|
||||||
|
25
util/test/sqlmock_test.go
Normal file
25
util/test/sqlmock_test.go
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_NewSQLRowsFromFile(t *testing.T) {
|
||||||
|
db, c, err := sqlmock.New()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
rows, err := NewSQLRowsFromFile(c, "testdata/Call.csv")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !strings.Contains(fmt.Sprintf("%#+v", rows), `cols:[]string{"DepAgrId", "DepAgrNum", "DepAgrDate", "DepAgrCloseDate", "AccCur", "MainFinaccNum", "MainFinaccName", "MainFinaccId", "MainFinaccOpenDt", "DepAgrStatus", "MainFinaccBal", "DepartCode", "CardAccId"}`) {
|
||||||
|
t.Fatal("invalid cols after import csv")
|
||||||
|
}
|
||||||
|
}
|
@@ -1,63 +1,28 @@
|
|||||||
package test
|
package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"encoding/csv"
|
"encoding/csv"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/DATA-DOG/go-sqlmock"
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
"go.unistack.org/micro/v4/client"
|
"go.unistack.org/micro/v4/client"
|
||||||
"go.unistack.org/micro/v4/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func getExt(name string) string {
|
|
||||||
ext := filepath.Ext(name)
|
|
||||||
if len(ext) > 0 && ext[0] == '.' {
|
|
||||||
ext = ext[1:]
|
|
||||||
}
|
|
||||||
return ext
|
|
||||||
}
|
|
||||||
|
|
||||||
func getNameWithoutExt(name string) string {
|
|
||||||
return strings.TrimSuffix(name, filepath.Ext(name))
|
|
||||||
}
|
|
||||||
|
|
||||||
var ErrUnknownContentType = errors.New("unknown content type")
|
var ErrUnknownContentType = errors.New("unknown content type")
|
||||||
|
|
||||||
type Extension struct {
|
type Extension struct {
|
||||||
Ext []string
|
Ext []string
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var ExtToTypes = map[string][]string{
|
||||||
ExtToTypes = map[string][]string{
|
|
||||||
"json": {"application/json", "application/grpc+json"},
|
"json": {"application/json", "application/grpc+json"},
|
||||||
"yaml": {"application/yaml", "application/yml", "text/yaml", "text/yml"},
|
"yaml": {"application/yaml", "application/yml", "text/yaml", "text/yml"},
|
||||||
"yml": {"application/yaml", "application/yml", "text/yaml", "text/yml"},
|
"yml": {"application/yaml", "application/yml", "text/yaml", "text/yml"},
|
||||||
"proto": {"application/grpc", "application/grpc+proto", "application/proto"},
|
"proto": {"application/grpc", "application/grpc+proto", "application/proto"},
|
||||||
}
|
|
||||||
|
|
||||||
DefaultExts = []string{"csv", "json", "yaml", "yml", "proto"}
|
|
||||||
)
|
|
||||||
|
|
||||||
func clientCall(ctx context.Context, c client.Client, req client.Request, rsp interface{}) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewResponseFromFile(rspfile string) (*codec.Frame, error) {
|
|
||||||
rspbuf, err := os.ReadFile(rspfile)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &codec.Frame{Data: rspbuf}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRequestFromFile(c client.Client, reqfile string) (client.Request, error) {
|
func NewRequestFromFile(c client.Client, reqfile string) (client.Request, error) {
|
||||||
@@ -67,7 +32,10 @@ func NewRequestFromFile(c client.Client, reqfile string) (client.Request, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
endpoint := path.Base(path.Dir(reqfile))
|
endpoint := path.Base(path.Dir(reqfile))
|
||||||
ext := getExt(reqfile)
|
ext := path.Ext(reqfile)
|
||||||
|
if len(ext) > 0 && ext[0] == '.' {
|
||||||
|
ext = ext[1:]
|
||||||
|
}
|
||||||
|
|
||||||
var ct string
|
var ct string
|
||||||
if cts, ok := ExtToTypes[ext]; ok {
|
if cts, ok := ExtToTypes[ext]; ok {
|
||||||
@@ -88,189 +56,26 @@ func NewRequestFromFile(c client.Client, reqfile string) (client.Request, error)
|
|||||||
return req, nil
|
return req, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SQLFromFile(m sqlmock.Sqlmock, name string) error {
|
func NewSQLRowsFromFile(c sqlmock.Sqlmock, file string) (*sqlmock.Rows, error) {
|
||||||
fp, err := os.Open(name)
|
fp, err := os.Open(file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer fp.Close()
|
defer fp.Close()
|
||||||
return SQLFromReader(m, fp)
|
|
||||||
}
|
|
||||||
|
|
||||||
func SQLFromBytes(m sqlmock.Sqlmock, buf []byte) error {
|
r := csv.NewReader(fp)
|
||||||
return SQLFromReader(m, bytes.NewReader(buf))
|
r.Comma = '\t'
|
||||||
}
|
r.Comment = '#'
|
||||||
|
|
||||||
func SQLFromString(m sqlmock.Sqlmock, buf string) error {
|
records, err := r.ReadAll()
|
||||||
return SQLFromReader(m, strings.NewReader(buf))
|
|
||||||
}
|
|
||||||
|
|
||||||
func SQLFromReader(m sqlmock.Sqlmock, r io.Reader) error {
|
|
||||||
var rows *sqlmock.Rows
|
|
||||||
var exp *sqlmock.ExpectedQuery
|
|
||||||
br := bufio.NewReader(r)
|
|
||||||
|
|
||||||
for {
|
|
||||||
s, err := br.ReadString('\n')
|
|
||||||
if err != nil && err != io.EOF {
|
|
||||||
return err
|
|
||||||
} else if err == io.EOF && len(s) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if s[0] != '#' {
|
|
||||||
r := csv.NewReader(strings.NewReader(s))
|
|
||||||
r.Comma = ','
|
|
||||||
var records [][]string
|
|
||||||
records, err = r.ReadAll()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
|
||||||
if rows == nil {
|
|
||||||
rows = m.NewRows(records[0])
|
|
||||||
} else {
|
|
||||||
for idx := 0; idx < len(records); idx++ {
|
|
||||||
rows.FromCSVString(strings.Join(records[idx], ","))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if rows != nil {
|
rows := c.NewRows(records[0])
|
||||||
exp.WillReturnRows(rows)
|
for idx := 1; idx < len(records); idx++ {
|
||||||
rows = nil
|
rows.FromCSVString(strings.Join(records[idx], ";"))
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
return rows, nil
|
||||||
case strings.HasPrefix(strings.ToLower(s[2:]), "begin"):
|
|
||||||
m.ExpectBegin()
|
|
||||||
case strings.HasPrefix(strings.ToLower(s[2:]), "commit"):
|
|
||||||
m.ExpectCommit()
|
|
||||||
case strings.HasPrefix(strings.ToLower(s[2:]), "rollback"):
|
|
||||||
m.ExpectRollback()
|
|
||||||
case strings.HasPrefix(strings.ToLower(s[2:]), "exec "):
|
|
||||||
m.ExpectExec(s[2+len("exec "):])
|
|
||||||
case strings.HasPrefix(strings.ToLower(s[2:]), "query "):
|
|
||||||
exp = m.ExpectQuery(s[2+len("query "):])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func RunWithClientExpectResults(ctx context.Context, c client.Client, m sqlmock.Sqlmock, dir string, exts []string) error {
|
|
||||||
tcases, err := GetCases(dir, exts)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
g, gctx := errgroup.WithContext(ctx)
|
|
||||||
if !strings.Contains(dir, "parallel") {
|
|
||||||
g.SetLimit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tcase := range tcases {
|
|
||||||
for _, dbfile := range tcase.dbfiles {
|
|
||||||
if err = SQLFromFile(m, dbfile); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for idx := 0; idx < len(tcase.reqfiles); idx++ {
|
|
||||||
g.TryGo(func() error {
|
|
||||||
req, err := NewRequestFromFile(c, tcase.reqfiles[idx])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
rsp, err := NewResponseFromFile(tcase.rspfiles[idx])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
data := &codec.Frame{}
|
|
||||||
err = c.Call(gctx, req, data, client.WithContentType(req.ContentType()))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if !bytes.Equal(rsp.Data, data.Data) {
|
|
||||||
return fmt.Errorf("rsp not equal test %s != %s", rsp.Data, data.Data)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return g.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func RunWithClientExpectErrors(ctx context.Context, c client.Client, dir string) error {
|
|
||||||
g, gctx := errgroup.WithContext(ctx)
|
|
||||||
if !strings.Contains(dir, "parallel") {
|
|
||||||
g.SetLimit(1)
|
|
||||||
}
|
|
||||||
_ = gctx
|
|
||||||
g.TryGo(func() error {
|
|
||||||
// rsp := &codec.Frame{}
|
|
||||||
// return c.Call(ctx, req, rsp, client.WithContentType(req.ContentType()))
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
return g.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
type Case struct {
|
|
||||||
dbfiles []string
|
|
||||||
reqfiles []string
|
|
||||||
rspfiles []string
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetCases(dir string, exts []string) ([]Case, error) {
|
|
||||||
var tcases []Case
|
|
||||||
entries, err := os.ReadDir(dir)
|
|
||||||
if len(entries) == 0 && err != nil {
|
|
||||||
return tcases, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if exts == nil {
|
|
||||||
exts = DefaultExts
|
|
||||||
}
|
|
||||||
|
|
||||||
var dirs []string
|
|
||||||
var dbfiles, reqfiles, rspfiles []string
|
|
||||||
|
|
||||||
for _, entry := range entries {
|
|
||||||
if entry.IsDir() {
|
|
||||||
dirs = append(dirs, filepath.Join(dir, entry.Name()))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if info, err := entry.Info(); err != nil {
|
|
||||||
return tcases, err
|
|
||||||
} else if !info.Mode().IsRegular() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, ext := range exts {
|
|
||||||
if getExt(entry.Name()) == ext {
|
|
||||||
name := getNameWithoutExt(entry.Name())
|
|
||||||
switch {
|
|
||||||
case strings.HasSuffix(name, "_db"):
|
|
||||||
dbfiles = append(dbfiles, filepath.Join(dir, entry.Name()))
|
|
||||||
case strings.HasSuffix(name, "_req"):
|
|
||||||
reqfiles = append(reqfiles, filepath.Join(dir, entry.Name()))
|
|
||||||
case strings.HasSuffix(name, "_rsp"):
|
|
||||||
rspfiles = append(rspfiles, filepath.Join(dir, entry.Name()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(reqfiles) > 0 && len(rspfiles) > 0 {
|
|
||||||
tcases = append(tcases, Case{dbfiles: dbfiles, reqfiles: reqfiles, rspfiles: rspfiles})
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, dir = range dirs {
|
|
||||||
ntcases, err := GetCases(dir, exts)
|
|
||||||
if len(ntcases) == 0 && err != nil {
|
|
||||||
return tcases, err
|
|
||||||
} else if len(ntcases) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
tcases = append(tcases, ntcases...)
|
|
||||||
}
|
|
||||||
|
|
||||||
return tcases, nil
|
|
||||||
}
|
}
|
||||||
|
@@ -1,72 +0,0 @@
|
|||||||
package test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/DATA-DOG/go-sqlmock"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Test_SQLFromFile(t *testing.T) {
|
|
||||||
ctx := context.TODO()
|
|
||||||
db, c, err := sqlmock.New()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer db.Close()
|
|
||||||
|
|
||||||
if err = SQLFromFile(c, "testdata/result/01_firstcase/Call_db.csv"); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
tx, err := db.BeginTx(ctx, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := tx.QueryContext(ctx, "select * from test;")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
for rows.Next() {
|
|
||||||
var id int64
|
|
||||||
var name string
|
|
||||||
err = rows.Scan(&id, &name)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if id != 1 || name != "test" {
|
|
||||||
t.Fatalf("invalid rows %v %v", id, name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = rows.Close(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = rows.Err(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = tx.Commit(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if err = c.ExpectationsWereMet(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_GetCases(t *testing.T) {
|
|
||||||
files, err := GetCases("testdata/", nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(files) == 0 {
|
|
||||||
t.Fatalf("no files matching")
|
|
||||||
}
|
|
||||||
|
|
||||||
if n := len(files); n != 1 {
|
|
||||||
t.Fatalf("invalid number of test cases %d", n)
|
|
||||||
}
|
|
||||||
}
|
|
3
util/test/testdata/Call.csv
vendored
Normal file
3
util/test/testdata/Call.csv
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
DepAgrId DepAgrNum DepAgrDate DepAgrCloseDate AccCur MainFinaccNum MainFinaccName MainFinaccId MainFinaccOpenDt DepAgrStatus MainFinaccBal DepartCode CardAccId
|
||||||
|
RBO223352456529 006-21/19-013946448 06112019 "" RUR 42306810521060000015 Депозит по договору № 006-21/19-013946448 220667379184 06112019 WORK 1100000 006-21
|
||||||
|
RBO151198718263 000-500/18-009684290 19062018 "" RUR 40817810001004518898 Текущий счет по договору № 144086737921 19062018 WORK 14091.13 000-500 144121164578
|
|
@@ -1,5 +0,0 @@
|
|||||||
# begin
|
|
||||||
# query select \* from test;
|
|
||||||
id,name
|
|
||||||
1,test
|
|
||||||
# commit
|
|
|
@@ -1 +0,0 @@
|
|||||||
{}
|
|
@@ -1 +0,0 @@
|
|||||||
{}
|
|
@@ -1,5 +1,6 @@
|
|||||||
package text
|
package text
|
||||||
|
|
||||||
|
|
||||||
func DetectEncoding(text string) map[string]int {
|
func DetectEncoding(text string) map[string]int {
|
||||||
charsets := map[string]int{
|
charsets := map[string]int{
|
||||||
"UTF-8": 0,
|
"UTF-8": 0,
|
||||||
|
Reference in New Issue
Block a user