From c3cabc1fe5205c2fc8c40b20220d5f5ae9c0bb9f Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Sun, 7 Apr 2024 20:48:47 +0300 Subject: [PATCH 1/6] add options in broker --- .gitignore | 3 +++ broker/broker.go | 3 +++ broker/options.go | 24 +++++++++++++++--------- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 500d68ca..c2fff381 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # Develop tools /.vscode/ /.idea/ +.idea +.vscode # Binaries for programs and plugins *.exe @@ -13,6 +15,7 @@ _obj _test _build +.DS_Store # Architecture specific extensions/prefixes *.[568vq] diff --git a/broker/broker.go b/broker/broker.go index 53099b7a..7e8dd9ae 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -4,6 +4,7 @@ package broker // import "go.unistack.org/micro/v4/broker" import ( "context" "errors" + "time" "go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/options" @@ -19,6 +20,8 @@ var ( ErrDisconnected = errors.New("broker disconnected") // ErrInvalidMessage returns when message has nvalid format ErrInvalidMessage = errors.New("broker message has invalid format") + // DefaultGracefulTimeout + DefaultGracefulTimeout = 5 * time.Second ) // Broker is an interface used for asynchronous messaging. diff --git a/broker/options.go b/broker/options.go index ddc12e32..b1c2fea4 100644 --- a/broker/options.go +++ b/broker/options.go @@ -3,6 +3,7 @@ package broker import ( "context" "crypto/tls" + "sync" "time" "go.unistack.org/micro/v4/codec" @@ -36,22 +37,27 @@ type Options struct { Name string // Address holds the broker address Address []string + + Wait *sync.WaitGroup + + GracefulTimeout time.Duration } // NewOptions create new Options func NewOptions(opts ...options.Option) Options { - options := Options{ - Register: register.DefaultRegister, - Logger: logger.DefaultLogger, - Context: context.Background(), - Meter: meter.DefaultMeter, - Codecs: make(map[string]codec.Codec), - Tracer: tracer.DefaultTracer, + newOpts := Options{ + Register: register.DefaultRegister, + Logger: logger.DefaultLogger, + Context: context.Background(), + Meter: meter.DefaultMeter, + Codecs: make(map[string]codec.Codec), + Tracer: tracer.DefaultTracer, + GracefulTimeout: DefaultGracefulTimeout, } for _, o := range opts { - o(&options) + o(&newOpts) } - return options + return newOpts } // PublishOptions struct From 56d33ae823991af8f7d23da7c2dcdb2eb2d37a99 Mon Sep 17 00:00:00 2001 From: devstigneev Date: Sun, 7 Apr 2024 21:17:42 +0300 Subject: [PATCH 2/6] rename path to sync --- broker/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/options.go b/broker/options.go index b1c2fea4..aa3a7272 100644 --- a/broker/options.go +++ b/broker/options.go @@ -3,7 +3,6 @@ package broker import ( "context" "crypto/tls" - "sync" "time" "go.unistack.org/micro/v4/codec" @@ -12,6 +11,7 @@ import ( "go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/options" "go.unistack.org/micro/v4/register" + "go.unistack.org/micro/v4/sync" "go.unistack.org/micro/v4/tracer" ) From f8c68a81f708f1f4654658d2327820974e2b2246 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 13 Apr 2024 02:06:51 +0300 Subject: [PATCH 3/6] semconv: add broker group lag Signed-off-by: Vasiliy Tolstov --- semconv/broker.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/semconv/broker.go b/semconv/broker.go index 9b61b523..8c51a8f2 100644 --- a/semconv/broker.go +++ b/semconv/broker.go @@ -17,4 +17,6 @@ var ( SubscribeMessageTotal = "subscribe_message_total" // SubscribeMessageInflight specifies meter metric name SubscribeMessageInflight = "subscribe_message_inflight" + // BrokerGroupLag specifies broker lag + BrokerGroupLag = "broker_lag" ) From 76090f7569e9db5562c5e0d835ade4f70c77b6b2 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 14 Apr 2024 00:16:55 +0300 Subject: [PATCH 4/6] util/xpool: package pool Signed-off-by: Vasiliy Tolstov --- util/xpool/pool.go | 25 +++++++++++++++++++++++++ util/xpool/pool_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 util/xpool/pool.go create mode 100644 util/xpool/pool_test.go diff --git a/util/xpool/pool.go b/util/xpool/pool.go new file mode 100644 index 00000000..1ffe4293 --- /dev/null +++ b/util/xpool/pool.go @@ -0,0 +1,25 @@ +package pool + +import "sync" + +type Pool[T any] struct { + p *sync.Pool +} + +func NewPool[T any](fn func() T) Pool[T] { + return Pool[T]{ + p: &sync.Pool{ + New: func() interface{} { + return fn() + }, + }, + } +} + +func (p Pool[T]) Get() T { + return p.p.Get().(T) +} + +func (p Pool[T]) Put(t T) { + p.p.Put(t) +} diff --git a/util/xpool/pool_test.go b/util/xpool/pool_test.go new file mode 100644 index 00000000..8e7a9b81 --- /dev/null +++ b/util/xpool/pool_test.go @@ -0,0 +1,27 @@ +package pool + +import ( + "bytes" + "strings" + "testing" +) + +func TestBytes(t *testing.T) { + p := NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) }) + b := p.Get() + b.Write([]byte(`test`)) + if b.String() != "test" { + t.Fatal("pool not works") + } + p.Put(b) +} + +func TestStrings(t *testing.T) { + p := NewPool(func() *strings.Builder { return &strings.Builder{} }) + b := p.Get() + b.Write([]byte(`test`)) + if b.String() != "test" { + t.Fatal("pool not works") + } + p.Put(b) +} From b8928d3da978b1bce1d82c451d0be15629344a68 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 14 Apr 2024 16:42:36 +0300 Subject: [PATCH 5/6] semconv: add cache metric names Signed-off-by: Vasiliy Tolstov --- semconv/cache.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 semconv/cache.go diff --git a/semconv/cache.go b/semconv/cache.go new file mode 100644 index 00000000..f12739c1 --- /dev/null +++ b/semconv/cache.go @@ -0,0 +1,12 @@ +package semconv + +var ( + // CacheRequestDurationSeconds specifies meter metric name + CacheRequestDurationSeconds = "cache_request_duration_seconds" + // ClientRequestLatencyMicroseconds specifies meter metric name + CacheRequestLatencyMicroseconds = "cache_request_latency_microseconds" + // CacheRequestTotal specifies meter metric name + CacheRequestTotal = "cache_request_total" + // CacheRequestInflight specifies meter metric name + CacheRequestInflight = "cache_request_inflight" +) From 725ed992cca568e89d2632e50de5867863420e42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=BE=D1=80?= =?UTF-8?q?=D0=B1=D1=83=D0=BD=D0=BE=D0=B2?= Date: Mon, 15 Apr 2024 13:31:14 +0300 Subject: [PATCH 6/6] #335 caller skip count. (#338) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Gorbunov Kirill Andreevich Reviewed-on: https://git.unistack.org/unistack-org/micro/pulls/338 Co-authored-by: Кирилл Горбунов Co-committed-by: Кирилл Горбунов --- logger/options.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/logger/options.go b/logger/options.go index 8755c9b7..d3748749 100644 --- a/logger/options.go +++ b/logger/options.go @@ -214,6 +214,20 @@ func WithMicroKeys() options.Option { } } +// WithAddCallerSkipCount add skip count for copy logger +func WithAddCallerSkipCount(n int) options.Option { + return func(src interface{}) error { + c, err := options.Get(src, ".CallerSkipCount") + if err != nil { + return err + } + if err = options.Set(src, c.(int)+n, ".CallerSkipCount"); err != nil { + return err + } + return nil + } +} + // WithAddStacktrace controls writing stacktrace on error func WithAddStacktrace(v bool) options.Option { return func(src interface{}) error {