|
| 1 | +#!/usr/bin/python3 |
| 2 | +import os |
| 3 | +import unittest |
| 4 | +from staslib import conf, stas, trid |
| 5 | + |
| 6 | +HOSTNQN = 'nqn.2014-08.org.nvmexpress:uuid:01234567-0123-0123-0123-0123456789ab' |
| 7 | +SUBSYSNQN = 'nqn.1988-11.com.dell:SFSS:2:20220208134025e8' |
| 8 | + |
| 9 | + |
| 10 | +# ============================================================================== |
| 11 | +class TestExcluded(unittest.TestCase): |
| 12 | + '''Unit tests for stas._excluded() — a pure function with no dependencies.''' |
| 13 | + |
| 14 | + def test_empty_exclusion_list(self): |
| 15 | + self.assertFalse(stas._excluded([], {'transport': 'tcp', 'traddr': '1.2.3.4'})) |
| 16 | + |
| 17 | + def test_exact_match_is_excluded(self): |
| 18 | + excluded = [{'transport': 'tcp', 'traddr': '1.2.3.4'}] |
| 19 | + self.assertTrue(stas._excluded(excluded, {'transport': 'tcp', 'traddr': '1.2.3.4'})) |
| 20 | + |
| 21 | + def test_partial_exclusion_matches_any_controller_with_that_field(self): |
| 22 | + # Exclusion specifies only transport — should match any TCP controller |
| 23 | + excluded = [{'transport': 'tcp'}] |
| 24 | + self.assertTrue(stas._excluded(excluded, {'transport': 'tcp', 'traddr': '99.99.99.99'})) |
| 25 | + |
| 26 | + def test_one_field_mismatch_not_excluded(self): |
| 27 | + excluded = [{'transport': 'tcp', 'traddr': '1.2.3.4'}] |
| 28 | + self.assertFalse(stas._excluded(excluded, {'transport': 'tcp', 'traddr': '5.5.5.5'})) |
| 29 | + |
| 30 | + def test_missing_key_in_controller_not_excluded(self): |
| 31 | + # Exclusion requires traddr but controller dict has none — should not match |
| 32 | + excluded = [{'transport': 'tcp', 'traddr': '1.2.3.4'}] |
| 33 | + self.assertFalse(stas._excluded(excluded, {'transport': 'tcp'})) |
| 34 | + |
| 35 | + def test_multiple_entries_first_matches(self): |
| 36 | + excluded = [ |
| 37 | + {'transport': 'tcp', 'traddr': '1.2.3.4'}, |
| 38 | + {'transport': 'rdma', 'traddr': '5.5.5.5'}, |
| 39 | + ] |
| 40 | + self.assertTrue(stas._excluded(excluded, {'transport': 'tcp', 'traddr': '1.2.3.4'})) |
| 41 | + |
| 42 | + def test_multiple_entries_second_matches(self): |
| 43 | + excluded = [ |
| 44 | + {'transport': 'tcp', 'traddr': '99.99.99.99'}, |
| 45 | + {'transport': 'rdma', 'traddr': '5.5.5.5'}, |
| 46 | + ] |
| 47 | + self.assertTrue(stas._excluded(excluded, {'transport': 'rdma', 'traddr': '5.5.5.5'})) |
| 48 | + |
| 49 | + def test_multiple_entries_none_match(self): |
| 50 | + excluded = [ |
| 51 | + {'transport': 'tcp', 'traddr': '1.2.3.4'}, |
| 52 | + {'transport': 'rdma', 'traddr': '5.5.5.5'}, |
| 53 | + ] |
| 54 | + self.assertFalse(stas._excluded(excluded, {'transport': 'fc', 'traddr': '5.5.5.5'})) |
| 55 | + |
| 56 | + |
| 57 | +# ============================================================================== |
| 58 | +class TestTidFromDlpe(unittest.TestCase): |
| 59 | + '''Unit tests for stas.tid_from_dlpe().''' |
| 60 | + |
| 61 | + DLPE = { |
| 62 | + 'trtype': 'tcp', |
| 63 | + 'traddr': '10.10.10.10', |
| 64 | + 'trsvcid': '8009', |
| 65 | + 'subnqn': SUBSYSNQN, |
| 66 | + } |
| 67 | + |
| 68 | + def test_returns_tid_instance(self): |
| 69 | + result = stas.tid_from_dlpe(self.DLPE, host_traddr='1.2.3.4', host_iface='eth0', host_nqn=HOSTNQN) |
| 70 | + self.assertIsInstance(result, trid.TID) |
| 71 | + |
| 72 | + def test_transport_field(self): |
| 73 | + result = stas.tid_from_dlpe(self.DLPE, host_traddr='', host_iface='', host_nqn=HOSTNQN) |
| 74 | + self.assertEqual(result.transport, 'tcp') |
| 75 | + |
| 76 | + def test_traddr_field(self): |
| 77 | + result = stas.tid_from_dlpe(self.DLPE, host_traddr='', host_iface='', host_nqn=HOSTNQN) |
| 78 | + self.assertEqual(result.traddr, '10.10.10.10') |
| 79 | + |
| 80 | + def test_trsvcid_field(self): |
| 81 | + result = stas.tid_from_dlpe(self.DLPE, host_traddr='', host_iface='', host_nqn=HOSTNQN) |
| 82 | + self.assertEqual(result.trsvcid, '8009') |
| 83 | + |
| 84 | + def test_subsysnqn_field(self): |
| 85 | + result = stas.tid_from_dlpe(self.DLPE, host_traddr='', host_iface='', host_nqn=HOSTNQN) |
| 86 | + self.assertEqual(result.subsysnqn, SUBSYSNQN) |
| 87 | + |
| 88 | + def test_host_traddr_field(self): |
| 89 | + result = stas.tid_from_dlpe(self.DLPE, host_traddr='1.2.3.4', host_iface='', host_nqn=HOSTNQN) |
| 90 | + self.assertEqual(result.host_traddr, '1.2.3.4') |
| 91 | + |
| 92 | + def test_none_host_nqn_falls_back_to_sysconf(self): |
| 93 | + # When host_nqn is None, TID falls back to SysConf.hostnqn (which may |
| 94 | + # itself be None if /etc/nvme/hostnqn is absent — that is acceptable here) |
| 95 | + result = stas.tid_from_dlpe(self.DLPE, host_traddr='', host_iface='', host_nqn=None) |
| 96 | + self.assertIsInstance(result, trid.TID) |
| 97 | + |
| 98 | + def test_usable_as_dict_key(self): |
| 99 | + result = stas.tid_from_dlpe(self.DLPE, host_traddr='', host_iface='', host_nqn=HOSTNQN) |
| 100 | + d = {result: 'value'} |
| 101 | + self.assertEqual(d[result], 'value') |
| 102 | + |
| 103 | + def test_identical_dlpes_produce_equal_tids(self): |
| 104 | + t1 = stas.tid_from_dlpe(self.DLPE, '1.2.3.4', 'eth0', HOSTNQN) |
| 105 | + t2 = stas.tid_from_dlpe(self.DLPE, '1.2.3.4', 'eth0', HOSTNQN) |
| 106 | + self.assertEqual(t1, t2) |
| 107 | + |
| 108 | + def test_different_traddr_produces_unequal_tids(self): |
| 109 | + dlpe2 = dict(self.DLPE) |
| 110 | + dlpe2['traddr'] = '20.20.20.20' |
| 111 | + t1 = stas.tid_from_dlpe(self.DLPE, '1.2.3.4', 'eth0', HOSTNQN) |
| 112 | + t2 = stas.tid_from_dlpe(dlpe2, '1.2.3.4', 'eth0', HOSTNQN) |
| 113 | + self.assertNotEqual(t1, t2) |
| 114 | + |
| 115 | + def test_different_host_traddr_produces_unequal_tids(self): |
| 116 | + t1 = stas.tid_from_dlpe(self.DLPE, '1.2.3.4', 'eth0', HOSTNQN) |
| 117 | + t2 = stas.tid_from_dlpe(self.DLPE, '9.9.9.9', 'eth0', HOSTNQN) |
| 118 | + self.assertNotEqual(t1, t2) |
| 119 | + |
| 120 | + |
| 121 | +# ============================================================================== |
| 122 | +class TestRemoveExcluded(unittest.TestCase): |
| 123 | + '''Unit tests for stas.remove_excluded().''' |
| 124 | + |
| 125 | + FNAME = '/tmp/stas-test-remove-excluded.conf' |
| 126 | + |
| 127 | + @classmethod |
| 128 | + def setUpClass(cls): |
| 129 | + with open(cls.FNAME, 'w') as f: |
| 130 | + f.writelines([ |
| 131 | + '[Controllers]\n', |
| 132 | + 'exclude=transport=tcp;traddr=10.10.10.10\n', |
| 133 | + 'exclude=transport=rdma;traddr=192.168.1.1\n', |
| 134 | + ]) |
| 135 | + conf.SvcConf().set_conf_file(cls.FNAME) |
| 136 | + |
| 137 | + @classmethod |
| 138 | + def tearDownClass(cls): |
| 139 | + if os.path.exists(cls.FNAME): |
| 140 | + os.remove(cls.FNAME) |
| 141 | + |
| 142 | + def _make_tid(self, transport, traddr): |
| 143 | + return trid.TID({'transport': transport, 'traddr': traddr, 'subsysnqn': SUBSYSNQN, 'host-nqn': HOSTNQN}) |
| 144 | + |
| 145 | + def test_empty_list_unchanged(self): |
| 146 | + self.assertEqual(stas.remove_excluded([]), []) |
| 147 | + |
| 148 | + def test_excluded_controller_is_removed(self): |
| 149 | + controllers = [self._make_tid('tcp', '10.10.10.10')] |
| 150 | + self.assertEqual(stas.remove_excluded(controllers), []) |
| 151 | + |
| 152 | + def test_second_exclusion_rule_applied(self): |
| 153 | + controllers = [self._make_tid('rdma', '192.168.1.1')] |
| 154 | + self.assertEqual(stas.remove_excluded(controllers), []) |
| 155 | + |
| 156 | + def test_non_excluded_controller_is_kept(self): |
| 157 | + t = self._make_tid('tcp', '1.1.1.1') |
| 158 | + self.assertEqual(stas.remove_excluded([t]), [t]) |
| 159 | + |
| 160 | + def test_mixed_list_only_excluded_removed(self): |
| 161 | + excluded = self._make_tid('tcp', '10.10.10.10') |
| 162 | + kept = self._make_tid('tcp', '1.1.1.1') |
| 163 | + result = stas.remove_excluded([excluded, kept]) |
| 164 | + self.assertNotIn(excluded, result) |
| 165 | + self.assertIn(kept, result) |
| 166 | + |
| 167 | + def test_multiple_non_excluded_all_kept(self): |
| 168 | + t1 = self._make_tid('tcp', '1.1.1.1') |
| 169 | + t2 = self._make_tid('tcp', '2.2.2.2') |
| 170 | + result = stas.remove_excluded([t1, t2]) |
| 171 | + self.assertEqual(len(result), 2) |
| 172 | + |
| 173 | + |
| 174 | +# ============================================================================== |
| 175 | +class TestRemoveInvalidAddresses(unittest.TestCase): |
| 176 | + '''Unit tests for stas.remove_invalid_addresses().''' |
| 177 | + |
| 178 | + FNAME_BOTH = '/tmp/stas-test-addr-both.conf' |
| 179 | + FNAME_IPV4 = '/tmp/stas-test-addr-ipv4.conf' |
| 180 | + FNAME_IPV6 = '/tmp/stas-test-addr-ipv6.conf' |
| 181 | + |
| 182 | + @classmethod |
| 183 | + def setUpClass(cls): |
| 184 | + for fname, family in ( |
| 185 | + (cls.FNAME_BOTH, 'ipv4+ipv6'), |
| 186 | + (cls.FNAME_IPV4, 'ipv4'), |
| 187 | + (cls.FNAME_IPV6, 'ipv6'), |
| 188 | + ): |
| 189 | + with open(fname, 'w') as f: |
| 190 | + f.write(f'[Global]\nip-family={family}\n') |
| 191 | + |
| 192 | + @classmethod |
| 193 | + def tearDownClass(cls): |
| 194 | + for fname in (cls.FNAME_BOTH, cls.FNAME_IPV4, cls.FNAME_IPV6): |
| 195 | + if os.path.exists(fname): |
| 196 | + os.remove(fname) |
| 197 | + |
| 198 | + def _make_tid(self, transport, traddr): |
| 199 | + return trid.TID({'transport': transport, 'traddr': traddr, 'subsysnqn': SUBSYSNQN, 'host-nqn': HOSTNQN}) |
| 200 | + |
| 201 | + def test_empty_list_unchanged(self): |
| 202 | + conf.SvcConf().set_conf_file(self.FNAME_BOTH) |
| 203 | + self.assertEqual(stas.remove_invalid_addresses([]), []) |
| 204 | + |
| 205 | + def test_valid_ipv4_kept_when_both_families_allowed(self): |
| 206 | + conf.SvcConf().set_conf_file(self.FNAME_BOTH) |
| 207 | + t = self._make_tid('tcp', '10.10.10.10') |
| 208 | + self.assertEqual(stas.remove_invalid_addresses([t]), [t]) |
| 209 | + |
| 210 | + def test_valid_ipv6_kept_when_both_families_allowed(self): |
| 211 | + conf.SvcConf().set_conf_file(self.FNAME_BOTH) |
| 212 | + t = self._make_tid('tcp', '::1') |
| 213 | + self.assertEqual(stas.remove_invalid_addresses([t]), [t]) |
| 214 | + |
| 215 | + def test_invalid_address_always_removed(self): |
| 216 | + conf.SvcConf().set_conf_file(self.FNAME_BOTH) |
| 217 | + t = self._make_tid('tcp', 'not-an-ip-address') |
| 218 | + self.assertEqual(stas.remove_invalid_addresses([t]), []) |
| 219 | + |
| 220 | + def test_ipv4_removed_when_only_ipv6_enabled(self): |
| 221 | + conf.SvcConf().set_conf_file(self.FNAME_IPV6) |
| 222 | + t = self._make_tid('tcp', '10.10.10.10') |
| 223 | + self.assertEqual(stas.remove_invalid_addresses([t]), []) |
| 224 | + |
| 225 | + def test_ipv6_removed_when_only_ipv4_enabled(self): |
| 226 | + conf.SvcConf().set_conf_file(self.FNAME_IPV4) |
| 227 | + t = self._make_tid('tcp', '::1') |
| 228 | + self.assertEqual(stas.remove_invalid_addresses([t]), []) |
| 229 | + |
| 230 | + def test_ipv4_kept_when_only_ipv4_enabled(self): |
| 231 | + conf.SvcConf().set_conf_file(self.FNAME_IPV4) |
| 232 | + t = self._make_tid('tcp', '10.10.10.10') |
| 233 | + self.assertEqual(stas.remove_invalid_addresses([t]), [t]) |
| 234 | + |
| 235 | + def test_ipv6_kept_when_only_ipv6_enabled(self): |
| 236 | + conf.SvcConf().set_conf_file(self.FNAME_IPV6) |
| 237 | + t = self._make_tid('tcp', '::1') |
| 238 | + self.assertEqual(stas.remove_invalid_addresses([t]), [t]) |
| 239 | + |
| 240 | + def test_rdma_with_valid_ipv4_kept(self): |
| 241 | + conf.SvcConf().set_conf_file(self.FNAME_BOTH) |
| 242 | + t = self._make_tid('rdma', '192.168.0.1') |
| 243 | + self.assertEqual(stas.remove_invalid_addresses([t]), [t]) |
| 244 | + |
| 245 | + def test_fc_transport_always_kept_regardless_of_ip_family(self): |
| 246 | + conf.SvcConf().set_conf_file(self.FNAME_IPV4) |
| 247 | + t = self._make_tid('fc', 'nn-0x1000000044001123:pn-0x2000000055001123') |
| 248 | + self.assertEqual(stas.remove_invalid_addresses([t]), [t]) |
| 249 | + |
| 250 | + def test_loop_transport_always_kept(self): |
| 251 | + conf.SvcConf().set_conf_file(self.FNAME_IPV4) |
| 252 | + t = self._make_tid('loop', '') |
| 253 | + self.assertEqual(stas.remove_invalid_addresses([t]), [t]) |
| 254 | + |
| 255 | + def test_unknown_transport_always_removed(self): |
| 256 | + conf.SvcConf().set_conf_file(self.FNAME_BOTH) |
| 257 | + t = self._make_tid('unknown', '10.10.10.10') |
| 258 | + self.assertEqual(stas.remove_invalid_addresses([t]), []) |
| 259 | + |
| 260 | + |
| 261 | +if __name__ == '__main__': |
| 262 | + unittest.main() |
0 commit comments