[FEAT]: pytest-xdist support for distributed test execution#73
[FEAT]: pytest-xdist support for distributed test execution#73nina-msft wants to merge 4 commits into
pytest-xdist support for distributed test execution#73Conversation
The controller's _aggregate_trial_results iterated session.items looking for a _rampart_trial_base attribute set on cloned items at collection time. Under pytest-xdist that attribute is not reliably reachable on the controller at pytest_sessionfinish, so trial-group verdicts silently disappeared from the terminal summary and the JSON report -- per-clone results were present but no aggregate FAIL/PASS line was emitted. Decouple aggregation from pytest.Item state: - Store trial metadata as data on RampartSession._trial_specs (a dict[clone_nodeid, TrialSpec]) at collection time on every process. - Ship rial_specs through the existing ampart.xdist.v1 worker payload (back-compatible: missing/empty list is treated as no trials). - Controller merges specs from each worker and aggregates from the merged data instead of session.items. Also fixes a related JSON-sink gap: JsonFileReportSink now projects eport.metadata, so xdist run-mode info (worker_count, dist_mode, incomplete reasons) actually lands in the emitted file. Tests: - New TestTrialSpecs unit class (round-trip, malformed entries, non-finite thresholds, idempotent merge, first-writer-wins). - handle_testnodedown test covering trial-spec merge. - Strengthened ests/integration/test_xdist_aggregation.py to assert the trial-group line is present and correct under --dist=loadgroup and --dist=load (the prior tests only checked per-clone counts and would have missed this regression). - JSON-sink metadata projection unit tests. 565 unit tests + 13 xdist integration tests pass; ruff clean. Co-authored-by: Copilot <[email protected]>
|
Great job @nina-msft, really a solid PR. The overall architecture makes sense to me, which is workers collect, the controller merges, and only the controller emits. I also like that the worker output path treats the boundary seriously rather than just passing objects around and hoping for the best. I feel like we should definitely keep that part even if some of the surrounding design changes. The first thing I would fix before merge is ordering. I am a little bit worried about the sink discovery piece. This would be cleaner as an explicit hook instead of fixture reflection. The key difference is that the existing A hook would be different: def pytest_rampart_sinks(config):
return [JsonFileReportSink(output_dir=Path(".report"))]That would be a hook implementation, not another fixture. RAMPART would register the hook spec from its pytest plugin, and users would implement the hook from a root The one thing to be careful about is having two sink configuration surfaces. If both I think we need to put more throughts into whether to use The size cap has a similar issue. If one worker payload goes over One maintainability, there are now two serializers for roughly the same data. Two small notes
I like the direction. The two things I would most like to see tightened are the full-nodeid ordering and replacing the controller-side sink fixture scanning with an explicit hook or some similar explicit controller-safe sink registration path. |
spencrr
left a comment
There was a problem hiding this comment.
Hey Nina! I like:
- one (de)serialization spot in
_xdist.pywith sanitization and schme versioning. - using
trial_specsover worker channel so that--dist-loadworks correctly - workers disable
_emit_sinksand controller is the single source of truth
Other small AI Finds
- Worker-side
register_trial_specis dead work. Workers runpytest_collection_modifyitems, register specs, ship them across, and never aggregate or evaluate. That's fine for correctness, but it means the spec is computed twice (once on the worker, once on the controller's own collection pass, which the merge then drops viasetdefault). Worth a one-line comment inpytest_collection_modifyitemsclarifying why we register on every process even though only the controller acts on it - otherwise future readers will be tempted to "optimize" the worker path and break the fallback. xdist_groupmarker is added unconditionally to every trial clone. Correct behavior (xdist sees it; pytest without xdist treats it as an unknown marker). If the repo ever turns on--strict-markersyou'll need a conditionaladdinivalue_line("markers", "xdist_group: ...")somewhere; not a problem today._session.pyand_xdist.py"documented deviations" notes. The architecture doc was actually updated in this PR to describe_session.py/_xdist.pyas first-class members of the plugin package - so theNote: ... documented deviation from the architectureblocks at the top of those modules are now stale and can come out. Minor.- Trial-spec aggregation runs on every controller
pytest_sessionfinishcall, including when only one worker reported. That's fine for correctness; just observing that worker-result completeness isn't a precondition for trial aggregation, which is the right call - missing-worker should mark incomplete, not block the report. _emit_sinksbackground-task path. pre-existing - when an event loop is already running at session-finish (e.g., some pytest-asyncio configurations) the emission becomes fire-and-forget with no awaiter. Not changed by this PR, but the new xdist controller path is the first place I'd actually worry about this in production, because the controller's sink emission carries cross-worker data that didn't exist before. Worth at least filing as a follow-up.
| session: pytest.Session, | ||
| session: pytest.Session, # noqa: ARG001 — kept for hook signature symmetry | ||
| rampart_session: RampartSession, | ||
| ) -> None: |
There was a problem hiding this comment.
Should we just drop pytest session here. Is it true that we just stuff the results under the rampart key/rampart_session
| if is_xdist_controller(config=session.config): | ||
| _aggregate_trial_results(session=session, rampart_session=rampart_session) | ||
| _evaluate_gates(rampart_session=rampart_session) | ||
| _record_xdist_metadata(session=session, rampart_session=rampart_session) | ||
| controller_sinks = discover_sinks_from_conftest(config=session.config) | ||
| if controller_sinks: | ||
| rampart_session.add_sinks(sinks=controller_sinks) | ||
| _emit_sinks(rampart_session=rampart_session) | ||
| return | ||
|
|
||
| _aggregate_trial_results(session=session, rampart_session=rampart_session) | ||
| _evaluate_gates(rampart_session=rampart_session) | ||
| _emit_sinks(rampart_session=rampart_session) |
There was a problem hiding this comment.
nit: possible pull out just the xdist parts
| if is_xdist_controller(config=session.config): | |
| _aggregate_trial_results(session=session, rampart_session=rampart_session) | |
| _evaluate_gates(rampart_session=rampart_session) | |
| _record_xdist_metadata(session=session, rampart_session=rampart_session) | |
| controller_sinks = discover_sinks_from_conftest(config=session.config) | |
| if controller_sinks: | |
| rampart_session.add_sinks(sinks=controller_sinks) | |
| _emit_sinks(rampart_session=rampart_session) | |
| return | |
| _aggregate_trial_results(session=session, rampart_session=rampart_session) | |
| _evaluate_gates(rampart_session=rampart_session) | |
| _emit_sinks(rampart_session=rampart_session) | |
| _aggregate_trial_results(session=session, rampart_session=rampart_session) | |
| _evaluate_gates(rampart_session=rampart_session) | |
| if is_xdist_controller(config=session.config): | |
| _record_xdist_metadata(session=session, rampart_session=rampart_session) | |
| controller_sinks = discover_sinks_from_conftest(config=session.config) | |
| if controller_sinks: | |
| rampart_session.add_sinks(sinks=controller_sinks) | |
| _emit_sinks(rampart_session=rampart_session) |
| def is_xdist_controller(*, config: pytest.Config) -> bool: | ||
| """Return True when this process is the pytest-xdist controller. | ||
|
|
||
| The controller is defined as a process where xdist is active | ||
| (``--numprocesses`` is set) and which is NOT itself a worker. | ||
|
|
||
| Args: | ||
| config (pytest.Config): The pytest configuration object. | ||
|
|
||
| Returns: | ||
| bool: True if running in the xdist controller process. | ||
| """ | ||
| if is_xdist_worker(config=config): | ||
| return False | ||
| numprocesses = getattr(config.option, "numprocesses", None) | ||
| return numprocesses is not None and numprocesses != 0 |
There was a problem hiding this comment.
Possibly reconsider from (xdist/plugin.py)
def is_xdist_controller(request_or_session) -> bool:
return (
not is_xdist_worker(request_or_session)
and request_or_session.config.option.dist != "no"
)This PR uses numprocesses is not None and numprocesses != 0 instead. For -n N flows these agree (xdist's pytest_cmdline_main sets dist = "load" whenever numprocesses is truthy). They diverge for:
pytest -d --tx=... (the -d shortcut for --dist=load)
pytest --dist=loadgroup --tx=2*popen (any --tx-based setup without -n)
Any custom scheduler that sets dist without -n
In those cases, this PR's controller branch returns False, so pytest_sessionfinish falls into the non-xdist branch on the controller. The controller would then try to _aggregate_trial_results and _emit_sinks against its own (empty) _results_by_nodeid while workers still serialize into workeroutput. Therefore, the report would end up empty rather than incomplete.
| - **Arbitrary code execution** — strict JSON-safe primitives only; no `pickle`, `marshal`, or custom `__reduce__`. | ||
| - **Schema drift** — payloads with missing or unknown schema versions are rejected fail-closed. | ||
| - **Memory exhaustion** — worker payloads are capped at 64 MB by default. | ||
| - **Terminal/log injection** — ANSI escape sequences are stripped from free-form text at the deserialization boundary. |
There was a problem hiding this comment.
ai note:
ANSI stripping doesn't cover OSC, and the security notes claim it does.
_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]") matches CSI only. It does not match:
- OSC (
\x1b]…BELor\x1b]…ST) — used for terminal hyperlinks (\x1b]8;;https://attacker/\x1b\\text\x1b]8;;\x1b\\) and window-title sets. Windows Terminal, iTerm2, modern GNOME Terminal, and Konsole all render hyperlinks. So an attacker-controlled response can produce a clickable link inpytest's terminal output even though "ANSI was stripped". - DCS / SOS / PM / APC (
\x1b[PXY^_]…ST) - 8-bit C1 (
\x9b…) — less common but allowed by ECMA-48
The PR explicitly markets ANSI stripping as a defense against terminal injection from compromised workers, so this gap directly undermines a stated security property. A stricter pattern that handles the C1 introducers, or just re.sub(r"[\x00-\x08\x0b-\x1f\x7f-\x9f]", "", text) to strip all C0/C1 controls, would close it. Whichever you pick, also worth adding a couple of OSC-flavored test cases to TestStripAnsi.
| def _package_version() -> str: | ||
| """Return the installed rampart package version (best-effort).""" | ||
| try: | ||
| from importlib.metadata import version # noqa: PLC0415 | ||
|
|
||
| return version("rampart") | ||
| except Exception: # noqa: BLE001 | ||
| return "unknown" |
There was a problem hiding this comment.
what case would this happen?
| ) | ||
|
|
||
|
|
||
| def discover_sinks_from_conftest(*, config: pytest.Config) -> list[ReportSink]: |
There was a problem hiding this comment.
ai find:
discover_sinks_from_conftest is a parallel sink-discovery path with a silent capability gap.
Two different mechanisms now resolve rampart_sinks:
| Process | Mechanism | Can satisfy fixture deps? |
|---|---|---|
| Non-xdist / worker | _rampart_sink_bootstrap autouse fixture → request.getfixturevalue("rampart_sinks") |
Yes |
| xdist controller | discover_sinks_from_conftest → plugin attribute scan + inspect.signature() |
No (any param ≠ 0 ⇒ warn + skip) |
So @pytest.fixture(scope="session") def rampart_sinks(tmp_path_factory): ... works perfectly under pytest -n0 and silently disappears under pytest -n2, leaving only a logger.warning(...).
Consider:
- Hook-based contributor API: define a
pytest_rampart_sinks(config) -> list[ReportSink]hookspec. Both controller and workers call it the same way; no fixture machinery dependency. Teams move their factory from a fixture into a hookimpl in conftest. The cost is asking users to update their conftest; the benefit is one unified path with no capability gap.
Also reaching into candidate._get_wrapped_function() is a private pytest API. The __wrapped__ fallback is fine; the _get_wrapped_function call is at risk on any pytest minor bump.
Description
Summary
Adds first-class
pytest-xdistsupport to RAMPART so attack/probe sessions can shard across worker processes and still produce a single coherent report. Worker processes serialize their results through xdist'sworkeroutputchannel; the controller deserializes, merges, and writes the final report.What's included
New: _xdist.py
rampart.xdist.v1) for worker → controller payloads_sanitizeJSON-safe coercion at the trust boundary with depth limit, byte-size cap, and ANSI-escape stripping on deserialize to defend against terminal injection from worker outputSizeLimitErrorraised byfinalize_workerwhen serialized payload exceeds the configured cap; controller logs and continues--rampart-xdist-max-bytes/ ini optionrampart_xdist_max_bytes(default 64 MiB), validated positiveKeyboardInterrupt/SystemExit, catchesExceptionTrial-group aggregation across workers
RampartSessionat collection time and shipped through therampart.xdist.v1payload (trial_specs), so the controller aggregates trial groups from merged worker data instead ofsession.items— which is not reliably populated with trial clones on the controller at session finish. Without this, trial-group FAIL/PASS verdicts silently disappeared under xdist even though per-clone results were present.trial_specsis back-compatible: payloads without trials emit an empty list; malformed entries are skipped and non-finite thresholds are clamped on deserialize.Updated plugin hooks (
plugin.py,_session.py)pytest_addoptionregisters the new CLI/ini optionfinalize_workerto surface size-limit truncation as a warningReporting
JsonFileReportSinknow projectsreport.metadata, so xdist run-mode info (worker_count, dist_mode, incomplete reasons) lands in the emitted JSON fileTests
--dist=loadgroupand--dist=loadDocs
Incidental
Unknown*cascade caused bymsgraph-sdkshipping without type stubs. No behavior change.Security notes
The xdist boundary treats worker output as untrusted:
_sanitize→ JSON-safe primitives onlySizeLimitErrorMAX_METADATA_DEPTH = 6) prevents pathological nestingValidation
uv run pre-commit run --all-files→ all hooks pass (ruff, ruff-format, pyright)pytest tests/unit/pytest_plugin/test_xdist.py→ 73 passedpytest tests/integration/test_xdist_aggregation.py→ 13 passed (trial-group verdicts verified under--dist=loadgroupand--dist=load)pytest -n 4 --dist=loadgroupagainst HelpDesk Bot example inrampart-examples, verified 1 report generated. Running with xdist also took 99.66s versus serial execution at 230.73s (yay for speed!)Breaking changes
None
Checklist
pre-commit run --all-filespasses