Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions internal/config/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ func (tc *TestCase) Validate() error {
}
}
}
if tc.Correctness.MaxOverDeliveryPct < 0 {
return fmt.Errorf("case %q: max_overdelivery_pct must be non-negative, got %.2f", tc.Name, tc.Correctness.MaxOverDeliveryPct)
}
return nil
}

Expand Down Expand Up @@ -523,6 +526,15 @@ type CorrectnessConfig struct {
DrainSeconds int `yaml:"drain_seconds"`
DrainQuietWindow string `yaml:"drain_quiet_window"`

// MaxOverDeliveryPct caps duplicate re-delivery as a percentage of
// lines sent. Enforced by case types that verify source
// acknowledgments actually persisted (kafka_offset_commit_restart:
// a clean restart after full delivery must resume from committed
// offsets, not re-consume the topic). Zero = strict, no
// over-delivery allowed. Ignored by case types that don't document
// it — at-least-once types deliberately tolerate duplicates.
MaxOverDeliveryPct float64 `yaml:"max_overdelivery_pct"`
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// RateCeiling validates a per-window EPS ceiling on the receive side.
// Empty MaxEPS = check disabled.
RateCeiling RateCeilingConfig `yaml:"rate_ceiling"`
Expand Down
272 changes: 272 additions & 0 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ func (r *Runner) Run(tc *config.TestCase, subject config.Subject) (results.RunRe
if tc.Type == "kafka_inflight_crash_correctness" {
return r.runKafkaInflightCrash(tc, subject)
}
// Kafka offset-commit restart: receiver stays UP, ALL records are
// delivered cleanly, then the subject is restarted gracefully. A
// consumer whose offset commits actually persist resumes from the
// committed offsets and re-delivers (close to) nothing. Verifies no
// loss AND bounded over-delivery — the inverse of the in-flight crash
// case, which tolerates unlimited duplicates and is therefore blind to
// a subject whose acknowledgments never reach the broker at all.
if tc.Type == "kafka_offset_commit_restart" {
return r.runKafkaOffsetCommitRestart(tc, subject)
}

configName := r.opts.ConfigName

Expand Down Expand Up @@ -1664,6 +1674,268 @@ func (r *Runner) runKafkaInflightCrash(tc *config.TestCase, subject config.Subje
return result, nil
}

// runKafkaOffsetCommitRestart verifies that delivery-bound source
// acknowledgments actually persist: every produced record is delivered to a
// LIVE receiver, the subject is then restarted GRACEFULLY, and the restarted
// consumer must resume from the committed offsets instead of re-consuming
// the topic.
//
// This is the inverse of runKafkaInflightCrash: that case kills mid-delivery
// and tolerates unlimited duplicates (at-least-once permits them), which
// makes it structurally blind to "offsets are never committed at all" — a
// bug that produces zero loss and 100% over-delivery. This case closes that
// gap with a hard over-delivery ceiling (correctness.max_overdelivery_pct,
// zero = strict).
//
// Verdict: no loss (loss <= expected_loss_pct) AND over-delivery <=
// max_overdelivery_pct.
func (r *Runner) runKafkaOffsetCommitRestart(tc *config.TestCase, subject config.Subject) (results.RunResult, error) {
configName := r.opts.ConfigName
subject = r.applySubjectOverrides(subject)

fmt.Printf("→ test=%s subject=%s version=%s config=%s\n",
tc.Name, subject.Name, subject.Version, configName)

configSrc, err := tc.ConfigFilePath(r.opts.CasesDir, configName, subject)
if err != nil {
return results.RunResult{}, err
}
configSrc, err = filepath.Abs(configSrc)
if err != nil {
return results.RunResult{}, fmt.Errorf("resolving config path: %w", err)
}

tmpDir, err := os.MkdirTemp("", "bench-"+tc.Name+"-")
if err != nil {
return results.RunResult{}, err
}
if err := os.Chmod(tmpDir, 0o777); err != nil {
return results.RunResult{}, fmt.Errorf("chmod tmpdir: %w", err)
}
defer func() {
if !r.opts.NoCleanup {
os.RemoveAll(tmpDir)
}
}()

caseDir, err := filepath.Abs(filepath.Join(r.opts.CasesDir, tc.Name))
if err != nil {
return results.RunResult{}, fmt.Errorf("resolving case directory: %w", err)
}

extraEnv := map[string]string{}
if cfg, ok := tc.Configurations[configName]; ok {
for k, v := range cfg.Env {
extraEnv[k] = v
}
}

runCfg := orchestrator.RunConfig{
TestCase: tc,
Subject: subject,
ConfigName: configName,
ConfigSrcPath: configSrc,
CaseDir: caseDir,
TmpDir: tmpDir,
GeneratorImage: r.opts.GeneratorImage,
ReceiverImage: r.opts.ReceiverImage,
CollectorImage: r.opts.CollectorImage,
ReceiverHostPort: r.opts.ReceiverHostPort,
ExtraSubjectEnv: extraEnv,
CPULimit: r.opts.CPULimit,
MemLimit: r.opts.MemLimit,
}

orch, err := orchestrator.NewComposeRunner(runCfg)
if err != nil {
return results.RunResult{}, fmt.Errorf("compose setup: %w", err)
}

for _, c := range []string{"bench-generator", "bench-receiver", "bench-collector", "bench-subject-" + subject.Name} {
_ = exec.Command("docker", "rm", "-f", c).Run()
}
_ = orch.Down()

startTime := time.Now()
defer func() {
if !r.opts.NoCleanup {
fmt.Println(" tearing down…")
_ = orch.Down()
}
}()

n := tc.Generator.TotalLines
if n <= 0 {
return results.RunResult{}, fmt.Errorf("kafka_offset_commit_restart requires generator.total_lines > 0")
}
Comment thread
yusufozturk marked this conversation as resolved.

// Everything up, receiver INCLUDED — the whole stream must deliver
// cleanly before the restart.
fmt.Println(" starting all services (receiver UP throughout)…")
if err := orch.Up(); err != nil {
return results.RunResult{}, fmt.Errorf("starting services: %w", err)
}

metricsPort, stopPortFwd, err := orch.ReceiverMetricsPort()
if err != nil {
return results.RunResult{}, fmt.Errorf("setting up receiver access: %w", err)
}
defer stopPortFwd()

// Let the generator finish producing, then collect its final count.
duration := tc.DurationOrDefault(60 * time.Second)
warmup := tc.WarmupOrDefault(30 * time.Second)
genTimeout := duration + warmup + 2*time.Minute
if genTimeout > r.opts.Timeout {
genTimeout = r.opts.Timeout
}
if err := orch.WaitForGeneratorExit(genTimeout); err != nil {
fmt.Printf(" (generator wait: %v)\n", err)
}
genStats := r.parseGeneratorStats(orch.GeneratorStdout())
fmt.Printf(" generator sent %s lines\n", formatCount(genStats.LinesSent))
sent := genStats.LinesSent
if sent <= 0 {
sent = int64(n)
}

// Wait for FULL delivery — the restart must land after every record
// reached the target, so any post-restart arrival is re-consumption.
fmt.Printf(" waiting for full delivery (receiver >= %s)…\n", formatCount(sent))
delivered := false
deliverDeadline := time.Now().Add(r.opts.Timeout)
for time.Now().Before(deliverDeadline) {
rm, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second)
if qerr == nil {
fmt.Printf(" received: %s\n", formatCount(rm.LinesReceived))
if rm.LinesReceived >= sent {
delivered = true
break
}
}
time.Sleep(2 * time.Second)
}
if !delivered {
return results.RunResult{}, fmt.Errorf("receiver never reached full delivery (%s) before timeout", formatCount(sent))
}

// Settle so the delivery-bound offset commits land at the broker; the
// graceful stop below additionally drains pending commits on shutdown.
fmt.Println(" full delivery reached — settling 5s, then graceful restart…")
time.Sleep(5 * time.Second)

if err := orch.StopServices(30*time.Second, "subject"); err != nil {
return results.RunResult{}, fmt.Errorf("stopping subject: %w", err)
}
time.Sleep(3 * time.Second)
fmt.Println(" restarting subject (must resume from committed offsets)…")
if err := orch.UpServices("subject"); err != nil {
return results.RunResult{}, fmt.Errorf("restarting subject: %w", err)
}

// Observation window: watch for re-consumption. The receiver count is
// already at `sent`; anything beyond it is over-delivery. Stable rounds
// mirror the in-flight case's drain.
fmt.Println(" observing for re-consumption…")
var lastCount int64
stableRounds := 0
observeDeadline := time.Now().Add(90 * time.Second)
for time.Now().Before(observeDeadline) {
time.Sleep(5 * time.Second)
rm, qerr := r.queryReceiverMetrics(metricsPort, 10*time.Second)
if qerr != nil {
continue
}
fmt.Printf(" received: %s / %s sent\n", formatCount(rm.LinesReceived), formatCount(sent))
if rm.LinesReceived == lastCount && rm.LinesReceived > 0 {
stableRounds++
if stableRounds >= 4 {
fmt.Println(" receiver stable")
break
}
} else {
stableRounds = 0
}
lastCount = rm.LinesReceived
}

recvMetrics, err := r.queryReceiverMetrics(metricsPort, 30*time.Second)
if err != nil {
return results.RunResult{}, fmt.Errorf("querying receiver metrics: %w", err)
}

elapsed := time.Since(startTime).Seconds()
lossPct := 0.0
if sent > 0 {
lossPct = 100.0 * (1.0 - float64(recvMetrics.LinesReceived)/float64(sent))
if lossPct < 0 {
lossPct = 0
}
}
overPct := 0.0
var extra int64
if recvMetrics.LinesReceived > sent && sent > 0 {
extra = recvMetrics.LinesReceived - sent
overPct = 100.0 * float64(extra) / float64(sent)
}

var errors []string
if lossPct > tc.Correctness.ExpectedLossPct {
errors = append(errors, fmt.Sprintf("expected loss <= %.2f%%, got %.2f%% (%s of %s lines lost)",
tc.Correctness.ExpectedLossPct, lossPct,
formatCount(sent-recvMetrics.LinesReceived), formatCount(sent)))
}
if overPct > tc.Correctness.MaxOverDeliveryPct {
errors = append(errors, fmt.Sprintf(
"expected over-delivery <= %.2f%%, got %.2f%% (%s duplicate lines) — restart re-consumed records whose offsets should have been committed",
tc.Correctness.MaxOverDeliveryPct, overPct, formatCount(extra)))
}
passed := len(errors) == 0
Comment thread
yusufozturk marked this conversation as resolved.

fmt.Printf(" lines sent: %s lines received: %s loss: %.2f%% over-delivery: %.2f%%",
formatCount(sent), formatCount(recvMetrics.LinesReceived), lossPct, overPct)
if recvMetrics.Duplicates > 0 {
fmt.Printf(" (receiver dedup counted %s duplicates)", formatCount(recvMetrics.Duplicates))
}
fmt.Println()
if passed {
fmt.Println(" kafka offset-commit restart correctness: PASSED ✓")
} else {
fmt.Println(" kafka offset-commit restart correctness: FAILED ✗")
}

result := results.RunResult{
TestName: tc.Name,
Config: configName,
Subject: subject.Name,
Version: subject.Version,
Hardware: hardwareID(),
Timestamp: startTime,
DurationSec: elapsed,
FirstSentNs: genStats.FirstSentNs,
LastSentNs: genStats.LastSentNs,
FirstReceivedNs: recvMetrics.FirstReceivedNs,
LastReceivedNs: recvMetrics.LastReceivedNs,
LinesIn: sent,
LinesOut: recvMetrics.LinesReceived,
BytesIn: genStats.BytesSent,
BytesOut: recvMetrics.BytesReceived,
LossPercent: lossPct,
Passed: &passed,
}
if !passed {
result.FailReason = strings.Join(errors, "; ")
}

dir, err := r.store.Save(result, "")
if err != nil {
return result, fmt.Errorf("saving results: %w", err)
}
fmt.Printf(" done. results → %s\n", dir)

return result, nil
}

// runPersistenceFileRestartCorrectness verifies that a file-tail subject
// recovers correctly when it's offline across a file rotation.
//
Expand Down
Loading