diff --git a/tests/scripts/stratis_pool_info.py b/tests/scripts/stratis_pool_info.py new file mode 100644 index 00000000..4d5ff375 --- /dev/null +++ b/tests/scripts/stratis_pool_info.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python + +# Helper script for gathering information about stratis pools using stratis DBus API. + +# The script is meant to be a supporting tool for the storage role tests + +import sys +from collections import namedtuple + +import json + +import gi # pylint: disable=import-error +gi.require_version("GLib", "2.0") +gi.require_version("Gio", "2.0") + +from gi.repository import GLib, Gio # pylint: disable=import-error + +STRATIS_SERVICE = "org.storage.stratis3" +STRATIS_PATH = "/org/storage/stratis3" +STRATIS_POOL_INTF = STRATIS_SERVICE + ".pool.r0" + +# Code for working with DBus, taken from blivet/safe_dbus.py +DBUS_PROPS_IFACE = "org.freedesktop.DBus.Properties" +DBUS_INTRO_IFACE = "org.freedesktop.DBus.Introspectable" + + +class SafeDBusError(Exception): + """Class for exceptions defined in this module.""" + + +class DBusCallError(SafeDBusError): + """Class for the errors related to calling methods over DBus.""" + + +class DBusPropertyError(DBusCallError): + """Class for the errors related to getting property values over DBus.""" + + +def get_new_system_connection(): + """Return a new connection to the system bus.""" + + return Gio.DBusConnection.new_for_address_sync( + Gio.dbus_address_get_for_bus_sync(Gio.BusType.SYSTEM, None), + Gio.DBusConnectionFlags.AUTHENTICATION_CLIENT + | Gio.DBusConnectionFlags.MESSAGE_BUS_CONNECTION, + None, None) + + +def call_sync(service, obj_path, iface, method, args, connection=None, fds=None): + if not connection: + try: + connection = get_new_system_connection() + except GLib.GError as gerr: + raise DBusCallError("Unable to connect to system bus: %s" % gerr) from gerr + + if connection.is_closed(): + raise DBusCallError("Connection is closed") + + try: + ret = connection.call_with_unix_fd_list_sync(service, obj_path, iface, method, + args, None, Gio.DBusCallFlags.NONE, + -1, fds, None) + except GLib.GError as gerr: + msg = "Failed to call %s method on %s with %s arguments: %s" % \ + (method, obj_path, args, gerr.message) # pylint: disable=no-member + raise DBusCallError(msg) from gerr + + if ret is None: + msg = "No return from %s method on %s with %s arguments" % (method, obj_path, + args) + raise DBusCallError(msg) + + return ret[0].unpack() + + +def get_properties_sync(service, obj_path, iface, connection=None): + args = GLib.Variant('(s)', (iface,)) + ret = call_sync(service, obj_path, DBUS_PROPS_IFACE, "GetAll", args, + connection) + return ret + + +# Extracting and printing Stratis pool information +StratisPoolInfo = namedtuple("StratisPoolInfo", ["name", "encrypted", "key_desc", + "clevis_pin", "clevis_args"]) + + +def _print_pool_info_json(pool_info): + pi_dict = pool_info._asdict() + pi_json = json.dumps(pi_dict) + print(pi_json) + + +def _get_pool_info(pool_path): + try: + properties = get_properties_sync(STRATIS_SERVICE, + pool_path, + STRATIS_POOL_INTF)[0] + except DBusPropertyError: + return None + + if not properties: + return None + + description = properties.get("KeyDescription", None) + if not description or not description[0] or not description[1][0]: + key_desc = None + else: + key_desc = description[1][1] + + clevis_info = properties.get("ClevisInfo", None) + if not clevis_info or not clevis_info[0] or not clevis_info[1][0]: + clevis = None + else: + clevis = clevis_info[1][1] + + if clevis: + clevis_pin = clevis[0] + clevis_args = json.loads(clevis[1]) + else: + clevis_pin = None + clevis_args = {} + + return StratisPoolInfo(name=properties["Name"], + encrypted=properties["Encrypted"], + key_desc=key_desc, + clevis_pin=clevis_pin, + clevis_args=clevis_args) + + +def main(pool_name): + objects = call_sync(STRATIS_SERVICE, + STRATIS_PATH, + "org.freedesktop.DBus.ObjectManager", + "GetManagedObjects", + None)[0] + + for path, interfaces in objects.items(): + if STRATIS_POOL_INTF in interfaces.keys(): + pool_info = _get_pool_info(path) + if pool_info and pool_info.name == pool_name: + _print_pool_info_json(pool_info) + return True + + print(json.dumps(None)) + return True + + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: python %s " % sys.argv[0], file=sys.stderr) + sys.exit(1) + + succ = main(sys.argv[1]) + sys.exit(0) if succ else sys.exit(1) diff --git a/tests/verify-pool-stratis.yml b/tests/verify-pool-stratis.yml index 7f380171..b473da6a 100644 --- a/tests/verify-pool-stratis.yml +++ b/tests/verify-pool-stratis.yml @@ -3,30 +3,33 @@ - name: Check Stratis options when: storage_test_pool.type == 'stratis' block: - - name: Run 'stratis report' - command: stratis report + - name: Get stratis pool information + ansible.builtin.script: >- + scripts/stratis_pool_info.py + "{{ storage_test_pool.name }}" + args: + executable: "{{ ansible_python.executable }}" register: storage_test_stratis_report changed_when: false + - name: Print script output + debug: + msg: "{{ storage_test_stratis_report.stdout }}" + - name: Get information about Stratis set_fact: _stratis_pool_info: "{{ storage_test_stratis_report.stdout | from_json }}" - name: Verify that the pools was created assert: - that: _stratis_pool_info.pools | length == 1 and - _stratis_pool_info.pools[0].name == storage_test_pool.name + that: _stratis_pool_info.name == storage_test_pool.name msg: >- Stratis pool '{{ storage_test_pool.name }}' not found when: storage_test_pool.state == 'present' - # Stratis internally uses LUKS so verify-pool-member-encryption will also - # cover this we just need to make sure this is encrypted Stratis pool - # and not Stratis on top of "normal LUKS - name: Verify that encryption is correctly set assert: - that: storage_test_pool.name in - _stratis_pool_info.pools[0]['blockdevs']['datadevs'][0]['key_description'] + that: storage_test_pool.name in _stratis_pool_info.key_desc msg: >- Stratis pool '{{ storage_test_pool.name }}' is not encrypted when: @@ -36,9 +39,8 @@ - name: Verify that Clevis/Tang encryption is correctly set assert: that: - _stratis_pool_info.pools[0]['blockdevs']['datadevs'][0]['clevis_pin'] == 'tang' and - _stratis_pool_info.pools[0]['blockdevs']['datadevs'][0]['clevis_config']['url'] == - storage_test_pool.encryption_tang_url + _stratis_pool_info.clevis_pin == 'tang' and + _stratis_pool_info.clevis_args['url'] == storage_test_pool.encryption_tang_url msg: >- Stratis pool '{{ storage_test_pool.name }}' Clevis is not correctly configured when: