Add the CollectorSourceDiscardedLogs alert and its supporting tests.

* Add a runbook_url annotation to the CollectorNodeDown alert.
* Add the CollectorSourceDiscardedLogs alert to the bundle manifest and to
  config/prometheus/collector_alerts.yaml.
* Add a unit test that validates the alert definition and checks that every
  collector metric referenced by an alert is in collectorMinimalAllowlist.
* Move vector_component_discarded_events_total within collectorMinimalAllowlist.
* Add a functional test that checks the discarded-events metric is exposed for
  source components when log lines exceed max_line_bytes.

diff --git a/bundle/manifests/collector_monitoring.coreos.com_v1_prometheusrule.yaml b/bundle/manifests/collector_monitoring.coreos.com_v1_prometheusrule.yaml
index 3903718fe..6c0502e4d 100644
--- a/bundle/manifests/collector_monitoring.coreos.com_v1_prometheusrule.yaml
+++ b/bundle/manifests/collector_monitoring.coreos.com_v1_prometheusrule.yaml
@@ -59,6 +59,7 @@ spec:
       annotations:
         description: Prometheus could not scrape {{ $labels.namespace }}/{{ $labels.pod }}
           collector component for more than 10m.
+        runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorNodeDown.md
         summary: Collector cannot be scraped
       expr: |
         up{app_kubernetes_io_component = "collector", app_kubernetes_io_part_of = "cluster-logging"} == 0
@@ -79,6 +80,23 @@ spec:
       labels:
         service: collector
         severity: Warning
+    - alert: CollectorSourceDiscardedLogs
+      annotations:
+        description: |-
+          The collector source "{{ $labels.component_id }}" owned by ClusterLogForwarder "{{ $labels.namespace }}/{{ $labels.app_kubernetes_io_instance }}"
+          is discarding logs. This typically occurs when log lines exceed the configured maxMessageSize limit.
+        runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorSourceDiscardedLogs.md
+        summary: Collector source "{{ $labels.component_id }}" in namespace "{{ $labels.namespace
+          }}" is discarding logs.
+      expr: |
+        sum by(namespace, app_kubernetes_io_instance, component_id, component_type)(
+          increase(vector_component_discarded_events_total{component_kind="source"}[10m])
+          or
+          increase(vector_component_errors_total{component_kind="source", error_code=~"reading_line_from_file|reading_line_from_kubernetes_log"}[10m])
+        ) > 0
+      labels:
+        service: collector
+        severity: warning
     - alert: CollectorHigh403ForbiddenResponseRate
       annotations:
         description: High rate of "HTTP 403 Forbidden" responses detected for collector
diff --git a/config/prometheus/collector_alerts.yaml b/config/prometheus/collector_alerts.yaml
index 8f0576eb2..f906a4a04 100644
--- a/config/prometheus/collector_alerts.yaml
+++ b/config/prometheus/collector_alerts.yaml
@@ -56,6 +56,7 @@ spec:
       annotations:
         description: "Prometheus could not scrape {{ $labels.namespace }}/{{ $labels.pod }} collector component for more than 10m."
         summary: "Collector cannot be scraped"
+        runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorNodeDown.md
       expr: |
         up{app_kubernetes_io_component = "collector", app_kubernetes_io_part_of = "cluster-logging"} == 0
       for: 10m
@@ -73,6 +74,23 @@ spec:
       labels:
         service: collector
         severity: Warning
+    - alert: CollectorSourceDiscardedLogs
+      annotations:
+        description: |-
+          The collector source "{{ $labels.component_id }}" owned by ClusterLogForwarder "{{ $labels.namespace }}/{{ $labels.app_kubernetes_io_instance }}"
+          is discarding logs. This typically occurs when log lines exceed the configured maxMessageSize limit.
+        summary: |-
+          Collector source "{{ $labels.component_id }}" in namespace "{{ $labels.namespace }}" is discarding logs.
+        runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorSourceDiscardedLogs.md
+      expr: |
+        sum by(namespace, app_kubernetes_io_instance, component_id, component_type)(
+          increase(vector_component_discarded_events_total{component_kind="source"}[10m])
+          or
+          increase(vector_component_errors_total{component_kind="source", error_code=~"reading_line_from_file|reading_line_from_kubernetes_log"}[10m])
+        ) > 0
+      labels:
+        service: collector
+        severity: warning
     - alert: CollectorHigh403ForbiddenResponseRate
       annotations:
         description: |-
diff --git a/internal/metrics/alerts_test.go b/internal/metrics/alerts_test.go
new file mode 100644
index 000000000..80f6188a8
--- /dev/null
+++ b/internal/metrics/alerts_test.go
@@ -0,0 +1,72 @@
+package metrics
+
+import (
+	"bytes"
+	"os"
+	"path/filepath"
+	"regexp"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
+	k8sYAML "k8s.io/apimachinery/pkg/util/yaml"
+)
+
+// Validates the CollectorSourceDiscardedLogs alert in config/prometheus/collector_alerts.yaml
+// and checks that every collector metric used by an alert is in the minimal allowlist.
+var _ = Describe("CollectorSourceDiscardedLogs alert", Ordered, func() {
+	var discardAlert monitoringv1.Rule
+
+	BeforeAll(func() {
+		mdir, err := os.Getwd()
+		Expect(err).NotTo(HaveOccurred())
+		// Tests run from internal/metrics; walk up two directories to reach the repository root.
+		mdir = filepath.Dir(filepath.Dir(mdir))
+		data, err := os.ReadFile(filepath.Join(mdir, "config", "prometheus", "collector_alerts.yaml"))
+		Expect(err).NotTo(HaveOccurred())
+
+		rule := &monitoringv1.PrometheusRule{}
+		err = k8sYAML.NewYAMLOrJSONDecoder(bytes.NewReader(data), 1000).Decode(rule)
+		Expect(err).NotTo(HaveOccurred())
+
+		// Any vector_*/logcollector_* metric referenced by an alert must appear in the minimal allowlist.
+		metricRegex := regexp.MustCompile(`(vector_\w+|logcollector_\w+)`)
+		for _, group := range rule.Spec.Groups {
+			for _, r := range group.Rules {
+				if r.Alert == "" {
+					continue
+				}
+				metrics := metricRegex.FindAllString(r.Expr.String(), -1)
+				for _, metric := range metrics {
+					Expect(collectorMinimalAllowlist.allowedMetrics).To(ContainElement(metric),
+						"metric %q used in alert %q is not in the collector minimal allowlist", metric, r.Alert)
+				}
+				if r.Alert == "CollectorSourceDiscardedLogs" {
+					discardAlert = r
+				}
+			}
+		}
+		Expect(discardAlert.Alert).NotTo(BeEmpty(), "CollectorSourceDiscardedLogs alert not found in collector_alerts.yaml")
+	})
+
+	It("should use discard and error metrics for source components", func() {
+		expr := discardAlert.Expr.String()
+		Expect(expr).To(ContainSubstring("vector_component_discarded_events_total"))
+		Expect(expr).To(ContainSubstring("vector_component_errors_total"))
+		Expect(expr).To(ContainSubstring(`component_kind="source"`))
+		Expect(expr).To(ContainSubstring("reading_line_from_file"))
+		Expect(expr).To(ContainSubstring("reading_line_from_kubernetes_log"))
+	})
+
+	It("should group by labels that identify the affected log stream", func() {
+		expr := discardAlert.Expr.String()
+		Expect(expr).To(ContainSubstring("namespace"))
+		Expect(expr).To(ContainSubstring("app_kubernetes_io_instance"))
+		Expect(expr).To(ContainSubstring("component_id"))
+		Expect(expr).To(ContainSubstring("component_type"))
+	})
+
+	It("should have severity warning", func() {
+		Expect(discardAlert.Labels["severity"]).To(Equal("warning"))
+	})
+})
diff --git a/internal/metrics/relabel.go b/internal/metrics/relabel.go
index 2234e6586..867785a19 100644
--- a/internal/metrics/relabel.go
+++ b/internal/metrics/relabel.go
@@ -26,6 +26,7 @@ var collectorMinimalAllowlist = &metricAllowlistConfig{
 		"vector_buffer_byte_size",
 		"vector_component_errors_total",
 		"vector_component_received_events_total",
+		"vector_component_discarded_events_total",
 
 		// Metrics used in recording rules (collector_alerts.yaml, telemetry_rules.yaml)
 		"vector_component_received_bytes_total",
@@ -34,7 +35,6 @@ var collectorMinimalAllowlist = &metricAllowlistConfig{
 		"vector_component_sent_bytes_total",
 		"vector_component_received_event_bytes_total",
 		"vector_open_files",
-		"vector_component_discarded_events_total",
 
 		// Additional buffer and event metrics
 		"vector_buffer_discarded_events_total",
diff --git a/test/functional/metrics/discarded_metrics_test.go b/test/functional/metrics/discarded_metrics_test.go
new file mode 100644
index 000000000..1babb6ed3
--- /dev/null
+++ b/test/functional/metrics/discarded_metrics_test.go
@@ -0,0 +1,112 @@
+package metrics
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
+	"github.com/openshift/cluster-logging-operator/internal/constants"
+	"github.com/openshift/cluster-logging-operator/internal/runtime"
+	"github.com/openshift/cluster-logging-operator/test/framework/functional"
+	testruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability"
+	rbacv1 "k8s.io/api/rbac/v1"
+)
+
+// Verifies that the collector exposes vector_component_discarded_events_total for a source
+// component when log lines exceed the configured max_line_bytes.
+var _ = Describe("[Functional][Metrics] Discarded source logs metrics", func() {
+
+	var (
+		framework            *functional.CollectorFunctionalFramework
+		metricsReaderRole    *rbacv1.ClusterRole
+		metricsReaderBinding *rbacv1.ClusterRoleBinding
+		tokenReviewBinding   *rbacv1.ClusterRoleBinding
+	)
+
+	AfterEach(func() {
+		if tokenReviewBinding != nil {
+			_ = framework.Test.Delete(tokenReviewBinding)
+		}
+		if metricsReaderBinding != nil {
+			_ = framework.Test.Delete(metricsReaderBinding)
+		}
+		if metricsReaderRole != nil {
+			_ = framework.Test.Delete(metricsReaderRole)
+		}
+		framework.Cleanup()
+	})
+
+	BeforeEach(func() {
+		framework = functional.NewCollectorFunctionalFramework()
+		testruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
+			FromInput(obs.InputTypeAudit).
+			ToHttpOutput()
+
+		// Lower max_line_bytes in the generated config so ordinary audit lines count as oversized.
+		framework.VisitConfig = func(conf string) string {
+			return strings.ReplaceAll(conf, "max_line_bytes = 3145728", "max_line_bytes = 256")
+		}
+
+		// Grant the default service account in the test namespace access to /metrics and token review delegation.
+		roleName := fmt.Sprintf("%s-metrics-reader", framework.Name)
+		metricsReaderRole = runtime.NewClusterRole(
+			roleName,
+			runtime.NewNonResourceURLPolicyRule([]string{"/metrics"}, []string{"get"}),
+		)
+		Expect(framework.Test.Create(metricsReaderRole)).To(Succeed())
+
+		metricsReaderBinding = runtime.NewClusterRoleBinding(
+			roleName,
+			runtime.NewClusterRoleRef(roleName),
+			runtime.NewServiceAccountSubject("default", framework.Namespace),
+		)
+		Expect(framework.Test.Create(metricsReaderBinding)).To(Succeed())
+
+		tokenReviewBinding = runtime.NewClusterRoleBinding(
+			fmt.Sprintf("%s-token-reviewer", framework.Name),
+			runtime.NewClusterRoleRef("system:auth-delegator"),
+			runtime.NewServiceAccountSubject("default", framework.Namespace),
+		)
+		Expect(framework.Test.Create(tokenReviewBinding)).To(Succeed())
+	})
+
+	It("should generate vector_component_discarded_events_total when source logs exceed max_line_bytes", func() {
+		Expect(framework.Deploy()).To(Succeed())
+
+		auditLogFile := "/var/log/kube-apiserver/audit.log"
+
+		// Append five oversized audit lines (expected to exceed the 256 byte limit) and then one short line.
+		// All appends run in a single shell command, but each echo is a separate write to the file.
+		longLine := functional.NewKubeAuditLog(time.Now())
+		shortLine := `{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":"Metadata"}`
+		writeCmd := fmt.Sprintf(
+			"mkdir -p %s && for i in $(seq 1 5); do echo '%s' >> %s; done && echo '%s' >> %s",
+			"/var/log/kube-apiserver",
+			strings.ReplaceAll(longLine, "'", "'\\''"),
+			auditLogFile,
+			shortLine,
+			auditLogFile,
+		)
+		_, err := framework.RunCommand(constants.CollectorName, "bash", "-c", writeCmd)
+		Expect(err).NotTo(HaveOccurred(), "failed to write audit log entries")
+
+		metricsURL := fmt.Sprintf("https://%s.%s:24231/metrics", framework.Name, framework.Namespace)
+		curlCmd := fmt.Sprintf(`curl -ks -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" %s`, metricsURL)
+		grepDiscardCmd := fmt.Sprintf(`%s | grep -i discard`, curlCmd)
+
+		Eventually(func() string {
+			// grep exits non-zero while the metric is absent, so the error is ignored and Eventually retries.
+			metrics, _ := framework.RunCommand(constants.CollectorName, "sh", "-c", grepDiscardCmd)
+			return metrics
+		}, 60*time.Second, 10*time.Second).Should(
+			And(
+				ContainSubstring("vector_component_discarded_events_total"),
+				ContainSubstring(`component_kind="source"`),
+			),
+			"expected vector_component_discarded_events_total metric with component_kind=source",
+		)
+	})
+})