Compare commits

...

34 Commits

Author SHA1 Message Date
82d269cfb4 xpool: add metrics
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-29 22:58:53 +03:00
6641463eed util/reflect: add ability to merge maps
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 19:22:20 +03:00
faf2454f0a cleanup
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 17:54:17 +03:00
de9e4d73f5 change semconv metric names to include micro_ prefix
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 08:38:36 +03:00
4ae7277140 meter: remove prefix options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 08:27:25 +03:00
a98618ed5b add codec.Flatten option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-16 23:10:43 +03:00
3aaf1182cb add codec option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-16 23:02:45 +03:00
eb1482d789 codec: simplify codec interface
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-16 22:41:47 +03:00
a305f7553f Merge pull request '#347 add test' (#349) from kgorbunov/micro:#347-v3 into v3
Reviewed-on: #349
2024-09-16 14:59:58 +03:00
Gorbunov Kirill Andreevich
d9b2f2a45d #347 add test
Some checks failed
pr / test (pull_request) Failing after 0s
lint / lint (pull_request) Failing after 1s
2024-09-16 14:48:47 +03:00
3ace7657dc codec: RawMessage Marshal fix
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 10:43:45 +03:00
53b40617e2 fixup util/xpool
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-04 23:06:40 +03:00
1a9236caad update meter options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-04 22:41:10 +03:00
6c68d39081 errors: add RFC9457 problem type
closes #297

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-08-01 01:06:02 +03:00
35e62fbeb0 tracer: add default context attr funcs option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-07-06 00:09:27 +03:00
00b3ceb468 smeconv: fix naming
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-07-04 14:56:48 +03:00
7dc8f088c9 Merge pull request 'fix impl interface' (#346) from devstigneev/micro:fix_impl_mevent into v3
Reviewed-on: #346
2024-07-01 12:26:53 +03:00
c65afcea1b fix impl interface
Some checks failed
lint / lint (pull_request) Has been cancelled
pr / test (pull_request) Has been cancelled
2024-07-01 09:47:51 +03:00
3eebfb5b11 Обновить options.go 2024-05-10 08:12:10 +03:00
fa1427014c close #343
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-09 19:16:12 +03:00
62074965ee close #329
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-09 16:41:22 +03:00
9c8fbb2202 broker: add Event Context() method
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-05 16:22:06 +03:00
7c0a5f5e2a add abilit to skip span recording
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 19:31:35 +03:00
b08f5321b0 tracer: allow to skip span recording
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 19:18:12 +03:00
cc0f24e012 add ability to skip endpoints for tracer and meter
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 19:05:07 +03:00
307a08f50c add more checks
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 15:31:08 +03:00
edc93e8c37 util/reflect: update StructFieldNameByTag
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 14:43:46 +03:00
391813c260 util/reflect: add StructFieldNameByTag
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 14:34:41 +03:00
1a1459dd0e util/reflect: fix StructFieldByTag
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 13:16:31 +03:00
4e99680c30 server: add missing hook definitions
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-23 07:39:08 +03:00
92a3a547b8 Merge pull request 'server/noop: cleanup' (#342) from server-noop into v3
Reviewed-on: #342
2024-04-23 07:30:20 +03:00
849c462037 server/noop: cleanup
All checks were successful
pr / test (pull_request) Successful in 1m38s
lint / lint (pull_request) Successful in 10m35s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-23 07:28:58 +03:00
54a55c83e2 Merge pull request 'add client tracing' (#341) from traceclient into v3
Reviewed-on: #341
2024-04-22 23:44:54 +03:00
781dee03db add client tracing
All checks were successful
pr / test (pull_request) Successful in 1m36s
lint / lint (pull_request) Successful in 10m37s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-22 23:39:21 +03:00
81 changed files with 757 additions and 1277 deletions

View File

@@ -1,5 +1,5 @@
// Package broker is an interface used for asynchronous messaging
package broker // import "go.unistack.org/micro/v3/broker"
package broker
import (
"context"
@@ -88,6 +88,8 @@ type BatchHandler func(Events) error
// Event is given to a subscription handler for processing
type Event interface {
// Context return context.Context for event
Context() context.Context
// Topic returns event topic
Topic() string
// Message returns broker message

View File

@@ -373,6 +373,10 @@ func (m *memoryEvent) SetError(err error) {
m.err = err
}
func (m *memoryEvent) Context() context.Context {
return m.opts.Context
}
func (m *memorySubscriber) Options() broker.SubscribeOptions {
return m.opts
}

View File

@@ -1,5 +1,5 @@
// Package client is an interface for an RPC client
package client // import "go.unistack.org/micro/v3/client"
package client
import (
"context"

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"strconv"
"time"
"go.unistack.org/micro/v3/broker"
@@ -12,6 +13,8 @@ import (
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/selector"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/tracer"
)
// DefaultCodecs will be used to encode/decode data
@@ -104,10 +107,13 @@ func (n *noopResponse) Read() ([]byte, error) {
return nil, nil
}
type noopStream struct{}
type noopStream struct {
err error
ctx context.Context
}
func (n *noopStream) Context() context.Context {
return context.Background()
return n.ctx
}
func (n *noopStream) Request() Request {
@@ -135,15 +141,21 @@ func (n *noopStream) RecvMsg(interface{}) error {
}
func (n *noopStream) Error() error {
return nil
return n.err
}
func (n *noopStream) Close() error {
return nil
if sp, ok := tracer.SpanFromContext(n.ctx); ok && sp != nil {
if n.err != nil {
sp.SetStatus(tracer.SpanStatusError, n.err.Error())
}
sp.Finish()
}
return n.err
}
func (n *noopStream) CloseSend() error {
return nil
return n.err
}
func (n *noopMessage) Topic() string {
@@ -207,7 +219,28 @@ func (n *noopClient) String() string {
}
func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
return n.funcCall(ctx, req, rsp, opts...)
ts := time.Now()
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
err := n.funcCall(ctx, req, rsp, opts...)
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
n.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
n.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := errors.FromError(err); me == nil {
sp.Finish()
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
}
return err
}
func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
@@ -349,7 +382,28 @@ func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOp
}
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
return n.funcStream(ctx, req, opts...)
ts := time.Now()
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
stream, err := n.funcStream(ctx, req, opts...)
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
n.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
n.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := errors.FromError(err); me == nil {
sp.Finish()
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
}
return stream, err
}
func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
@@ -493,7 +547,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
}
func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) {
return &noopStream{}, nil
return &noopStream{ctx: ctx}, nil
}
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {

View File

@@ -1,19 +1,8 @@
// Package codec is an interface for encoding messages
package codec // import "go.unistack.org/micro/v3/codec"
package codec
import (
"errors"
"io"
"go.unistack.org/micro/v3/metadata"
)
// Message types
const (
Error MessageType = iota
Request
Response
Event
)
var (
@@ -24,65 +13,23 @@ var (
)
var (
// DefaultMaxMsgSize specifies how much data codec can handle
DefaultMaxMsgSize = 1024 * 1024 * 4 // 4Mb
// DefaultCodec is the global default codec
DefaultCodec = NewCodec()
// DefaultTagName specifies struct tag name to control codec Marshal/Unmarshal
DefaultTagName = "codec"
)
// MessageType specifies message type for codec
type MessageType int
// Codec encodes/decodes various types of messages used within micro.
// ReadHeader and ReadBody are called in pairs to read requests/responses
// from the connection. Close is called when finished with the
// connection. ReadBody may be called with a nil argument to force the
// body to be read and discarded.
// Codec encodes/decodes various types of messages.
type Codec interface {
ReadHeader(r io.Reader, m *Message, mt MessageType) error
ReadBody(r io.Reader, v interface{}) error
Write(w io.Writer, m *Message, v interface{}) error
Marshal(v interface{}, opts ...Option) ([]byte, error)
Unmarshal(b []byte, v interface{}, opts ...Option) error
String() string
}
// Message represents detailed information about
// the communication, likely followed by the body.
// In the case of an error, body may be nil.
type Message struct {
Header metadata.Metadata
Target string
Method string
Endpoint string
Error string
ID string
Body []byte
Type MessageType
}
// NewMessage creates new codec message
func NewMessage(t MessageType) *Message {
return &Message{Type: t, Header: metadata.New(0)}
}
// MarshalAppend calls codec.Marshal(v) and returns the data appended to buf.
// If codec implements MarshalAppend, that is called instead.
func MarshalAppend(buf []byte, c Codec, v interface{}, opts ...Option) ([]byte, error) {
if nc, ok := c.(interface {
MarshalAppend([]byte, interface{}, ...Option) ([]byte, error)
}); ok {
return nc.MarshalAppend(buf, v, opts...)
}
mbuf, err := c.Marshal(v, opts...)
if err != nil {
return nil, err
}
return append(buf, mbuf...), nil
type CodecV2 interface {
Marshal(buf []byte, v interface{}, opts ...Option) ([]byte, error)
Unmarshal(buf []byte, v interface{}, opts ...Option) error
String() string
}
// RawMessage is a raw encoded JSON value.
@@ -93,6 +40,8 @@ type RawMessage []byte
func (m *RawMessage) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
} else if len(*m) == 0 {
return []byte("null"), nil
}
return *m, nil
}

View File

@@ -2,70 +2,14 @@ package codec
import (
"encoding/json"
"io"
codecpb "go.unistack.org/micro-proto/v3/codec"
)
type noopCodec struct {
opts Options
}
func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error {
return nil
}
func (c *noopCodec) ReadBody(conn io.Reader, b interface{}) error {
// read bytes
buf, err := io.ReadAll(conn)
if err != nil {
return err
}
if b == nil {
return nil
}
switch v := b.(type) {
case *string:
*v = string(buf)
case *[]byte:
*v = buf
case *Frame:
v.Data = buf
default:
return json.Unmarshal(buf, v)
}
return nil
}
func (c *noopCodec) Write(conn io.Writer, m *Message, b interface{}) error {
if b == nil {
return nil
}
var v []byte
switch vb := b.(type) {
case *Frame:
v = vb.Data
case string:
v = []byte(vb)
case *string:
v = []byte(*vb)
case *[]byte:
v = *vb
case []byte:
v = vb
default:
var err error
v, err = json.Marshal(vb)
if err != nil {
return err
}
}
_, err := conn.Write(v)
return err
}
func (c *noopCodec) String() string {
return "noop"
}
@@ -91,8 +35,8 @@ func (c *noopCodec) Marshal(v interface{}, opts ...Option) ([]byte, error) {
return ve, nil
case *Frame:
return ve.Data, nil
case *Message:
return ve.Body, nil
case *codecpb.Frame:
return ve.Data, nil
}
return json.Marshal(v)
@@ -115,8 +59,8 @@ func (c *noopCodec) Unmarshal(d []byte, v interface{}, opts ...Option) error {
case *Frame:
ve.Data = d
return nil
case *Message:
ve.Body = d
case *codecpb.Frame:
ve.Data = d
return nil
}

View File

@@ -23,15 +23,8 @@ type Options struct {
Context context.Context
// TagName specifies tag name in struct to control codec
TagName string
// MaxMsgSize specifies max messages size that reads by codec
MaxMsgSize int
}
// MaxMsgSize sets the max message size
func MaxMsgSize(n int) Option {
return func(o *Options) {
o.MaxMsgSize = n
}
// Flatten specifies that struct must be analyzed for flatten tag
Flatten bool
}
// TagName sets the codec tag name in struct
@@ -41,6 +34,13 @@ func TagName(n string) Option {
}
}
// Flatten enables checking for flatten tag name
func Flatten(b bool) Option {
return func(o *Options) {
o.Flatten = b
}
}
// Logger sets the logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
@@ -65,12 +65,12 @@ func Meter(m meter.Meter) Option {
// NewOptions returns new options
func NewOptions(opts ...Option) Options {
options := Options{
Context: context.Background(),
Logger: logger.DefaultLogger,
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
MaxMsgSize: DefaultMaxMsgSize,
TagName: DefaultTagName,
Context: context.Background(),
Logger: logger.DefaultLogger,
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
TagName: DefaultTagName,
Flatten: false,
}
for _, o := range opts {

View File

@@ -1,5 +1,5 @@
// Package config is an interface for dynamic configuration.
package config // import "go.unistack.org/micro/v3/config"
package config
import (
"context"

View File

@@ -1,6 +1,6 @@
// Package errors provides a way to return detailed information
// for an RPC request error. The error is normally JSON encoded.
package errors // import "go.unistack.org/micro/v3/errors"
package errors
import (
"bytes"
@@ -44,6 +44,20 @@ var (
ErrGatewayTimeout = &Error{Code: 504}
)
const ProblemContentType = "application/problem+json"
type Problem struct {
Type string `json:"type,omitempty"`
Title string `json:"title,omitempty"`
Detail string `json:"detail,omitempty"`
Instance string `json:"instance,omitempty"`
Errors []struct {
Title string `json:"title,omitempty"`
Detail string `json:"detail,omitempty"`
} `json:"errors,omitempty"`
Status int `json:"status,omitempty"`
}
// Error type
type Error struct {
// ID holds error id or service, usually someting like my_service or id
@@ -262,6 +276,10 @@ func CodeIn(err interface{}, codes ...int32) bool {
// FromError try to convert go error to *Error
func FromError(err error) *Error {
if err == nil {
return nil
}
if verr, ok := err.(*Error); ok && verr != nil {
return verr
}

View File

@@ -1,5 +1,5 @@
// Package flow is an interface used for saga pattern microservice workflow
package flow // import "go.unistack.org/micro/v3/flow"
package flow
import (
"context"

View File

@@ -1,4 +1,4 @@
package fsm // import "go.unistack.org/micro/v3/fsm"
package fsm
import (
"context"

View File

@@ -4,6 +4,17 @@ import "context"
type loggerKey struct{}
// MustContext returns logger from passed context or DefaultLogger if empty
func MustContext(ctx context.Context) Logger {
if ctx == nil {
return DefaultLogger
}
if l, ok := ctx.Value(loggerKey{}).(Logger); ok && l != nil {
return l
}
return DefaultLogger
}
// FromContext returns logger from passed context
func FromContext(ctx context.Context) (Logger, bool) {
if ctx == nil {

View File

@@ -1,5 +1,5 @@
// Package logger provides a log interface
package logger // import "go.unistack.org/micro/v3/logger"
package logger
import (
"context"

View File

@@ -6,6 +6,8 @@ import (
"log/slog"
"os"
"time"
"go.unistack.org/micro/v3/meter"
)
// Option func signature
@@ -45,6 +47,8 @@ type Options struct {
Level Level
// TimeFunc used to obtain current time
TimeFunc func() time.Time
// Meter used to count logs for specific level
Meter meter.Meter
}
// NewOptions creates new options struct
@@ -58,6 +62,7 @@ func NewOptions(opts ...Option) Options {
ContextAttrFuncs: DefaultContextAttrFuncs,
AddSource: true,
TimeFunc: time.Now,
Meter: meter.DefaultMeter,
}
WithMicroKeys()(&options)
@@ -69,7 +74,7 @@ func NewOptions(opts ...Option) Options {
return options
}
// WithContextAttrFuncs appends default funcs for the context arrts filler
// WithContextAttrFuncs appends default funcs for the context attrs filler
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
return func(o *Options) {
o.ContextAttrFuncs = append(o.ContextAttrFuncs, fncs...)
@@ -132,6 +137,13 @@ func WithName(n string) Option {
}
}
// WithMeter sets the meter
func WithMeter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
// WithTimeFunc sets the func to obtain current time
func WithTimeFunc(fn func() time.Time) Option {
return func(o *Options) {

View File

@@ -11,6 +11,7 @@ import (
"sync"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/tracer"
)
@@ -150,6 +151,7 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
}
func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", lvl.String()).Inc()
if !s.V(lvl) {
return
}
@@ -189,6 +191,7 @@ func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interfa
}
func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", lvl.String()).Inc()
if !s.V(lvl) {
return
}
@@ -228,6 +231,7 @@ func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, att
}
func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.InfoLevel.String()).Inc()
if !s.V(logger.InfoLevel) {
return
}
@@ -249,6 +253,7 @@ func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.InfoLevel.String()).Inc()
if !s.V(logger.InfoLevel) {
return
}
@@ -270,6 +275,7 @@ func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}
}
func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.DebugLevel.String()).Inc()
if !s.V(logger.DebugLevel) {
return
}
@@ -291,6 +297,7 @@ func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.DebugLevel.String()).Inc()
if !s.V(logger.DebugLevel) {
return
}
@@ -312,6 +319,7 @@ func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{
}
func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.TraceLevel.String()).Inc()
if !s.V(logger.TraceLevel) {
return
}
@@ -333,6 +341,7 @@ func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.TraceLevel.String()).Inc()
if !s.V(logger.TraceLevel) {
return
}
@@ -354,6 +363,7 @@ func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{
}
func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.ErrorLevel.String()).Inc()
if !s.V(logger.ErrorLevel) {
return
}
@@ -393,6 +403,7 @@ func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.ErrorLevel.String()).Inc()
if !s.V(logger.ErrorLevel) {
return
}
@@ -432,6 +443,7 @@ func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{
}
func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.FatalLevel.String()).Inc()
if !s.V(logger.FatalLevel) {
return
}
@@ -454,6 +466,7 @@ func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.FatalLevel.String()).Inc()
if !s.V(logger.FatalLevel) {
return
}
@@ -476,6 +489,7 @@ func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{
}
func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.WarnLevel.String()).Inc()
if !s.V(logger.WarnLevel) {
return
}
@@ -497,6 +511,7 @@ func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.WarnLevel.String()).Inc()
if !s.V(logger.WarnLevel) {
return
}

View File

@@ -1,5 +1,5 @@
// Package metadata is a way of defining message headers
package metadata // import "go.unistack.org/micro/v3/metadata"
package metadata
import (
"net/textproto"

View File

@@ -16,14 +16,19 @@ var (
DefaultAddress = ":9090"
// DefaultPath the meter endpoint where the Meter data will be made available
DefaultPath = "/metrics"
// DefaultMetricPrefix holds the string that prepends to all metrics
DefaultMetricPrefix = "micro_"
// DefaultLabelPrefix holds the string that prepends to all labels
DefaultLabelPrefix = "micro_"
// DefaultMeterStatsInterval specifies interval for meter updating
DefaultMeterStatsInterval = 5 * time.Second
// DefaultSummaryQuantiles is the default spread of stats for summary
DefaultSummaryQuantiles = []float64{0.5, 0.9, 0.97, 0.99, 1}
// DefaultSummaryWindow is the default window for summary
DefaultSummaryWindow = 5 * time.Minute
// DefaultSkipEndpoints is the slice of endpoint that must not be metered
DefaultSkipEndpoints = []string{
"MeterService.Metrics",
"HealthService.Live",
"HealthService.Ready",
"HealthService.Version",
}
)
// Meter is an interface for collecting and instrumenting metrics

View File

@@ -2,8 +2,6 @@ package meter
import (
"context"
"go.unistack.org/micro/v3/logger"
)
// Option powers the configuration for metrics implementations:
@@ -11,8 +9,6 @@ type Option func(*Options)
// Options for metrics implementations
type Options struct {
// Logger used for logging
Logger logger.Logger
// Context holds external options
Context context.Context
// Name holds the meter name
@@ -21,10 +17,6 @@ type Options struct {
Address string
// Path holds the path for metrics
Path string
// MetricPrefix holds the prefix for all metrics
MetricPrefix string
// LabelPrefix holds the prefix for all labels
LabelPrefix string
// Labels holds the default labels
Labels []string
// WriteProcessMetrics flag to write process metrics
@@ -36,12 +28,9 @@ type Options struct {
// NewOptions prepares a set of options:
func NewOptions(opt ...Option) Options {
opts := Options{
Address: DefaultAddress,
Path: DefaultPath,
Context: context.Background(),
Logger: logger.DefaultLogger,
MetricPrefix: DefaultMetricPrefix,
LabelPrefix: DefaultLabelPrefix,
Address: DefaultAddress,
Path: DefaultPath,
Context: context.Background(),
}
for _, o := range opt {
@@ -51,20 +40,6 @@ func NewOptions(opt ...Option) Options {
return opts
}
// LabelPrefix sets the labels prefix
func LabelPrefix(pref string) Option {
return func(o *Options) {
o.LabelPrefix = pref
}
}
// MetricPrefix sets the metric prefix
func MetricPrefix(pref string) Option {
return func(o *Options) {
o.MetricPrefix = pref
}
}
// Context sets the metrics context
func Context(ctx context.Context) Option {
return func(o *Options) {
@@ -95,14 +70,7 @@ func TimingObjectives(value map[float64]float64) Option {
}
*/
// Logger sets the logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// Labels sets the meter labels
// Labels add the meter labels
func Labels(ls ...string) Option {
return func(o *Options) {
o.Labels = append(o.Labels, ls...)

View File

@@ -1,347 +0,0 @@
package wrapper // import "go.unistack.org/micro/v3/meter/wrapper"
import (
"context"
"fmt"
"time"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/server"
)
var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
labelSuccess = "success"
labelFailure = "failure"
labelStatus = "status"
labelEndpoint = "endpoint"
// DefaultSkipEndpoints contains list of endpoints that not evaluted by wrapper
DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"}
)
// Options struct
type Options struct {
Meter meter.Meter
lopts []meter.Option
SkipEndpoints []string
}
// Option func signature
type Option func(*Options)
// NewOptions creates new Options struct
func NewOptions(opts ...Option) Options {
options := Options{
Meter: meter.DefaultMeter,
lopts: make([]meter.Option, 0, 5),
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
o(&options)
}
return options
}
// ServiceName passes service name to meter label
func ServiceName(name string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Labels("name", name))
}
}
// ServiceVersion passes service version to meter label
func ServiceVersion(version string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Labels("version", version))
}
}
// ServiceID passes service id to meter label
func ServiceID(id string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Labels("id", id))
}
}
// Meter passes meter
func Meter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
// SkipEndoints add endpoint to skip
func SkipEndoints(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
type wrapper struct {
client.Client
callFunc client.CallFunc
opts Options
}
// NewClientWrapper create new client wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
handler := &wrapper{
opts: NewOptions(opts...),
Client: c,
}
return handler
}
}
// NewCallWrapper create new call wrapper
func NewCallWrapper(opts ...Option) client.CallWrapper {
return func(fn client.CallFunc) client.CallFunc {
handler := &wrapper{
opts: NewOptions(opts...),
callFunc: fn,
}
return handler.CallFunc
}
}
func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.callFunc(ctx, addr, req, rsp, opts)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now()
err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return err
}
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.Client.Call(ctx, req, rsp, opts...)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now()
err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return err
}
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.Client.Stream(ctx, req, opts...)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now()
stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return stream, err
}
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
ts := time.Now()
err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
return err
}
// NewHandlerWrapper create new server handler wrapper
// deprecated
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.HandlerFunc
}
// NewServerHandlerWrapper create new server handler wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.HandlerFunc
}
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Service() + "." + req.Endpoint()
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return fn(ctx, req, rsp)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ServerRequestInflight, labels...).Inc()
ts := time.Now()
err := fn(ctx, req, rsp)
te := time.Since(ts)
w.opts.Meter.Counter(ServerRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ServerRequestTotal, labels...).Inc()
return err
}
}
// NewSubscriberWrapper create server subscribe wrapper
// deprecated
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
ts := time.Now()
err := fn(ctx, msg)
te := time.Since(ts)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
return err
}
}

View File

@@ -1,4 +1,4 @@
package mtls // import "go.unistack.org/micro/v3/mtls"
package mtls
import (
"bytes"

View File

@@ -1,5 +1,5 @@
// Package network is for creating internetworks
package network // import "go.unistack.org/micro/v3/network"
package network
import (
"go.unistack.org/micro/v3/client"

View File

@@ -1,5 +1,5 @@
// Package transport is an interface for synchronous connection based communication
package transport // import "go.unistack.org/micro/v3/network/transport"
package transport
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package broker is a tunnel broker
package broker // import "go.unistack.org/micro/v3/network/tunnel/broker"
package broker
import (
"context"
@@ -305,6 +305,10 @@ func (t *tunEvent) SetError(err error) {
t.err = err
}
func (t *tunEvent) Context() context.Context {
return context.TODO()
}
// NewBroker returns new tunnel broker
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
options := broker.NewOptions(opts...)

View File

@@ -1,5 +1,5 @@
// Package transport provides a tunnel transport
package transport // import "go.unistack.org/micro/v3/network/tunnel/transport"
package transport
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package tunnel provides gre network tunnelling
package tunnel // import "go.unistack.org/micro/v3/network/transport/tunnel"
package tunnel
import (
"context"

View File

@@ -269,15 +269,7 @@ func Logger(l logger.Logger, opts ...LoggerOption) Option {
}
}
}
for _, mtr := range o.Meters {
for _, or := range lopts.meters {
if mtr.Name() == or || all {
if err = mtr.Init(meter.Logger(l)); err != nil {
return err
}
}
}
}
for _, trc := range o.Tracers {
for _, ot := range lopts.tracers {
if trc.Name() == ot || all {

View File

@@ -1,5 +1,5 @@
// Package http enables the http profiler
package http // import "go.unistack.org/micro/v3/profiler/http"
package http
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package pprof provides a pprof profiler which writes output to /tmp/[name].{cpu,mem}.pprof
package pprof // import "go.unistack.org/micro/v3/profiler/pprof"
package pprof
import (
"os"

View File

@@ -1,5 +1,5 @@
// Package profiler is for profilers
package profiler // import "go.unistack.org/micro/v3/profiler"
package profiler
// Profiler interface
type Profiler interface {

View File

@@ -1,5 +1,5 @@
// Package proxy is a transparent proxy built on the micro/server
package proxy // import "go.unistack.org/micro/v3/proxy"
package proxy
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package register is an interface for service discovery
package register // import "go.unistack.org/micro/v3/register"
package register
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package dns resolves names to dns records
package dns // import "go.unistack.org/micro/v3/resolver/dns"
package dns
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package dnssrv resolves names to dns srv records
package dnssrv // import "go.unistack.org/micro/v3/resolver/dnssrv"
package dnssrv
import (
"fmt"

View File

@@ -1,5 +1,5 @@
// Package http resolves names to network addresses using a http request
package http // import "go.unistack.org/micro/v3/resolver/http"
package http
import (
"encoding/json"

View File

@@ -1,5 +1,5 @@
// Package noop is a noop resolver
package noop // import "go.unistack.org/micro/v3/resolver/noop"
package noop
import (
"go.unistack.org/micro/v3/resolver"

View File

@@ -1,5 +1,5 @@
// Package register resolves names using the micro register
package register // import "go.unistack.org/micro/v3/resolver/registry"
package register
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package static is a static resolver
package static // import "go.unistack.org/micro/v3/resolver/static"
package static
import (
"go.unistack.org/micro/v3/resolver"

View File

@@ -1,5 +1,5 @@
// Package router provides a network routing control plane
package router // import "go.unistack.org/micro/v3/router"
package router
import (
"errors"

View File

@@ -1,4 +1,4 @@
package random // import "go.unistack.org/micro/v3/selector/random"
package random
import (
"go.unistack.org/micro/v3/selector"

View File

@@ -1,4 +1,4 @@
package roundrobin // import "go.unistack.org/micro/v3/selector/roundrobin"
package roundrobin
import (
"go.unistack.org/micro/v3/selector"

View File

@@ -1,5 +1,5 @@
// Package selector is for node selection and load balancing
package selector // import "go.unistack.org/micro/v3/selector"
package selector
import (
"errors"

View File

@@ -2,21 +2,21 @@ package semconv
var (
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
PublishMessageDurationSeconds = "micro_publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
PublishMessageLatencyMicroseconds = "micro_publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
PublishMessageTotal = "micro_publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
PublishMessageInflight = "micro_publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
SubscribeMessageDurationSeconds = "micro_subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
SubscribeMessageLatencyMicroseconds = "micro_subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
SubscribeMessageTotal = "micro_subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
SubscribeMessageInflight = "micro_subscribe_message_inflight"
// BrokerGroupLag specifies broker lag
BrokerGroupLag = "broker_group_lag"
BrokerGroupLag = "micro_broker_group_lag"
)

View File

@@ -1,12 +0,0 @@
package semconv
var (
// CacheRequestDurationSeconds specifies meter metric name
CacheRequestDurationSeconds = "cache_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
CacheRequestLatencyMicroseconds = "cache_request_latency_microseconds"
// CacheRequestTotal specifies meter metric name
CacheRequestTotal = "cache_request_total"
// CacheRequestInflight specifies meter metric name
CacheRequestInflight = "cache_request_inflight"
)

View File

@@ -2,11 +2,11 @@ package semconv
var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
ClientRequestDurationSeconds = "micro_client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
ClientRequestLatencyMicroseconds = "micro_client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
ClientRequestTotal = "micro_client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
ClientRequestInflight = "micro_client_request_inflight"
)

4
semconv/logger.go Normal file
View File

@@ -0,0 +1,4 @@
package semconv
// LoggerMessageTotal specifies meter metric name for logger messages
var LoggerMessageTotal = "micro_logger_message_total"

View File

@@ -2,11 +2,11 @@ package semconv
var (
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
ServerRequestDurationSeconds = "micro_server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
ServerRequestLatencyMicroseconds = "micro_server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
ServerRequestTotal = "micro_server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
ServerRequestInflight = "micro_server_request_inflight"
)

12
semconv/store.go Normal file
View File

@@ -0,0 +1,12 @@
package semconv
var (
// StoreRequestDurationSeconds specifies meter metric name
StoreRequestDurationSeconds = "micro_store_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
StoreRequestLatencyMicroseconds = "micro_store_request_latency_microseconds"
// StoreRequestTotal specifies meter metric name
StoreRequestTotal = "micro_store_request_total"
// StoreRequestInflight specifies meter metric name
StoreRequestInflight = "micro_store_request_inflight"
)

View File

@@ -1,7 +1,6 @@
package server
import (
"bytes"
"context"
"fmt"
"reflect"
@@ -274,7 +273,7 @@ func (n *noopServer) Register() error {
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
config.Logger.Info(n.opts.Context, fmt.Sprintf("register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
}
}
@@ -312,7 +311,7 @@ func (n *noopServer) Deregister() error {
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].ID)
config.Logger.Info(n.opts.Context, fmt.Sprintf("deregistering node: %s", service.Nodes[0].ID))
}
if err := DefaultDeregisterFunc(service, config); err != nil {
@@ -343,11 +342,11 @@ func (n *noopServer) Deregister() error {
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic())
config.Logger.Info(n.opts.Context, "unsubscribing from topic: "+s.Topic())
}
if err := s.Unsubscribe(ncx); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err)
config.Logger.Error(n.opts.Context, "unsubscribing from topic: "+s.Topic(), err)
}
}
}(subs[idx])
@@ -383,7 +382,7 @@ func (n *noopServer) Start() error {
config.Address = addr
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address)
config.Logger.Info(n.opts.Context, "server [noop] Listening on "+config.Address)
}
n.Lock()
@@ -397,13 +396,13 @@ func (n *noopServer) Start() error {
// connect to the broker
if err := config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err)
config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
}
}
@@ -411,13 +410,13 @@ func (n *noopServer) Start() error {
// nolint: nestif
if err := config.RegisterCheck(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, err)
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), err)
}
} else {
// announce self to the world
if err := n.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server register error: %v", err)
config.Logger.Error(n.opts.Context, "server register error", err)
}
}
}
@@ -450,23 +449,23 @@ func (n *noopServer) Start() error {
// nolint: nestif
if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error, deregister it", config.Name, config.ID), rerr)
}
// deregister self in case of error
if err := n.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.ID, err)
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s deregister error", config.Name, config.ID), err)
}
}
} else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, rerr)
config.Logger.Errorf(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), rerr)
}
continue
}
if err := n.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.ID, err)
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register error", config.Name, config.ID), err)
}
}
// wait for exit
@@ -478,7 +477,7 @@ func (n *noopServer) Start() error {
// deregister self
if err := n.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server deregister error: ", err)
config.Logger.Error(n.opts.Context, "server deregister error", err)
}
}
@@ -491,12 +490,12 @@ func (n *noopServer) Start() error {
ch <- nil
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()))
}
// disconnect broker
if err := config.Broker.Disconnect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err)
config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] disconnect error", config.Broker.String()), err)
}
}
}()
@@ -512,36 +511,33 @@ func (n *noopServer) Start() error {
func (n *noopServer) subscribe() error {
config := n.Options()
cx := config.Context
var err error
var sub broker.Subscriber
subCtx := config.Context
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts := []broker.SubscribeOption{
broker.SubscribeContext(subCtx),
broker.SubscribeAutoAck(sb.Options().AutoAck),
broker.SubscribeBodyOnly(sb.Options().BodyOnly),
}
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), n.createSubHandler(sb, config), opts...)
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
@@ -637,127 +633,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
}
}
//nolint:gocyclo
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
return func(ps broker.Events) (err error) {
defer func() {
if r := recover(); r != nil {
n.RLock()
config := n.opts
n.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
config.Logger.Error(n.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msgs := make([]Message, 0, len(ps))
ctxs := make([]context.Context, 0, len(ps))
for _, p := range ps {
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = metadata.New(2)
}
ct, _ := msg.Header.Get(metadata.HeaderContentType)
if len(ct) == 0 {
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
}
hdr := metadata.Copy(msg.Header)
topic, _ := msg.Header.Get(metadata.HeaderTopic)
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
msgs = append(msgs, &rpcMessage{
topic: topic,
contentType: ct,
header: msg.Header,
body: msg.Body,
})
}
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var req reflect.Value
switch handler.reqType.Kind() {
case reflect.Ptr:
req = reflect.New(handler.reqType.Elem())
default:
req = reflect.New(handler.reqType.Elem()).Elem()
}
reqType := handler.reqType
var cf codec.Codec
for _, msg := range msgs {
cf, err = n.newCodec(msg.ContentType())
if err != nil {
return err
}
rb := reflect.New(req.Type().Elem())
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
return err
}
msg.(*rpcMessage).codec = cf
msg.(*rpcMessage).payload = rb.Interface()
}
fn := func(ctxs []context.Context, ms []Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctxs))
}
payloads := reflect.MakeSlice(reqType, 0, len(ms))
for _, m := range ms {
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
}
vals = append(vals, payloads)
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(HookBatchSubHandler); ok {
fn = h(fn)
}
})
if n.wg != nil {
n.wg.Add(1)
}
go func() {
if n.wg != nil {
defer n.wg.Done()
}
results <- fn(ctxs, msgs)
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
//nolint:gocyclo
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) {
@@ -815,7 +690,7 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
req = req.Elem()
}
if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil {
if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil {
return err
}

View File

@@ -9,7 +9,6 @@ import (
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
)
@@ -26,18 +25,6 @@ func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) er
return nil
}
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
if len(msgs) != 8 {
h.t.Fatal("invalid number of messages received")
}
for idx := 0; idx < len(msgs); idx++ {
md, _ := metadata.FromIncomingContext(ctxs[idx])
_ = md
// fmt.Printf("msg md %v\n", md)
}
return nil
}
func TestNoopSub(t *testing.T) {
ctx := context.Background()
@@ -76,13 +63,6 @@ func TestNoopSub(t *testing.T) {
t.Fatal(err)
}
if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
server.SubscriberQueue("queue"),
server.SubscriberBatch(true),
)); err != nil {
t.Fatal(err)
}
if err := s.Start(); err != nil {
t.Fatal(err)
}

View File

@@ -341,8 +341,6 @@ type SubscriberOptions struct {
AutoAck bool
// BodyOnly flag specifies that message without headers
BodyOnly bool
// Batch flag specifies that message processed in batches
Batch bool
// BatchSize flag specifies max size of batch
BatchSize int
// BatchWait flag specifies max wait time for batch filling
@@ -414,13 +412,6 @@ func SubscriberAck(b bool) SubscriberOption {
}
}
// SubscriberBatch control batch processing for handler
func SubscriberBatch(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Batch = b
}
}
// SubscriberBatchSize control batch filling size for handler
// Batch filling max waiting time controlled by SubscriberBatchWait
func SubscriberBatchSize(n int) SubscriberOption {

View File

@@ -1,5 +1,5 @@
// Package server is an interface for a micro server
package server // import "go.unistack.org/micro/v3/server"
package server
import (
"context"
@@ -11,7 +11,9 @@ import (
)
// DefaultServer default server
var DefaultServer Server = NewServer()
var (
DefaultServer Server = NewServer()
)
var (
// DefaultAddress will be used if no address passed, use secure localhost
@@ -63,10 +65,10 @@ type Server interface {
}
type (
FuncBatchSubHandler func(ctxs []context.Context, ms []Message) error
HookBatchSubHandler func(next FuncBatchSubHandler) FuncBatchSubHandler
FuncSubHandler func(ctx context.Context, ms Message) error
HookSubHandler func(next FuncSubHandler) FuncSubHandler
FuncSubHandler func(ctx context.Context, ms Message) error
HookSubHandler func(next FuncSubHandler) FuncSubHandler
FuncHandler func(ctx context.Context, req Request, rsp interface{}) error
HookHandler func(next FuncHandler) FuncHandler
)
/*

View File

@@ -3,14 +3,12 @@ package server
import (
"fmt"
"reflect"
"strings"
"unicode"
"unicode/utf8"
)
const (
subSig = "func(context.Context, interface{}) error"
batchSubSig = "func([]context.Context, []interface{}) error"
subSig = "func(context.Context, interface{}) error"
)
// Precompute the reflect type for error. Can't use error directly
@@ -43,23 +41,15 @@ func ValidateSubscriber(sub Subscriber) error {
switch typ.NumIn() {
case 2:
argType = typ.In(1)
if sub.Options().Batch {
if argType.Kind() != reflect.Slice {
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
}
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
}
}
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
name, typ.NumOut(), subSig, batchSubSig)
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
name, typ.NumOut(), subSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
@@ -74,8 +64,8 @@ func ValidateSubscriber(sub Subscriber) error {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
@@ -83,8 +73,8 @@ func ValidateSubscriber(sub Subscriber) error {
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
"subscriber %v.%v has wrong number of return values: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())

View File

@@ -14,20 +14,12 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// publication message.
type SubscriberFunc func(ctx context.Context, msg Message) error
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message. This func used by batch subscribers
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
// StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring,

View File

@@ -1,5 +1,5 @@
// Package store is an interface for distributed data storage.
package store // import "go.unistack.org/micro/v3/store"
package store
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package sync is an interface for distributed synchronization
package sync // import "go.unistack.org/micro/v3/sync"
package sync
import (
"errors"

View File

@@ -83,8 +83,11 @@ func (sk SpanKind) String() string {
// SpanOptions contains span option
type SpanOptions struct {
Labels []interface{}
Kind SpanKind
StatusMsg string
Labels []interface{}
Status SpanStatus
Kind SpanKind
Record bool
}
// SpanOption func signature
@@ -110,12 +113,25 @@ func WithSpanLabels(kv ...interface{}) SpanOption {
}
}
func WithSpanStatus(st SpanStatus, msg string) SpanOption {
return func(o *SpanOptions) {
o.Status = st
o.StatusMsg = msg
}
}
func WithSpanKind(k SpanKind) SpanOption {
return func(o *SpanOptions) {
o.Kind = k
}
}
func WithSpanRecord(b bool) SpanOption {
return func(o *SpanOptions) {
o.Record = b
}
}
// Options struct
type Options struct {
// Context used to store custome tracer options
@@ -124,6 +140,8 @@ type Options struct {
Logger logger.Logger
// Name of the tracer
Name string
// ContextAttrFuncs contains funcs that provides tracing
ContextAttrFuncs []ContextAttrFunc
}
// Option func signature
@@ -148,7 +166,8 @@ func NewEventOptions(opts ...EventOption) EventOptions {
// NewSpanOptions returns default SpanOptions
func NewSpanOptions(opts ...SpanOption) SpanOptions {
options := SpanOptions{
Kind: SpanKindInternal,
Kind: SpanKindInternal,
Record: true,
}
for _, o := range opts {
o(&options)
@@ -159,8 +178,9 @@ func NewSpanOptions(opts ...SpanOption) SpanOptions {
// NewOptions returns default options
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Context: context.Background(),
Logger: logger.DefaultLogger,
Context: context.Background(),
ContextAttrFuncs: DefaultContextAttrFuncs,
}
for _, o := range opts {
o(&options)

View File

@@ -1,5 +1,5 @@
// Package tracer provides an interface for distributed tracing
package tracer // import "go.unistack.org/micro/v3/tracer"
package tracer
import (
"context"
@@ -7,16 +7,25 @@ import (
"go.unistack.org/micro/v3/logger"
)
// DefaultTracer is the global default tracer
var DefaultTracer Tracer = NewTracer()
var (
// DefaultTracer is the global default tracer
DefaultTracer Tracer = NewTracer() //nolint:revive
// TraceIDKey is the key used for the trace id in the log call
TraceIDKey = "trace-id"
// SpanIDKey is the key used for the span id in the log call
SpanIDKey = "span-id"
// DefaultSkipEndpoints is the slice of endpoint that must not be traced
DefaultSkipEndpoints = []string{
"MeterService.Metrics",
"HealthService.Live",
"HealthService.Ready",
"HealthService.Version",
}
DefaultContextAttrFuncs []ContextAttrFunc
)
type ContextAttrFunc func(ctx context.Context) []interface{}
func init() {
logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs,
func(ctx context.Context) []interface{} {
@@ -38,6 +47,8 @@ type Tracer interface {
Init(...Option) error
// Start a trace
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
// Extract get span metadata from context
// Extract(ctx context.Context)
// Flush flushes spans
Flush(ctx context.Context) error
}

View File

@@ -1,415 +0,0 @@
// Package wrapper provides wrapper for Tracer
package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper"
import (
"context"
"fmt"
"strings"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v3/tracer"
)
var DefaultHeadersExctract = []string{metadata.HeaderXRequestID}
func ExtractDefaultLabels(md metadata.Metadata) []interface{} {
labels := make([]interface{}, 0, len(DefaultHeadersExctract))
for _, k := range DefaultHeadersExctract {
if v, ok := md.Get(k); ok {
labels = append(labels, strings.ToLower(k), v)
}
}
return labels
}
var (
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
labels = append(labels, ExtractDefaultLabels(msg.Metadata())...)
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
labels = append(labels, ExtractDefaultLabels(msg.Header())...)
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method()))
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"}
)
type tWrapper struct {
client.Client
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
opts Options
}
type (
ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
)
// Options struct
type Options struct {
// Tracer that used for tracing
Tracer tracer.Tracer
// ClientCallObservers funcs
ClientCallObservers []ClientCallObserver
// ClientStreamObservers funcs
ClientStreamObservers []ClientStreamObserver
// ClientPublishObservers funcs
ClientPublishObservers []ClientPublishObserver
// ClientCallFuncObservers funcs
ClientCallFuncObservers []ClientCallFuncObserver
// ServerHandlerObservers funcs
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints
SkipEndpoints []string
}
// Option func signature
type Option func(*Options)
// NewOptions create Options from Option slice
func NewOptions(opts ...Option) Options {
options := Options{
Tracer: tracer.DefaultTracer,
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
o(&options)
}
return options
}
// WithTracer pass tracer
func WithTracer(t tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = t
}
}
// SkipEndponts
func SkipEndpoins(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
// WithClientCallObservers funcs
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
// WithClientStreamObservers funcs
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
// WithClientPublishObservers funcs
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
// WithClientCallFuncObservers funcs
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
// WithServerHandlerObservers funcs
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
// WithServerSubscriberObservers funcs
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Call(ctx, req, rsp, opts...)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary",
),
)
defer sp.Finish()
err := ot.Client.Call(nctx, req, rsp, opts...)
for _, o := range ot.opts.ClientCallObservers {
o(nctx, req, rsp, opts, sp, err)
}
return err
}
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Stream(ctx, req, opts...)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "stream",
),
)
defer sp.Finish()
stream, err := ot.Client.Stream(nctx, req, opts...)
for _, o := range ot.opts.ClientStreamObservers {
o(nctx, req, opts, stream, sp, err)
}
return stream, err
}
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" publish", tracer.WithSpanKind(tracer.SpanKindProducer))
defer sp.Finish()
sp.AddLabels("messaging.destination.name", msg.Topic())
sp.AddLabels("messaging.operation", "publish")
err := ot.Client.Publish(nctx, msg, opts...)
for _, o := range ot.opts.ClientPublishObservers {
o(nctx, msg, opts, sp, err)
}
return err
}
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.serverHandler(ctx, req, rsp)
}
}
callType := "unary"
if req.Stream() {
callType = "stream"
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", callType,
),
)
defer sp.Finish()
err := ot.serverHandler(nctx, req, rsp)
for _, o := range ot.opts.ServerHandlerObservers {
o(nctx, req, rsp, sp, err)
}
return err
}
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" process", tracer.WithSpanKind(tracer.SpanKindConsumer))
defer sp.Finish()
sp.AddLabels("messaging.operation", "process")
sp.AddLabels("messaging.source.name", msg.Topic())
err := ot.serverSubscriber(nctx, msg)
for _, o := range ot.opts.ServerSubscriberObservers {
o(nctx, msg, sp, err)
}
return err
}
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
options := NewOptions()
for _, o := range opts {
o(&options)
}
return &tWrapper{opts: options, Client: c}
}
}
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
return func(h client.CallFunc) client.CallFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, clientCallFunc: h}
return ot.ClientCallFunc
}
}
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.ClientCallFunc(ctx, addr, req, rsp, opts)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary",
),
)
defer sp.Finish()
err := ot.clientCallFunc(nctx, addr, req, rsp, opts)
for _, o := range ot.opts.ClientCallFuncObservers {
o(nctx, addr, req, rsp, opts, sp, err)
}
return err
}
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverHandler: h}
return ot.ServerHandler
}
}
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
return func(h server.SubscriberFunc) server.SubscriberFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverSubscriber: h}
return ot.ServerSubscriber
}
}

View File

@@ -1,4 +1,4 @@
package addr // import "go.unistack.org/micro/v3/util/addr"
package addr
import (
"fmt"
@@ -58,6 +58,7 @@ func IsLocal(addr string) bool {
}
// Extract returns a real ip
//
//nolint:gocyclo
func Extract(addr string) (string, error) {
// if addr specified then its returned

View File

@@ -1,5 +1,5 @@
// Package backoff provides backoff functionality
package backoff // import "go.unistack.org/micro/v3/util/backoff"
package backoff
import (
"math"

View File

@@ -1,4 +1,4 @@
package buf // import "go.unistack.org/micro/v3/util/buf"
package buf
import (
"bytes"

View File

@@ -1,4 +1,4 @@
package http // import "go.unistack.org/micro/v3/util/http"
package http
import (
"context"

View File

@@ -1,4 +1,4 @@
package id // import "go.unistack.org/micro/v3/util/id"
package id
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package io is for io management
package io // import "go.unistack.org/micro/v3/util/io"
package io
import (
"io"

View File

@@ -1,5 +1,5 @@
// Package jitter provides a random jitter
package jitter // import "go.unistack.org/micro/v3/util/jitter"
package jitter
import (
"time"

View File

@@ -1,4 +1,4 @@
package jitter // import "go.unistack.org/micro/v3/util/jitter"
package jitter
import (
"context"

View File

@@ -1,4 +1,4 @@
package net // import "go.unistack.org/micro/v3/util/net"
package net
import (
"errors"

View File

@@ -1,6 +1,6 @@
// Package pki provides PKI all the PKI functions necessary to run micro over an untrusted network
// including a CA
package pki // import "go.unistack.org/micro/v3/util/pki"
package pki
import (
"bytes"

View File

@@ -1,5 +1,5 @@
// Package pool is a connection pool
package pool // import "go.unistack.org/micro/v3/util/pool"
package pool
import (
"context"

View File

@@ -1,4 +1,4 @@
package rand // import "go.unistack.org/micro/v3/util/rand"
package rand
import (
crand "crypto/rand"

View File

@@ -44,6 +44,37 @@ func SliceAppend(b bool) Option {
}
}
var maxDepth = 32
func mergeMap(dst, src map[string]interface{}, depth int) map[string]interface{} {
if depth > maxDepth {
return dst
}
for key, srcVal := range src {
if dstVal, ok := dst[key]; ok {
srcMap, srcMapOk := mapify(srcVal)
dstMap, dstMapOk := mapify(dstVal)
if srcMapOk && dstMapOk {
srcVal = mergeMap(dstMap, srcMap, depth+1)
}
}
dst[key] = srcVal
}
return dst
}
func mapify(i interface{}) (map[string]interface{}, bool) {
value := reflect.ValueOf(i)
if value.Kind() == reflect.Map {
m := map[string]interface{}{}
for _, k := range value.MapKeys() {
m[k.String()] = value.MapIndex(k).Interface()
}
return m, true
}
return map[string]interface{}{}, false
}
// Merge merges map[string]interface{} to destination struct
func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
options := Options{}
@@ -59,6 +90,11 @@ func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
return err
}
if mapper, ok := dst.(map[string]interface{}); ok {
dst = mergeMap(mapper, mp, 0)
return nil
}
var err error
var sval reflect.Value
var fname string

View File

@@ -4,6 +4,27 @@ import (
"testing"
)
func TestMergeMap(t *testing.T) {
src := map[string]interface{}{
"skey1": "sval1",
"skey2": map[string]interface{}{
"skey3": "sval3",
},
}
dst := map[string]interface{}{
"skey1": "dval1",
"skey2": map[string]interface{}{
"skey3": "dval3",
},
}
if err := Merge(src, dst); err != nil {
t.Fatal(err)
}
t.Logf("%#+v", src)
}
func TestFieldName(t *testing.T) {
src := "SomeVar"
chk := "some_var"

View File

@@ -25,6 +25,48 @@ type StructField struct {
Field reflect.StructField
}
// StructFieldNameByTag get struct field name by tag key and its value
func StructFieldNameByTag(src interface{}, tkey string, tval string) (string, interface{}, error) {
sv := reflect.ValueOf(src)
if sv.Kind() == reflect.Ptr {
sv = sv.Elem()
}
if sv.Kind() != reflect.Struct {
return "", nil, ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if len(fld.PkgPath) != 0 {
continue
}
if ts, ok := fld.Tag.Lookup(tkey); ok {
for _, p := range strings.Split(ts, ",") {
if p == tval {
return fld.Name, val.Interface(), nil
}
}
}
switch val.Kind() {
case reflect.Ptr:
if val = val.Elem(); val.Kind() == reflect.Struct {
if name, fld, err := StructFieldNameByTag(val.Interface(), tkey, tval); err == nil {
return name, fld, nil
}
}
case reflect.Struct:
if name, fld, err := StructFieldNameByTag(val.Interface(), tkey, tval); err == nil {
return name, fld, nil
}
}
}
return "", nil, ErrNotFound
}
// StructFieldByTag get struct field by tag key and its value
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
sv := reflect.ValueOf(src)
@@ -46,9 +88,6 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
if ts, ok := fld.Tag.Lookup(tkey); ok {
for _, p := range strings.Split(ts, ",") {
if p == tval {
if val.Kind() != reflect.Ptr && val.CanAddr() {
val = val.Addr()
}
return val.Interface(), nil
}
}
@@ -72,10 +111,21 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
// ZeroFieldByPath clean struct field by its path
func ZeroFieldByPath(src interface{}, path string) error {
if src == nil {
return nil
}
var err error
val := reflect.ValueOf(src)
if IsEmpty(val) {
return nil
}
for _, p := range strings.Split(path, ".") {
if IsEmpty(val) {
return nil
}
val, err = structValueByName(val, p)
if err != nil {
return err
@@ -493,13 +543,14 @@ func btSplitter(str string) []string {
}
// queryToMap turns something like a[b][c]=4 into
// map[string]interface{}{
// "a": map[string]interface{}{
// "b": map[string]interface{}{
// "c": 4,
// },
// },
// }
//
// map[string]interface{}{
// "a": map[string]interface{}{
// "b": map[string]interface{}{
// "c": 4,
// },
// },
// }
func queryToMap(param string) (map[string]interface{}, error) {
rawKey, rawValue, err := splitKeyAndValue(param)
if err != nil {

View File

@@ -190,9 +190,9 @@ func TestStructFieldByTag(t *testing.T) {
t.Fatal(err)
}
if v, ok := iface.(*[]string); !ok {
if v, ok := iface.([]string); !ok {
t.Fatalf("not *[]string %v", iface)
} else if len(*v) != 2 {
} else if len(v) != 2 {
t.Fatalf("invalid number %v", iface)
}
}

View File

@@ -1,4 +1,4 @@
package register // import "go.unistack.org/micro/v3/util/register"
package register
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package ring provides a simple ring buffer for storing local data
package ring // import "go.unistack.org/micro/v3/util/ring"
package ring
import (
"sync"

View File

@@ -1,5 +1,5 @@
// Package socket provides a pseudo socket
package socket // import "go.unistack.org/micro/v3/util/socket"
package socket
import (
"io"

60
util/sort/sort_test.go Normal file
View File

@@ -0,0 +1,60 @@
package sort
import (
"reflect"
"testing"
)
func TestUniq(t *testing.T) {
type args struct {
labels []interface{}
}
tests := []struct {
name string
args args
want []interface{}
}{
{
name: "test#1",
args: args{
labels: append(make([]interface{}, 0), "test-1", 1, "test-2", 2),
},
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 2),
},
{
name: "test#2",
args: args{
labels: append(make([]interface{}, 0), "test-1", 1, "test-2", 2, "test-2", 2),
},
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 2),
},
{
name: "test#3",
args: args{
labels: append(make([]interface{}, 0), "test-1", 1, "test-2", 2, "test-2", 3),
},
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 3),
},
{
name: "test#4",
args: args{
labels: append(make([]interface{}, 0),
"test-1", 1, "test-1", 2,
"test-2", 3, "test-2", 2,
"test-3", 5, "test-3", 3,
"test-1", 4, "test-1", 1),
},
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 2, "test-3", 3),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var got []interface{}
if got = Uniq(tt.args.labels); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Uniq() = %v, want %v", got, tt.want)
}
t.Logf("got-%#v", got)
})
}
}

View File

@@ -1,5 +1,5 @@
// Package stream encapsulates streams within streams
package stream // import "go.unistack.org/micro/v3/util/stream"
package stream
import (
"context"

View File

@@ -1,11 +1,78 @@
package pool
import "sync"
import (
"bytes"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/semconv"
)
var (
pools = make([]Statser, 0)
poolsMu sync.Mutex
)
// Stats struct
type Stats struct {
Get uint64
Put uint64
Mis uint64
Ret uint64
}
// Statser provides buffer pool stats
type Statser interface {
Stats() Stats
Cap() int
}
func init() {
go newStatsMeter()
}
func newStatsMeter() {
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
poolsMu.Lock()
for _, st := range pools {
stats := st.Stats()
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
}
poolsMu.Unlock()
}
}
}
var (
_ Statser = (*BytePool)(nil)
_ Statser = (*BytesPool)(nil)
_ Statser = (*StringsPool)(nil)
)
type Pool[T any] struct {
p *sync.Pool
}
func (p Pool[T]) Put(t T) {
p.p.Put(t)
}
func (p Pool[T]) Get() T {
return p.p.Get().(T)
}
func NewPool[T any](fn func() T) Pool[T] {
return Pool[T]{
p: &sync.Pool{
@@ -16,10 +83,155 @@ func NewPool[T any](fn func() T) Pool[T] {
}
}
func (p Pool[T]) Get() T {
return p.p.Get().(T)
type BytePool struct {
p *sync.Pool
get uint64
put uint64
mis uint64
ret uint64
c int
}
func (p Pool[T]) Put(t T) {
p.p.Put(t)
func NewBytePool(size int) *BytePool {
p := &BytePool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
atomic.AddUint64(&p.mis, 1)
b := make([]byte, 0, size)
return &b
},
}
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p
}
func (p *BytePool) Cap() int {
return p.c
}
func (p *BytePool) Stats() Stats {
return Stats{
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
}
}
func (p *BytePool) Get() *[]byte {
atomic.AddUint64(&p.get, 1)
return p.p.Get().(*[]byte)
}
func (p *BytePool) Put(b *[]byte) {
atomic.AddUint64(&p.put, 1)
if cap(*b) > p.c {
atomic.AddUint64(&p.ret, 1)
return
}
*b = (*b)[:0]
p.p.Put(b)
}
type BytesPool struct {
p *sync.Pool
get uint64
put uint64
mis uint64
ret uint64
c int
}
func NewBytesPool(size int) *BytesPool {
p := &BytesPool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
atomic.AddUint64(&p.mis, 1)
b := bytes.NewBuffer(make([]byte, 0, size))
return b
},
}
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p
}
func (p *BytesPool) Cap() int {
return p.c
}
func (p *BytesPool) Stats() Stats {
return Stats{
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
}
}
func (p *BytesPool) Get() *bytes.Buffer {
return p.p.Get().(*bytes.Buffer)
}
func (p *BytesPool) Put(b *bytes.Buffer) {
if (*b).Cap() > p.c {
atomic.AddUint64(&p.ret, 1)
return
}
b.Reset()
p.p.Put(b)
}
type StringsPool struct {
p *sync.Pool
get uint64
put uint64
mis uint64
ret uint64
c int
}
func NewStringsPool(size int) *StringsPool {
p := &StringsPool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
atomic.AddUint64(&p.mis, 1)
return &strings.Builder{}
},
}
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p
}
func (p *StringsPool) Cap() int {
return p.c
}
func (p *StringsPool) Stats() Stats {
return Stats{
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
}
}
func (p *StringsPool) Get() *strings.Builder {
atomic.AddUint64(&p.get, 1)
return p.p.Get().(*strings.Builder)
}
func (p *StringsPool) Put(b *strings.Builder) {
atomic.AddUint64(&p.put, 1)
if b.Cap() > p.c {
atomic.AddUint64(&p.ret, 1)
return
}
b.Reset()
p.p.Put(b)
}

View File

@@ -2,12 +2,30 @@ package pool
import (
"bytes"
"strings"
"testing"
)
func TestByte(t *testing.T) {
p := NewBytePool(1024)
b := p.Get()
copy(*b, []byte(`test`))
if bytes.Equal(*b, []byte("test")) {
t.Fatal("pool not works")
}
p.Put(b)
b = p.Get()
for i := 0; i < 1500; i++ {
*b = append(*b, []byte(`test`)...)
}
p.Put(b)
st := p.Stats()
if st.Get != 2 && st.Put != 2 && st.Mis != 1 && st.Ret != 1 {
t.Fatalf("pool stats error %#+v", st)
}
}
func TestBytes(t *testing.T) {
p := NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) })
p := NewBytesPool(1024)
b := p.Get()
b.Write([]byte(`test`))
if b.String() != "test" {
@@ -17,7 +35,7 @@ func TestBytes(t *testing.T) {
}
func TestStrings(t *testing.T) {
p := NewPool(func() *strings.Builder { return &strings.Builder{} })
p := NewStringsPool(20)
b := p.Get()
b.Write([]byte(`test`))
if b.String() != "test" {