Skip to content

Commit 6b9a161

Browse files
committed
feat: update command including an interactive mode
1 parent 1492d27 commit 6b9a161

1 file changed

Lines changed: 104 additions & 126 deletions

File tree

openedx_authz/management/commands/enforcement.py

Lines changed: 104 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,89 @@
11
"""
2-
Django management command for testing Casbin enforcement policies.
2+
Django management command for interactive Casbin enforcement testing.
33
4-
This command creates a Casbin enforcer from model.conf and auth.policy files,
5-
then tests enforcement for each request in request.sample.
4+
This command creates a Casbin enforcer using the model.conf configuration and a
5+
user-specified policy file, then provides an interactive mode for testing
6+
authorization enforcement requests.
7+
8+
The command supports:
9+
- Loading Casbin model from the built-in model.conf file
10+
- Using custom policy files (specified via --policy-file-path argument)
11+
- Interactive testing with format: subject action object scope
12+
- Real-time enforcement results with visual feedback (✓ ALLOWED / ✗ DENIED)
13+
- Display of loaded policies, role assignments, and action grouping rules
14+
15+
Example usage:
16+
python manage.py lms enforcement --policy-file-path /path/to/authz.policy
17+
18+
Example test input:
19+
user:alice act:read lib:test-lib org:OpenedX
620
"""
721

822
import os
923

1024
import casbin
1125
from django.core.management.base import BaseCommand, CommandError
1226

27+
from openedx_authz import ROOT_DIRECTORY
28+
1329

1430
class Command(BaseCommand):
1531
"""
16-
Test Casbin enforcement policies using model.conf, auth.policy, and request.sample
32+
Django management command for interactive Casbin enforcement testing.
33+
34+
This command loads a Casbin model configuration and user-specified policy file
35+
to create an enforcer instance, then provides an interactive shell for testing
36+
authorization requests in real-time with immediate feedback.
1737
"""
1838

1939
help = (
20-
"Test Casbin enforcement policies using model.conf, auth.policy, and request.sample. "
21-
"Supports interactive mode for custom testing."
40+
"Interactive mode for testing Casbin enforcement policies using model.conf and a custom policy file. "
41+
"Provides real-time authorization testing with format: subject action object scope. "
42+
"Use --policy-file-path to specify the policy file location."
2243
)
2344

2445
def add_arguments(self, parser) -> None:
25-
"""Add the arguments to the parser."""
26-
parser.add_argument(
27-
"--model-file",
28-
type=str,
29-
default="model.conf",
30-
help="Path to the Casbin model configuration file (default: model.conf)",
31-
)
32-
parser.add_argument(
33-
"--policy-file",
34-
type=str,
35-
default="authz.policy",
36-
help="Path to the policy CSV file (default: auth.policy)",
37-
)
46+
"""Add command-line arguments to the argument parser.
47+
48+
Args:
49+
parser: The Django argument parser instance to configure.
50+
"""
3851
parser.add_argument(
39-
"--request-file",
52+
"--policy-file-path",
4053
type=str,
41-
default="request.sample",
42-
help="Path to the request test file (default: request.sample)",
43-
)
44-
parser.add_argument(
45-
"--interactive",
46-
action="store_true",
47-
help="Run in interactive mode for enforcement requests",
54+
required=True,
55+
help="Path to the Casbin policy file (supports CSV format with policies, roles, and action grouping)",
4856
)
4957

5058
def handle(self, *args, **options):
51-
"""Handle the command."""
52-
model_file = self.get_file_path(options["model_file"])
53-
policy_file = self.get_file_path(options["policy_file"])
54-
interactive_mode = options.get("interactive", False)
55-
56-
if not os.path.isfile(model_file):
57-
raise CommandError(f"Model file not found: {model_file}")
58-
if not os.path.isfile(policy_file):
59-
raise CommandError(f"Policy file not found: {policy_file}")
60-
61-
if not interactive_mode:
62-
request_file = self.get_file_path(options["request_file"])
63-
if not os.path.isfile(request_file):
64-
raise CommandError(f"Request file not found: {request_file}")
65-
66-
self.stdout.write(self.style.SUCCESS("=== Casbin Enforcement Testing ==="))
67-
self.stdout.write(f"Model file: {model_file}")
68-
self.stdout.write(f"Policy file: {policy_file}")
69-
if interactive_mode:
70-
self.stdout.write("Mode: Interactive")
71-
else:
72-
request_file = self.get_file_path(options["request_file"])
73-
self.stdout.write(f"Request file: {request_file}")
59+
"""Execute the enforcement testing command.
60+
61+
Loads the Casbin model and policy files, creates an enforcer instance,
62+
displays configuration summary, and starts the interactive testing mode.
63+
64+
Args:
65+
*args: Positional command arguments (unused).
66+
**options: Command options including 'policy_file_path'.
67+
68+
Raises:
69+
CommandError: If model or policy files are not found or enforcer creation fails.
70+
"""
71+
model_file_path = self.get_file_path("model.conf")
72+
policy_file_path = options["policy_file_path"]
73+
74+
if not os.path.isfile(model_file_path):
75+
raise CommandError(f"Model file not found: {model_file_path}")
76+
if not os.path.isfile(policy_file_path):
77+
raise CommandError(f"Policy file not found: {policy_file_path}")
78+
79+
self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement"))
80+
self.stdout.write(f"Model file path: {model_file_path}")
81+
self.stdout.write(f"Policy file path: {policy_file_path}")
7482
self.stdout.write("")
7583

7684
try:
77-
enforcer = casbin.Enforcer(model_file, policy_file)
78-
self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully"))
85+
enforcer = casbin.Enforcer(model_file_path, policy_file_path)
86+
self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully"))
7987

8088
policies = enforcer.get_policy()
8189
roles = enforcer.get_grouping_policy()
@@ -86,110 +94,80 @@ def handle(self, *args, **options):
8694
self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules")
8795
self.stdout.write("")
8896

89-
if interactive_mode:
90-
self._run_interactive_mode(enforcer)
91-
else:
92-
request_file = self.get_file_path(options["request_file"])
93-
self._process_requests(enforcer, request_file)
97+
self._run_interactive_mode(enforcer)
9498

9599
except Exception as e:
96100
raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e
97101

98102
def get_file_path(self, file_name: str) -> str:
99-
"""Get the file path for the given file name."""
100-
return os.path.join(os.path.dirname(__file__), file_name)
101-
102-
def _process_requests(self, enforcer: casbin.Enforcer, request_file: str) -> None:
103-
"""Process each request in the request file and test enforcement."""
104-
self.stdout.write(self.style.SUCCESS("=== Processing Enforcement Requests ==="))
105-
106-
total_requests = 0
107-
passed_requests = 0
108-
failed_requests = 0
109-
110-
with open(request_file, "r") as file:
111-
for line_num, line in enumerate(file, 1):
112-
line = line.strip()
113-
114-
# Skip empty lines and comments
115-
if not line or line.startswith("#"):
116-
continue
117-
118-
total_requests += 1
119-
120-
try:
121-
# Parse request line: subject, action, object, scope, expected_result
122-
parts = [part.strip() for part in line.split(",")]
123-
if len(parts) != 5:
124-
self.stdout.write(
125-
self.style.ERROR(f"Line {line_num}: Invalid format - expected 5 parts, got {len(parts)}")
126-
)
127-
failed_requests += 1
128-
continue
103+
"""Construct the full file path for a configuration file.
129104
130-
subject, action, obj, scope, expected_str = parts
131-
expected_result = expected_str.lower() == "true"
105+
Args:
106+
file_name (str): The name of the configuration file (e.g., 'model.conf').
132107
133-
actual_result = enforcer.enforce(subject, action, obj, scope)
108+
Returns:
109+
str: The absolute path to the configuration file in the engine/config directory.
110+
"""
111+
return os.path.join(ROOT_DIRECTORY, "engine", "config", file_name)
134112

135-
if actual_result == expected_result:
136-
status = self.style.SUCCESS("✓ PASS")
137-
passed_requests += 1
138-
else:
139-
status = self.style.ERROR("✗ FAIL")
140-
failed_requests += 1
113+
def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None:
114+
"""Start the interactive enforcement testing shell.
141115
142-
self.stdout.write(
143-
f"{status} Line {line_num:2d}: {subject}, {action}, {obj}, {scope} "
144-
f"-> Expected: {expected_result}, Got: {actual_result}"
145-
)
116+
Provides a continuous loop where users can input enforcement requests
117+
in the format 'subject action object scope' and receive immediate
118+
authorization results with visual feedback.
146119
147-
except (ValueError, IndexError) as e:
148-
self.stdout.write(self.style.ERROR(f"Line {line_num}: Error processing request - {str(e)}"))
149-
failed_requests += 1
120+
Args:
121+
enforcer (casbin.Enforcer): The configured Casbin enforcer instance for testing.
150122
151-
self.stdout.write("")
152-
self.stdout.write(self.style.SUCCESS("=== Enforcement Test Summary ==="))
153-
self.stdout.write(f"Total requests: {total_requests}")
154-
self.stdout.write(self.style.SUCCESS(f"Passed: {passed_requests}"))
155-
if failed_requests > 0:
156-
self.stdout.write(self.style.ERROR(f"Failed: {failed_requests}"))
157-
else:
158-
self.stdout.write(f"Failed: {failed_requests}")
159-
160-
success_rate = (passed_requests / total_requests * 100) if total_requests > 0 else 0
161-
self.stdout.write(f"Success rate: {success_rate:.1f}%")
162-
163-
if failed_requests == 0:
164-
self.stdout.write(self.style.SUCCESS("All tests passed!"))
165-
else:
166-
self.stdout.write(self.style.WARNING(f"⚠️ {failed_requests} test(s) failed"))
167-
168-
def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None:
169-
"""Run interactive mode for testing custom enforcement requests."""
170-
self.stdout.write(self.style.SUCCESS("=== Interactive Mode ==="))
123+
Note:
124+
Exit the interactive mode with Ctrl+C or Ctrl+D.
125+
"""
126+
self.stdout.write(self.style.SUCCESS("Interactive Mode"))
171127
self.stdout.write("Test custom enforcement requests interactively.")
128+
self.stdout.write("Enter 'quit', 'exit', or 'q' to exit the interactive mode.")
129+
self.stdout.write("")
172130
self.stdout.write("Format: subject action object scope")
173131
self.stdout.write("Example: user:alice act:read lib:test-lib org:OpenedX")
174132
self.stdout.write("")
175133

176134
while True:
177135
try:
178136
user_input = input("Enter enforcement test: ").strip()
137+
179138
if not user_input:
180139
continue
140+
141+
if user_input.lower() in ["quit", "exit", "q"]:
142+
break
143+
181144
self._test_interactive_request(enforcer, user_input)
182145
except (KeyboardInterrupt, EOFError):
146+
self.stdout.write(self.style.ERROR("Exiting interactive mode..."))
183147
break
184148

185149
def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None:
186-
"""Test a single enforcement request from interactive input."""
150+
"""Process and test a single enforcement request from user input.
151+
152+
Parses the input string, validates the format, executes the enforcement
153+
check, and displays the result with appropriate styling.
154+
155+
Args:
156+
enforcer (casbin.Enforcer): The Casbin enforcer instance to use for testing.
157+
user_input (str): The user's input string in format 'subject action object scope'.
158+
159+
Expected format:
160+
subject: The requesting entity (e.g., 'user:alice')
161+
action: The requested action (e.g., 'act:read')
162+
object: The target resource (e.g., 'lib:test-lib')
163+
scope: The authorization context (e.g., 'org:OpenedX')
164+
"""
187165
try:
188166
parts = [part.strip() for part in user_input.split()]
189167
if len(parts) != 4:
190168
self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 4 parts, got {len(parts)}"))
191-
self.stdout.write(" Format: subject action object scope")
192-
self.stdout.write(" Example: user:alice act:read lib:test-lib org:OpenedX")
169+
self.stdout.write("Format: subject action object scope")
170+
self.stdout.write("Example: user:alice act:read lib:test-lib org:OpenedX")
193171
return
194172

195173
subject, action, obj, scope = parts

0 commit comments

Comments
 (0)