From d4d2a3f1482b2d06ea0614c258b78d31d5aa3ad0 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Thu, 30 Apr 2026 12:43:04 +0200 Subject: [PATCH 1/2] [+] improve `Reaper` test coverage --- internal/reaper/reaper.go | 4 +- internal/reaper/reaper_test.go | 388 +++++++++++++++++++++++++++------ 2 files changed, 320 insertions(+), 72 deletions(-) diff --git a/internal/reaper/reaper.go b/internal/reaper/reaper.go index 3dadb602e1..fdaf7fa3d2 100644 --- a/internal/reaper/reaper.go +++ b/internal/reaper/reaper.go @@ -250,9 +250,7 @@ func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDown map[str var md *sources.SourceConn var dbRemovedFromConfig bool var metricRemovedFromPreset bool - splits := strings.Split(dbMetric, dbMetricJoinStr) - db := splits[0] - metric := splits[1] + db, metric, _ := strings.Cut(dbMetric, dbMetricJoinStr) _, wholeDbShutDown := hostsToShutDown[db] if !wholeDbShutDown { diff --git a/internal/reaper/reaper_test.go b/internal/reaper/reaper_test.go index 0ff3b4acdb..46c7d34d27 100644 --- a/internal/reaper/reaper_test.go +++ b/internal/reaper/reaper_test.go @@ -23,25 +23,28 @@ func TestReaper_LoadSources(t *testing.T) { ctx := log.WithLogger(context.Background(), log.NewNoopLogger()) t.Run("Test pause trigger file", func(t *testing.T) { + a := assert.New(t) pausefile := filepath.Join(t.TempDir(), "pausefile") require.NoError(t, os.WriteFile(pausefile, []byte("foo"), 0644)) r := NewReaper(ctx, &cmdopts.Options{Metrics: metrics.CmdOpts{EmergencyPauseTriggerfile: pausefile}}) - assert.NoError(t, r.LoadSources(ctx)) - assert.True(t, len(r.monitoredSources) == 0, "Expected no monitored sources when pause trigger file exists") + a.NoError(r.LoadSources(ctx)) + a.True(len(r.monitoredSources) == 0, "Expected no monitored sources when pause trigger file exists") }) t.Run("Test SyncFromReader errror", func(t *testing.T) { + a := assert.New(t) reader := &testutil.MockSourcesReaderWriter{ GetSourcesFunc: func() (sources.Sources, error) { return nil, assert.AnError }, } r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader}) - assert.Error(t, r.LoadSources(ctx)) - assert.Equal(t, 0, len(r.monitoredSources), "Expected no monitored sources after error") + a.Error(r.LoadSources(ctx)) + a.Equal(0, len(r.monitoredSources), "Expected no monitored sources after error") }) t.Run("Test SyncFromReader success", func(t *testing.T) { + a := assert.New(t) source1 := sources.Source{Name: "Source 1", IsEnabled: true, Kind: sources.SourcePostgres} source2 := sources.Source{Name: "Source 2", IsEnabled: true, Kind: sources.SourcePostgres} reader := &testutil.MockSourcesReaderWriter{ @@ -51,13 +54,14 @@ func TestReaper_LoadSources(t *testing.T) { } r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader}) - assert.NoError(t, r.LoadSources(ctx)) - assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored sources after successful load") - assert.NotNil(t, r.monitoredSources.GetMonitoredDatabase(source1.Name)) - assert.NotNil(t, r.monitoredSources.GetMonitoredDatabase(source2.Name)) + a.NoError(r.LoadSources(ctx)) + a.Equal(2, len(r.monitoredSources), "Expected two monitored sources after successful load") + a.NotNil(r.monitoredSources.GetMonitoredDatabase(source1.Name)) + a.NotNil(r.monitoredSources.GetMonitoredDatabase(source2.Name)) }) t.Run("Test repeated load", func(t *testing.T) { + a := assert.New(t) source1 := sources.Source{Name: "Source 1", IsEnabled: true, Kind: sources.SourcePostgres} source2 := sources.Source{Name: "Source 2", IsEnabled: true, Kind: sources.SourcePostgres} reader := &testutil.MockSourcesReaderWriter{ @@ -67,15 +71,16 @@ func TestReaper_LoadSources(t *testing.T) { } r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: reader}) - assert.NoError(t, r.LoadSources(ctx)) - assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored sources after first load") + a.NoError(r.LoadSources(ctx)) + a.Equal(2, len(r.monitoredSources), "Expected two monitored sources after first load") // Load again with the same sources - assert.NoError(t, r.LoadSources(ctx)) - assert.Equal(t, 2, len(r.monitoredSources), "Expected still two monitored sources after second load") + a.NoError(r.LoadSources(ctx)) + a.Equal(2, len(r.monitoredSources), "Expected still two monitored sources after second load") }) t.Run("Test group limited sources", func(t *testing.T) { + a := assert.New(t) source1 := sources.Source{Name: "Source 1", IsEnabled: true, Kind: sources.SourcePostgres, Group: ""} source2 := sources.Source{Name: "Source 2", IsEnabled: true, Kind: sources.SourcePostgres, Group: "group1"} source3 := sources.Source{Name: "Source 3", IsEnabled: true, Kind: sources.SourcePostgres, Group: "group1"} @@ -88,16 +93,16 @@ func TestReaper_LoadSources(t *testing.T) { } r := NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader, Sources: sources.CmdOpts{Groups: []string{"group1", "group2"}}}) - assert.NoError(t, r.LoadSources(ctx)) - assert.Equal(t, 3, len(r.monitoredSources), "Expected three monitored sources after load") + a.NoError(r.LoadSources(ctx)) + a.Equal(3, len(r.monitoredSources), "Expected three monitored sources after load") r = NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader, Sources: sources.CmdOpts{Groups: []string{"group1"}}}) - assert.NoError(t, r.LoadSources(ctx)) - assert.Equal(t, 2, len(r.monitoredSources), "Expected two monitored source after group filtering") + a.NoError(r.LoadSources(ctx)) + a.Equal(2, len(r.monitoredSources), "Expected two monitored source after group filtering") r = NewReaper(ctx, &cmdopts.Options{SourcesReaderWriter: newReader}) - assert.NoError(t, r.LoadSources(ctx)) - assert.Equal(t, 5, len(r.monitoredSources), "Expected five monitored sources after resetting groups") + a.NoError(r.LoadSources(ctx)) + a.Equal(5, len(r.monitoredSources), "Expected five monitored sources after resetting groups") }) t.Run("Test source config changes trigger restart", func(t *testing.T) { @@ -219,6 +224,7 @@ func TestReaper_LoadSources(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + a := assert.New(t) initialSource := *baseSource.Clone() initialReader := &testutil.MockSourcesReaderWriter{ GetSourcesFunc: func() (sources.Sources, error) { @@ -230,8 +236,8 @@ func TestReaper_LoadSources(t *testing.T) { SourcesReaderWriter: initialReader, SinksWriter: &sinks.MultiWriter{}, }) - assert.NoError(t, r.LoadSources(ctx)) - assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after initial load") + a.NoError(r.LoadSources(ctx)) + a.Equal(1, len(r.monitoredSources), "Expected one monitored source after initial load") mockConn, err := pgxmock.NewPool() require.NoError(t, err) @@ -259,17 +265,17 @@ func TestReaper_LoadSources(t *testing.T) { r.SourcesReaderWriter = modifiedReader // Reload sources - assert.NoError(t, r.LoadSources(ctx)) - assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after reload") - assert.Equal(t, modifiedSource, r.monitoredSources[0].Source) + a.NoError(r.LoadSources(ctx)) + a.Equal(1, len(r.monitoredSources), "Expected one monitored source after reload") + a.Equal(modifiedSource, r.monitoredSources[0].Source) for metric := range initialSource.Metrics { dbMetric := initialSource.Name + "¤¤¤" + metric - assert.Equal(t, tc.expectCancel, cancelCalled[dbMetric]) + a.Equal(tc.expectCancel, cancelCalled[dbMetric]) if tc.expectCancel { - assert.Nil(t, mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met") + a.Nil(mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met") _, exists := r.cancelFuncs[dbMetric] - assert.False(t, exists, "Expected cancel func to be removed from map after cancellation") + a.False(exists, "Expected cancel func to be removed from map after cancellation") } } }) @@ -277,6 +283,7 @@ func TestReaper_LoadSources(t *testing.T) { }) t.Run("Test only changed source cancelled in multi-source setup", func(t *testing.T) { + a := assert.New(t) source1 := sources.Source{ Name: "Source1", IsEnabled: true, @@ -302,7 +309,7 @@ func TestReaper_LoadSources(t *testing.T) { SourcesReaderWriter: initialReader, SinksWriter: &sinks.MultiWriter{}, }) - assert.NoError(t, r.LoadSources(ctx)) + a.NoError(r.LoadSources(ctx)) // Set mock connections for both sources to avoid nil pointer on Close() mockConn1, err := pgxmock.NewPool() @@ -326,11 +333,11 @@ func TestReaper_LoadSources(t *testing.T) { } r.SourcesReaderWriter = modifiedReader - assert.NoError(t, r.LoadSources(ctx)) + a.NoError(r.LoadSources(ctx)) - assert.True(t, source1Cancelled, "Source1 should be cancelled due to config change") - assert.False(t, source2Cancelled, "Source2 should NOT be cancelled as it was not modified") - assert.Nil(t, mockConn1.ExpectationsWereMet(), "Expected all mock expectations to be met") + a.True(source1Cancelled, "Source1 should be cancelled due to config change") + a.False(source2Cancelled, "Source2 should NOT be cancelled as it was not modified") + a.Nil(mockConn1.ExpectationsWereMet(), "Expected all mock expectations to be met") }) } @@ -348,17 +355,19 @@ func TestReaper_FetchMetric(t *testing.T) { ctx := log.WithLogger(t.Context(), log.NewNoopLogger()) t.Run("metric not found in definitions", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() md, mock := createTestSourceConn(t) defer mock.Close() env, err := r.FetchMetric(ctx, md, "nonexistent_metric_xyz") - assert.ErrorIs(t, err, metrics.ErrMetricNotFound) - assert.Nil(t, env) - assert.NoError(t, mock.ExpectationsWereMet()) + a.ErrorIs(err, metrics.ErrMetricNotFound) + a.Nil(env) + a.NoError(mock.ExpectationsWereMet()) }) t.Run("primary-only metric skipped on standby", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() metricDefs.MetricDefs["primary_only_metric"] = metrics.Metric{ SQLs: metrics.SQLs{0: "SELECT 1"}, @@ -369,12 +378,13 @@ func TestReaper_FetchMetric(t *testing.T) { md.IsInRecovery = true env, err := r.FetchMetric(ctx, md, "primary_only_metric") - assert.NoError(t, err) - assert.Nil(t, env) - assert.NoError(t, mock.ExpectationsWereMet()) + a.NoError(err) + a.Nil(env) + a.NoError(mock.ExpectationsWereMet()) }) t.Run("standby-only metric skipped on primary", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() metricDefs.MetricDefs["standby_only_metric"] = metrics.Metric{ SQLs: metrics.SQLs{0: "SELECT 1"}, @@ -385,12 +395,13 @@ func TestReaper_FetchMetric(t *testing.T) { md.IsInRecovery = false env, err := r.FetchMetric(ctx, md, "standby_only_metric") - assert.NoError(t, err) - assert.Nil(t, env) - assert.NoError(t, mock.ExpectationsWereMet()) + a.NoError(err) + a.Nil(env) + a.NoError(mock.ExpectationsWereMet()) }) t.Run("default metric with no SQL for version returns nil", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() metricDefs.MetricDefs["no_sql_metric"] = metrics.Metric{ SQLs: metrics.SQLs{}, // no SQL defined @@ -399,12 +410,13 @@ func TestReaper_FetchMetric(t *testing.T) { defer mock.Close() env, err := r.FetchMetric(ctx, md, "no_sql_metric") - assert.NoError(t, err) - assert.Nil(t, env) - assert.NoError(t, mock.ExpectationsWereMet()) + a.NoError(err) + a.Nil(env) + a.NoError(mock.ExpectationsWereMet()) }) t.Run("default metric query success returns envelope", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() metricDefs.MetricDefs["test_metric"] = metrics.Metric{ SQLs: metrics.SQLs{0: "SELECT 1"}, @@ -421,14 +433,15 @@ func TestReaper_FetchMetric(t *testing.T) { env, err := r.FetchMetric(ctx, md, "test_metric") require.NoError(t, err) require.NotNil(t, env) - assert.Equal(t, "mydb", env.DBName) - assert.Equal(t, "test_metric", env.MetricName) - assert.Len(t, env.Data, 1) - assert.Equal(t, map[string]string{"env": "prod"}, env.CustomTags) - assert.NoError(t, mock.ExpectationsWereMet()) + a.Equal("mydb", env.DBName) + a.Equal("test_metric", env.MetricName) + a.Len(env.Data, 1) + a.Equal(map[string]string{"env": "prod"}, env.CustomTags) + a.NoError(mock.ExpectationsWereMet()) }) t.Run("default metric query error returns error", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() metricDefs.MetricDefs["error_metric"] = metrics.Metric{ SQLs: metrics.SQLs{0: "SELECT fail"}, @@ -439,12 +452,13 @@ func TestReaper_FetchMetric(t *testing.T) { mock.ExpectQuery("SELECT fail").WillReturnError(assert.AnError) env, err := r.FetchMetric(ctx, md, "error_metric") - assert.Error(t, err) - assert.Nil(t, env) - assert.NoError(t, mock.ExpectationsWereMet()) + a.Error(err) + a.Nil(env) + a.NoError(mock.ExpectationsWereMet()) }) t.Run("default metric query returns empty rows", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() metricDefs.MetricDefs["empty_metric"] = metrics.Metric{ SQLs: metrics.SQLs{0: "SELECT empty"}, @@ -455,12 +469,13 @@ func TestReaper_FetchMetric(t *testing.T) { mock.ExpectQuery("SELECT empty").WillReturnRows(pgxmock.NewRows([]string{"epoch_ns"})) env, err := r.FetchMetric(ctx, md, "empty_metric") - assert.NoError(t, err) - assert.Nil(t, env) - assert.NoError(t, mock.ExpectationsWereMet()) + a.NoError(err) + a.Nil(env) + a.NoError(mock.ExpectationsWereMet()) }) t.Run("storage name used as metric name in envelope", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() metricDefs.MetricDefs["logical_metric"] = metrics.Metric{ SQLs: metrics.SQLs{0: "SELECT 1"}, @@ -476,11 +491,12 @@ func TestReaper_FetchMetric(t *testing.T) { env, err := r.FetchMetric(ctx, md, "logical_metric") require.NoError(t, err) require.NotNil(t, env) - assert.Equal(t, "physical_metric", env.MetricName) - assert.NoError(t, mock.ExpectationsWereMet()) + a.Equal("physical_metric", env.MetricName) + a.NoError(mock.ExpectationsWereMet()) }) t.Run("instance_up special metric returns envelope via GetInstanceUpMeasurement", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() metricDefs.MetricDefs[specialMetricInstanceUp] = metrics.Metric{ SQLs: metrics.SQLs{0: "SELECT 1"}, @@ -492,13 +508,14 @@ func TestReaper_FetchMetric(t *testing.T) { env, err := r.FetchMetric(ctx, md, specialMetricInstanceUp) require.NoError(t, err) require.NotNil(t, env) - assert.Equal(t, specialMetricInstanceUp, env.MetricName) - assert.Len(t, env.Data, 1) - assert.Equal(t, 1, env.Data[0][specialMetricInstanceUp]) - assert.NoError(t, mock.ExpectationsWereMet()) + a.Equal(specialMetricInstanceUp, env.MetricName) + a.Len(env.Data, 1) + a.Equal(1, env.Data[0][specialMetricInstanceUp]) + a.NoError(mock.ExpectationsWereMet()) }) t.Run("change_events special metric returns nil when no changes detected", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() metricDefs.MetricDefs[specialMetricChangeEvents] = metrics.Metric{ SQLs: metrics.SQLs{0: "SELECT 1"}, @@ -514,12 +531,13 @@ func TestReaper_FetchMetric(t *testing.T) { defer mock.Close() env, err := r.FetchMetric(ctx, md, specialMetricChangeEvents) - assert.NoError(t, err) - assert.Nil(t, env, "expected nil envelope when no changes detected") - assert.NoError(t, mock.ExpectationsWereMet()) + a.NoError(err) + a.Nil(env, "expected nil envelope when no changes detected") + a.NoError(mock.ExpectationsWereMet()) }) t.Run("cache hit serves data without querying DB", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() r.Metrics.InstanceLevelCacheMaxSeconds = 30 @@ -545,12 +563,13 @@ func TestReaper_FetchMetric(t *testing.T) { env, err := r.FetchMetric(ctx, md, "cached_metric") require.NoError(t, err) require.NotNil(t, env) - assert.Equal(t, "cached_metric", env.MetricName) - assert.Len(t, env.Data, 1) - assert.NoError(t, mock.ExpectationsWereMet()) + a.Equal("cached_metric", env.MetricName) + a.Len(env.Data, 1) + a.NoError(mock.ExpectationsWereMet()) }) t.Run("sysinfo fields added to measurements", func(t *testing.T) { + a := assert.New(t) r := newFetchMetricReaper() r.Sinks.RealDbnameField = "real_dbname" r.Sinks.SystemIdentifierField = "sys_id" @@ -569,9 +588,9 @@ func TestReaper_FetchMetric(t *testing.T) { env, err := r.FetchMetric(ctx, md, "sysinfo_metric") require.NoError(t, err) require.NotNil(t, env) - assert.Equal(t, "realdb", env.Data[0]["real_dbname"]) - assert.Equal(t, "42", env.Data[0]["sys_id"]) - assert.NoError(t, mock.ExpectationsWereMet()) + a.Equal("realdb", env.Data[0]["real_dbname"]) + a.Equal("42", env.Data[0]["sys_id"]) + a.NoError(mock.ExpectationsWereMet()) }) } @@ -595,3 +614,234 @@ func TestWriteMeasurements(t *testing.T) { go r.WriteMeasurements(ctx) r.WriteInstanceDown("foo") } + +func TestReaper_Ready(t *testing.T) { + a := assert.New(t) + ctx := log.WithLogger(t.Context(), log.NewNoopLogger()) + r := NewReaper(ctx, &cmdopts.Options{}) + a.False(r.Ready()) + r.ready.Store(true) + a.True(r.Ready()) +} + +func TestReaper_WriteInstanceDown(t *testing.T) { + a := assert.New(t) + ctx := log.WithLogger(t.Context(), log.NewNoopLogger()) + r := NewReaper(ctx, &cmdopts.Options{}) + r.WriteInstanceDown("testdb") + select { + case msg := <-r.measurementCh: + a.Equal("testdb", msg.DBName) + a.Equal(specialMetricInstanceUp, msg.MetricName) + require.Len(t, msg.Data, 1) + a.Equal(0, msg.Data[0][specialMetricInstanceUp]) + default: + t.Error("expected message in measurementCh") + } +} + +func TestReaper_AddSysinfoToMeasurements(t *testing.T) { + t.Run("adds real dbname and system identifier fields", func(t *testing.T) { + a := assert.New(t) + r := &Reaper{ + Options: &cmdopts.Options{ + Sinks: sinks.CmdOpts{ + RealDbnameField: "real_dbname", + SystemIdentifierField: "sys_id", + }, + }, + } + md := &sources.SourceConn{ + RuntimeInfo: sources.RuntimeInfo{ + RealDbname: "realdb", + SystemIdentifier: "12345", + }, + } + data := metrics.Measurements{metrics.Measurement{}} + r.AddSysinfoToMeasurements(data, md) + a.Equal("realdb", data[0]["real_dbname"]) + a.Equal("12345", data[0]["sys_id"]) + }) + + t.Run("skips fields when config field names are empty", func(t *testing.T) { + a := assert.New(t) + r := &Reaper{Options: &cmdopts.Options{}} + md := &sources.SourceConn{ + RuntimeInfo: sources.RuntimeInfo{ + RealDbname: "realdb", + SystemIdentifier: "12345", + }, + } + data := metrics.Measurements{metrics.Measurement{}} + r.AddSysinfoToMeasurements(data, md) + a.NotContains(data[0], "real_dbname") + a.NotContains(data[0], "sys_id") + }) + + t.Run("skips fields when md values are empty", func(t *testing.T) { + a := assert.New(t) + r := &Reaper{ + Options: &cmdopts.Options{ + Sinks: sinks.CmdOpts{ + RealDbnameField: "real_dbname", + SystemIdentifierField: "sys_id", + }, + }, + } + md := &sources.SourceConn{} + data := metrics.Measurements{metrics.Measurement{}} + r.AddSysinfoToMeasurements(data, md) + a.NotContains(data[0], "real_dbname") + a.NotContains(data[0], "sys_id") + }) +} + +func TestReaper_ShutdownOldWorkers(t *testing.T) { + ctx := log.WithLogger(t.Context(), log.NewNoopLogger()) + + t.Run("cancels worker for DB removed from config", func(t *testing.T) { + a := assert.New(t) + r := NewReaper(ctx, &cmdopts.Options{SinksWriter: &sinks.MultiWriter{}}) + cancelCalled := false + dbMetric := "testdb" + dbMetricJoinStr + "cpu" + r.cancelFuncs[dbMetric] = func() { cancelCalled = true } + + r.ShutdownOldWorkers(ctx, map[string]bool{}) + + a.True(cancelCalled) + a.NotContains(r.cancelFuncs, dbMetric) + }) + + t.Run("cancels worker for whole DB shutdown", func(t *testing.T) { + a := assert.New(t) + r := NewReaper(ctx, &cmdopts.Options{SinksWriter: &sinks.MultiWriter{}}) + cancelCalled := false + dbMetric := "testdb" + dbMetricJoinStr + "cpu" + r.cancelFuncs[dbMetric] = func() { cancelCalled = true } + + r.ShutdownOldWorkers(ctx, map[string]bool{"testdb": true}) + + a.True(cancelCalled) + a.NotContains(r.cancelFuncs, dbMetric) + }) + + t.Run("cancels worker for metric removed from preset", func(t *testing.T) { + a := assert.New(t) + r := NewReaper(ctx, &cmdopts.Options{SinksWriter: &sinks.MultiWriter{}}) + cancelCalled := false + dbMetric := "testdb" + dbMetricJoinStr + "cpu" + r.cancelFuncs[dbMetric] = func() { cancelCalled = true } + r.monitoredSources = sources.SourceConns{ + {Source: sources.Source{Name: "testdb", Metrics: metrics.MetricIntervals{"memory": 10}}}, + } + + r.ShutdownOldWorkers(ctx, map[string]bool{}) + + a.True(cancelCalled) + a.NotContains(r.cancelFuncs, dbMetric) + }) + + t.Run("keeps worker when metric is still active", func(t *testing.T) { + a := assert.New(t) + r := NewReaper(ctx, &cmdopts.Options{SinksWriter: &sinks.MultiWriter{}}) + cancelCalled := false + dbMetric := "testdb" + dbMetricJoinStr + "cpu" + r.cancelFuncs[dbMetric] = func() { cancelCalled = true } + r.monitoredSources = sources.SourceConns{ + {Source: sources.Source{Name: "testdb", Metrics: metrics.MetricIntervals{"cpu": 10}}}, + } + + r.ShutdownOldWorkers(ctx, map[string]bool{}) + + a.False(cancelCalled) + a.Contains(r.cancelFuncs, dbMetric) + }) + + t.Run("cancels all workers when context is cancelled", func(t *testing.T) { + a := assert.New(t) + cancelledCtx, cancel := context.WithCancel(ctx) + cancel() + r := NewReaper(ctx, &cmdopts.Options{SinksWriter: &sinks.MultiWriter{}}) + cancelCalled := false + dbMetric := "testdb" + dbMetricJoinStr + "cpu" + r.cancelFuncs[dbMetric] = func() { cancelCalled = true } + r.monitoredSources = sources.SourceConns{ + {Source: sources.Source{Name: "testdb", Metrics: metrics.MetricIntervals{"cpu": 10}}}, + } + + r.ShutdownOldWorkers(cancelledCtx, map[string]bool{}) + + a.True(cancelCalled) + }) +} + +func TestReaper_CreateSourceHelpers(t *testing.T) { + ctx := log.WithLogger(t.Context(), log.NewNoopLogger()) + + t.Run("skips already initialized source", func(t *testing.T) { + r := NewReaper(ctx, &cmdopts.Options{}) + md := &sources.SourceConn{Source: sources.Source{Name: "existing"}} + r.prevLoopMonitoredDBs = sources.SourceConns{md} + // Conn is nil — would panic if used, proving early return + r.CreateSourceHelpers(ctx, r.logger, md) + }) + + t.Run("skips non-postgres source", func(t *testing.T) { + r := NewReaper(ctx, &cmdopts.Options{}) + md := &sources.SourceConn{Source: sources.Source{Name: "pgbouncer", Kind: sources.SourcePgBouncer}} + r.CreateSourceHelpers(ctx, r.logger, md) + }) + + t.Run("skips source in recovery", func(t *testing.T) { + r := NewReaper(ctx, &cmdopts.Options{}) + md := &sources.SourceConn{ + Source: sources.Source{Name: "standby"}, + RuntimeInfo: sources.RuntimeInfo{IsInRecovery: true}, + } + r.CreateSourceHelpers(ctx, r.logger, md) + }) + + t.Run("creates extensions when configured", func(t *testing.T) { + a := assert.New(t) + r := NewReaper(ctx, &cmdopts.Options{ + Sources: sources.CmdOpts{TryCreateListedExtsIfMissing: "pg_stat_statements"}, + }) + md, mock := createTestSourceConn(t) + defer mock.Close() + mock.ExpectQuery("pg_available_extensions"). + WillReturnRows(pgxmock.NewRows([]string{"name"}).AddRow("pg_stat_statements")) + mock.ExpectExec(`create extension if not exists`). + WillReturnResult(pgxmock.NewResult("CREATE", 1)) + + r.CreateSourceHelpers(ctx, r.logger, md) + a.NoError(mock.ExpectationsWereMet()) + }) + + t.Run("creates metric helpers when configured", func(t *testing.T) { + a := assert.New(t) + r := NewReaper(ctx, &cmdopts.Options{ + Sources: sources.CmdOpts{CreateHelpers: true}, + }) + md, mock := createTestSourceConn(t) + defer mock.Close() + + const helperMetric = "test_helper_metric" + metricDefs.MetricDefs[helperMetric] = metrics.Metric{ + InitSQL: "CREATE OR REPLACE FUNCTION test_helper() RETURNS void LANGUAGE sql AS ''", + } + t.Cleanup(func() { delete(metricDefs.MetricDefs, helperMetric) }) + md.Metrics = metrics.MetricIntervals{helperMetric: 10} + + mock.ExpectExec("CREATE OR REPLACE FUNCTION"). + WillReturnResult(pgxmock.NewResult("CREATE", 1)) + + r.CreateSourceHelpers(ctx, r.logger, md) + a.NoError(mock.ExpectationsWereMet()) + }) +} + +func TestReaper_PrintMemStats(t *testing.T) { + ctx := log.WithLogger(t.Context(), log.NewNoopLogger()) + r := NewReaper(ctx, &cmdopts.Options{}) + assert.NotPanics(t, r.PrintMemStats) +} From 9c23703cab7b66de4d756ae38b7d1fbfec89c344 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Thu, 30 Apr 2026 13:01:38 +0200 Subject: [PATCH 2/2] make linter happy --- internal/reaper/reaper_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/reaper/reaper_test.go b/internal/reaper/reaper_test.go index 46c7d34d27..2b47e27e0e 100644 --- a/internal/reaper/reaper_test.go +++ b/internal/reaper/reaper_test.go @@ -778,7 +778,7 @@ func TestReaper_ShutdownOldWorkers(t *testing.T) { func TestReaper_CreateSourceHelpers(t *testing.T) { ctx := log.WithLogger(t.Context(), log.NewNoopLogger()) - t.Run("skips already initialized source", func(t *testing.T) { + t.Run("skips already initialized source", func(*testing.T) { r := NewReaper(ctx, &cmdopts.Options{}) md := &sources.SourceConn{Source: sources.Source{Name: "existing"}} r.prevLoopMonitoredDBs = sources.SourceConns{md} @@ -786,13 +786,13 @@ func TestReaper_CreateSourceHelpers(t *testing.T) { r.CreateSourceHelpers(ctx, r.logger, md) }) - t.Run("skips non-postgres source", func(t *testing.T) { + t.Run("skips non-postgres source", func(*testing.T) { r := NewReaper(ctx, &cmdopts.Options{}) md := &sources.SourceConn{Source: sources.Source{Name: "pgbouncer", Kind: sources.SourcePgBouncer}} r.CreateSourceHelpers(ctx, r.logger, md) }) - t.Run("skips source in recovery", func(t *testing.T) { + t.Run("skips source in recovery", func(*testing.T) { r := NewReaper(ctx, &cmdopts.Options{}) md := &sources.SourceConn{ Source: sources.Source{Name: "standby"},