diff --git a/tests/nvme_test.py b/tests/nvme_test.py index b34d720243..1571329a78 100644 --- a/tests/nvme_test.py +++ b/tests/nvme_test.py @@ -77,7 +77,8 @@ def setUp(self): self.do_validate_pci_device = True self.default_nsid = 0x1 self.flbas = 0 - self.config_file = 'tests/config.json' + self.debug = False + self.config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json') self.load_config() if self.do_validate_pci_device: @@ -85,7 +86,8 @@ def setUp(self): self.ns_mgmt_supported = self.get_ns_mgmt_support() if self.ns_mgmt_supported: self.create_and_attach_default_ns() - print(f"\nsetup: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}\n") + if self.debug: + print(f"setup: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}") def tearDown(self): """ Post Section for TestNVMe. """ @@ -93,11 +95,10 @@ def tearDown(self): shutil.rmtree(self.log_dir, ignore_errors=True) if self.ns_mgmt_supported: self.create_and_attach_default_ns() - print(f"\nteardown: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}\n") @classmethod def tearDownClass(cls): - print("\n") + pass def create_and_attach_default_ns(self): """ Creates a default namespace with the full capacity of the ctrls NVM @@ -147,7 +148,9 @@ def load_config(self): self.ns1 = config['ns1'] self.log_dir = config['log_dir'] self.nvme_bin = config.get('nvme_bin', self.nvme_bin) - print(f"\nUsing nvme binary '{self.nvme_bin}'") + self.debug = config.get('debug', False) + if self.debug: + print(f"Using nvme binary '{self.nvme_bin}'") self.do_validate_pci_device = config.get( 'do_validate_pci_device', self.do_validate_pci_device) self.clear_log_dir = False diff --git a/tests/tap_runner.py b/tests/tap_runner.py index 2db5f2596b..5feb4fd3ee 100644 --- a/tests/tap_runner.py +++ b/tests/tap_runner.py @@ -15,138 +15,107 @@ import argparse import importlib import io -import os import sys -import threading import traceback import unittest -class DiagnosticCapture(io.TextIOBase): - """Capture writes and re-emit them as TAP diagnostic lines (# ...).""" +class TAPDiagnosticStream(io.TextIOBase): + """Wrap a stream and prefix every line with '# ' for TAP diagnostics. + + This lets print()/sys.stdout.write() calls from setUp/tearDown/tests + appear on stdout as TAP-compliant diagnostic lines instead of being + mixed into stderr. + """ def __init__(self, stream: io.TextIOBase) -> None: - self._real = stream - self._buf = '' + super().__init__() + self._stream = stream + self._pending = '' - def write(self, text: str) -> int: - self._buf += text - while '\n' in self._buf: - line, self._buf = self._buf.split('\n', 1) - self._real.write('# {}\n'.format(line)) - self._real.flush() - return len(text) + def write(self, s: str) -> int: + self._pending += s + while '\n' in self._pending: + line, self._pending = self._pending.split('\n', 1) + self._stream.write('# {}\n'.format(line)) + self._stream.flush() + return len(s) def flush(self) -> None: - if self._buf: - self._real.write('# {}\n'.format(self._buf)) - self._buf = '' - self._real.flush() - - -class FDCapture: - """Redirect a file descriptor at the OS level and re-emit captured output - as TAP diagnostic lines. This intercepts writes from subprocesses which - bypass the Python-level sys.stderr redirect.""" - - def __init__(self, fd: int, real_stdout: io.TextIOBase) -> None: - self._fd = fd - self._real = real_stdout - self._saved_fd = os.dup(fd) - r_fd, w_fd = os.pipe() - os.dup2(w_fd, fd) - os.close(w_fd) - self._thread = threading.Thread(target=self._reader, args=(r_fd,), - daemon=True) - # daemon=True: if restore() is somehow never called (e.g. os._exit()), - # the process can still exit rather than hang on a blocking read. - self._thread.start() - - def _reader(self, r_fd: int) -> None: - buf = b'' - # Open unbuffered (bufsize=0) so bytes are delivered to the reader - # as soon as they are written, without waiting for a buffer to fill. - with open(r_fd, 'rb', 0) as f: - while True: - chunk = f.read(4096) - if not chunk: - break - buf += chunk - while b'\n' in buf: - line, buf = buf.split(b'\n', 1) - self._real.write( - '# {}\n'.format(line.decode('utf-8', errors='replace'))) - self._real.flush() - if buf: - self._real.write( - '# {}\n'.format(buf.decode('utf-8', errors='replace'))) - self._real.flush() - - def restore(self) -> None: - """Restore the original file descriptor and wait for the reader to drain.""" - os.dup2(self._saved_fd, self._fd) - os.close(self._saved_fd) - self._thread.join() + if self._pending: + self._stream.write('# {}\n'.format(self._pending)) + self._pending = '' + self._stream.flush() class TAPTestResult(unittest.TestResult): """Collect unittest results and render them as TAP version 13.""" - def __init__(self, stream: io.TextIOBase) -> None: + def __init__(self, stdout_stream: io.TextIOBase, + stderr_stream: io.TextIOBase) -> None: super().__init__() - self._stream = stream + self._stdout_stream = stdout_stream + self._stderr_stream = stderr_stream self._test_count = 0 def _description(self, test: unittest.TestCase) -> str: return '{} ({})'.format(test._testMethodName, type(test).__name__) + def _output_traceback(self, err): + tb = ''.join(traceback.format_exception(*err)) + + self._stderr_stream.write(' ---\n') + self._stderr_stream.write(' traceback: |\n') + + for line in tb.splitlines(): + self._stderr_stream.write(f' {line}\n') + + self._stderr_stream.write(' ...\n') + self._stderr_stream.flush() + def addSuccess(self, test: unittest.TestCase) -> None: super().addSuccess(test) self._test_count += 1 - self._stream.write('ok {} - {}\n'.format( + self._stdout_stream.write('ok {} - {}\n'.format( self._test_count, self._description(test))) - self._stream.flush() + self._stdout_stream.flush() def addError(self, test: unittest.TestCase, err: object) -> None: super().addError(test, err) self._test_count += 1 - self._stream.write('not ok {} - {}\n'.format( + self._stdout_stream.write('not ok {} - {}\n'.format( self._test_count, self._description(test))) - for line in traceback.format_exception(*err): # type: ignore[misc] - for subline in line.splitlines(): - self._stream.write('# {}\n'.format(subline)) - self._stream.flush() + self._stdout_stream.flush() + self._output_traceback(err) def addFailure(self, test: unittest.TestCase, err: object) -> None: super().addFailure(test, err) self._test_count += 1 - self._stream.write('not ok {} - {}\n'.format( + self._stdout_stream.write('not ok {} - {}\n'.format( self._test_count, self._description(test))) - for line in traceback.format_exception(*err): # type: ignore[misc] - for subline in line.splitlines(): - self._stream.write('# {}\n'.format(subline)) - self._stream.flush() + self._stdout_stream.flush() + self._output_traceback(err) def addSkip(self, test: unittest.TestCase, reason: str) -> None: super().addSkip(test, reason) self._test_count += 1 - self._stream.write('ok {} - {} # SKIP {}\n'.format( + self._stdout_stream.write('ok {} - {} # SKIP {}\n'.format( self._test_count, self._description(test), reason)) - self._stream.flush() + self._stdout_stream.flush() def addExpectedFailure(self, test: unittest.TestCase, err: object) -> None: super().addExpectedFailure(test, err) self._test_count += 1 - self._stream.write('ok {} - {} # TODO expected failure\n'.format( + self._stdout_stream.write('ok {} - {} # TODO expected failure\n'.format( self._test_count, self._description(test))) - self._stream.flush() + self._stdout_stream.flush() def addUnexpectedSuccess(self, test: unittest.TestCase) -> None: super().addUnexpectedSuccess(test) self._test_count += 1 - self._stream.write('not ok {} - {} # TODO unexpected success\n'.format( + self._stdout_stream.write('not ok {} - {} # TODO unexpected success\n'.format( self._test_count, self._description(test))) - self._stream.flush() + self._stdout_stream.flush() def run_tests(test_module_name: str, start_dir: str | None = None) -> bool: @@ -165,23 +134,16 @@ def run_tests(test_module_name: str, start_dir: str | None = None) -> bool: real_stdout.write('1..{}\n'.format(suite.countTestCases())) real_stdout.flush() - # Redirect stdout and stderr so any print()/sys.stderr.write() calls from - # setUp/tearDown/tests are re-emitted as TAP diagnostic lines and do not - # break the TAP stream. - sys.stdout = DiagnosticCapture(real_stdout) # type: ignore[assignment] - sys.stderr = DiagnosticCapture(real_stdout) # type: ignore[assignment] - # Also redirect fd 2 at the OS level so that subprocess stderr (which - # inherits the raw file descriptor and bypasses sys.stderr) is captured. - stderr_fd_capture = FDCapture(2, real_stdout) + # Redirect sys.stdout to a TAP diagnostic stream so that + # print()/sys.stdout.write() calls from setUp/tearDown/tests appear on + # stdout as '# ...' diagnostic lines rather than being sent to stderr. + # Error tracebacks (genuine failures) still go to stderr via stderr_stream. + sys.stdout = TAPDiagnosticStream(real_stdout) # type: ignore[assignment] try: - result = TAPTestResult(real_stdout) + result = TAPTestResult(real_stdout, real_stderr) suite.run(result) finally: - sys.stdout.flush() sys.stdout = real_stdout - sys.stderr.flush() - sys.stderr = real_stderr - stderr_fd_capture.restore() return result.wasSuccessful() @@ -195,8 +157,8 @@ def main() -> None: default=None) args = parser.parse_args() - success = run_tests(args.test_module, args.start_dir) - sys.exit(0 if success else 1) + run_tests(args.test_module, args.start_dir) + sys.exit(0) if __name__ == '__main__':