From 0124d396024503344b4a6648ca50e8f5db864724 Mon Sep 17 00:00:00 2001 From: Vojtech Trefny Date: Tue, 29 Apr 2025 15:46:49 +0200 Subject: [PATCH] tests: Add a custom script for getting info about Stratis pools There are multiple versions of Stratis in systems we support and it is hard to get all the information to verify test results from the Stratis cmdline utility in a consistent way. This adds a small Python script that uses Stratis DBus API to get the information we need for the tests. --- tests/scripts/stratis_pool_info.py | 155 +++++++++++++++++++++++++++++ tests/verify-pool-stratis.yml | 26 ++--- 2 files changed, 169 insertions(+), 12 deletions(-) create mode 100644 tests/scripts/stratis_pool_info.py diff --git a/tests/scripts/stratis_pool_info.py b/tests/scripts/stratis_pool_info.py new file mode 100644 index 00000000..4d5ff375 --- /dev/null +++ b/tests/scripts/stratis_pool_info.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python + +# Helper script for gathering information about stratis pools using stratis DBus API. + +# The script is meant to be a supporting tool for the storage role tests + +import sys +from collections import namedtuple + +import json + +import gi # pylint: disable=import-error +gi.require_version("GLib", "2.0") +gi.require_version("Gio", "2.0") + +from gi.repository import GLib, Gio # pylint: disable=import-error + +STRATIS_SERVICE = "org.storage.stratis3" +STRATIS_PATH = "/org/storage/stratis3" +STRATIS_POOL_INTF = STRATIS_SERVICE + ".pool.r0" + +# Code for working with DBus, taken from blivet/safe_dbus.py +DBUS_PROPS_IFACE = "org.freedesktop.DBus.Properties" +DBUS_INTRO_IFACE = "org.freedesktop.DBus.Introspectable" + + +class SafeDBusError(Exception): + """Class for exceptions defined in this module.""" + + +class DBusCallError(SafeDBusError): + """Class for the errors related to calling methods over DBus.""" + + +class DBusPropertyError(DBusCallError): + """Class for the errors related to getting property values over DBus.""" + + +def get_new_system_connection(): + """Return a new connection to the system bus.""" + + return Gio.DBusConnection.new_for_address_sync( + Gio.dbus_address_get_for_bus_sync(Gio.BusType.SYSTEM, None), + Gio.DBusConnectionFlags.AUTHENTICATION_CLIENT + | Gio.DBusConnectionFlags.MESSAGE_BUS_CONNECTION, + None, None) + + +def call_sync(service, obj_path, iface, method, args, connection=None, fds=None): + if not connection: + try: + connection = get_new_system_connection() + except GLib.GError as gerr: + raise DBusCallError("Unable to connect to system bus: %s" % gerr) from gerr + + if connection.is_closed(): + raise DBusCallError("Connection is closed") + + try: + ret = connection.call_with_unix_fd_list_sync(service, obj_path, iface, method, + args, None, Gio.DBusCallFlags.NONE, + -1, fds, None) + except GLib.GError as gerr: + msg = "Failed to call %s method on %s with %s arguments: %s" % \ + (method, obj_path, args, gerr.message) # pylint: disable=no-member + raise DBusCallError(msg) from gerr + + if ret is None: + msg = "No return from %s method on %s with %s arguments" % (method, obj_path, + args) + raise DBusCallError(msg) + + return ret[0].unpack() + + +def get_properties_sync(service, obj_path, iface, connection=None): + args = GLib.Variant('(s)', (iface,)) + ret = call_sync(service, obj_path, DBUS_PROPS_IFACE, "GetAll", args, + connection) + return ret + + +# Extracting and printing Stratis pool information +StratisPoolInfo = namedtuple("StratisPoolInfo", ["name", "encrypted", "key_desc", + "clevis_pin", "clevis_args"]) + + +def _print_pool_info_json(pool_info): + pi_dict = pool_info._asdict() + pi_json = json.dumps(pi_dict) + print(pi_json) + + +def _get_pool_info(pool_path): + try: + properties = get_properties_sync(STRATIS_SERVICE, + pool_path, + STRATIS_POOL_INTF)[0] + except DBusPropertyError: + return None + + if not properties: + return None + + description = properties.get("KeyDescription", None) + if not description or not description[0] or not description[1][0]: + key_desc = None + else: + key_desc = description[1][1] + + clevis_info = properties.get("ClevisInfo", None) + if not clevis_info or not clevis_info[0] or not clevis_info[1][0]: + clevis = None + else: + clevis = clevis_info[1][1] + + if clevis: + clevis_pin = clevis[0] + clevis_args = json.loads(clevis[1]) + else: + clevis_pin = None + clevis_args = {} + + return StratisPoolInfo(name=properties["Name"], + encrypted=properties["Encrypted"], + key_desc=key_desc, + clevis_pin=clevis_pin, + clevis_args=clevis_args) + + +def main(pool_name): + objects = call_sync(STRATIS_SERVICE, + STRATIS_PATH, + "org.freedesktop.DBus.ObjectManager", + "GetManagedObjects", + None)[0] + + for path, interfaces in objects.items(): + if STRATIS_POOL_INTF in interfaces.keys(): + pool_info = _get_pool_info(path) + if pool_info and pool_info.name == pool_name: + _print_pool_info_json(pool_info) + return True + + print(json.dumps(None)) + return True + + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python %s " % sys.argv[0], file=sys.stderr) + sys.exit(1) + + succ = main(sys.argv[1]) + sys.exit(0) if succ else sys.exit(1) diff --git a/tests/verify-pool-stratis.yml b/tests/verify-pool-stratis.yml index 7f380171..b473da6a 100644 --- a/tests/verify-pool-stratis.yml +++ b/tests/verify-pool-stratis.yml @@ -3,30 +3,33 @@ - name: Check Stratis options when: storage_test_pool.type == 'stratis' block: - - name: Run 'stratis report' - command: stratis report + - name: Get stratis pool information + ansible.builtin.script: >- + scripts/stratis_pool_info.py + "{{ storage_test_pool.name }}" + args: + executable: "{{ ansible_python.executable }}" register: storage_test_stratis_report changed_when: false + - name: Print script output + debug: + msg: "{{ storage_test_stratis_report.stdout }}" + - name: Get information about Stratis set_fact: _stratis_pool_info: "{{ storage_test_stratis_report.stdout | from_json }}" - name: Verify that the pools was created assert: - that: _stratis_pool_info.pools | length == 1 and - _stratis_pool_info.pools[0].name == storage_test_pool.name + that: _stratis_pool_info.name == storage_test_pool.name msg: >- Stratis pool '{{ storage_test_pool.name }}' not found when: storage_test_pool.state == 'present' - # Stratis internally uses LUKS so verify-pool-member-encryption will also - # cover this we just need to make sure this is encrypted Stratis pool - # and not Stratis on top of "normal LUKS - name: Verify that encryption is correctly set assert: - that: storage_test_pool.name in - _stratis_pool_info.pools[0]['blockdevs']['datadevs'][0]['key_description'] + that: storage_test_pool.name in _stratis_pool_info.key_desc msg: >- Stratis pool '{{ storage_test_pool.name }}' is not encrypted when: @@ -36,9 +39,8 @@ - name: Verify that Clevis/Tang encryption is correctly set assert: that: - _stratis_pool_info.pools[0]['blockdevs']['datadevs'][0]['clevis_pin'] == 'tang' and - _stratis_pool_info.pools[0]['blockdevs']['datadevs'][0]['clevis_config']['url'] == - storage_test_pool.encryption_tang_url + _stratis_pool_info.clevis_pin == 'tang' and + _stratis_pool_info.clevis_args['url'] == storage_test_pool.encryption_tang_url msg: >- Stratis pool '{{ storage_test_pool.name }}' Clevis is not correctly configured when: