Compare commits

...

2 Commits
v3.8.20 ... v3

Author SHA1 Message Date
591c427815 protect meter by mutex
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-03-07 16:53:47 +03:00
304bb99b78 fixup invalid metric names
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-03-04 19:43:40 +03:00
2 changed files with 60 additions and 23 deletions

@ -5,6 +5,7 @@ import (
"hash/fnv"
"io"
"regexp"
"strings"
"sync"
"time"
@ -25,7 +26,7 @@ type prometheusMeter struct {
gauge map[string]*gauges
histogram map[string]*histograms
summary map[string]*summaries
sync.Mutex
mu *sync.Mutex
}
type counters struct {
@ -60,6 +61,15 @@ func newString(v string) *string {
return &nv
}
func newName(name string) *string {
idx := strings.Index(name, "{")
if idx <= 0 {
return newString(name)
}
return newString(name[:idx])
}
func NewMeter(opts ...meter.Option) *prometheusMeter {
return &prometheusMeter{
set: prometheus.NewRegistry(), // prometheus.DefaultRegisterer,
@ -69,6 +79,7 @@ func NewMeter(opts ...meter.Option) *prometheusMeter {
gauge: make(map[string]*gauges),
histogram: make(map[string]*histograms),
summary: make(map[string]*summaries),
mu: &sync.Mutex{},
}
}
@ -90,8 +101,8 @@ func (m *prometheusMeter) Name() string {
}
func (m *prometheusMeter) Counter(name string, labels ...string) meter.Counter {
m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()
nm := m.buildMetric(name)
labels = append(m.opts.Labels, labels...)
cd, ok := m.counter[nm]
@ -113,8 +124,8 @@ func (m *prometheusMeter) Counter(name string, labels ...string) meter.Counter {
}
func (m *prometheusMeter) FloatCounter(name string, labels ...string) meter.FloatCounter {
m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()
nm := m.buildMetric(name)
labels = append(m.opts.Labels, labels...)
cd, ok := m.floatCounter[nm]
@ -136,8 +147,8 @@ func (m *prometheusMeter) FloatCounter(name string, labels ...string) meter.Floa
}
func (m *prometheusMeter) Gauge(name string, fn func() float64, labels ...string) meter.Gauge {
m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()
nm := m.buildMetric(name)
labels = append(m.opts.Labels, labels...)
cd, ok := m.gauge[nm]
@ -159,8 +170,8 @@ func (m *prometheusMeter) Gauge(name string, fn func() float64, labels ...string
}
func (m *prometheusMeter) Histogram(name string, labels ...string) meter.Histogram {
m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()
nm := m.buildMetric(name)
labels = append(m.opts.Labels, labels...)
cd, ok := m.histogram[nm]
@ -182,8 +193,8 @@ func (m *prometheusMeter) Histogram(name string, labels ...string) meter.Histogr
}
func (m *prometheusMeter) Summary(name string, labels ...string) meter.Summary {
m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()
nm := m.buildMetric(name)
labels = append(m.opts.Labels, labels...)
cd, ok := m.summary[nm]
@ -205,8 +216,8 @@ func (m *prometheusMeter) Summary(name string, labels ...string) meter.Summary {
}
func (m *prometheusMeter) SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) meter.Summary {
m.Lock()
defer m.Unlock()
m.mu.Lock()
defer m.mu.Unlock()
nm := m.buildMetric(name)
labels = append(m.opts.Labels, labels...)
cd, ok := m.summary[nm]
@ -267,9 +278,11 @@ func (m *prometheusMeter) Write(w io.Writer, opts ...meter.Option) error {
enc := expfmt.NewEncoder(w, expfmt.NewFormat(expfmt.TypeTextPlain))
m.mu.Lock()
for name, metrics := range m.counter {
mf := &dto.MetricFamily{
Name: newString(name),
Name: newName(name),
Type: dto.MetricType_GAUGE.Enum(),
Metric: make([]*dto.Metric, 0, len(metrics.cs)),
}
@ -284,7 +297,7 @@ func (m *prometheusMeter) Write(w io.Writer, opts ...meter.Option) error {
for name, metrics := range m.gauge {
mf := &dto.MetricFamily{
Name: newString(name),
Name: newName(name),
Type: dto.MetricType_GAUGE.Enum(),
Metric: make([]*dto.Metric, 0, len(metrics.cs)),
}
@ -299,7 +312,7 @@ func (m *prometheusMeter) Write(w io.Writer, opts ...meter.Option) error {
for name, metrics := range m.floatCounter {
mf := &dto.MetricFamily{
Name: newString(name),
Name: newName(name),
Type: dto.MetricType_GAUGE.Enum(),
Metric: make([]*dto.Metric, 0, len(metrics.cs)),
}
@ -314,7 +327,7 @@ func (m *prometheusMeter) Write(w io.Writer, opts ...meter.Option) error {
for name, metrics := range m.histogram {
mf := &dto.MetricFamily{
Name: newString(name),
Name: newName(name),
Type: dto.MetricType_HISTOGRAM.Enum(),
Metric: make([]*dto.Metric, 0, len(metrics.cs)),
}
@ -329,7 +342,7 @@ func (m *prometheusMeter) Write(w io.Writer, opts ...meter.Option) error {
for name, metrics := range m.summary {
mf := &dto.MetricFamily{
Name: newString(name),
Name: newName(name),
Type: dto.MetricType_SUMMARY.Enum(),
Metric: make([]*dto.Metric, 0, len(metrics.cs)),
}
@ -342,6 +355,8 @@ func (m *prometheusMeter) Write(w io.Writer, opts ...meter.Option) error {
mfs = append(mfs, mf)
}
m.mu.Unlock()
for _, mf := range mfs {
_ = enc.Encode(mf)
}
@ -360,6 +375,7 @@ func (m *prometheusMeter) Clone(opts ...meter.Option) meter.Meter {
}
return &prometheusMeter{
mu: m.mu,
set: m.set,
opts: options,
floatCounter: m.floatCounter,

@ -8,6 +8,32 @@ import (
"go.unistack.org/micro/v3/meter"
)
func TestBuildMetric(t *testing.T) {
m := NewMeter(meter.Labels("service_name", "test", "service_version", "0.0.0.1"))
if err := m.Init(); err != nil {
t.Fatal(err)
}
name := m.buildMetric("micro_server")
if name != `micro_server{service_name="test",service_version="0.0.0.1"}` {
t.Fatal("invalid name")
}
}
func TestWithDefaultLabels(t *testing.T) {
m := NewMeter(meter.Labels("service_name", "test", "service_version", "0.0.0.1"))
if err := m.Init(); err != nil {
t.Fatal(err)
}
m.Counter("micro_server", "endpoint", "ep3", "path", "/path3", "status", "success").Inc()
buf := bytes.NewBuffer(nil)
_ = m.Write(buf, meter.WriteProcessMetrics(false), meter.WriteFDMetrics(false))
if !bytes.Contains(buf.Bytes(), []byte(`micro_server{service_name="test",service_version="0.0.0.1",endpoint="ep3",path="/path3",status="success"} 1`)) {
t.Fatalf("invalid metrics output: %s", buf.Bytes())
}
}
func TestStd(t *testing.T) {
m := NewMeter(meter.WriteProcessMetrics(true), meter.WriteFDMetrics(true))
if err := m.Init(); err != nil {
@ -46,7 +72,6 @@ func TestMultiple(t *testing.T) {
buf := bytes.NewBuffer(nil)
_ = m.Write(buf, meter.WriteProcessMetrics(false), meter.WriteFDMetrics(false))
if !bytes.Contains(buf.Bytes(), []byte(`micro_server{endpoint="ep1",path="/path1"} 2`)) {
// t.Fatal("XXXX")
t.Fatalf("invalid metrics output: %s", buf.Bytes())
}
}
@ -58,16 +83,12 @@ func TestCounterSet(t *testing.T) {
m.Counter("forte_accounts_total", "channel_code", "crm").Set(value)
fmt.Println(uint64(float64(value)))
buf := bytes.NewBuffer(nil)
_ = m.Write(buf)
output := buf.String()
fmt.Println(output)
expectedOutput := fmt.Sprintf(`%s{channel_code="crm"} %d`, "forte_accounts_total", value)
if !bytes.Contains(buf.Bytes(), []byte(expectedOutput)) {