From 6feddff0b15cb29b4fcba8ef8d085aa442267cfb Mon Sep 17 00:00:00 2001 From: Eren Aslan <16862833+erenaslandev@users.noreply.github.com> Date: Thu, 11 Jun 2026 18:36:54 +0300 Subject: [PATCH 1/4] feat: add support for HashiCorp Vault integration in test topology --- FUTURE-CAPABILITIES.md | 53 +++++ internal/config/case.go | 106 ++++++++- internal/orchestrator/docker.go | 115 +++++++++- internal/orchestrator/vault.go | 71 ++++++ internal/orchestrator/vault_test.go | 326 ++++++++++++++++++++++++++++ internal/runner/runner.go | 9 + 6 files changed, 678 insertions(+), 2 deletions(-) create mode 100644 internal/orchestrator/vault.go create mode 100644 internal/orchestrator/vault_test.go diff --git a/FUTURE-CAPABILITIES.md b/FUTURE-CAPABILITIES.md index 6f8b497..faa4792 100644 --- a/FUTURE-CAPABILITIES.md +++ b/FUTURE-CAPABILITIES.md @@ -254,6 +254,59 @@ Result JSON gains: --- +## In-topology Vault secret store (`vault:`) + +A case can add a HashiCorp Vault dev server to the topology so a subject's +secret-store integration (e.g. VirtualMetric DataStream's `hashicorpvault` +credential provider) can be exercised end-to-end without real +infrastructure. Mirrors the `kafka:` supporting-service pattern: the +harness renders a `vault` service plus a one-shot `vault-init` that seeds +the declared secrets, and the subject gates on the seeding completing +(`service_completed_successfully`), so config-load-time secret resolution +never races it. + +```yaml +vault: + image: "hashicorp/vault:1.20" # optional, default shown (needs >= 1.12) + token: "pipebench-dev-root" # optional dev root token, default shown + mount: "secret" # optional KV mount, default shown (dev mode auto-enables it as KV v2) + secrets: # required: path -> field -> value + bench/http-auth: + username: "bench-user" + password: "bench-pass-12345" +``` + +How it runs: + +- The server runs in dev mode with TLS (`-dev-tls`): it generates its own + CA + leaf into `/vault/tls` (a per-run host dir), with SAN `vault` so + the chain validates for `https://vault:8200` inside the bench network. + Subjects whose secret providers are HTTPS-only work unmodified. +- The harness bind-mounts that dir read-only into the subject at + `/vault-tls` — subject configs reference the CA as + `/vault-tls/vault-ca.pem` (for vmetric: `ca_name: "vault-tls/vault-ca.pem"`). +- `vault-init` waits for the server's healthcheck, then seeds each + declared path via `vault kv put -mount= @`. Secret + values travel from `case.yaml` into per-run `0600` JSON files — they + never appear in the compose file, a command line, or `docker inspect` + output. Paths, field keys, mount, and token are charset-validated at + case load (`[A-Za-z0-9/_.-]`) because they do render into the compose + file. +- The token is a deterministic test-only value (`VAULT_DEV_ROOT_TOKEN_ID`); + the bench Vault holds nothing but the seeded test secrets and exists + for the lifetime of one run. + +Rules: + +- `secrets:` must declare at least one path, and every path at least one + field. +- `vault`, `vault-init` (and `redpanda`, `redpanda-init`) are reserved — + an `endpoints:` entry can't use those names. +- Composes with `kafka:`: a case may declare both blocks; the subject + then gates on both init containers. + +--- + ## Compatibility None of the schema described on this page exists in any case in `cases/` diff --git a/internal/config/case.go b/internal/config/case.go index 7d010d4..b60a6cf 100644 --- a/internal/config/case.go +++ b/internal/config/case.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "regexp" "strings" "time" @@ -51,6 +52,14 @@ type TestCase struct { // kafka_performance / kafka_correctness types. Kafka *KafkaConfig `yaml:"kafka"` + // Vault, when set, adds a HashiCorp Vault dev-mode server (TLS-enabled) + // to the test topology: the harness renders a `vault` service plus a + // one-shot `vault-init` that seeds the declared secrets, and the subject + // gates on the seeding completing. Lets a case exercise a subject's + // secret-store integration (e.g. vmetric's hashicorpvault credential + // provider) without real infrastructure. + Vault *VaultConfig `yaml:"vault"` + Subjects []string `yaml:"subjects"` Configurations map[string]Configuration `yaml:"configurations"` Correctness CorrectnessConfig `yaml:"correctness"` @@ -112,6 +121,51 @@ func (k *KafkaConfig) SMPOrDefault() int { return 1 } +// VaultConfig configures the in-topology HashiCorp Vault dev server (see +// TestCase.Vault). All fields except Secrets are optional; the orchestrator +// applies the defaults noted below. The dev server listens TLS-only at +// https://vault:8200 with a self-generated CA the harness bind-mounts into +// the subject at /vault-tls (vault-ca.pem). +type VaultConfig struct { + // Image is the Vault container image (default "hashicorp/vault:1.20"). + // Needs >= 1.12 for -dev-tls and `vault kv put -mount=...`. + Image string `yaml:"image"` + // Token is the deterministic dev-mode root token, fixed via + // VAULT_DEV_ROOT_TOKEN_ID (default "pipebench-dev-root"). Test-only — + // the bench Vault never holds real secrets. + Token string `yaml:"token"` + // Mount is the KV mount the secrets are seeded under (default "secret", + // the mount dev mode auto-enables as KV v2). + Mount string `yaml:"mount"` + // Secrets maps secret path -> field name -> value. vault-init seeds each + // path via `vault kv put -mount= @`; the values are + // written to per-run JSON files, never onto a command line. + Secrets map[string]map[string]string `yaml:"secrets"` +} + +// VaultImageOrDefault etc. centralize the dev-server defaults so the +// orchestrator and any caller render the same values. +func (v *VaultConfig) VaultImageOrDefault() string { + if v != nil && v.Image != "" { + return v.Image + } + return "hashicorp/vault:1.20" +} + +func (v *VaultConfig) TokenOrDefault() string { + if v != nil && v.Token != "" { + return v.Token + } + return "pipebench-dev-root" +} + +func (v *VaultConfig) MountOrDefault() string { + if v != nil && v.Mount != "" { + return v.Mount + } + return "secret" +} + // Endpoint is an auxiliary container in the test topology (see // TestCase.Endpoints). It's a host the subject reaches on the bench network — // not a generator or receiver. @@ -250,6 +304,9 @@ func (tc *TestCase) HasGenerator() bool { // UsesKafka reports whether the case adds a Redpanda broker to the topology. func (tc *TestCase) UsesKafka() bool { return tc.Kafka != nil } +// UsesVault reports whether the case adds a Vault dev server to the topology. +func (tc *TestCase) UsesVault() bool { return tc.Vault != nil } + // IsPerformanceType reports whether the case is scored as a throughput test — // the plain `performance` type or the Kafka variant `kafka_performance`. func (tc *TestCase) IsPerformanceType() bool { @@ -307,7 +364,10 @@ func (tc *TestCase) Validate() error { } // Endpoints: require name+image, unique, and not colliding with the fixed // service names the compose template always emits. - reserved := map[string]struct{}{"subject": {}, "generator": {}, "receiver": {}, "collector": {}} + reserved := map[string]struct{}{ + "subject": {}, "generator": {}, "receiver": {}, "collector": {}, + "redpanda": {}, "redpanda-init": {}, "vault": {}, "vault-init": {}, + } epNames := map[string]struct{}{} for i, e := range tc.Endpoints { if e.Name == "" { @@ -351,6 +411,50 @@ func (tc *TestCase) Validate() error { if tc.Correctness.MaxOverDeliveryPct < 0 { return fmt.Errorf("case %q: max_overdelivery_pct must be non-negative, got %.2f", tc.Name, tc.Correctness.MaxOverDeliveryPct) } + if err := tc.validateVault(); err != nil { + return err + } + return nil +} + +// vaultPathRe / vaultTokenRe constrain the vault-config strings that end up +// embedded in the generated docker-compose file (the vault-init command line +// and environment): no shell or YAML metacharacter can pass. Secret VALUES +// are exempt — they are written to JSON seed files, never rendered inline. +var ( + vaultPathRe = regexp.MustCompile(`^[A-Za-z0-9/_.-]+$`) + vaultTokenRe = regexp.MustCompile(`^[A-Za-z0-9_.-]+$`) +) + +// validateVault checks the optional `vault:` block: seeding at least one +// secret is mandatory (a vault server nothing reads from is a case-authoring +// mistake), and every compose-embedded string is charset-restricted. +func (tc *TestCase) validateVault() error { + if tc.Vault == nil { + return nil + } + if len(tc.Vault.Secrets) == 0 { + return fmt.Errorf("case %q: vault block requires at least one entry under `secrets`", tc.Name) + } + if !vaultTokenRe.MatchString(tc.Vault.TokenOrDefault()) { + return fmt.Errorf("case %q: vault token must match %s", tc.Name, vaultTokenRe) + } + if !vaultPathRe.MatchString(tc.Vault.MountOrDefault()) { + return fmt.Errorf("case %q: vault mount %q must match %s", tc.Name, tc.Vault.MountOrDefault(), vaultPathRe) + } + for path, fields := range tc.Vault.Secrets { + if !vaultPathRe.MatchString(path) { + return fmt.Errorf("case %q: vault secret path %q must match %s", tc.Name, path, vaultPathRe) + } + if len(fields) == 0 { + return fmt.Errorf("case %q: vault secret %q has no fields", tc.Name, path) + } + for key := range fields { + if !vaultTokenRe.MatchString(key) { + return fmt.Errorf("case %q: vault secret %q field key %q must match %s", tc.Name, path, key, vaultTokenRe) + } + } + } return nil } diff --git a/internal/orchestrator/docker.go b/internal/orchestrator/docker.go index 8b1817d..865e2be 100644 --- a/internal/orchestrator/docker.go +++ b/internal/orchestrator/docker.go @@ -50,10 +50,16 @@ services: image: "{{ .SubjectImage }}" container_name: "{{ .SubjectContainer }}" networks: [bench] -{{- if .KafkaEnabled }} +{{- if or .KafkaEnabled .VaultEnabled }} depends_on: +{{- if .KafkaEnabled }} redpanda-init: condition: service_completed_successfully +{{- end }} +{{- if .VaultEnabled }} + vault-init: + condition: service_completed_successfully +{{- end }} {{- end }} volumes: - "{{ .ConfigSrc }}:{{ .ConfigDst }}{{ .ConfigMountOpts }}" @@ -64,6 +70,9 @@ services: {{- if .TLSCertsHost }} - "{{ .TLSCertsHost }}:/certs:ro" {{- end }} +{{- if .VaultEnabled }} + - "{{ .VaultTLSHost }}:/vault-tls:ro" +{{- end }} {{- if .SubjectUser }} user: "{{ .SubjectUser }}" {{- end }} @@ -390,6 +399,59 @@ services: - "rpk topic create {{ .KafkaTopic }} -p {{ .KafkaPartitions }} --brokers redpanda:9092 || rpk topic describe {{ .KafkaTopic }} --brokers redpanda:9092" restart: "no" {{- end }} +{{- if .VaultEnabled }} + + vault: + image: "{{ .VaultImage }}" + container_name: "bench-vault" + hostname: "vault" + networks: [bench] + cap_add: + - IPC_LOCK + environment: + VAULT_DEV_ROOT_TOKEN_ID: "{{ .VaultToken }}" + command: + - server + - "-dev" + - "-dev-tls" + - "-dev-tls-cert-dir=/vault/tls" + - "-dev-tls-san=vault" + - "-dev-listen-address=0.0.0.0:8200" + volumes: + - "{{ .VaultTLSHost }}:/vault/tls" + # -tls-skip-verify is a liveness probe choice, not a trust decision: the + # check races the dev server writing its own cert dir at startup. Full + # chain verification happens in vault-init (VAULT_CACERT) and the subject. + healthcheck: + test: ["CMD", "vault", "status", "-address=https://127.0.0.1:8200", "-tls-skip-verify"] + interval: 2s + timeout: 5s + retries: 30 + start_period: 3s + restart: "no" + + # One-shot: seed each declared secret from its mounted JSON file, then + # exit 0. The subject gates on this completing so config-load-time + # $secret resolution never races the seeding. + vault-init: + image: "{{ .VaultImage }}" + container_name: "bench-vault-init" + networks: [bench] + depends_on: + vault: + condition: service_healthy + environment: + VAULT_ADDR: "https://vault:8200" + VAULT_CACERT: "/vault/tls/vault-ca.pem" + VAULT_TOKEN: "{{ .VaultToken }}" + volumes: + - "{{ .VaultTLSHost }}:/vault/tls:ro" + - "{{ .VaultSecretsHost }}:/vault-secrets:ro" + entrypoint: ["/bin/sh", "-c"] + command: + - "{{ .VaultInitCmd }}" + restart: "no" +{{- end }} ` // RunConfig holds parameters for a single test run. @@ -414,6 +476,13 @@ type RunConfig struct { // path is bind-mounted into the subject and every TLS-using generator // at /certs:ro. TLSCertsHost string + // VaultTLSHost / VaultSecretsHost / VaultSeeds are the host paths and + // seed list provisioned by PrepareVault for a `vault:`-enabled case. + // NewComposeRunner populates them when empty; tests may set them + // directly to render a compose file without touching the filesystem. + VaultTLSHost string + VaultSecretsHost string + VaultSeeds []VaultSeed } // ComposeRunner manages a docker compose lifecycle for one test run. @@ -444,6 +513,17 @@ func NewComposeRunner(cfg RunConfig) (*ComposeRunner, error) { cfg.DockerSocketGID = dockerSocketGID() } + // Vault topology prep lives here (not in the runner) so every RunConfig + // construction site — including the persistence/restart flows — gets it + // from the single place all of them already pass through. + if cfg.TestCase.UsesVault() && cfg.VaultTLSHost == "" { + vp, err := PrepareVault(cfg.TmpDir, cfg.TestCase.Vault) + if err != nil { + return nil, fmt.Errorf("preparing vault topology: %w", err) + } + cfg.VaultTLSHost, cfg.VaultSecretsHost, cfg.VaultSeeds = vp.TLSDir, vp.SecretsDir, vp.Seeds + } + composeFile := filepath.Join(cfg.TmpDir, "docker-compose.yaml") if err := writeCompose(composeFile, cfg); err != nil { return nil, err @@ -811,6 +891,18 @@ type composeVars struct { KafkaSMP int GenKafkaBatch int + // Vault topology. VaultEnabled gates the vault + vault-init services, + // the subject's vault-init depends_on, and the /vault-tls subject mount. + // VaultInitCmd is the pre-built one-shot seeding shell line: mount and + // paths are charset-validated at case load, and the secret values live + // in the mounted JSON files, never in this string. + VaultEnabled bool + VaultImage string + VaultToken string + VaultTLSHost string + VaultSecretsHost string + VaultInitCmd string + RecvMode string RecvListen string RecvValidateDedup string @@ -1074,6 +1166,27 @@ func writeCompose(path string, cfg RunConfig) error { vars.KafkaSMP = tc.Kafka.SMPOrDefault() } + // Vault dev server: render the vault + vault-init services and gate the + // subject on the seeding completing. Defaults centralized on VaultConfig. + if tc.UsesVault() { + if cfg.VaultTLSHost == "" || cfg.VaultSecretsHost == "" { + return fmt.Errorf("case %q uses vault but the vault host dirs were not prepared", tc.Name) + } + vars.VaultEnabled = true + vars.VaultImage = tc.Vault.VaultImageOrDefault() + vars.VaultToken = tc.Vault.TokenOrDefault() + vars.VaultTLSHost = filepath.ToSlash(cfg.VaultTLSHost) + vars.VaultSecretsHost = filepath.ToSlash(cfg.VaultSecretsHost) + var sb strings.Builder + sb.WriteString("set -e; ") + for _, s := range cfg.VaultSeeds { + fmt.Fprintf(&sb, "vault kv put -mount=%s %s @/vault-secrets/%s; ", + tc.Vault.MountOrDefault(), s.Path, s.File) + } + sb.WriteString("echo vault seeding complete") + vars.VaultInitCmd = sb.String() + } + if tc.MultiReceiver() { for i, rc := range tc.Receivers { vars.Receivers = append(vars.Receivers, receiverTpl{ diff --git a/internal/orchestrator/vault.go b/internal/orchestrator/vault.go new file mode 100644 index 0000000..31cd545 --- /dev/null +++ b/internal/orchestrator/vault.go @@ -0,0 +1,71 @@ +package orchestrator + +import ( + "encoding/json" + "fmt" + "maps" + "os" + "path/filepath" + "slices" + + "github.com/VirtualMetric/PipeBench/internal/config" +) + +// VaultSeed pairs a secret path with the JSON file vault-init loads it from. +// Path is charset-validated at case load (see config.TestCase.Validate), so +// it is safe to embed in the vault-init command line; the secret values live +// only in the JSON file. +type VaultSeed struct { + Path string // KV path the secret is seeded under + File string // file name inside the mounted /vault-secrets dir +} + +// VaultPaths is what PrepareVault provisions under the run temp dir for a +// `vault:`-enabled case. +type VaultPaths struct { + // TLSDir is bind-mounted read-write into the vault container (the dev + // server writes vault-ca.pem / vault-cert.pem / vault-key.pem there) and + // read-only into the subject at /vault-tls so it can verify the chain. + TLSDir string + // SecretsDir holds one JSON file per seeded secret path and is + // bind-mounted read-only into vault-init. + SecretsDir string + Seeds []VaultSeed +} + +// PrepareVault creates the host directories and secret seed files a +// `vault:`-enabled run needs. Seeds are ordered by sorted secret path so file +// numbering and the rendered vault-init command are deterministic. +func PrepareVault(tmpDir string, v *config.VaultConfig) (VaultPaths, error) { + tlsDir := filepath.Join(tmpDir, "vault-tls") + if err := os.MkdirAll(tlsDir, 0o777); err != nil { + return VaultPaths{}, fmt.Errorf("creating vault tls dir: %w", err) + } + // MkdirAll is umask-filtered; chmod explicitly. World-writable because the + // vault image drops to uid 100 (user "vault"), which must create the dev + // TLS material in this host-owned dir — per-run throwaway certs, removed + // with the temp dir. + if err := os.Chmod(tlsDir, 0o777); err != nil { + return VaultPaths{}, fmt.Errorf("chmod vault tls dir: %w", err) + } + + secretsDir := filepath.Join(tmpDir, "vault-secrets") + if err := os.MkdirAll(secretsDir, 0o700); err != nil { + return VaultPaths{}, fmt.Errorf("creating vault secrets dir: %w", err) + } + + seeds := make([]VaultSeed, 0, len(v.Secrets)) + for i, path := range slices.Sorted(maps.Keys(v.Secrets)) { + data, err := json.Marshal(v.Secrets[path]) + if err != nil { + return VaultPaths{}, fmt.Errorf("encoding vault secret %q: %w", path, err) + } + file := fmt.Sprintf("%d.json", i) + if err := os.WriteFile(filepath.Join(secretsDir, file), data, 0o600); err != nil { + return VaultPaths{}, fmt.Errorf("writing vault secret %q: %w", path, err) + } + seeds = append(seeds, VaultSeed{Path: path, File: file}) + } + + return VaultPaths{TLSDir: tlsDir, SecretsDir: secretsDir, Seeds: seeds}, nil +} diff --git a/internal/orchestrator/vault_test.go b/internal/orchestrator/vault_test.go new file mode 100644 index 0000000..bcb3cd5 --- /dev/null +++ b/internal/orchestrator/vault_test.go @@ -0,0 +1,326 @@ +package orchestrator + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/VirtualMetric/PipeBench/internal/config" + "gopkg.in/yaml.v3" +) + +// vaultSmokeCase returns a minimal vault-enabled correctness case with two +// secret paths declared out of sorted order (seed ordering is asserted below). +func vaultSmokeCase() *config.TestCase { + return &config.TestCase{ + Name: "vault-smoke", + Type: "correctness", + Duration: "10s", + Vault: &config.VaultConfig{ + Secrets: map[string]map[string]string{ + "bench/http-auth": {"username": "bench-user", "password": "bench-pass"}, + "bench/aux": {"value": "v1"}, + }, + }, + Generator: config.GeneratorConfig{ + Mode: "http", Target: "http://u:p@subject:9000/", Rate: 10, + LineSize: 64, Format: "raw", Connections: 1, + }, + Receiver: config.ReceiverConfig{Mode: "tcp", Listen: ":9001"}, + } +} + +// TestComposeRendersVault verifies a vault-enabled case renders the vault dev +// server + one-shot vault-init seeder, mounts the TLS dir into the subject, +// and gates the subject on seeding completing. Runs PrepareVault for real so +// the host dirs/seed files exercised here are the ones the runner would use. +func TestComposeRendersVault(t *testing.T) { + tc := vaultSmokeCase() + if err := tc.Validate(); err != nil { + t.Fatalf("validate: %v", err) + } + subj := config.Subject{Name: "vmetric", Image: "vmetric/director", Version: "2.0.2", ConfigPath: "/config.yml"} + tmp, err := os.MkdirTemp("", "compose-vault-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + vp, err := PrepareVault(tmp, tc.Vault) + if err != nil { + t.Fatalf("PrepareVault: %v", err) + } + composePath := filepath.Join(tmp, "compose.yaml") + cfg := RunConfig{ + TestCase: tc, Subject: subj, ConfigName: "default", + ConfigSrcPath: composePath, TmpDir: tmp, + GeneratorImage: "img-gen", ReceiverImage: "img-recv", CollectorImage: "img-coll", + ReceiverHostPort: 19001, + VaultTLSHost: vp.TLSDir, + VaultSecretsHost: vp.SecretsDir, + VaultSeeds: vp.Seeds, + } + if err := writeCompose(composePath, cfg); err != nil { + t.Fatalf("writeCompose: %v", err) + } + data, err := os.ReadFile(composePath) + if err != nil { + t.Fatal(err) + } + out := string(data) + + // The rendered compose (including the vault block) must be valid YAML — + // guards the template indentation. + var parsed map[string]any + if err := yaml.Unmarshal(data, &parsed); err != nil { + t.Fatalf("rendered compose is not valid YAML: %v\n%s", err, out) + } + + mustContain(t, out, " vault:\n") + mustContain(t, out, "container_name: \"bench-vault\"") + mustContain(t, out, "hostname: \"vault\"") + mustContain(t, out, "image: \"hashicorp/vault:1.20\"") + mustContain(t, out, "-dev-tls-san=vault") + mustContain(t, out, "-dev-listen-address=0.0.0.0:8200") + mustContain(t, out, "VAULT_DEV_ROOT_TOKEN_ID: \"pipebench-dev-root\"") + mustContain(t, out, "-tls-skip-verify") + + mustContain(t, out, " vault-init:\n") + mustContain(t, out, "container_name: \"bench-vault-init\"") + mustContain(t, out, "VAULT_ADDR: \"https://vault:8200\"") + mustContain(t, out, "VAULT_CACERT: \"/vault/tls/vault-ca.pem\"") + // Seeds render in sorted-path order with per-path JSON files; only paths + // and file names appear on the command line, never secret values. + mustContain(t, out, "vault kv put -mount=secret bench/aux @/vault-secrets/0.json; "+ + "vault kv put -mount=secret bench/http-auth @/vault-secrets/1.json") + mustNotContain(t, out, "bench-pass") + + // Subject gates on seeding and gets the TLS dir read-only at /vault-tls. + mustContain(t, out, ":/vault-tls:ro") + subjectSvc := parsed["services"].(map[string]any)["subject"].(map[string]any) + deps, ok := subjectSvc["depends_on"].(map[string]any) + if !ok { + t.Fatalf("subject depends_on missing or wrong shape: %#v", subjectSvc["depends_on"]) + } + if _, ok := deps["vault-init"]; !ok { + t.Errorf("subject depends_on lacks vault-init: %#v", deps) + } +} + +// TestComposeOmitsVaultByDefault guards existing cases: no vault service or +// wiring should appear when the case has no `vault:` block. +func TestComposeOmitsVaultByDefault(t *testing.T) { + tc := &config.TestCase{ + Name: "plain-tcp", + Type: "performance", + Duration: "30s", + Generator: config.GeneratorConfig{Mode: "tcp", Target: "subject:9000", Rate: 10, LineSize: 64, Format: "raw"}, + Receiver: config.ReceiverConfig{Mode: "tcp", Listen: ":9001"}, + } + subj := config.Subject{Name: "vmetric", Image: "vmetric/director", Version: "2.0.2", ConfigPath: "/config.yml"} + // Note: the tmp dir path is bind-mounted into the rendered compose, so its + // name must not itself contain the strings asserted absent below. + tmp, err := os.MkdirTemp("", "compose-plain-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + composePath := filepath.Join(tmp, "compose.yaml") + cfg := RunConfig{ + TestCase: tc, Subject: subj, ConfigName: "default", + ConfigSrcPath: composePath, TmpDir: tmp, + GeneratorImage: "img-gen", ReceiverImage: "img-recv", CollectorImage: "img-coll", + ReceiverHostPort: 19001, + } + if err := writeCompose(composePath, cfg); err != nil { + t.Fatalf("writeCompose: %v", err) + } + data, err := os.ReadFile(composePath) + if err != nil { + t.Fatal(err) + } + out := string(data) + mustNotContain(t, out, "vault") + mustNotContain(t, out, "VAULT_") + mustNotContain(t, out, "/vault-tls") +} + +// TestComposeRendersKafkaAndVaultTogether is the regression test for the +// subject depends_on restructure: with both blocks set, the subject must gate +// on BOTH redpanda-init and vault-init and the file must stay valid YAML. +func TestComposeRendersKafkaAndVaultTogether(t *testing.T) { + tc := vaultSmokeCase() + tc.Name = "kafka-vault" + tc.Type = "kafka_correctness" + tc.Kafka = &config.KafkaConfig{Topic: "bench"} + tc.Generator = config.GeneratorConfig{Mode: "kafka", Target: "redpanda:9092", Format: "json"} + if err := tc.Validate(); err != nil { + t.Fatalf("validate: %v", err) + } + subj := config.Subject{Name: "vmetric", Image: "vmetric/director", Version: "2.0.2", ConfigPath: "/config.yml"} + tmp, err := os.MkdirTemp("", "compose-kafka-vault-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + vp, err := PrepareVault(tmp, tc.Vault) + if err != nil { + t.Fatalf("PrepareVault: %v", err) + } + composePath := filepath.Join(tmp, "compose.yaml") + cfg := RunConfig{ + TestCase: tc, Subject: subj, ConfigName: "default", + ConfigSrcPath: composePath, TmpDir: tmp, + GeneratorImage: "img-gen", ReceiverImage: "img-recv", CollectorImage: "img-coll", + ReceiverHostPort: 19001, + VaultTLSHost: vp.TLSDir, + VaultSecretsHost: vp.SecretsDir, + VaultSeeds: vp.Seeds, + } + if err := writeCompose(composePath, cfg); err != nil { + t.Fatalf("writeCompose: %v", err) + } + data, err := os.ReadFile(composePath) + if err != nil { + t.Fatal(err) + } + var parsed map[string]any + if err := yaml.Unmarshal(data, &parsed); err != nil { + t.Fatalf("rendered compose is not valid YAML: %v\n%s", err, string(data)) + } + subjectSvc := parsed["services"].(map[string]any)["subject"].(map[string]any) + deps, ok := subjectSvc["depends_on"].(map[string]any) + if !ok { + t.Fatalf("subject depends_on missing or wrong shape: %#v", subjectSvc["depends_on"]) + } + for _, want := range []string{"redpanda-init", "vault-init"} { + if _, ok := deps[want]; !ok { + t.Errorf("subject depends_on lacks %s: %#v", want, deps) + } + } +} + +// TestWriteComposeRejectsUnpreparedVault verifies writeCompose fails fast when +// a vault case reaches it without PrepareVault having provisioned the host +// dirs (instead of rendering empty bind-mount sources). +func TestWriteComposeRejectsUnpreparedVault(t *testing.T) { + tc := vaultSmokeCase() + subj := config.Subject{Name: "vmetric", Image: "vmetric/director", Version: "2.0.2", ConfigPath: "/config.yml"} + tmp, err := os.MkdirTemp("", "compose-vault-unprep-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + composePath := filepath.Join(tmp, "compose.yaml") + cfg := RunConfig{ + TestCase: tc, Subject: subj, ConfigName: "default", + ConfigSrcPath: composePath, TmpDir: tmp, + GeneratorImage: "img-gen", ReceiverImage: "img-recv", CollectorImage: "img-coll", + ReceiverHostPort: 19001, + } + if err := writeCompose(composePath, cfg); err == nil { + t.Fatal("expected error for vault case without prepared host dirs") + } +} + +// TestValidateRejectsBadVault covers the vault validation rules: secrets are +// mandatory and every compose-embedded string is charset-restricted. +func TestValidateRejectsBadVault(t *testing.T) { + valid := map[string]map[string]string{"bench/ok": {"k": "v"}} + cases := map[string]*config.TestCase{ + "empty secrets": {Name: "x", Vault: &config.VaultConfig{}}, + "path with command substitution": {Name: "x", Vault: &config.VaultConfig{ + Secrets: map[string]map[string]string{"bench/$(reboot)": {"k": "v"}}}}, + "path with space": {Name: "x", Vault: &config.VaultConfig{ + Secrets: map[string]map[string]string{"bench/a b": {"k": "v"}}}}, + "path with quote": {Name: "x", Vault: &config.VaultConfig{ + Secrets: map[string]map[string]string{`bench/a"b`: {"k": "v"}}}}, + "empty field map": {Name: "x", Vault: &config.VaultConfig{ + Secrets: map[string]map[string]string{"bench/empty": {}}}}, + "bad field key": {Name: "x", Vault: &config.VaultConfig{ + Secrets: map[string]map[string]string{"bench/ok": {"user name": "v"}}}}, + "bad token": {Name: "x", Vault: &config.VaultConfig{Token: "root$tok", Secrets: valid}}, + "bad mount": {Name: "x", Vault: &config.VaultConfig{Mount: "se cret", Secrets: valid}}, + "endpoint named vault": {Name: "x", + Endpoints: []config.Endpoint{{Name: "vault", Image: "img"}}}, + "endpoint named vault-init": {Name: "x", + Endpoints: []config.Endpoint{{Name: "vault-init", Image: "img"}}}, + "endpoint named redpanda": {Name: "x", + Endpoints: []config.Endpoint{{Name: "redpanda", Image: "img"}}}, + } + for label, tc := range cases { + if err := tc.Validate(); err == nil { + t.Errorf("%s: expected validation error, got nil", label) + } + } + + // Sanity: a well-formed vault block passes. + good := vaultSmokeCase() + if err := good.Validate(); err != nil { + t.Errorf("valid vault case rejected: %v", err) + } +} + +// TestPrepareVaultWritesSeeds verifies the host-side provisioning: dir modes +// (the vault container's uid 100 must write the TLS dir; the secrets dir stays +// private), deterministic sorted seed ordering, and JSON round-tripping. +func TestPrepareVaultWritesSeeds(t *testing.T) { + tmp, err := os.MkdirTemp("", "prepare-vault-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp) + + v := &config.VaultConfig{ + Secrets: map[string]map[string]string{ + "z/last": {"k": "v2"}, + "a/first": {"user": "u", "pass": "p"}, + }, + } + vp, err := PrepareVault(tmp, v) + if err != nil { + t.Fatalf("PrepareVault: %v", err) + } + + tlsInfo, err := os.Stat(vp.TLSDir) + if err != nil { + t.Fatalf("tls dir: %v", err) + } + if got := tlsInfo.Mode().Perm(); got != 0o777 { + t.Errorf("tls dir mode: got %o, want 777", got) + } + secInfo, err := os.Stat(vp.SecretsDir) + if err != nil { + t.Fatalf("secrets dir: %v", err) + } + if got := secInfo.Mode().Perm(); got != 0o700 { + t.Errorf("secrets dir mode: got %o, want 700", got) + } + + want := []VaultSeed{{Path: "a/first", File: "0.json"}, {Path: "z/last", File: "1.json"}} + if len(vp.Seeds) != len(want) { + t.Fatalf("seeds: got %v, want %v", vp.Seeds, want) + } + for i, w := range want { + if vp.Seeds[i] != w { + t.Errorf("seed %d: got %+v, want %+v", i, vp.Seeds[i], w) + } + data, err := os.ReadFile(filepath.Join(vp.SecretsDir, w.File)) + if err != nil { + t.Fatalf("reading seed %s: %v", w.File, err) + } + var fields map[string]string + if err := json.Unmarshal(data, &fields); err != nil { + t.Fatalf("seed %s is not valid JSON: %v", w.File, err) + } + if len(fields) != len(v.Secrets[w.Path]) { + t.Errorf("seed %s fields: got %v, want %v", w.File, fields, v.Secrets[w.Path]) + } + for k, val := range v.Secrets[w.Path] { + if fields[k] != val { + t.Errorf("seed %s field %q: got %q, want %q", w.File, k, fields[k], val) + } + } + } +} diff --git a/internal/runner/runner.go b/internal/runner/runner.go index f675483..279c6b9 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -279,6 +279,15 @@ func (r *Runner) Run(tc *config.TestCase, subject config.Subject) (results.RunRe "bench-generator", "bench-receiver", "bench-collector", "bench-subject-" + subject.Name, } + // Supporting-service containers have fixed names too; a crashed prior run + // belongs to a different compose project, so Down() won't remove them and + // the new run would collide on the name. + if tc.UsesKafka() { + cleanupContainers = append(cleanupContainers, "bench-redpanda", "bench-redpanda-init") + } + if tc.UsesVault() { + cleanupContainers = append(cleanupContainers, "bench-vault", "bench-vault-init") + } // Plural-mode containers (bench-generator-, bench-receiver-) // need cleanup too, otherwise a re-run of the same case can collide. for _, c := range cr.GeneratorContainers() { From e0d2d5f6576b7a85be6cec1019a7e70ec1ecd2c0 Mon Sep 17 00:00:00 2001 From: Eren Aslan <16862833+erenaslandev@users.noreply.github.com> Date: Thu, 11 Jun 2026 18:37:12 +0300 Subject: [PATCH 2/4] feat: implement HTTP batcher tests and fix buffer aliasing issues --- containers/generator/http_test.go | 74 +++++++++++++++++++++++++++++++ containers/generator/main.go | 7 ++- 2 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 containers/generator/http_test.go diff --git a/containers/generator/http_test.go b/containers/generator/http_test.go new file mode 100644 index 0000000..2ab851e --- /dev/null +++ b/containers/generator/http_test.go @@ -0,0 +1,74 @@ +package main + +import ( + "bytes" + "fmt" + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" +) + +// TestRunHTTPSingleBatchesDistinctLines guards the HTTP batcher against two +// regressions: reused-buffer aliasing (sequenced mode rewrites the CONN=/SEQ= +// prefix in place, so un-copied batch entries would all alias the final line) +// and newline doubling (generated lines already end in '\n'; joining batch +// entries with another '\n' inserted a blank line between records). +func TestRunHTTPSingleBatchesDistinctLines(t *testing.T) { + var mu sync.Mutex + var lines [][]byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("reading body: %v", err) + return + } + mu.Lock() + lines = append(lines, bytes.Split(body, []byte("\n"))...) + mu.Unlock() + })) + defer srv.Close() + + // 250 lines = two full 100-line batches plus a partial final flush. + const total = 250 + cfg := config{ + Mode: "http", + Target: srv.URL, + TotalLines: total, + LineSize: 64, + Format: "raw", + Sequenced: true, + Connections: 1, + } + + sent, _, err := runHTTPSingle(cfg, &sendClock{}) + if err != nil { + t.Fatalf("runHTTPSingle: %v", err) + } + if sent != total { + t.Fatalf("lines sent: got %d, want %d", sent, total) + } + + mu.Lock() + defer mu.Unlock() + if len(lines) != total { + t.Fatalf("lines received: got %d, want %d (blank or missing lines)", len(lines), total) + } + seen := make(map[string]struct{}, total) + for i, l := range lines { + if len(l) == 0 { + t.Fatalf("line %d is blank — newline-doubling regression", i) + } + var conn int + var seq int64 + if _, err := fmt.Sscanf(string(l), "CONN=%d SEQ=%d", &conn, &seq); err != nil { + t.Fatalf("line %d %q lacks CONN=/SEQ= prefix: %v", i, l, err) + } + key := fmt.Sprintf("%d/%d", conn, seq) + if _, dup := seen[key]; dup { + t.Fatalf("duplicate sequence %s — buffer-aliasing regression", key) + } + seen[key] = struct{}{} + } +} diff --git a/containers/generator/main.go b/containers/generator/main.go index 0fbd3a3..62aa99a 100644 --- a/containers/generator/main.go +++ b/containers/generator/main.go @@ -823,7 +823,12 @@ func runHTTPSingle(cfg config, clock *sendClock) (int64, int64, error) { } _, _, err := sendLines(cfg, clock, func(line []byte) error { - batch = append(batch, line) + // Lines arrive with a trailing '\n' and may alias a reused buffer + // (sequenced mode rewrites the CONN=/SEQ= prefix in place), so copy + // and trim before batching: entries stay distinct until flush and + // the '\n' join there doesn't double newlines. + cp := append([]byte(nil), bytes.TrimSuffix(line, []byte("\n"))...) + batch = append(batch, cp) if len(batch) >= batchSize { return flush() } From 8e0f7499c9d72dedeadb44e517a858beb94385e5 Mon Sep 17 00:00:00 2001 From: Eren Aslan <16862833+erenaslandev@users.noreply.github.com> Date: Thu, 11 Jun 2026 19:16:56 +0300 Subject: [PATCH 3/4] feat: clarify Vault TLS configuration and path referencing in documentation --- FUTURE-CAPABILITIES.md | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/FUTURE-CAPABILITIES.md b/FUTURE-CAPABILITIES.md index faa4792..376abe9 100644 --- a/FUTURE-CAPABILITIES.md +++ b/FUTURE-CAPABILITIES.md @@ -283,8 +283,11 @@ How it runs: the chain validates for `https://vault:8200` inside the bench network. Subjects whose secret providers are HTTPS-only work unmodified. - The harness bind-mounts that dir read-only into the subject at - `/vault-tls` — subject configs reference the CA as - `/vault-tls/vault-ca.pem` (for vmetric: `ca_name: "vault-tls/vault-ca.pem"`). + `/vault-tls`, so the CA's in-container path is `/vault-tls/vault-ca.pem`. + How a subject references it depends on its config convention — vmetric's + `ca_name` takes a path relative to the service root (the directory holding + the binary, `/` in the bench images), so it's written without the leading + slash: `ca_name: "vault-tls/vault-ca.pem"`. - `vault-init` waits for the server's healthcheck, then seeds each declared path via `vault kv put -mount= @`. Secret values travel from `case.yaml` into per-run `0600` JSON files — they @@ -300,8 +303,8 @@ Rules: - `secrets:` must declare at least one path, and every path at least one field. -- `vault`, `vault-init` (and `redpanda`, `redpanda-init`) are reserved — - an `endpoints:` entry can't use those names. +- `vault`, `vault-init` are reserved — an `endpoints:` entry can't use + those names. - Composes with `kafka:`: a case may declare both blocks; the subject then gates on both init containers. From c04f40cd05bd9a82a3af0ad1945262fc4acfa714 Mon Sep 17 00:00:00 2001 From: Yusuf Ozturk Date: Mon, 15 Jun 2026 01:39:47 +0200 Subject: [PATCH 4/4] Fix tests --- internal/orchestrator/docker.go | 2 ++ internal/orchestrator/vault_test.go | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/orchestrator/docker.go b/internal/orchestrator/docker.go index 37f898e..191921f 100644 --- a/internal/orchestrator/docker.go +++ b/internal/orchestrator/docker.go @@ -59,6 +59,7 @@ services: {{- if .VaultEnabled }} vault-init: condition: service_completed_successfully +{{- end }} {{- if .AWSEnabled }} localstack: condition: service_healthy @@ -505,6 +506,7 @@ services: entrypoint: ["/bin/sh", "-c"] command: - "{{ .VaultInitCmd }}" +{{- end }} {{- if .AWSEnabled }} localstack: diff --git a/internal/orchestrator/vault_test.go b/internal/orchestrator/vault_test.go index bcb3cd5..0e23c41 100644 --- a/internal/orchestrator/vault_test.go +++ b/internal/orchestrator/vault_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "os" "path/filepath" + "runtime" "testing" "github.com/VirtualMetric/PipeBench/internal/config" @@ -294,8 +295,14 @@ func TestPrepareVaultWritesSeeds(t *testing.T) { if err != nil { t.Fatalf("secrets dir: %v", err) } - if got := secInfo.Mode().Perm(); got != 0o700 { - t.Errorf("secrets dir mode: got %o, want 700", got) + // Windows can't represent Unix permission bits on directories — os.Stat + // always reports 0777 there regardless of the 0700 passed to MkdirAll. + // The restrictive mode is only meaningful (and asserted) on Unix, which + // is also where the CI runs. + if runtime.GOOS != "windows" { + if got := secInfo.Mode().Perm(); got != 0o700 { + t.Errorf("secrets dir mode: got %o, want 700", got) + } } want := []VaultSeed{{Path: "a/first", File: "0.json"}, {Path: "z/last", File: "1.json"}}