56 changes: 56 additions & 0 deletions FUTURE-CAPABILITIES.md
@@ -254,6 +254,62 @@ Result JSON gains:

---

## In-topology Vault secret store (`vault:`)

A case can add a HashiCorp Vault dev server to the topology so a subject's
secret-store integration (e.g. VirtualMetric DataStream's `hashicorpvault`
credential provider) can be exercised end-to-end without real
infrastructure. Mirrors the `kafka:` supporting-service pattern: the
harness renders a `vault` service plus a one-shot `vault-init` that seeds
the declared secrets, and the subject gates on the seeding completing
(`service_completed_successfully`), so config-load-time secret resolution
never races it.

```yaml
vault:
  image: "hashicorp/vault:1.20"   # optional, default shown (needs >= 1.12)
  token: "pipebench-dev-root"     # optional dev root token, default shown
  mount: "secret"                 # optional KV mount, default shown (dev mode auto-enables it as KV v2)
  secrets:                        # required: path -> field -> value
    bench/http-auth:
      username: "bench-user"
      password: "bench-pass-12345"
```

How it runs:

- The server runs in dev mode with TLS (`-dev-tls`): it generates its own
CA + leaf into `/vault/tls` (a per-run host dir), with SAN `vault` so
the chain validates for `https://vault:8200` inside the bench network.
Subjects whose secret providers are HTTPS-only work unmodified.
- The harness bind-mounts that dir read-only into the subject at
`/vault-tls`, so the CA's in-container path is `/vault-tls/vault-ca.pem`.
How a subject references it depends on its config convention — vmetric's
`ca_name` takes a path relative to the service root (the directory holding
the binary, `/` in the bench images), so it's written without the leading
slash: `ca_name: "vault-tls/vault-ca.pem"`.
- `vault-init` waits for the server's healthcheck, then seeds each
declared path via `vault kv put -mount=<mount> <path> @<file>`. Secret
values travel from `case.yaml` into per-run `0600` JSON files; they
never appear in the compose file, a command line, or `docker inspect`
output. Paths, field keys, mount, and token are charset-validated at
case load because they do render into the compose file: paths and the
mount against `[A-Za-z0-9/_.-]`, the token and field keys against the
same set without `/`.
- The token is a deterministic test-only value (`VAULT_DEV_ROOT_TOKEN_ID`);
the bench Vault holds nothing but the seeded test secrets and exists
for the lifetime of one run.
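
The seeding step above can be sketched in Go as follows. This is a minimal illustration, not the harness's actual code: `writeSeedFile`, `seedArgs`, the `seed-0` file name, and the `/seed` in-container directory are all hypothetical names chosen for the example. It shows the two properties the bullet describes: values land in an owner-only JSON file, and the command line carries only a file reference.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// writeSeedFile (hypothetical helper) serializes one secret's fields to a
// JSON file created with mode 0600 and returns the file's path.
func writeSeedFile(dir, name string, fields map[string]string) (string, error) {
	data, err := json.Marshal(fields)
	if err != nil {
		return "", err
	}
	p := filepath.Join(dir, name+".json")
	// 0600: readable and writable by the owner only.
	if err := os.WriteFile(p, data, 0o600); err != nil {
		return "", err
	}
	return p, nil
}

// seedArgs (hypothetical helper) builds the argv for `vault kv put`.
// Only the @file reference appears in it, never a secret value.
func seedArgs(mount, secretPath, file string) []string {
	return []string{"vault", "kv", "put", "-mount=" + mount, secretPath, "@" + file}
}

func main() {
	dir, err := os.MkdirTemp("", "vault-seed")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer os.RemoveAll(dir)

	file, err := writeSeedFile(dir, "seed-0", map[string]string{
		"username": "bench-user",
		"password": "bench-pass-12345",
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// Assumption: the per-run directory is mounted at /seed inside vault-init.
	fmt.Println(seedArgs("secret", "bench/http-auth", "/seed/"+filepath.Base(file)))
}
```

Because the argv contains only the mount, the path, and the file reference, those are the strings that need charset validation; the values themselves can contain any characters.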

Rules:

- `secrets:` must declare at least one path, and every path at least one
field.
- `vault`, `vault-init` are reserved — an `endpoints:` entry can't use
those names.
- Composes with `kafka:`: a case may declare both blocks; the subject
then gates on both init containers.

---

## Compatibility

None of the schema described on this page exists in any case in `cases/`
74 changes: 74 additions & 0 deletions containers/generator/http_test.go
@@ -0,0 +1,74 @@
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync"
	"testing"
)

// TestRunHTTPSingleBatchesDistinctLines guards the HTTP batcher against two
// regressions: reused-buffer aliasing (sequenced mode rewrites the CONN=/SEQ=
// prefix in place, so un-copied batch entries would all alias the final line)
// and newline doubling (generated lines already end in '\n'; joining batch
// entries with another '\n' inserted a blank line between records).
func TestRunHTTPSingleBatchesDistinctLines(t *testing.T) {
	var mu sync.Mutex
	var lines [][]byte
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			t.Errorf("reading body: %v", err)
			return
		}
		mu.Lock()
		lines = append(lines, bytes.Split(body, []byte("\n"))...)
		mu.Unlock()
	}))
	defer srv.Close()

	// 250 lines = two full 100-line batches plus a partial final flush.
	const total = 250
	cfg := config{
		Mode:        "http",
		Target:      srv.URL,
		TotalLines:  total,
		LineSize:    64,
		Format:      "raw",
		Sequenced:   true,
		Connections: 1,
	}

	sent, _, err := runHTTPSingle(cfg, &sendClock{})
	if err != nil {
		t.Fatalf("runHTTPSingle: %v", err)
	}
	if sent != total {
		t.Fatalf("lines sent: got %d, want %d", sent, total)
	}

	mu.Lock()
	defer mu.Unlock()
	if len(lines) != total {
		t.Fatalf("lines received: got %d, want %d (blank or missing lines)", len(lines), total)
	}
	seen := make(map[string]struct{}, total)
	for i, l := range lines {
		if len(l) == 0 {
			t.Fatalf("line %d is blank — newline-doubling regression", i)
		}
		var conn int
		var seq int64
		if _, err := fmt.Sscanf(string(l), "CONN=%d SEQ=%d", &conn, &seq); err != nil {
			t.Fatalf("line %d %q lacks CONN=/SEQ= prefix: %v", i, l, err)
		}
		key := fmt.Sprintf("%d/%d", conn, seq)
		if _, dup := seen[key]; dup {
			t.Fatalf("duplicate sequence %s — buffer-aliasing regression", key)
		}
		seen[key] = struct{}{}
	}
}
7 changes: 6 additions & 1 deletion containers/generator/main.go
@@ -833,7 +833,12 @@ func runHTTPSingle(cfg config, clock *sendClock) (int64, int64, error) {
	}

	_, _, err := sendLines(cfg, clock, func(line []byte) error {
-		batch = append(batch, line)
		// Lines arrive with a trailing '\n' and may alias a reused buffer
		// (sequenced mode rewrites the CONN=/SEQ= prefix in place), so copy
		// and trim before batching: entries stay distinct until flush and
		// the '\n' join there doesn't double newlines.
		cp := append([]byte(nil), bytes.TrimSuffix(line, []byte("\n"))...)
		batch = append(batch, cp)
		if len(batch) >= batchSize {
			return flush()
		}
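
The two regressions this hunk fixes can be reproduced in isolation. The sketch below is a standalone illustration, not the generator's code: `batchLines` and the `SEQ=%d` line shape are hypothetical stand-ins for a producer that rewrites one buffer in place. It batches the same three lines once by reference and once by trimmed copy.

```go
package main

import (
	"bytes"
	"fmt"
)

// batchLines (hypothetical) feeds three lines through a single reused
// buffer and batches them either by reference or by trimmed copy, then
// joins the batch with '\n' the way a flush would.
func batchLines(copyEntries bool) string {
	buf := make([]byte, 0, 16)
	var batch [][]byte
	for seq := 0; seq < 3; seq++ {
		// Rewrite the line in place: same backing array every iteration.
		buf = append(buf[:0], fmt.Sprintf("SEQ=%d\n", seq)...)
		if copyEntries {
			// Copy and trim: the entry owns its bytes and has no trailing '\n'.
			cp := append([]byte(nil), bytes.TrimSuffix(buf, []byte("\n"))...)
			batch = append(batch, cp)
		} else {
			// Alias: every entry points at the shared buffer.
			batch = append(batch, buf)
		}
	}
	return string(bytes.Join(batch, []byte("\n")))
}

func main() {
	fmt.Printf("aliased: %q\n", batchLines(false))
	fmt.Printf("copied:  %q\n", batchLines(true))
}
```

Without the copy, all three entries read as the last line written and the join produces a blank line between records; with it, the joined body is `SEQ=0`, `SEQ=1`, `SEQ=2` separated by single newlines.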
108 changes: 107 additions & 1 deletion internal/config/case.go
@@ -4,6 +4,7 @@ import (
	"fmt"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"time"

@@ -51,6 +52,14 @@ type TestCase struct {
	// kafka_performance / kafka_correctness types.
	Kafka *KafkaConfig `yaml:"kafka"`

	// Vault, when set, adds a HashiCorp Vault dev-mode server (TLS-enabled)
	// to the test topology: the harness renders a `vault` service plus a
	// one-shot `vault-init` that seeds the declared secrets, and the subject
	// gates on the seeding completing. Lets a case exercise a subject's
	// secret-store integration (e.g. vmetric's hashicorpvault credential
	// provider) without real infrastructure.
	Vault *VaultConfig `yaml:"vault"`

	// AWS, when set, adds a LocalStack emulator to the test topology and
	// creates the declared S3/SQS/SNS/Kinesis/CloudWatch resources before
	// the subject starts. See AWSConfig in cloud.go.
@@ -133,6 +142,51 @@ func (k *KafkaConfig) SMPOrDefault() int {
	return 1
}

// VaultConfig configures the in-topology HashiCorp Vault dev server (see
// TestCase.Vault). All fields except Secrets are optional; the orchestrator
// applies the defaults noted below. The dev server listens TLS-only at
// https://vault:8200 with a self-generated CA the harness bind-mounts into
// the subject at /vault-tls (vault-ca.pem).
type VaultConfig struct {
	// Image is the Vault container image (default "hashicorp/vault:1.20").
	// Needs >= 1.12 for -dev-tls and `vault kv put -mount=...`.
	Image string `yaml:"image"`
	// Token is the deterministic dev-mode root token, fixed via
	// VAULT_DEV_ROOT_TOKEN_ID (default "pipebench-dev-root"). Test-only —
	// the bench Vault never holds real secrets.
	Token string `yaml:"token"`
	// Mount is the KV mount the secrets are seeded under (default "secret",
	// the mount dev mode auto-enables as KV v2).
	Mount string `yaml:"mount"`
	// Secrets maps secret path -> field name -> value. vault-init seeds each
	// path via `vault kv put -mount=<mount> <path> @<file>`; the values are
	// written to per-run JSON files, never onto a command line.
	Secrets map[string]map[string]string `yaml:"secrets"`
}

// VaultImageOrDefault etc. centralize the dev-server defaults so the
// orchestrator and any caller render the same values.
func (v *VaultConfig) VaultImageOrDefault() string {
	if v != nil && v.Image != "" {
		return v.Image
	}
	return "hashicorp/vault:1.20"
}

func (v *VaultConfig) TokenOrDefault() string {
	if v != nil && v.Token != "" {
		return v.Token
	}
	return "pipebench-dev-root"
}

func (v *VaultConfig) MountOrDefault() string {
	if v != nil && v.Mount != "" {
		return v.Mount
	}
	return "secret"
}
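
The `...OrDefault` accessors rely on Go allowing method calls on a nil pointer receiver, which is why each one checks `v != nil` first. A minimal sketch of that pattern, using a hypothetical `serviceConfig` type and image names that are not part of this PR:

```go
package main

import "fmt"

// serviceConfig is a hypothetical stand-in for an optional config block.
type serviceConfig struct {
	Image string
}

// ImageOrDefault is safe to call on a nil *serviceConfig: the nil check
// runs before any field is read.
func (c *serviceConfig) ImageOrDefault() string {
	if c != nil && c.Image != "" {
		return c.Image
	}
	return "example/image:1.0"
}

func main() {
	var unset *serviceConfig // block omitted from the case file
	fmt.Println(unset.ImageOrDefault())

	set := &serviceConfig{Image: "example/image:2.0"}
	fmt.Println(set.ImageOrDefault())
}
```

Callers therefore do not need to guard against an absent block before asking for a default.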

// Endpoint is an auxiliary container in the test topology (see
// TestCase.Endpoints). It's a host the subject reaches on the bench network —
// not a generator or receiver.
@@ -271,6 +325,9 @@ func (tc *TestCase) HasGenerator() bool {
// UsesKafka reports whether the case adds a Redpanda broker to the topology.
func (tc *TestCase) UsesKafka() bool { return tc.Kafka != nil }

// UsesVault reports whether the case adds a Vault dev server to the topology.
func (tc *TestCase) UsesVault() bool { return tc.Vault != nil }

// IsPerformanceType reports whether the case is scored as a throughput test —
// the plain `performance` type or the Kafka variant `kafka_performance`.
func (tc *TestCase) IsPerformanceType() bool {
@@ -335,6 +392,8 @@ func (tc *TestCase) Validate() error {
		"minio": {}, "minio-init": {},
		// Kafka broker services rendered from the kafka: block.
		"redpanda": {}, "redpanda-init": {},
		// Secret store services rendered from the vault: block.
		"vault": {}, "vault-init": {},
	}
	epNames := map[string]struct{}{}
	for i, e := range tc.Endpoints {
@@ -379,7 +438,54 @@ func (tc *TestCase) Validate() error {
	if tc.Correctness.MaxOverDeliveryPct < 0 {
		return fmt.Errorf("case %q: max_overdelivery_pct must be non-negative, got %.2f", tc.Name, tc.Correctness.MaxOverDeliveryPct)
	}
-	return tc.validateCloud()
	if err := tc.validateVault(); err != nil {
		return err
	}
	if err := tc.validateCloud(); err != nil {
		return err
	}
	return nil
}

// vaultPathRe / vaultTokenRe constrain the vault-config strings that end up
// embedded in the generated docker-compose file (the vault-init command line
// and environment): no shell or YAML metacharacter can pass. Secret VALUES
// are exempt — they are written to JSON seed files, never rendered inline.
var (
	vaultPathRe  = regexp.MustCompile(`^[A-Za-z0-9/_.-]+$`)
	vaultTokenRe = regexp.MustCompile(`^[A-Za-z0-9_.-]+$`)
)

// validateVault checks the optional `vault:` block: seeding at least one
// secret is mandatory (a vault server nothing reads from is a case-authoring
// mistake), and every compose-embedded string is charset-restricted.
func (tc *TestCase) validateVault() error {
	if tc.Vault == nil {
		return nil
	}
	if len(tc.Vault.Secrets) == 0 {
		return fmt.Errorf("case %q: vault block requires at least one entry under `secrets`", tc.Name)
	}
	if !vaultTokenRe.MatchString(tc.Vault.TokenOrDefault()) {
		return fmt.Errorf("case %q: vault token must match %s", tc.Name, vaultTokenRe)
	}
	if !vaultPathRe.MatchString(tc.Vault.MountOrDefault()) {
		return fmt.Errorf("case %q: vault mount %q must match %s", tc.Name, tc.Vault.MountOrDefault(), vaultPathRe)
	}
	for path, fields := range tc.Vault.Secrets {
		if !vaultPathRe.MatchString(path) {
			return fmt.Errorf("case %q: vault secret path %q must match %s", tc.Name, path, vaultPathRe)
		}
		if len(fields) == 0 {
			return fmt.Errorf("case %q: vault secret %q has no fields", tc.Name, path)
		}
		for key := range fields {
			if !vaultTokenRe.MatchString(key) {
				return fmt.Errorf("case %q: vault secret %q field key %q must match %s", tc.Name, path, key, vaultTokenRe)
			}
		}
	}
	return nil
}

// validateSampleFile rejects sample_file paths that are absolute or escape the