From 12b665cf2e7ae6381fcff61778cd39925fcc0d8a Mon Sep 17 00:00:00 2001 From: Selman Uluc Date: Wed, 10 Jun 2026 19:46:46 +0300 Subject: [PATCH 1/2] Add Kafka offset-commit restart case to verify delivery and over-delivery limits --- internal/config/case.go | 9 ++ internal/runner/runner.go | 272 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 281 insertions(+) diff --git a/internal/config/case.go b/internal/config/case.go index b3fae50..9725756 100644 --- a/internal/config/case.go +++ b/internal/config/case.go @@ -523,6 +523,15 @@ type CorrectnessConfig struct { DrainSeconds int `yaml:"drain_seconds"` DrainQuietWindow string `yaml:"drain_quiet_window"` + // MaxOverDeliveryPct caps duplicate re-delivery as a percentage of + // lines sent. Enforced by case types that verify source + // acknowledgments actually persisted (kafka_offset_commit_restart: + // a clean restart after full delivery must resume from committed + // offsets, not re-consume the topic). Zero = strict, no + // over-delivery allowed. Ignored by case types that don't document + // it — at-least-once types deliberately tolerate duplicates. + MaxOverDeliveryPct float64 `yaml:"max_overdelivery_pct"` + // RateCeiling validates a per-window EPS ceiling on the receive side. // Empty MaxEPS = check disabled. RateCeiling RateCeilingConfig `yaml:"rate_ceiling"` diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 0ddf105..f675483 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -169,6 +169,16 @@ func (r *Runner) Run(tc *config.TestCase, subject config.Subject) (results.RunRe if tc.Type == "kafka_inflight_crash_correctness" { return r.runKafkaInflightCrash(tc, subject) } + // Kafka offset-commit restart: receiver stays UP, ALL records are + // delivered cleanly, then the subject is restarted gracefully. A + // consumer whose offset commits actually persist resumes from the + // committed offsets and re-delivers (close to) nothing. Verifies no + // loss AND bounded over-delivery — the inverse of the in-flight crash + // case, which tolerates unlimited duplicates and is therefore blind to + // a subject whose acknowledgments never reach the broker at all. + if tc.Type == "kafka_offset_commit_restart" { + return r.runKafkaOffsetCommitRestart(tc, subject) + } configName := r.opts.ConfigName @@ -1664,6 +1674,268 @@ func (r *Runner) runKafkaInflightCrash(tc *config.TestCase, subject config.Subje return result, nil } +// runKafkaOffsetCommitRestart verifies that delivery-bound source +// acknowledgments actually persist: every produced record is delivered to a +// LIVE receiver, the subject is then restarted GRACEFULLY, and the restarted +// consumer must resume from the committed offsets instead of re-consuming +// the topic. +// +// This is the inverse of runKafkaInflightCrash: that case kills mid-delivery +// and tolerates unlimited duplicates (at-least-once permits them), which +// makes it structurally blind to "offsets are never committed at all" — a +// bug that produces zero loss and 100% over-delivery. This case closes that +// gap with a hard over-delivery ceiling (correctness.max_overdelivery_pct, +// zero = strict). +// +// Verdict: no loss (loss <= expected_loss_pct) AND over-delivery <= +// max_overdelivery_pct. +func (r *Runner) runKafkaOffsetCommitRestart(tc *config.TestCase, subject config.Subject) (results.RunResult, error) { + configName := r.opts.ConfigName + subject = r.applySubjectOverrides(subject) + + fmt.Printf("→ test=%s subject=%s version=%s config=%s\n", + tc.Name, subject.Name, subject.Version, configName) + + configSrc, err := tc.ConfigFilePath(r.opts.CasesDir, configName, subject) + if err != nil { + return results.RunResult{}, err + } + configSrc, err = filepath.Abs(configSrc) + if err != nil { + return results.RunResult{}, fmt.Errorf("resolving config path: %w", err) + } + + tmpDir, err := os.MkdirTemp("", "bench-"+tc.Name+"-") + if err != nil { + return results.RunResult{}, err + } + if err := os.Chmod(tmpDir, 0o777); err != nil { + return results.RunResult{}, fmt.Errorf("chmod tmpdir: %w", err) + } + defer func() { + if !r.opts.NoCleanup { + os.RemoveAll(tmpDir) + } + }() + + caseDir, err := filepath.Abs(filepath.Join(r.opts.CasesDir, tc.Name)) + if err != nil { + return results.RunResult{}, fmt.Errorf("resolving case directory: %w", err) + } + + extraEnv := map[string]string{} + if cfg, ok := tc.Configurations[configName]; ok { + for k, v := range cfg.Env { + extraEnv[k] = v + } + } + + runCfg := orchestrator.RunConfig{ + TestCase: tc, + Subject: subject, + ConfigName: configName, + ConfigSrcPath: configSrc, + CaseDir: caseDir, + TmpDir: tmpDir, + GeneratorImage: r.opts.GeneratorImage, + ReceiverImage: r.opts.ReceiverImage, + CollectorImage: r.opts.CollectorImage, + ReceiverHostPort: r.opts.ReceiverHostPort, + ExtraSubjectEnv: extraEnv, + CPULimit: r.opts.CPULimit, + MemLimit: r.opts.MemLimit, + } + + orch, err := orchestrator.NewComposeRunner(runCfg) + if err != nil { + return results.RunResult{}, fmt.Errorf("compose setup: %w", err) + } + + for _, c := range []string{"bench-generator", "bench-receiver", "bench-collector", "bench-subject-" + subject.Name} { + _ = exec.Command("docker", "rm", "-f", c).Run() + } + _ = orch.Down() + + startTime := time.Now() + defer func() { + if !r.opts.NoCleanup { + fmt.Println(" tearing down…") + _ = orch.Down() + } + }() + + n := tc.Generator.TotalLines + if n <= 0 { + return results.RunResult{}, fmt.Errorf("kafka_offset_commit_restart requires generator.total_lines > 0") + } + + // Everything up, receiver INCLUDED — the whole stream must deliver + // cleanly before the restart. + fmt.Println(" starting all services (receiver UP throughout)…") + if err := orch.Up(); err != nil { + return results.RunResult{}, fmt.Errorf("starting services: %w", err) + } + + metricsPort, stopPortFwd, err := orch.ReceiverMetricsPort() + if err != nil { + return results.RunResult{}, fmt.Errorf("setting up receiver access: %w", err) + } + defer stopPortFwd() + + // Let the generator finish producing, then collect its final count. + duration := tc.DurationOrDefault(60 * time.Second) + warmup := tc.WarmupOrDefault(30 * time.Second) + genTimeout := duration + warmup + 2*time.Minute + if genTimeout > r.opts.Timeout { + genTimeout = r.opts.Timeout + } + if err := orch.WaitForGeneratorExit(genTimeout); err != nil { + fmt.Printf(" (generator wait: %v)\n", err) + } + genStats := r.parseGeneratorStats(orch.GeneratorStdout()) + fmt.Printf(" generator sent %s lines\n", formatCount(genStats.LinesSent)) + sent := genStats.LinesSent + if sent <= 0 { + sent = int64(n) + } + + // Wait for FULL delivery — the restart must land after every record + // reached the target, so any post-restart arrival is re-consumption. + fmt.Printf(" waiting for full delivery (receiver >= %s)…\n", formatCount(sent)) + delivered := false + deliverDeadline := time.Now().Add(r.opts.Timeout) + for time.Now().Before(deliverDeadline) { + rm, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) + if qerr == nil { + fmt.Printf(" received: %s\n", formatCount(rm.LinesReceived)) + if rm.LinesReceived >= sent { + delivered = true + break + } + } + time.Sleep(2 * time.Second) + } + if !delivered { + return results.RunResult{}, fmt.Errorf("receiver never reached full delivery (%s) before timeout", formatCount(sent)) + } + + // Settle so the delivery-bound offset commits land at the broker; the + // graceful stop below additionally drains pending commits on shutdown. + fmt.Println(" full delivery reached — settling 5s, then graceful restart…") + time.Sleep(5 * time.Second) + + if err := orch.StopServices(30*time.Second, "subject"); err != nil { + return results.RunResult{}, fmt.Errorf("stopping subject: %w", err) + } + time.Sleep(3 * time.Second) + fmt.Println(" restarting subject (must resume from committed offsets)…") + if err := orch.UpServices("subject"); err != nil { + return results.RunResult{}, fmt.Errorf("restarting subject: %w", err) + } + + // Observation window: watch for re-consumption. The receiver count is + // already at `sent`; anything beyond it is over-delivery. Stable rounds + // mirror the in-flight case's drain. + fmt.Println(" observing for re-consumption…") + var lastCount int64 + stableRounds := 0 + observeDeadline := time.Now().Add(90 * time.Second) + for time.Now().Before(observeDeadline) { + time.Sleep(5 * time.Second) + rm, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second) + if qerr != nil { + continue + } + fmt.Printf(" received: %s / %s sent\n", formatCount(rm.LinesReceived), formatCount(sent)) + if rm.LinesReceived == lastCount && rm.LinesReceived > 0 { + stableRounds++ + if stableRounds >= 4 { + fmt.Println(" receiver stable") + break + } + } else { + stableRounds = 0 + } + lastCount = rm.LinesReceived + } + + recvMetrics, err := r.queryReceiverMetrics(metricsPort, 30*time.Second) + if err != nil { + return results.RunResult{}, fmt.Errorf("querying receiver metrics: %w", err) + } + + elapsed := time.Since(startTime).Seconds() + lossPct := 0.0 + if sent > 0 { + lossPct = 100.0 * (1.0 - float64(recvMetrics.LinesReceived)/float64(sent)) + if lossPct < 0 { + lossPct = 0 + } + } + overPct := 0.0 + var extra int64 + if recvMetrics.LinesReceived > sent && sent > 0 { + extra = recvMetrics.LinesReceived - sent + overPct = 100.0 * float64(extra) / float64(sent) + } + + var errors []string + if lossPct > tc.Correctness.ExpectedLossPct { + errors = append(errors, fmt.Sprintf("expected loss <= %.2f%%, got %.2f%% (%s of %s lines lost)", + tc.Correctness.ExpectedLossPct, lossPct, + formatCount(sent-recvMetrics.LinesReceived), formatCount(sent))) + } + if overPct > tc.Correctness.MaxOverDeliveryPct { + errors = append(errors, fmt.Sprintf( + "expected over-delivery <= %.2f%%, got %.2f%% (%s duplicate lines) — restart re-consumed records whose offsets should have been committed", + tc.Correctness.MaxOverDeliveryPct, overPct, formatCount(extra))) + } + passed := len(errors) == 0 + + fmt.Printf(" lines sent: %s lines received: %s loss: %.2f%% over-delivery: %.2f%%", + formatCount(sent), formatCount(recvMetrics.LinesReceived), lossPct, overPct) + if recvMetrics.Duplicates > 0 { + fmt.Printf(" (receiver dedup counted %s duplicates)", formatCount(recvMetrics.Duplicates)) + } + fmt.Println() + if passed { + fmt.Println(" kafka offset-commit restart correctness: PASSED ✓") + } else { + fmt.Println(" kafka offset-commit restart correctness: FAILED ✗") + } + + result := results.RunResult{ + TestName: tc.Name, + Config: configName, + Subject: subject.Name, + Version: subject.Version, + Hardware: hardwareID(), + Timestamp: startTime, + DurationSec: elapsed, + FirstSentNs: genStats.FirstSentNs, + LastSentNs: genStats.LastSentNs, + FirstReceivedNs: recvMetrics.FirstReceivedNs, + LastReceivedNs: recvMetrics.LastReceivedNs, + LinesIn: sent, + LinesOut: recvMetrics.LinesReceived, + BytesIn: genStats.BytesSent, + BytesOut: recvMetrics.BytesReceived, + LossPercent: lossPct, + Passed: &passed, + } + if !passed { + result.FailReason = strings.Join(errors, "; ") + } + + dir, err := r.store.Save(result, "") + if err != nil { + return result, fmt.Errorf("saving results: %w", err) + } + fmt.Printf(" done. results → %s\n", dir) + + return result, nil +} + // runPersistenceFileRestartCorrectness verifies that a file-tail subject // recovers correctly when it's offline across a file rotation. // From 30f0a79cdc965449ebd664c0f0ea8f740f9c63b0 Mon Sep 17 00:00:00 2001 From: Yusuf Ozturk Date: Thu, 11 Jun 2026 00:31:47 +0200 Subject: [PATCH 2/2] Review fix --- internal/config/case.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/config/case.go b/internal/config/case.go index 9725756..7d010d4 100644 --- a/internal/config/case.go +++ b/internal/config/case.go @@ -348,6 +348,9 @@ func (tc *TestCase) Validate() error { } } } + if tc.Correctness.MaxOverDeliveryPct < 0 { + return fmt.Errorf("case %q: max_overdelivery_pct must be non-negative, got %.2f", tc.Name, tc.Correctness.MaxOverDeliveryPct) + } return nil }