diff --git a/tests/tap_runner.py b/tests/tap_runner.py index e04ac9374a..2db5f2596b 100644 --- a/tests/tap_runner.py +++ b/tests/tap_runner.py @@ -15,7 +15,9 @@ import argparse import importlib import io +import os import sys +import threading import traceback import unittest @@ -23,8 +25,8 @@ class DiagnosticCapture(io.TextIOBase): """Capture writes and re-emit them as TAP diagnostic lines (# ...).""" - def __init__(self, real_stdout: io.TextIOBase) -> None: - self._real = real_stdout + def __init__(self, stream: io.TextIOBase) -> None: + self._real = stream self._buf = '' def write(self, text: str) -> int: @@ -42,13 +44,58 @@ def flush(self) -> None: self._real.flush() +class FDCapture: + """Redirect a file descriptor at the OS level and re-emit captured output + as TAP diagnostic lines. This intercepts writes from subprocesses which + bypass the Python-level sys.stderr redirect.""" + + def __init__(self, fd: int, real_stdout: io.TextIOBase) -> None: + self._fd = fd + self._real = real_stdout + self._saved_fd = os.dup(fd) + r_fd, w_fd = os.pipe() + os.dup2(w_fd, fd) + os.close(w_fd) + self._thread = threading.Thread(target=self._reader, args=(r_fd,), + daemon=True) + # daemon=True: if restore() is somehow never called (e.g. os._exit()), + # the process can still exit rather than hang on a blocking read. + self._thread.start() + + def _reader(self, r_fd: int) -> None: + buf = b'' + # Open unbuffered (bufsize=0) so bytes are delivered to the reader + # as soon as they are written, without waiting for a buffer to fill. + with open(r_fd, 'rb', 0) as f: + while True: + chunk = f.read(4096) + if not chunk: + break + buf += chunk + while b'\n' in buf: + line, buf = buf.split(b'\n', 1) + self._real.write( + '# {}\n'.format(line.decode('utf-8', errors='replace'))) + self._real.flush() + if buf: + self._real.write( + '# {}\n'.format(buf.decode('utf-8', errors='replace'))) + self._real.flush() + + def restore(self) -> None: + """Restore the original file descriptor and wait for the reader to drain.""" + os.dup2(self._saved_fd, self._fd) + os.close(self._saved_fd) + self._thread.join() + + class TAPTestResult(unittest.TestResult): """Collect unittest results and render them as TAP version 13.""" - def __init__(self) -> None: + def __init__(self, stream: io.TextIOBase) -> None: super().__init__() + self._stream = stream self._test_count = 0 - self._lines: list[str] = [] def _description(self, test: unittest.TestCase) -> str: return '{} ({})'.format(test._testMethodName, type(test).__name__) @@ -56,50 +103,50 @@ def _description(self, test: unittest.TestCase) -> str: def addSuccess(self, test: unittest.TestCase) -> None: super().addSuccess(test) self._test_count += 1 - self._lines.append('ok {} - {}\n'.format( + self._stream.write('ok {} - {}\n'.format( self._test_count, self._description(test))) + self._stream.flush() def addError(self, test: unittest.TestCase, err: object) -> None: super().addError(test, err) self._test_count += 1 - self._lines.append('not ok {} - {}\n'.format( + self._stream.write('not ok {} - {}\n'.format( self._test_count, self._description(test))) for line in traceback.format_exception(*err): # type: ignore[misc] for subline in line.splitlines(): - self._lines.append('# {}\n'.format(subline)) + self._stream.write('# {}\n'.format(subline)) + self._stream.flush() def addFailure(self, test: unittest.TestCase, err: object) -> None: super().addFailure(test, err) self._test_count += 1 - self._lines.append('not ok {} - {}\n'.format( + self._stream.write('not ok {} - {}\n'.format( self._test_count, self._description(test))) for line in traceback.format_exception(*err): # type: ignore[misc] for subline in line.splitlines(): - self._lines.append('# {}\n'.format(subline)) + self._stream.write('# {}\n'.format(subline)) + self._stream.flush() def addSkip(self, test: unittest.TestCase, reason: str) -> None: super().addSkip(test, reason) self._test_count += 1 - self._lines.append('ok {} - {} # SKIP {}\n'.format( + self._stream.write('ok {} - {} # SKIP {}\n'.format( self._test_count, self._description(test), reason)) + self._stream.flush() def addExpectedFailure(self, test: unittest.TestCase, err: object) -> None: super().addExpectedFailure(test, err) self._test_count += 1 - self._lines.append('ok {} - {} # TODO expected failure\n'.format( + self._stream.write('ok {} - {} # TODO expected failure\n'.format( self._test_count, self._description(test))) + self._stream.flush() def addUnexpectedSuccess(self, test: unittest.TestCase) -> None: super().addUnexpectedSuccess(test) self._test_count += 1 - self._lines.append('not ok {} - {} # TODO unexpected success\n'.format( + self._stream.write('not ok {} - {} # TODO unexpected success\n'.format( self._test_count, self._description(test))) - - def print_tap(self, stream: io.TextIOBase) -> None: - stream.write('1..{}\n'.format(self._test_count)) - for line in self._lines: - stream.write(line) - stream.flush() + self._stream.flush() def run_tests(test_module_name: str, start_dir: str | None = None) -> bool: @@ -112,21 +159,30 @@ def run_tests(test_module_name: str, start_dir: str | None = None) -> bool: suite = loader.loadTestsFromModule(module) real_stdout = sys.stdout - # TAP version header must be the very first line on stdout. + real_stderr = sys.stderr + # TAP version header and plan must appear before any test output. real_stdout.write('TAP version 13\n') + real_stdout.write('1..{}\n'.format(suite.countTestCases())) real_stdout.flush() - # Redirect stdout so any print() calls from setUp/tearDown/tests are - # re-emitted as TAP diagnostic lines and do not break the TAP stream. + # Redirect stdout and stderr so any print()/sys.stderr.write() calls from + # setUp/tearDown/tests are re-emitted as TAP diagnostic lines and do not + # break the TAP stream. sys.stdout = DiagnosticCapture(real_stdout) # type: ignore[assignment] + sys.stderr = DiagnosticCapture(real_stdout) # type: ignore[assignment] + # Also redirect fd 2 at the OS level so that subprocess stderr (which + # inherits the raw file descriptor and bypasses sys.stderr) is captured. + stderr_fd_capture = FDCapture(2, real_stdout) try: - result = TAPTestResult() + result = TAPTestResult(real_stdout) suite.run(result) finally: sys.stdout.flush() sys.stdout = real_stdout + sys.stderr.flush() + sys.stderr = real_stderr + stderr_fd_capture.restore() - result.print_tap(real_stdout) return result.wasSuccessful()