summary: fix Unregister behaviour for summary metric type (#16)
The change covers two things: 1. Cleanup of Set.a metrics list from per-quantile metrics for summary. 2. Register summary metric and per-quantile metrics in one take. This prevents registry corruption when Unregister called in the middle of metric register process.
This commit is contained in:
parent
dd0c59c0d6
commit
be819551e3
64
set.go
64
set.go
@ -324,13 +324,20 @@ func (s *Set) NewSummary(name string) *Summary {
|
|||||||
//
|
//
|
||||||
// The returned summary is safe to use from concurrent goroutines.
|
// The returned summary is safe to use from concurrent goroutines.
|
||||||
func (s *Set) NewSummaryExt(name string, window time.Duration, quantiles []float64) *Summary {
|
func (s *Set) NewSummaryExt(name string, window time.Duration, quantiles []float64) *Summary {
|
||||||
|
if err := validateMetric(name); err != nil {
|
||||||
|
panic(fmt.Errorf("BUG: invalid metric name %q: %s", name, err))
|
||||||
|
}
|
||||||
sm := newSummary(window, quantiles)
|
sm := newSummary(window, quantiles)
|
||||||
s.registerMetric(name, sm)
|
|
||||||
registerSummary(sm)
|
|
||||||
s.registerSummaryQuantiles(name, sm)
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
// defer will unlock in case of panic
|
||||||
|
// checks in tests
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
s.mustRegisterLocked(name, sm)
|
||||||
|
registerSummaryLocked(sm)
|
||||||
|
s.registerSummaryQuantilesLocked(name, sm)
|
||||||
s.summaries = append(s.summaries, sm)
|
s.summaries = append(s.summaries, sm)
|
||||||
s.mu.Unlock()
|
|
||||||
return sm
|
return sm
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -379,21 +386,17 @@ func (s *Set) GetOrCreateSummaryExt(name string, window time.Duration, quantiles
|
|||||||
name: name,
|
name: name,
|
||||||
metric: sm,
|
metric: sm,
|
||||||
}
|
}
|
||||||
mustRegisterQuantiles := false
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
nm = s.m[name]
|
nm = s.m[name]
|
||||||
if nm == nil {
|
if nm == nil {
|
||||||
nm = nmNew
|
nm = nmNew
|
||||||
s.m[name] = nm
|
s.m[name] = nm
|
||||||
s.a = append(s.a, nm)
|
s.a = append(s.a, nm)
|
||||||
registerSummary(sm)
|
registerSummaryLocked(sm)
|
||||||
mustRegisterQuantiles = true
|
s.registerSummaryQuantilesLocked(name, sm)
|
||||||
}
|
}
|
||||||
s.summaries = append(s.summaries, sm)
|
s.summaries = append(s.summaries, sm)
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
if mustRegisterQuantiles {
|
|
||||||
s.registerSummaryQuantiles(name, sm)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
sm, ok := nm.metric.(*Summary)
|
sm, ok := nm.metric.(*Summary)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -408,14 +411,14 @@ func (s *Set) GetOrCreateSummaryExt(name string, window time.Duration, quantiles
|
|||||||
return sm
|
return sm
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Set) registerSummaryQuantiles(name string, sm *Summary) {
|
func (s *Set) registerSummaryQuantilesLocked(name string, sm *Summary) {
|
||||||
for i, q := range sm.quantiles {
|
for i, q := range sm.quantiles {
|
||||||
quantileValueName := addTag(name, fmt.Sprintf(`quantile="%g"`, q))
|
quantileValueName := addTag(name, fmt.Sprintf(`quantile="%g"`, q))
|
||||||
qv := &quantileValue{
|
qv := &quantileValue{
|
||||||
sm: sm,
|
sm: sm,
|
||||||
idx: i,
|
idx: i,
|
||||||
}
|
}
|
||||||
s.registerMetric(quantileValueName, qv)
|
s.mustRegisterLocked(quantileValueName, qv)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -424,6 +427,16 @@ func (s *Set) registerMetric(name string, m metric) {
|
|||||||
panic(fmt.Errorf("BUG: invalid metric name %q: %s", name, err))
|
panic(fmt.Errorf("BUG: invalid metric name %q: %s", name, err))
|
||||||
}
|
}
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
// defer will unlock in case of panic
|
||||||
|
// checks in test
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.mustRegisterLocked(name, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
// mustRegisterLocked registers given metric with
|
||||||
|
// the given name. Panics if the given name was
|
||||||
|
// already registered before.
|
||||||
|
func (s *Set) mustRegisterLocked(name string, m metric) {
|
||||||
nm, ok := s.m[name]
|
nm, ok := s.m[name]
|
||||||
if !ok {
|
if !ok {
|
||||||
nm = &namedMetric{
|
nm = &namedMetric{
|
||||||
@ -433,7 +446,6 @@ func (s *Set) registerMetric(name string, m metric) {
|
|||||||
s.m[name] = nm
|
s.m[name] = nm
|
||||||
s.a = append(s.a, nm)
|
s.a = append(s.a, nm)
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
|
||||||
if ok {
|
if ok {
|
||||||
panic(fmt.Errorf("BUG: metric %q is already registered", name))
|
panic(fmt.Errorf("BUG: metric %q is already registered", name))
|
||||||
}
|
}
|
||||||
@ -455,32 +467,34 @@ func (s *Set) UnregisterMetric(name string) bool {
|
|||||||
|
|
||||||
delete(s.m, name)
|
delete(s.m, name)
|
||||||
|
|
||||||
// remove metric from s.a
|
deleteFromList := func(metricName string) {
|
||||||
found := false
|
for i, nm := range s.a {
|
||||||
for i, nm := range s.a {
|
if nm.name == metricName {
|
||||||
if nm.name == name {
|
s.a = append(s.a[:i], s.a[i+1:]...)
|
||||||
s.a = append(s.a[:i], s.a[i+1:]...)
|
return
|
||||||
found = true
|
}
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
panic(fmt.Errorf("BUG: cannot find metric %q in the list of registered metrics", name))
|
panic(fmt.Errorf("BUG: cannot find metric %q in the list of registered metrics", name))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// remove metric from s.a
|
||||||
|
deleteFromList(name)
|
||||||
|
|
||||||
sm, ok := m.(*Summary)
|
sm, ok := m.(*Summary)
|
||||||
if !ok {
|
if !ok {
|
||||||
// There is no need in cleaning up s.summaries.
|
// There is no need in cleaning up summary.
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove summary metric name including quantile labels from set
|
// cleanup registry from per-quantile metrics
|
||||||
for _, q := range sm.quantiles {
|
for _, q := range sm.quantiles {
|
||||||
quantileValueName := addTag(name, fmt.Sprintf(`quantile="%g"`, q))
|
quantileValueName := addTag(name, fmt.Sprintf(`quantile="%g"`, q))
|
||||||
delete(s.m, quantileValueName)
|
delete(s.m, quantileValueName)
|
||||||
|
deleteFromList(quantileValueName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove sm from s.summaries
|
// Remove sm from s.summaries
|
||||||
found = false
|
found := false
|
||||||
for i, xsm := range s.summaries {
|
for i, xsm := range s.summaries {
|
||||||
if xsm == sm {
|
if xsm == sm {
|
||||||
s.summaries = append(s.summaries[:i], s.summaries[i+1:]...)
|
s.summaries = append(s.summaries[:i], s.summaries[i+1:]...)
|
||||||
|
76
set_test.go
76
set_test.go
@ -2,7 +2,9 @@ package metrics
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNewSet(t *testing.T) {
|
func TestNewSet(t *testing.T) {
|
||||||
@ -64,31 +66,38 @@ func TestSetListMetricNames(t *testing.T) {
|
|||||||
|
|
||||||
func TestSetUnregisterMetric(t *testing.T) {
|
func TestSetUnregisterMetric(t *testing.T) {
|
||||||
s := NewSet()
|
s := NewSet()
|
||||||
|
const cName, smName = "counter_1", "summary_1"
|
||||||
// Initialize a few metrics
|
// Initialize a few metrics
|
||||||
for i := 0; i < 5; i++ {
|
c := s.NewCounter(cName)
|
||||||
c := s.NewCounter(fmt.Sprintf("counter_%d", i))
|
c.Inc()
|
||||||
c.Inc()
|
sm := s.NewSummary(smName)
|
||||||
sm := s.NewSummary(fmt.Sprintf("summary_%d", i))
|
sm.Update(1)
|
||||||
sm.Update(float64(i))
|
|
||||||
}
|
|
||||||
// Unregister existing metrics
|
// Unregister existing metrics
|
||||||
if !s.UnregisterMetric("counter_1") {
|
if !s.UnregisterMetric(cName) {
|
||||||
t.Fatalf("UnregisterMetric(counter_1) must return true")
|
t.Fatalf("UnregisterMetric(%s) must return true", cName)
|
||||||
}
|
}
|
||||||
if !s.UnregisterMetric("summary_1") {
|
if !s.UnregisterMetric(smName) {
|
||||||
t.Fatalf("UnregisterMetric(summary_1) must return true")
|
t.Fatalf("UnregisterMetric(%s) must return true", smName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unregister twice must return false
|
// Unregister twice must return false
|
||||||
if s.UnregisterMetric("counter_1") {
|
if s.UnregisterMetric(cName) {
|
||||||
t.Fatalf("UnregisterMetric(counter_1) must return false on unregistered metric")
|
t.Fatalf("UnregisterMetric(%s) must return false on unregistered metric", cName)
|
||||||
}
|
}
|
||||||
if s.UnregisterMetric("summary_1") {
|
if s.UnregisterMetric(smName) {
|
||||||
t.Fatalf("UnregisterMetric(summary_1) must return false on unregistered metric")
|
t.Fatalf("UnregisterMetric(%s) must return false on unregistered metric", smName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify that registry is empty
|
||||||
|
if len(s.m) != 0 {
|
||||||
|
t.Fatalf("expected metrics map to be empty; got %d elements", len(s.m))
|
||||||
|
}
|
||||||
|
if len(s.a) != 0 {
|
||||||
|
t.Fatalf("expected metrics list to be empty; got %d elements", len(s.a))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate metrics are removed
|
// Validate metrics are removed
|
||||||
const cName, smName = "counter_1", "summary_1"
|
|
||||||
ok := false
|
ok := false
|
||||||
for _, n := range s.ListMetricNames() {
|
for _, n := range s.ListMetricNames() {
|
||||||
if n == cName || n == smName {
|
if n == cName || n == smName {
|
||||||
@ -104,3 +113,40 @@ func TestSetUnregisterMetric(t *testing.T) {
|
|||||||
s.NewCounter(cName).Inc()
|
s.NewCounter(cName).Inc()
|
||||||
s.NewSummary(smName).Update(float64(1))
|
s.NewSummary(smName).Update(float64(1))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestRegisterUnregister tests concurrent access to
|
||||||
|
// metrics during registering and unregistering.
|
||||||
|
// Should be tested specifically with `-race` enabled.
|
||||||
|
func TestRegisterUnregister(t *testing.T) {
|
||||||
|
const (
|
||||||
|
workers = 16
|
||||||
|
iterations = 1e3
|
||||||
|
)
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(workers)
|
||||||
|
for n := 0; n < workers; n++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
now := time.Now()
|
||||||
|
for i := 0; i < iterations; i++ {
|
||||||
|
iteration := i % 5
|
||||||
|
counter := fmt.Sprintf(`counter{iteration="%d"}`, iteration)
|
||||||
|
GetOrCreateCounter(counter).Add(i)
|
||||||
|
UnregisterMetric(counter)
|
||||||
|
|
||||||
|
histogram := fmt.Sprintf(`histogram{iteration="%d"}`, iteration)
|
||||||
|
GetOrCreateHistogram(histogram).UpdateDuration(now)
|
||||||
|
UnregisterMetric(histogram)
|
||||||
|
|
||||||
|
gauge := fmt.Sprintf(`gauge{iteration="%d"}`, iteration)
|
||||||
|
GetOrCreateGauge(gauge, func() float64 { return 1 })
|
||||||
|
UnregisterMetric(gauge)
|
||||||
|
|
||||||
|
summary := fmt.Sprintf(`summary{iteration="%d"}`, iteration)
|
||||||
|
GetOrCreateSummary(summary).Update(float64(i))
|
||||||
|
UnregisterMetric(summary)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
@ -203,7 +203,7 @@ func addTag(name, tag string) string {
|
|||||||
return fmt.Sprintf("%s,%s}", name[:len(name)-1], tag)
|
return fmt.Sprintf("%s,%s}", name[:len(name)-1], tag)
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerSummary(sm *Summary) {
|
func registerSummaryLocked(sm *Summary) {
|
||||||
window := sm.window
|
window := sm.window
|
||||||
summariesLock.Lock()
|
summariesLock.Lock()
|
||||||
summaries[window] = append(summaries[window], sm)
|
summaries[window] = append(summaries[window], sm)
|
||||||
|
Loading…
Reference in New Issue
Block a user