diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bed73db7..fa5e87d3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -15,6 +15,7 @@ Unreleased ********** * Migrate from using pycodestyle and isort to ruff for code quality checks and formatting. +* Enhance enforcement command with dual operational modes (database and file mode). 0.7.0 - 2025-10-23 ****************** diff --git a/openedx_authz/management/commands/enforcement.py b/openedx_authz/management/commands/enforcement.py index 55dd4717..acbb392f 100644 --- a/openedx_authz/management/commands/enforcement.py +++ b/openedx_authz/management/commands/enforcement.py @@ -1,51 +1,74 @@ """ Django management command for interactive Casbin enforcement testing. -This command creates a Casbin enforcer using the model.conf configuration and a -user-specified policy file, then provides an interactive mode for testing -authorization enforcement requests. +This command provides an interactive mode for testing authorization enforcement +requests with two operational modes: + +1. **Database mode (default)**: Uses AuthzEnforcer with policies from the database + +2. **File mode**: Uses a custom Casbin enforcer with policies from files + - Activated when --policy-file-path and --model-file-path are provided + - Reads policies directly from the specified CSV file The command supports: -- Loading Casbin model from the built-in model.conf file or a custom file (specified via --model-file-path argument) -- Using custom policy files (specified via --policy-file-path argument) - Interactive testing with format: subject action scope - Real-time enforcement results with visual feedback (✓ ALLOWED / ✗ DENIED) - Display of loaded policies, role assignments, and action grouping rules Example usage: - python manage.py enforcement --policy-file-path /path/to/authz.policy + # Use policies from database with default model + python manage.py lms enforcement - python manage.py enforcement --policy-file-path /path/to/authz.policy --model-file-path /path/to/model.conf + # Use custom model and policy files + python manage.py lms enforcement -m /path/to/model.conf -p /path/to/policies.csv Example test input: - user^alice act^read org^OpenedX + >>> alice view_library_team lib:OpenedX:CSPROB + ✓ ALLOWED: alice view_library_team lib:OpenedX:CSPROB + >>> bob manage_library_team lib:DemoX:LIB1 + ✗ DENIED: bob manage_library_team lib:DemoX:LIB1 """ import argparse import os -import casbin +from casbin import Enforcer +from casbin.util.log import disabled_logging from django.core.management.base import BaseCommand, CommandError -from openedx_authz import ROOT_DIRECTORY +from openedx_authz import api +from openedx_authz.api.data import ActionData, ScopeData, UserData +from openedx_authz.engine.enforcer import AuthzEnforcer class Command(BaseCommand): """ Django management command for interactive Casbin enforcement testing. - This command loads a Casbin model configuration and user-specified policy file - to create an enforcer instance, then provides an interactive shell for testing - authorization requests in real-time with immediate feedback. + This command provides two operational modes for testing authorization: + + 1. Database mode (default): Uses AuthzEnforcer with policies from the database. + This is the default behavior when no arguments are provided. + + 2. File mode: Uses a custom Casbin enforcer with policies from files. + Activated when --policy-file-path and/or --model-file-path are provided. + + The command provides an interactive shell for testing authorization requests + in real-time with immediate feedback. """ help = ( - "Interactive mode for testing Casbin enforcement policies using a custom model file and" - "a custom policy file. Provides real-time authorization testing with format: subject action scope. " - "Use --policy-file-path to specify the policy file location. " - "Use --model-file-path to specify the model file location. " + "Interactive mode for testing Casbin enforcement policies. By default, uses " + "AuthzEnforcer with policies from the database. Use --policy-file-path and " + "--model-file-path to test with custom files instead. " + "Format: subject action scope." ) + def __init__(self, *args, **kwargs): + """Initialize the command with required attributes.""" + super().__init__(*args, **kwargs) + self._custom_enforcer = None + def add_arguments(self, parser: argparse.ArgumentParser) -> None: """Add command-line arguments to the argument parser. @@ -53,83 +76,129 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: parser (argparse.ArgumentParser): The Django argument parser instance to configure. """ parser.add_argument( + "-p", "--policy-file-path", type=str, - required=True, - help="Path to the Casbin policy file (supports CSV format with policies, roles, and action grouping)", + default=None, + help=( + "Path to the Casbin policy CSV file. When provided, switches to file mode using a " + "custom enforcer instead of the database. Supports CSV format with policies, roles, " + "and action grouping." + ), ) parser.add_argument( + "-m", "--model-file-path", type=str, - required=False, - help="Path to the Casbin model file. If not provided, the default model.conf file will be used.", + default=None, + help=( + "Path to the Casbin model configuration file. When provided, switches to file mode " + "using a custom enforcer instead of the database. If not specified in file mode, " + "uses the default model.conf." + ), ) def handle(self, *args, **options): """Execute the enforcement testing command. - Loads the Casbin model and policy files, creates an enforcer instance, - displays configuration summary, and starts the interactive testing mode. + Determines the operational mode based on provided arguments and creates the + appropriate enforcer instance, then starts the interactive testing mode. + + Operational modes: + - Database mode: Uses AuthzEnforcer with policies from database (default) + - File mode: Uses custom Enforcer with policies from files (when files provided) Args: *args: Positional command arguments (unused). - **options: Command options including `policy_file_path` and `model_file_path`. + **options: Command options including ``--policy-file-path`` and ``--model-file-path``. + """ + policy_file_path = options["policy_file_path"] + model_file_path = options["model_file_path"] + + use_file_mode = policy_file_path is not None and model_file_path is not None + + if use_file_mode: + self._handle_file_mode(policy_file_path, model_file_path) + else: + self._handle_database_mode() + + def _handle_database_mode(self) -> None: + """Handle enforcement testing using AuthzEnforcer with database policies. + + Uses the AuthzEnforcer singleton with policies loaded from the database. + This is the default mode when no custom files are provided. Raises: - CommandError: If model or policy files are not found or enforcer creation fails. + CommandError: If enforcer creation or policy loading fails. """ - model_file_path = self._get_file_path("model.conf") or options["model_file_path"] - policy_file_path = options["policy_file_path"] + try: + enforcer = AuthzEnforcer.get_enforcer() + enforcer.load_policy() + disabled_logging() + + self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement (Database Mode)")) + self.stdout.write("Using AuthzEnforcer with policies from database") + self.stdout.write("") + + self._display_loaded_policies(enforcer) + self._run_interactive_mode() + except Exception as e: + raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e + + def _handle_file_mode(self, policy_file_path: str, model_file_path: str) -> None: + """Handle enforcement testing using custom Enforcer with file-based policies. + + Creates a custom Casbin Enforcer instance using the specified model and policy files. + This mode is useful for testing policies before loading them into the database. + + Args: + policy_file_path (str): Path to the policy CSV file. + model_file_path (str): Path to the model configuration file. + Raises: + CommandError: If required files are not found or enforcer creation fails. + """ if not os.path.isfile(model_file_path): raise CommandError(f"Model file not found: {model_file_path}") if not os.path.isfile(policy_file_path): raise CommandError(f"Policy file not found: {policy_file_path}") - self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement")) - self.stdout.write(f"Model file path: {model_file_path}") - self.stdout.write(f"Policy file path: {policy_file_path}") - self.stdout.write("") - try: - enforcer = casbin.Enforcer(model_file_path, policy_file_path) - self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully")) - - policies = enforcer.get_policy() - roles = enforcer.get_grouping_policy() - action_grouping = enforcer.get_named_grouping_policy("g2") + enforcer = Enforcer(model_file_path, policy_file_path) - self.stdout.write(f"✓ Loaded {len(policies)} policies") - self.stdout.write(f"✓ Loaded {len(roles)} role assignments") - self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules") + self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement (File Mode)")) + self.stdout.write(f"Model file: {model_file_path}") + self.stdout.write(f"Policy file: {policy_file_path}") self.stdout.write("") - self._run_interactive_mode(enforcer) - + self._custom_enforcer = enforcer + self._display_loaded_policies(enforcer) + self._run_interactive_mode() except Exception as e: raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e - def _get_file_path(self, file_name: str) -> str: - """Construct the full file path for a configuration file. + def _display_loaded_policies(self, enforcer: Enforcer) -> None: + """Display statistics about loaded policies, roles, and action grouping. Args: - file_name (str): The name of the configuration file (e.g., 'model.conf'). - - Returns: - str: The absolute path to the configuration file in the engine/config directory. + enforcer (Enforcer): The Casbin enforcer instance with loaded policies. """ - return os.path.join(ROOT_DIRECTORY, "engine", "config", file_name) + policies = enforcer.get_policy() + roles = enforcer.get_grouping_policy() + action_grouping = enforcer.get_named_grouping_policy("g2") - def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None: + self.stdout.write(f"✓ Loaded {len(policies)} policies") + self.stdout.write(f"✓ Loaded {len(roles)} role assignments") + self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules") + self.stdout.write("") + + def _run_interactive_mode(self) -> None: """Start the interactive enforcement testing shell. Provides a continuous loop where users can input enforcement requests in the format 'subject action scope' and receive immediate authorization results with visual feedback. - Args: - enforcer (casbin.Enforcer): The configured Casbin enforcer instance for testing. - Note: Exit the interactive mode with Ctrl+C or Ctrl+D. """ @@ -138,7 +207,7 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None: self.stdout.write("Enter 'quit', 'exit', or 'q' to exit the interactive mode.") self.stdout.write("") self.stdout.write("Format: subject action scope") - self.stdout.write("Example: user^alice act^read org^OpenedX") + self.stdout.write("Example: alice view_library_team lib:OpenedX:CSPROB") self.stdout.write("") while True: @@ -151,41 +220,50 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None: if user_input.lower() in ["quit", "exit", "q"]: break - self._test_interactive_request(enforcer, user_input) + self._test_interactive_request(user_input) except (KeyboardInterrupt, EOFError): self.stdout.write(self.style.ERROR("Exiting interactive mode...")) break - def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None: + def _test_interactive_request(self, user_input: str) -> None: """Process and test a single enforcement request from user input. Parses the input string, validates the format, executes the enforcement check, and displays the result with appropriate styling. Args: - enforcer (casbin.Enforcer): The Casbin enforcer instance to use for testing. user_input (str): The user's input string in format 'subject action scope'. Expected format: - subject: The requesting entity (e.g., 'user^alice') - action: The requested action (e.g., 'act^read') - scope: The authorization context (e.g., 'org^OpenedX') + subject: The requesting entity (e.g., 'alice') + action: The requested action (e.g., 'view_library_team') + scope: The authorization context (e.g., 'lib:OpenedX:CSPROB') """ try: parts = [part.strip() for part in user_input.split()] if len(parts) != 3: self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 3 parts, got {len(parts)}")) self.stdout.write("Format: subject action scope") - self.stdout.write("Example: user^alice act^read org^OpenedX") + self.stdout.write("Example: alice view_library_team lib:OpenedX:CSPROB") return subject, action, scope = parts - result = enforcer.enforce(subject, action, scope) + + if self._custom_enforcer is not None: + user_data = UserData(external_key=subject) + action_data = ActionData(external_key=action) + scope_data = ScopeData(external_key=scope) + result = self._custom_enforcer.enforce( + user_data.namespaced_key, + action_data.namespaced_key, + scope_data.namespaced_key, + ) + else: + result = api.is_user_allowed(subject, action, scope) if result: self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}")) else: self.stdout.write(self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}")) - except (ValueError, IndexError, TypeError) as e: self.stdout.write(self.style.ERROR(f"✗ Error processing request: {str(e)}")) diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py index 6fa90679..e12e49fb 100644 --- a/openedx_authz/tests/test_commands.py +++ b/openedx_authz/tests/test_commands.py @@ -3,7 +3,7 @@ """ import io -from tempfile import TemporaryFile +from tempfile import NamedTemporaryFile from unittest import TestCase from unittest.mock import Mock, patch @@ -12,12 +12,11 @@ from django.core.management.base import CommandError from openedx_authz import ROOT_DIRECTORY -from openedx_authz.management.commands.enforcement import Command as EnforcementCommand +from openedx_authz import api as authz_api +from openedx_authz.engine.enforcer import AuthzEnforcer from openedx_authz.management.commands.load_policies import Command as LoadPoliciesCommand -from openedx_authz.tests.test_utils import make_action_key, make_scope_key, make_user_key -# pylint: disable=protected-access @ddt class EnforcementCommandTests(TestCase): """ @@ -34,186 +33,244 @@ class EnforcementCommandTests(TestCase): def setUp(self): super().setUp() self.buffer = io.StringIO() - self.policy_file_path = TemporaryFile() - self.command = EnforcementCommand() - self.command.stdout = self.buffer + self.command_name = "enforcement" + + self.policy_file_path = NamedTemporaryFile(suffix=".policy") + self.model_file_path = NamedTemporaryFile(suffix=".conf") + + self.policies = [ + ["role^library_admin", "act^delete_library", "lib^*", "allow"], + ["role^library_admin", "act^publish_library", "lib^*", "allow"], + ["role^library_admin", "act^manage_library_team", "lib^*", "allow"], + ] + self.roles = [ + ["user^alice", "role^library_admin", "lib^*"], + ] + self.action_grouping = [ + ["act^delete_library", "act^view_library"], + ] + self.enforcer = Mock() + self.enforcer.get_policy.return_value = self.policies + self.enforcer.get_grouping_policy.return_value = self.roles + self.enforcer.get_named_grouping_policy.return_value = self.action_grouping - def test_requires_policy_file_argument(self): - """Test that calling the command without --policy-file-path should error from argparse.""" - with self.assertRaises(CommandError) as ctx: - call_command("enforcement") + @patch.object(AuthzEnforcer, "get_enforcer") + @patch("openedx_authz.management.commands.enforcement.disabled_logging") + def test_handle_database_mode_default(self, mock_logging: Mock, mock_get_enforcer: Mock): + """Test database mode is used when no file paths are provided.""" + mock_get_enforcer.return_value = self.enforcer - self.assertEqual( - "Error: the following arguments are required: --policy-file-path", - str(ctx.exception), - ) + with patch("builtins.input", side_effect=["quit"]): + call_command(self.command_name, stdout=self.buffer) + + output = self.buffer.getvalue() + self.assertIn("Database Mode", output) + self.assertIn("AuthzEnforcer", output) + self.enforcer.load_policy.assert_called_once() + mock_logging.assert_called_once() + + @patch("openedx_authz.management.commands.enforcement.Enforcer") + def test_handle_file_mode(self, mock_enforcer_class: Mock): + """Test file mode is used when both file paths are provided.""" + mock_enforcer_class.return_value = self.enforcer + + with patch("builtins.input", side_effect=["quit"]): + call_command( + self.command_name, + policy_file_path=self.policy_file_path.name, + model_file_path=self.model_file_path.name, + stdout=self.buffer, + ) + + output = self.buffer.getvalue() + self.assertIn("File Mode", output) + self.assertIn(self.policy_file_path.name, output) + self.assertIn(self.model_file_path.name, output) + mock_enforcer_class.assert_called_once_with(self.model_file_path.name, self.policy_file_path.name) def test_policy_file_not_found_raises(self): """Test that command errors when the provided policy file does not exist.""" - non_existent = "invalid/path/does-not-exist.policy" + non_existent_policy = "invalid/path/authz.policy" with self.assertRaises(CommandError) as ctx: - call_command("enforcement", policy_file_path=non_existent) + call_command( + self.command_name, + policy_file_path=non_existent_policy, + model_file_path=self.model_file_path.name, + ) - self.assertEqual(f"Policy file not found: {non_existent}", str(ctx.exception)) + self.assertEqual(f"Policy file not found: {non_existent_policy}", str(ctx.exception)) - @patch.object(EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf") - def test_model_file_not_found_raises(self, mock_get_file_path: Mock): + def test_model_file_not_found_raises(self): """Test that command errors when the provided model file does not exist.""" + non_existent_model = "invalid/path/model.conf" + with self.assertRaises(CommandError) as ctx: - call_command("enforcement", policy_file_path=self.policy_file_path.name) + call_command( + self.command_name, + policy_file_path=self.policy_file_path.name, + model_file_path=non_existent_model, + ) - self.assertEqual( - f"Model file not found: {mock_get_file_path.return_value}", - str(ctx.exception), - ) + self.assertEqual(f"Model file not found: {non_existent_model}", str(ctx.exception)) - @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") - def test_error_creating_enforcer_raises(self, mock_enforcer_cls: Mock): - """Test that command errors when the enforcer creation fails.""" - mock_enforcer_cls.side_effect = Exception("Enforcer creation error") + @patch.object(AuthzEnforcer, "get_enforcer") + def test_display_loaded_policies(self, mock_get_enforcer: Mock): + """Test that policy statistics are displayed correctly.""" + mock_get_enforcer.return_value = self.enforcer - with self.assertRaises(CommandError) as ctx: - call_command("enforcement", policy_file_path=self.policy_file_path.name) + with patch("builtins.input", side_effect=["quit"]): + call_command(self.command_name, stdout=self.buffer) - self.assertEqual( - "Error creating Casbin enforcer: Enforcer creation error", - str(ctx.exception), - ) + output = self.buffer.getvalue() + self.assertIn(f"✓ Loaded {len(self.policies)} policies", output) + self.assertIn(f"✓ Loaded {len(self.roles)} role assignments", output) + self.assertIn(f"✓ Loaded {len(self.action_grouping)} action grouping rules", output) - @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") - @patch.object(EnforcementCommand, "_run_interactive_mode") - def test_successful_run_prints_summary(self, mock_run_interactive: Mock, mock_enforcer_cls: Mock): - """ - Test successful command execution with policy file and interactive mode. - When files exist, command should create enforcer, print counts, and call interactive loop. - """ - mock_enforcer = Mock() - policies = [["p", "role:platform_admin", "act:manage", "*", "allow"]] - roles = [["g", "user:user-1", "role:platform_admin", "*"]] - action_grouping = [ - ["g2", "act:edit", "act:read"], - ["g2", "act:edit", "act:write"], - ] - mock_enforcer.get_policy.return_value = policies - mock_enforcer.get_grouping_policy.return_value = roles - mock_enforcer.get_named_grouping_policy.return_value = action_grouping - mock_enforcer_cls.return_value = mock_enforcer - - call_command( - "enforcement", - policy_file_path=self.policy_file_path.name, - stdout=self.buffer, - ) + @patch.object(AuthzEnforcer, "get_enforcer") + @patch.object(authz_api, "is_user_allowed") + def test_interactive_mode_allowed_request(self, mock_is_allowed: Mock, mock_get_enforcer: Mock): + """Test interactive mode with an allowed enforcement request.""" + mock_get_enforcer.return_value = self.enforcer + mock_is_allowed.return_value = True + + with patch("builtins.input", side_effect=["alice view_library lib:Org1:LIB1", "quit"]): + call_command(self.command_name, stdout=self.buffer) output = self.buffer.getvalue() - self.assertIn("Casbin Interactive Enforcement", output) - self.assertIn("Casbin enforcer created successfully", output) - self.assertIn(f"✓ Loaded {len(policies)} policies", output) - self.assertIn(f"✓ Loaded {len(roles)} role assignments", output) - self.assertIn(f"✓ Loaded {len(action_grouping)} action grouping rules", output) - mock_run_interactive.assert_called_once_with(mock_enforcer) - - def test_run_interactive_mode_displays_help(self): - """Test that the interactive mode runs.""" - with patch("builtins.input", side_effect=["quit"]): - self.command._run_interactive_mode(self.enforcer) - - example_text = f"Example: {make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" - self.assertIn("Interactive Mode", self.buffer.getvalue()) - self.assertIn("Test custom enforcement requests interactively.", self.buffer.getvalue()) - self.assertIn( - "Enter 'quit', 'exit', or 'q' to exit the interactive mode.", - self.buffer.getvalue(), - ) - self.assertIn("Format: subject action scope", self.buffer.getvalue()) - self.assertIn(example_text, self.buffer.getvalue()) + self.assertIn("✓ ALLOWED: alice view_library lib:Org1:LIB1", output) + mock_is_allowed.assert_called_once_with("alice", "view_library", "lib:Org1:LIB1") + + @patch.object(AuthzEnforcer, "get_enforcer") + @patch.object(authz_api, "is_user_allowed") + def test_interactive_mode_denied_request(self, mock_is_allowed: Mock, mock_get_enforcer: Mock): + """Test interactive mode with a denied enforcement request.""" + mock_get_enforcer.return_value = self.enforcer + mock_is_allowed.return_value = False - def test_run_interactive_mode_maintains_interactive_loop(self): - """Test that the interactive mode maintains the interactive loop.""" - input_values = ["", "", "", "quit"] + with patch("builtins.input", side_effect=["bob delete_library lib:Org2:LIB2", "quit"]): + call_command(self.command_name, stdout=self.buffer) - with patch("builtins.input", side_effect=input_values) as mock_input: - self.command._run_interactive_mode(self.enforcer) + output = self.buffer.getvalue() + self.assertIn("✗ DENIED: bob delete_library lib:Org2:LIB2", output) + mock_is_allowed.assert_called_once_with("bob", "delete_library", "lib:Org2:LIB2") + + @patch("openedx_authz.management.commands.enforcement.Enforcer") + def test_interactive_mode_file_mode_enforcement(self, mock_enforcer_class: Mock): + """Test that file mode uses custom enforcer for enforcement checks.""" + mock_enforcer_class.return_value = self.enforcer + + with patch("builtins.input", side_effect=["alice view_library lib:Org1:LIB1", "quit"]): + call_command( + self.command_name, + policy_file_path=self.policy_file_path.name, + model_file_path=self.model_file_path.name, + stdout=self.buffer, + ) - self.assertEqual(mock_input.call_count, len(input_values)) + output = self.buffer.getvalue() + self.assertIn("✓ ALLOWED: alice view_library lib:Org1:LIB1", output) + self.enforcer.enforce.assert_called_once_with("user^alice", "act^view_library", "lib^lib:Org1:LIB1") @data( - [f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"], - [f"{make_user_key('bob')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"] * 5, - [f"{make_user_key('john')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}"] * 10, + "alice", + "alice view_library", + "alice view_library lib:Org1:LIB1 lib:Org1:LIB1", + "alice view_library lib:Org1:LIB1 lib:Org1:LIB1 lib:Org1:LIB1", ) - def test_run_interactive_mode_processes_request(self, user_input: list[str]): - """Test that the interactive mode processes the request.""" - with patch("builtins.input", side_effect=user_input + ["quit"]) as mock_input: - with patch.object(self.command, "_test_interactive_request") as mock_method: - self.command._run_interactive_mode(self.enforcer) + @patch.object(AuthzEnforcer, "get_enforcer") + def test_interactive_mode_invalid_format(self, user_input: str, mock_get_enforcer: Mock): + """Test interactive mode handles invalid input format.""" + mock_get_enforcer.return_value = self.enforcer - self.assertEqual(mock_input.call_count, len(user_input) + 1) - self.assertEqual(mock_method.call_count, len(user_input)) - for value in user_input: - mock_method.assert_any_call(self.enforcer, value) + with patch("builtins.input", side_effect=[user_input, "quit"]): + call_command(self.command_name, stdout=self.buffer) - @data("quit", "exit", "q", "QUIT", "EXIT", "Q") - def test_quit_commands_case_insensitive(self, quit_command: str): - """Test that all quit commands work regardless of case.""" - with patch("builtins.input", side_effect=[quit_command]) as mock_input: - self.command._run_interactive_mode(self.enforcer) + output = self.buffer.getvalue() + self.assertIn("✗ Invalid format", output) + self.assertIn(f"Expected 3 parts, got {len(user_input.split())}", output) - self.assertEqual(mock_input.call_count, 1) + @patch.object(AuthzEnforcer, "get_enforcer") + def test_interactive_mode_empty_input(self, mock_get_enforcer: Mock): + """Test interactive mode handles empty input gracefully.""" + mock_get_enforcer.return_value = self.enforcer - @data(KeyboardInterrupt(), EOFError()) - def test_handles_exceptions(self, exception: Exception): - """Test that interactive mode handles exceptions gracefully.""" - with patch("builtins.input", side_effect=exception): - self.command._run_interactive_mode(self.enforcer) + with patch("builtins.input", side_effect=["", " ", "quit"]): + call_command(self.command_name, stdout=self.buffer) - self.assertIn("Exiting interactive mode...", self.buffer.getvalue()) + output = self.buffer.getvalue() + self.assertIn("Interactive Mode", output) - def test_interactive_request_allowed(self): - """Test that `_test_interactive_request` prints allowed output format.""" - self.enforcer.enforce.return_value = True - user_input = f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" + @data("quit", "exit", "q", "QUIT", "EXIT", "Q") + @patch.object(AuthzEnforcer, "get_enforcer") + def test_interactive_mode_exit_commands(self, exit_cmd: str, mock_get_enforcer: Mock): + """Test that various exit commands work correctly.""" + mock_get_enforcer.return_value = self.enforcer - self.command._test_interactive_request(self.enforcer, user_input) + with patch("builtins.input", side_effect=[exit_cmd]): + call_command(self.command_name, stdout=self.buffer) - allowed_output = self.buffer.getvalue() - self.assertIn(f"✓ ALLOWED: {user_input}", allowed_output) + output = self.buffer.getvalue() + self.assertIn("Interactive Mode", output) - def test_interactive_request_denied(self): - """Test that `_test_interactive_request` prints denied output format.""" - self.enforcer.enforce.return_value = False - user_input = f"{make_user_key('alice')} {make_action_key('delete')} {make_scope_key('org', 'OpenedX')}" + @data(KeyboardInterrupt(), EOFError()) + @patch.object(AuthzEnforcer, "get_enforcer") + def test_interactive_mode_keyboard_interrupt(self, exception: Exception, mock_get_enforcer: Mock): + """Test interactive mode handles KeyboardInterrupt gracefully.""" + mock_get_enforcer.return_value = self.enforcer - self.command._test_interactive_request(self.enforcer, user_input) + with patch("builtins.input", side_effect=exception): + call_command(self.command_name, stdout=self.buffer) - denied_output = self.buffer.getvalue() - self.assertIn(f"✗ DENIED: {user_input}", denied_output) + output = self.buffer.getvalue() + self.assertIn("Exiting interactive mode...", output) - def test_interactive_request_invalid_format(self): - """Test that `_test_interactive_request` reports invalid input format.""" - user_input = f"{make_user_key('alice')} {make_action_key('read')}" + @patch.object(AuthzEnforcer, "get_enforcer") + def test_database_mode_enforcer_creation_error(self, mock_get_enforcer: Mock): + """Test CommandError is raised when enforcer creation fails in database mode.""" + error_message = "Database connection failed" + mock_get_enforcer.side_effect = Exception(error_message) - self.command._test_interactive_request(self.enforcer, user_input) + with self.assertRaises(CommandError) as ctx: + call_command(self.command_name) - invalid_output = self.buffer.getvalue() - self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) - self.assertIn("Format: subject action scope", invalid_output) - self.assertIn(f"Example: {user_input} {make_scope_key('org', 'OpenedX')}", invalid_output) + self.assertEqual(f"Error creating Casbin enforcer: {error_message}", str(ctx.exception)) + mock_get_enforcer.assert_called_once() - @data(ValueError(), IndexError(), TypeError()) - def test_interactive_request_error(self, exception: Exception): - """Test that `_test_interactive_request` handles processing errors.""" - self.enforcer.enforce.side_effect = exception - user_input = f"{make_user_key('alice')} {make_action_key('read')} {make_scope_key('org', 'OpenedX')}" + @patch("openedx_authz.management.commands.enforcement.Enforcer") + def test_error_creating_enforcer_raises(self, mock_enforcer_cls: Mock): + """Test CommandError is raised when the enforcer creation fails in file mode.""" + error_message = "Enforcer creation error" + mock_enforcer_cls.side_effect = Exception(error_message) - self.command._test_interactive_request(self.enforcer, user_input) + with self.assertRaises(CommandError) as ctx: + call_command( + self.command_name, + policy_file_path=self.policy_file_path.name, + model_file_path=self.model_file_path.name, + ) + + self.assertEqual(f"Error creating Casbin enforcer: {error_message}", str(ctx.exception)) + mock_enforcer_cls.assert_called_once_with(self.model_file_path.name, self.policy_file_path.name) + + @data(ValueError("Value error"), TypeError("Type error"), IndexError("Index error")) + @patch.object(AuthzEnforcer, "get_enforcer") + @patch.object(authz_api, "is_user_allowed") + def test_interactive_request_error(self, exception: Exception, mock_is_allowed: Mock, mock_get_enforcer: Mock): + """Test interactive mode handles enforcement errors gracefully.""" + mock_get_enforcer.return_value = self.enforcer + mock_is_allowed.side_effect = exception + + with patch("builtins.input", side_effect=["alice view_library lib:Org1:LIB1", "quit"]): + call_command(self.command_name, stdout=self.buffer) - error_output = self.buffer.getvalue() - self.assertIn(f"✗ Error processing request: {str(exception)}", error_output) + output = self.buffer.getvalue() + self.assertIn("✗ Error processing request:", output) + self.assertIn(str(exception), output) +# pylint: disable=protected-access class LoadPoliciesCommandTests(TestCase): """ Tests for the `load_policies` Django management command.