@@ -2,17 +2,19 @@ package reaper
22
33import (
44 "context"
5+ "errors"
56 "os"
67 "path/filepath"
78 "testing"
9+ "time"
810
911 "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts"
1012 "github.com/cybertec-postgresql/pgwatch/v5/internal/log"
1113 "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics"
1214 "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks"
1315 "github.com/cybertec-postgresql/pgwatch/v5/internal/sources"
1416 "github.com/cybertec-postgresql/pgwatch/v5/internal/testutil"
15- "github.com/pashagolub/pgxmock/v4"
17+ "github.com/pashagolub/pgxmock/v4"
1618 "github.com/stretchr/testify/assert"
1719 "github.com/stretchr/testify/require"
1820)
@@ -100,14 +102,14 @@ func TestReaper_LoadSources(t *testing.T) {
100102
101103 t .Run ("Test source config changes trigger restart" , func (t * testing.T ) {
102104 baseSource := sources.Source {
103- Name : "TestSource" ,
104- IsEnabled : true ,
105- Kind : sources .SourcePostgres ,
106- ConnStr : "postgres://localhost:5432/testdb" ,
107- Metrics : map [string ]float64 {"cpu" : 10 , "memory" : 20 },
108- MetricsStandby : map [string ]float64 {"cpu" : 30 },
109- CustomTags : map [string ]string {"env" : "test" },
110- Group : "default" ,
105+ Name : "TestSource" ,
106+ IsEnabled : true ,
107+ Kind : sources .SourcePostgres ,
108+ ConnStr : "postgres://localhost:5432/testdb" ,
109+ Metrics : map [string ]float64 {"cpu" : 10 , "memory" : 20 },
110+ MetricsStandby : map [string ]float64 {"cpu" : 30 },
111+ CustomTags : map [string ]string {"env" : "test" },
112+ Group : "default" ,
111113 }
112114
113115 testCases := []struct {
@@ -331,3 +333,265 @@ func TestReaper_LoadSources(t *testing.T) {
331333 assert .Nil (t , mockConn1 .ExpectationsWereMet (), "Expected all mock expectations to be met" )
332334 })
333335}
336+
337+ func newFetchMetricReaper () * Reaper {
338+ return & Reaper {
339+ Options : & cmdopts.Options {
340+ Metrics : metrics.CmdOpts {},
341+ Sinks : sinks.CmdOpts {},
342+ },
343+ measurementCache : NewInstanceMetricCache (),
344+ }
345+ }
346+
347+ func TestReaper_FetchMetric (t * testing.T ) {
348+ ctx := log .WithLogger (t .Context (), log .NewNoopLogger ())
349+
350+ t .Run ("metric not found in definitions" , func (t * testing.T ) {
351+ r := newFetchMetricReaper ()
352+ md , mock := createTestSourceConn (t )
353+ defer mock .Close ()
354+
355+ env , err := r .FetchMetric (ctx , md , "nonexistent_metric_xyz" )
356+ assert .ErrorIs (t , err , metrics .ErrMetricNotFound )
357+ assert .Nil (t , env )
358+ assert .NoError (t , mock .ExpectationsWereMet ())
359+ })
360+
361+ t .Run ("primary-only metric skipped on standby" , func (t * testing.T ) {
362+ r := newFetchMetricReaper ()
363+ metricDefs .MetricDefs ["primary_only_metric" ] = metrics.Metric {
364+ SQLs : metrics.SQLs {0 : "SELECT 1" },
365+ NodeStatus : "primary" ,
366+ }
367+ md , mock := createTestSourceConn (t )
368+ defer mock .Close ()
369+ md .IsInRecovery = true
370+
371+ env , err := r .FetchMetric (ctx , md , "primary_only_metric" )
372+ assert .NoError (t , err )
373+ assert .Nil (t , env )
374+ assert .NoError (t , mock .ExpectationsWereMet ())
375+ })
376+
377+ t .Run ("standby-only metric skipped on primary" , func (t * testing.T ) {
378+ r := newFetchMetricReaper ()
379+ metricDefs .MetricDefs ["standby_only_metric" ] = metrics.Metric {
380+ SQLs : metrics.SQLs {0 : "SELECT 1" },
381+ NodeStatus : "standby" ,
382+ }
383+ md , mock := createTestSourceConn (t )
384+ defer mock .Close ()
385+ md .IsInRecovery = false
386+
387+ env , err := r .FetchMetric (ctx , md , "standby_only_metric" )
388+ assert .NoError (t , err )
389+ assert .Nil (t , env )
390+ assert .NoError (t , mock .ExpectationsWereMet ())
391+ })
392+
393+ t .Run ("default metric with no SQL for version returns nil" , func (t * testing.T ) {
394+ r := newFetchMetricReaper ()
395+ metricDefs .MetricDefs ["no_sql_metric" ] = metrics.Metric {
396+ SQLs : metrics.SQLs {}, // no SQL defined
397+ }
398+ md , mock := createTestSourceConn (t )
399+ defer mock .Close ()
400+
401+ env , err := r .FetchMetric (ctx , md , "no_sql_metric" )
402+ assert .NoError (t , err )
403+ assert .Nil (t , env )
404+ assert .NoError (t , mock .ExpectationsWereMet ())
405+ })
406+
407+ t .Run ("default metric query success returns envelope" , func (t * testing.T ) {
408+ r := newFetchMetricReaper ()
409+ metricDefs .MetricDefs ["test_metric" ] = metrics.Metric {
410+ SQLs : metrics.SQLs {0 : "SELECT 1" },
411+ }
412+ md , mock := createTestSourceConn (t )
413+ defer mock .Close ()
414+ md .Name = "mydb"
415+ md .CustomTags = map [string ]string {"env" : "prod" }
416+
417+ rows := pgxmock .NewRows ([]string {"epoch_ns" , "value" }).
418+ AddRow (time .Now ().UnixNano (), int64 (42 ))
419+ mock .ExpectQuery ("SELECT 1" ).WillReturnRows (rows )
420+
421+ env , err := r .FetchMetric (ctx , md , "test_metric" )
422+ require .NoError (t , err )
423+ require .NotNil (t , env )
424+ assert .Equal (t , "mydb" , env .DBName )
425+ assert .Equal (t , "test_metric" , env .MetricName )
426+ assert .Len (t , env .Data , 1 )
427+ assert .Equal (t , map [string ]string {"env" : "prod" }, env .CustomTags )
428+ assert .NoError (t , mock .ExpectationsWereMet ())
429+ })
430+
431+ t .Run ("default metric query error returns error" , func (t * testing.T ) {
432+ r := newFetchMetricReaper ()
433+ metricDefs .MetricDefs ["error_metric" ] = metrics.Metric {
434+ SQLs : metrics.SQLs {0 : "SELECT fail" },
435+ }
436+ md , mock := createTestSourceConn (t )
437+ defer mock .Close ()
438+
439+ mock .ExpectQuery ("SELECT fail" ).WillReturnError (assert .AnError )
440+
441+ env , err := r .FetchMetric (ctx , md , "error_metric" )
442+ assert .Error (t , err )
443+ assert .Nil (t , env )
444+ assert .NoError (t , mock .ExpectationsWereMet ())
445+ })
446+
447+ t .Run ("default metric query returns empty rows" , func (t * testing.T ) {
448+ r := newFetchMetricReaper ()
449+ metricDefs .MetricDefs ["empty_metric" ] = metrics.Metric {
450+ SQLs : metrics.SQLs {0 : "SELECT empty" },
451+ }
452+ md , mock := createTestSourceConn (t )
453+ defer mock .Close ()
454+
455+ mock .ExpectQuery ("SELECT empty" ).WillReturnRows (pgxmock .NewRows ([]string {"epoch_ns" }))
456+
457+ env , err := r .FetchMetric (ctx , md , "empty_metric" )
458+ assert .NoError (t , err )
459+ assert .Nil (t , env )
460+ assert .NoError (t , mock .ExpectationsWereMet ())
461+ })
462+
463+ t .Run ("storage name used as metric name in envelope" , func (t * testing.T ) {
464+ r := newFetchMetricReaper ()
465+ metricDefs .MetricDefs ["logical_metric" ] = metrics.Metric {
466+ SQLs : metrics.SQLs {0 : "SELECT 1" },
467+ StorageName : "physical_metric" ,
468+ }
469+ md , mock := createTestSourceConn (t )
470+ defer mock .Close ()
471+
472+ rows := pgxmock .NewRows ([]string {"epoch_ns" , "v" }).
473+ AddRow (time .Now ().UnixNano (), int64 (1 ))
474+ mock .ExpectQuery ("SELECT 1" ).WillReturnRows (rows )
475+
476+ env , err := r .FetchMetric (ctx , md , "logical_metric" )
477+ require .NoError (t , err )
478+ require .NotNil (t , env )
479+ assert .Equal (t , "physical_metric" , env .MetricName )
480+ assert .NoError (t , mock .ExpectationsWereMet ())
481+ })
482+
483+ t .Run ("instance_up special metric returns envelope via GetInstanceUpMeasurement" , func (t * testing.T ) {
484+ r := newFetchMetricReaper ()
485+ metricDefs .MetricDefs [specialMetricInstanceUp ] = metrics.Metric {
486+ SQLs : metrics.SQLs {0 : "SELECT 1" },
487+ }
488+ md , mock := createTestSourceConn (t )
489+ defer mock .Close ()
490+ mock .ExpectPing ()
491+
492+ env , err := r .FetchMetric (ctx , md , specialMetricInstanceUp )
493+ require .NoError (t , err )
494+ require .NotNil (t , env )
495+ assert .Equal (t , specialMetricInstanceUp , env .MetricName )
496+ assert .Len (t , env .Data , 1 )
497+ assert .Equal (t , 1 , env .Data [0 ][specialMetricInstanceUp ])
498+ assert .NoError (t , mock .ExpectationsWereMet ())
499+ })
500+
501+ t .Run ("change_events special metric returns nil when no changes detected" , func (t * testing.T ) {
502+ r := newFetchMetricReaper ()
503+ metricDefs .MetricDefs [specialMetricChangeEvents ] = metrics.Metric {
504+ SQLs : metrics.SQLs {0 : "SELECT 1" },
505+ }
506+ // Remove all hash metric definitions so detection functions return early
507+ delete (metricDefs .MetricDefs , "sproc_hashes" )
508+ delete (metricDefs .MetricDefs , "table_hashes" )
509+ delete (metricDefs .MetricDefs , "index_hashes" )
510+ delete (metricDefs .MetricDefs , "configuration_hashes" )
511+ delete (metricDefs .MetricDefs , "privilege_hashes" )
512+
513+ md , mock := createTestSourceConn (t )
514+ defer mock .Close ()
515+
516+ env , err := r .FetchMetric (ctx , md , specialMetricChangeEvents )
517+ assert .NoError (t , err )
518+ assert .Nil (t , env , "expected nil envelope when no changes detected" )
519+ assert .NoError (t , mock .ExpectationsWereMet ())
520+ })
521+
522+ t .Run ("cache hit serves data without querying DB" , func (t * testing.T ) {
523+ r := newFetchMetricReaper ()
524+ r .Metrics .InstanceLevelCacheMaxSeconds = 30
525+
526+ metricDefs .MetricDefs ["cached_metric" ] = metrics.Metric {
527+ SQLs : metrics.SQLs {0 : "SELECT 1" },
528+ IsInstanceLevel : true ,
529+ }
530+ md , mock := createTestSourceConn (t )
531+ defer mock .Close ()
532+ md .Metrics = map [string ]float64 {"cached_metric" : 10 }
533+
534+ // Pre-populate the cache
535+ cachedData := metrics.Measurements {
536+ metrics.Measurement {
537+ metrics .EpochColumnName : time .Now ().UnixNano (),
538+ "value" : int64 (99 ),
539+ },
540+ }
541+ cacheKey := md .GetClusterIdentifier () + ":cached_metric"
542+ r .measurementCache .Put (cacheKey , cachedData )
543+
544+ // No DB query expected
545+ env , err := r .FetchMetric (ctx , md , "cached_metric" )
546+ require .NoError (t , err )
547+ require .NotNil (t , env )
548+ assert .Equal (t , "cached_metric" , env .MetricName )
549+ assert .Len (t , env .Data , 1 )
550+ assert .NoError (t , mock .ExpectationsWereMet ())
551+ })
552+
553+ t .Run ("sysinfo fields added to measurements" , func (t * testing.T ) {
554+ r := newFetchMetricReaper ()
555+ r .Sinks .RealDbnameField = "real_dbname"
556+ r .Sinks .SystemIdentifierField = "sys_id"
557+ metricDefs .MetricDefs ["sysinfo_metric" ] = metrics.Metric {
558+ SQLs : metrics.SQLs {0 : "SELECT sysinfo" },
559+ }
560+ md , mock := createTestSourceConn (t )
561+ defer mock .Close ()
562+ md .RealDbname = "realdb"
563+ md .SystemIdentifier = "42"
564+
565+ rows := pgxmock .NewRows ([]string {"epoch_ns" , "v" }).
566+ AddRow (time .Now ().UnixNano (), int64 (1 ))
567+ mock .ExpectQuery ("SELECT sysinfo" ).WillReturnRows (rows )
568+
569+ env , err := r .FetchMetric (ctx , md , "sysinfo_metric" )
570+ require .NoError (t , err )
571+ require .NotNil (t , env )
572+ assert .Equal (t , "realdb" , env .Data [0 ]["real_dbname" ])
573+ assert .Equal (t , "42" , env .Data [0 ]["sys_id" ])
574+ assert .NoError (t , mock .ExpectationsWereMet ())
575+ })
576+ }
577+
578+ type mockErr string
579+
580+ func (m mockErr ) SyncMetric (string , string , sinks.SyncOp ) error {
581+ return errors .New (string (m ))
582+ }
583+
584+ func (m mockErr ) Write (metrics.MeasurementEnvelope ) error {
585+ return errors .New (string (m ))
586+ }
587+
588+ func TestWriteMeasurements (t * testing.T ) {
589+ ctx , cancel := context .WithCancel (log .WithLogger (t .Context (), log .NewNoopLogger ()))
590+ defer cancel ()
591+ var err mockErr = "write error"
592+ r := NewReaper (ctx , & cmdopts.Options {
593+ SinksWriter : err ,
594+ })
595+ go r .WriteMeasurements (ctx )
596+ r .WriteInstanceDown (& sources.SourceConn {})
597+ }
0 commit comments