Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions tests/scripts/stratis_pool_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#!/usr/bin/env python

# Helper script for gathering information about stratis pools using stratis DBus API.

# The script is meant to be a supporting tool for the storage role tests

import sys
from collections import namedtuple

import json

import gi # pylint: disable=import-error
gi.require_version("GLib", "2.0")
gi.require_version("Gio", "2.0")

from gi.repository import GLib, Gio # pylint: disable=import-error

STRATIS_SERVICE = "org.storage.stratis3"
STRATIS_PATH = "/org/storage/stratis3"
STRATIS_POOL_INTF = STRATIS_SERVICE + ".pool.r0"

# Code for working with DBus, taken from blivet/safe_dbus.py
DBUS_PROPS_IFACE = "org.freedesktop.DBus.Properties"
DBUS_INTRO_IFACE = "org.freedesktop.DBus.Introspectable"


class SafeDBusError(Exception):
"""Class for exceptions defined in this module."""


class DBusCallError(SafeDBusError):
"""Class for the errors related to calling methods over DBus."""


class DBusPropertyError(DBusCallError):
"""Class for the errors related to getting property values over DBus."""


def get_new_system_connection():
"""Return a new connection to the system bus."""

return Gio.DBusConnection.new_for_address_sync(
Gio.dbus_address_get_for_bus_sync(Gio.BusType.SYSTEM, None),
Gio.DBusConnectionFlags.AUTHENTICATION_CLIENT
| Gio.DBusConnectionFlags.MESSAGE_BUS_CONNECTION,
None, None)


def call_sync(service, obj_path, iface, method, args, connection=None, fds=None):
if not connection:
try:
connection = get_new_system_connection()
except GLib.GError as gerr:
raise DBusCallError("Unable to connect to system bus: %s" % gerr) from gerr

if connection.is_closed():
raise DBusCallError("Connection is closed")

try:
ret = connection.call_with_unix_fd_list_sync(service, obj_path, iface, method,
args, None, Gio.DBusCallFlags.NONE,
-1, fds, None)
except GLib.GError as gerr:
msg = "Failed to call %s method on %s with %s arguments: %s" % \
(method, obj_path, args, gerr.message) # pylint: disable=no-member
raise DBusCallError(msg) from gerr

if ret is None:
msg = "No return from %s method on %s with %s arguments" % (method, obj_path,
args)
raise DBusCallError(msg)

return ret[0].unpack()


def get_properties_sync(service, obj_path, iface, connection=None):
args = GLib.Variant('(s)', (iface,))
ret = call_sync(service, obj_path, DBUS_PROPS_IFACE, "GetAll", args,
connection)
return ret


# Extracting and printing Stratis pool information
StratisPoolInfo = namedtuple("StratisPoolInfo", ["name", "encrypted", "key_desc",
"clevis_pin", "clevis_args"])


def _print_pool_info_json(pool_info):
pi_dict = pool_info._asdict()
pi_json = json.dumps(pi_dict)
print(pi_json)


def _get_pool_info(pool_path):
try:
properties = get_properties_sync(STRATIS_SERVICE,
pool_path,
STRATIS_POOL_INTF)[0]
except DBusPropertyError:
return None

if not properties:
return None

description = properties.get("KeyDescription", None)
if not description or not description[0] or not description[1][0]:
key_desc = None
else:
key_desc = description[1][1]

clevis_info = properties.get("ClevisInfo", None)
if not clevis_info or not clevis_info[0] or not clevis_info[1][0]:
clevis = None
else:
clevis = clevis_info[1][1]

if clevis:
clevis_pin = clevis[0]
clevis_args = json.loads(clevis[1])
else:
clevis_pin = None
clevis_args = {}

return StratisPoolInfo(name=properties["Name"],
encrypted=properties["Encrypted"],
key_desc=key_desc,
clevis_pin=clevis_pin,
clevis_args=clevis_args)


def main(pool_name):
objects = call_sync(STRATIS_SERVICE,
STRATIS_PATH,
"org.freedesktop.DBus.ObjectManager",
"GetManagedObjects",
None)[0]

for path, interfaces in objects.items():
if STRATIS_POOL_INTF in interfaces.keys():
pool_info = _get_pool_info(path)
if pool_info and pool_info.name == pool_name:
_print_pool_info_json(pool_info)
return True

print(json.dumps(None))
return True


if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python %s <pool name>" % sys.argv[0], file=sys.stderr)
sys.exit(1)

succ = main(sys.argv[1])
sys.exit(0) if succ else sys.exit(1)
26 changes: 14 additions & 12 deletions tests/verify-pool-stratis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,33 @@
- name: Check Stratis options
when: storage_test_pool.type == 'stratis'
block:
- name: Run 'stratis report'
command: stratis report
- name: Get stratis pool information
ansible.builtin.script: >-
scripts/stratis_pool_info.py
"{{ storage_test_pool.name }}"
args:
executable: "{{ ansible_python.executable }}"
register: storage_test_stratis_report
changed_when: false

- name: Print script output
debug:
msg: "{{ storage_test_stratis_report.stdout }}"

- name: Get information about Stratis
set_fact:
_stratis_pool_info: "{{ storage_test_stratis_report.stdout | from_json }}"

- name: Verify that the pools was created
assert:
that: _stratis_pool_info.pools | length == 1 and
_stratis_pool_info.pools[0].name == storage_test_pool.name
that: _stratis_pool_info.name == storage_test_pool.name
msg: >-
Stratis pool '{{ storage_test_pool.name }}' not found
when: storage_test_pool.state == 'present'

# Stratis internally uses LUKS so verify-pool-member-encryption will also
# cover this we just need to make sure this is encrypted Stratis pool
# and not Stratis on top of "normal LUKS
- name: Verify that encryption is correctly set
assert:
that: storage_test_pool.name in
_stratis_pool_info.pools[0]['blockdevs']['datadevs'][0]['key_description']
that: storage_test_pool.name in _stratis_pool_info.key_desc
msg: >-
Stratis pool '{{ storage_test_pool.name }}' is not encrypted
when:
Expand All @@ -36,9 +39,8 @@
- name: Verify that Clevis/Tang encryption is correctly set
assert:
that:
_stratis_pool_info.pools[0]['blockdevs']['datadevs'][0]['clevis_pin'] == 'tang' and
_stratis_pool_info.pools[0]['blockdevs']['datadevs'][0]['clevis_config']['url'] ==
storage_test_pool.encryption_tang_url
_stratis_pool_info.clevis_pin == 'tang' and
_stratis_pool_info.clevis_args['url'] == storage_test_pool.encryption_tang_url
msg: >-
Stratis pool '{{ storage_test_pool.name }}' Clevis is not correctly configured
when:
Expand Down
Loading