From 78caadd2eb87b181b075af6ec92e20361181282f Mon Sep 17 00:00:00 2001 From: Daniel Wagner Date: Fri, 10 Apr 2026 08:28:02 +0000 Subject: [PATCH] tests: modernize subprocess calls Python recommends to use the subprocess.run interface these days. There are several different ways how calls are done which also makes the logging sprinkling through the code base. Switch over to the run API and use one central place for it. Signed-off-by: Daniel Wagner --- tests/nvme_copy_test.py | 43 ++------ tests/nvme_format_test.py | 14 +-- tests/nvme_get_features_test.py | 15 +-- tests/nvme_test.py | 188 +++++++++++--------------------- 4 files changed, 77 insertions(+), 183 deletions(-) diff --git a/tests/nvme_copy_test.py b/tests/nvme_copy_test.py index 38c5805878..18dcae14db 100644 --- a/tests/nvme_copy_test.py +++ b/tests/nvme_copy_test.py @@ -16,13 +16,8 @@ """ -import logging -import subprocess - from nvme_test import TestNVMe -logger = logging.getLogger(__name__) - class TestNVMeCopy(TestNVMe): @@ -44,14 +39,10 @@ def setUp(self): # get host behavior support data get_features_cmd = f"{self.nvme_bin} get-feature {self.ctrl} " + \ "--feature-id=0x16 --data-len=512 --raw-binary" - logger.debug(get_features_cmd) - proc = subprocess.Popen(get_features_cmd, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - err = proc.wait() + result = self.run_cmd(get_features_cmd) + err = result.returncode self.assertEqual(err, 0, "ERROR : nvme get-feature failed") - self.host_behavior_data = proc.stdout.read() + self.host_behavior_data = result.stdout # enable cross-namespace copy formats if self.host_behavior_data[4] & cross_namespace_copy: # skip if already enabled @@ -61,23 +52,13 @@ def setUp(self): data = self.host_behavior_data[:4] + cross_namespace_copy.to_bytes(2, 'little') + self.host_behavior_data[6:] set_features_cmd = f"{self.nvme_bin} set-feature " + \ f"{self.ctrl} --feature-id=0x16 --data-len=512" - proc = subprocess.Popen(set_features_cmd, - shell=True, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE, - encoding='utf-8') - proc.communicate(input=data) - self.assertEqual(proc.returncode, 0, "Failed to enable cross-namespace copy formats") + result = self.run_cmd(set_features_cmd, stdin_data=data) + self.assertEqual(result.returncode, 0, "Failed to enable cross-namespace copy formats") get_ns_id_cmd = f"{self.nvme_bin} get-ns-id {self.ns1}" - logger.debug(get_ns_id_cmd) - proc = subprocess.Popen(get_ns_id_cmd, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - err = proc.wait() + result = self.run_cmd(get_ns_id_cmd) + err = result.returncode self.assertEqual(err, 0, "ERROR : nvme get-ns-id failed") - output = proc.stdout.read() - logger.debug(output) + output = result.stdout self.ns1_nsid = int(output.strip().split(':')[-1]) self.setup_log_dir(self.__class__.__name__) @@ -87,13 +68,7 @@ def tearDown(self): # restore saved host behavior support data set_features_cmd = f"{self.nvme_bin} set-feature {self.ctrl} " + \ "--feature-id=0x16 --data-len=512" - logger.debug(set_features_cmd) - proc = subprocess.Popen(set_features_cmd, - shell=True, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE, - encoding='utf-8') - proc.communicate(input=self.host_behavior_data) + self.run_cmd(set_features_cmd, stdin_data=self.host_behavior_data) super().tearDown() def copy(self, sdlba, blocks, slbs, **kwargs): diff --git a/tests/nvme_format_test.py b/tests/nvme_format_test.py index 9c247c842a..82d31731fb 100644 --- a/tests/nvme_format_test.py +++ b/tests/nvme_format_test.py @@ -40,7 +40,6 @@ import json import logging import math -import subprocess from nvme_test import TestNVMe @@ -110,16 +109,9 @@ def attach_detach_primary_ns(self): # read lbaf information id_ns_cmd = f"{self.nvme_bin} id-ns {self.ctrl} " + \ f"--namespace-id={self.default_nsid} --output-format=json" - logger.debug(id_ns_cmd) - proc = subprocess.Popen(id_ns_cmd, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - err = proc.wait() - self.assertEqual(err, 0, "ERROR : nvme id-ns failed") - output = proc.stdout.read() - logger.debug(output) - json_output = json.loads(output) + result = self.run_cmd(id_ns_cmd) + self.assertEqual(result.returncode, 0, "ERROR : nvme id-ns failed") + json_output = json.loads(result.stdout) self.lba_format_list = json_output['lbafs'] self.assertTrue(len(self.lba_format_list) > 0, "ERROR : nvme id-ns could not find any lba formats") diff --git a/tests/nvme_get_features_test.py b/tests/nvme_get_features_test.py index ff22eb72d0..68d9849749 100644 --- a/tests/nvme_get_features_test.py +++ b/tests/nvme_get_features_test.py @@ -34,13 +34,8 @@ 9. 0Bh M Asynchronous Event Configuration. """ -import logging -import subprocess - from nvme_test import TestNVMe -logger = logging.getLogger(__name__) - class TestNVMeGetMandatoryFeatures(TestNVMe): @@ -62,14 +57,8 @@ def setUp(self): device = self.ctrl.split('/')[-1] get_vector_list_cmd = "grep " + device + "q /proc/interrupts |" \ " cut -d : -f 1 | tr -d ' ' | tr '\n' ' '" - logger.debug(get_vector_list_cmd) - proc = subprocess.Popen(get_vector_list_cmd, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - output = proc.stdout.read() - logger.debug(output) - self.vector_list_len = len(output.strip().split(" ")) + result = self.run_cmd(get_vector_list_cmd) + self.vector_list_len = len(result.stdout.strip().split(" ")) def tearDown(self): """ Post Section for TestNVMeGetMandatoryFeatures diff --git a/tests/nvme_test.py b/tests/nvme_test.py index d8a8eb4311..fabfb47cc0 100644 --- a/tests/nvme_test.py +++ b/tests/nvme_test.py @@ -134,8 +134,7 @@ def validate_pci_device(self): """ x1, x2, dev = self.ctrl.split('/') cmd = "find /sys/devices -name \\*" + dev + " | grep -i pci" - logger.debug(cmd) - err = subprocess.call(cmd, shell=True, stdout=subprocess.DEVNULL) + err = self.run_cmd(cmd).returncode self.assertEqual(err, 0, "ERROR : Only NVMe PCI subsystem is supported") def load_config(self): @@ -182,14 +181,28 @@ def setup_log_dir(self, test_name): sys.stdout = TestNVMeLogger(self.test_log_dir + "/" + "stdout.log") sys.stderr = TestNVMeLogger(self.test_log_dir + "/" + "stderr.log") + def run_cmd(self, cmd, stdin_data=None): + """ Run a shell command using subprocess.run, log the command and its + output, and return the CompletedProcess result. + - Args: + - cmd : shell command string to execute. + - stdin_data : optional string to pass as stdin input. + - Returns: + - CompletedProcess result. + """ + logger.debug(f"Running: {cmd}") + result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, encoding='utf-8', + input=stdin_data) + if result.stdout: + logger.debug(result.stdout) + if result.stderr: + logger.debug(result.stderr) + return result + def exec_cmd(self, cmd): """ Wrapper for executing a shell command and return the result. """ - logger.debug(cmd) - proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, - encoding='utf-8') - ret = proc.wait() - logger.debug(proc.stdout.read()) - return ret + return self.run_cmd(cmd).returncode def nvme_reset_ctrl(self): """ Wrapper for nvme reset command. @@ -199,19 +212,11 @@ def nvme_reset_ctrl(self): - None """ nvme_reset_cmd = f"{self.nvme_bin} reset {self.ctrl}" - logger.debug(nvme_reset_cmd) - err = subprocess.call(nvme_reset_cmd, - shell=True, - stdout=subprocess.DEVNULL) + err = self.run_cmd(nvme_reset_cmd).returncode self.assertEqual(err, 0, "ERROR : nvme reset failed") rescan_cmd = "echo 1 > /sys/bus/pci/rescan" - logger.debug(rescan_cmd) - proc = subprocess.Popen(rescan_cmd, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding='utf-8') - self.assertEqual(proc.wait(), 0, "ERROR : pci rescan failed") + result = self.run_cmd(rescan_cmd) + self.assertEqual(result.returncode, 0, "ERROR : pci rescan failed") def get_ctrl_id(self): """ Wrapper for extracting the first controller id. @@ -222,16 +227,9 @@ def get_ctrl_id(self): """ get_ctrl_id = f"{self.nvme_bin} list-ctrl {self.ctrl} " + \ "--output-format=json" - logger.debug(get_ctrl_id) - proc = subprocess.Popen(get_ctrl_id, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - err = proc.wait() - self.assertEqual(err, 0, "ERROR : nvme list-ctrl failed") - output = proc.stdout.read() - logger.debug(output) - json_output = json.loads(output) + result = self.run_cmd(get_ctrl_id) + self.assertEqual(result.returncode, 0, "ERROR : nvme list-ctrl failed") + json_output = json.loads(result.stdout) self.assertTrue(len(json_output['ctrl_list']) > 0, "ERROR : nvme list-ctrl could not find ctrl") return str(json_output['ctrl_list'][0]['ctrl_id']) @@ -270,15 +268,9 @@ def get_nsid_list(self): ns_list = [] ns_list_cmd = f"{self.nvme_bin} list-ns {self.ctrl} " + \ "--output-format=json" - logger.debug(ns_list_cmd) - proc = subprocess.Popen(ns_list_cmd, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - self.assertEqual(proc.wait(), 0, "ERROR : nvme list namespace failed") - output = proc.stdout.read() - logger.debug(output) - json_output = json.loads(output) + result = self.run_cmd(ns_list_cmd) + self.assertEqual(result.returncode, 0, "ERROR : nvme list namespace failed") + json_output = json.loads(result.stdout) for ns in json_output['nsid_list']: ns_list.append(ns['nsid']) @@ -294,16 +286,9 @@ def get_max_ns(self): """ max_ns_cmd = f"{self.nvme_bin} id-ctrl {self.ctrl} " + \ "--output-format=json" - logger.debug(max_ns_cmd) - proc = subprocess.Popen(max_ns_cmd, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - err = proc.wait() - self.assertEqual(err, 0, "ERROR : reading maximum namespace count failed") - output = proc.stdout.read() - logger.debug(output) - json_output = json.loads(output) + result = self.run_cmd(max_ns_cmd) + self.assertEqual(result.returncode, 0, "ERROR : reading maximum namespace count failed") + json_output = json.loads(result.stdout) return int(json_output['nn']) def get_lba_status_supported(self): @@ -324,16 +309,9 @@ def get_lba_format_size(self): """ nvme_id_ns_cmd = f"{self.nvme_bin} id-ns {self.ns1} " + \ "--output-format=json" - logger.debug(nvme_id_ns_cmd) - proc = subprocess.Popen(nvme_id_ns_cmd, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - err = proc.wait() - self.assertEqual(err, 0, "ERROR : reading id-ns") - output = proc.stdout.read() - logger.debug(output) - json_output = json.loads(output) + result = self.run_cmd(nvme_id_ns_cmd) + self.assertEqual(result.returncode, 0, "ERROR : reading id-ns") + json_output = json.loads(result.stdout) self.assertTrue(len(json_output['lbafs']) > self.flbas, "Error : could not match the given flbas to an existing lbaf") lbaf_json = json_output['lbafs'][int(self.flbas)] @@ -365,16 +343,9 @@ def get_id_ctrl_field_value(self, field): """ id_ctrl_cmd = f"{self.nvme_bin} id-ctrl {self.ctrl} " + \ "--output-format=json" - logger.debug(id_ctrl_cmd) - proc = subprocess.Popen(id_ctrl_cmd, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - err = proc.wait() - self.assertEqual(err, 0, "ERROR : reading id-ctrl failed") - output = proc.stdout.read() - logger.debug(output) - json_output = json.loads(output) + result = self.run_cmd(id_ctrl_cmd) + self.assertEqual(result.returncode, 0, "ERROR : reading id-ctrl failed") + json_output = json.loads(result.stdout) self.assertTrue(field in json_output, f"ERROR : reading field '{field}' failed") return str(json_output[field]) @@ -400,15 +371,9 @@ def delete_all_ns(self): self.assertEqual(self.exec_cmd(delete_ns_cmd), 0) list_ns_cmd = f"{self.nvme_bin} list-ns {self.ctrl} --all " + \ "--output-format=json" - logger.debug(list_ns_cmd) - proc = subprocess.Popen(list_ns_cmd, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - self.assertEqual(proc.wait(), 0, "ERROR : nvme list-ns failed") - output = proc.stdout.read() - logger.debug(output) - json_output = json.loads(output) + result = self.run_cmd(list_ns_cmd) + self.assertEqual(result.returncode, 0, "ERROR : nvme list-ns failed") + json_output = json.loads(result.stdout) self.assertEqual(len(json_output['nsid_list']), 0, "ERROR : deleting all namespace failed") @@ -420,14 +385,13 @@ def create_ns(self, nsze, ncap, flbas, dps): - flbas : new namespace format. - dps : new namespace data protection information. - Returns: - - Popen object of the nvme create namespace command. + - Tuple of (returncode, stdout) from the nvme create-ns command. """ create_ns_cmd = f"{self.nvme_bin} create-ns {self.ctrl} " + \ f"--nsze={str(nsze)} --ncap={str(ncap)} --flbas={str(flbas)} " + \ f"--dps={str(dps)} --verbose --output-format=json" - logger.debug(create_ns_cmd) - return subprocess.Popen(create_ns_cmd, shell=True, - stdout=subprocess.PIPE, encoding='utf-8') + result = self.run_cmd(create_ns_cmd) + return result.returncode, result.stdout def create_and_validate_ns(self, nsid, nsze, ncap, flbas, dps): """ Wrapper for creating and validating a namespace. @@ -440,20 +404,14 @@ def create_and_validate_ns(self, nsid, nsze, ncap, flbas, dps): - Returns: - return 0 on success, error code on failure. """ - proc = self.create_ns(nsze, ncap, flbas, dps) - err = proc.wait() + err, stdout = self.create_ns(nsze, ncap, flbas, dps) if err == 0: - output = proc.stdout.read() - logger.debug(output) - json_output = json.loads(output) + json_output = json.loads(stdout) self.assertEqual(int(json_output['nsid']), nsid, "ERROR : create namespace failed") id_ns_cmd = f"{self.nvme_bin} id-ns {self.ctrl} " + \ f"--namespace-id={str(nsid)}" - logger.debug(id_ns_cmd) - err = subprocess.call(id_ns_cmd, - shell=True, - stdout=subprocess.DEVNULL) + err = self.run_cmd(id_ns_cmd).returncode return err def attach_ns(self, ctrl_id, nsid): @@ -466,10 +424,7 @@ def attach_ns(self, ctrl_id, nsid): """ attach_ns_cmd = f"{self.nvme_bin} attach-ns {self.ctrl} " + \ f"--namespace-id={str(nsid)} --controllers={ctrl_id} --verbose" - logger.debug(attach_ns_cmd) - err = subprocess.call(attach_ns_cmd, - shell=True, - stdout=subprocess.DEVNULL) + err = self.run_cmd(attach_ns_cmd).returncode if err != 0: return err @@ -493,10 +448,7 @@ def detach_ns(self, ctrl_id, nsid): """ detach_ns_cmd = f"{self.nvme_bin} detach-ns {self.ctrl} " + \ f"--namespace-id={str(nsid)} --controllers={ctrl_id} --verbose" - logger.debug(detach_ns_cmd) - return subprocess.call(detach_ns_cmd, - shell=True, - stdout=subprocess.DEVNULL) + return self.run_cmd(detach_ns_cmd).returncode def delete_and_validate_ns(self, nsid): """ Wrapper for deleting and validating that namespace is deleted. @@ -508,10 +460,7 @@ def delete_and_validate_ns(self, nsid): # delete the namespace delete_ns_cmd = f"{self.nvme_bin} delete-ns {self.ctrl} " + \ f"--namespace-id={str(nsid)} --verbose" - logger.debug(delete_ns_cmd) - err = subprocess.call(delete_ns_cmd, - shell=True, - stdout=subprocess.DEVNULL) + err = self.run_cmd(delete_ns_cmd).returncode self.assertEqual(err, 0, "ERROR : delete namespace failed") return err @@ -524,7 +473,8 @@ def get_smart_log(self, nsid): """ smart_log_cmd = f"{self.nvme_bin} smart-log {self.ctrl} " + \ f"--namespace-id={str(nsid)}" - err = self.exec_cmd(smart_log_cmd) + result = self.run_cmd(smart_log_cmd) + err = result.returncode self.assertEqual(err, 0, "ERROR : nvme smart log failed") return err @@ -540,7 +490,8 @@ def get_id_ctrl(self, vendor=False): else: id_ctrl_cmd = f"{self.nvme_bin} id-ctrl " +\ f"--vendor-specific {self.ctrl}" - err = self.exec_cmd(id_ctrl_cmd) + result = self.run_cmd(id_ctrl_cmd) + err = result.returncode self.assertEqual(err, 0, "ERROR : nvme id controller failed") return err @@ -553,16 +504,11 @@ def get_error_log(self): """ pattern = re.compile(r"^ Entry\[[ ]*[0-9]+\]") error_log_cmd = f"{self.nvme_bin} error-log {self.ctrl}" - logger.debug(error_log_cmd) - proc = subprocess.Popen(error_log_cmd, - shell=True, - stdout=subprocess.PIPE, - encoding='utf-8') - err = proc.wait() + result = self.run_cmd(error_log_cmd) + err = result.returncode self.assertEqual(err, 0, "ERROR : nvme error log failed") - output = proc.stdout.read() - logger.debug(output) - lines = output.splitlines() + # This sanity checkes the 'normal' output + lines = result.stdout.splitlines() if not lines: return 1 err_log_entry_count = int(lines[0].split(" ")[5].strip().split(":")[1]) @@ -581,16 +527,8 @@ def run_ns_io(self, nsid, lbads, count=10): block_size = ds if int(lbads) < 9 else 2 ** int(lbads) ns_path = self.ctrl + "n" + str(nsid) io_cmd = "dd if=" + ns_path + " of=/dev/null" + " bs=" + \ - str(block_size) + " count=" + str(count) + " > /dev/null 2>&1" - logger.debug(io_cmd) - run_io = subprocess.Popen(io_cmd, shell=True, stdout=subprocess.PIPE, - encoding='utf-8') - run_io_result = run_io.communicate()[1] - self.assertEqual(run_io_result, None) + str(block_size) + " count=" + str(count) + self.assertEqual(self.run_cmd(io_cmd).returncode, 0) io_cmd = "dd if=/dev/zero of=" + ns_path + " bs=" + \ - str(block_size) + " count=" + str(count) + " > /dev/null 2>&1" - logger.debug(io_cmd) - run_io = subprocess.Popen(io_cmd, shell=True, stdout=subprocess.PIPE, - encoding='utf-8') - run_io_result = run_io.communicate()[1] - self.assertEqual(run_io_result, None) + str(block_size) + " count=" + str(count) + self.assertEqual(self.run_cmd(io_cmd).returncode, 0)