From 2eb212248ccdcf9019e3595637c98f72234c542a Mon Sep 17 00:00:00 2001 From: Bryann Valderrama Date: Fri, 17 Oct 2025 14:24:01 -0500 Subject: [PATCH 01/10] feat: enhance enforcement command with dual operational modes --- .../management/commands/enforcement.py | 215 ++++++++++++------ 1 file changed, 149 insertions(+), 66 deletions(-) diff --git a/openedx_authz/management/commands/enforcement.py b/openedx_authz/management/commands/enforcement.py index 55dd4717..ae175032 100644 --- a/openedx_authz/management/commands/enforcement.py +++ b/openedx_authz/management/commands/enforcement.py @@ -1,51 +1,74 @@ """ Django management command for interactive Casbin enforcement testing. -This command creates a Casbin enforcer using the model.conf configuration and a -user-specified policy file, then provides an interactive mode for testing -authorization enforcement requests. +This command provides an interactive mode for testing authorization enforcement +requests with two operational modes: + +1. **Database mode (default)**: Uses AuthzEnforcer with policies from the database + +2. **File mode**: Uses a custom Casbin enforcer with policies from files + - Activated when --policy-file-path and --model-file-path are provided + - Reads policies directly from the specified CSV file The command supports: -- Loading Casbin model from the built-in model.conf file or a custom file (specified via --model-file-path argument) -- Using custom policy files (specified via --policy-file-path argument) - Interactive testing with format: subject action scope - Real-time enforcement results with visual feedback (✓ ALLOWED / ✗ DENIED) - Display of loaded policies, role assignments, and action grouping rules Example usage: - python manage.py enforcement --policy-file-path /path/to/authz.policy + # Use policies from database with default model + python manage.py lms enforcement - python manage.py enforcement --policy-file-path /path/to/authz.policy --model-file-path /path/to/model.conf + # Use custom model and policy files + python manage.py lms enforcement -m /path/to/model.conf -p /path/to/policies.csv Example test input: - user^alice act^read org^OpenedX + >>> alice view_library_team lib:OpenedX:CSPROB + ✓ ALLOWED: alice view_library_team lib:OpenedX:CSPROB + >>> bob manage_library_team lib:DemoX:LIB1 + ✗ DENIED: bob manage_library_team lib:DemoX:LIB1 """ import argparse import os -import casbin +from casbin import Enforcer +from casbin.util.log import disabled_logging from django.core.management.base import BaseCommand, CommandError from openedx_authz import ROOT_DIRECTORY +from openedx_authz.api.users import is_user_allowed +from openedx_authz.engine.enforcer import AuthzEnforcer class Command(BaseCommand): """ Django management command for interactive Casbin enforcement testing. - This command loads a Casbin model configuration and user-specified policy file - to create an enforcer instance, then provides an interactive shell for testing - authorization requests in real-time with immediate feedback. + This command provides two operational modes for testing authorization: + + 1. Database mode (default): Uses AuthzEnforcer with policies from the database. + This is the default behavior when no arguments are provided. + + 2. File mode: Uses a custom Casbin enforcer with policies from files. + Activated when --policy-file-path and/or --model-file-path are provided. + + The command provides an interactive shell for testing authorization requests + in real-time with immediate feedback. """ help = ( - "Interactive mode for testing Casbin enforcement policies using a custom model file and" - "a custom policy file. Provides real-time authorization testing with format: subject action scope. " - "Use --policy-file-path to specify the policy file location. " - "Use --model-file-path to specify the model file location. " + "Interactive mode for testing Casbin enforcement policies. By default, uses " + "AuthzEnforcer with policies from the database. Use --policy-file-path and " + "--model-file-path to test with custom files instead. " + "Format: subject action scope." ) + def __init__(self, *args, **kwargs): + """Initialize the command with required attributes.""" + super().__init__(*args, **kwargs) + self._custom_enforcer = None + def add_arguments(self, parser: argparse.ArgumentParser) -> None: """Add command-line arguments to the argument parser. @@ -53,83 +76,141 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: parser (argparse.ArgumentParser): The Django argument parser instance to configure. """ parser.add_argument( + "-p", "--policy-file-path", type=str, - required=True, - help="Path to the Casbin policy file (supports CSV format with policies, roles, and action grouping)", + default=None, + help=( + "Path to the Casbin policy CSV file. When provided, switches to file mode using a " + "custom enforcer instead of the database. Supports CSV format with policies, roles, " + "and action grouping." + ), ) parser.add_argument( + "-m", "--model-file-path", type=str, - required=False, - help="Path to the Casbin model file. If not provided, the default model.conf file will be used.", + default=None, + help=( + "Path to the Casbin model configuration file. When provided, switches to file mode " + "using a custom enforcer instead of the database. If not specified in file mode, " + "uses the default model.conf." + ), ) + @staticmethod + def _get_file_path(file_name: str) -> str: + """Construct the full file path for a configuration file. + + Args: + file_name (str): The name of the configuration file (e.g., 'model.conf'). + + Returns: + str: The absolute path to the configuration file in the engine/config directory. + """ + return os.path.join(ROOT_DIRECTORY, "engine", "config", file_name) + + def _display_loaded_policies(self, enforcer: Enforcer) -> None: + """Display statistics about loaded policies, roles, and action grouping. + + Args: + enforcer (Enforcer): The Casbin enforcer instance with loaded policies. + """ + policies = enforcer.get_policy() + roles = enforcer.get_grouping_policy() + action_grouping = enforcer.get_named_grouping_policy("g2") + + self.stdout.write(f"✓ Loaded {len(policies)} policies") + self.stdout.write(f"✓ Loaded {len(roles)} role assignments") + self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules") + self.stdout.write("") + def handle(self, *args, **options): """Execute the enforcement testing command. - Loads the Casbin model and policy files, creates an enforcer instance, - displays configuration summary, and starts the interactive testing mode. + Determines the operational mode based on provided arguments and creates the + appropriate enforcer instance, then starts the interactive testing mode. + + Operational modes: + - Database mode: Uses AuthzEnforcer with policies from database (default) + - File mode: Uses custom Enforcer with policies from files (when files provided) Args: *args: Positional command arguments (unused). - **options: Command options including `policy_file_path` and `model_file_path`. - - Raises: - CommandError: If model or policy files are not found or enforcer creation fails. + **options: Command options including ``--policy-file-path`` and ``--model-file-path``. """ - model_file_path = self._get_file_path("model.conf") or options["model_file_path"] policy_file_path = options["policy_file_path"] + model_file_path = options["model_file_path"] - if not os.path.isfile(model_file_path): - raise CommandError(f"Model file not found: {model_file_path}") - if not os.path.isfile(policy_file_path): - raise CommandError(f"Policy file not found: {policy_file_path}") + use_file_mode = policy_file_path is not None and model_file_path is not None - self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement")) - self.stdout.write(f"Model file path: {model_file_path}") - self.stdout.write(f"Policy file path: {policy_file_path}") - self.stdout.write("") + if use_file_mode: + self._handle_file_mode(policy_file_path, model_file_path) + else: + self._handle_database_mode() - try: - enforcer = casbin.Enforcer(model_file_path, policy_file_path) - self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully")) + def _handle_database_mode(self) -> None: + """Handle enforcement testing using AuthzEnforcer with database policies. - policies = enforcer.get_policy() - roles = enforcer.get_grouping_policy() - action_grouping = enforcer.get_named_grouping_policy("g2") + Uses the AuthzEnforcer singleton with policies loaded from the database. + This is the default mode when no custom files are provided. - self.stdout.write(f"✓ Loaded {len(policies)} policies") - self.stdout.write(f"✓ Loaded {len(roles)} role assignments") - self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules") - self.stdout.write("") + Raises: + CommandError: If enforcer creation or policy loading fails. + """ + try: + enforcer = AuthzEnforcer.get_enforcer() + enforcer.load_policy() + disabled_logging() - self._run_interactive_mode(enforcer) + self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement (Database Mode)")) + self.stdout.write("Using AuthzEnforcer with policies from database") + self.stdout.write("") + self._display_loaded_policies(enforcer) + self._run_interactive_mode() except Exception as e: raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e - def _get_file_path(self, file_name: str) -> str: - """Construct the full file path for a configuration file. + def _handle_file_mode(self, policy_file_path: str, model_file_path: str) -> None: + """Handle enforcement testing using custom Enforcer with file-based policies. + + Creates a custom Casbin Enforcer instance using the specified model and policy files. + This mode is useful for testing policies before loading them into the database. Args: - file_name (str): The name of the configuration file (e.g., 'model.conf'). + policy_file_path (str): Path to the policy CSV file. + model_file_path (str): Path to the model configuration file. - Returns: - str: The absolute path to the configuration file in the engine/config directory. + Raises: + CommandError: If required files are not found or enforcer creation fails. """ - return os.path.join(ROOT_DIRECTORY, "engine", "config", file_name) + if not os.path.isfile(model_file_path): + raise CommandError(f"Model file not found: {model_file_path}") + if not os.path.isfile(policy_file_path): + raise CommandError(f"Policy file not found: {policy_file_path}") + + try: + enforcer = Enforcer(model_file_path, policy_file_path) + + self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement (File Mode)")) + self.stdout.write(f"Model file: {model_file_path}") + self.stdout.write(f"Policy file: {policy_file_path}") + self.stdout.write("") + + self._custom_enforcer = enforcer + self._display_loaded_policies(enforcer) + self._run_interactive_mode() + except Exception as e: + raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e - def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None: + def _run_interactive_mode(self) -> None: """Start the interactive enforcement testing shell. Provides a continuous loop where users can input enforcement requests in the format 'subject action scope' and receive immediate authorization results with visual feedback. - Args: - enforcer (casbin.Enforcer): The configured Casbin enforcer instance for testing. - Note: Exit the interactive mode with Ctrl+C or Ctrl+D. """ @@ -138,7 +219,7 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None: self.stdout.write("Enter 'quit', 'exit', or 'q' to exit the interactive mode.") self.stdout.write("") self.stdout.write("Format: subject action scope") - self.stdout.write("Example: user^alice act^read org^OpenedX") + self.stdout.write("Example: alice view_library_team lib:OpenedX:CSPROB") self.stdout.write("") while True: @@ -151,41 +232,43 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None: if user_input.lower() in ["quit", "exit", "q"]: break - self._test_interactive_request(enforcer, user_input) + self._test_interactive_request(user_input) except (KeyboardInterrupt, EOFError): self.stdout.write(self.style.ERROR("Exiting interactive mode...")) break - def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None: + def _test_interactive_request(self, user_input: str) -> None: """Process and test a single enforcement request from user input. Parses the input string, validates the format, executes the enforcement check, and displays the result with appropriate styling. Args: - enforcer (casbin.Enforcer): The Casbin enforcer instance to use for testing. user_input (str): The user's input string in format 'subject action scope'. Expected format: - subject: The requesting entity (e.g., 'user^alice') - action: The requested action (e.g., 'act^read') - scope: The authorization context (e.g., 'org^OpenedX') + subject: The requesting entity (e.g., 'alice') + action: The requested action (e.g., 'view_library_team') + scope: The authorization context (e.g., 'lib:OpenedX:CSPROB') """ try: parts = [part.strip() for part in user_input.split()] if len(parts) != 3: self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 3 parts, got {len(parts)}")) self.stdout.write("Format: subject action scope") - self.stdout.write("Example: user^alice act^read org^OpenedX") + self.stdout.write("Example: alice view_library_team lib:OpenedX:CSPROB") return subject, action, scope = parts - result = enforcer.enforce(subject, action, scope) + + if self._custom_enforcer is not None: + result = self._custom_enforcer.enforce(subject, action, scope) + else: + result = is_user_allowed(subject, action, scope) if result: self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}")) else: self.stdout.write(self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}")) - except (ValueError, IndexError, TypeError) as e: self.stdout.write(self.style.ERROR(f"✗ Error processing request: {str(e)}")) From de7557298f17a2ee6017ed9e13ea8d285991766e Mon Sep 17 00:00:00 2001 From: Bryann Valderrama Date: Fri, 17 Oct 2025 14:28:01 -0500 Subject: [PATCH 02/10] test: update enforcement command unit tests --- openedx_authz/tests/test_commands.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index 6fa90679..606250f6 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -118,7 +118,7 @@ def test_successful_run_prints_summary(self, mock_run_interactive: Mock, mock_en def test_run_interactive_mode_displays_help(self): """Test that the interactive mode runs.""" with patch("builtins.input", side_effect=["quit"]): - self.command._run_interactive_mode(self.enforcer) + self.command._run_interactive_mode() example_text = f"Example: {make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" self.assertIn("Interactive Mode", self.buffer.getvalue()) @@ -135,7 +135,7 @@ def test_run_interactive_mode_maintains_interactive_loop(self): input_values = ["", "", "", "quit"] with patch("builtins.input", side_effect=input_values) as mock_input: - self.command._run_interactive_mode(self.enforcer) + self.command._run_interactive_mode() self.assertEqual(mock_input.call_count, len(input_values)) @@ -148,7 +148,7 @@ def test_run_interactive_mode_processes_request(self, user_input: list[str]): """Test that the interactive mode processes the request.""" with patch("builtins.input", side_effect=user_input + ["quit"]) as mock_input: with patch.object(self.command, "_test_interactive_request") as mock_method: - self.command._run_interactive_mode(self.enforcer) + self.command._run_interactive_mode() self.assertEqual(mock_input.call_count, len(user_input) + 1) self.assertEqual(mock_method.call_count, len(user_input)) @@ -159,7 +159,7 @@ def test_run_interactive_mode_processes_request(self, user_input: list[str]): def test_quit_commands_case_insensitive(self, quit_command: str): """Test that all quit commands work regardless of case.""" with patch("builtins.input", side_effect=[quit_command]) as mock_input: - self.command._run_interactive_mode(self.enforcer) + self.command._run_interactive_mode() self.assertEqual(mock_input.call_count, 1) @@ -167,7 +167,7 @@ def test_quit_commands_case_insensitive(self, quit_command: str): def test_handles_exceptions(self, exception: Exception): """Test that interactive mode handles exceptions gracefully.""" with patch("builtins.input", side_effect=exception): - self.command._run_interactive_mode(self.enforcer) + self.command._run_interactive_mode() self.assertIn("Exiting interactive mode...", self.buffer.getvalue()) @@ -176,7 +176,7 @@ def test_interactive_request_allowed(self): self.enforcer.enforce.return_value = True user_input = f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" - self.command._test_interactive_request(self.enforcer, user_input) + self.command._test_interactive_request(user_input) allowed_output = self.buffer.getvalue() self.assertIn(f"✓ ALLOWED: {user_input}", allowed_output) @@ -186,7 +186,7 @@ def test_interactive_request_denied(self): self.enforcer.enforce.return_value = False user_input = f"{make_user_key('alice')} {make_action_key('delete')} {make_scope_key('org', 'OpenedX')}" - self.command._test_interactive_request(self.enforcer, user_input) + self.command._test_interactive_request(user_input) denied_output = self.buffer.getvalue() self.assertIn(f"✗ DENIED: {user_input}", denied_output) @@ -195,7 +195,7 @@ def test_interactive_request_invalid_format(self): """Test that `_test_interactive_request` reports invalid input format.""" user_input = f"{make_user_key('alice')} {make_action_key('read')}" - self.command._test_interactive_request(self.enforcer, user_input) + self.command._test_interactive_request(user_input) invalid_output = self.buffer.getvalue() self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) @@ -208,7 +208,7 @@ def test_interactive_request_error(self, exception: Exception): self.enforcer.enforce.side_effect = exception user_input = f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" - self.command._test_interactive_request(self.enforcer, user_input) + self.command._test_interactive_request(user_input) error_output = self.buffer.getvalue() self.assertIn(f"✗ Error processing request: {str(exception)}", error_output) From b2a8493c2c889000537e64c3680e539413cdf5e4 Mon Sep 17 00:00:00 2001 From: Bryann Valderrama Date: Mon, 20 Oct 2025 23:19:27 -0500 Subject: [PATCH 03/10] chore: remove unused method --- .../management/commands/enforcement.py | 47 +++++++------------ 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/openedx_authz/management/commands/enforcement.py b/openedx_authz/management/commands/enforcement.py index ae175032..a1b9b618 100644 --- a/openedx_authz/management/commands/enforcement.py +++ b/openedx_authz/management/commands/enforcement.py @@ -36,8 +36,7 @@ from casbin.util.log import disabled_logging from django.core.management.base import BaseCommand, CommandError -from openedx_authz import ROOT_DIRECTORY -from openedx_authz.api.users import is_user_allowed +from openedx_authz import api from openedx_authz.engine.enforcer import AuthzEnforcer @@ -98,33 +97,6 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: ), ) - @staticmethod - def _get_file_path(file_name: str) -> str: - """Construct the full file path for a configuration file. - - Args: - file_name (str): The name of the configuration file (e.g., 'model.conf'). - - Returns: - str: The absolute path to the configuration file in the engine/config directory. - """ - return os.path.join(ROOT_DIRECTORY, "engine", "config", file_name) - - def _display_loaded_policies(self, enforcer: Enforcer) -> None: - """Display statistics about loaded policies, roles, and action grouping. - - Args: - enforcer (Enforcer): The Casbin enforcer instance with loaded policies. - """ - policies = enforcer.get_policy() - roles = enforcer.get_grouping_policy() - action_grouping = enforcer.get_named_grouping_policy("g2") - - self.stdout.write(f"✓ Loaded {len(policies)} policies") - self.stdout.write(f"✓ Loaded {len(roles)} role assignments") - self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules") - self.stdout.write("") - def handle(self, *args, **options): """Execute the enforcement testing command. @@ -204,6 +176,21 @@ def _handle_file_mode(self, policy_file_path: str, model_file_path: str) -> None except Exception as e: raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e + def _display_loaded_policies(self, enforcer: Enforcer) -> None: + """Display statistics about loaded policies, roles, and action grouping. + + Args: + enforcer (Enforcer): The Casbin enforcer instance with loaded policies. + """ + policies = enforcer.get_policy() + roles = enforcer.get_grouping_policy() + action_grouping = enforcer.get_named_grouping_policy("g2") + + self.stdout.write(f"✓ Loaded {len(policies)} policies") + self.stdout.write(f"✓ Loaded {len(roles)} role assignments") + self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules") + self.stdout.write("") + def _run_interactive_mode(self) -> None: """Start the interactive enforcement testing shell. @@ -264,7 +251,7 @@ def _test_interactive_request(self, user_input: str) -> None: if self._custom_enforcer is not None: result = self._custom_enforcer.enforce(subject, action, scope) else: - result = is_user_allowed(subject, action, scope) + result = api.is_user_allowed(subject, action, scope) if result: self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}")) From d7465f8932859481c1cac0c36cc6798d882cd78b Mon Sep 17 00:00:00 2001 From: Bryann Valderrama Date: Mon, 20 Oct 2025 23:19:49 -0500 Subject: [PATCH 04/10] test: update enforcement command tests according latest changes --- openedx_authz/tests/test_commands.py | 330 +++++++++++++++------------ 1 file changed, 190 insertions(+), 140 deletions(-) diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index 606250f6..66d0c37a 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -3,7 +3,7 @@ """ import io -from tempfile import TemporaryFile +from tempfile import NamedTemporaryFile from unittest import TestCase from unittest.mock import Mock, patch @@ -12,12 +12,12 @@ from django.core.management.base import CommandError from openedx_authz import ROOT_DIRECTORY -from openedx_authz.management.commands.enforcement import Command as EnforcementCommand +from openedx_authz import api as authz_api +from openedx_authz.engine.enforcer import AuthzEnforcer from openedx_authz.management.commands.load_policies import Command as LoadPoliciesCommand -from openedx_authz.tests.test_utils import make_action_key, make_scope_key, make_user_key +from openedx_authz.tests.test_utils import make_scope_key -# pylint: disable=protected-access @ddt class EnforcementCommandTests(TestCase): """ @@ -34,184 +34,234 @@ class EnforcementCommandTests(TestCase): def setUp(self): super().setUp() self.buffer = io.StringIO() - self.policy_file_path = TemporaryFile() - self.command = EnforcementCommand() - self.command.stdout = self.buffer + self.command_name = "enforcement" + + self.policy_file_path = NamedTemporaryFile(suffix=".policy") + self.model_file_path = NamedTemporaryFile(suffix=".conf") + + self.policies = [["alice", "read", "resource1"]] + self.roles = [["alice", "admin"]] + self.action_grouping = [["read", "view"]] + self.enforcer = Mock() + self.enforcer.get_policy.return_value = self.policies + self.enforcer.get_grouping_policy.return_value = self.roles + self.enforcer.get_named_grouping_policy.return_value = self.action_grouping - def test_requires_policy_file_argument(self): - """Test that calling the command without --policy-file-path should error from argparse.""" - with self.assertRaises(CommandError) as ctx: - call_command("enforcement") + @patch.object(AuthzEnforcer, "get_enforcer") + @patch("openedx_authz.management.commands.enforcement.disabled_logging") + def test_handle_database_mode_default(self, mock_logging: Mock, mock_get_enforcer: Mock): + """Test database mode is used when no file paths are provided.""" + mock_get_enforcer.return_value = self.enforcer - self.assertEqual( - "Error: the following arguments are required: --policy-file-path", - str(ctx.exception), - ) + with patch("builtins.input", side_effect=["quit"]): + call_command(self.command_name, stdout=self.buffer) + + output = self.buffer.getvalue() + self.assertIn("Database Mode", output) + self.assertIn("AuthzEnforcer", output) + self.enforcer.load_policy.assert_called_once() + mock_logging.assert_called_once() + + @patch("openedx_authz.management.commands.enforcement.Enforcer") + def test_handle_file_mode(self, mock_enforcer_class: Mock): + """Test file mode is used when both file paths are provided.""" + mock_enforcer_class.return_value = self.enforcer + + with patch("builtins.input", side_effect=["quit"]): + call_command( + self.command_name, + policy_file_path=self.policy_file_path.name, + model_file_path=self.model_file_path.name, + stdout=self.buffer, + ) + + output = self.buffer.getvalue() + self.assertIn("File Mode", output) + self.assertIn(self.policy_file_path.name, output) + self.assertIn(self.model_file_path.name, output) + mock_enforcer_class.assert_called_once_with(self.model_file_path.name, self.policy_file_path.name) def test_policy_file_not_found_raises(self): """Test that command errors when the provided policy file does not exist.""" - non_existent = "invalid/path/does-not-exist.policy" + non_existent_policy = "invalid/path/authz.policy" with self.assertRaises(CommandError) as ctx: - call_command("enforcement", policy_file_path=non_existent) + call_command( + self.command_name, + policy_file_path=non_existent_policy, + model_file_path=self.model_file_path.name, + ) - self.assertEqual(f"Policy file not found: {non_existent}", str(ctx.exception)) + self.assertEqual(f"Policy file not found: {non_existent_policy}", str(ctx.exception)) - @patch.object(EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf") - def test_model_file_not_found_raises(self, mock_get_file_path: Mock): + def test_model_file_not_found_raises(self): """Test that command errors when the provided model file does not exist.""" + non_existent_model = "invalid/path/model.conf" + with self.assertRaises(CommandError) as ctx: - call_command("enforcement", policy_file_path=self.policy_file_path.name) + call_command( + self.command_name, + policy_file_path=self.policy_file_path.name, + model_file_path=non_existent_model, + ) - self.assertEqual( - f"Model file not found: {mock_get_file_path.return_value}", - str(ctx.exception), - ) + self.assertEqual(f"Model file not found: {non_existent_model}", str(ctx.exception)) - @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") - def test_error_creating_enforcer_raises(self, mock_enforcer_cls: Mock): - """Test that command errors when the enforcer creation fails.""" - mock_enforcer_cls.side_effect = Exception("Enforcer creation error") + @patch.object(AuthzEnforcer, "get_enforcer") + def test_display_loaded_policies(self, mock_get_enforcer: Mock): + """Test that policy statistics are displayed correctly.""" + mock_get_enforcer.return_value = self.enforcer - with self.assertRaises(CommandError) as ctx: - call_command("enforcement", policy_file_path=self.policy_file_path.name) + with patch("builtins.input", side_effect=["quit"]): + call_command(self.command_name, stdout=self.buffer) - self.assertEqual( - "Error creating Casbin enforcer: Enforcer creation error", - str(ctx.exception), - ) + output = self.buffer.getvalue() + self.assertIn(f"✓ Loaded {len(self.policies)} policies", output) + self.assertIn(f"✓ Loaded {len(self.roles)} role assignments", output) + self.assertIn(f"✓ Loaded {len(self.action_grouping)} action grouping rules", output) - @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") - @patch.object(EnforcementCommand, "_run_interactive_mode") - def test_successful_run_prints_summary(self, mock_run_interactive: Mock, mock_enforcer_cls: Mock): - """ - Test successful command execution with policy file and interactive mode. - When files exist, command should create enforcer, print counts, and call interactive loop. - """ - mock_enforcer = Mock() - policies = [["p", "role:platform_admin", "act:manage", "*", "allow"]] - roles = [["g", "user:user-1", "role:platform_admin", "*"]] - action_grouping = [ - ["g2", "act:edit", "act:read"], - ["g2", "act:edit", "act:write"], - ] - mock_enforcer.get_policy.return_value = policies - mock_enforcer.get_grouping_policy.return_value = roles - mock_enforcer.get_named_grouping_policy.return_value = action_grouping - mock_enforcer_cls.return_value = mock_enforcer - - call_command( - "enforcement", - policy_file_path=self.policy_file_path.name, - stdout=self.buffer, - ) + @patch.object(AuthzEnforcer, "get_enforcer") + @patch.object(authz_api, "is_user_allowed") + def test_interactive_mode_allowed_request(self, mock_is_allowed: Mock, mock_get_enforcer: Mock): + """Test interactive mode with an allowed enforcement request.""" + mock_get_enforcer.return_value = self.enforcer + mock_is_allowed.return_value = True + + with patch("builtins.input", side_effect=["alice read lib:Org1:LIB1", "quit"]): + call_command(self.command_name, stdout=self.buffer) output = self.buffer.getvalue() - self.assertIn("Casbin Interactive Enforcement", output) - self.assertIn("Casbin enforcer created successfully", output) - self.assertIn(f"✓ Loaded {len(policies)} policies", output) - self.assertIn(f"✓ Loaded {len(roles)} role assignments", output) - self.assertIn(f"✓ Loaded {len(action_grouping)} action grouping rules", output) - mock_run_interactive.assert_called_once_with(mock_enforcer) - - def test_run_interactive_mode_displays_help(self): - """Test that the interactive mode runs.""" - with patch("builtins.input", side_effect=["quit"]): - self.command._run_interactive_mode() - - example_text = f"Example: {make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" - self.assertIn("Interactive Mode", self.buffer.getvalue()) - self.assertIn("Test custom enforcement requests interactively.", self.buffer.getvalue()) - self.assertIn( - "Enter 'quit', 'exit', or 'q' to exit the interactive mode.", - self.buffer.getvalue(), - ) - self.assertIn("Format: subject action scope", self.buffer.getvalue()) - self.assertIn(example_text, self.buffer.getvalue()) + self.assertIn("✓ ALLOWED: alice read lib:Org1:LIB1", output) + mock_is_allowed.assert_called_once_with("alice", "read", "lib:Org1:LIB1") + + @patch.object(AuthzEnforcer, "get_enforcer") + @patch.object(authz_api, "is_user_allowed") + def test_interactive_mode_denied_request(self, mock_is_allowed: Mock, mock_get_enforcer: Mock): + """Test interactive mode with a denied enforcement request.""" + mock_get_enforcer.return_value = self.enforcer + mock_is_allowed.return_value = False - def test_run_interactive_mode_maintains_interactive_loop(self): - """Test that the interactive mode maintains the interactive loop.""" - input_values = ["", "", "", "quit"] + with patch("builtins.input", side_effect=["bob delete lib:Org2:LIB2", "quit"]): + call_command(self.command_name, stdout=self.buffer) - with patch("builtins.input", side_effect=input_values) as mock_input: - self.command._run_interactive_mode() + output = self.buffer.getvalue() + self.assertIn("✗ DENIED: bob delete lib:Org2:LIB2", output) + mock_is_allowed.assert_called_once_with("bob", "delete", "lib:Org2:LIB2") + + @patch("openedx_authz.management.commands.enforcement.Enforcer") + def test_interactive_mode_file_mode_enforcement(self, mock_enforcer_class: Mock): + """Test that file mode uses custom enforcer for enforcement checks.""" + mock_enforcer_class.return_value = self.enforcer + + with patch("builtins.input", side_effect=["alice read lib:Org1:LIB1", "quit"]): + call_command( + self.command_name, + policy_file_path=self.policy_file_path.name, + model_file_path=self.model_file_path.name, + stdout=self.buffer, + ) - self.assertEqual(mock_input.call_count, len(input_values)) + output = self.buffer.getvalue() + self.assertIn("✓ ALLOWED: alice read lib:Org1:LIB1", output) + self.enforcer.enforce.assert_called_once_with("alice", "read", "lib:Org1:LIB1") @data( - [f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"], - [f"{make_user_key('bob')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"] * 5, - [f"{make_user_key('john')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"] * 10, + "alice", + "alice read", + "alice read lib:Org1:LIB1 lib:Org1:LIB1", + "alice read lib:Org1:LIB1 lib:Org1:LIB1 lib:Org1:LIB1", ) - def test_run_interactive_mode_processes_request(self, user_input: list[str]): - """Test that the interactive mode processes the request.""" - with patch("builtins.input", side_effect=user_input + ["quit"]) as mock_input: - with patch.object(self.command, "_test_interactive_request") as mock_method: - self.command._run_interactive_mode() + @patch.object(AuthzEnforcer, "get_enforcer") + def test_interactive_mode_invalid_format(self, user_input: str, mock_get_enforcer: Mock): + """Test interactive mode handles invalid input format.""" + mock_get_enforcer.return_value = self.enforcer - self.assertEqual(mock_input.call_count, len(user_input) + 1) - self.assertEqual(mock_method.call_count, len(user_input)) - for value in user_input: - mock_method.assert_any_call(self.enforcer, value) + with patch("builtins.input", side_effect=[user_input, "quit"]): + call_command(self.command_name, stdout=self.buffer) - @data("quit", "exit", "q", "QUIT", "EXIT", "Q") - def test_quit_commands_case_insensitive(self, quit_command: str): - """Test that all quit commands work regardless of case.""" - with patch("builtins.input", side_effect=[quit_command]) as mock_input: - self.command._run_interactive_mode() + output = self.buffer.getvalue() + self.assertIn("✗ Invalid format", output) + self.assertIn(f"Expected 3 parts, got {len(user_input.split())}", output) - self.assertEqual(mock_input.call_count, 1) + @patch.object(AuthzEnforcer, "get_enforcer") + def test_interactive_mode_empty_input(self, mock_get_enforcer: Mock): + """Test interactive mode handles empty input gracefully.""" + mock_get_enforcer.return_value = self.enforcer - @data(KeyboardInterrupt(), EOFError()) - def test_handles_exceptions(self, exception: Exception): - """Test that interactive mode handles exceptions gracefully.""" - with patch("builtins.input", side_effect=exception): - self.command._run_interactive_mode() + with patch("builtins.input", side_effect=["", " ", "quit"]): + call_command(self.command_name, stdout=self.buffer) - self.assertIn("Exiting interactive mode...", self.buffer.getvalue()) + output = self.buffer.getvalue() + self.assertIn("Interactive Mode", output) - def test_interactive_request_allowed(self): - """Test that `_test_interactive_request` prints allowed output format.""" - self.enforcer.enforce.return_value = True - user_input = f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" + @data("quit", "exit", "q", "QUIT", "EXIT", "Q") + @patch.object(AuthzEnforcer, "get_enforcer") + def test_interactive_mode_exit_commands(self, exit_cmd: str, mock_get_enforcer: Mock): + """Test that various exit commands work correctly.""" + mock_get_enforcer.return_value = self.enforcer - self.command._test_interactive_request(user_input) + with patch("builtins.input", side_effect=[exit_cmd]): + call_command(self.command_name, stdout=self.buffer) - allowed_output = self.buffer.getvalue() - self.assertIn(f"✓ ALLOWED: {user_input}", allowed_output) + output = self.buffer.getvalue() + self.assertIn("Interactive Mode", output) - def test_interactive_request_denied(self): - """Test that `_test_interactive_request` prints denied output format.""" - self.enforcer.enforce.return_value = False - user_input = f"{make_user_key('alice')} {make_action_key('delete')} {make_scope_key('org', 'OpenedX')}" + @data(KeyboardInterrupt(), EOFError()) + @patch.object(AuthzEnforcer, "get_enforcer") + def test_interactive_mode_keyboard_interrupt(self, exception: Exception, mock_get_enforcer: Mock): + """Test interactive mode handles KeyboardInterrupt gracefully.""" + mock_get_enforcer.return_value = self.enforcer - self.command._test_interactive_request(user_input) + with patch("builtins.input", side_effect=exception): + call_command(self.command_name, stdout=self.buffer) - denied_output = self.buffer.getvalue() - self.assertIn(f"✗ DENIED: {user_input}", denied_output) + output = self.buffer.getvalue() + self.assertIn("Exiting interactive mode...", output) - def test_interactive_request_invalid_format(self): - """Test that `_test_interactive_request` reports invalid input format.""" - user_input = f"{make_user_key('alice')} {make_action_key('read')}" + @patch.object(AuthzEnforcer, "get_enforcer") + def test_database_mode_enforcer_creation_error(self, mock_get_enforcer: Mock): + """Test CommandError is raised when enforcer creation fails in database mode.""" + error_message = "Database connection failed" + mock_get_enforcer.side_effect = Exception(error_message) - self.command._test_interactive_request(user_input) + with self.assertRaises(CommandError) as ctx: + call_command(self.command_name) - invalid_output = self.buffer.getvalue() - self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) - self.assertIn("Format: subject action scope", invalid_output) - self.assertIn(f"Example: {user_input} {make_scope_key('org', 'OpenedX')}", invalid_output) + self.assertEqual(f"Error creating Casbin enforcer: {error_message}", str(ctx.exception)) + mock_get_enforcer.assert_called_once() - @data(ValueError(), IndexError(), TypeError()) - def test_interactive_request_error(self, exception: Exception): - """Test that `_test_interactive_request` handles processing errors.""" - self.enforcer.enforce.side_effect = exception - user_input = f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" + @patch("openedx_authz.management.commands.enforcement.Enforcer") + def test_error_creating_enforcer_raises(self, mock_enforcer_cls: Mock): + """Test CommandError is raised when the enforcer creation fails in file mode.""" + error_message = "Enforcer creation error" + mock_enforcer_cls.side_effect = Exception(error_message) - self.command._test_interactive_request(user_input) + with self.assertRaises(CommandError) as ctx: + call_command( + self.command_name, + policy_file_path=self.policy_file_path.name, + model_file_path=self.model_file_path.name, + ) + + self.assertEqual(f"Error creating Casbin enforcer: {error_message}", str(ctx.exception)) + mock_enforcer_cls.assert_called_once_with(self.model_file_path.name, self.policy_file_path.name) + + @data(ValueError("Value error"), TypeError("Type error"), IndexError("Index error")) + @patch.object(AuthzEnforcer, "get_enforcer") + @patch.object(authz_api, "is_user_allowed") + def test_interactive_request_error(self, exception: Exception, mock_is_allowed: Mock, mock_get_enforcer: Mock): + """Test interactive mode handles enforcement errors gracefully.""" + mock_get_enforcer.return_value = self.enforcer + mock_is_allowed.side_effect = exception + + with patch("builtins.input", side_effect=["alice read lib:Org1:LIB1", "quit"]): + call_command(self.command_name, stdout=self.buffer) - error_output = self.buffer.getvalue() - self.assertIn(f"✗ Error processing request: {str(exception)}", error_output) + invalid_output = self.buffer.getvalue() + self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) + self.assertIn("Format: subject action scope", invalid_output) + self.assertIn(f"Example: alice read lib:Org1:LIB1 {make_scope_key('org', 'OpenedX')}", invalid_output) class LoadPoliciesCommandTests(TestCase): From 354b0cbec64914d2d35534c8dd1e2be9e394cb5f Mon Sep 17 00:00:00 2001 From: Bryann Valderrama Date: Tue, 21 Oct 2025 09:38:51 -0500 Subject: [PATCH 05/10] fix: update command to use structured data types for subject, action, and scope --- openedx_authz/management/commands/enforcement.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/openedx_authz/management/commands/enforcement.py b/openedx_authz/management/commands/enforcement.py index a1b9b618..0471c47a 100644 --- a/openedx_authz/management/commands/enforcement.py +++ b/openedx_authz/management/commands/enforcement.py @@ -37,6 +37,7 @@ from django.core.management.base import BaseCommand, CommandError from openedx_authz import api +from openedx_authz.api.data import ActionData, ScopeData, UserData from openedx_authz.engine.enforcer import AuthzEnforcer @@ -249,7 +250,12 @@ def _test_interactive_request(self, user_input: str) -> None: subject, action, scope = parts if self._custom_enforcer is not None: - result = self._custom_enforcer.enforce(subject, action, scope) + subject = UserData(external_key=subject) + action = ActionData(external_key=action) + scope = ScopeData(external_key=scope) + result = self._custom_enforcer.enforce( + subject.namespaced_key, action.namespaced_key, scope.namespaced_key + ) else: result = api.is_user_allowed(subject, action, scope) From e920ab79863611806e80d7485a8899e3291cf5e4 Mon Sep 17 00:00:00 2001 From: Bryann Valderrama Date: Tue, 21 Oct 2025 10:25:35 -0500 Subject: [PATCH 06/10] chore: rename variable name --- openedx_authz/management/commands/enforcement.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/openedx_authz/management/commands/enforcement.py b/openedx_authz/management/commands/enforcement.py index 0471c47a..acbb392f 100644 --- a/openedx_authz/management/commands/enforcement.py +++ b/openedx_authz/management/commands/enforcement.py @@ -250,11 +250,13 @@ def _test_interactive_request(self, user_input: str) -> None: subject, action, scope = parts if self._custom_enforcer is not None: - subject = UserData(external_key=subject) - action = ActionData(external_key=action) - scope = ScopeData(external_key=scope) + user_data = UserData(external_key=subject) + action_data = ActionData(external_key=action) + scope_data = ScopeData(external_key=scope) result = self._custom_enforcer.enforce( - subject.namespaced_key, action.namespaced_key, scope.namespaced_key + user_data.namespaced_key, + action_data.namespaced_key, + scope_data.namespaced_key, ) else: result = api.is_user_allowed(subject, action, scope) From d4f6256f02784e8a40ecb8ed4885760677c16800 Mon Sep 17 00:00:00 2001 From: Bryann Valderrama Date: Tue, 21 Oct 2025 10:25:50 -0500 Subject: [PATCH 07/10] test: update enforcement command tests to reflect new action names and policy structure --- openedx_authz/tests/test_commands.py | 40 +++++++++++++++++----------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index 66d0c37a..d3688f84 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -39,9 +39,17 @@ def setUp(self): self.policy_file_path = NamedTemporaryFile(suffix=".policy") self.model_file_path = NamedTemporaryFile(suffix=".conf") - self.policies = [["alice", "read", "resource1"]] - self.roles = [["alice", "admin"]] - self.action_grouping = [["read", "view"]] + self.policies = [ + ["role^library_admin", "act^delete_library", "lib^*", "allow"], + ["role^library_admin", "act^publish_library", "lib^*", "allow"], + ["role^library_admin", "act^manage_library_team", "lib^*", "allow"], + ] + self.roles = [ + ["user^alice", "role^library_admin", "lib^*"], + ] + self.action_grouping = [ + ["act^delete_library", "act^view_library"], + ] self.enforcer = Mock() self.enforcer.get_policy.return_value = self.policies @@ -128,12 +136,12 @@ def test_interactive_mode_allowed_request(self, mock_is_allowed: Mock, mock_get_ mock_get_enforcer.return_value = self.enforcer mock_is_allowed.return_value = True - with patch("builtins.input", side_effect=["alice read lib:Org1:LIB1", "quit"]): + with patch("builtins.input", side_effect=["alice view_library lib:Org1:LIB1", "quit"]): call_command(self.command_name, stdout=self.buffer) output = self.buffer.getvalue() - self.assertIn("✓ ALLOWED: alice read lib:Org1:LIB1", output) - mock_is_allowed.assert_called_once_with("alice", "read", "lib:Org1:LIB1") + self.assertIn("✓ ALLOWED: alice view_library lib:Org1:LIB1", output) + mock_is_allowed.assert_called_once_with("alice", "view_library", "lib:Org1:LIB1") @patch.object(AuthzEnforcer, "get_enforcer") @patch.object(authz_api, "is_user_allowed") @@ -142,19 +150,19 @@ def test_interactive_mode_denied_request(self, mock_is_allowed: Mock, mock_get_e mock_get_enforcer.return_value = self.enforcer mock_is_allowed.return_value = False - with patch("builtins.input", side_effect=["bob delete lib:Org2:LIB2", "quit"]): + with patch("builtins.input", side_effect=["bob delete_library lib:Org2:LIB2", "quit"]): call_command(self.command_name, stdout=self.buffer) output = self.buffer.getvalue() - self.assertIn("✗ DENIED: bob delete lib:Org2:LIB2", output) - mock_is_allowed.assert_called_once_with("bob", "delete", "lib:Org2:LIB2") + self.assertIn("✗ DENIED: bob delete_library lib:Org2:LIB2", output) + mock_is_allowed.assert_called_once_with("bob", "delete_library", "lib:Org2:LIB2") @patch("openedx_authz.management.commands.enforcement.Enforcer") def test_interactive_mode_file_mode_enforcement(self, mock_enforcer_class: Mock): """Test that file mode uses custom enforcer for enforcement checks.""" mock_enforcer_class.return_value = self.enforcer - with patch("builtins.input", side_effect=["alice read lib:Org1:LIB1", "quit"]): + with patch("builtins.input", side_effect=["alice view_library lib:Org1:LIB1", "quit"]): call_command( self.command_name, policy_file_path=self.policy_file_path.name, @@ -163,14 +171,14 @@ def test_interactive_mode_file_mode_enforcement(self, mock_enforcer_class: Mock) ) output = self.buffer.getvalue() - self.assertIn("✓ ALLOWED: alice read lib:Org1:LIB1", output) - self.enforcer.enforce.assert_called_once_with("alice", "read", "lib:Org1:LIB1") + self.assertIn("✓ ALLOWED: alice view_library lib:Org1:LIB1", output) + self.enforcer.enforce.assert_called_once_with("user^alice", "act^view_library", "lib^lib:Org1:LIB1") @data( "alice", - "alice read", - "alice read lib:Org1:LIB1 lib:Org1:LIB1", - "alice read lib:Org1:LIB1 lib:Org1:LIB1 lib:Org1:LIB1", + "alice view_library", + "alice view_library lib:Org1:LIB1 lib:Org1:LIB1", + "alice view_library lib:Org1:LIB1 lib:Org1:LIB1 lib:Org1:LIB1", ) @patch.object(AuthzEnforcer, "get_enforcer") def test_interactive_mode_invalid_format(self, user_input: str, mock_get_enforcer: Mock): @@ -255,7 +263,7 @@ def test_interactive_request_error(self, exception: Exception, mock_is_allowed: mock_get_enforcer.return_value = self.enforcer mock_is_allowed.side_effect = exception - with patch("builtins.input", side_effect=["alice read lib:Org1:LIB1", "quit"]): + with patch("builtins.input", side_effect=["alice view_library lib:Org1:LIB1", "quit"]): call_command(self.command_name, stdout=self.buffer) invalid_output = self.buffer.getvalue() From 2147c06e20a225ba24ea597028ae57e85dfd82fc Mon Sep 17 00:00:00 2001 From: Bryann Valderrama Date: Wed, 22 Oct 2025 10:43:57 -0500 Subject: [PATCH 08/10] test: fix unit tests --- openedx_authz/tests/test_commands.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index d3688f84..5a2e297b 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -266,10 +266,9 @@ def test_interactive_request_error(self, exception: Exception, mock_is_allowed: with patch("builtins.input", side_effect=["alice view_library lib:Org1:LIB1", "quit"]): call_command(self.command_name, stdout=self.buffer) - invalid_output = self.buffer.getvalue() - self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) - self.assertIn("Format: subject action scope", invalid_output) - self.assertIn(f"Example: alice read lib:Org1:LIB1 {make_scope_key('org', 'OpenedX')}", invalid_output) + output = self.buffer.getvalue() + self.assertIn("✗ Error processing request:", output) + self.assertIn(str(exception), output) class LoadPoliciesCommandTests(TestCase): From 6882f235d5075fac0825c8e5e5c4fa09cdddc56b Mon Sep 17 00:00:00 2001 From: Bryann Valderrama Date: Wed, 22 Oct 2025 10:53:15 -0500 Subject: [PATCH 09/10] chore: disable pylint warning --- openedx_authz/tests/test_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index 5a2e297b..e12e49fb 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -15,7 +15,6 @@ from openedx_authz import api as authz_api from openedx_authz.engine.enforcer import AuthzEnforcer from openedx_authz.management.commands.load_policies import Command as LoadPoliciesCommand -from openedx_authz.tests.test_utils import make_scope_key @ddt @@ -271,6 +270,7 @@ def test_interactive_request_error(self, exception: Exception, mock_is_allowed: self.assertIn(str(exception), output) +# pylint: disable=protected-access class LoadPoliciesCommandTests(TestCase): """ Tests for the `load_policies` Django management command. From 89dd52668e3dd956ce12f713d6039edfa878357b Mon Sep 17 00:00:00 2001 From: Bryann Valderrama Date: Thu, 23 Oct 2025 09:45:40 -0500 Subject: [PATCH 10/10] chore: add unreleased change in changelog --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bed73db7..fa5e87d3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -15,6 +15,7 @@ Unreleased ********** * Migrate from using pycodestyle and isort to ruff for code quality checks and formatting. +* Enhance enforcement command with dual operational modes (database and file mode). 0.7.0 - 2025-10-23 ******************