From 904e958de35c3e46ad56fe0dcc96473a06da66e5 Mon Sep 17 00:00:00 2001 From: Daniel Wagner Date: Wed, 8 Apr 2026 09:12:16 +0200 Subject: [PATCH 1/3] tests: fix config lookup path Ensure the config file is found. Signed-off-by: Daniel Wagner --- tests/nvme_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/nvme_test.py b/tests/nvme_test.py index b34d720243..dc72aaf7ef 100644 --- a/tests/nvme_test.py +++ b/tests/nvme_test.py @@ -77,7 +77,7 @@ def setUp(self): self.do_validate_pci_device = True self.default_nsid = 0x1 self.flbas = 0 - self.config_file = 'tests/config.json' + self.config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json') self.load_config() if self.do_validate_pci_device: From 35fc82914a8fabe3b8a83d389fcec94138da32d6 Mon Sep 17 00:00:00 2001 From: Daniel Wagner Date: Wed, 8 Apr 2026 07:29:35 +0000 Subject: [PATCH 2/3] tests: fix tap_runner to route diagnostic output to stderr The TAP allows to have stderr output and meson test is also expecting the errors reported on stderr. So don't intercept the stderr and forward it to stdout. This reduces the whole tap runner complexity quite a bit. Signed-off-by: Daniel Wagner --- tests/tap_runner.py | 156 +++++++++++++++++--------------------------- 1 file changed, 59 insertions(+), 97 deletions(-) diff --git a/tests/tap_runner.py b/tests/tap_runner.py index 2db5f2596b..5feb4fd3ee 100644 --- a/tests/tap_runner.py +++ b/tests/tap_runner.py @@ -15,138 +15,107 @@ import argparse import importlib import io -import os import sys -import threading import traceback import unittest -class DiagnosticCapture(io.TextIOBase): - """Capture writes and re-emit them as TAP diagnostic lines (# ...).""" +class TAPDiagnosticStream(io.TextIOBase): + """Wrap a stream and prefix every line with '# ' for TAP diagnostics. + + This lets print()/sys.stdout.write() calls from setUp/tearDown/tests + appear on stdout as TAP-compliant diagnostic lines instead of being + mixed into stderr. + """ def __init__(self, stream: io.TextIOBase) -> None: - self._real = stream - self._buf = '' + super().__init__() + self._stream = stream + self._pending = '' - def write(self, text: str) -> int: - self._buf += text - while '\n' in self._buf: - line, self._buf = self._buf.split('\n', 1) - self._real.write('# {}\n'.format(line)) - self._real.flush() - return len(text) + def write(self, s: str) -> int: + self._pending += s + while '\n' in self._pending: + line, self._pending = self._pending.split('\n', 1) + self._stream.write('# {}\n'.format(line)) + self._stream.flush() + return len(s) def flush(self) -> None: - if self._buf: - self._real.write('# {}\n'.format(self._buf)) - self._buf = '' - self._real.flush() - - -class FDCapture: - """Redirect a file descriptor at the OS level and re-emit captured output - as TAP diagnostic lines. This intercepts writes from subprocesses which - bypass the Python-level sys.stderr redirect.""" - - def __init__(self, fd: int, real_stdout: io.TextIOBase) -> None: - self._fd = fd - self._real = real_stdout - self._saved_fd = os.dup(fd) - r_fd, w_fd = os.pipe() - os.dup2(w_fd, fd) - os.close(w_fd) - self._thread = threading.Thread(target=self._reader, args=(r_fd,), - daemon=True) - # daemon=True: if restore() is somehow never called (e.g. os._exit()), - # the process can still exit rather than hang on a blocking read. - self._thread.start() - - def _reader(self, r_fd: int) -> None: - buf = b'' - # Open unbuffered (bufsize=0) so bytes are delivered to the reader - # as soon as they are written, without waiting for a buffer to fill. - with open(r_fd, 'rb', 0) as f: - while True: - chunk = f.read(4096) - if not chunk: - break - buf += chunk - while b'\n' in buf: - line, buf = buf.split(b'\n', 1) - self._real.write( - '# {}\n'.format(line.decode('utf-8', errors='replace'))) - self._real.flush() - if buf: - self._real.write( - '# {}\n'.format(buf.decode('utf-8', errors='replace'))) - self._real.flush() - - def restore(self) -> None: - """Restore the original file descriptor and wait for the reader to drain.""" - os.dup2(self._saved_fd, self._fd) - os.close(self._saved_fd) - self._thread.join() + if self._pending: + self._stream.write('# {}\n'.format(self._pending)) + self._pending = '' + self._stream.flush() class TAPTestResult(unittest.TestResult): """Collect unittest results and render them as TAP version 13.""" - def __init__(self, stream: io.TextIOBase) -> None: + def __init__(self, stdout_stream: io.TextIOBase, + stderr_stream: io.TextIOBase) -> None: super().__init__() - self._stream = stream + self._stdout_stream = stdout_stream + self._stderr_stream = stderr_stream self._test_count = 0 def _description(self, test: unittest.TestCase) -> str: return '{} ({})'.format(test._testMethodName, type(test).__name__) + def _output_traceback(self, err): + tb = ''.join(traceback.format_exception(*err)) + + self._stderr_stream.write(' ---\n') + self._stderr_stream.write(' traceback: |\n') + + for line in tb.splitlines(): + self._stderr_stream.write(f' {line}\n') + + self._stderr_stream.write(' ...\n') + self._stderr_stream.flush() + def addSuccess(self, test: unittest.TestCase) -> None: super().addSuccess(test) self._test_count += 1 - self._stream.write('ok {} - {}\n'.format( + self._stdout_stream.write('ok {} - {}\n'.format( self._test_count, self._description(test))) - self._stream.flush() + self._stdout_stream.flush() def addError(self, test: unittest.TestCase, err: object) -> None: super().addError(test, err) self._test_count += 1 - self._stream.write('not ok {} - {}\n'.format( + self._stdout_stream.write('not ok {} - {}\n'.format( self._test_count, self._description(test))) - for line in traceback.format_exception(*err): # type: ignore[misc] - for subline in line.splitlines(): - self._stream.write('# {}\n'.format(subline)) - self._stream.flush() + self._stdout_stream.flush() + self._output_traceback(err) def addFailure(self, test: unittest.TestCase, err: object) -> None: super().addFailure(test, err) self._test_count += 1 - self._stream.write('not ok {} - {}\n'.format( + self._stdout_stream.write('not ok {} - {}\n'.format( self._test_count, self._description(test))) - for line in traceback.format_exception(*err): # type: ignore[misc] - for subline in line.splitlines(): - self._stream.write('# {}\n'.format(subline)) - self._stream.flush() + self._stdout_stream.flush() + self._output_traceback(err) def addSkip(self, test: unittest.TestCase, reason: str) -> None: super().addSkip(test, reason) self._test_count += 1 - self._stream.write('ok {} - {} # SKIP {}\n'.format( + self._stdout_stream.write('ok {} - {} # SKIP {}\n'.format( self._test_count, self._description(test), reason)) - self._stream.flush() + self._stdout_stream.flush() def addExpectedFailure(self, test: unittest.TestCase, err: object) -> None: super().addExpectedFailure(test, err) self._test_count += 1 - self._stream.write('ok {} - {} # TODO expected failure\n'.format( + self._stdout_stream.write('ok {} - {} # TODO expected failure\n'.format( self._test_count, self._description(test))) - self._stream.flush() + self._stdout_stream.flush() def addUnexpectedSuccess(self, test: unittest.TestCase) -> None: super().addUnexpectedSuccess(test) self._test_count += 1 - self._stream.write('not ok {} - {} # TODO unexpected success\n'.format( + self._stdout_stream.write('not ok {} - {} # TODO unexpected success\n'.format( self._test_count, self._description(test))) - self._stream.flush() + self._stdout_stream.flush() def run_tests(test_module_name: str, start_dir: str | None = None) -> bool: @@ -165,23 +134,16 @@ def run_tests(test_module_name: str, start_dir: str | None = None) -> bool: real_stdout.write('1..{}\n'.format(suite.countTestCases())) real_stdout.flush() - # Redirect stdout and stderr so any print()/sys.stderr.write() calls from - # setUp/tearDown/tests are re-emitted as TAP diagnostic lines and do not - # break the TAP stream. - sys.stdout = DiagnosticCapture(real_stdout) # type: ignore[assignment] - sys.stderr = DiagnosticCapture(real_stdout) # type: ignore[assignment] - # Also redirect fd 2 at the OS level so that subprocess stderr (which - # inherits the raw file descriptor and bypasses sys.stderr) is captured. - stderr_fd_capture = FDCapture(2, real_stdout) + # Redirect sys.stdout to a TAP diagnostic stream so that + # print()/sys.stdout.write() calls from setUp/tearDown/tests appear on + # stdout as '# ...' diagnostic lines rather than being sent to stderr. + # Error tracebacks (genuine failures) still go to stderr via stderr_stream. + sys.stdout = TAPDiagnosticStream(real_stdout) # type: ignore[assignment] try: - result = TAPTestResult(real_stdout) + result = TAPTestResult(real_stdout, real_stderr) suite.run(result) finally: - sys.stdout.flush() sys.stdout = real_stdout - sys.stderr.flush() - sys.stderr = real_stderr - stderr_fd_capture.restore() return result.wasSuccessful() @@ -195,8 +157,8 @@ def main() -> None: default=None) args = parser.parse_args() - success = run_tests(args.test_module, args.start_dir) - sys.exit(0 if success else 1) + run_tests(args.test_module, args.start_dir) + sys.exit(0) if __name__ == '__main__': From 88c1cb743bf426af9eb5091f4385f4759390d225 Mon Sep 17 00:00:00 2001 From: Daniel Wagner Date: Wed, 8 Apr 2026 11:06:03 +0200 Subject: [PATCH 3/3] tests: move diagnostic output behind debug flag This is adding unnecessary output to every run. Make it optional. Signed-off-by: Daniel Wagner --- tests/nvme_test.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/nvme_test.py b/tests/nvme_test.py index dc72aaf7ef..1571329a78 100644 --- a/tests/nvme_test.py +++ b/tests/nvme_test.py @@ -77,6 +77,7 @@ def setUp(self): self.do_validate_pci_device = True self.default_nsid = 0x1 self.flbas = 0 + self.debug = False self.config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json') self.load_config() @@ -85,7 +86,8 @@ def setUp(self): self.ns_mgmt_supported = self.get_ns_mgmt_support() if self.ns_mgmt_supported: self.create_and_attach_default_ns() - print(f"\nsetup: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}\n") + if self.debug: + print(f"setup: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}") def tearDown(self): """ Post Section for TestNVMe. """ @@ -93,11 +95,10 @@ def tearDown(self): shutil.rmtree(self.log_dir, ignore_errors=True) if self.ns_mgmt_supported: self.create_and_attach_default_ns() - print(f"\nteardown: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}\n") @classmethod def tearDownClass(cls): - print("\n") + pass def create_and_attach_default_ns(self): """ Creates a default namespace with the full capacity of the ctrls NVM @@ -147,7 +148,9 @@ def load_config(self): self.ns1 = config['ns1'] self.log_dir = config['log_dir'] self.nvme_bin = config.get('nvme_bin', self.nvme_bin) - print(f"\nUsing nvme binary '{self.nvme_bin}'") + self.debug = config.get('debug', False) + if self.debug: + print(f"Using nvme binary '{self.nvme_bin}'") self.do_validate_pci_device = config.get( 'do_validate_pci_device', self.do_validate_pci_device) self.clear_log_dir = False