Skip to content

Commit 09dbb33

Browse files
[*] optimize PrometheusWriter.Collect locking, fixes #1189 (#1190)
* Added lazy initialization for metric write * Added tests * Rename test file
1 parent 4e0a36b commit 09dbb33

2 files changed

Lines changed: 94 additions & 6 deletions

File tree

internal/sinks/prometheus.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,10 @@ func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error {
119119
func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope) { // cache structure: [dbUnique][metric]lastly_fetched_data
120120
promw.Lock()
121121
defer promw.Unlock()
122-
if _, ok := promw.Cache[dbUnique]; ok {
123-
promw.Cache[dbUnique][metric] = msgArr
122+
if _, ok := promw.Cache[dbUnique]; !ok {
123+
promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
124124
}
125+
promw.Cache[dbUnique][metric] = msgArr
125126
}
126127

127128
func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string) { // cache structure: [dbUnique][metric]lastly_fetched_data
@@ -172,10 +173,7 @@ func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric) {
172173
return
173174
}
174175
snapshot := promw.Cache
175-
promw.Cache = make(PromMetricCache, len(snapshot))
176-
for dbUnique := range snapshot {
177-
promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
178-
}
176+
promw.Cache = make(PromMetricCache)
179177
promw.Unlock()
180178

181179
t1 := time.Now()

internal/sinks/prometheus_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package sinks
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/cybertec-postgresql/pgwatch/v5/internal/log"
8+
"github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
9+
"github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
10+
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func newTestPrometheusWriter(namespace string) *PrometheusWriter {
16+
return &PrometheusWriter{
17+
ctx: testutil.TestContext,
18+
logger: log.GetLogger(testutil.TestContext),
19+
Namespace: namespace,
20+
Cache: make(PromMetricCache),
21+
lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
22+
Namespace: namespace,
23+
Name: "test_last_scrape_errors",
24+
}),
25+
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
26+
Namespace: namespace,
27+
Name: "test_total_scrapes",
28+
}),
29+
totalScrapeFailures: prometheus.NewCounter(prometheus.CounterOpts{
30+
Namespace: namespace,
31+
Name: "test_total_scrape_failures",
32+
}),
33+
}
34+
}
35+
36+
// TestLazyInitialization_WriteAfterCollect verifies that Write() works after
37+
// Collect() clears the cache. Collect() no longer pre-creates maps, so Write()
38+
// must create them lazily.
39+
func TestLazyInitialization_WriteAfterCollect(t *testing.T) {
40+
promw := newTestPrometheusWriter("test")
41+
42+
// Write initial data
43+
msg := metrics.MeasurementEnvelope{
44+
DBName: "db1",
45+
MetricName: "metric1",
46+
Data: metrics.Measurements{
47+
{metrics.EpochColumnName: time.Now().UnixNano(), "value": int64(100)},
48+
},
49+
}
50+
require.NoError(t, promw.Write(msg))
51+
52+
// Collect clears the cache
53+
ch := make(chan prometheus.Metric, 100)
54+
promw.Collect(ch)
55+
assert.Empty(t, promw.Cache, "cache should be empty after Collect")
56+
57+
// Write after Collect - must work via lazy initialization
58+
msg.Data[0]["value"] = int64(200)
59+
require.NoError(t, promw.Write(msg))
60+
61+
assert.Contains(t, promw.Cache, "db1")
62+
assert.Equal(t, int64(200), promw.Cache["db1"]["metric1"].Data[0]["value"])
63+
}
64+
65+
// TestCollect_NoPreallocation verifies Collect() creates an empty cache
66+
// without pre-allocating maps for each database (O(1) instead of O(N)).
67+
func TestCollect_NoPreallocation(t *testing.T) {
68+
promw := newTestPrometheusWriter("test")
69+
70+
// Populate cache with multiple databases
71+
for _, db := range []string{"db1", "db2", "db3", "db4", "db5"} {
72+
promw.Cache[db] = map[string]metrics.MeasurementEnvelope{
73+
"metric": {
74+
DBName: db,
75+
MetricName: "metric",
76+
Data: metrics.Measurements{
77+
{metrics.EpochColumnName: time.Now().UnixNano(), "value": int64(1)},
78+
},
79+
},
80+
}
81+
}
82+
assert.Len(t, promw.Cache, 5)
83+
84+
// Collect
85+
ch := make(chan prometheus.Metric, 100)
86+
promw.Collect(ch)
87+
88+
// New cache should be empty - no pre-allocated maps
89+
assert.Empty(t, promw.Cache)
90+
}

0 commit comments

Comments
 (0)