|
| 1 | +""" |
| 2 | +Django management command for interactive Casbin enforcement testing. |
| 3 | +
|
| 4 | +This command creates a Casbin enforcer using the model.conf configuration and a |
| 5 | +user-specified policy file, then provides an interactive mode for testing |
| 6 | +authorization enforcement requests. |
| 7 | +
|
| 8 | +The command supports: |
| 9 | +- Loading Casbin model from the built-in model.conf file |
| 10 | +- Using custom policy files (specified via --policy-file-path argument) |
| 11 | +- Interactive testing with format: subject action scope |
| 12 | +- Real-time enforcement results with visual feedback (✓ ALLOWED / ✗ DENIED) |
| 13 | +- Display of loaded policies, role assignments, and action grouping rules |
| 14 | +
|
| 15 | +Example usage: |
| 16 | + python manage.py enforcement --policy-file-path /path/to/authz.policy |
| 17 | +
|
| 18 | +Example test input: |
| 19 | + user:alice act:read org:OpenedX |
| 20 | +""" |
| 21 | + |
| 22 | +import os |
| 23 | + |
| 24 | +import casbin |
| 25 | +from django.core.management.base import BaseCommand, CommandError |
| 26 | + |
| 27 | +from openedx_authz import ROOT_DIRECTORY |
| 28 | + |
| 29 | + |
| 30 | +class Command(BaseCommand): |
| 31 | + """ |
| 32 | + Django management command for interactive Casbin enforcement testing. |
| 33 | +
|
| 34 | + This command loads a Casbin model configuration and user-specified policy file |
| 35 | + to create an enforcer instance, then provides an interactive shell for testing |
| 36 | + authorization requests in real-time with immediate feedback. |
| 37 | + """ |
| 38 | + |
| 39 | + help = ( |
| 40 | + "Interactive mode for testing Casbin enforcement policies using model.conf and a custom policy file. " |
| 41 | + "Provides real-time authorization testing with format: subject action scope. " |
| 42 | + "Use --policy-file-path to specify the policy file location." |
| 43 | + ) |
| 44 | + |
| 45 | + def add_arguments(self, parser) -> None: |
| 46 | + """Add command-line arguments to the argument parser. |
| 47 | +
|
| 48 | + Args: |
| 49 | + parser: The Django argument parser instance to configure. |
| 50 | + """ |
| 51 | + parser.add_argument( |
| 52 | + "--policy-file-path", |
| 53 | + type=str, |
| 54 | + required=True, |
| 55 | + help="Path to the Casbin policy file (supports CSV format with policies, roles, and action grouping)", |
| 56 | + ) |
| 57 | + |
| 58 | + def handle(self, *args, **options): |
| 59 | + """Execute the enforcement testing command. |
| 60 | +
|
| 61 | + Loads the Casbin model and policy files, creates an enforcer instance, |
| 62 | + displays configuration summary, and starts the interactive testing mode. |
| 63 | +
|
| 64 | + Args: |
| 65 | + *args: Positional command arguments (unused). |
| 66 | + **options: Command options including `policy_file_path`. |
| 67 | +
|
| 68 | + Raises: |
| 69 | + CommandError: If model or policy files are not found or enforcer creation fails. |
| 70 | + """ |
| 71 | + model_file_path = self.get_file_path("model.conf") |
| 72 | + policy_file_path = options["policy_file_path"] |
| 73 | + |
| 74 | + if not os.path.isfile(model_file_path): |
| 75 | + raise CommandError(f"Model file not found: {model_file_path}") |
| 76 | + if not os.path.isfile(policy_file_path): |
| 77 | + raise CommandError(f"Policy file not found: {policy_file_path}") |
| 78 | + |
| 79 | + self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement")) |
| 80 | + self.stdout.write(f"Model file path: {model_file_path}") |
| 81 | + self.stdout.write(f"Policy file path: {policy_file_path}") |
| 82 | + self.stdout.write("") |
| 83 | + |
| 84 | + try: |
| 85 | + enforcer = casbin.Enforcer(model_file_path, policy_file_path) |
| 86 | + self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully")) |
| 87 | + |
| 88 | + policies = enforcer.get_policy() |
| 89 | + roles = enforcer.get_grouping_policy() |
| 90 | + action_grouping = enforcer.get_named_grouping_policy("g2") |
| 91 | + |
| 92 | + self.stdout.write(f"✓ Loaded {len(policies)} policies") |
| 93 | + self.stdout.write(f"✓ Loaded {len(roles)} role assignments") |
| 94 | + self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules") |
| 95 | + self.stdout.write("") |
| 96 | + |
| 97 | + self._run_interactive_mode(enforcer) |
| 98 | + |
| 99 | + except Exception as e: |
| 100 | + raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e |
| 101 | + |
| 102 | + def get_file_path(self, file_name: str) -> str: |
| 103 | + """Construct the full file path for a configuration file. |
| 104 | +
|
| 105 | + Args: |
| 106 | + file_name (str): The name of the configuration file (e.g., 'model.conf'). |
| 107 | +
|
| 108 | + Returns: |
| 109 | + str: The absolute path to the configuration file in the engine/config directory. |
| 110 | + """ |
| 111 | + return os.path.join(ROOT_DIRECTORY, "engine", "config", file_name) |
| 112 | + |
| 113 | + def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None: |
| 114 | + """Start the interactive enforcement testing shell. |
| 115 | +
|
| 116 | + Provides a continuous loop where users can input enforcement requests |
| 117 | + in the format 'subject action scope' and receive immediate |
| 118 | + authorization results with visual feedback. |
| 119 | +
|
| 120 | + Args: |
| 121 | + enforcer (casbin.Enforcer): The configured Casbin enforcer instance for testing. |
| 122 | +
|
| 123 | + Note: |
| 124 | + Exit the interactive mode with Ctrl+C or Ctrl+D. |
| 125 | + """ |
| 126 | + self.stdout.write(self.style.SUCCESS("Interactive Mode")) |
| 127 | + self.stdout.write("Test custom enforcement requests interactively.") |
| 128 | + self.stdout.write("Enter 'quit', 'exit', or 'q' to exit the interactive mode.") |
| 129 | + self.stdout.write("") |
| 130 | + self.stdout.write("Format: subject action scope") |
| 131 | + self.stdout.write("Example: user:alice act:read org:OpenedX") |
| 132 | + self.stdout.write("") |
| 133 | + |
| 134 | + while True: |
| 135 | + try: |
| 136 | + user_input = input("Enter enforcement test: ").strip() |
| 137 | + |
| 138 | + if not user_input: |
| 139 | + continue |
| 140 | + |
| 141 | + if user_input.lower() in ["quit", "exit", "q"]: |
| 142 | + break |
| 143 | + |
| 144 | + self._test_interactive_request(enforcer, user_input) |
| 145 | + except (KeyboardInterrupt, EOFError): |
| 146 | + self.stdout.write(self.style.ERROR("Exiting interactive mode...")) |
| 147 | + break |
| 148 | + |
| 149 | + def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None: |
| 150 | + """Process and test a single enforcement request from user input. |
| 151 | +
|
| 152 | + Parses the input string, validates the format, executes the enforcement |
| 153 | + check, and displays the result with appropriate styling. |
| 154 | +
|
| 155 | + Args: |
| 156 | + enforcer (casbin.Enforcer): The Casbin enforcer instance to use for testing. |
| 157 | + user_input (str): The user's input string in format 'subject action scope'. |
| 158 | +
|
| 159 | + Expected format: |
| 160 | + subject: The requesting entity (e.g., 'user:alice') |
| 161 | + action: The requested action (e.g., 'act:read') |
| 162 | + scope: The authorization context (e.g., 'org:OpenedX') |
| 163 | + """ |
| 164 | + try: |
| 165 | + parts = [part.strip() for part in user_input.split()] |
| 166 | + if len(parts) != 3: |
| 167 | + self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 3 parts, got {len(parts)}")) |
| 168 | + self.stdout.write("Format: subject action scope") |
| 169 | + self.stdout.write("Example: user:alice act:read org:OpenedX") |
| 170 | + return |
| 171 | + |
| 172 | + subject, action, scope = parts |
| 173 | + result = enforcer.enforce(subject, action, scope) |
| 174 | + |
| 175 | + if result: |
| 176 | + self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}")) |
| 177 | + else: |
| 178 | + self.stdout.write(self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}")) |
| 179 | + |
| 180 | + except (ValueError, IndexError, TypeError) as e: |
| 181 | + self.stdout.write(self.style.ERROR(f"✗ Error processing request: {str(e)}")) |
0 commit comments