Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ spec:
annotations:
description: Prometheus could not scrape {{ $labels.namespace }}/{{ $labels.pod
}} collector component for more than 10m.
runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorNodeDown.md
summary: Collector cannot be scraped
expr: |
up{app_kubernetes_io_component = "collector", app_kubernetes_io_part_of = "cluster-logging"} == 0
Expand All @@ -79,6 +80,23 @@ spec:
labels:
service: collector
severity: Warning
# CollectorSourceDiscardedLogs: raised when a collector source component reports
# discarded events, or line-reading errors from file / Kubernetes log sources,
# within a 10 minute window. No `for:` duration is set on this rule.
- alert: CollectorSourceDiscardedLogs
annotations:
description: |-
The collector source "{{ $labels.component_id }}" owned by ClusterLogForwarder "{{ $labels.namespace }}/{{ $labels.app_kubernetes_io_instance }}"
is discarding logs. This typically occurs when log lines exceed the configured maxMessageSize limit.
runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorSourceDiscardedLogs.md
summary: Collector source "{{ $labels.component_id }}" in namespace "{{ $labels.namespace
}}" is discarding logs.
# The 10m increase of both counters is combined with `or`, then summed per
# namespace / forwarder instance / component id / component type; any positive
# sum triggers the alert. Those grouping labels are the ones the annotations use.
expr: |
sum by(namespace, app_kubernetes_io_instance, component_id, component_type)(
increase(vector_component_discarded_events_total{component_kind="source"}[10m])
or
increase(vector_component_errors_total{component_kind="source", error_code=~"reading_line_from_file|reading_line_from_kubernetes_log"}[10m])
) > 0
labels:
service: collector
severity: warning
- alert: CollectorHigh403ForbiddenResponseRate
annotations:
description: High rate of "HTTP 403 Forbidden" responses detected for collector
Expand Down
18 changes: 18 additions & 0 deletions config/prometheus/collector_alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ spec:
annotations:
description: "Prometheus could not scrape {{ $labels.namespace }}/{{ $labels.pod }} collector component for more than 10m."
summary: "Collector cannot be scraped"
runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorNodeDown.md
expr: |
up{app_kubernetes_io_component = "collector", app_kubernetes_io_part_of = "cluster-logging"} == 0
for: 10m
Expand All @@ -73,6 +74,23 @@ spec:
labels:
service: collector
severity: Warning
# CollectorSourceDiscardedLogs: raised when a collector source component reports
# discarded events, or line-reading errors from file / Kubernetes log sources,
# within a 10 minute window. No `for:` duration is set on this rule.
- alert: CollectorSourceDiscardedLogs
annotations:
description: |-
The collector source "{{ $labels.component_id }}" owned by ClusterLogForwarder "{{ $labels.namespace }}/{{ $labels.app_kubernetes_io_instance }}"
is discarding logs. This typically occurs when log lines exceed the configured maxMessageSize limit.
summary: |-
Collector source "{{ $labels.component_id }}" in namespace "{{ $labels.namespace }}" is discarding logs.
runbook_url: https://github.com/openshift/runbooks/blob/master/alerts/cluster-logging-operator/CollectorSourceDiscardedLogs.md
# The 10m increase of both counters is combined with `or`, then summed per
# namespace / forwarder instance / component id / component type; any positive
# sum triggers the alert. Those grouping labels are the ones the annotations use.
expr: |
sum by(namespace, app_kubernetes_io_instance, component_id, component_type)(
increase(vector_component_discarded_events_total{component_kind="source"}[10m])
or
increase(vector_component_errors_total{component_kind="source", error_code=~"reading_line_from_file|reading_line_from_kubernetes_log"}[10m])
) > 0
labels:
service: collector
severity: warning
- alert: CollectorHigh403ForbiddenResponseRate
annotations:
description: |-
Expand Down
68 changes: 68 additions & 0 deletions internal/metrics/alerts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package metrics

import (
"bytes"
"os"
"path"
"regexp"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
k8sYAML "k8s.io/apimachinery/pkg/util/yaml"
)

// Specs for the CollectorSourceDiscardedLogs rule loaded from
// config/prometheus/collector_alerts.yaml. While locating that rule, the setup
// also asserts that every vector_*/logcollector_* metric referenced by any
// alert expression is present in collectorMinimalAllowlist.
var _ = Describe("CollectorSourceDiscardedLogs alert", Ordered, func() {
	var discardAlert monitoringv1.Rule

	BeforeAll(func() {
		// The rules file is resolved two directory levels above the working directory.
		cwd, err := os.Getwd()
		Expect(err).NotTo(HaveOccurred())
		repoRoot := path.Dir(path.Dir(cwd))
		alertsFile := path.Join(repoRoot, "config", "prometheus", "collector_alerts.yaml")

		raw, err := os.ReadFile(alertsFile)
		Expect(err).NotTo(HaveOccurred())

		promRule := &monitoringv1.PrometheusRule{}
		decoder := k8sYAML.NewYAMLOrJSONDecoder(bytes.NewReader(raw), 1000)
		Expect(decoder.Decode(promRule)).NotTo(HaveOccurred())

		collectorMetric := regexp.MustCompile(`(vector_\w+|logcollector_\w+)`)
		for _, ruleGroup := range promRule.Spec.Groups {
			for _, candidate := range ruleGroup.Rules {
				// Entries without an alert name are skipped.
				if candidate.Alert == "" {
					continue
				}
				for _, name := range collectorMetric.FindAllString(candidate.Expr.String(), -1) {
					Expect(collectorMinimalAllowlist.allowedMetrics).To(ContainElement(name),
						"metric %q used in alert %q is not in the collector minimal allowlist", name, candidate.Alert)
				}
				if candidate.Alert == "CollectorSourceDiscardedLogs" {
					discardAlert = candidate
				}
			}
		}
		Expect(discardAlert.Alert).NotTo(BeEmpty(), "CollectorSourceDiscardedLogs alert not found in collector_alerts.yaml")
	})

	It("should use discard and error metrics for source components", func() {
		query := discardAlert.Expr.String()
		for _, fragment := range []string{
			"vector_component_discarded_events_total",
			"vector_component_errors_total",
			`component_kind="source"`,
			"reading_line_from_file",
			"reading_line_from_kubernetes_log",
		} {
			Expect(query).To(ContainSubstring(fragment))
		}
	})

	It("should group by labels that identify the affected log stream", func() {
		query := discardAlert.Expr.String()
		for _, label := range []string{
			"namespace",
			"app_kubernetes_io_instance",
			"component_id",
			"component_type",
		} {
			Expect(query).To(ContainSubstring(label))
		}
	})

	It("should have severity warning", func() {
		severity := discardAlert.Labels["severity"]
		Expect(severity).To(Equal("warning"))
	})
})
2 changes: 1 addition & 1 deletion internal/metrics/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var collectorMinimalAllowlist = &metricAllowlistConfig{
"vector_buffer_byte_size",
"vector_component_errors_total",
"vector_component_received_events_total",
"vector_component_discarded_events_total",

// Metrics used in recording rules (collector_alerts.yaml, telemetry_rules.yaml)
"vector_component_received_bytes_total",
Expand All @@ -34,7 +35,6 @@ var collectorMinimalAllowlist = &metricAllowlistConfig{
"vector_component_sent_bytes_total",
"vector_component_received_event_bytes_total",
"vector_open_files",
"vector_component_discarded_events_total",

// Additional buffer and event metrics
"vector_buffer_discarded_events_total",
Expand Down
107 changes: 107 additions & 0 deletions test/functional/metrics/discarded_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package metrics

import (
"fmt"
"strings"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
"github.com/openshift/cluster-logging-operator/internal/constants"
"github.com/openshift/cluster-logging-operator/internal/runtime"
"github.com/openshift/cluster-logging-operator/test/framework/functional"
testruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability"
rbacv1 "k8s.io/api/rbac/v1"
)

var _ = Describe("[Functional][Metrics] Discarded source logs metrics", func() {

var (
framework *functional.CollectorFunctionalFramework
metricsReaderRole *rbacv1.ClusterRole
metricsReaderBinding *rbacv1.ClusterRoleBinding
tokenReviewBinding *rbacv1.ClusterRoleBinding
)

AfterEach(func() {
if tokenReviewBinding != nil {
_ = framework.Test.Delete(tokenReviewBinding)
}
if metricsReaderBinding != nil {
_ = framework.Test.Delete(metricsReaderBinding)
}
if metricsReaderRole != nil {
_ = framework.Test.Delete(metricsReaderRole)
}
framework.Cleanup()
})

BeforeEach(func() {
framework = functional.NewCollectorFunctionalFramework()
testruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
FromInput(obs.InputTypeAudit).
ToHttpOutput()

framework.VisitConfig = func(conf string) string {
return strings.ReplaceAll(conf, "max_line_bytes = 3145728", "max_line_bytes = 256")
}

roleName := fmt.Sprintf("%s-metrics-reader", framework.Name)
metricsReaderRole = runtime.NewClusterRole(
roleName,
runtime.NewNonResourceURLPolicyRule([]string{"/metrics"}, []string{"get"}),
)
Expect(framework.Test.Create(metricsReaderRole)).To(Succeed())

metricsReaderBinding = runtime.NewClusterRoleBinding(
roleName,
runtime.NewClusterRoleRef(roleName),
runtime.NewServiceAccountSubject("default", framework.Namespace),
)
Expect(framework.Test.Create(metricsReaderBinding)).To(Succeed())

tokenReviewBinding = runtime.NewClusterRoleBinding(
fmt.Sprintf("%s-token-reviewer", framework.Name),
runtime.NewClusterRoleRef("system:auth-delegator"),
runtime.NewServiceAccountSubject("default", framework.Namespace),
)
Expect(framework.Test.Create(tokenReviewBinding)).To(Succeed())
})

It("should generate vector_component_discarded_events_total when source logs exceed max_line_bytes", func() {
Expect(framework.Deploy()).To(BeNil())

auditLogFile := "/var/log/kube-apiserver/audit.log"

// Write oversized lines (~1.5KB each, exceeding 256 byte limit) followed by a short line
// in a single write so Vector processes them together in one read pass.
longLine := functional.NewKubeAuditLog(time.Now())
shortLine := `{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":"Metadata"}`
writeCmd := fmt.Sprintf(
"mkdir -p %s && for i in $(seq 1 5); do echo '%s' >> %s; done && echo '%s' >> %s",
"/var/log/kube-apiserver",
strings.ReplaceAll(longLine, "'", "'\\''"),
auditLogFile,
shortLine,
auditLogFile,
)
_, err := framework.RunCommand(constants.CollectorName, "bash", "-c", writeCmd)
Expect(err).To(BeNil(), "failed to write audit log entries")

metricsURL := fmt.Sprintf("https://%s.%s:24231/metrics", framework.Name, framework.Namespace)
curlCmd := fmt.Sprintf(`curl -ks -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" %s`, metricsURL)
grepDiscardCmd := fmt.Sprintf(`%s | grep -i discard`, curlCmd)

Eventually(func() string {
metrics, _ := framework.RunCommand(constants.CollectorName, "sh", "-c", grepDiscardCmd)
return metrics
}, 60*time.Second, 10*time.Second).Should(
And(
ContainSubstring("vector_component_discarded_events_total"),
ContainSubstring(`component_kind="source"`),
),
Comment thread
vparfonov marked this conversation as resolved.
"expected vector_component_discarded_events_total metric with component_kind=source",
)
})
})