From 609c1b4eba077f8f3b0f25d26b158533824e15b6 Mon Sep 17 00:00:00 2001 From: Vitalii Parfonov Date: Thu, 21 May 2026 15:59:11 +0300 Subject: [PATCH] feat(metrics): add CollectorSourceDiscardedLogs alert for discarded source logs Add a warning alert that fires when Vector source components discard logs (e.g. lines exceeding max_line_bytes). The alert groups by namespace, app_kubernetes_io_instance, component_id, and component_type so users can identify the affected ClusterLogForwarder and log stream. Use a 10m increase() window to balance responsiveness against transient noise. The alert combines the discard metric with the error metric, the latter filtered to the line-reading error codes. Co-Authored-By: Claude Opus 4.6 --- ...nitoring.coreos.com_v1_prometheusrule.yaml | 18 +++ config/prometheus/collector_alerts.yaml | 18 +++ internal/metrics/alerts_test.go | 68 +++++++++++ internal/metrics/relabel.go | 2 +- .../metrics/discarded_metrics_test.go | 107 ++++++++++++++++++ 5 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 internal/metrics/alerts_test.go create mode 100644 test/functional/metrics/discarded_metrics_test.go diff --git a/bundle/manifests/collector_monitoring.coreos.com_v1_prometheusrule.yaml b/bundle/manifests/collector_monitoring.coreos.com_v1_prometheusrule.yaml index 3903718fe..6c0502e4d 100644 --- a/bundle/manifests/collector_monitoring.coreos.com_v1_prometheusrule.yaml +++ b/bundle/manifests/collector_monitoring.coreos.com_v1_prometheusrule.yaml @@ -59,6 +59,7 @@ spec: annotations: description: Prometheus could not scrape {{ $labels.namespace }}/{{ $labels.pod }} collector component for more than 10m. 
+ runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorNodeDown.md summary: Collector cannot be scraped expr: | up{app_kubernetes_io_component = "collector", app_kubernetes_io_part_of = "cluster-logging"} == 0 @@ -79,6 +80,23 @@ spec: labels: service: collector severity: Warning + - alert: CollectorSourceDiscardedLogs + annotations: + description: |- + The collector source "{{ $labels.component_id }}" owned by ClusterLogForwarder "{{ $labels.namespace }}/{{ $labels.app_kubernetes_io_instance }}" + is discarding logs. This typically occurs when log lines exceed the configured maxMessageSize limit. + runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorSourceDiscardedLogs.md + summary: Collector source "{{ $labels.component_id }}" in namespace "{{ $labels.namespace + }}" is discarding logs. + expr: | + sum by(namespace, app_kubernetes_io_instance, component_id, component_type)( + increase(vector_component_discarded_events_total{component_kind="source"}[10m]) + or + increase(vector_component_errors_total{component_kind="source", error_code=~"reading_line_from_file|reading_line_from_kubernetes_log"}[10m]) + ) > 0 + labels: + service: collector + severity: warning - alert: CollectorHigh403ForbiddenResponseRate annotations: description: High rate of "HTTP 403 Forbidden" responses detected for collector diff --git a/config/prometheus/collector_alerts.yaml b/config/prometheus/collector_alerts.yaml index 8f0576eb2..f906a4a04 100644 --- a/config/prometheus/collector_alerts.yaml +++ b/config/prometheus/collector_alerts.yaml @@ -56,6 +56,7 @@ spec: annotations: description: "Prometheus could not scrape {{ $labels.namespace }}/{{ $labels.pod }} collector component for more than 10m." 
summary: "Collector cannot be scraped" + runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorNodeDown.md expr: | up{app_kubernetes_io_component = "collector", app_kubernetes_io_part_of = "cluster-logging"} == 0 for: 10m @@ -73,6 +74,23 @@ spec: labels: service: collector severity: Warning + - alert: CollectorSourceDiscardedLogs + annotations: + description: |- + The collector source "{{ $labels.component_id }}" owned by ClusterLogForwarder "{{ $labels.namespace }}/{{ $labels.app_kubernetes_io_instance }}" + is discarding logs. This typically occurs when log lines exceed the configured maxMessageSize limit. + summary: |- + Collector source "{{ $labels.component_id }}" in namespace "{{ $labels.namespace }}" is discarding logs. + runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorSourceDiscardedLogs.md + expr: | + sum by(namespace, app_kubernetes_io_instance, component_id, component_type)( + increase(vector_component_discarded_events_total{component_kind="source"}[10m]) + or + increase(vector_component_errors_total{component_kind="source", error_code=~"reading_line_from_file|reading_line_from_kubernetes_log"}[10m]) + ) > 0 + labels: + service: collector + severity: warning - alert: CollectorHigh403ForbiddenResponseRate annotations: description: |- diff --git a/internal/metrics/alerts_test.go b/internal/metrics/alerts_test.go new file mode 100644 index 000000000..80f6188a8 --- /dev/null +++ b/internal/metrics/alerts_test.go @@ -0,0 +1,68 @@ +package metrics + +import ( + "bytes" + "os" + "path" + "regexp" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega"
+	monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
+	k8sYAML "k8s.io/apimachinery/pkg/util/yaml"
+)
+
+var _ = Describe("CollectorSourceDiscardedLogs alert", Ordered, func() {
+	var discardAlert monitoringv1.Rule
+	// BeforeAll loads the rules file once, checks every alert's metrics against the allowlist, and captures the alert under test.
+	BeforeAll(func() {
+		mdir, err := os.Getwd()
+		Expect(err).NotTo(HaveOccurred())
+		mdir = path.Dir(path.Dir(mdir)) // two levels up from the test's working directory
+		data, err := os.ReadFile(path.Join(mdir, "config", "prometheus", "collector_alerts.yaml"))
+		Expect(err).NotTo(HaveOccurred())
+
+		rule := &monitoringv1.PrometheusRule{}
+		err = k8sYAML.NewYAMLOrJSONDecoder(bytes.NewReader(data), 1000).Decode(rule)
+		Expect(err).NotTo(HaveOccurred())
+
+		metricRegex := regexp.MustCompile(`(vector_\w+|logcollector_\w+)`)
+		for _, group := range rule.Spec.Groups {
+			for _, r := range group.Rules {
+				if r.Alert == "" {
+					continue
+				}
+				metrics := metricRegex.FindAllString(r.Expr.String(), -1)
+				for _, metric := range metrics {
+					Expect(collectorMinimalAllowlist.allowedMetrics).To(ContainElement(metric),
+						"metric %q used in alert %q is not in the collector minimal allowlist", metric, r.Alert)
+				}
+				if r.Alert == "CollectorSourceDiscardedLogs" {
+					discardAlert = r
+				}
+			}
+		}
+		Expect(discardAlert.Alert).NotTo(BeEmpty(), "CollectorSourceDiscardedLogs alert not found in collector_alerts.yaml")
+	})
+
+	It("should use discard and error metrics for source components", func() {
+		expr := discardAlert.Expr.String()
+		Expect(expr).To(ContainSubstring("vector_component_discarded_events_total"))
+		Expect(expr).To(ContainSubstring("vector_component_errors_total"))
+		Expect(expr).To(ContainSubstring(`component_kind="source"`))
+		Expect(expr).To(ContainSubstring("reading_line_from_file"))
+		Expect(expr).To(ContainSubstring("reading_line_from_kubernetes_log"))
+	})
+
+	It("should group by labels that identify the affected log stream", func() {
+		// Inspect only the by(...) clause: a substring match on the whole expression never checks the grouping itself.
+		byClause := regexp.MustCompile(`sum by\s*\(([^)]*)\)`).FindStringSubmatch(discardAlert.Expr.String())
+		Expect(byClause).To(HaveLen(2), "alert expression has no sum by(...) clause")
+		groupLabels := regexp.MustCompile(`\w+`).FindAllString(byClause[1], -1)
+		Expect(groupLabels).To(ContainElements("namespace", "app_kubernetes_io_instance", "component_id", "component_type"))
+	})
+
+	It("should have severity warning", func() {
+		Expect(discardAlert.Labels["severity"]).To(Equal("warning"))
+	})
+})
diff --git a/internal/metrics/relabel.go b/internal/metrics/relabel.go
index 2234e6586..867785a19 100644
--- a/internal/metrics/relabel.go
+++ b/internal/metrics/relabel.go
@@ -26,6 +26,7 @@ var collectorMinimalAllowlist = &metricAllowlistConfig{
 		"vector_buffer_byte_size",
 		"vector_component_errors_total",
 		"vector_component_received_events_total",
+		"vector_component_discarded_events_total",
 
 		// Metrics used in recording rules (collector_alerts.yaml, telemetry_rules.yaml)
 		"vector_component_received_bytes_total",
@@ -34,7 +35,6 @@ var collectorMinimalAllowlist = &metricAllowlistConfig{
 		"vector_component_sent_bytes_total",
 		"vector_component_received_event_bytes_total",
 		"vector_open_files",
-		"vector_component_discarded_events_total",
 
 		// Additional buffer and event metrics
 		"vector_buffer_discarded_events_total",
diff --git a/test/functional/metrics/discarded_metrics_test.go b/test/functional/metrics/discarded_metrics_test.go
new file mode 100644
index 000000000..1babb6ed3
--- /dev/null
+++ b/test/functional/metrics/discarded_metrics_test.go
@@ -0,0 +1,107 @@
+package metrics
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. 
"github.com/onsi/gomega"
+	obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
+	"github.com/openshift/cluster-logging-operator/internal/constants"
+	"github.com/openshift/cluster-logging-operator/internal/runtime"
+	"github.com/openshift/cluster-logging-operator/test/framework/functional"
+	testruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability"
+	rbacv1 "k8s.io/api/rbac/v1"
+)
+
+var _ = Describe("[Functional][Metrics] Discarded source logs metrics", func() {
+	// Forwards audit input with max_line_bytes lowered to 256, writes longer lines, then scrapes /metrics for the discard counter.
+	var (
+		framework            *functional.CollectorFunctionalFramework
+		metricsReaderRole    *rbacv1.ClusterRole
+		metricsReaderBinding *rbacv1.ClusterRoleBinding
+		tokenReviewBinding   *rbacv1.ClusterRoleBinding
+	)
+
+	AfterEach(func() {
+		if tokenReviewBinding != nil {
+			_ = framework.Test.Delete(tokenReviewBinding)
+		}
+		if metricsReaderBinding != nil {
+			_ = framework.Test.Delete(metricsReaderBinding)
+		}
+		if metricsReaderRole != nil {
+			_ = framework.Test.Delete(metricsReaderRole)
+		}
+		framework.Cleanup()
+	})
+
+	BeforeEach(func() {
+		framework = functional.NewCollectorFunctionalFramework()
+		testruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
+			FromInput(obs.InputTypeAudit).
+			ToHttpOutput()
+		// Lower max_line_bytes in the visited config from 3145728 to 256 so the lines written by the spec exceed it.
+		framework.VisitConfig = func(conf string) string {
+			return strings.ReplaceAll(conf, "max_line_bytes = 3145728", "max_line_bytes = 256")
+		}
+		// Grant the namespace's default service account GET on /metrics plus the system:auth-delegator role.
+		roleName := fmt.Sprintf("%s-metrics-reader", framework.Name)
+		metricsReaderRole = runtime.NewClusterRole(
+			roleName,
+			runtime.NewNonResourceURLPolicyRule([]string{"/metrics"}, []string{"get"}),
+		)
+		Expect(framework.Test.Create(metricsReaderRole)).To(Succeed())
+
+		metricsReaderBinding = runtime.NewClusterRoleBinding(
+			roleName,
+			runtime.NewClusterRoleRef(roleName),
+			runtime.NewServiceAccountSubject("default", framework.Namespace),
+		)
+		Expect(framework.Test.Create(metricsReaderBinding)).To(Succeed())
+
+		tokenReviewBinding = runtime.NewClusterRoleBinding(
+			fmt.Sprintf("%s-token-reviewer", framework.Name),
+			runtime.NewClusterRoleRef("system:auth-delegator"),
+			runtime.NewServiceAccountSubject("default", framework.Namespace),
+		)
+		Expect(framework.Test.Create(tokenReviewBinding)).To(Succeed())
+	})
+
+	It("should generate vector_component_discarded_events_total when source logs exceed max_line_bytes", func() {
+		Expect(framework.Deploy()).To(BeNil())
+
+		auditLogFile := "/var/log/kube-apiserver/audit.log"
+
+		// Write oversized lines (~1.5KB each, exceeding 256 byte limit) followed by a short line
+		// in a single write so Vector processes them together in one read pass.
+		longLine := functional.NewKubeAuditLog(time.Now())
+		shortLine := `{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":"Metadata"}`
+		writeCmd := fmt.Sprintf(
+			"mkdir -p %s && for i in $(seq 1 5); do echo '%s' >> %s; done && echo '%s' >> %s",
+			"/var/log/kube-apiserver",
+			strings.ReplaceAll(longLine, "'", "'\\''"),
+			auditLogFile,
+			shortLine,
+			auditLogFile,
+		)
+		_, err := framework.RunCommand(constants.CollectorName, "bash", "-c", writeCmd)
+		Expect(err).To(BeNil(), "failed to write audit log entries")
+
+		metricsURL := fmt.Sprintf("https://%s.%s:24231/metrics", framework.Name, framework.Namespace)
+		curlCmd := fmt.Sprintf(`curl -ks -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" %s`, metricsURL)
+		grepDiscardCmd := fmt.Sprintf(`%s | grep -i discard`, curlCmd)
+
+		Eventually(func() string {
+			// grep exits non-zero until a matching line exists; any resulting error is ignored so polling continues.
+			metrics, _ := framework.RunCommand(constants.CollectorName, "sh", "-c", grepDiscardCmd)
+			return metrics
+		}, 60*time.Second, 10*time.Second).Should(
+			// Both conditions must hold on one sample line; separate substring checks could be
+			// satisfied by two unrelated "discard" lines in the grep output.
+			MatchRegexp(`vector_component_discarded_events_total\{[^}\n]*component_kind="source"`),
+			"expected vector_component_discarded_events_total metric with component_kind=source",
+		)
+	})
+})