Skip to content

Commit 1677cdb

Browse files
authored
[+] implement parallel source discovery (#1378)
* [+] implement parallel source discovery

Improves dead-source handling with parallel resolution and `instance_up=0` on discovery failure.

`Sources.ResolveDatabases()` previously resolved each source sequentially. A single slow or unresponsive source (e.g. a continuous-discovery endpoint behind a firewall) would block discovery of all subsequent sources for the full connection timeout duration.

Sources are now resolved concurrently using `sync.WaitGroup.Go()`. Results are collected into a pre-allocated indexed slice to preserve deterministic ordering. Per-source error logging with the source name is included in the resolver itself.

When a `SourcePostgresContinuous` or `SourcePatroni` source fails to resolve any databases, `LoadSources()` now emits `instance_up=0` to the configured sinks. This makes the failure visible in dashboards and alerting, consistent with how unreachable directly-monitored sources are handled.

* `reaper.WriteInstanceDown()` accepts the source name as an argument
* use `onError` callback in `Sources.ResolveDatabases()`
* use const instead of hard-coded value
1 parent 066cc14 commit 1677cdb

5 files changed

Lines changed: 35 additions & 15 deletions

File tree

internal/cmdopts/cmdsource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (cmd *SourceResolveCommand) Execute(args []string) error {
9999
}
100100
}
101101
}
102-
conns, err := foundSources.ResolveDatabases()
102+
conns, err := foundSources.ResolveDatabases(nil)
103103
if err != nil {
104104
return err
105105
}

internal/reaper/database.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.Sourc
438438
return metrics.Measurements{
439439
metrics.Measurement{
440440
metrics.EpochColumnName: time.Now().UnixNano(),
441-
"instance_up": func() int {
441+
specialMetricInstanceUp: func() int {
442442
if md.Conn.Ping(ctx) == nil {
443443
return 1
444444
}

internal/reaper/reaper.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (r *Reaper) Reap(ctx context.Context) {
101101
ctx = log.WithLogger(ctx, srcL)
102102

103103
if monitoredSource.Connect(ctx, r.Sources) != nil {
104-
r.WriteInstanceDown(monitoredSource)
104+
r.WriteInstanceDown(monitoredSource.Name)
105105
srcL.Warning("could not init connection, retrying on next iteration")
106106
continue
107107
}
@@ -405,9 +405,12 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) {
405405
return err
406406
}
407407
srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
408+
// filter out disabled sources and sources with group not in the list of groups to monitor
408409
return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group)
409410
})
410-
if newSrcs, err = srcs.ResolveDatabases(); err != nil {
411+
412+
if newSrcs, err = srcs.ResolveDatabases(r.WriteInstanceDown); err != nil {
413+
// discover databases for continuous monitoring sources
411414
r.logger.WithError(err).Error("could not resolve databases from sources")
412415
}
413416

@@ -432,9 +435,9 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) {
432435
}
433436

434437
// WriteInstanceDown writes instance_up = 0 metric to sinks for the given source
435-
func (r *Reaper) WriteInstanceDown(md *sources.SourceConn) {
438+
func (r *Reaper) WriteInstanceDown(name string) {
436439
r.measurementCh <- metrics.MeasurementEnvelope{
437-
DBName: md.Name,
440+
DBName: name,
438441
MetricName: specialMetricInstanceUp,
439442
Data: metrics.Measurements{metrics.Measurement{
440443
metrics.EpochColumnName: time.Now().UnixNano(),

internal/reaper/reaper_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,5 +593,5 @@ func TestWriteMeasurements(t *testing.T) {
593593
SinksWriter: err,
594594
})
595595
go r.WriteMeasurements(ctx)
596-
r.WriteInstanceDown(&sources.SourceConn{})
596+
r.WriteInstanceDown("foo")
597597
}

internal/sources/resolver.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net/url"
1515
"os"
1616
"strings"
17+
"sync"
1718
"time"
1819

1920
jsoniter "github.com/json-iterator/go"
@@ -25,16 +26,32 @@ import (
2526
"go.uber.org/zap"
2627
)
2728

28-
// ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni
29-
func (srcs Sources) ResolveDatabases() (_ SourceConns, err error) {
29+
// ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni.
30+
// Each source is resolved concurrently so that a slow or unreachable source does not block the others.
31+
func (srcs Sources) ResolveDatabases(onError func(string)) (_ SourceConns, err error) {
32+
type result struct {
33+
dbs SourceConns
34+
err error
35+
}
36+
results := make([]result, len(srcs))
37+
var wg sync.WaitGroup
38+
for i, s := range srcs {
39+
wg.Go(func() {
40+
dbs, e := s.ResolveDatabases()
41+
results[i] = result{dbs, e}
42+
})
43+
}
44+
wg.Wait()
3045
resolvedDbs := make(SourceConns, 0, len(srcs))
31-
for _, s := range srcs {
32-
if !s.IsEnabled {
33-
continue
46+
for i, res := range results {
47+
if res.err != nil {
48+
if onError != nil {
49+
onError(srcs[i].Name)
50+
}
51+
logger.WithField("source", srcs[i].Name).WithError(res.err).Error("could not resolve databases from source")
52+
err = errors.Join(err, res.err)
3453
}
35-
dbs, e := s.ResolveDatabases()
36-
err = errors.Join(err, e)
37-
resolvedDbs = append(resolvedDbs, dbs...)
54+
resolvedDbs = append(resolvedDbs, res.dbs...)
3855
}
3956
return resolvedDbs, err
4057
}

0 commit comments

Comments
 (0)