Compare commits

..

27 Commits

Author SHA1 Message Date
71fe0df73f use automaxproc and automemlimit
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-10-06 13:50:59 +03:00
f1b8ecbdb3 store: add new ErrNotConnected error
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-10-05 14:46:22 +03:00
fd2b2762e9 fixup missing xpool dep
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-30 09:57:07 +03:00
82d269cfb4 xpool: add metrics
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-29 22:58:53 +03:00
6641463eed util/reflect: add ability to merge maps
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 19:22:20 +03:00
faf2454f0a cleanup
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 17:54:17 +03:00
de9e4d73f5 change semconv metric names to include micro_ prefix
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 08:38:36 +03:00
4ae7277140 meter: remove prefix options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 08:27:25 +03:00
a98618ed5b add codec.Flatten option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-16 23:10:43 +03:00
3aaf1182cb add codec option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-16 23:02:45 +03:00
eb1482d789 codec: simplify codec interface
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-16 22:41:47 +03:00
a305f7553f Merge pull request '#347 add test' (#349) from kgorbunov/micro:#347-v3 into v3
Reviewed-on: #349
2024-09-16 14:59:58 +03:00
Gorbunov Kirill Andreevich
d9b2f2a45d #347 add test
Some checks failed
pr / test (pull_request) Failing after 0s
lint / lint (pull_request) Failing after 1s
2024-09-16 14:48:47 +03:00
3ace7657dc codec: RawMessage Marshal fix
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 10:43:45 +03:00
53b40617e2 fixup util/xpool
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-04 23:06:40 +03:00
1a9236caad update meter options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-04 22:41:10 +03:00
6c68d39081 errors: add RFC9457 problem type
closes #297

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-08-01 01:06:02 +03:00
35e62fbeb0 tracer: add default context attr funcs option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-07-06 00:09:27 +03:00
00b3ceb468 smeconv: fix naming
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-07-04 14:56:48 +03:00
7dc8f088c9 Merge pull request 'fix impl interface' (#346) from devstigneev/micro:fix_impl_mevent into v3
Reviewed-on: #346
2024-07-01 12:26:53 +03:00
c65afcea1b fix impl interface
Some checks failed
lint / lint (pull_request) Has been cancelled
pr / test (pull_request) Has been cancelled
2024-07-01 09:47:51 +03:00
3eebfb5b11 Обновить options.go 2024-05-10 08:12:10 +03:00
fa1427014c close #343
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-09 19:16:12 +03:00
62074965ee close #329
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-09 16:41:22 +03:00
9c8fbb2202 broker: add Event Context() method
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-05 16:22:06 +03:00
7c0a5f5e2a add abilit to skip span recording
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 19:31:35 +03:00
b08f5321b0 tracer: allow to skip span recording
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 19:18:12 +03:00
78 changed files with 660 additions and 1046 deletions

View File

@@ -1,5 +1,5 @@
// Package broker is an interface used for asynchronous messaging
package broker // import "go.unistack.org/micro/v3/broker"
package broker
import (
"context"
@@ -88,6 +88,8 @@ type BatchHandler func(Events) error
// Event is given to a subscription handler for processing
type Event interface {
// Context return context.Context for event
Context() context.Context
// Topic returns event topic
Topic() string
// Message returns broker message

View File

@@ -373,6 +373,10 @@ func (m *memoryEvent) SetError(err error) {
m.err = err
}
func (m *memoryEvent) Context() context.Context {
return m.opts.Context
}
func (m *memorySubscriber) Options() broker.SubscribeOptions {
return m.opts
}

View File

@@ -1,5 +1,5 @@
// Package client is an interface for an RPC client
package client // import "go.unistack.org/micro/v3/client"
package client
import (
"context"

View File

@@ -1,19 +1,8 @@
// Package codec is an interface for encoding messages
package codec // import "go.unistack.org/micro/v3/codec"
package codec
import (
"errors"
"io"
"go.unistack.org/micro/v3/metadata"
)
// Message types
const (
Error MessageType = iota
Request
Response
Event
)
var (
@@ -24,65 +13,23 @@ var (
)
var (
// DefaultMaxMsgSize specifies how much data codec can handle
DefaultMaxMsgSize = 1024 * 1024 * 4 // 4Mb
// DefaultCodec is the global default codec
DefaultCodec = NewCodec()
// DefaultTagName specifies struct tag name to control codec Marshal/Unmarshal
DefaultTagName = "codec"
)
// MessageType specifies message type for codec
type MessageType int
// Codec encodes/decodes various types of messages used within micro.
// ReadHeader and ReadBody are called in pairs to read requests/responses
// from the connection. Close is called when finished with the
// connection. ReadBody may be called with a nil argument to force the
// body to be read and discarded.
// Codec encodes/decodes various types of messages.
type Codec interface {
ReadHeader(r io.Reader, m *Message, mt MessageType) error
ReadBody(r io.Reader, v interface{}) error
Write(w io.Writer, m *Message, v interface{}) error
Marshal(v interface{}, opts ...Option) ([]byte, error)
Unmarshal(b []byte, v interface{}, opts ...Option) error
String() string
}
// Message represents detailed information about
// the communication, likely followed by the body.
// In the case of an error, body may be nil.
type Message struct {
Header metadata.Metadata
Target string
Method string
Endpoint string
Error string
ID string
Body []byte
Type MessageType
}
// NewMessage creates new codec message
func NewMessage(t MessageType) *Message {
return &Message{Type: t, Header: metadata.New(0)}
}
// MarshalAppend calls codec.Marshal(v) and returns the data appended to buf.
// If codec implements MarshalAppend, that is called instead.
func MarshalAppend(buf []byte, c Codec, v interface{}, opts ...Option) ([]byte, error) {
if nc, ok := c.(interface {
MarshalAppend([]byte, interface{}, ...Option) ([]byte, error)
}); ok {
return nc.MarshalAppend(buf, v, opts...)
}
mbuf, err := c.Marshal(v, opts...)
if err != nil {
return nil, err
}
return append(buf, mbuf...), nil
type CodecV2 interface {
Marshal(buf []byte, v interface{}, opts ...Option) ([]byte, error)
Unmarshal(buf []byte, v interface{}, opts ...Option) error
String() string
}
// RawMessage is a raw encoded JSON value.
@@ -93,6 +40,8 @@ type RawMessage []byte
func (m *RawMessage) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
} else if len(*m) == 0 {
return []byte("null"), nil
}
return *m, nil
}

View File

@@ -2,70 +2,14 @@ package codec
import (
"encoding/json"
"io"
codecpb "go.unistack.org/micro-proto/v3/codec"
)
type noopCodec struct {
opts Options
}
func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error {
return nil
}
func (c *noopCodec) ReadBody(conn io.Reader, b interface{}) error {
// read bytes
buf, err := io.ReadAll(conn)
if err != nil {
return err
}
if b == nil {
return nil
}
switch v := b.(type) {
case *string:
*v = string(buf)
case *[]byte:
*v = buf
case *Frame:
v.Data = buf
default:
return json.Unmarshal(buf, v)
}
return nil
}
func (c *noopCodec) Write(conn io.Writer, m *Message, b interface{}) error {
if b == nil {
return nil
}
var v []byte
switch vb := b.(type) {
case *Frame:
v = vb.Data
case string:
v = []byte(vb)
case *string:
v = []byte(*vb)
case *[]byte:
v = *vb
case []byte:
v = vb
default:
var err error
v, err = json.Marshal(vb)
if err != nil {
return err
}
}
_, err := conn.Write(v)
return err
}
func (c *noopCodec) String() string {
return "noop"
}
@@ -91,8 +35,8 @@ func (c *noopCodec) Marshal(v interface{}, opts ...Option) ([]byte, error) {
return ve, nil
case *Frame:
return ve.Data, nil
case *Message:
return ve.Body, nil
case *codecpb.Frame:
return ve.Data, nil
}
return json.Marshal(v)
@@ -115,8 +59,8 @@ func (c *noopCodec) Unmarshal(d []byte, v interface{}, opts ...Option) error {
case *Frame:
ve.Data = d
return nil
case *Message:
ve.Body = d
case *codecpb.Frame:
ve.Data = d
return nil
}

View File

@@ -23,15 +23,8 @@ type Options struct {
Context context.Context
// TagName specifies tag name in struct to control codec
TagName string
// MaxMsgSize specifies max messages size that reads by codec
MaxMsgSize int
}
// MaxMsgSize sets the max message size
func MaxMsgSize(n int) Option {
return func(o *Options) {
o.MaxMsgSize = n
}
// Flatten specifies that struct must be analyzed for flatten tag
Flatten bool
}
// TagName sets the codec tag name in struct
@@ -41,6 +34,13 @@ func TagName(n string) Option {
}
}
// Flatten enables checking for flatten tag name
func Flatten(b bool) Option {
return func(o *Options) {
o.Flatten = b
}
}
// Logger sets the logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
@@ -65,12 +65,12 @@ func Meter(m meter.Meter) Option {
// NewOptions returns new options
func NewOptions(opts ...Option) Options {
options := Options{
Context: context.Background(),
Logger: logger.DefaultLogger,
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
MaxMsgSize: DefaultMaxMsgSize,
TagName: DefaultTagName,
Context: context.Background(),
Logger: logger.DefaultLogger,
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
TagName: DefaultTagName,
Flatten: false,
}
for _, o := range opts {

View File

@@ -1,5 +1,5 @@
// Package config is an interface for dynamic configuration.
package config // import "go.unistack.org/micro/v3/config"
package config
import (
"context"

View File

@@ -1,6 +1,6 @@
// Package errors provides a way to return detailed information
// for an RPC request error. The error is normally JSON encoded.
package errors // import "go.unistack.org/micro/v3/errors"
package errors
import (
"bytes"
@@ -44,6 +44,20 @@ var (
ErrGatewayTimeout = &Error{Code: 504}
)
const ProblemContentType = "application/problem+json"
type Problem struct {
Type string `json:"type,omitempty"`
Title string `json:"title,omitempty"`
Detail string `json:"detail,omitempty"`
Instance string `json:"instance,omitempty"`
Errors []struct {
Title string `json:"title,omitempty"`
Detail string `json:"detail,omitempty"`
} `json:"errors,omitempty"`
Status int `json:"status,omitempty"`
}
// Error type
type Error struct {
// ID holds error id or service, usually someting like my_service or id

View File

@@ -1,5 +1,5 @@
// Package flow is an interface used for saga pattern microservice workflow
package flow // import "go.unistack.org/micro/v3/flow"
package flow
import (
"context"

View File

@@ -1,4 +1,4 @@
package fsm // import "go.unistack.org/micro/v3/fsm"
package fsm
import (
"context"

16
go.mod
View File

@@ -1,20 +1,32 @@
module go.unistack.org/micro/v3
go 1.20
go 1.22
require (
dario.cat/mergo v1.0.0
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/KimMachineGun/automemlimit v0.6.1
github.com/google/uuid v1.3.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
go.uber.org/automaxprocs v1.6.0
go.unistack.org/micro-proto/v3 v3.4.1
golang.org/x/sync v0.3.0
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
google.golang.org/protobuf v1.33.0
)
require (
github.com/cilium/ebpf v0.9.1 // indirect
github.com/containerd/cgroups/v3 v3.0.1 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect
)

49
go.sum
View File

@@ -2,23 +2,68 @@ dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/KimMachineGun/automemlimit v0.6.1 h1:ILa9j1onAAMadBsyyUJv5cack8Y1WT26yLj/V+ulKp8=
github.com/KimMachineGun/automemlimit v0.6.1/go.mod h1:T7xYht7B8r6AG/AqFcUdc7fzd2bIdBKmepfP2S1svPY=
github.com/cilium/ebpf v0.9.1 h1:64sn2K3UKw8NbP/blsixRpF3nXuyhz/VjRlRzvlBRu4=
github.com/cilium/ebpf v0.9.1/go.mod h1:+OhNOIXx/Fnu1IE8bJz2dzOA+VSfyTfdNUVdlQnxUFY=
github.com/containerd/cgroups/v3 v3.0.1 h1:4hfGvu8rfGIwVIDd+nLzn/B9ZXx4BcCjzt5ToenJRaE=
github.com/containerd/cgroups/v3 v3.0.1/go.mod h1:/vtwk1VXrtoa5AaZLkypuOJgA/6DyPMZHJPGQNtlHnw=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss=
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0=
github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
@@ -26,8 +71,8 @@ google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -4,6 +4,17 @@ import "context"
type loggerKey struct{}
// MustContext returns logger from passed context or DefaultLogger if empty
func MustContext(ctx context.Context) Logger {
if ctx == nil {
return DefaultLogger
}
if l, ok := ctx.Value(loggerKey{}).(Logger); ok && l != nil {
return l
}
return DefaultLogger
}
// FromContext returns logger from passed context
func FromContext(ctx context.Context) (Logger, bool) {
if ctx == nil {

View File

@@ -1,5 +1,5 @@
// Package logger provides a log interface
package logger // import "go.unistack.org/micro/v3/logger"
package logger
import (
"context"

View File

@@ -6,6 +6,8 @@ import (
"log/slog"
"os"
"time"
"go.unistack.org/micro/v3/meter"
)
// Option func signature
@@ -45,6 +47,8 @@ type Options struct {
Level Level
// TimeFunc used to obtain current time
TimeFunc func() time.Time
// Meter used to count logs for specific level
Meter meter.Meter
}
// NewOptions creates new options struct
@@ -58,6 +62,7 @@ func NewOptions(opts ...Option) Options {
ContextAttrFuncs: DefaultContextAttrFuncs,
AddSource: true,
TimeFunc: time.Now,
Meter: meter.DefaultMeter,
}
WithMicroKeys()(&options)
@@ -69,7 +74,7 @@ func NewOptions(opts ...Option) Options {
return options
}
// WithContextAttrFuncs appends default funcs for the context arrts filler
// WithContextAttrFuncs appends default funcs for the context attrs filler
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
return func(o *Options) {
o.ContextAttrFuncs = append(o.ContextAttrFuncs, fncs...)
@@ -132,6 +137,13 @@ func WithName(n string) Option {
}
}
// WithMeter sets the meter
func WithMeter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
// WithTimeFunc sets the func to obtain current time
func WithTimeFunc(fn func() time.Time) Option {
return func(o *Options) {

View File

@@ -11,6 +11,7 @@ import (
"sync"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/tracer"
)
@@ -150,6 +151,7 @@ func (s *slogLogger) Init(opts ...logger.Option) error {
}
func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", lvl.String()).Inc()
if !s.V(lvl) {
return
}
@@ -189,6 +191,7 @@ func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interfa
}
func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", lvl.String()).Inc()
if !s.V(lvl) {
return
}
@@ -228,6 +231,7 @@ func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, att
}
func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.InfoLevel.String()).Inc()
if !s.V(logger.InfoLevel) {
return
}
@@ -249,6 +253,7 @@ func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.InfoLevel.String()).Inc()
if !s.V(logger.InfoLevel) {
return
}
@@ -270,6 +275,7 @@ func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}
}
func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.DebugLevel.String()).Inc()
if !s.V(logger.DebugLevel) {
return
}
@@ -291,6 +297,7 @@ func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.DebugLevel.String()).Inc()
if !s.V(logger.DebugLevel) {
return
}
@@ -312,6 +319,7 @@ func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{
}
func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.TraceLevel.String()).Inc()
if !s.V(logger.TraceLevel) {
return
}
@@ -333,6 +341,7 @@ func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.TraceLevel.String()).Inc()
if !s.V(logger.TraceLevel) {
return
}
@@ -354,6 +363,7 @@ func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{
}
func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.ErrorLevel.String()).Inc()
if !s.V(logger.ErrorLevel) {
return
}
@@ -393,6 +403,7 @@ func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.ErrorLevel.String()).Inc()
if !s.V(logger.ErrorLevel) {
return
}
@@ -432,6 +443,7 @@ func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{
}
func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.FatalLevel.String()).Inc()
if !s.V(logger.FatalLevel) {
return
}
@@ -454,6 +466,7 @@ func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.FatalLevel.String()).Inc()
if !s.V(logger.FatalLevel) {
return
}
@@ -476,6 +489,7 @@ func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{
}
func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.WarnLevel.String()).Inc()
if !s.V(logger.WarnLevel) {
return
}
@@ -497,6 +511,7 @@ func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) {
}
func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{}) {
s.opts.Meter.Counter(semconv.LoggerMessageTotal, "level", logger.WarnLevel.String()).Inc()
if !s.V(logger.WarnLevel) {
return
}

View File

@@ -1,5 +1,5 @@
// Package metadata is a way of defining message headers
package metadata // import "go.unistack.org/micro/v3/metadata"
package metadata
import (
"net/textproto"

View File

@@ -16,10 +16,8 @@ var (
DefaultAddress = ":9090"
// DefaultPath the meter endpoint where the Meter data will be made available
DefaultPath = "/metrics"
// DefaultMetricPrefix holds the string that prepends to all metrics
DefaultMetricPrefix = "micro_"
// DefaultLabelPrefix holds the string that prepends to all labels
DefaultLabelPrefix = "micro_"
// DefaultMeterStatsInterval specifies interval for meter updating
DefaultMeterStatsInterval = 5 * time.Second
// DefaultSummaryQuantiles is the default spread of stats for summary
DefaultSummaryQuantiles = []float64{0.5, 0.9, 0.97, 0.99, 1}
// DefaultSummaryWindow is the default window for summary

View File

@@ -2,8 +2,6 @@ package meter
import (
"context"
"go.unistack.org/micro/v3/logger"
)
// Option powers the configuration for metrics implementations:
@@ -11,8 +9,6 @@ type Option func(*Options)
// Options for metrics implementations
type Options struct {
// Logger used for logging
Logger logger.Logger
// Context holds external options
Context context.Context
// Name holds the meter name
@@ -21,10 +17,6 @@ type Options struct {
Address string
// Path holds the path for metrics
Path string
// MetricPrefix holds the prefix for all metrics
MetricPrefix string
// LabelPrefix holds the prefix for all labels
LabelPrefix string
// Labels holds the default labels
Labels []string
// WriteProcessMetrics flag to write process metrics
@@ -36,12 +28,9 @@ type Options struct {
// NewOptions prepares a set of options:
func NewOptions(opt ...Option) Options {
opts := Options{
Address: DefaultAddress,
Path: DefaultPath,
Context: context.Background(),
Logger: logger.DefaultLogger,
MetricPrefix: DefaultMetricPrefix,
LabelPrefix: DefaultLabelPrefix,
Address: DefaultAddress,
Path: DefaultPath,
Context: context.Background(),
}
for _, o := range opt {
@@ -51,20 +40,6 @@ func NewOptions(opt ...Option) Options {
return opts
}
// LabelPrefix sets the labels prefix
func LabelPrefix(pref string) Option {
return func(o *Options) {
o.LabelPrefix = pref
}
}
// MetricPrefix sets the metric prefix
func MetricPrefix(pref string) Option {
return func(o *Options) {
o.MetricPrefix = pref
}
}
// Context sets the metrics context
func Context(ctx context.Context) Option {
return func(o *Options) {
@@ -95,14 +70,7 @@ func TimingObjectives(value map[float64]float64) Option {
}
*/
// Logger sets the logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// Labels sets the meter labels
// Labels add the meter labels
func Labels(ls ...string) Option {
return func(o *Options) {
o.Labels = append(o.Labels, ls...)

View File

@@ -1,347 +0,0 @@
package wrapper // import "go.unistack.org/micro/v3/meter/wrapper"
import (
"context"
"fmt"
"time"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/server"
)
var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
labelSuccess = "success"
labelFailure = "failure"
labelStatus = "status"
labelEndpoint = "endpoint"
// DefaultSkipEndpoints contains list of endpoints that not evaluted by wrapper
DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"}
)
// Options struct
type Options struct {
Meter meter.Meter
lopts []meter.Option
SkipEndpoints []string
}
// Option func signature
type Option func(*Options)
// NewOptions creates new Options struct
func NewOptions(opts ...Option) Options {
options := Options{
Meter: meter.DefaultMeter,
lopts: make([]meter.Option, 0, 5),
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
o(&options)
}
return options
}
// ServiceName passes service name to meter label
func ServiceName(name string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Labels("name", name))
}
}
// ServiceVersion passes service version to meter label
func ServiceVersion(version string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Labels("version", version))
}
}
// ServiceID passes service id to meter label
func ServiceID(id string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Labels("id", id))
}
}
// Meter passes meter
func Meter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
// SkipEndoints add endpoint to skip
func SkipEndoints(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
type wrapper struct {
client.Client
callFunc client.CallFunc
opts Options
}
// NewClientWrapper create new client wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
handler := &wrapper{
opts: NewOptions(opts...),
Client: c,
}
return handler
}
}
// NewCallWrapper create new call wrapper
func NewCallWrapper(opts ...Option) client.CallWrapper {
return func(fn client.CallFunc) client.CallFunc {
handler := &wrapper{
opts: NewOptions(opts...),
callFunc: fn,
}
return handler.CallFunc
}
}
func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.callFunc(ctx, addr, req, rsp, opts)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now()
err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return err
}
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.Client.Call(ctx, req, rsp, opts...)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now()
err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return err
}
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.Client.Stream(ctx, req, opts...)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now()
stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return stream, err
}
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
ts := time.Now()
err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
return err
}
// NewHandlerWrapper create new server handler wrapper
// deprecated
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.HandlerFunc
}
// NewServerHandlerWrapper create new server handler wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.HandlerFunc
}
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Service() + "." + req.Endpoint()
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return fn(ctx, req, rsp)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ServerRequestInflight, labels...).Inc()
ts := time.Now()
err := fn(ctx, req, rsp)
te := time.Since(ts)
w.opts.Meter.Counter(ServerRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ServerRequestTotal, labels...).Inc()
return err
}
}
// NewSubscriberWrapper create server subscribe wrapper
// deprecated
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
ts := time.Now()
err := fn(ctx, msg)
te := time.Since(ts)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
return err
}
}

View File

@@ -1,4 +1,4 @@
package mtls // import "go.unistack.org/micro/v3/mtls"
package mtls
import (
"bytes"

View File

@@ -1,5 +1,5 @@
// Package network is for creating internetworks
package network // import "go.unistack.org/micro/v3/network"
package network
import (
"go.unistack.org/micro/v3/client"

View File

@@ -1,5 +1,5 @@
// Package transport is an interface for synchronous connection based communication
package transport // import "go.unistack.org/micro/v3/network/transport"
package transport
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package broker is a tunnel broker
package broker // import "go.unistack.org/micro/v3/network/tunnel/broker"
package broker
import (
"context"
@@ -305,6 +305,10 @@ func (t *tunEvent) SetError(err error) {
t.err = err
}
func (t *tunEvent) Context() context.Context {
return context.TODO()
}
// NewBroker returns new tunnel broker
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
options := broker.NewOptions(opts...)

View File

@@ -1,5 +1,5 @@
// Package transport provides a tunnel transport
package transport // import "go.unistack.org/micro/v3/network/tunnel/transport"
package transport
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package tunnel provides gre network tunnelling
package tunnel // import "go.unistack.org/micro/v3/network/transport/tunnel"
package tunnel
import (
"context"

View File

@@ -269,15 +269,7 @@ func Logger(l logger.Logger, opts ...LoggerOption) Option {
}
}
}
for _, mtr := range o.Meters {
for _, or := range lopts.meters {
if mtr.Name() == or || all {
if err = mtr.Init(meter.Logger(l)); err != nil {
return err
}
}
}
}
for _, trc := range o.Tracers {
for _, ot := range lopts.tracers {
if trc.Name() == ot || all {

View File

@@ -1,5 +1,5 @@
// Package http enables the http profiler
package http // import "go.unistack.org/micro/v3/profiler/http"
package http
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package pprof provides a pprof profiler which writes output to /tmp/[name].{cpu,mem}.pprof
package pprof // import "go.unistack.org/micro/v3/profiler/pprof"
package pprof
import (
"os"

View File

@@ -1,5 +1,5 @@
// Package profiler is for profilers
package profiler // import "go.unistack.org/micro/v3/profiler"
package profiler
// Profiler interface
type Profiler interface {

View File

@@ -1,5 +1,5 @@
// Package proxy is a transparent proxy built on the micro/server
package proxy // import "go.unistack.org/micro/v3/proxy"
package proxy
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package register is an interface for service discovery
package register // import "go.unistack.org/micro/v3/register"
package register
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package dns resolves names to dns records
package dns // import "go.unistack.org/micro/v3/resolver/dns"
package dns
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package dnssrv resolves names to dns srv records
package dnssrv // import "go.unistack.org/micro/v3/resolver/dnssrv"
package dnssrv
import (
"fmt"

View File

@@ -1,5 +1,5 @@
// Package http resolves names to network addresses using a http request
package http // import "go.unistack.org/micro/v3/resolver/http"
package http
import (
"encoding/json"

View File

@@ -1,5 +1,5 @@
// Package noop is a noop resolver
package noop // import "go.unistack.org/micro/v3/resolver/noop"
package noop
import (
"go.unistack.org/micro/v3/resolver"

View File

@@ -1,5 +1,5 @@
// Package register resolves names using the micro register
package register // import "go.unistack.org/micro/v3/resolver/registry"
package register
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package static is a static resolver
package static // import "go.unistack.org/micro/v3/resolver/static"
package static
import (
"go.unistack.org/micro/v3/resolver"

View File

@@ -1,5 +1,5 @@
// Package router provides a network routing control plane
package router // import "go.unistack.org/micro/v3/router"
package router
import (
"errors"

View File

@@ -1,4 +1,4 @@
package random // import "go.unistack.org/micro/v3/selector/random"
package random
import (
"go.unistack.org/micro/v3/selector"

View File

@@ -1,4 +1,4 @@
package roundrobin // import "go.unistack.org/micro/v3/selector/roundrobin"
package roundrobin
import (
"go.unistack.org/micro/v3/selector"

View File

@@ -1,5 +1,5 @@
// Package selector is for node selection and load balancing
package selector // import "go.unistack.org/micro/v3/selector"
package selector
import (
"errors"

View File

@@ -2,21 +2,21 @@ package semconv
var (
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
PublishMessageDurationSeconds = "micro_publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
PublishMessageLatencyMicroseconds = "micro_publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
PublishMessageTotal = "micro_publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
PublishMessageInflight = "micro_publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
SubscribeMessageDurationSeconds = "micro_subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
SubscribeMessageLatencyMicroseconds = "micro_subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
SubscribeMessageTotal = "micro_subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
SubscribeMessageInflight = "micro_subscribe_message_inflight"
// BrokerGroupLag specifies broker lag
BrokerGroupLag = "broker_group_lag"
BrokerGroupLag = "micro_broker_group_lag"
)

View File

@@ -1,12 +0,0 @@
package semconv
var (
// CacheRequestDurationSeconds specifies meter metric name
CacheRequestDurationSeconds = "cache_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
CacheRequestLatencyMicroseconds = "cache_request_latency_microseconds"
// CacheRequestTotal specifies meter metric name
CacheRequestTotal = "cache_request_total"
// CacheRequestInflight specifies meter metric name
CacheRequestInflight = "cache_request_inflight"
)

View File

@@ -2,11 +2,11 @@ package semconv
var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
ClientRequestDurationSeconds = "micro_client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
ClientRequestLatencyMicroseconds = "micro_client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
ClientRequestTotal = "micro_client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
ClientRequestInflight = "micro_client_request_inflight"
)

4
semconv/logger.go Normal file
View File

@@ -0,0 +1,4 @@
package semconv
// LoggerMessageTotal specifies meter metric name for logger messages
var LoggerMessageTotal = "micro_logger_message_total"

12
semconv/pool.go Normal file
View File

@@ -0,0 +1,12 @@
package semconv
var (
// PoolGetTotal specifies meter metric name for total number of pool get ops
PoolGetTotal = "micro_pool_get_total"
// PoolPutTotal specifies meter metric name for total number of pool put ops
PoolPutTotal = "micro_pool_put_total"
// PoolMisTotal specifies meter metric name for total number of pool misses
PoolMisTotal = "micro_pool_mis_total"
// PoolRetTotal specifies meter metric name for total number of pool returned to gc
PoolRetTotal = "micro_pool_ret_total"
)

View File

@@ -2,11 +2,11 @@ package semconv
var (
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
ServerRequestDurationSeconds = "micro_server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
ServerRequestLatencyMicroseconds = "micro_server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
ServerRequestTotal = "micro_server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
ServerRequestInflight = "micro_server_request_inflight"
)

12
semconv/store.go Normal file
View File

@@ -0,0 +1,12 @@
package semconv
var (
// StoreRequestDurationSeconds specifies meter metric name
StoreRequestDurationSeconds = "micro_store_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
StoreRequestLatencyMicroseconds = "micro_store_request_latency_microseconds"
// StoreRequestTotal specifies meter metric name
StoreRequestTotal = "micro_store_request_total"
// StoreRequestInflight specifies meter metric name
StoreRequestInflight = "micro_store_request_inflight"
)

View File

@@ -1,7 +1,6 @@
package server
import (
"bytes"
"context"
"fmt"
"reflect"
@@ -691,7 +690,7 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
req = req.Elem()
}
if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil {
if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil {
return err
}

View File

@@ -1,5 +1,5 @@
// Package server is an interface for a micro server
package server // import "go.unistack.org/micro/v3/server"
package server
import (
"context"
@@ -11,7 +11,9 @@ import (
)
// DefaultServer default server
var DefaultServer Server = NewServer()
var (
DefaultServer Server = NewServer()
)
var (
// DefaultAddress will be used if no address passed, use secure localhost

View File

@@ -5,6 +5,8 @@ import (
"fmt"
"sync"
"github.com/KimMachineGun/automemlimit/memlimit"
_ "go.uber.org/automaxprocs"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/config"
@@ -17,6 +19,18 @@ import (
"go.unistack.org/micro/v3/tracer"
)
func init() {
memlimit.SetGoMemLimitWithOpts(
memlimit.WithRatio(0.9),
memlimit.WithProvider(
memlimit.ApplyFallback(
memlimit.FromCgroup,
memlimit.FromSystem,
),
),
)
}
// Service is an interface that wraps the lower level components.
// Its works as container with building blocks for service.
type Service interface {

View File

@@ -1,5 +1,5 @@
// Package store is an interface for distributed data storage.
package store // import "go.unistack.org/micro/v3/store"
package store
import (
"context"
@@ -7,6 +7,8 @@ import (
)
var (
// ErrNotConnected is returned when a store is not connected
ErrNotConnected = errors.New("not conected")
// ErrNotFound is returned when a key doesn't exist
ErrNotFound = errors.New("not found")
// ErrInvalidKey is returned when a key has empty or have invalid format

View File

@@ -1,5 +1,5 @@
// Package sync is an interface for distributed synchronization
package sync // import "go.unistack.org/micro/v3/sync"
package sync
import (
"errors"

View File

@@ -83,8 +83,11 @@ func (sk SpanKind) String() string {
// SpanOptions contains span option
type SpanOptions struct {
Labels []interface{}
Kind SpanKind
StatusMsg string
Labels []interface{}
Status SpanStatus
Kind SpanKind
Record bool
}
// SpanOption func signature
@@ -110,12 +113,25 @@ func WithSpanLabels(kv ...interface{}) SpanOption {
}
}
func WithSpanStatus(st SpanStatus, msg string) SpanOption {
return func(o *SpanOptions) {
o.Status = st
o.StatusMsg = msg
}
}
func WithSpanKind(k SpanKind) SpanOption {
return func(o *SpanOptions) {
o.Kind = k
}
}
func WithSpanRecord(b bool) SpanOption {
return func(o *SpanOptions) {
o.Record = b
}
}
// Options struct
type Options struct {
// Context used to store custome tracer options
@@ -124,6 +140,8 @@ type Options struct {
Logger logger.Logger
// Name of the tracer
Name string
// ContextAttrFuncs contains funcs that provides tracing
ContextAttrFuncs []ContextAttrFunc
}
// Option func signature
@@ -148,7 +166,8 @@ func NewEventOptions(opts ...EventOption) EventOptions {
// NewSpanOptions returns default SpanOptions
func NewSpanOptions(opts ...SpanOption) SpanOptions {
options := SpanOptions{
Kind: SpanKindInternal,
Kind: SpanKindInternal,
Record: true,
}
for _, o := range opts {
o(&options)
@@ -159,8 +178,9 @@ func NewSpanOptions(opts ...SpanOption) SpanOptions {
// NewOptions returns default options
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Context: context.Background(),
Logger: logger.DefaultLogger,
Context: context.Background(),
ContextAttrFuncs: DefaultContextAttrFuncs,
}
for _, o := range opts {
o(&options)

View File

@@ -1,5 +1,5 @@
// Package tracer provides an interface for distributed tracing
package tracer // import "go.unistack.org/micro/v3/tracer"
package tracer
import (
"context"
@@ -21,8 +21,11 @@ var (
"HealthService.Ready",
"HealthService.Version",
}
DefaultContextAttrFuncs []ContextAttrFunc
)
type ContextAttrFunc func(ctx context.Context) []interface{}
func init() {
logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs,
func(ctx context.Context) []interface{} {
@@ -44,6 +47,8 @@ type Tracer interface {
Init(...Option) error
// Start a trace
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
// Extract get span metadata from context
// Extract(ctx context.Context)
// Flush flushes spans
Flush(ctx context.Context) error
}

View File

@@ -1,415 +0,0 @@
// Package wrapper provides wrapper for Tracer
package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper"
import (
"context"
"fmt"
"strings"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v3/tracer"
)
var DefaultHeadersExctract = []string{metadata.HeaderXRequestID}
func ExtractDefaultLabels(md metadata.Metadata) []interface{} {
labels := make([]interface{}, 0, len(DefaultHeadersExctract))
for _, k := range DefaultHeadersExctract {
if v, ok := md.Get(k); ok {
labels = append(labels, strings.ToLower(k), v)
}
}
return labels
}
var (
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
labels = append(labels, ExtractDefaultLabels(msg.Metadata())...)
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
labels = append(labels, ExtractDefaultLabels(msg.Header())...)
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method()))
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"}
)
type tWrapper struct {
client.Client
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
opts Options
}
type (
ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
)
// Options struct
type Options struct {
// Tracer that used for tracing
Tracer tracer.Tracer
// ClientCallObservers funcs
ClientCallObservers []ClientCallObserver
// ClientStreamObservers funcs
ClientStreamObservers []ClientStreamObserver
// ClientPublishObservers funcs
ClientPublishObservers []ClientPublishObserver
// ClientCallFuncObservers funcs
ClientCallFuncObservers []ClientCallFuncObserver
// ServerHandlerObservers funcs
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints
SkipEndpoints []string
}
// Option func signature
type Option func(*Options)
// NewOptions create Options from Option slice
func NewOptions(opts ...Option) Options {
options := Options{
Tracer: tracer.DefaultTracer,
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
o(&options)
}
return options
}
// WithTracer pass tracer
func WithTracer(t tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = t
}
}
// SkipEndponts
func SkipEndpoins(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
// WithClientCallObservers funcs
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
// WithClientStreamObservers funcs
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
// WithClientPublishObservers funcs
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
// WithClientCallFuncObservers funcs
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
// WithServerHandlerObservers funcs
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
// WithServerSubscriberObservers funcs
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Call(ctx, req, rsp, opts...)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary",
),
)
defer sp.Finish()
err := ot.Client.Call(nctx, req, rsp, opts...)
for _, o := range ot.opts.ClientCallObservers {
o(nctx, req, rsp, opts, sp, err)
}
return err
}
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Stream(ctx, req, opts...)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "stream",
),
)
defer sp.Finish()
stream, err := ot.Client.Stream(nctx, req, opts...)
for _, o := range ot.opts.ClientStreamObservers {
o(nctx, req, opts, stream, sp, err)
}
return stream, err
}
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" publish", tracer.WithSpanKind(tracer.SpanKindProducer))
defer sp.Finish()
sp.AddLabels("messaging.destination.name", msg.Topic())
sp.AddLabels("messaging.operation", "publish")
err := ot.Client.Publish(nctx, msg, opts...)
for _, o := range ot.opts.ClientPublishObservers {
o(nctx, msg, opts, sp, err)
}
return err
}
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.serverHandler(ctx, req, rsp)
}
}
callType := "unary"
if req.Stream() {
callType = "stream"
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", callType,
),
)
defer sp.Finish()
err := ot.serverHandler(nctx, req, rsp)
for _, o := range ot.opts.ServerHandlerObservers {
o(nctx, req, rsp, sp, err)
}
return err
}
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" process", tracer.WithSpanKind(tracer.SpanKindConsumer))
defer sp.Finish()
sp.AddLabels("messaging.operation", "process")
sp.AddLabels("messaging.source.name", msg.Topic())
err := ot.serverSubscriber(nctx, msg)
for _, o := range ot.opts.ServerSubscriberObservers {
o(nctx, msg, sp, err)
}
return err
}
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
options := NewOptions()
for _, o := range opts {
o(&options)
}
return &tWrapper{opts: options, Client: c}
}
}
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
return func(h client.CallFunc) client.CallFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, clientCallFunc: h}
return ot.ClientCallFunc
}
}
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.ClientCallFunc(ctx, addr, req, rsp, opts)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary",
),
)
defer sp.Finish()
err := ot.clientCallFunc(nctx, addr, req, rsp, opts)
for _, o := range ot.opts.ClientCallFuncObservers {
o(nctx, addr, req, rsp, opts, sp, err)
}
return err
}
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverHandler: h}
return ot.ServerHandler
}
}
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
return func(h server.SubscriberFunc) server.SubscriberFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverSubscriber: h}
return ot.ServerSubscriber
}
}

View File

@@ -1,4 +1,4 @@
package addr // import "go.unistack.org/micro/v3/util/addr"
package addr
import (
"fmt"
@@ -58,6 +58,7 @@ func IsLocal(addr string) bool {
}
// Extract returns a real ip
//
//nolint:gocyclo
func Extract(addr string) (string, error) {
// if addr specified then its returned

View File

@@ -1,5 +1,5 @@
// Package backoff provides backoff functionality
package backoff // import "go.unistack.org/micro/v3/util/backoff"
package backoff
import (
"math"

View File

@@ -1,4 +1,4 @@
package buf // import "go.unistack.org/micro/v3/util/buf"
package buf
import (
"bytes"

View File

@@ -1,4 +1,4 @@
package http // import "go.unistack.org/micro/v3/util/http"
package http
import (
"context"

View File

@@ -1,4 +1,4 @@
package id // import "go.unistack.org/micro/v3/util/id"
package id
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package io is for io management
package io // import "go.unistack.org/micro/v3/util/io"
package io
import (
"io"

View File

@@ -1,5 +1,5 @@
// Package jitter provides a random jitter
package jitter // import "go.unistack.org/micro/v3/util/jitter"
package jitter
import (
"time"

View File

@@ -1,4 +1,4 @@
package jitter // import "go.unistack.org/micro/v3/util/jitter"
package jitter
import (
"context"

View File

@@ -1,4 +1,4 @@
package net // import "go.unistack.org/micro/v3/util/net"
package net
import (
"errors"

View File

@@ -1,6 +1,6 @@
// Package pki provides PKI all the PKI functions necessary to run micro over an untrusted network
// including a CA
package pki // import "go.unistack.org/micro/v3/util/pki"
package pki
import (
"bytes"

View File

@@ -1,5 +1,5 @@
// Package pool is a connection pool
package pool // import "go.unistack.org/micro/v3/util/pool"
package pool
import (
"context"

View File

@@ -1,4 +1,4 @@
package rand // import "go.unistack.org/micro/v3/util/rand"
package rand
import (
crand "crypto/rand"

View File

@@ -44,6 +44,37 @@ func SliceAppend(b bool) Option {
}
}
var maxDepth = 32
func mergeMap(dst, src map[string]interface{}, depth int) map[string]interface{} {
if depth > maxDepth {
return dst
}
for key, srcVal := range src {
if dstVal, ok := dst[key]; ok {
srcMap, srcMapOk := mapify(srcVal)
dstMap, dstMapOk := mapify(dstVal)
if srcMapOk && dstMapOk {
srcVal = mergeMap(dstMap, srcMap, depth+1)
}
}
dst[key] = srcVal
}
return dst
}
func mapify(i interface{}) (map[string]interface{}, bool) {
value := reflect.ValueOf(i)
if value.Kind() == reflect.Map {
m := map[string]interface{}{}
for _, k := range value.MapKeys() {
m[k.String()] = value.MapIndex(k).Interface()
}
return m, true
}
return map[string]interface{}{}, false
}
// Merge merges map[string]interface{} to destination struct
func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
options := Options{}
@@ -59,6 +90,11 @@ func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
return err
}
if mapper, ok := dst.(map[string]interface{}); ok {
dst = mergeMap(mapper, mp, 0)
return nil
}
var err error
var sval reflect.Value
var fname string

View File

@@ -4,6 +4,27 @@ import (
"testing"
)
func TestMergeMap(t *testing.T) {
src := map[string]interface{}{
"skey1": "sval1",
"skey2": map[string]interface{}{
"skey3": "sval3",
},
}
dst := map[string]interface{}{
"skey1": "dval1",
"skey2": map[string]interface{}{
"skey3": "dval3",
},
}
if err := Merge(src, dst); err != nil {
t.Fatal(err)
}
t.Logf("%#+v", src)
}
func TestFieldName(t *testing.T) {
src := "SomeVar"
chk := "some_var"

View File

@@ -1,4 +1,4 @@
package register // import "go.unistack.org/micro/v3/util/register"
package register
import (
"context"

View File

@@ -1,5 +1,5 @@
// Package ring provides a simple ring buffer for storing local data
package ring // import "go.unistack.org/micro/v3/util/ring"
package ring
import (
"sync"

View File

@@ -1,5 +1,5 @@
// Package socket provides a pseudo socket
package socket // import "go.unistack.org/micro/v3/util/socket"
package socket
import (
"io"

60
util/sort/sort_test.go Normal file
View File

@@ -0,0 +1,60 @@
package sort
import (
"reflect"
"testing"
)
func TestUniq(t *testing.T) {
type args struct {
labels []interface{}
}
tests := []struct {
name string
args args
want []interface{}
}{
{
name: "test#1",
args: args{
labels: append(make([]interface{}, 0), "test-1", 1, "test-2", 2),
},
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 2),
},
{
name: "test#2",
args: args{
labels: append(make([]interface{}, 0), "test-1", 1, "test-2", 2, "test-2", 2),
},
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 2),
},
{
name: "test#3",
args: args{
labels: append(make([]interface{}, 0), "test-1", 1, "test-2", 2, "test-2", 3),
},
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 3),
},
{
name: "test#4",
args: args{
labels: append(make([]interface{}, 0),
"test-1", 1, "test-1", 2,
"test-2", 3, "test-2", 2,
"test-3", 5, "test-3", 3,
"test-1", 4, "test-1", 1),
},
want: append(make([]interface{}, 0), "test-1", 1, "test-2", 2, "test-3", 3),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var got []interface{}
if got = Uniq(tt.args.labels); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Uniq() = %v, want %v", got, tt.want)
}
t.Logf("got-%#v", got)
})
}
}

View File

@@ -1,5 +1,5 @@
// Package stream encapsulates streams within streams
package stream // import "go.unistack.org/micro/v3/util/stream"
package stream
import (
"context"

View File

@@ -1,11 +1,78 @@
package pool
import "sync"
import (
"bytes"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/semconv"
)
var (
pools = make([]Statser, 0)
poolsMu sync.Mutex
)
// Stats struct
type Stats struct {
Get uint64
Put uint64
Mis uint64
Ret uint64
}
// Statser provides buffer pool stats
type Statser interface {
Stats() Stats
Cap() int
}
func init() {
go newStatsMeter()
}
func newStatsMeter() {
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
poolsMu.Lock()
for _, st := range pools {
stats := st.Stats()
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
}
poolsMu.Unlock()
}
}
}
var (
_ Statser = (*BytePool)(nil)
_ Statser = (*BytesPool)(nil)
_ Statser = (*StringsPool)(nil)
)
type Pool[T any] struct {
p *sync.Pool
}
func (p Pool[T]) Put(t T) {
p.p.Put(t)
}
func (p Pool[T]) Get() T {
return p.p.Get().(T)
}
func NewPool[T any](fn func() T) Pool[T] {
return Pool[T]{
p: &sync.Pool{
@@ -16,10 +83,155 @@ func NewPool[T any](fn func() T) Pool[T] {
}
}
func (p Pool[T]) Get() T {
return p.p.Get().(T)
type BytePool struct {
p *sync.Pool
get uint64
put uint64
mis uint64
ret uint64
c int
}
func (p Pool[T]) Put(t T) {
p.p.Put(t)
func NewBytePool(size int) *BytePool {
p := &BytePool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
atomic.AddUint64(&p.mis, 1)
b := make([]byte, 0, size)
return &b
},
}
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p
}
func (p *BytePool) Cap() int {
return p.c
}
func (p *BytePool) Stats() Stats {
return Stats{
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
}
}
func (p *BytePool) Get() *[]byte {
atomic.AddUint64(&p.get, 1)
return p.p.Get().(*[]byte)
}
func (p *BytePool) Put(b *[]byte) {
atomic.AddUint64(&p.put, 1)
if cap(*b) > p.c {
atomic.AddUint64(&p.ret, 1)
return
}
*b = (*b)[:0]
p.p.Put(b)
}
type BytesPool struct {
p *sync.Pool
get uint64
put uint64
mis uint64
ret uint64
c int
}
func NewBytesPool(size int) *BytesPool {
p := &BytesPool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
atomic.AddUint64(&p.mis, 1)
b := bytes.NewBuffer(make([]byte, 0, size))
return b
},
}
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p
}
func (p *BytesPool) Cap() int {
return p.c
}
func (p *BytesPool) Stats() Stats {
return Stats{
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
}
}
func (p *BytesPool) Get() *bytes.Buffer {
return p.p.Get().(*bytes.Buffer)
}
func (p *BytesPool) Put(b *bytes.Buffer) {
if (*b).Cap() > p.c {
atomic.AddUint64(&p.ret, 1)
return
}
b.Reset()
p.p.Put(b)
}
type StringsPool struct {
p *sync.Pool
get uint64
put uint64
mis uint64
ret uint64
c int
}
func NewStringsPool(size int) *StringsPool {
p := &StringsPool{c: size}
p.p = &sync.Pool{
New: func() interface{} {
atomic.AddUint64(&p.mis, 1)
return &strings.Builder{}
},
}
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p
}
func (p *StringsPool) Cap() int {
return p.c
}
func (p *StringsPool) Stats() Stats {
return Stats{
Put: atomic.LoadUint64(&p.put),
Get: atomic.LoadUint64(&p.get),
Mis: atomic.LoadUint64(&p.mis),
Ret: atomic.LoadUint64(&p.ret),
}
}
func (p *StringsPool) Get() *strings.Builder {
atomic.AddUint64(&p.get, 1)
return p.p.Get().(*strings.Builder)
}
func (p *StringsPool) Put(b *strings.Builder) {
atomic.AddUint64(&p.put, 1)
if b.Cap() > p.c {
atomic.AddUint64(&p.ret, 1)
return
}
b.Reset()
p.p.Put(b)
}

View File

@@ -2,12 +2,30 @@ package pool
import (
"bytes"
"strings"
"testing"
)
func TestByte(t *testing.T) {
p := NewBytePool(1024)
b := p.Get()
copy(*b, []byte(`test`))
if bytes.Equal(*b, []byte("test")) {
t.Fatal("pool not works")
}
p.Put(b)
b = p.Get()
for i := 0; i < 1500; i++ {
*b = append(*b, []byte(`test`)...)
}
p.Put(b)
st := p.Stats()
if st.Get != 2 && st.Put != 2 && st.Mis != 1 && st.Ret != 1 {
t.Fatalf("pool stats error %#+v", st)
}
}
func TestBytes(t *testing.T) {
p := NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) })
p := NewBytesPool(1024)
b := p.Get()
b.Write([]byte(`test`))
if b.String() != "test" {
@@ -17,7 +35,7 @@ func TestBytes(t *testing.T) {
}
func TestStrings(t *testing.T) {
p := NewPool(func() *strings.Builder { return &strings.Builder{} })
p := NewStringsPool(20)
b := p.Get()
b.Write([]byte(`test`))
if b.String() != "test" {