Compare commits
22 Commits
Author | SHA1 | Date | |
---|---|---|---|
1a9236caad | |||
6c68d39081 | |||
35e62fbeb0 | |||
00b3ceb468 | |||
7dc8f088c9 | |||
c65afcea1b | |||
3eebfb5b11 | |||
fa1427014c | |||
62074965ee | |||
9c8fbb2202 | |||
7c0a5f5e2a | |||
b08f5321b0 | |||
cc0f24e012 | |||
307a08f50c | |||
edc93e8c37 | |||
391813c260 | |||
1a1459dd0e | |||
4e99680c30 | |||
92a3a547b8 | |||
849c462037 | |||
54a55c83e2 | |||
781dee03db |
@@ -88,6 +88,8 @@ type BatchHandler func(Events) error
|
|||||||
|
|
||||||
// Event is given to a subscription handler for processing
|
// Event is given to a subscription handler for processing
|
||||||
type Event interface {
|
type Event interface {
|
||||||
|
// Context return context.Context for event
|
||||||
|
Context() context.Context
|
||||||
// Topic returns event topic
|
// Topic returns event topic
|
||||||
Topic() string
|
Topic() string
|
||||||
// Message returns broker message
|
// Message returns broker message
|
||||||
|
@@ -373,6 +373,10 @@ func (m *memoryEvent) SetError(err error) {
|
|||||||
m.err = err
|
m.err = err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *memoryEvent) Context() context.Context {
|
||||||
|
return m.opts.Context
|
||||||
|
}
|
||||||
|
|
||||||
func (m *memorySubscriber) Options() broker.SubscribeOptions {
|
func (m *memorySubscriber) Options() broker.SubscribeOptions {
|
||||||
return m.opts
|
return m.opts
|
||||||
}
|
}
|
||||||
|
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v3/broker"
|
||||||
@@ -12,6 +13,8 @@ import (
|
|||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"go.unistack.org/micro/v3/options"
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v3/selector"
|
"go.unistack.org/micro/v3/selector"
|
||||||
|
"go.unistack.org/micro/v3/semconv"
|
||||||
|
"go.unistack.org/micro/v3/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultCodecs will be used to encode/decode data
|
// DefaultCodecs will be used to encode/decode data
|
||||||
@@ -104,10 +107,13 @@ func (n *noopResponse) Read() ([]byte, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type noopStream struct{}
|
type noopStream struct {
|
||||||
|
err error
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
func (n *noopStream) Context() context.Context {
|
func (n *noopStream) Context() context.Context {
|
||||||
return context.Background()
|
return n.ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStream) Request() Request {
|
func (n *noopStream) Request() Request {
|
||||||
@@ -135,15 +141,21 @@ func (n *noopStream) RecvMsg(interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStream) Error() error {
|
func (n *noopStream) Error() error {
|
||||||
return nil
|
return n.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStream) Close() error {
|
func (n *noopStream) Close() error {
|
||||||
return nil
|
if sp, ok := tracer.SpanFromContext(n.ctx); ok && sp != nil {
|
||||||
|
if n.err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, n.err.Error())
|
||||||
|
}
|
||||||
|
sp.Finish()
|
||||||
|
}
|
||||||
|
return n.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopStream) CloseSend() error {
|
func (n *noopStream) CloseSend() error {
|
||||||
return nil
|
return n.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopMessage) Topic() string {
|
func (n *noopMessage) Topic() string {
|
||||||
@@ -207,7 +219,28 @@ func (n *noopClient) String() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
||||||
return n.funcCall(ctx, req, rsp, opts...)
|
ts := time.Now()
|
||||||
|
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
||||||
|
var sp tracer.Span
|
||||||
|
ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
||||||
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||||
|
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
||||||
|
)
|
||||||
|
err := n.funcCall(ctx, req, rsp, opts...)
|
||||||
|
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
|
||||||
|
te := time.Since(ts)
|
||||||
|
n.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||||
|
n.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||||
|
|
||||||
|
if me := errors.FromError(err); me == nil {
|
||||||
|
sp.Finish()
|
||||||
|
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
|
||||||
|
} else {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
|
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
||||||
@@ -349,7 +382,28 @@ func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOp
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
||||||
return n.funcStream(ctx, req, opts...)
|
ts := time.Now()
|
||||||
|
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
||||||
|
var sp tracer.Span
|
||||||
|
ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
||||||
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||||
|
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
||||||
|
)
|
||||||
|
stream, err := n.funcStream(ctx, req, opts...)
|
||||||
|
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
|
||||||
|
te := time.Since(ts)
|
||||||
|
n.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||||
|
n.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||||
|
|
||||||
|
if me := errors.FromError(err); me == nil {
|
||||||
|
sp.Finish()
|
||||||
|
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
|
||||||
|
} else {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
|
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
||||||
@@ -493,7 +547,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) {
|
func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) {
|
||||||
return &noopStream{}, nil
|
return &noopStream{ctx: ctx}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
||||||
|
@@ -44,6 +44,20 @@ var (
|
|||||||
ErrGatewayTimeout = &Error{Code: 504}
|
ErrGatewayTimeout = &Error{Code: 504}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const ProblemContentType = "application/problem+json"
|
||||||
|
|
||||||
|
type Problem struct {
|
||||||
|
Type string `json:"type,omitempty"`
|
||||||
|
Title string `json:"title,omitempty"`
|
||||||
|
Detail string `json:"detail,omitempty"`
|
||||||
|
Instance string `json:"instance,omitempty"`
|
||||||
|
Errors []struct {
|
||||||
|
Title string `json:"title,omitempty"`
|
||||||
|
Detail string `json:"detail,omitempty"`
|
||||||
|
} `json:"errors,omitempty"`
|
||||||
|
Status int `json:"status,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// Error type
|
// Error type
|
||||||
type Error struct {
|
type Error struct {
|
||||||
// ID holds error id or service, usually someting like my_service or id
|
// ID holds error id or service, usually someting like my_service or id
|
||||||
@@ -262,6 +276,10 @@ func CodeIn(err interface{}, codes ...int32) bool {
|
|||||||
|
|
||||||
// FromError try to convert go error to *Error
|
// FromError try to convert go error to *Error
|
||||||
func FromError(err error) *Error {
|
func FromError(err error) *Error {
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if verr, ok := err.(*Error); ok && verr != nil {
|
if verr, ok := err.(*Error); ok && verr != nil {
|
||||||
return verr
|
return verr
|
||||||
}
|
}
|
||||||
|
@@ -4,6 +4,17 @@ import "context"
|
|||||||
|
|
||||||
type loggerKey struct{}
|
type loggerKey struct{}
|
||||||
|
|
||||||
|
// MustContext returns logger from passed context or DefaultLogger if empty
|
||||||
|
func MustContext(ctx context.Context) Logger {
|
||||||
|
if ctx == nil {
|
||||||
|
return DefaultLogger
|
||||||
|
}
|
||||||
|
if l, ok := ctx.Value(loggerKey{}).(Logger); ok && l != nil {
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
return DefaultLogger
|
||||||
|
}
|
||||||
|
|
||||||
// FromContext returns logger from passed context
|
// FromContext returns logger from passed context
|
||||||
func FromContext(ctx context.Context) (Logger, bool) {
|
func FromContext(ctx context.Context) (Logger, bool) {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package logger provides a log interface
|
// Package logger provides a log interface
|
||||||
package logger // import "go.unistack.org/micro/v3/logger"
|
package logger
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@@ -6,6 +6,8 @@ import (
|
|||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v3/meter"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option func signature
|
// Option func signature
|
||||||
@@ -45,6 +47,8 @@ type Options struct {
|
|||||||
Level Level
|
Level Level
|
||||||
// TimeFunc used to obtain current time
|
// TimeFunc used to obtain current time
|
||||||
TimeFunc func() time.Time
|
TimeFunc func() time.Time
|
||||||
|
// Meter used to count logs for specific level
|
||||||
|
Meter meter.Meter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOptions creates new options struct
|
// NewOptions creates new options struct
|
||||||
@@ -58,6 +62,7 @@ func NewOptions(opts ...Option) Options {
|
|||||||
ContextAttrFuncs: DefaultContextAttrFuncs,
|
ContextAttrFuncs: DefaultContextAttrFuncs,
|
||||||
AddSource: true,
|
AddSource: true,
|
||||||
TimeFunc: time.Now,
|
TimeFunc: time.Now,
|
||||||
|
Meter: meter.DefaultMeter,
|
||||||
}
|
}
|
||||||
|
|
||||||
WithMicroKeys()(&options)
|
WithMicroKeys()(&options)
|
||||||
@@ -69,7 +74,7 @@ func NewOptions(opts ...Option) Options {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithContextAttrFuncs appends default funcs for the context arrts filler
|
// WithContextAttrFuncs appends default funcs for the context attrs filler
|
||||||
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
|
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ContextAttrFuncs = append(o.ContextAttrFuncs, fncs...)
|
o.ContextAttrFuncs = append(o.ContextAttrFuncs, fncs...)
|
||||||
@@ -132,6 +137,13 @@ func WithName(n string) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithMeter sets the meter
|
||||||
|
func WithMeter(m meter.Meter) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Meter = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithTimeFunc sets the func to obtain current time
|
// WithTimeFunc sets the func to obtain current time
|
||||||
func WithTimeFunc(fn func() time.Time) Option {
|
func WithTimeFunc(fn func() time.Time) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
@@ -11,6 +11,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
|
"go.unistack.org/micro/v3/semconv"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v3/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -150,6 +151,7 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interface{}) {
|
func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", lvl.String()).Inc()
|
||||||
if !s.V(lvl) {
|
if !s.V(lvl) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -189,6 +191,7 @@ func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interfa
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, attrs ...interface{}) {
|
func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", lvl.String()).Inc()
|
||||||
if !s.V(lvl) {
|
if !s.V(lvl) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -228,6 +231,7 @@ func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, att
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
|
func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.InfoLevel.String()).Inc()
|
||||||
if !s.V(logger.InfoLevel) {
|
if !s.V(logger.InfoLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -249,6 +253,7 @@ func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}) {
|
func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.InfoLevel.String()).Inc()
|
||||||
if !s.V(logger.InfoLevel) {
|
if !s.V(logger.InfoLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -270,6 +275,7 @@ func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
|
func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.DebugLevel.String()).Inc()
|
||||||
if !s.V(logger.DebugLevel) {
|
if !s.V(logger.DebugLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -291,6 +297,7 @@ func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{}) {
|
func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.DebugLevel.String()).Inc()
|
||||||
if !s.V(logger.DebugLevel) {
|
if !s.V(logger.DebugLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -312,6 +319,7 @@ func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
|
func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.TraceLevel.String()).Inc()
|
||||||
if !s.V(logger.TraceLevel) {
|
if !s.V(logger.TraceLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -333,6 +341,7 @@ func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{}) {
|
func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.TraceLevel.String()).Inc()
|
||||||
if !s.V(logger.TraceLevel) {
|
if !s.V(logger.TraceLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -354,6 +363,7 @@ func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
|
func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.ErrorLevel.String()).Inc()
|
||||||
if !s.V(logger.ErrorLevel) {
|
if !s.V(logger.ErrorLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -393,6 +403,7 @@ func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{}) {
|
func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.ErrorLevel.String()).Inc()
|
||||||
if !s.V(logger.ErrorLevel) {
|
if !s.V(logger.ErrorLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -432,6 +443,7 @@ func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
|
func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.FatalLevel.String()).Inc()
|
||||||
if !s.V(logger.FatalLevel) {
|
if !s.V(logger.FatalLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -454,6 +466,7 @@ func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{}) {
|
func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.FatalLevel.String()).Inc()
|
||||||
if !s.V(logger.FatalLevel) {
|
if !s.V(logger.FatalLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -476,6 +489,7 @@ func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) {
|
func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.WarnLevel.String()).Inc()
|
||||||
if !s.V(logger.WarnLevel) {
|
if !s.V(logger.WarnLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -497,6 +511,7 @@ func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{}) {
|
func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{}) {
|
||||||
|
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.WarnLevel.String()).Inc()
|
||||||
if !s.V(logger.WarnLevel) {
|
if !s.V(logger.WarnLevel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@@ -24,6 +24,13 @@ var (
|
|||||||
DefaultSummaryQuantiles = []float64{0.5, 0.9, 0.97, 0.99, 1}
|
DefaultSummaryQuantiles = []float64{0.5, 0.9, 0.97, 0.99, 1}
|
||||||
// DefaultSummaryWindow is the default window for summary
|
// DefaultSummaryWindow is the default window for summary
|
||||||
DefaultSummaryWindow = 5 * time.Minute
|
DefaultSummaryWindow = 5 * time.Minute
|
||||||
|
// DefaultSkipEndpoints is the slice of endpoint that must not be metered
|
||||||
|
DefaultSkipEndpoints = []string{
|
||||||
|
"MeterService.Metrics",
|
||||||
|
"HealthService.Live",
|
||||||
|
"HealthService.Ready",
|
||||||
|
"HealthService.Version",
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// Meter is an interface for collecting and instrumenting metrics
|
// Meter is an interface for collecting and instrumenting metrics
|
||||||
|
@@ -2,8 +2,6 @@ package meter
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option powers the configuration for metrics implementations:
|
// Option powers the configuration for metrics implementations:
|
||||||
@@ -11,8 +9,6 @@ type Option func(*Options)
|
|||||||
|
|
||||||
// Options for metrics implementations
|
// Options for metrics implementations
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// Logger used for logging
|
|
||||||
Logger logger.Logger
|
|
||||||
// Context holds external options
|
// Context holds external options
|
||||||
Context context.Context
|
Context context.Context
|
||||||
// Name holds the meter name
|
// Name holds the meter name
|
||||||
@@ -39,7 +35,6 @@ func NewOptions(opt ...Option) Options {
|
|||||||
Address: DefaultAddress,
|
Address: DefaultAddress,
|
||||||
Path: DefaultPath,
|
Path: DefaultPath,
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
Logger: logger.DefaultLogger,
|
|
||||||
MetricPrefix: DefaultMetricPrefix,
|
MetricPrefix: DefaultMetricPrefix,
|
||||||
LabelPrefix: DefaultLabelPrefix,
|
LabelPrefix: DefaultLabelPrefix,
|
||||||
}
|
}
|
||||||
@@ -95,13 +90,6 @@ func TimingObjectives(value map[float64]float64) Option {
|
|||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// Logger sets the logger
|
|
||||||
func Logger(l logger.Logger) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Logger = l
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Labels sets the meter labels
|
// Labels sets the meter labels
|
||||||
func Labels(ls ...string) Option {
|
func Labels(ls ...string) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
@@ -305,6 +305,10 @@ func (t *tunEvent) SetError(err error) {
|
|||||||
t.err = err
|
t.err = err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *tunEvent) Context() context.Context {
|
||||||
|
return context.TODO()
|
||||||
|
}
|
||||||
|
|
||||||
// NewBroker returns new tunnel broker
|
// NewBroker returns new tunnel broker
|
||||||
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
|
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
|
||||||
options := broker.NewOptions(opts...)
|
options := broker.NewOptions(opts...)
|
||||||
|
10
options.go
10
options.go
@@ -269,15 +269,7 @@ func Logger(l logger.Logger, opts ...LoggerOption) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, mtr := range o.Meters {
|
|
||||||
for _, or := range lopts.meters {
|
|
||||||
if mtr.Name() == or || all {
|
|
||||||
if err = mtr.Init(meter.Logger(l)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, trc := range o.Tracers {
|
for _, trc := range o.Tracers {
|
||||||
for _, ot := range lopts.tracers {
|
for _, ot := range lopts.tracers {
|
||||||
if trc.Name() == ot || all {
|
if trc.Name() == ot || all {
|
||||||
|
@@ -1,12 +0,0 @@
|
|||||||
package semconv
|
|
||||||
|
|
||||||
var (
|
|
||||||
// CacheRequestDurationSeconds specifies meter metric name
|
|
||||||
CacheRequestDurationSeconds = "cache_request_duration_seconds"
|
|
||||||
// ClientRequestLatencyMicroseconds specifies meter metric name
|
|
||||||
CacheRequestLatencyMicroseconds = "cache_request_latency_microseconds"
|
|
||||||
// CacheRequestTotal specifies meter metric name
|
|
||||||
CacheRequestTotal = "cache_request_total"
|
|
||||||
// CacheRequestInflight specifies meter metric name
|
|
||||||
CacheRequestInflight = "cache_request_inflight"
|
|
||||||
)
|
|
4
semconv/logger.go
Normal file
4
semconv/logger.go
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
package semconv
|
||||||
|
|
||||||
|
// LoggerMessageTotal specifies meter metric name for logger messages
|
||||||
|
var LoggerMessageTotal = "logger_message_total"
|
12
semconv/store.go
Normal file
12
semconv/store.go
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
package semconv
|
||||||
|
|
||||||
|
var (
|
||||||
|
// StoreRequestDurationSeconds specifies meter metric name
|
||||||
|
StoreRequestDurationSeconds = "store_request_duration_seconds"
|
||||||
|
// ClientRequestLatencyMicroseconds specifies meter metric name
|
||||||
|
StoreRequestLatencyMicroseconds = "store_request_latency_microseconds"
|
||||||
|
// StoreRequestTotal specifies meter metric name
|
||||||
|
StoreRequestTotal = "store_request_total"
|
||||||
|
// StoreRequestInflight specifies meter metric name
|
||||||
|
StoreRequestInflight = "store_request_inflight"
|
||||||
|
)
|
182
server/noop.go
182
server/noop.go
@@ -274,7 +274,7 @@ func (n *noopServer) Register() error {
|
|||||||
|
|
||||||
if !registered {
|
if !registered {
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
|
config.Logger.Info(n.opts.Context, fmt.Sprintf("register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -312,7 +312,7 @@ func (n *noopServer) Deregister() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].ID)
|
config.Logger.Info(n.opts.Context, fmt.Sprintf("deregistering node: %s", service.Nodes[0].ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := DefaultDeregisterFunc(service, config); err != nil {
|
if err := DefaultDeregisterFunc(service, config); err != nil {
|
||||||
@@ -343,11 +343,11 @@ func (n *noopServer) Deregister() error {
|
|||||||
go func(s broker.Subscriber) {
|
go func(s broker.Subscriber) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic())
|
config.Logger.Info(n.opts.Context, "unsubscribing from topic: "+s.Topic())
|
||||||
}
|
}
|
||||||
if err := s.Unsubscribe(ncx); err != nil {
|
if err := s.Unsubscribe(ncx); err != nil {
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err)
|
config.Logger.Error(n.opts.Context, "unsubscribing from topic: "+s.Topic(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(subs[idx])
|
}(subs[idx])
|
||||||
@@ -383,7 +383,7 @@ func (n *noopServer) Start() error {
|
|||||||
config.Address = addr
|
config.Address = addr
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address)
|
config.Logger.Info(n.opts.Context, "server [noop] Listening on "+config.Address)
|
||||||
}
|
}
|
||||||
|
|
||||||
n.Lock()
|
n.Lock()
|
||||||
@@ -397,13 +397,13 @@ func (n *noopServer) Start() error {
|
|||||||
// connect to the broker
|
// connect to the broker
|
||||||
if err := config.Broker.Connect(config.Context); err != nil {
|
if err := config.Broker.Connect(config.Context); err != nil {
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err)
|
config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
|
config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -411,13 +411,13 @@ func (n *noopServer) Start() error {
|
|||||||
// nolint: nestif
|
// nolint: nestif
|
||||||
if err := config.RegisterCheck(config.Context); err != nil {
|
if err := config.RegisterCheck(config.Context); err != nil {
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, err)
|
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// announce self to the world
|
// announce self to the world
|
||||||
if err := n.Register(); err != nil {
|
if err := n.Register(); err != nil {
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Errorf(n.opts.Context, "server register error: %v", err)
|
config.Logger.Error(n.opts.Context, "server register error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -450,23 +450,23 @@ func (n *noopServer) Start() error {
|
|||||||
// nolint: nestif
|
// nolint: nestif
|
||||||
if rerr != nil && registered {
|
if rerr != nil && registered {
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
|
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error, deregister it", config.Name, config.ID), rerr)
|
||||||
}
|
}
|
||||||
// deregister self in case of error
|
// deregister self in case of error
|
||||||
if err := n.Deregister(); err != nil {
|
if err := n.Deregister(); err != nil {
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.ID, err)
|
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s deregister error", config.Name, config.ID), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if rerr != nil && !registered {
|
} else if rerr != nil && !registered {
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, rerr)
|
config.Logger.Errorf(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), rerr)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := n.Register(); err != nil {
|
if err := n.Register(); err != nil {
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.ID, err)
|
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register error", config.Name, config.ID), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// wait for exit
|
// wait for exit
|
||||||
@@ -478,7 +478,7 @@ func (n *noopServer) Start() error {
|
|||||||
// deregister self
|
// deregister self
|
||||||
if err := n.Deregister(); err != nil {
|
if err := n.Deregister(); err != nil {
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Errorf(n.opts.Context, "server deregister error: ", err)
|
config.Logger.Error(n.opts.Context, "server deregister error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -491,12 +491,12 @@ func (n *noopServer) Start() error {
|
|||||||
ch <- nil
|
ch <- nil
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
|
config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()))
|
||||||
}
|
}
|
||||||
// disconnect broker
|
// disconnect broker
|
||||||
if err := config.Broker.Disconnect(config.Context); err != nil {
|
if err := config.Broker.Disconnect(config.Context); err != nil {
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err)
|
config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] disconnect error", config.Broker.String()), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -512,36 +512,33 @@ func (n *noopServer) Start() error {
|
|||||||
func (n *noopServer) subscribe() error {
|
func (n *noopServer) subscribe() error {
|
||||||
config := n.Options()
|
config := n.Options()
|
||||||
|
|
||||||
cx := config.Context
|
subCtx := config.Context
|
||||||
var err error
|
|
||||||
var sub broker.Subscriber
|
|
||||||
|
|
||||||
for sb := range n.subscribers {
|
for sb := range n.subscribers {
|
||||||
if sb.Options().Context != nil {
|
|
||||||
cx = sb.Options().Context
|
if cx := sb.Options().Context; cx != nil {
|
||||||
|
subCtx = cx
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := []broker.SubscribeOption{
|
||||||
|
broker.SubscribeContext(subCtx),
|
||||||
|
broker.SubscribeAutoAck(sb.Options().AutoAck),
|
||||||
|
broker.SubscribeBodyOnly(sb.Options().BodyOnly),
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
|
||||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||||
opts = append(opts, broker.SubscribeGroup(queue))
|
opts = append(opts, broker.SubscribeGroup(queue))
|
||||||
}
|
}
|
||||||
|
|
||||||
if sb.Options().Batch {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
// batch processing handler
|
config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic())
|
||||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
|
|
||||||
} else {
|
|
||||||
// single processing handler
|
|
||||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
|
||||||
}
|
|
||||||
|
|
||||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -637,127 +634,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
|
||||||
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
|
||||||
return func(ps broker.Events) (err error) {
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
n.RLock()
|
|
||||||
config := n.opts
|
|
||||||
n.RUnlock()
|
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
|
||||||
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
|
|
||||||
config.Logger.Error(n.opts.Context, string(debug.Stack()))
|
|
||||||
}
|
|
||||||
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
msgs := make([]Message, 0, len(ps))
|
|
||||||
ctxs := make([]context.Context, 0, len(ps))
|
|
||||||
for _, p := range ps {
|
|
||||||
msg := p.Message()
|
|
||||||
// if we don't have headers, create empty map
|
|
||||||
if msg.Header == nil {
|
|
||||||
msg.Header = metadata.New(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
ct, _ := msg.Header.Get(metadata.HeaderContentType)
|
|
||||||
if len(ct) == 0 {
|
|
||||||
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
|
|
||||||
ct = defaultContentType
|
|
||||||
}
|
|
||||||
hdr := metadata.Copy(msg.Header)
|
|
||||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
|
||||||
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
|
|
||||||
msgs = append(msgs, &rpcMessage{
|
|
||||||
topic: topic,
|
|
||||||
contentType: ct,
|
|
||||||
header: msg.Header,
|
|
||||||
body: msg.Body,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
results := make(chan error, len(sb.handlers))
|
|
||||||
|
|
||||||
for i := 0; i < len(sb.handlers); i++ {
|
|
||||||
handler := sb.handlers[i]
|
|
||||||
|
|
||||||
var req reflect.Value
|
|
||||||
|
|
||||||
switch handler.reqType.Kind() {
|
|
||||||
case reflect.Ptr:
|
|
||||||
req = reflect.New(handler.reqType.Elem())
|
|
||||||
default:
|
|
||||||
req = reflect.New(handler.reqType.Elem()).Elem()
|
|
||||||
}
|
|
||||||
|
|
||||||
reqType := handler.reqType
|
|
||||||
var cf codec.Codec
|
|
||||||
for _, msg := range msgs {
|
|
||||||
cf, err = n.newCodec(msg.ContentType())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
rb := reflect.New(req.Type().Elem())
|
|
||||||
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
msg.(*rpcMessage).codec = cf
|
|
||||||
msg.(*rpcMessage).payload = rb.Interface()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn := func(ctxs []context.Context, ms []Message) error {
|
|
||||||
var vals []reflect.Value
|
|
||||||
if sb.typ.Kind() != reflect.Func {
|
|
||||||
vals = append(vals, sb.rcvr)
|
|
||||||
}
|
|
||||||
if handler.ctxType != nil {
|
|
||||||
vals = append(vals, reflect.ValueOf(ctxs))
|
|
||||||
}
|
|
||||||
payloads := reflect.MakeSlice(reqType, 0, len(ms))
|
|
||||||
for _, m := range ms {
|
|
||||||
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
|
|
||||||
}
|
|
||||||
vals = append(vals, payloads)
|
|
||||||
|
|
||||||
returnValues := handler.method.Call(vals)
|
|
||||||
if rerr := returnValues[0].Interface(); rerr != nil {
|
|
||||||
return rerr.(error)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
opts.Hooks.EachNext(func(hook options.Hook) {
|
|
||||||
if h, ok := hook.(HookBatchSubHandler); ok {
|
|
||||||
fn = h(fn)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
if n.wg != nil {
|
|
||||||
n.wg.Add(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
if n.wg != nil {
|
|
||||||
defer n.wg.Done()
|
|
||||||
}
|
|
||||||
results <- fn(ctxs, msgs)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
var errors []string
|
|
||||||
for i := 0; i < len(sb.handlers); i++ {
|
|
||||||
if rerr := <-results; rerr != nil {
|
|
||||||
errors = append(errors, rerr.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(errors) > 0 {
|
|
||||||
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||||
return func(p broker.Event) (err error) {
|
return func(p broker.Event) (err error) {
|
||||||
|
@@ -9,7 +9,6 @@ import (
|
|||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v3/client"
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v3/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -26,18 +25,6 @@ func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) er
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
|
|
||||||
if len(msgs) != 8 {
|
|
||||||
h.t.Fatal("invalid number of messages received")
|
|
||||||
}
|
|
||||||
for idx := 0; idx < len(msgs); idx++ {
|
|
||||||
md, _ := metadata.FromIncomingContext(ctxs[idx])
|
|
||||||
_ = md
|
|
||||||
// fmt.Printf("msg md %v\n", md)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNoopSub(t *testing.T) {
|
func TestNoopSub(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
@@ -76,13 +63,6 @@ func TestNoopSub(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
|
|
||||||
server.SubscriberQueue("queue"),
|
|
||||||
server.SubscriberBatch(true),
|
|
||||||
)); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.Start(); err != nil {
|
if err := s.Start(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@@ -341,8 +341,6 @@ type SubscriberOptions struct {
|
|||||||
AutoAck bool
|
AutoAck bool
|
||||||
// BodyOnly flag specifies that message without headers
|
// BodyOnly flag specifies that message without headers
|
||||||
BodyOnly bool
|
BodyOnly bool
|
||||||
// Batch flag specifies that message processed in batches
|
|
||||||
Batch bool
|
|
||||||
// BatchSize flag specifies max size of batch
|
// BatchSize flag specifies max size of batch
|
||||||
BatchSize int
|
BatchSize int
|
||||||
// BatchWait flag specifies max wait time for batch filling
|
// BatchWait flag specifies max wait time for batch filling
|
||||||
@@ -414,13 +412,6 @@ func SubscriberAck(b bool) SubscriberOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscriberBatch control batch processing for handler
|
|
||||||
func SubscriberBatch(b bool) SubscriberOption {
|
|
||||||
return func(o *SubscriberOptions) {
|
|
||||||
o.Batch = b
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubscriberBatchSize control batch filling size for handler
|
// SubscriberBatchSize control batch filling size for handler
|
||||||
// Batch filling max waiting time controlled by SubscriberBatchWait
|
// Batch filling max waiting time controlled by SubscriberBatchWait
|
||||||
func SubscriberBatchSize(n int) SubscriberOption {
|
func SubscriberBatchSize(n int) SubscriberOption {
|
||||||
|
@@ -63,10 +63,10 @@ type Server interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
FuncBatchSubHandler func(ctxs []context.Context, ms []Message) error
|
FuncSubHandler func(ctx context.Context, ms Message) error
|
||||||
HookBatchSubHandler func(next FuncBatchSubHandler) FuncBatchSubHandler
|
HookSubHandler func(next FuncSubHandler) FuncSubHandler
|
||||||
FuncSubHandler func(ctx context.Context, ms Message) error
|
FuncHandler func(ctx context.Context, req Request, rsp interface{}) error
|
||||||
HookSubHandler func(next FuncSubHandler) FuncSubHandler
|
HookHandler func(next FuncHandler) FuncHandler
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@@ -3,14 +3,12 @@ package server
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
|
||||||
"unicode"
|
"unicode"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
subSig = "func(context.Context, interface{}) error"
|
subSig = "func(context.Context, interface{}) error"
|
||||||
batchSubSig = "func([]context.Context, []interface{}) error"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Precompute the reflect type for error. Can't use error directly
|
// Precompute the reflect type for error. Can't use error directly
|
||||||
@@ -43,23 +41,15 @@ func ValidateSubscriber(sub Subscriber) error {
|
|||||||
switch typ.NumIn() {
|
switch typ.NumIn() {
|
||||||
case 2:
|
case 2:
|
||||||
argType = typ.In(1)
|
argType = typ.In(1)
|
||||||
if sub.Options().Batch {
|
|
||||||
if argType.Kind() != reflect.Slice {
|
|
||||||
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
|
|
||||||
}
|
|
||||||
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
|
|
||||||
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
|
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
|
||||||
}
|
}
|
||||||
if !isExportedOrBuiltinType(argType) {
|
if !isExportedOrBuiltinType(argType) {
|
||||||
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
|
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
|
||||||
}
|
}
|
||||||
if typ.NumOut() != 1 {
|
if typ.NumOut() != 1 {
|
||||||
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
|
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
|
||||||
name, typ.NumOut(), subSig, batchSubSig)
|
name, typ.NumOut(), subSig)
|
||||||
}
|
}
|
||||||
if returnType := typ.Out(0); returnType != typeOfError {
|
if returnType := typ.Out(0); returnType != typeOfError {
|
||||||
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
|
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
|
||||||
@@ -74,8 +64,8 @@ func ValidateSubscriber(sub Subscriber) error {
|
|||||||
case 3:
|
case 3:
|
||||||
argType = method.Type.In(2)
|
argType = method.Type.In(2)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
|
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
|
||||||
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
|
name, method.Name, method.Type.NumIn(), subSig)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !isExportedOrBuiltinType(argType) {
|
if !isExportedOrBuiltinType(argType) {
|
||||||
@@ -83,8 +73,8 @@ func ValidateSubscriber(sub Subscriber) error {
|
|||||||
}
|
}
|
||||||
if method.Type.NumOut() != 1 {
|
if method.Type.NumOut() != 1 {
|
||||||
return fmt.Errorf(
|
return fmt.Errorf(
|
||||||
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
|
"subscriber %v.%v has wrong number of return values: %v require signature %s",
|
||||||
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
|
name, method.Name, method.Type.NumOut(), subSig)
|
||||||
}
|
}
|
||||||
if returnType := method.Type.Out(0); returnType != typeOfError {
|
if returnType := method.Type.Out(0); returnType != typeOfError {
|
||||||
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
|
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
|
||||||
|
@@ -14,20 +14,12 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
|
|||||||
// publication message.
|
// publication message.
|
||||||
type SubscriberFunc func(ctx context.Context, msg Message) error
|
type SubscriberFunc func(ctx context.Context, msg Message) error
|
||||||
|
|
||||||
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
|
|
||||||
// for the wrappers. What's handed to the actual method is the concrete
|
|
||||||
// publication message. This func used by batch subscribers
|
|
||||||
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
|
|
||||||
|
|
||||||
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
|
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
|
||||||
type HandlerWrapper func(HandlerFunc) HandlerFunc
|
type HandlerWrapper func(HandlerFunc) HandlerFunc
|
||||||
|
|
||||||
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
|
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
|
||||||
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
|
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
|
||||||
|
|
||||||
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
|
|
||||||
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
|
|
||||||
|
|
||||||
// StreamWrapper wraps a Stream interface and returns the equivalent.
|
// StreamWrapper wraps a Stream interface and returns the equivalent.
|
||||||
// Because streams exist for the lifetime of a method invocation this
|
// Because streams exist for the lifetime of a method invocation this
|
||||||
// is a convenient way to wrap a Stream as its in use for trace, monitoring,
|
// is a convenient way to wrap a Stream as its in use for trace, monitoring,
|
||||||
|
@@ -83,8 +83,11 @@ func (sk SpanKind) String() string {
|
|||||||
|
|
||||||
// SpanOptions contains span option
|
// SpanOptions contains span option
|
||||||
type SpanOptions struct {
|
type SpanOptions struct {
|
||||||
Labels []interface{}
|
StatusMsg string
|
||||||
Kind SpanKind
|
Labels []interface{}
|
||||||
|
Status SpanStatus
|
||||||
|
Kind SpanKind
|
||||||
|
Record bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SpanOption func signature
|
// SpanOption func signature
|
||||||
@@ -110,12 +113,25 @@ func WithSpanLabels(kv ...interface{}) SpanOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithSpanStatus(st SpanStatus, msg string) SpanOption {
|
||||||
|
return func(o *SpanOptions) {
|
||||||
|
o.Status = st
|
||||||
|
o.StatusMsg = msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithSpanKind(k SpanKind) SpanOption {
|
func WithSpanKind(k SpanKind) SpanOption {
|
||||||
return func(o *SpanOptions) {
|
return func(o *SpanOptions) {
|
||||||
o.Kind = k
|
o.Kind = k
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithSpanRecord(b bool) SpanOption {
|
||||||
|
return func(o *SpanOptions) {
|
||||||
|
o.Record = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Options struct
|
// Options struct
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// Context used to store custome tracer options
|
// Context used to store custome tracer options
|
||||||
@@ -124,6 +140,8 @@ type Options struct {
|
|||||||
Logger logger.Logger
|
Logger logger.Logger
|
||||||
// Name of the tracer
|
// Name of the tracer
|
||||||
Name string
|
Name string
|
||||||
|
// ContextAttrFuncs contains funcs that provides tracing
|
||||||
|
ContextAttrFuncs []ContextAttrFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option func signature
|
// Option func signature
|
||||||
@@ -148,7 +166,8 @@ func NewEventOptions(opts ...EventOption) EventOptions {
|
|||||||
// NewSpanOptions returns default SpanOptions
|
// NewSpanOptions returns default SpanOptions
|
||||||
func NewSpanOptions(opts ...SpanOption) SpanOptions {
|
func NewSpanOptions(opts ...SpanOption) SpanOptions {
|
||||||
options := SpanOptions{
|
options := SpanOptions{
|
||||||
Kind: SpanKindInternal,
|
Kind: SpanKindInternal,
|
||||||
|
Record: true,
|
||||||
}
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
@@ -159,8 +178,9 @@ func NewSpanOptions(opts ...SpanOption) SpanOptions {
|
|||||||
// NewOptions returns default options
|
// NewOptions returns default options
|
||||||
func NewOptions(opts ...Option) Options {
|
func NewOptions(opts ...Option) Options {
|
||||||
options := Options{
|
options := Options{
|
||||||
Logger: logger.DefaultLogger,
|
Logger: logger.DefaultLogger,
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
|
ContextAttrFuncs: DefaultContextAttrFuncs,
|
||||||
}
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
|
@@ -7,16 +7,25 @@ import (
|
|||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultTracer is the global default tracer
|
|
||||||
var DefaultTracer Tracer = NewTracer()
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// DefaultTracer is the global default tracer
|
||||||
|
DefaultTracer Tracer = NewTracer() //nolint:revive
|
||||||
// TraceIDKey is the key used for the trace id in the log call
|
// TraceIDKey is the key used for the trace id in the log call
|
||||||
TraceIDKey = "trace-id"
|
TraceIDKey = "trace-id"
|
||||||
// SpanIDKey is the key used for the span id in the log call
|
// SpanIDKey is the key used for the span id in the log call
|
||||||
SpanIDKey = "span-id"
|
SpanIDKey = "span-id"
|
||||||
|
// DefaultSkipEndpoints is the slice of endpoint that must not be traced
|
||||||
|
DefaultSkipEndpoints = []string{
|
||||||
|
"MeterService.Metrics",
|
||||||
|
"HealthService.Live",
|
||||||
|
"HealthService.Ready",
|
||||||
|
"HealthService.Version",
|
||||||
|
}
|
||||||
|
DefaultContextAttrFuncs []ContextAttrFunc
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ContextAttrFunc func(ctx context.Context) []interface{}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs,
|
logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs,
|
||||||
func(ctx context.Context) []interface{} {
|
func(ctx context.Context) []interface{} {
|
||||||
@@ -38,6 +47,8 @@ type Tracer interface {
|
|||||||
Init(...Option) error
|
Init(...Option) error
|
||||||
// Start a trace
|
// Start a trace
|
||||||
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
|
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
|
||||||
|
// Extract get span metadata from context
|
||||||
|
// Extract(ctx context.Context)
|
||||||
// Flush flushes spans
|
// Flush flushes spans
|
||||||
Flush(ctx context.Context) error
|
Flush(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
@@ -25,6 +25,48 @@ type StructField struct {
|
|||||||
Field reflect.StructField
|
Field reflect.StructField
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StructFieldNameByTag get struct field name by tag key and its value
|
||||||
|
func StructFieldNameByTag(src interface{}, tkey string, tval string) (string, interface{}, error) {
|
||||||
|
sv := reflect.ValueOf(src)
|
||||||
|
if sv.Kind() == reflect.Ptr {
|
||||||
|
sv = sv.Elem()
|
||||||
|
}
|
||||||
|
if sv.Kind() != reflect.Struct {
|
||||||
|
return "", nil, ErrInvalidStruct
|
||||||
|
}
|
||||||
|
|
||||||
|
typ := sv.Type()
|
||||||
|
for idx := 0; idx < typ.NumField(); idx++ {
|
||||||
|
fld := typ.Field(idx)
|
||||||
|
val := sv.Field(idx)
|
||||||
|
if len(fld.PkgPath) != 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if ts, ok := fld.Tag.Lookup(tkey); ok {
|
||||||
|
for _, p := range strings.Split(ts, ",") {
|
||||||
|
if p == tval {
|
||||||
|
return fld.Name, val.Interface(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
switch val.Kind() {
|
||||||
|
case reflect.Ptr:
|
||||||
|
if val = val.Elem(); val.Kind() == reflect.Struct {
|
||||||
|
if name, fld, err := StructFieldNameByTag(val.Interface(), tkey, tval); err == nil {
|
||||||
|
return name, fld, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case reflect.Struct:
|
||||||
|
if name, fld, err := StructFieldNameByTag(val.Interface(), tkey, tval); err == nil {
|
||||||
|
return name, fld, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
// StructFieldByTag get struct field by tag key and its value
|
// StructFieldByTag get struct field by tag key and its value
|
||||||
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
|
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
|
||||||
sv := reflect.ValueOf(src)
|
sv := reflect.ValueOf(src)
|
||||||
@@ -46,9 +88,6 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
|
|||||||
if ts, ok := fld.Tag.Lookup(tkey); ok {
|
if ts, ok := fld.Tag.Lookup(tkey); ok {
|
||||||
for _, p := range strings.Split(ts, ",") {
|
for _, p := range strings.Split(ts, ",") {
|
||||||
if p == tval {
|
if p == tval {
|
||||||
if val.Kind() != reflect.Ptr && val.CanAddr() {
|
|
||||||
val = val.Addr()
|
|
||||||
}
|
|
||||||
return val.Interface(), nil
|
return val.Interface(), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -72,10 +111,21 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
|
|||||||
|
|
||||||
// ZeroFieldByPath clean struct field by its path
|
// ZeroFieldByPath clean struct field by its path
|
||||||
func ZeroFieldByPath(src interface{}, path string) error {
|
func ZeroFieldByPath(src interface{}, path string) error {
|
||||||
|
if src == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
var err error
|
var err error
|
||||||
val := reflect.ValueOf(src)
|
val := reflect.ValueOf(src)
|
||||||
|
|
||||||
|
if IsEmpty(val) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
for _, p := range strings.Split(path, ".") {
|
for _, p := range strings.Split(path, ".") {
|
||||||
|
if IsEmpty(val) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
val, err = structValueByName(val, p)
|
val, err = structValueByName(val, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -493,13 +543,14 @@ func btSplitter(str string) []string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// queryToMap turns something like a[b][c]=4 into
|
// queryToMap turns something like a[b][c]=4 into
|
||||||
// map[string]interface{}{
|
//
|
||||||
// "a": map[string]interface{}{
|
// map[string]interface{}{
|
||||||
// "b": map[string]interface{}{
|
// "a": map[string]interface{}{
|
||||||
// "c": 4,
|
// "b": map[string]interface{}{
|
||||||
// },
|
// "c": 4,
|
||||||
// },
|
// },
|
||||||
// }
|
// },
|
||||||
|
// }
|
||||||
func queryToMap(param string) (map[string]interface{}, error) {
|
func queryToMap(param string) (map[string]interface{}, error) {
|
||||||
rawKey, rawValue, err := splitKeyAndValue(param)
|
rawKey, rawValue, err := splitKeyAndValue(param)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -190,9 +190,9 @@ func TestStructFieldByTag(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if v, ok := iface.(*[]string); !ok {
|
if v, ok := iface.([]string); !ok {
|
||||||
t.Fatalf("not *[]string %v", iface)
|
t.Fatalf("not *[]string %v", iface)
|
||||||
} else if len(*v) != 2 {
|
} else if len(v) != 2 {
|
||||||
t.Fatalf("invalid number %v", iface)
|
t.Fatalf("invalid number %v", iface)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,9 @@
|
|||||||
package pool
|
package pool
|
||||||
|
|
||||||
import "sync"
|
import (
|
||||||
|
"bytes"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
type Pool[T any] struct {
|
type Pool[T any] struct {
|
||||||
p *sync.Pool
|
p *sync.Pool
|
||||||
@@ -23,3 +26,11 @@ func (p Pool[T]) Get() T {
|
|||||||
func (p Pool[T]) Put(t T) {
|
func (p Pool[T]) Put(t T) {
|
||||||
p.p.Put(t)
|
p.p.Put(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewBytePool(size int) Pool[T] {
|
||||||
|
return NewPool(func() []byte { return make([]byte, size) })
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBytesPool() Pool[T] {
|
||||||
|
return NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) })
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user