KTestify is an open-source, modular testing framework for Apache Kafka and Kafka Streams. It lets you write expressive, reliable stream tests, from low-level message assertions using a fluent Java API, all the way up to full BDD scenarios described in plain Gherkin.
ktestify-core is the foundation library: a fluent Java API for testing Kafka producers, consumers, and Kafka Streams topologies, with no Cucumber dependency. What it gives you:
ConsumerContext<String, String> ctx = ConsumerContext.<String, String>builder()
.topic(outputTopic)
.matchMethod(ConfigConstants.methodMatchFile)
.matchFilePaths(List.of("expected/order.json"))
.readTimeout(Duration.ofSeconds(30))
.build();
Boolean passed = new RawKafkaConsumer(ctx).call();
assertTrue(passed);
ktestify-cucumber is a standalone BDD test runner. It uses Cucumber 7 and Gherkin to express Kafka stream scenarios in plain language, backed by the ktestify-core API. What it gives you:
Background:
Given namespace
| namespace |
| ktestify |
Given input topic
| topicName | topicAlias |
| raw-roundtrip | orders-in |
Given output topic
| topicName | topicAlias |
| raw-roundtrip | orders-out |
Scenario: Order enrichment, JSON file match
When record from file is sent
| topicName | file | recordKey |
| raw-roundtrip | data/order.json | order-123 |
Then expected record from file
| topicAlias | file | expectedRecordKey |
| orders-out | expected/order.json | order-123 |
KTestify provides a Plugin system based on SPI (Service Provider Interface) to allow third-party libraries to extend its capabilities without modifying the core codebase. Plugins can introduce new matchers, transport implementations, or orchestration strategies.
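As a rough illustration of how SPI-based discovery generally works on the JVM, the sketch below uses the standard java.util.ServiceLoader. The interface name KTestifyPlugin and its name() method are assumptions made for this example only; the actual extension points that KTestify exposes may be named and shaped differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class PluginDiscoverySketch {

    /** Hypothetical plugin contract; KTestify's real SPI interfaces may look different. */
    public interface KTestifyPlugin {
        String name();
    }

    /** Collects every implementation registered under META-INF/services on the classpath. */
    public static List<KTestifyPlugin> discover() {
        List<KTestifyPlugin> plugins = new ArrayList<>();
        for (KTestifyPlugin plugin : ServiceLoader.load(KTestifyPlugin.class)) {
            plugins.add(plugin);
        }
        return plugins;
    }

    public static void main(String[] args) {
        // No provider is registered in this sketch, so the list is empty.
        System.out.println("Discovered plugins: " + discover().size());
    }
}
```

A third-party jar would register its implementation by listing the implementing class in a META-INF/services file named after the interface, which is why the core codebase never needs to reference the plugin directly.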
You can learn more about how to create your own plugin in the Plugin Development Guide. Below are the first-party plugins that are integrated and ready to use with ktestify-cucumber out of the box.
ktestify-plugin-notifications provides cross-platform test suite notifications for ktestify-cucumber. It sends a rich summary card at the end of each test run, with per-tag group breakdowns, CI/Git context, and configurable success-rate thresholds. What it gives you:
ktestify.plugins.notifications {
enabled = true
channels = [
{
type = "teams"
enabled = true
webhook-url = ${?KTESTIFY_TEAMS_WEBHOOK_URL}
on-failure-only = false
}
]
}
ktestify-plugin-azureblob adds Azure Blob Storage transport support to ktestify-cucumber. Ready-to-use Gherkin steps for uploading and asserting blobs are discovered automatically. What it gives you:
ktestify.plugins.azure-blob {
connection-string = ${?AZURE_STORAGE_CONNECTION_STRING}
request-timeout = 30s
auto-create-containers = true
}
KTestify enforces a strict three-layer separation so that transports, orchestration logic, and assertion strategies are
completely decoupled. ConsumedRecord<V> is the only data type that crosses layer boundaries.
flowchart TB
subgraph cucumber["🥒 ktestify-cucumber"]
GS["Gherkin Scenarios"]
SD["Step Definitions"]
CVS["ConsumerValidationService\nProducerValidationService"]
end
subgraph core["🟢 ktestify-core"]
subgraph api["Fluent API"]
CC["ConsumerContext"]
PC["ProducerContext"]
end
subgraph transport["Transport Layer"]
KRF["KafkaRecordFetcher"]
end
subgraph orchestration["Orchestration Layer"]
AKC["AbstractKafkaConsumer"]
RKC["RawKafkaConsumer"]
AVC["AvroKafkaConsumer"]
end
subgraph assertion["Assertion Layer"]
RM["RecordMatcher 〈interface〉"]
FRM["FileRecordMatcher"]
XML["XmlRecordMatcher"]
XP["XPathRecordMatcher"]
AVM["AvroFileRecordMatcher"]
NOOP["NoOpRecordMatcher"]
end
CR(["ConsumedRecord<V>\n— the only cross-layer type"])
end
GS --> SD --> CVS
CVS --> CC & PC
CC --> AKC
AKC --> RKC & AVC
RKC & AVC --> KRF
KRF -->|" List<ConsumedRecord<V>> "| CR
CR --> RM
RM --> FRM & XML & XP & AVM & NOOP
style cucumber fill: #0C1018, stroke: #2DD4BF, color: #e2e8f0
style core fill: #0C1018, stroke: #6EE7B7, color: #e2e8f0
style transport fill: #111827, stroke: #6EE7B7, color: #e2e8f0
style orchestration fill: #111827, stroke: #6EE7B7, color: #e2e8f0
style assertion fill: #111827, stroke: #6EE7B7, color: #e2e8f0
style api fill: #111827, stroke: #2DD4BF, color: #e2e8f0
style CR fill: #1e3a2f, stroke: #6EE7B7, color: #6EE7B7
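To make the layering concrete, here is a minimal sketch of how three such layers can stay decoupled when a single record type is the only thing passed between them. The field list of ConsumedRecord, the RecordFetcher interface, and the matches method are hypothetical signatures invented for this illustration; only the names ConsumedRecord and RecordMatcher come from the diagram above, and the real types may differ.

```java
import java.util.List;

public class LayerSketch {

    /** Hypothetical shape of the cross-layer type; the real class may carry more fields. */
    public record ConsumedRecord<V>(String key, V value) {}

    /** Transport layer: knows how to fetch records, nothing about assertions. */
    public interface RecordFetcher<V> {
        List<ConsumedRecord<V>> fetch();
    }

    /** Assertion layer: knows how to judge a record, nothing about the transport. */
    public interface RecordMatcher<V> {
        boolean matches(ConsumedRecord<V> record);
    }

    /** Orchestration layer: wires a fetcher to a matcher without knowing either implementation. */
    public static <V> boolean anyMatch(RecordFetcher<V> fetcher, RecordMatcher<V> matcher) {
        return fetcher.fetch().stream().anyMatch(matcher::matches);
    }

    public static void main(String[] args) {
        // An in-memory fetcher stands in for a real Kafka transport.
        RecordFetcher<String> inMemory =
                () -> List.of(new ConsumedRecord<>("order-123", "{\"id\":123}"));
        RecordMatcher<String> keyOnly = r -> "order-123".equals(r.key());
        System.out.println(anyMatch(inMemory, keyOnly)); // prints true
    }
}
```

Because the orchestration method depends only on the two interfaces and the record type, swapping the in-memory fetcher for another transport would not touch the matcher, and vice versa.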
| Match strategy | Raw (String) | Avro (GenericRecord) |
|---|---|---|
| JSON file comparison | ✅ | ✅ |
| Key + value file comparison | ✅ | ✅ |
| Positional fields (line/from/to) | ✅ | ✅ |
| XML structural comparison | ✅ | ❌ |
| XPath expression matching | ✅ | ❌ |
| Record key only | ✅ | ✅ |
| Batch (N records vs N files) | ✅ | ✅ |
| No-op (presence check) | ✅ | ✅ |
ktestify-core, embed the assertion API in your own test project:
<dependency>
<groupId>io.github.ktestify</groupId>
<artifactId>ktestify-core</artifactId>
<version>0.1.0</version>
<scope>test</scope>
</dependency>

ktestify-cucumber, run BDD scenarios as a standalone test application:
<dependency>
<groupId>io.github.ktestify</groupId>
<artifactId>ktestify-cucumber</artifactId>
<version>0.1.3</version>
<scope>test</scope>
</dependency>

Drop an application.conf next to your features to override any default:
ktestify {
kafka.bootstrap-servers = "localhost:9092"
kafka.topic-namespace = "my-app"
schema-registry.url = "http://localhost:8081"
framework.timeouts {
default-read-timeout = 30s
consumer-delta-time = 60s
}
framework.directories.assets = "src/test/resources/data"
}

All values can also be set via environment variables (KTESTIFY_KAFKA_BOOTSTRAP_SERVERS, KTESTIFY_SCHEMA_REGISTRY_URL, KTESTIFY_TOPIC_NAMESPACE, KTESTIFY_ASSETS_DIR, …).
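The sketch below illustrates one plausible precedence rule for such overrides: an environment variable, when set, wins over the value from the configuration file. This is an assumption for illustration only. KTestify resolves its settings through its own configuration loading, and the resolve helper here is hypothetical, not part of its API.

```java
import java.util.Map;

public class EnvOverrideSketch {

    /** Returns the environment value when present and non-blank, otherwise the file default. */
    public static String resolve(Map<String, String> env, String envName, String fileDefault) {
        String fromEnv = env.get(envName);
        return (fromEnv != null && !fromEnv.isBlank()) ? fromEnv : fileDefault;
    }

    public static void main(String[] args) {
        // Falls back to the application.conf value when the variable is not exported.
        String servers = resolve(System.getenv(),
                "KTESTIFY_KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
        System.out.println("bootstrap servers: " + servers);
    }
}
```

Passing the environment in as a map, rather than calling System.getenv inside the helper, keeps the rule easy to exercise in a unit test.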
Via Maven (integration-tests profile):
mvn verify -Pintegration-tests -Dcucumber.it.tags="@integration"

Or directly as a fat-JAR:
java -Dconfig.file=docker/local.conf \
-jar target/ktestify-cucumber.jar \
--tags "@integration" \
--plugin "json:target/cucumber-reports/cucumber.json" \
src/test/resources/features

Given namespace | namespace | → declare topic namespace
Given input topic | topicName | topicAlias | namespace | → INPUT topic
Given output topic | topicName | topicAlias | | → OUTPUT topic
Given schema | schemaName | schemaAlias | schemaVersion | → Avro schema
Given assets directory | absolutePath | → base path for payload files

When record from file is sent
| topicName | file | recordKey | headerFile |
When record from file based on schema is sent
| topicName | file | schemaName | recordKey |
And wait for {int} seconds
When script is executed
| scriptPath | scriptArgs |

Then expected record from file
| topicAlias | file | expectedRecordKey | consumerReadTimeout | consumerDeltaTime |
Then expected record from file based on XML
| topicAlias | file | excludedElements |
Then expected record based on XML should have fields matching from file
| topicAlias | file | xpathExpressions |
Then expected records from files ← batch
| topicAlias | expectedRecordsCount | files | consumerReadTimeout |
Then expected record from file based on schema ← Avro
| topicAlias | file | excludedKeys |
And record should not appear in topic ← negative watcher
| topicAlias | topicType | consumerReadTimeout | consumerDeltaTime |

| Feature | KTestify | Plain Kafka Clients | EmbeddedKafka |
|---|---|---|---|
| Fluent assertion DSL | ✅ | ❌ | ❌ |
| BDD / Gherkin support | ✅ | ❌ | ❌ |
| Avro + Schema Registry | ✅ | ❌ | |
| XML / XPath matchers | ✅ | ❌ | ❌ |
| Batch record assertions | ✅ | ❌ | ❌ |
| Dynamic variables in payloads | ✅ | ❌ | ❌ |
| Embedded broker lifecycle | ✅ | ✅ | |
| Transport-agnostic design | ✅ | ❌ | ❌ |
| CI-friendly | ✅ | ✅ | ✅ |
| Repository | Status | Version |
|---|---|---|
| ktestify-core | 🟢 Active | 0.1.0 |
| ktestify-cucumber | 🟢 Active | 0.1.3 |
| ktestify-plugin-azureblob | 🟢 Active | 0.0.1-SNAPSHOT |
| ktestify-plugin-notifications | 🟢 Active | 0.1.0 |
Contributions are welcome! Please read the contributing guide in each repository before opening a pull request.
- Fork the repository
- Create a feature branch: git checkout -b feat/my-feature
- Commit using Conventional Commits: git commit -m 'feat: add my feature'
- Push and open a Pull Request against main
KTestify is licensed under the Apache License 2.0.
