Compare commits
34 Commits
Author | SHA1 | Date | |
---|---|---|---|
82d269cfb4 | |||
6641463eed | |||
faf2454f0a | |||
de9e4d73f5 | |||
4ae7277140 | |||
a98618ed5b | |||
3aaf1182cb | |||
eb1482d789 | |||
a305f7553f | |||
|
d9b2f2a45d | ||
3ace7657dc | |||
53b40617e2 | |||
1a9236caad | |||
6c68d39081 | |||
35e62fbeb0 | |||
00b3ceb468 | |||
7dc8f088c9 | |||
c65afcea1b | |||
3eebfb5b11 | |||
fa1427014c | |||
62074965ee | |||
9c8fbb2202 | |||
7c0a5f5e2a | |||
b08f5321b0 | |||
cc0f24e012 | |||
307a08f50c | |||
edc93e8c37 | |||
391813c260 | |||
1a1459dd0e | |||
4e99680c30 | |||
92a3a547b8 | |||
849c462037 | |||
54a55c83e2 | |||
781dee03db |
@@ -1,5 +1,5 @@
|
||||
// Package broker is an interface used for asynchronous messaging
|
||||
package broker // import "go.unistack.org/micro/v3/broker"
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -88,6 +88,8 @@ type BatchHandler func(Events) error
|
||||
|
||||
// Event is given to a subscription handler for processing
|
||||
type Event interface {
|
||||
// Context return context.Context for event
|
||||
Context() context.Context
|
||||
// Topic returns event topic
|
||||
Topic() string
|
||||
// Message returns broker message
|
||||
|
@@ -373,6 +373,10 @@ func (m *memoryEvent) SetError(err error) {
|
||||
m.err = err
|
||||
}
|
||||
|
||||
func (m *memoryEvent) Context() context.Context {
|
||||
return m.opts.Context
|
||||
}
|
||||
|
||||
func (m *memorySubscriber) Options() broker.SubscribeOptions {
|
||||
return m.opts
|
||||
}
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package client is an interface for an RPC client
|
||||
package client // import "go.unistack.org/micro/v3/client"
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
@@ -12,6 +13,8 @@ import (
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/options"
|
||||
"go.unistack.org/micro/v3/selector"
|
||||
"go.unistack.org/micro/v3/semconv"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
)
|
||||
|
||||
// DefaultCodecs will be used to encode/decode data
|
||||
@@ -104,10 +107,13 @@ func (n *noopResponse) Read() ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type noopStream struct{}
|
||||
type noopStream struct {
|
||||
err error
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (n *noopStream) Context() context.Context {
|
||||
return context.Background()
|
||||
return n.ctx
|
||||
}
|
||||
|
||||
func (n *noopStream) Request() Request {
|
||||
@@ -135,15 +141,21 @@ func (n *noopStream) RecvMsg(interface{}) error {
|
||||
}
|
||||
|
||||
func (n *noopStream) Error() error {
|
||||
return nil
|
||||
return n.err
|
||||
}
|
||||
|
||||
func (n *noopStream) Close() error {
|
||||
return nil
|
||||
if sp, ok := tracer.SpanFromContext(n.ctx); ok && sp != nil {
|
||||
if n.err != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, n.err.Error())
|
||||
}
|
||||
sp.Finish()
|
||||
}
|
||||
return n.err
|
||||
}
|
||||
|
||||
func (n *noopStream) CloseSend() error {
|
||||
return nil
|
||||
return n.err
|
||||
}
|
||||
|
||||
func (n *noopMessage) Topic() string {
|
||||
@@ -207,7 +219,28 @@ func (n *noopClient) String() string {
|
||||
}
|
||||
|
||||
func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
||||
return n.funcCall(ctx, req, rsp, opts...)
|
||||
ts := time.Now()
|
||||
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
||||
var sp tracer.Span
|
||||
ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
||||
)
|
||||
err := n.funcCall(ctx, req, rsp, opts...)
|
||||
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
|
||||
te := time.Since(ts)
|
||||
n.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||
n.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||
|
||||
if me := errors.FromError(err); me == nil {
|
||||
sp.Finish()
|
||||
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
|
||||
} else {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
||||
@@ -349,7 +382,28 @@ func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOp
|
||||
}
|
||||
|
||||
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
||||
return n.funcStream(ctx, req, opts...)
|
||||
ts := time.Now()
|
||||
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
||||
var sp tracer.Span
|
||||
ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
||||
)
|
||||
stream, err := n.funcStream(ctx, req, opts...)
|
||||
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
|
||||
te := time.Since(ts)
|
||||
n.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||
n.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||
|
||||
if me := errors.FromError(err); me == nil {
|
||||
sp.Finish()
|
||||
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
|
||||
} else {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
|
||||
}
|
||||
|
||||
return stream, err
|
||||
}
|
||||
|
||||
func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
||||
@@ -493,7 +547,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti
|
||||
}
|
||||
|
||||
func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) {
|
||||
return &noopStream{}, nil
|
||||
return &noopStream{ctx: ctx}, nil
|
||||
}
|
||||
|
||||
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
||||
|
@@ -1,19 +1,8 @@
|
||||
// Package codec is an interface for encoding messages
|
||||
package codec // import "go.unistack.org/micro/v3/codec"
|
||||
package codec
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
// Message types
|
||||
const (
|
||||
Error MessageType = iota
|
||||
Request
|
||||
Response
|
||||
Event
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -24,65 +13,23 @@ var (
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultMaxMsgSize specifies how much data codec can handle
|
||||
DefaultMaxMsgSize = 1024 * 1024 * 4 // 4Mb
|
||||
// DefaultCodec is the global default codec
|
||||
DefaultCodec = NewCodec()
|
||||
// DefaultTagName specifies struct tag name to control codec Marshal/Unmarshal
|
||||
DefaultTagName = "codec"
|
||||
)
|
||||
|
||||
// MessageType specifies message type for codec
|
||||
type MessageType int
|
||||
|
||||
// Codec encodes/decodes various types of messages used within micro.
|
||||
// ReadHeader and ReadBody are called in pairs to read requests/responses
|
||||
// from the connection. Close is called when finished with the
|
||||
// connection. ReadBody may be called with a nil argument to force the
|
||||
// body to be read and discarded.
|
||||
// Codec encodes/decodes various types of messages.
|
||||
type Codec interface {
|
||||
ReadHeader(r io.Reader, m *Message, mt MessageType) error
|
||||
ReadBody(r io.Reader, v interface{}) error
|
||||
Write(w io.Writer, m *Message, v interface{}) error
|
||||
Marshal(v interface{}, opts ...Option) ([]byte, error)
|
||||
Unmarshal(b []byte, v interface{}, opts ...Option) error
|
||||
String() string
|
||||
}
|
||||
|
||||
// Message represents detailed information about
|
||||
// the communication, likely followed by the body.
|
||||
// In the case of an error, body may be nil.
|
||||
type Message struct {
|
||||
Header metadata.Metadata
|
||||
Target string
|
||||
Method string
|
||||
Endpoint string
|
||||
Error string
|
||||
ID string
|
||||
Body []byte
|
||||
Type MessageType
|
||||
}
|
||||
|
||||
// NewMessage creates new codec message
|
||||
func NewMessage(t MessageType) *Message {
|
||||
return &Message{Type: t, Header: metadata.New(0)}
|
||||
}
|
||||
|
||||
// MarshalAppend calls codec.Marshal(v) and returns the data appended to buf.
|
||||
// If codec implements MarshalAppend, that is called instead.
|
||||
func MarshalAppend(buf []byte, c Codec, v interface{}, opts ...Option) ([]byte, error) {
|
||||
if nc, ok := c.(interface {
|
||||
MarshalAppend([]byte, interface{}, ...Option) ([]byte, error)
|
||||
}); ok {
|
||||
return nc.MarshalAppend(buf, v, opts...)
|
||||
}
|
||||
|
||||
mbuf, err := c.Marshal(v, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return append(buf, mbuf...), nil
|
||||
type CodecV2 interface {
|
||||
Marshal(buf []byte, v interface{}, opts ...Option) ([]byte, error)
|
||||
Unmarshal(buf []byte, v interface{}, opts ...Option) error
|
||||
String() string
|
||||
}
|
||||
|
||||
// RawMessage is a raw encoded JSON value.
|
||||
@@ -93,6 +40,8 @@ type RawMessage []byte
|
||||
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
||||
if m == nil {
|
||||
return []byte("null"), nil
|
||||
} else if len(*m) == 0 {
|
||||
return []byte("null"), nil
|
||||
}
|
||||
return *m, nil
|
||||
}
|
||||
|
@@ -2,70 +2,14 @@ package codec
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
codecpb "go.unistack.org/micro-proto/v3/codec"
|
||||
)
|
||||
|
||||
type noopCodec struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *noopCodec) ReadBody(conn io.Reader, b interface{}) error {
|
||||
// read bytes
|
||||
buf, err := io.ReadAll(conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch v := b.(type) {
|
||||
case *string:
|
||||
*v = string(buf)
|
||||
case *[]byte:
|
||||
*v = buf
|
||||
case *Frame:
|
||||
v.Data = buf
|
||||
default:
|
||||
return json.Unmarshal(buf, v)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *noopCodec) Write(conn io.Writer, m *Message, b interface{}) error {
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var v []byte
|
||||
switch vb := b.(type) {
|
||||
case *Frame:
|
||||
v = vb.Data
|
||||
case string:
|
||||
v = []byte(vb)
|
||||
case *string:
|
||||
v = []byte(*vb)
|
||||
case *[]byte:
|
||||
v = *vb
|
||||
case []byte:
|
||||
v = vb
|
||||
default:
|
||||
var err error
|
||||
v, err = json.Marshal(vb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
_, err := conn.Write(v)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *noopCodec) String() string {
|
||||
return "noop"
|
||||
}
|
||||
@@ -91,8 +35,8 @@ func (c *noopCodec) Marshal(v interface{}, opts ...Option) ([]byte, error) {
|
||||
return ve, nil
|
||||
case *Frame:
|
||||
return ve.Data, nil
|
||||
case *Message:
|
||||
return ve.Body, nil
|
||||
case *codecpb.Frame:
|
||||
return ve.Data, nil
|
||||
}
|
||||
|
||||
return json.Marshal(v)
|
||||
@@ -115,8 +59,8 @@ func (c *noopCodec) Unmarshal(d []byte, v interface{}, opts ...Option) error {
|
||||
case *Frame:
|
||||
ve.Data = d
|
||||
return nil
|
||||
case *Message:
|
||||
ve.Body = d
|
||||
case *codecpb.Frame:
|
||||
ve.Data = d
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -23,15 +23,8 @@ type Options struct {
|
||||
Context context.Context
|
||||
// TagName specifies tag name in struct to control codec
|
||||
TagName string
|
||||
// MaxMsgSize specifies max messages size that reads by codec
|
||||
MaxMsgSize int
|
||||
}
|
||||
|
||||
// MaxMsgSize sets the max message size
|
||||
func MaxMsgSize(n int) Option {
|
||||
return func(o *Options) {
|
||||
o.MaxMsgSize = n
|
||||
}
|
||||
// Flatten specifies that struct must be analyzed for flatten tag
|
||||
Flatten bool
|
||||
}
|
||||
|
||||
// TagName sets the codec tag name in struct
|
||||
@@ -41,6 +34,13 @@ func TagName(n string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Flatten enables checking for flatten tag name
|
||||
func Flatten(b bool) Option {
|
||||
return func(o *Options) {
|
||||
o.Flatten = b
|
||||
}
|
||||
}
|
||||
|
||||
// Logger sets the logger
|
||||
func Logger(l logger.Logger) Option {
|
||||
return func(o *Options) {
|
||||
@@ -65,12 +65,12 @@ func Meter(m meter.Meter) Option {
|
||||
// NewOptions returns new options
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Context: context.Background(),
|
||||
Logger: logger.DefaultLogger,
|
||||
Meter: meter.DefaultMeter,
|
||||
Tracer: tracer.DefaultTracer,
|
||||
MaxMsgSize: DefaultMaxMsgSize,
|
||||
TagName: DefaultTagName,
|
||||
Context: context.Background(),
|
||||
Logger: logger.DefaultLogger,
|
||||
Meter: meter.DefaultMeter,
|
||||
Tracer: tracer.DefaultTracer,
|
||||
TagName: DefaultTagName,
|
||||
Flatten: false,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package config is an interface for dynamic configuration.
|
||||
package config // import "go.unistack.org/micro/v3/config"
|
||||
package config
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// Package errors provides a way to return detailed information
|
||||
// for an RPC request error. The error is normally JSON encoded.
|
||||
package errors // import "go.unistack.org/micro/v3/errors"
|
||||
package errors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@@ -44,6 +44,20 @@ var (
|
||||
ErrGatewayTimeout = &Error{Code: 504}
|
||||
)
|
||||
|
||||
const ProblemContentType = "application/problem+json"
|
||||
|
||||
type Problem struct {
|
||||
Type string `json:"type,omitempty"`
|
||||
Title string `json:"title,omitempty"`
|
||||
Detail string `json:"detail,omitempty"`
|
||||
Instance string `json:"instance,omitempty"`
|
||||
Errors []struct {
|
||||
Title string `json:"title,omitempty"`
|
||||
Detail string `json:"detail,omitempty"`
|
||||
} `json:"errors,omitempty"`
|
||||
Status int `json:"status,omitempty"`
|
||||
}
|
||||
|
||||
// Error type
|
||||
type Error struct {
|
||||
// ID holds error id or service, usually someting like my_service or id
|
||||
@@ -262,6 +276,10 @@ func CodeIn(err interface{}, codes ...int32) bool {
|
||||
|
||||
// FromError try to convert go error to *Error
|
||||
func FromError(err error) *Error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if verr, ok := err.(*Error); ok && verr != nil {
|
||||
return verr
|
||||
}
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package flow is an interface used for saga pattern microservice workflow
|
||||
package flow // import "go.unistack.org/micro/v3/flow"
|
||||
package flow
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package fsm // import "go.unistack.org/micro/v3/fsm"
|
||||
package fsm
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -4,6 +4,17 @@ import "context"
|
||||
|
||||
type loggerKey struct{}
|
||||
|
||||
// MustContext returns logger from passed context or DefaultLogger if empty
|
||||
func MustContext(ctx context.Context) Logger {
|
||||
if ctx == nil {
|
||||
return DefaultLogger
|
||||
}
|
||||
if l, ok := ctx.Value(loggerKey{}).(Logger); ok && l != nil {
|
||||
return l
|
||||
}
|
||||
return DefaultLogger
|
||||
}
|
||||
|
||||
// FromContext returns logger from passed context
|
||||
func FromContext(ctx context.Context) (Logger, bool) {
|
||||
if ctx == nil {
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package logger provides a log interface
|
||||
package logger // import "go.unistack.org/micro/v3/logger"
|
||||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -6,6 +6,8 @@ import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/meter"
|
||||
)
|
||||
|
||||
// Option func signature
|
||||
@@ -45,6 +47,8 @@ type Options struct {
|
||||
Level Level
|
||||
// TimeFunc used to obtain current time
|
||||
TimeFunc func() time.Time
|
||||
// Meter used to count logs for specific level
|
||||
Meter meter.Meter
|
||||
}
|
||||
|
||||
// NewOptions creates new options struct
|
||||
@@ -58,6 +62,7 @@ func NewOptions(opts ...Option) Options {
|
||||
ContextAttrFuncs: DefaultContextAttrFuncs,
|
||||
AddSource: true,
|
||||
TimeFunc: time.Now,
|
||||
Meter: meter.DefaultMeter,
|
||||
}
|
||||
|
||||
WithMicroKeys()(&options)
|
||||
@@ -69,7 +74,7 @@ func NewOptions(opts ...Option) Options {
|
||||
return options
|
||||
}
|
||||
|
||||
// WithContextAttrFuncs appends default funcs for the context arrts filler
|
||||
// WithContextAttrFuncs appends default funcs for the context attrs filler
|
||||
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
|
||||
return func(o *Options) {
|
||||
o.ContextAttrFuncs = append(o.ContextAttrFuncs, fncs...)
|
||||
@@ -132,6 +137,13 @@ func WithName(n string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WithMeter sets the meter
|
||||
func WithMeter(m meter.Meter) Option {
|
||||
return func(o *Options) {
|
||||
o.Meter = m
|
||||
}
|
||||
}
|
||||
|
||||
// WithTimeFunc sets the func to obtain current time
|
||||
func WithTimeFunc(fn func() time.Time) Option {
|
||||
return func(o *Options) {
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/semconv"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
)
|
||||
|
||||
@@ -150,6 +151,7 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
|
||||
}
|
||||
|
||||
func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", lvl.String()).Inc()
|
||||
if !s.V(lvl) {
|
||||
return
|
||||
}
|
||||
@@ -189,6 +191,7 @@ func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interfa
|
||||
}
|
||||
|
||||
func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", lvl.String()).Inc()
|
||||
if !s.V(lvl) {
|
||||
return
|
||||
}
|
||||
@@ -228,6 +231,7 @@ func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, att
|
||||
}
|
||||
|
||||
func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.InfoLevel.String()).Inc()
|
||||
if !s.V(logger.InfoLevel) {
|
||||
return
|
||||
}
|
||||
@@ -249,6 +253,7 @@ func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
|
||||
}
|
||||
|
||||
func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.InfoLevel.String()).Inc()
|
||||
if !s.V(logger.InfoLevel) {
|
||||
return
|
||||
}
|
||||
@@ -270,6 +275,7 @@ func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}
|
||||
}
|
||||
|
||||
func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.DebugLevel.String()).Inc()
|
||||
if !s.V(logger.DebugLevel) {
|
||||
return
|
||||
}
|
||||
@@ -291,6 +297,7 @@ func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
|
||||
}
|
||||
|
||||
func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.DebugLevel.String()).Inc()
|
||||
if !s.V(logger.DebugLevel) {
|
||||
return
|
||||
}
|
||||
@@ -312,6 +319,7 @@ func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{
|
||||
}
|
||||
|
||||
func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.TraceLevel.String()).Inc()
|
||||
if !s.V(logger.TraceLevel) {
|
||||
return
|
||||
}
|
||||
@@ -333,6 +341,7 @@ func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
|
||||
}
|
||||
|
||||
func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.TraceLevel.String()).Inc()
|
||||
if !s.V(logger.TraceLevel) {
|
||||
return
|
||||
}
|
||||
@@ -354,6 +363,7 @@ func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{
|
||||
}
|
||||
|
||||
func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.ErrorLevel.String()).Inc()
|
||||
if !s.V(logger.ErrorLevel) {
|
||||
return
|
||||
}
|
||||
@@ -393,6 +403,7 @@ func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
|
||||
}
|
||||
|
||||
func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.ErrorLevel.String()).Inc()
|
||||
if !s.V(logger.ErrorLevel) {
|
||||
return
|
||||
}
|
||||
@@ -432,6 +443,7 @@ func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{
|
||||
}
|
||||
|
||||
func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.FatalLevel.String()).Inc()
|
||||
if !s.V(logger.FatalLevel) {
|
||||
return
|
||||
}
|
||||
@@ -454,6 +466,7 @@ func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
|
||||
}
|
||||
|
||||
func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.FatalLevel.String()).Inc()
|
||||
if !s.V(logger.FatalLevel) {
|
||||
return
|
||||
}
|
||||
@@ -476,6 +489,7 @@ func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{
|
||||
}
|
||||
|
||||
func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.WarnLevel.String()).Inc()
|
||||
if !s.V(logger.WarnLevel) {
|
||||
return
|
||||
}
|
||||
@@ -497,6 +511,7 @@ func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) {
|
||||
}
|
||||
|
||||
func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{}) {
|
||||
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.WarnLevel.String()).Inc()
|
||||
if !s.V(logger.WarnLevel) {
|
||||
return
|
||||
}
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package metadata is a way of defining message headers
|
||||
package metadata // import "go.unistack.org/micro/v3/metadata"
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"net/textproto"
|
||||
|
@@ -16,14 +16,19 @@ var (
|
||||
DefaultAddress = ":9090"
|
||||
// DefaultPath the meter endpoint where the Meter data will be made available
|
||||
DefaultPath = "/metrics"
|
||||
// DefaultMetricPrefix holds the string that prepends to all metrics
|
||||
DefaultMetricPrefix = "micro_"
|
||||
// DefaultLabelPrefix holds the string that prepends to all labels
|
||||
DefaultLabelPrefix = "micro_"
|
||||
// DefaultMeterStatsInterval specifies interval for meter updating
|
||||
DefaultMeterStatsInterval = 5 * time.Second
|
||||
// DefaultSummaryQuantiles is the default spread of stats for summary
|
||||
DefaultSummaryQuantiles = []float64{0.5, 0.9, 0.97, 0.99, 1}
|
||||
// DefaultSummaryWindow is the default window for summary
|
||||
DefaultSummaryWindow = 5 * time.Minute
|
||||
// DefaultSkipEndpoints is the slice of endpoint that must not be metered
|
||||
DefaultSkipEndpoints = []string{
|
||||
"MeterService.Metrics",
|
||||
"HealthService.Live",
|
||||
"HealthService.Ready",
|
||||
"HealthService.Version",
|
||||
}
|
||||
)
|
||||
|
||||
// Meter is an interface for collecting and instrumenting metrics
|
||||
|
@@ -2,8 +2,6 @@ package meter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
)
|
||||
|
||||
// Option powers the configuration for metrics implementations:
|
||||
@@ -11,8 +9,6 @@ type Option func(*Options)
|
||||
|
||||
// Options for metrics implementations
|
||||
type Options struct {
|
||||
// Logger used for logging
|
||||
Logger logger.Logger
|
||||
// Context holds external options
|
||||
Context context.Context
|
||||
// Name holds the meter name
|
||||
@@ -21,10 +17,6 @@ type Options struct {
|
||||
Address string
|
||||
// Path holds the path for metrics
|
||||
Path string
|
||||
// MetricPrefix holds the prefix for all metrics
|
||||
MetricPrefix string
|
||||
// LabelPrefix holds the prefix for all labels
|
||||
LabelPrefix string
|
||||
// Labels holds the default labels
|
||||
Labels []string
|
||||
// WriteProcessMetrics flag to write process metrics
|
||||
@@ -36,12 +28,9 @@ type Options struct {
|
||||
// NewOptions prepares a set of options:
|
||||
func NewOptions(opt ...Option) Options {
|
||||
opts := Options{
|
||||
Address: DefaultAddress,
|
||||
Path: DefaultPath,
|
||||
Context: context.Background(),
|
||||
Logger: logger.DefaultLogger,
|
||||
MetricPrefix: DefaultMetricPrefix,
|
||||
LabelPrefix: DefaultLabelPrefix,
|
||||
Address: DefaultAddress,
|
||||
Path: DefaultPath,
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
for _, o := range opt {
|
||||
@@ -51,20 +40,6 @@ func NewOptions(opt ...Option) Options {
|
||||
return opts
|
||||
}
|
||||
|
||||
// LabelPrefix sets the labels prefix
|
||||
func LabelPrefix(pref string) Option {
|
||||
return func(o *Options) {
|
||||
o.LabelPrefix = pref
|
||||
}
|
||||
}
|
||||
|
||||
// MetricPrefix sets the metric prefix
|
||||
func MetricPrefix(pref string) Option {
|
||||
return func(o *Options) {
|
||||
o.MetricPrefix = pref
|
||||
}
|
||||
}
|
||||
|
||||
// Context sets the metrics context
|
||||
func Context(ctx context.Context) Option {
|
||||
return func(o *Options) {
|
||||
@@ -95,14 +70,7 @@ func TimingObjectives(value map[float64]float64) Option {
|
||||
}
|
||||
*/
|
||||
|
||||
// Logger sets the logger
|
||||
func Logger(l logger.Logger) Option {
|
||||
return func(o *Options) {
|
||||
o.Logger = l
|
||||
}
|
||||
}
|
||||
|
||||
// Labels sets the meter labels
|
||||
// Labels add the meter labels
|
||||
func Labels(ls ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.Labels = append(o.Labels, ls...)
|
||||
|
@@ -1,347 +0,0 @@
|
||||
package wrapper // import "go.unistack.org/micro/v3/meter/wrapper"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/meter"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
)
|
||||
|
||||
var (
|
||||
// ClientRequestDurationSeconds specifies meter metric name
|
||||
ClientRequestDurationSeconds = "client_request_duration_seconds"
|
||||
// ClientRequestLatencyMicroseconds specifies meter metric name
|
||||
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
|
||||
// ClientRequestTotal specifies meter metric name
|
||||
ClientRequestTotal = "client_request_total"
|
||||
// ClientRequestInflight specifies meter metric name
|
||||
ClientRequestInflight = "client_request_inflight"
|
||||
// ServerRequestDurationSeconds specifies meter metric name
|
||||
ServerRequestDurationSeconds = "server_request_duration_seconds"
|
||||
// ServerRequestLatencyMicroseconds specifies meter metric name
|
||||
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
|
||||
// ServerRequestTotal specifies meter metric name
|
||||
ServerRequestTotal = "server_request_total"
|
||||
// ServerRequestInflight specifies meter metric name
|
||||
ServerRequestInflight = "server_request_inflight"
|
||||
// PublishMessageDurationSeconds specifies meter metric name
|
||||
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
||||
// PublishMessageLatencyMicroseconds specifies meter metric name
|
||||
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
||||
// PublishMessageTotal specifies meter metric name
|
||||
PublishMessageTotal = "publish_message_total"
|
||||
// PublishMessageInflight specifies meter metric name
|
||||
PublishMessageInflight = "publish_message_inflight"
|
||||
// SubscribeMessageDurationSeconds specifies meter metric name
|
||||
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
|
||||
// SubscribeMessageLatencyMicroseconds specifies meter metric name
|
||||
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
|
||||
// SubscribeMessageTotal specifies meter metric name
|
||||
SubscribeMessageTotal = "subscribe_message_total"
|
||||
// SubscribeMessageInflight specifies meter metric name
|
||||
SubscribeMessageInflight = "subscribe_message_inflight"
|
||||
|
||||
labelSuccess = "success"
|
||||
labelFailure = "failure"
|
||||
labelStatus = "status"
|
||||
labelEndpoint = "endpoint"
|
||||
|
||||
// DefaultSkipEndpoints contains list of endpoints that not evaluted by wrapper
|
||||
DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"}
|
||||
)
|
||||
|
||||
// Options struct
|
||||
type Options struct {
|
||||
Meter meter.Meter
|
||||
lopts []meter.Option
|
||||
SkipEndpoints []string
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
type Option func(*Options)
|
||||
|
||||
// NewOptions creates new Options struct
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Meter: meter.DefaultMeter,
|
||||
lopts: make([]meter.Option, 0, 5),
|
||||
SkipEndpoints: DefaultSkipEndpoints,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
// ServiceName passes service name to meter label
|
||||
func ServiceName(name string) Option {
|
||||
return func(o *Options) {
|
||||
o.lopts = append(o.lopts, meter.Labels("name", name))
|
||||
}
|
||||
}
|
||||
|
||||
// ServiceVersion passes service version to meter label
|
||||
func ServiceVersion(version string) Option {
|
||||
return func(o *Options) {
|
||||
o.lopts = append(o.lopts, meter.Labels("version", version))
|
||||
}
|
||||
}
|
||||
|
||||
// ServiceID passes service id to meter label
|
||||
func ServiceID(id string) Option {
|
||||
return func(o *Options) {
|
||||
o.lopts = append(o.lopts, meter.Labels("id", id))
|
||||
}
|
||||
}
|
||||
|
||||
// Meter passes meter
|
||||
func Meter(m meter.Meter) Option {
|
||||
return func(o *Options) {
|
||||
o.Meter = m
|
||||
}
|
||||
}
|
||||
|
||||
// SkipEndoints add endpoint to skip
|
||||
func SkipEndoints(eps ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
|
||||
}
|
||||
}
|
||||
|
||||
type wrapper struct {
|
||||
client.Client
|
||||
callFunc client.CallFunc
|
||||
opts Options
|
||||
}
|
||||
|
||||
// NewClientWrapper create new client wrapper
|
||||
func NewClientWrapper(opts ...Option) client.Wrapper {
|
||||
return func(c client.Client) client.Client {
|
||||
handler := &wrapper{
|
||||
opts: NewOptions(opts...),
|
||||
Client: c,
|
||||
}
|
||||
return handler
|
||||
}
|
||||
}
|
||||
|
||||
// NewCallWrapper create new call wrapper
|
||||
func NewCallWrapper(opts ...Option) client.CallWrapper {
|
||||
return func(fn client.CallFunc) client.CallFunc {
|
||||
handler := &wrapper{
|
||||
opts: NewOptions(opts...),
|
||||
callFunc: fn,
|
||||
}
|
||||
return handler.CallFunc
|
||||
}
|
||||
}
|
||||
|
||||
func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range w.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return w.callFunc(ctx, addr, req, rsp, opts)
|
||||
}
|
||||
}
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := w.callFunc(ctx, addr, req, rsp, opts)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range w.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return w.Client.Call(ctx, req, rsp, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := w.Client.Call(ctx, req, rsp, opts...)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range w.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return w.Client.Stream(ctx, req, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
stream, err := w.Client.Stream(ctx, req, opts...)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
|
||||
|
||||
return stream, err
|
||||
}
|
||||
|
||||
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||
endpoint := p.Topic()
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := w.Client.Publish(ctx, p, opts...)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
|
||||
|
||||
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// NewHandlerWrapper create new server handler wrapper
|
||||
// deprecated
|
||||
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
|
||||
handler := &wrapper{
|
||||
opts: NewOptions(opts...),
|
||||
}
|
||||
return handler.HandlerFunc
|
||||
}
|
||||
|
||||
// NewServerHandlerWrapper create new server handler wrapper
|
||||
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
|
||||
handler := &wrapper{
|
||||
opts: NewOptions(opts...),
|
||||
}
|
||||
return handler.HandlerFunc
|
||||
}
|
||||
|
||||
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
endpoint := req.Service() + "." + req.Endpoint()
|
||||
for _, ep := range w.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return fn(ctx, req, rsp)
|
||||
}
|
||||
}
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(ServerRequestInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := fn(ctx, req, rsp)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(ServerRequestInflight, labels...).Dec()
|
||||
|
||||
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(ServerRequestTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// NewSubscriberWrapper create server subscribe wrapper
|
||||
// deprecated
|
||||
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
||||
handler := &wrapper{
|
||||
opts: NewOptions(opts...),
|
||||
}
|
||||
return handler.SubscriberFunc
|
||||
}
|
||||
|
||||
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
||||
handler := &wrapper{
|
||||
opts: NewOptions(opts...),
|
||||
}
|
||||
return handler.SubscriberFunc
|
||||
}
|
||||
|
||||
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
|
||||
return func(ctx context.Context, msg server.Message) error {
|
||||
endpoint := msg.Topic()
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := fn(ctx, msg)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
|
||||
|
||||
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
@@ -1,4 +1,4 @@
|
||||
package mtls // import "go.unistack.org/micro/v3/mtls"
|
||||
package mtls
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package network is for creating internetworks
|
||||
package network // import "go.unistack.org/micro/v3/network"
|
||||
package network
|
||||
|
||||
import (
|
||||
"go.unistack.org/micro/v3/client"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package transport is an interface for synchronous connection based communication
|
||||
package transport // import "go.unistack.org/micro/v3/network/transport"
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package broker is a tunnel broker
|
||||
package broker // import "go.unistack.org/micro/v3/network/tunnel/broker"
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -305,6 +305,10 @@ func (t *tunEvent) SetError(err error) {
|
||||
t.err = err
|
||||
}
|
||||
|
||||
func (t *tunEvent) Context() context.Context {
|
||||
return context.TODO()
|
||||
}
|
||||
|
||||
// NewBroker returns new tunnel broker
|
||||
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
|
||||
options := broker.NewOptions(opts...)
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package transport provides a tunnel transport
|
||||
package transport // import "go.unistack.org/micro/v3/network/tunnel/transport"
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package tunnel provides gre network tunnelling
|
||||
package tunnel // import "go.unistack.org/micro/v3/network/transport/tunnel"
|
||||
package tunnel
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
10
options.go
10
options.go
@@ -269,15 +269,7 @@ func Logger(l logger.Logger, opts ...LoggerOption) Option {
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, mtr := range o.Meters {
|
||||
for _, or := range lopts.meters {
|
||||
if mtr.Name() == or || all {
|
||||
if err = mtr.Init(meter.Logger(l)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, trc := range o.Tracers {
|
||||
for _, ot := range lopts.tracers {
|
||||
if trc.Name() == ot || all {
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package http enables the http profiler
|
||||
package http // import "go.unistack.org/micro/v3/profiler/http"
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package pprof provides a pprof profiler which writes output to /tmp/[name].{cpu,mem}.pprof
|
||||
package pprof // import "go.unistack.org/micro/v3/profiler/pprof"
|
||||
package pprof
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package profiler is for profilers
|
||||
package profiler // import "go.unistack.org/micro/v3/profiler"
|
||||
package profiler
|
||||
|
||||
// Profiler interface
|
||||
type Profiler interface {
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package proxy is a transparent proxy built on the micro/server
|
||||
package proxy // import "go.unistack.org/micro/v3/proxy"
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package register is an interface for service discovery
|
||||
package register // import "go.unistack.org/micro/v3/register"
|
||||
package register
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package dns resolves names to dns records
|
||||
package dns // import "go.unistack.org/micro/v3/resolver/dns"
|
||||
package dns
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package dnssrv resolves names to dns srv records
|
||||
package dnssrv // import "go.unistack.org/micro/v3/resolver/dnssrv"
|
||||
package dnssrv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package http resolves names to network addresses using a http request
|
||||
package http // import "go.unistack.org/micro/v3/resolver/http"
|
||||
package http
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package noop is a noop resolver
|
||||
package noop // import "go.unistack.org/micro/v3/resolver/noop"
|
||||
package noop
|
||||
|
||||
import (
|
||||
"go.unistack.org/micro/v3/resolver"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package register resolves names using the micro register
|
||||
package register // import "go.unistack.org/micro/v3/resolver/registry"
|
||||
package register
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package static is a static resolver
|
||||
package static // import "go.unistack.org/micro/v3/resolver/static"
|
||||
package static
|
||||
|
||||
import (
|
||||
"go.unistack.org/micro/v3/resolver"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package router provides a network routing control plane
|
||||
package router // import "go.unistack.org/micro/v3/router"
|
||||
package router
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package random // import "go.unistack.org/micro/v3/selector/random"
|
||||
package random
|
||||
|
||||
import (
|
||||
"go.unistack.org/micro/v3/selector"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package roundrobin // import "go.unistack.org/micro/v3/selector/roundrobin"
|
||||
package roundrobin
|
||||
|
||||
import (
|
||||
"go.unistack.org/micro/v3/selector"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package selector is for node selection and load balancing
|
||||
package selector // import "go.unistack.org/micro/v3/selector"
|
||||
package selector
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
@@ -2,21 +2,21 @@ package semconv
|
||||
|
||||
var (
|
||||
// PublishMessageDurationSeconds specifies meter metric name
|
||||
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
||||
PublishMessageDurationSeconds = "micro_publish_message_duration_seconds"
|
||||
// PublishMessageLatencyMicroseconds specifies meter metric name
|
||||
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
||||
PublishMessageLatencyMicroseconds = "micro_publish_message_latency_microseconds"
|
||||
// PublishMessageTotal specifies meter metric name
|
||||
PublishMessageTotal = "publish_message_total"
|
||||
PublishMessageTotal = "micro_publish_message_total"
|
||||
// PublishMessageInflight specifies meter metric name
|
||||
PublishMessageInflight = "publish_message_inflight"
|
||||
PublishMessageInflight = "micro_publish_message_inflight"
|
||||
// SubscribeMessageDurationSeconds specifies meter metric name
|
||||
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
|
||||
SubscribeMessageDurationSeconds = "micro_subscribe_message_duration_seconds"
|
||||
// SubscribeMessageLatencyMicroseconds specifies meter metric name
|
||||
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
|
||||
SubscribeMessageLatencyMicroseconds = "micro_subscribe_message_latency_microseconds"
|
||||
// SubscribeMessageTotal specifies meter metric name
|
||||
SubscribeMessageTotal = "subscribe_message_total"
|
||||
SubscribeMessageTotal = "micro_subscribe_message_total"
|
||||
// SubscribeMessageInflight specifies meter metric name
|
||||
SubscribeMessageInflight = "subscribe_message_inflight"
|
||||
SubscribeMessageInflight = "micro_subscribe_message_inflight"
|
||||
// BrokerGroupLag specifies broker lag
|
||||
BrokerGroupLag = "broker_group_lag"
|
||||
BrokerGroupLag = "micro_broker_group_lag"
|
||||
)
|
||||
|
@@ -1,12 +0,0 @@
|
||||
package semconv
|
||||
|
||||
var (
|
||||
// CacheRequestDurationSeconds specifies meter metric name
|
||||
CacheRequestDurationSeconds = "cache_request_duration_seconds"
|
||||
// ClientRequestLatencyMicroseconds specifies meter metric name
|
||||
CacheRequestLatencyMicroseconds = "cache_request_latency_microseconds"
|
||||
// CacheRequestTotal specifies meter metric name
|
||||
CacheRequestTotal = "cache_request_total"
|
||||
// CacheRequestInflight specifies meter metric name
|
||||
CacheRequestInflight = "cache_request_inflight"
|
||||
)
|
@@ -2,11 +2,11 @@ package semconv
|
||||
|
||||
var (
|
||||
// ClientRequestDurationSeconds specifies meter metric name
|
||||
ClientRequestDurationSeconds = "client_request_duration_seconds"
|
||||
ClientRequestDurationSeconds = "micro_client_request_duration_seconds"
|
||||
// ClientRequestLatencyMicroseconds specifies meter metric name
|
||||
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
|
||||
ClientRequestLatencyMicroseconds = "micro_client_request_latency_microseconds"
|
||||
// ClientRequestTotal specifies meter metric name
|
||||
ClientRequestTotal = "client_request_total"
|
||||
ClientRequestTotal = "micro_client_request_total"
|
||||
// ClientRequestInflight specifies meter metric name
|
||||
ClientRequestInflight = "client_request_inflight"
|
||||
ClientRequestInflight = "micro_client_request_inflight"
|
||||
)
|
||||
|
4
semconv/logger.go
Normal file
4
semconv/logger.go
Normal file
@@ -0,0 +1,4 @@
|
||||
package semconv
|
||||
|
||||
// LoggerMessageTotal specifies meter metric name for logger messages
|
||||
var LoggerMessageTotal = "micro_logger_message_total"
|
@@ -2,11 +2,11 @@ package semconv
|
||||
|
||||
var (
|
||||
// ServerRequestDurationSeconds specifies meter metric name
|
||||
ServerRequestDurationSeconds = "server_request_duration_seconds"
|
||||
ServerRequestDurationSeconds = "micro_server_request_duration_seconds"
|
||||
// ServerRequestLatencyMicroseconds specifies meter metric name
|
||||
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
|
||||
ServerRequestLatencyMicroseconds = "micro_server_request_latency_microseconds"
|
||||
// ServerRequestTotal specifies meter metric name
|
||||
ServerRequestTotal = "server_request_total"
|
||||
ServerRequestTotal = "micro_server_request_total"
|
||||
// ServerRequestInflight specifies meter metric name
|
||||
ServerRequestInflight = "server_request_inflight"
|
||||
ServerRequestInflight = "micro_server_request_inflight"
|
||||
)
|
||||
|
12
semconv/store.go
Normal file
12
semconv/store.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package semconv
|
||||
|
||||
var (
|
||||
// StoreRequestDurationSeconds specifies meter metric name
|
||||
StoreRequestDurationSeconds = "micro_store_request_duration_seconds"
|
||||
// ClientRequestLatencyMicroseconds specifies meter metric name
|
||||
StoreRequestLatencyMicroseconds = "micro_store_request_latency_microseconds"
|
||||
// StoreRequestTotal specifies meter metric name
|
||||
StoreRequestTotal = "micro_store_request_total"
|
||||
// StoreRequestInflight specifies meter metric name
|
||||
StoreRequestInflight = "micro_store_request_inflight"
|
||||
)
|
185
server/noop.go
185
server/noop.go
@@ -1,7 +1,6 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
@@ -274,7 +273,7 @@ func (n *noopServer) Register() error {
|
||||
|
||||
if !registered {
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
|
||||
config.Logger.Info(n.opts.Context, fmt.Sprintf("register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -312,7 +311,7 @@ func (n *noopServer) Deregister() error {
|
||||
}
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].ID)
|
||||
config.Logger.Info(n.opts.Context, fmt.Sprintf("deregistering node: %s", service.Nodes[0].ID))
|
||||
}
|
||||
|
||||
if err := DefaultDeregisterFunc(service, config); err != nil {
|
||||
@@ -343,11 +342,11 @@ func (n *noopServer) Deregister() error {
|
||||
go func(s broker.Subscriber) {
|
||||
defer wg.Done()
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic())
|
||||
config.Logger.Info(n.opts.Context, "unsubscribing from topic: "+s.Topic())
|
||||
}
|
||||
if err := s.Unsubscribe(ncx); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err)
|
||||
config.Logger.Error(n.opts.Context, "unsubscribing from topic: "+s.Topic(), err)
|
||||
}
|
||||
}
|
||||
}(subs[idx])
|
||||
@@ -383,7 +382,7 @@ func (n *noopServer) Start() error {
|
||||
config.Address = addr
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address)
|
||||
config.Logger.Info(n.opts.Context, "server [noop] Listening on "+config.Address)
|
||||
}
|
||||
|
||||
n.Lock()
|
||||
@@ -397,13 +396,13 @@ func (n *noopServer) Start() error {
|
||||
// connect to the broker
|
||||
if err := config.Broker.Connect(config.Context); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err)
|
||||
config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
|
||||
config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -411,13 +410,13 @@ func (n *noopServer) Start() error {
|
||||
// nolint: nestif
|
||||
if err := config.RegisterCheck(config.Context); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, err)
|
||||
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), err)
|
||||
}
|
||||
} else {
|
||||
// announce self to the world
|
||||
if err := n.Register(); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server register error: %v", err)
|
||||
config.Logger.Error(n.opts.Context, "server register error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -450,23 +449,23 @@ func (n *noopServer) Start() error {
|
||||
// nolint: nestif
|
||||
if rerr != nil && registered {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
|
||||
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error, deregister it", config.Name, config.ID), rerr)
|
||||
}
|
||||
// deregister self in case of error
|
||||
if err := n.Deregister(); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.ID, err)
|
||||
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s deregister error", config.Name, config.ID), err)
|
||||
}
|
||||
}
|
||||
} else if rerr != nil && !registered {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, rerr)
|
||||
config.Logger.Errorf(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), rerr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := n.Register(); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.ID, err)
|
||||
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register error", config.Name, config.ID), err)
|
||||
}
|
||||
}
|
||||
// wait for exit
|
||||
@@ -478,7 +477,7 @@ func (n *noopServer) Start() error {
|
||||
// deregister self
|
||||
if err := n.Deregister(); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "server deregister error: ", err)
|
||||
config.Logger.Error(n.opts.Context, "server deregister error", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -491,12 +490,12 @@ func (n *noopServer) Start() error {
|
||||
ch <- nil
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
|
||||
config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()))
|
||||
}
|
||||
// disconnect broker
|
||||
if err := config.Broker.Disconnect(config.Context); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err)
|
||||
config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] disconnect error", config.Broker.String()), err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -512,36 +511,33 @@ func (n *noopServer) Start() error {
|
||||
func (n *noopServer) subscribe() error {
|
||||
config := n.Options()
|
||||
|
||||
cx := config.Context
|
||||
var err error
|
||||
var sub broker.Subscriber
|
||||
subCtx := config.Context
|
||||
|
||||
for sb := range n.subscribers {
|
||||
if sb.Options().Context != nil {
|
||||
cx = sb.Options().Context
|
||||
|
||||
if cx := sb.Options().Context; cx != nil {
|
||||
subCtx = cx
|
||||
}
|
||||
|
||||
opts := []broker.SubscribeOption{
|
||||
broker.SubscribeContext(subCtx),
|
||||
broker.SubscribeAutoAck(sb.Options().AutoAck),
|
||||
broker.SubscribeBodyOnly(sb.Options().BodyOnly),
|
||||
}
|
||||
|
||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
if sb.Options().Batch {
|
||||
// batch processing handler
|
||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
|
||||
} else {
|
||||
// single processing handler
|
||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic())
|
||||
}
|
||||
|
||||
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
||||
}
|
||||
|
||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
@@ -637,127 +633,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||
return func(ps broker.Events) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
n.RLock()
|
||||
config := n.opts
|
||||
n.RUnlock()
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
|
||||
config.Logger.Error(n.opts.Context, string(debug.Stack()))
|
||||
}
|
||||
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
msgs := make([]Message, 0, len(ps))
|
||||
ctxs := make([]context.Context, 0, len(ps))
|
||||
for _, p := range ps {
|
||||
msg := p.Message()
|
||||
// if we don't have headers, create empty map
|
||||
if msg.Header == nil {
|
||||
msg.Header = metadata.New(2)
|
||||
}
|
||||
|
||||
ct, _ := msg.Header.Get(metadata.HeaderContentType)
|
||||
if len(ct) == 0 {
|
||||
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
|
||||
ct = defaultContentType
|
||||
}
|
||||
hdr := metadata.Copy(msg.Header)
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
|
||||
msgs = append(msgs, &rpcMessage{
|
||||
topic: topic,
|
||||
contentType: ct,
|
||||
header: msg.Header,
|
||||
body: msg.Body,
|
||||
})
|
||||
}
|
||||
results := make(chan error, len(sb.handlers))
|
||||
|
||||
for i := 0; i < len(sb.handlers); i++ {
|
||||
handler := sb.handlers[i]
|
||||
|
||||
var req reflect.Value
|
||||
|
||||
switch handler.reqType.Kind() {
|
||||
case reflect.Ptr:
|
||||
req = reflect.New(handler.reqType.Elem())
|
||||
default:
|
||||
req = reflect.New(handler.reqType.Elem()).Elem()
|
||||
}
|
||||
|
||||
reqType := handler.reqType
|
||||
var cf codec.Codec
|
||||
for _, msg := range msgs {
|
||||
cf, err = n.newCodec(msg.ContentType())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rb := reflect.New(req.Type().Elem())
|
||||
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
|
||||
return err
|
||||
}
|
||||
msg.(*rpcMessage).codec = cf
|
||||
msg.(*rpcMessage).payload = rb.Interface()
|
||||
}
|
||||
|
||||
fn := func(ctxs []context.Context, ms []Message) error {
|
||||
var vals []reflect.Value
|
||||
if sb.typ.Kind() != reflect.Func {
|
||||
vals = append(vals, sb.rcvr)
|
||||
}
|
||||
if handler.ctxType != nil {
|
||||
vals = append(vals, reflect.ValueOf(ctxs))
|
||||
}
|
||||
payloads := reflect.MakeSlice(reqType, 0, len(ms))
|
||||
for _, m := range ms {
|
||||
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
|
||||
}
|
||||
vals = append(vals, payloads)
|
||||
|
||||
returnValues := handler.method.Call(vals)
|
||||
if rerr := returnValues[0].Interface(); rerr != nil {
|
||||
return rerr.(error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
opts.Hooks.EachNext(func(hook options.Hook) {
|
||||
if h, ok := hook.(HookBatchSubHandler); ok {
|
||||
fn = h(fn)
|
||||
}
|
||||
})
|
||||
|
||||
if n.wg != nil {
|
||||
n.wg.Add(1)
|
||||
}
|
||||
|
||||
go func() {
|
||||
if n.wg != nil {
|
||||
defer n.wg.Done()
|
||||
}
|
||||
results <- fn(ctxs, msgs)
|
||||
}()
|
||||
}
|
||||
|
||||
var errors []string
|
||||
for i := 0; i < len(sb.handlers); i++ {
|
||||
if rerr := <-results; rerr != nil {
|
||||
errors = append(errors, rerr.Error())
|
||||
}
|
||||
}
|
||||
if len(errors) > 0 {
|
||||
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||
return func(p broker.Event) (err error) {
|
||||
@@ -815,7 +690,7 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
|
||||
req = req.Elem()
|
||||
}
|
||||
|
||||
if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil {
|
||||
if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@@ -9,7 +9,6 @@ import (
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
)
|
||||
|
||||
@@ -26,18 +25,6 @@ func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
|
||||
if len(msgs) != 8 {
|
||||
h.t.Fatal("invalid number of messages received")
|
||||
}
|
||||
for idx := 0; idx < len(msgs); idx++ {
|
||||
md, _ := metadata.FromIncomingContext(ctxs[idx])
|
||||
_ = md
|
||||
// fmt.Printf("msg md %v\n", md)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestNoopSub(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -76,13 +63,6 @@ func TestNoopSub(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
|
||||
server.SubscriberQueue("queue"),
|
||||
server.SubscriberBatch(true),
|
||||
)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := s.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@@ -341,8 +341,6 @@ type SubscriberOptions struct {
|
||||
AutoAck bool
|
||||
// BodyOnly flag specifies that message without headers
|
||||
BodyOnly bool
|
||||
// Batch flag specifies that message processed in batches
|
||||
Batch bool
|
||||
// BatchSize flag specifies max size of batch
|
||||
BatchSize int
|
||||
// BatchWait flag specifies max wait time for batch filling
|
||||
@@ -414,13 +412,6 @@ func SubscriberAck(b bool) SubscriberOption {
|
||||
}
|
||||
}
|
||||
|
||||
// SubscriberBatch control batch processing for handler
|
||||
func SubscriberBatch(b bool) SubscriberOption {
|
||||
return func(o *SubscriberOptions) {
|
||||
o.Batch = b
|
||||
}
|
||||
}
|
||||
|
||||
// SubscriberBatchSize control batch filling size for handler
|
||||
// Batch filling max waiting time controlled by SubscriberBatchWait
|
||||
func SubscriberBatchSize(n int) SubscriberOption {
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package server is an interface for a micro server
|
||||
package server // import "go.unistack.org/micro/v3/server"
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -11,7 +11,9 @@ import (
|
||||
)
|
||||
|
||||
// DefaultServer default server
|
||||
var DefaultServer Server = NewServer()
|
||||
var (
|
||||
DefaultServer Server = NewServer()
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultAddress will be used if no address passed, use secure localhost
|
||||
@@ -63,10 +65,10 @@ type Server interface {
|
||||
}
|
||||
|
||||
type (
|
||||
FuncBatchSubHandler func(ctxs []context.Context, ms []Message) error
|
||||
HookBatchSubHandler func(next FuncBatchSubHandler) FuncBatchSubHandler
|
||||
FuncSubHandler func(ctx context.Context, ms Message) error
|
||||
HookSubHandler func(next FuncSubHandler) FuncSubHandler
|
||||
FuncSubHandler func(ctx context.Context, ms Message) error
|
||||
HookSubHandler func(next FuncSubHandler) FuncSubHandler
|
||||
FuncHandler func(ctx context.Context, req Request, rsp interface{}) error
|
||||
HookHandler func(next FuncHandler) FuncHandler
|
||||
)
|
||||
|
||||
/*
|
||||
|
@@ -3,14 +3,12 @@ package server
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
const (
|
||||
subSig = "func(context.Context, interface{}) error"
|
||||
batchSubSig = "func([]context.Context, []interface{}) error"
|
||||
subSig = "func(context.Context, interface{}) error"
|
||||
)
|
||||
|
||||
// Precompute the reflect type for error. Can't use error directly
|
||||
@@ -43,23 +41,15 @@ func ValidateSubscriber(sub Subscriber) error {
|
||||
switch typ.NumIn() {
|
||||
case 2:
|
||||
argType = typ.In(1)
|
||||
if sub.Options().Batch {
|
||||
if argType.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
|
||||
}
|
||||
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
|
||||
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
|
||||
}
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
|
||||
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
|
||||
}
|
||||
if !isExportedOrBuiltinType(argType) {
|
||||
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
|
||||
}
|
||||
if typ.NumOut() != 1 {
|
||||
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
|
||||
name, typ.NumOut(), subSig, batchSubSig)
|
||||
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
|
||||
name, typ.NumOut(), subSig)
|
||||
}
|
||||
if returnType := typ.Out(0); returnType != typeOfError {
|
||||
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
|
||||
@@ -74,8 +64,8 @@ func ValidateSubscriber(sub Subscriber) error {
|
||||
case 3:
|
||||
argType = method.Type.In(2)
|
||||
default:
|
||||
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
|
||||
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
|
||||
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
|
||||
name, method.Name, method.Type.NumIn(), subSig)
|
||||
}
|
||||
|
||||
if !isExportedOrBuiltinType(argType) {
|
||||
@@ -83,8 +73,8 @@ func ValidateSubscriber(sub Subscriber) error {
|
||||
}
|
||||
if method.Type.NumOut() != 1 {
|
||||
return fmt.Errorf(
|
||||
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
|
||||
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
|
||||
"subscriber %v.%v has wrong number of return values: %v require signature %s",
|
||||
name, method.Name, method.Type.NumOut(), subSig)
|
||||
}
|
||||
if returnType := method.Type.Out(0); returnType != typeOfError {
|
||||
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
|
||||
|
@@ -14,20 +14,12 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
|
||||
// publication message.
|
||||
type SubscriberFunc func(ctx context.Context, msg Message) error
|
||||
|
||||
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
|
||||
// for the wrappers. What's handed to the actual method is the concrete
|
||||
// publication message. This func used by batch subscribers
|
||||
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
|
||||
|
||||
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
|
||||
type HandlerWrapper func(HandlerFunc) HandlerFunc
|
||||
|
||||
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
|
||||
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
|
||||
|
||||
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
|
||||
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
|
||||
|
||||
// StreamWrapper wraps a Stream interface and returns the equivalent.
|
||||
// Because streams exist for the lifetime of a method invocation this
|
||||
// is a convenient way to wrap a Stream as its in use for trace, monitoring,
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package store is an interface for distributed data storage.
|
||||
package store // import "go.unistack.org/micro/v3/store"
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package sync is an interface for distributed synchronization
|
||||
package sync // import "go.unistack.org/micro/v3/sync"
|
||||
package sync
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
@@ -83,8 +83,11 @@ func (sk SpanKind) String() string {
|
||||
|
||||
// SpanOptions contains span option
|
||||
type SpanOptions struct {
|
||||
Labels []interface{}
|
||||
Kind SpanKind
|
||||
StatusMsg string
|
||||
Labels []interface{}
|
||||
Status SpanStatus
|
||||
Kind SpanKind
|
||||
Record bool
|
||||
}
|
||||
|
||||
// SpanOption func signature
|
||||
@@ -110,12 +113,25 @@ func WithSpanLabels(kv ...interface{}) SpanOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithSpanStatus(st SpanStatus, msg string) SpanOption {
|
||||
return func(o *SpanOptions) {
|
||||
o.Status = st
|
||||
o.StatusMsg = msg
|
||||
}
|
||||
}
|
||||
|
||||
func WithSpanKind(k SpanKind) SpanOption {
|
||||
return func(o *SpanOptions) {
|
||||
o.Kind = k
|
||||
}
|
||||
}
|
||||
|
||||
func WithSpanRecord(b bool) SpanOption {
|
||||
return func(o *SpanOptions) {
|
||||
o.Record = b
|
||||
}
|
||||
}
|
||||
|
||||
// Options struct
|
||||
type Options struct {
|
||||
// Context used to store custome tracer options
|
||||
@@ -124,6 +140,8 @@ type Options struct {
|
||||
Logger logger.Logger
|
||||
// Name of the tracer
|
||||
Name string
|
||||
// ContextAttrFuncs contains funcs that provides tracing
|
||||
ContextAttrFuncs []ContextAttrFunc
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
@@ -148,7 +166,8 @@ func NewEventOptions(opts ...EventOption) EventOptions {
|
||||
// NewSpanOptions returns default SpanOptions
|
||||
func NewSpanOptions(opts ...SpanOption) SpanOptions {
|
||||
options := SpanOptions{
|
||||
Kind: SpanKindInternal,
|
||||
Kind: SpanKindInternal,
|
||||
Record: true,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
@@ -159,8 +178,9 @@ func NewSpanOptions(opts ...SpanOption) SpanOptions {
|
||||
// NewOptions returns default options
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Logger: logger.DefaultLogger,
|
||||
Context: context.Background(),
|
||||
Logger: logger.DefaultLogger,
|
||||
Context: context.Background(),
|
||||
ContextAttrFuncs: DefaultContextAttrFuncs,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package tracer provides an interface for distributed tracing
|
||||
package tracer // import "go.unistack.org/micro/v3/tracer"
|
||||
package tracer
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -7,16 +7,25 @@ import (
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
)
|
||||
|
||||
// DefaultTracer is the global default tracer
|
||||
var DefaultTracer Tracer = NewTracer()
|
||||
|
||||
var (
|
||||
// DefaultTracer is the global default tracer
|
||||
DefaultTracer Tracer = NewTracer() //nolint:revive
|
||||
// TraceIDKey is the key used for the trace id in the log call
|
||||
TraceIDKey = "trace-id"
|
||||
// SpanIDKey is the key used for the span id in the log call
|
||||
SpanIDKey = "span-id"
|
||||
// DefaultSkipEndpoints is the slice of endpoint that must not be traced
|
||||
DefaultSkipEndpoints = []string{
|
||||
"MeterService.Metrics",
|
||||
"HealthService.Live",
|
||||
"HealthService.Ready",
|
||||
"HealthService.Version",
|
||||
}
|
||||
DefaultContextAttrFuncs []ContextAttrFunc
|
||||
)
|
||||
|
||||
type ContextAttrFunc func(ctx context.Context) []interface{}
|
||||
|
||||
func init() {
|
||||
logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs,
|
||||
func(ctx context.Context) []interface{} {
|
||||
@@ -38,6 +47,8 @@ type Tracer interface {
|
||||
Init(...Option) error
|
||||
// Start a trace
|
||||
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
|
||||
// Extract get span metadata from context
|
||||
// Extract(ctx context.Context)
|
||||
// Flush flushes spans
|
||||
Flush(ctx context.Context) error
|
||||
}
|
||||
|
@@ -1,415 +0,0 @@
|
||||
// Package wrapper provides wrapper for Tracer
|
||||
package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
)
|
||||
|
||||
var DefaultHeadersExctract = []string{metadata.HeaderXRequestID}
|
||||
|
||||
func ExtractDefaultLabels(md metadata.Metadata) []interface{} {
|
||||
labels := make([]interface{}, 0, len(DefaultHeadersExctract))
|
||||
for _, k := range DefaultHeadersExctract {
|
||||
if v, ok := md.Get(k); ok {
|
||||
labels = append(labels, strings.ToLower(k), v)
|
||||
}
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
|
||||
var labels []interface{}
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||
}
|
||||
if err != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
sp.AddLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
|
||||
var labels []interface{}
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||
}
|
||||
if err != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
sp.AddLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
|
||||
var labels []interface{}
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||
}
|
||||
labels = append(labels, ExtractDefaultLabels(msg.Metadata())...)
|
||||
if err != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
sp.AddLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
|
||||
var labels []interface{}
|
||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||
}
|
||||
if err != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
sp.AddLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
|
||||
var labels []interface{}
|
||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||
}
|
||||
labels = append(labels, ExtractDefaultLabels(msg.Header())...)
|
||||
|
||||
if err != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
sp.AddLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method()))
|
||||
var labels []interface{}
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||
}
|
||||
if err != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
sp.AddLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"}
|
||||
)
|
||||
|
||||
type tWrapper struct {
|
||||
client.Client
|
||||
serverHandler server.HandlerFunc
|
||||
serverSubscriber server.SubscriberFunc
|
||||
clientCallFunc client.CallFunc
|
||||
opts Options
|
||||
}
|
||||
|
||||
type (
|
||||
ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
|
||||
ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
|
||||
ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
|
||||
ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
|
||||
ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
|
||||
ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
|
||||
)
|
||||
|
||||
// Options struct
|
||||
type Options struct {
|
||||
// Tracer that used for tracing
|
||||
Tracer tracer.Tracer
|
||||
// ClientCallObservers funcs
|
||||
ClientCallObservers []ClientCallObserver
|
||||
// ClientStreamObservers funcs
|
||||
ClientStreamObservers []ClientStreamObserver
|
||||
// ClientPublishObservers funcs
|
||||
ClientPublishObservers []ClientPublishObserver
|
||||
// ClientCallFuncObservers funcs
|
||||
ClientCallFuncObservers []ClientCallFuncObserver
|
||||
// ServerHandlerObservers funcs
|
||||
ServerHandlerObservers []ServerHandlerObserver
|
||||
// ServerSubscriberObservers funcs
|
||||
ServerSubscriberObservers []ServerSubscriberObserver
|
||||
// SkipEndpoints
|
||||
SkipEndpoints []string
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
type Option func(*Options)
|
||||
|
||||
// NewOptions create Options from Option slice
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Tracer: tracer.DefaultTracer,
|
||||
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
|
||||
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
|
||||
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
|
||||
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
|
||||
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
|
||||
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
|
||||
SkipEndpoints: DefaultSkipEndpoints,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
// WithTracer pass tracer
|
||||
func WithTracer(t tracer.Tracer) Option {
|
||||
return func(o *Options) {
|
||||
o.Tracer = t
|
||||
}
|
||||
}
|
||||
|
||||
// SkipEndponts
|
||||
func SkipEndpoins(eps ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientCallObservers funcs
|
||||
func WithClientCallObservers(ob ...ClientCallObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientCallObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientStreamObservers funcs
|
||||
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientStreamObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientPublishObservers funcs
|
||||
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientPublishObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientCallFuncObservers funcs
|
||||
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientCallFuncObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithServerHandlerObservers funcs
|
||||
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerHandlerObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithServerSubscriberObservers funcs
|
||||
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerSubscriberObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range ot.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return ot.Client.Call(ctx, req, rsp, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
|
||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||
tracer.WithSpanLabels(
|
||||
"rpc.service", req.Service(),
|
||||
"rpc.method", req.Method(),
|
||||
"rpc.flavor", "rpc",
|
||||
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
||||
"rpc.call_type", "unary",
|
||||
),
|
||||
)
|
||||
defer sp.Finish()
|
||||
|
||||
err := ot.Client.Call(nctx, req, rsp, opts...)
|
||||
|
||||
for _, o := range ot.opts.ClientCallObservers {
|
||||
o(nctx, req, rsp, opts, sp, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range ot.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return ot.Client.Stream(ctx, req, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
|
||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||
tracer.WithSpanLabels(
|
||||
"rpc.service", req.Service(),
|
||||
"rpc.method", req.Method(),
|
||||
"rpc.flavor", "rpc",
|
||||
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
||||
"rpc.call_type", "stream",
|
||||
),
|
||||
)
|
||||
defer sp.Finish()
|
||||
|
||||
stream, err := ot.Client.Stream(nctx, req, opts...)
|
||||
|
||||
for _, o := range ot.opts.ClientStreamObservers {
|
||||
o(nctx, req, opts, stream, sp, err)
|
||||
}
|
||||
|
||||
return stream, err
|
||||
}
|
||||
|
||||
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
|
||||
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" publish", tracer.WithSpanKind(tracer.SpanKindProducer))
|
||||
defer sp.Finish()
|
||||
sp.AddLabels("messaging.destination.name", msg.Topic())
|
||||
sp.AddLabels("messaging.operation", "publish")
|
||||
err := ot.Client.Publish(nctx, msg, opts...)
|
||||
|
||||
for _, o := range ot.opts.ClientPublishObservers {
|
||||
o(nctx, msg, opts, sp, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method())
|
||||
for _, ep := range ot.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return ot.serverHandler(ctx, req, rsp)
|
||||
}
|
||||
}
|
||||
|
||||
callType := "unary"
|
||||
if req.Stream() {
|
||||
callType = "stream"
|
||||
}
|
||||
|
||||
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()),
|
||||
tracer.WithSpanKind(tracer.SpanKindServer),
|
||||
tracer.WithSpanLabels(
|
||||
"rpc.service", req.Service(),
|
||||
"rpc.method", req.Method(),
|
||||
"rpc.flavor", "rpc",
|
||||
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
||||
"rpc.call_type", callType,
|
||||
),
|
||||
)
|
||||
defer sp.Finish()
|
||||
|
||||
err := ot.serverHandler(nctx, req, rsp)
|
||||
|
||||
for _, o := range ot.opts.ServerHandlerObservers {
|
||||
o(nctx, req, rsp, sp, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
|
||||
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" process", tracer.WithSpanKind(tracer.SpanKindConsumer))
|
||||
defer sp.Finish()
|
||||
sp.AddLabels("messaging.operation", "process")
|
||||
sp.AddLabels("messaging.source.name", msg.Topic())
|
||||
err := ot.serverSubscriber(nctx, msg)
|
||||
|
||||
for _, o := range ot.opts.ServerSubscriberObservers {
|
||||
o(nctx, msg, sp, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
|
||||
func NewClientWrapper(opts ...Option) client.Wrapper {
|
||||
return func(c client.Client) client.Client {
|
||||
options := NewOptions()
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &tWrapper{opts: options, Client: c}
|
||||
}
|
||||
}
|
||||
|
||||
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
|
||||
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
|
||||
return func(h client.CallFunc) client.CallFunc {
|
||||
options := NewOptions()
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
ot := &tWrapper{opts: options, clientCallFunc: h}
|
||||
return ot.ClientCallFunc
|
||||
}
|
||||
}
|
||||
|
||||
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method())
|
||||
for _, ep := range ot.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return ot.ClientCallFunc(ctx, addr, req, rsp, opts)
|
||||
}
|
||||
}
|
||||
|
||||
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
|
||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||
tracer.WithSpanLabels(
|
||||
"rpc.service", req.Service(),
|
||||
"rpc.method", req.Method(),
|
||||
"rpc.flavor", "rpc",
|
||||
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
||||
"rpc.call_type", "unary",
|
||||
),
|
||||
)
|
||||
|
||||
defer sp.Finish()
|
||||
|
||||
err := ot.clientCallFunc(nctx, addr, req, rsp, opts)
|
||||
|
||||
for _, o := range ot.opts.ClientCallFuncObservers {
|
||||
o(nctx, addr, req, rsp, opts, sp, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
|
||||
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
|
||||
return func(h server.HandlerFunc) server.HandlerFunc {
|
||||
options := NewOptions()
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
ot := &tWrapper{opts: options, serverHandler: h}
|
||||
return ot.ServerHandler
|
||||
}
|
||||
}
|
||||
|
||||
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
|
||||
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
||||
return func(h server.SubscriberFunc) server.SubscriberFunc {
|
||||
options := NewOptions()
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
ot := &tWrapper{opts: options, serverSubscriber: h}
|
||||
return ot.ServerSubscriber
|
||||
}
|
||||
}
|
@@ -1,4 +1,4 @@
|
||||
package addr // import "go.unistack.org/micro/v3/util/addr"
|
||||
package addr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -58,6 +58,7 @@ func IsLocal(addr string) bool {
|
||||
}
|
||||
|
||||
// Extract returns a real ip
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func Extract(addr string) (string, error) {
|
||||
// if addr specified then its returned
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package backoff provides backoff functionality
|
||||
package backoff // import "go.unistack.org/micro/v3/util/backoff"
|
||||
package backoff
|
||||
|
||||
import (
|
||||
"math"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package buf // import "go.unistack.org/micro/v3/util/buf"
|
||||
package buf
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package http // import "go.unistack.org/micro/v3/util/http"
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package id // import "go.unistack.org/micro/v3/util/id"
|
||||
package id
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package io is for io management
|
||||
package io // import "go.unistack.org/micro/v3/util/io"
|
||||
package io
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package jitter provides a random jitter
|
||||
package jitter // import "go.unistack.org/micro/v3/util/jitter"
|
||||
package jitter
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package jitter // import "go.unistack.org/micro/v3/util/jitter"
|
||||
package jitter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package net // import "go.unistack.org/micro/v3/util/net"
|
||||
package net
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// Package pki provides PKI all the PKI functions necessary to run micro over an untrusted network
|
||||
// including a CA
|
||||
package pki // import "go.unistack.org/micro/v3/util/pki"
|
||||
package pki
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package pool is a connection pool
|
||||
package pool // import "go.unistack.org/micro/v3/util/pool"
|
||||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package rand // import "go.unistack.org/micro/v3/util/rand"
|
||||
package rand
|
||||
|
||||
import (
|
||||
crand "crypto/rand"
|
||||
|
@@ -44,6 +44,37 @@ func SliceAppend(b bool) Option {
|
||||
}
|
||||
}
|
||||
|
||||
var maxDepth = 32
|
||||
|
||||
func mergeMap(dst, src map[string]interface{}, depth int) map[string]interface{} {
|
||||
if depth > maxDepth {
|
||||
return dst
|
||||
}
|
||||
for key, srcVal := range src {
|
||||
if dstVal, ok := dst[key]; ok {
|
||||
srcMap, srcMapOk := mapify(srcVal)
|
||||
dstMap, dstMapOk := mapify(dstVal)
|
||||
if srcMapOk && dstMapOk {
|
||||
srcVal = mergeMap(dstMap, srcMap, depth+1)
|
||||
}
|
||||
}
|
||||
dst[key] = srcVal
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func mapify(i interface{}) (map[string]interface{}, bool) {
|
||||
value := reflect.ValueOf(i)
|
||||
if value.Kind() == reflect.Map {
|
||||
m := map[string]interface{}{}
|
||||
for _, k := range value.MapKeys() {
|
||||
m[k.String()] = value.MapIndex(k).Interface()
|
||||
}
|
||||
return m, true
|
||||
}
|
||||
return map[string]interface{}{}, false
|
||||
}
|
||||
|
||||
// Merge merges map[string]interface{} to destination struct
|
||||
func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
|
||||
options := Options{}
|
||||
@@ -59,6 +90,11 @@ func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if mapper, ok := dst.(map[string]interface{}); ok {
|
||||
dst = mergeMap(mapper, mp, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
var sval reflect.Value
|
||||
var fname string
|
||||
|
@@ -4,6 +4,27 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMergeMap(t *testing.T) {
|
||||
src := map[string]interface{}{
|
||||
"skey1": "sval1",
|
||||
"skey2": map[string]interface{}{
|
||||
"skey3": "sval3",
|
||||
},
|
||||
}
|
||||
dst := map[string]interface{}{
|
||||
"skey1": "dval1",
|
||||
"skey2": map[string]interface{}{
|
||||
"skey3": "dval3",
|
||||
},
|
||||
}
|
||||
|
||||
if err := Merge(src, dst); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Logf("%#+v", src)
|
||||
}
|
||||
|
||||
func TestFieldName(t *testing.T) {
|
||||
src := "SomeVar"
|
||||
chk := "some_var"
|
||||
|
@@ -25,6 +25,48 @@ type StructField struct {
|
||||
Field reflect.StructField
|
||||
}
|
||||
|
||||
// StructFieldNameByTag get struct field name by tag key and its value
|
||||
func StructFieldNameByTag(src interface{}, tkey string, tval string) (string, interface{}, error) {
|
||||
sv := reflect.ValueOf(src)
|
||||
if sv.Kind() == reflect.Ptr {
|
||||
sv = sv.Elem()
|
||||
}
|
||||
if sv.Kind() != reflect.Struct {
|
||||
return "", nil, ErrInvalidStruct
|
||||
}
|
||||
|
||||
typ := sv.Type()
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if ts, ok := fld.Tag.Lookup(tkey); ok {
|
||||
for _, p := range strings.Split(ts, ",") {
|
||||
if p == tval {
|
||||
return fld.Name, val.Interface(), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch val.Kind() {
|
||||
case reflect.Ptr:
|
||||
if val = val.Elem(); val.Kind() == reflect.Struct {
|
||||
if name, fld, err := StructFieldNameByTag(val.Interface(), tkey, tval); err == nil {
|
||||
return name, fld, nil
|
||||
}
|
||||
}
|
||||
case reflect.Struct:
|
||||
if name, fld, err := StructFieldNameByTag(val.Interface(), tkey, tval); err == nil {
|
||||
return name, fld, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", nil, ErrNotFound
|
||||
}
|
||||
|
||||
// StructFieldByTag get struct field by tag key and its value
|
||||
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
|
||||
sv := reflect.ValueOf(src)
|
||||
@@ -46,9 +88,6 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
|
||||
if ts, ok := fld.Tag.Lookup(tkey); ok {
|
||||
for _, p := range strings.Split(ts, ",") {
|
||||
if p == tval {
|
||||
if val.Kind() != reflect.Ptr && val.CanAddr() {
|
||||
val = val.Addr()
|
||||
}
|
||||
return val.Interface(), nil
|
||||
}
|
||||
}
|
||||
@@ -72,10 +111,21 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
|
||||
|
||||
// ZeroFieldByPath clean struct field by its path
|
||||
func ZeroFieldByPath(src interface{}, path string) error {
|
||||
if src == nil {
|
||||
return nil
|
||||
}
|
||||
var err error
|
||||
val := reflect.ValueOf(src)
|
||||
|
||||
if IsEmpty(val) {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, p := range strings.Split(path, ".") {
|
||||
if IsEmpty(val) {
|
||||
return nil
|
||||
}
|
||||
|
||||
val, err = structValueByName(val, p)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -493,13 +543,14 @@ func btSplitter(str string) []string {
|
||||
}
|
||||
|
||||
// queryToMap turns something like a[b][c]=4 into
|
||||
// map[string]interface{}{
|
||||
// "a": map[string]interface{}{
|
||||
// "b": map[string]interface{}{
|
||||
// "c": 4,
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
//
|
||||
// map[string]interface{}{
|
||||
// "a": map[string]interface{}{
|
||||
// "b": map[string]interface{}{
|
||||
// "c": 4,
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
func queryToMap(param string) (map[string]interface{}, error) {
|
||||
rawKey, rawValue, err := splitKeyAndValue(param)
|
||||
if err != nil {
|
||||
|
@@ -190,9 +190,9 @@ func TestStructFieldByTag(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if v, ok := iface.(*[]string); !ok {
|
||||
if v, ok := iface.([]string); !ok {
|
||||
t.Fatalf("not *[]string %v", iface)
|
||||
} else if len(*v) != 2 {
|
||||
} else if len(v) != 2 {
|
||||
t.Fatalf("invalid number %v", iface)
|
||||
}
|
||||
}
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package register // import "go.unistack.org/micro/v3/util/register"
|
||||
package register
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package ring provides a simple ring buffer for storing local data
|
||||
package ring // import "go.unistack.org/micro/v3/util/ring"
|
||||
package ring
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
@@ -1,5 +1,5 @@
|
||||
// Package socket provides a pseudo socket
|
||||
package socket // import "go.unistack.org/micro/v3/util/socket"
|
||||
package socket
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
60
util/sort/sort_test.go
Normal file
60
util/sort/sort_test.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package sort
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestUniq(t *testing.T) {
|
||||
type args struct {
|
||||
labels []interface{}
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want []interface{}
|
||||
}{
|
||||
{
|
||||
name: "test#1",
|
||||
args: args{
|
||||
labels: append(make([]interface{}, 0), "test-1", 1, "test-2", 2),
|
||||
},
|
||||
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 2),
|
||||
},
|
||||
{
|
||||
name: "test#2",
|
||||
args: args{
|
||||
labels: append(make([]interface{}, 0), "test-1", 1, "test-2", 2, "test-2", 2),
|
||||
},
|
||||
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 2),
|
||||
},
|
||||
{
|
||||
name: "test#3",
|
||||
args: args{
|
||||
labels: append(make([]interface{}, 0), "test-1", 1, "test-2", 2, "test-2", 3),
|
||||
},
|
||||
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 3),
|
||||
},
|
||||
{
|
||||
name: "test#4",
|
||||
args: args{
|
||||
labels: append(make([]interface{}, 0),
|
||||
"test-1", 1, "test-1", 2,
|
||||
"test-2", 3, "test-2", 2,
|
||||
"test-3", 5, "test-3", 3,
|
||||
"test-1", 4, "test-1", 1),
|
||||
},
|
||||
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 2, "test-3", 3),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
var got []interface{}
|
||||
if got = Uniq(tt.args.labels); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("Uniq() = %v, want %v", got, tt.want)
|
||||
}
|
||||
t.Logf("got-%#v", got)
|
||||
})
|
||||
}
|
||||
}
|
@@ -1,5 +1,5 @@
|
||||
// Package stream encapsulates streams within streams
|
||||
package stream // import "go.unistack.org/micro/v3/util/stream"
|
||||
package stream
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@@ -1,11 +1,78 @@
|
||||
package pool
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"bytes"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/meter"
|
||||
"go.unistack.org/micro/v3/semconv"
|
||||
)
|
||||
|
||||
var (
|
||||
pools = make([]Statser, 0)
|
||||
poolsMu sync.Mutex
|
||||
)
|
||||
|
||||
// Stats struct
|
||||
type Stats struct {
|
||||
Get uint64
|
||||
Put uint64
|
||||
Mis uint64
|
||||
Ret uint64
|
||||
}
|
||||
|
||||
// Statser provides buffer pool stats
|
||||
type Statser interface {
|
||||
Stats() Stats
|
||||
Cap() int
|
||||
}
|
||||
|
||||
func init() {
|
||||
go newStatsMeter()
|
||||
}
|
||||
|
||||
func newStatsMeter() {
|
||||
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
poolsMu.Lock()
|
||||
for _, st := range pools {
|
||||
stats := st.Stats()
|
||||
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
|
||||
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
|
||||
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
|
||||
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
|
||||
}
|
||||
poolsMu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
_ Statser = (*BytePool)(nil)
|
||||
_ Statser = (*BytesPool)(nil)
|
||||
_ Statser = (*StringsPool)(nil)
|
||||
)
|
||||
|
||||
type Pool[T any] struct {
|
||||
p *sync.Pool
|
||||
}
|
||||
|
||||
func (p Pool[T]) Put(t T) {
|
||||
p.p.Put(t)
|
||||
}
|
||||
|
||||
func (p Pool[T]) Get() T {
|
||||
return p.p.Get().(T)
|
||||
}
|
||||
|
||||
func NewPool[T any](fn func() T) Pool[T] {
|
||||
return Pool[T]{
|
||||
p: &sync.Pool{
|
||||
@@ -16,10 +83,155 @@ func NewPool[T any](fn func() T) Pool[T] {
|
||||
}
|
||||
}
|
||||
|
||||
func (p Pool[T]) Get() T {
|
||||
return p.p.Get().(T)
|
||||
type BytePool struct {
|
||||
p *sync.Pool
|
||||
get uint64
|
||||
put uint64
|
||||
mis uint64
|
||||
ret uint64
|
||||
c int
|
||||
}
|
||||
|
||||
func (p Pool[T]) Put(t T) {
|
||||
p.p.Put(t)
|
||||
func NewBytePool(size int) *BytePool {
|
||||
p := &BytePool{c: size}
|
||||
p.p = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
atomic.AddUint64(&p.mis, 1)
|
||||
b := make([]byte, 0, size)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
poolsMu.Lock()
|
||||
pools = append(pools, p)
|
||||
poolsMu.Unlock()
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *BytePool) Cap() int {
|
||||
return p.c
|
||||
}
|
||||
|
||||
func (p *BytePool) Stats() Stats {
|
||||
return Stats{
|
||||
Put: atomic.LoadUint64(&p.put),
|
||||
Get: atomic.LoadUint64(&p.get),
|
||||
Mis: atomic.LoadUint64(&p.mis),
|
||||
Ret: atomic.LoadUint64(&p.ret),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *BytePool) Get() *[]byte {
|
||||
atomic.AddUint64(&p.get, 1)
|
||||
return p.p.Get().(*[]byte)
|
||||
}
|
||||
|
||||
func (p *BytePool) Put(b *[]byte) {
|
||||
atomic.AddUint64(&p.put, 1)
|
||||
if cap(*b) > p.c {
|
||||
atomic.AddUint64(&p.ret, 1)
|
||||
return
|
||||
}
|
||||
*b = (*b)[:0]
|
||||
p.p.Put(b)
|
||||
}
|
||||
|
||||
type BytesPool struct {
|
||||
p *sync.Pool
|
||||
get uint64
|
||||
put uint64
|
||||
mis uint64
|
||||
ret uint64
|
||||
c int
|
||||
}
|
||||
|
||||
func NewBytesPool(size int) *BytesPool {
|
||||
p := &BytesPool{c: size}
|
||||
p.p = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
atomic.AddUint64(&p.mis, 1)
|
||||
b := bytes.NewBuffer(make([]byte, 0, size))
|
||||
return b
|
||||
},
|
||||
}
|
||||
poolsMu.Lock()
|
||||
pools = append(pools, p)
|
||||
poolsMu.Unlock()
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *BytesPool) Cap() int {
|
||||
return p.c
|
||||
}
|
||||
|
||||
func (p *BytesPool) Stats() Stats {
|
||||
return Stats{
|
||||
Put: atomic.LoadUint64(&p.put),
|
||||
Get: atomic.LoadUint64(&p.get),
|
||||
Mis: atomic.LoadUint64(&p.mis),
|
||||
Ret: atomic.LoadUint64(&p.ret),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *BytesPool) Get() *bytes.Buffer {
|
||||
return p.p.Get().(*bytes.Buffer)
|
||||
}
|
||||
|
||||
func (p *BytesPool) Put(b *bytes.Buffer) {
|
||||
if (*b).Cap() > p.c {
|
||||
atomic.AddUint64(&p.ret, 1)
|
||||
return
|
||||
}
|
||||
b.Reset()
|
||||
p.p.Put(b)
|
||||
}
|
||||
|
||||
type StringsPool struct {
|
||||
p *sync.Pool
|
||||
get uint64
|
||||
put uint64
|
||||
mis uint64
|
||||
ret uint64
|
||||
c int
|
||||
}
|
||||
|
||||
func NewStringsPool(size int) *StringsPool {
|
||||
p := &StringsPool{c: size}
|
||||
p.p = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
atomic.AddUint64(&p.mis, 1)
|
||||
return &strings.Builder{}
|
||||
},
|
||||
}
|
||||
poolsMu.Lock()
|
||||
pools = append(pools, p)
|
||||
poolsMu.Unlock()
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *StringsPool) Cap() int {
|
||||
return p.c
|
||||
}
|
||||
|
||||
func (p *StringsPool) Stats() Stats {
|
||||
return Stats{
|
||||
Put: atomic.LoadUint64(&p.put),
|
||||
Get: atomic.LoadUint64(&p.get),
|
||||
Mis: atomic.LoadUint64(&p.mis),
|
||||
Ret: atomic.LoadUint64(&p.ret),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *StringsPool) Get() *strings.Builder {
|
||||
atomic.AddUint64(&p.get, 1)
|
||||
return p.p.Get().(*strings.Builder)
|
||||
}
|
||||
|
||||
func (p *StringsPool) Put(b *strings.Builder) {
|
||||
atomic.AddUint64(&p.put, 1)
|
||||
if b.Cap() > p.c {
|
||||
atomic.AddUint64(&p.ret, 1)
|
||||
return
|
||||
}
|
||||
b.Reset()
|
||||
p.p.Put(b)
|
||||
}
|
||||
|
@@ -2,12 +2,30 @@ package pool
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestByte(t *testing.T) {
|
||||
p := NewBytePool(1024)
|
||||
b := p.Get()
|
||||
copy(*b, []byte(`test`))
|
||||
if bytes.Equal(*b, []byte("test")) {
|
||||
t.Fatal("pool not works")
|
||||
}
|
||||
p.Put(b)
|
||||
b = p.Get()
|
||||
for i := 0; i < 1500; i++ {
|
||||
*b = append(*b, []byte(`test`)...)
|
||||
}
|
||||
p.Put(b)
|
||||
st := p.Stats()
|
||||
if st.Get != 2 && st.Put != 2 && st.Mis != 1 && st.Ret != 1 {
|
||||
t.Fatalf("pool stats error %#+v", st)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBytes(t *testing.T) {
|
||||
p := NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) })
|
||||
p := NewBytesPool(1024)
|
||||
b := p.Get()
|
||||
b.Write([]byte(`test`))
|
||||
if b.String() != "test" {
|
||||
@@ -17,7 +35,7 @@ func TestBytes(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStrings(t *testing.T) {
|
||||
p := NewPool(func() *strings.Builder { return &strings.Builder{} })
|
||||
p := NewStringsPool(20)
|
||||
b := p.Get()
|
||||
b.Write([]byte(`test`))
|
||||
if b.String() != "test" {
|
||||
|
Reference in New Issue
Block a user