Skip to content

Commit 4e84d21

Browse files
authored
[+] refactor psutil OS metric fetchers (#1269)
* [-] stop using deprecated `psutil.TimesStat.Total()` * [+] make `GetGoPsutilDiskPG()` work for Windows and Darwin * [+] improve `GetGoPsutilDiskPG()` and add tests * [+] improve `GetGoPsutilCPU` * [*] check for Windows abs path in `GetGoPsutilDiskPG()`
1 parent 2797305 commit 4e84d21

5 files changed

Lines changed: 165 additions & 153 deletions

File tree

internal/reaper/file.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,17 @@ const (
3030
metricPsutilMem = "psutil_mem"
3131
)
3232

33-
const (
34-
sqlPgDirs = `select current_setting('data_directory') as dd, current_setting('log_directory') as ld`
35-
sqlTsDirs = `select spcname::text as name, pg_catalog.pg_tablespace_location(oid) as location from pg_catalog.pg_tablespace where not spcname like any(array[E'pg\\_%'])`
36-
)
33+
const sqlPgDirs = `select name, path from
34+
(values
35+
('data_directory', current_setting('data_directory')),
36+
('pg_wal', current_setting('data_directory')||'/pg_wal'),
37+
('log_directory', case
38+
when current_setting('log_directory') ~ '^(\w:)?\/.+' then current_setting('log_directory')
39+
else current_setting('data_directory') || '/' || current_setting('log_directory')
40+
end)) as d(name, path)
41+
union all
42+
select spcname::text, pg_catalog.pg_tablespace_location(oid)
43+
from pg_catalog.pg_tablespace where spcname !~ 'pg_.+'`
3744

3845
var directlyFetchableOSMetrics = []string{metricPsutilCPU, metricPsutilDisk, metricPsutilDiskIoTotal, metricPsutilMem, metricCPULoad}
3946

@@ -42,7 +49,7 @@ func IsDirectlyFetchableMetric(md *sources.SourceConn, metric string) bool {
4249
}
4350

4451
func (r *Reaper) FetchStatsDirectlyFromOS(ctx context.Context, md *sources.SourceConn, metricName string) (*metrics.MeasurementEnvelope, error) {
45-
var data, dataDirs, dataTblspDirs metrics.Measurements
52+
var data, pgDirs metrics.Measurements
4653
var err error
4754

4855
switch metricName {
@@ -51,13 +58,10 @@ func (r *Reaper) FetchStatsDirectlyFromOS(ctx context.Context, md *sources.Sourc
5158
case metricPsutilCPU:
5259
data, err = GetGoPsutilCPU(md.GetMetricInterval(metricName))
5360
case metricPsutilDisk:
54-
if dataDirs, err = QueryMeasurements(ctx, md, sqlPgDirs); err != nil {
55-
return nil, err
56-
}
57-
if dataTblspDirs, err = QueryMeasurements(ctx, md, sqlTsDirs); err != nil {
61+
if pgDirs, err = QueryMeasurements(ctx, md, sqlPgDirs); err != nil {
5862
return nil, err
5963
}
60-
data, err = GetGoPsutilDiskPG(dataDirs, dataTblspDirs)
64+
data, err = GetGoPsutilDiskPG(pgDirs)
6165
case metricPsutilDiskIoTotal:
6266
data, err = GetGoPsutilDiskTotals()
6367
case metricPsutilMem:

internal/reaper/psutil.go

Lines changed: 60 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package reaper
33
import (
44
"math"
55
"os"
6-
"path"
7-
"strings"
86
"sync"
97
"time"
108

@@ -20,31 +18,39 @@ var prevCPULoadTimeStatsLock sync.RWMutex
2018
var prevCPULoadTimeStats cpu.TimesStat
2119
var prevCPULoadTimestamp time.Time
2220

21+
func init() {
22+
// initialize the cache with current stats so the first call returns a meaningful delta
23+
if probe, err := cpu.Times(false); err == nil {
24+
prevCPULoadTimeStats = probe[0]
25+
prevCPULoadTimestamp = time.Now()
26+
}
27+
}
28+
29+
// cpuTotal returns the total number of seconds across all CPU states.
30+
// Guest and GuestNice are intentionally excluded because on Linux they are
31+
// already counted within User and Nice respectively (/proc/stat semantics),
32+
// so including them would double-count and skew percentage calculations.
33+
func cpuTotal(c cpu.TimesStat) float64 {
34+
return c.User + c.System + c.Idle + c.Nice + c.Iowait + c.Irq +
35+
c.Softirq + c.Steal
36+
}
37+
2338
func goPsutilCalcCPUUtilization(probe0, probe1 cpu.TimesStat) float64 {
24-
return 100 - (100.0 * (probe1.Idle - probe0.Idle + probe1.Iowait - probe0.Iowait + probe1.Steal - probe0.Steal) / (probe1.Total() - probe0.Total()))
39+
return 100 - (100.0 * (probe1.Idle - probe0.Idle + probe1.Iowait - probe0.Iowait + probe1.Steal - probe0.Steal) / (cpuTotal(probe1) - cpuTotal(probe0)))
2540
}
2641

27-
// Simulates "psutil" metric output. Assumes the result from last call as input, otherwise uses a 1s measurement
28-
func GetGoPsutilCPU(interval float64) ([]map[string]any, error) {
42+
// GetGoPsutilCPU simulates "psutil" metric output. Assumes the result from last call as input
43+
func GetGoPsutilCPU(interval float64) (metrics.Measurements, error) {
2944
prevCPULoadTimeStatsLock.RLock()
3045
prevTime := prevCPULoadTimestamp
3146
prevTimeStat := prevCPULoadTimeStats
3247
prevCPULoadTimeStatsLock.RUnlock()
3348

34-
if prevTime.IsZero() || (time.Now().UnixNano()-prevTime.UnixNano()) < 1e9 { // give "short" stats on first run, based on a 1s probe
35-
probe0, err := cpu.Times(false)
36-
if err != nil {
37-
return nil, err
38-
}
39-
prevTimeStat = probe0[0]
40-
time.Sleep(1e9)
41-
}
42-
4349
curCallStats, err := cpu.Times(false)
4450
if err != nil {
4551
return nil, err
4652
}
47-
if prevTime.IsZero() || time.Now().UnixNano()-prevTime.UnixNano() < 1e9 || time.Now().Unix()-prevTime.Unix() >= int64(interval) {
53+
if time.Since(prevTime) >= time.Duration(float64(time.Second)*interval) {
4854
prevCPULoadTimeStatsLock.Lock() // update the cache
4955
prevCPULoadTimeStats = curCallStats[0]
5056
prevCPULoadTimestamp = time.Now()
@@ -67,17 +73,18 @@ func GetGoPsutilCPU(interval float64) ([]map[string]any, error) {
6773
retMap["load_1m"] = math.Round(100*la.Load1) / 100
6874
retMap["load_5m_norm"] = math.Round(100*la.Load5/float64(cpus)) / 100
6975
retMap["load_5m"] = math.Round(100*la.Load5) / 100
70-
retMap["user"] = math.Round(10000.0*(curCallStats[0].User-prevTimeStat.User)/(curCallStats[0].Total()-prevTimeStat.Total())) / 100
71-
retMap["system"] = math.Round(10000.0*(curCallStats[0].System-prevTimeStat.System)/(curCallStats[0].Total()-prevTimeStat.Total())) / 100
72-
retMap["idle"] = math.Round(10000.0*(curCallStats[0].Idle-prevTimeStat.Idle)/(curCallStats[0].Total()-prevTimeStat.Total())) / 100
73-
retMap["iowait"] = math.Round(10000.0*(curCallStats[0].Iowait-prevTimeStat.Iowait)/(curCallStats[0].Total()-prevTimeStat.Total())) / 100
74-
retMap["irqs"] = math.Round(10000.0*(curCallStats[0].Irq-prevTimeStat.Irq+curCallStats[0].Softirq-prevTimeStat.Softirq)/(curCallStats[0].Total()-prevTimeStat.Total())) / 100
75-
retMap["other"] = math.Round(10000.0*(curCallStats[0].Steal-prevTimeStat.Steal+curCallStats[0].Guest-prevTimeStat.Guest+curCallStats[0].GuestNice-prevTimeStat.GuestNice)/(curCallStats[0].Total()-prevTimeStat.Total())) / 100
76-
77-
return []map[string]any{retMap}, nil
76+
totalDiff := cpuTotal(curCallStats[0]) - cpuTotal(prevTimeStat)
77+
retMap["user"] = math.Round(10000.0*(curCallStats[0].User-prevTimeStat.User)/totalDiff) / 100
78+
retMap["system"] = math.Round(10000.0*(curCallStats[0].System-prevTimeStat.System)/totalDiff) / 100
79+
retMap["idle"] = math.Round(10000.0*(curCallStats[0].Idle-prevTimeStat.Idle)/totalDiff) / 100
80+
retMap["iowait"] = math.Round(10000.0*(curCallStats[0].Iowait-prevTimeStat.Iowait)/totalDiff) / 100
81+
retMap["irqs"] = math.Round(10000.0*(curCallStats[0].Irq-prevTimeStat.Irq+curCallStats[0].Softirq-prevTimeStat.Softirq)/totalDiff) / 100
82+
retMap["other"] = math.Round(10000.0*(curCallStats[0].Steal-prevTimeStat.Steal+curCallStats[0].Guest-prevTimeStat.Guest+curCallStats[0].GuestNice-prevTimeStat.GuestNice)/totalDiff) / 100
83+
84+
return metrics.Measurements{retMap}, nil
7885
}
7986

80-
func GetGoPsutilMem() ([]map[string]any, error) {
87+
func GetGoPsutilMem() (metrics.Measurements, error) {
8188
vm, err := mem.VirtualMemory()
8289
if err != nil {
8390
return nil, err
@@ -95,10 +102,10 @@ func GetGoPsutilMem() ([]map[string]any, error) {
95102
retMap["swap_free"] = int64(vm.SwapFree)
96103
retMap["swap_percent"] = math.Round(100*float64(vm.SwapCached)/float64(vm.SwapTotal)) / 100
97104

98-
return []map[string]any{retMap}, nil
105+
return metrics.Measurements{retMap}, nil
99106
}
100107

101-
func GetGoPsutilDiskTotals() ([]map[string]any, error) {
108+
func GetGoPsutilDiskTotals() (metrics.Measurements, error) {
102109
d, err := disk.IOCounters()
103110
if err != nil {
104111
return nil, err
@@ -119,10 +126,10 @@ func GetGoPsutilDiskTotals() ([]map[string]any, error) {
119126
retMap["read_count"] = reads
120127
retMap["write_count"] = writes
121128

122-
return []map[string]any{retMap}, nil
129+
return metrics.Measurements{retMap}, nil
123130
}
124131

125-
func GetLoadAvgLocal() ([]map[string]any, error) {
132+
func GetLoadAvgLocal() (metrics.Measurements, error) {
126133
la, err := load.Avg()
127134
if err != nil {
128135
return nil, err
@@ -133,122 +140,52 @@ func GetLoadAvgLocal() ([]map[string]any, error) {
133140
row["load_5min"] = la.Load5
134141
row["load_15min"] = la.Load15
135142

136-
return []map[string]any{row}, nil
143+
return metrics.Measurements{row}, nil
137144
}
138145

139146
func CheckFolderExistsAndReadable(path string) bool {
147+
if path == "" {
148+
return false
149+
}
140150
_, err := os.ReadDir(path)
141151
return err == nil
142152
}
143153

144-
func GetGoPsutilDiskPG(DataDirs, TblspaceDirs []map[string]any) ([]map[string]any, error) {
145-
var ddDevice, ldDevice, walDevice uint64
146-
147-
data := DataDirs
148-
149-
dataDirPath := data[0]["dd"].(string)
150-
ddUsage, err := disk.Usage(dataDirPath)
151-
if err != nil {
152-
return nil, err
153-
}
154-
155-
retRows := make([]map[string]any, 0)
154+
func GetGoPsutilDiskPG(pgDirs metrics.Measurements) (metrics.Measurements, error) {
155+
usageCache := make(map[uint64]*disk.UsageStat)
156+
retRows := make(metrics.Measurements, 0)
156157
epochNs := time.Now().UnixNano()
157-
dd := metrics.NewMeasurement(epochNs)
158-
dd["tag_dir_or_tablespace"] = "data_directory"
159-
dd["tag_path"] = dataDirPath
160-
dd["total"] = float64(ddUsage.Total)
161-
dd["used"] = float64(ddUsage.Used)
162-
dd["free"] = float64(ddUsage.Free)
163-
dd["percent"] = math.Round(100*ddUsage.UsedPercent) / 100
164-
retRows = append(retRows, dd)
165-
166-
ddDevice, err = GetPathUnderlyingDeviceID(dataDirPath)
167-
if err != nil {
168-
return nil, err
169-
}
170158

171-
logDirPath := data[0]["ld"].(string)
172-
if !strings.HasPrefix(logDirPath, "/") {
173-
logDirPath = path.Join(dataDirPath, logDirPath)
174-
}
175-
if len(logDirPath) > 0 && CheckFolderExistsAndReadable(logDirPath) { // syslog etc considered out of scope
176-
ldDevice, err = GetPathUnderlyingDeviceID(logDirPath)
177-
if err != nil {
178-
return nil, err
179-
}
180-
if ldDevice != ddDevice { // no point to report same data in case of single folder configuration
181-
ld := metrics.NewMeasurement(epochNs)
182-
ldUsage, err := disk.Usage(logDirPath)
183-
if err != nil {
184-
return nil, err
185-
}
159+
for _, row := range pgDirs {
160+
path := row["path"].(string)
161+
name := row["name"].(string)
186162

187-
ld["tag_dir_or_tablespace"] = "log_directory"
188-
ld["tag_path"] = logDirPath
189-
ld["total"] = float64(ldUsage.Total)
190-
ld["used"] = float64(ldUsage.Used)
191-
ld["free"] = float64(ldUsage.Free)
192-
ld["percent"] = math.Round(100*ldUsage.UsedPercent) / 100
193-
retRows = append(retRows, ld)
163+
if !CheckFolderExistsAndReadable(path) { // syslog etc considered out of scope
164+
continue
194165
}
195-
}
196-
197-
var walDirPath string
198-
if CheckFolderExistsAndReadable(path.Join(dataDirPath, "pg_wal")) {
199-
walDirPath = path.Join(dataDirPath, "pg_wal")
200-
}
201166

202-
if len(walDirPath) > 0 {
203-
walDevice, err = GetPathUnderlyingDeviceID(walDirPath)
167+
devID, err := GetPathUnderlyingDeviceID(path)
204168
if err != nil {
205169
return nil, err
206170
}
207171

208-
if walDevice != ddDevice || walDevice != ldDevice { // no point to report same data in case of single folder configuration
209-
walUsage, err := disk.Usage(walDirPath)
172+
usage, ok := usageCache[devID]
173+
if !ok {
174+
usage, err = disk.Usage(path)
210175
if err != nil {
211176
return nil, err
212177
}
213-
214-
wd := metrics.NewMeasurement(epochNs)
215-
wd["tag_dir_or_tablespace"] = "pg_wal"
216-
wd["tag_path"] = walDirPath
217-
wd["total"] = float64(walUsage.Total)
218-
wd["used"] = float64(walUsage.Used)
219-
wd["free"] = float64(walUsage.Free)
220-
wd["percent"] = math.Round(100*walUsage.UsedPercent) / 100
221-
retRows = append(retRows, wd)
178+
usageCache[devID] = usage
222179
}
223-
}
224180

225-
data = TblspaceDirs
226-
if len(data) > 0 {
227-
for _, row := range data {
228-
tsPath := row["location"].(string)
229-
tsName := row["name"].(string)
230-
231-
tsDevice, err := GetPathUnderlyingDeviceID(tsPath)
232-
if err != nil {
233-
return nil, err
234-
}
235-
236-
if tsDevice == ddDevice || tsDevice == ldDevice || tsDevice == walDevice {
237-
continue
238-
}
239-
tsUsage, err := disk.Usage(tsPath)
240-
if err != nil {
241-
return nil, err
242-
}
243-
ts := metrics.NewMeasurement(epochNs)
244-
ts["tag_dir_or_tablespace"] = tsName
245-
ts["tag_path"] = tsPath
246-
ts["total"] = float64(tsUsage.Total)
247-
ts["used"] = float64(tsUsage.Used)
248-
ts["free"] = float64(tsUsage.Free)
249-
ts["percent"] = math.Round(100*tsUsage.UsedPercent) / 100
250-
retRows = append(retRows, ts)
251-
}
181+
m := metrics.NewMeasurement(epochNs)
182+
m["tag_dir_or_tablespace"] = name
183+
m["tag_path"] = path
184+
m["total"] = float64(usage.Total)
185+
m["used"] = float64(usage.Used)
186+
m["free"] = float64(usage.Free)
187+
m["percent"] = math.Round(100*usage.UsedPercent) / 100
188+
retRows = append(retRows, m)
252189
}
253190

254191
return retRows, nil

internal/reaper/psutil_darwin.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package reaper
22

3-
import "errors"
3+
import "hash/fnv"
44

5-
var ErrNotImplemented = errors.New("not implemented")
6-
7-
func GetPathUnderlyingDeviceID(path string) (uint64, error) {
8-
return 0, ErrNotImplemented
5+
// GetPathUnderlyingDeviceID on Darwin falls back to an FNV hash of the path.
6+
// Identical paths still get the same ID, so deduplication works correctly.
7+
func GetPathUnderlyingDeviceID(p string) (uint64, error) {
8+
h := fnv.New64a()
9+
_, _ = h.Write([]byte(p))
10+
return h.Sum64(), nil
911
}

0 commit comments

Comments
 (0)