Skip to content

Commit 89cb5eb

Browse files
committed
[+] implement parallel source discovery
Improves dead-source handling with parallel resolution and instance_up=0 on discovery failure. `Sources.ResolveDatabases()` previously resolved each source sequentially. A single slow or unresponsive source (e.g. a continuous-discovery endpoint behind a firewall) would block discovery of all subsequent sources for the full connection timeout duration. Sources are now resolved concurrently using `sync.WaitGroup.Go()`. Results are collected into a pre-allocated indexed slice to preserve deterministic ordering. Per-source error logging with source name is included in the resolver itself. When a `SourcePostgresContinuous` or `SourcePatroni` source fails to resolve any databases, `LoadSources()` now emits `instance_up=0` to the configured sinks. This makes the failure visible in dashboards and alerting, consistent with how unreachable directly-monitored sources are handled.
1 parent 066cc14 commit 89cb5eb

2 files changed

Lines changed: 33 additions & 8 deletions

File tree

internal/reaper/reaper.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,8 +407,19 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) {
407407
srcs = slices.DeleteFunc(srcs, func(s sources.Source) bool {
408408
return !s.IsEnabled || len(r.Sources.Groups) > 0 && !slices.Contains(r.Sources.Groups, s.Group)
409409
})
410-
if newSrcs, err = srcs.ResolveDatabases(); err != nil {
410+
newSrcs, err = srcs.ResolveDatabases()
411+
if err != nil {
411412
r.logger.WithError(err).Error("could not resolve databases from sources")
413+
for _, s := range srcs {
414+
if s.Kind != sources.SourcePostgresContinuous && s.Kind != sources.SourcePatroni {
415+
continue
416+
}
417+
if !slices.ContainsFunc(newSrcs, func(sc *sources.SourceConn) bool {
418+
return sc.Name == s.Name || strings.HasPrefix(sc.Name, s.Name+"_")
419+
}) {
420+
r.WriteInstanceDown(sources.NewSourceConn(s))
421+
}
422+
}
412423
}
413424

414425
for i, newMD := range newSrcs {

internal/sources/resolver.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net/url"
1515
"os"
1616
"strings"
17+
"sync"
1718
"time"
1819

1920
jsoniter "github.com/json-iterator/go"
@@ -25,16 +26,29 @@ import (
2526
"go.uber.org/zap"
2627
)
2728

28-
// ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni
29+
// ResolveDatabases() updates list of monitored objects from continuous monitoring sources, e.g. patroni.
30+
// Each source is resolved concurrently so that a slow or unreachable source does not block the others.
2931
func (srcs Sources) ResolveDatabases() (_ SourceConns, err error) {
32+
type result struct {
33+
dbs SourceConns
34+
err error
35+
}
36+
results := make([]result, len(srcs))
37+
var wg sync.WaitGroup
38+
for i, s := range srcs {
39+
wg.Go(func() {
40+
dbs, e := s.ResolveDatabases()
41+
results[i] = result{dbs, e}
42+
})
43+
}
44+
wg.Wait()
3045
resolvedDbs := make(SourceConns, 0, len(srcs))
31-
for _, s := range srcs {
32-
if !s.IsEnabled {
33-
continue
46+
for i, res := range results {
47+
if res.err != nil {
48+
logger.WithField("source", srcs[i].Name).WithError(res.err).Error("could not resolve databases from source")
49+
err = errors.Join(err, res.err)
3450
}
35-
dbs, e := s.ResolveDatabases()
36-
err = errors.Join(err, e)
37-
resolvedDbs = append(resolvedDbs, dbs...)
51+
resolvedDbs = append(resolvedDbs, res.dbs...)
3852
}
3953
return resolvedDbs, err
4054
}

0 commit comments

Comments
 (0)