Skip to content

Commit 8298d3e

Browse files
authored
[FC-0099] feat: enhance enforcement command (#106)
1 parent 19fb2f0 commit 8298d3e

3 files changed

Lines changed: 340 additions & 204 deletions

File tree

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Unreleased
1515
**********
1616

1717
* Migrate from using pycodestyle and isort to ruff for code quality checks and formatting.
18+
* Enhance enforcement command with dual operational modes (database and file mode).
1819

1920
0.7.0 - 2025-10-23
2021
******************
Lines changed: 142 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,135 +1,204 @@
11
"""
22
Django management command for interactive Casbin enforcement testing.
33
4-
This command creates a Casbin enforcer using the model.conf configuration and a
5-
user-specified policy file, then provides an interactive mode for testing
6-
authorization enforcement requests.
4+
This command provides an interactive mode for testing authorization enforcement
5+
requests with two operational modes:
6+
7+
1. **Database mode (default)**: Uses AuthzEnforcer with policies from the database
8+
9+
2. **File mode**: Uses a custom Casbin enforcer with policies from files
10+
- Activated when --policy-file-path and --model-file-path are provided
11+
- Reads policies directly from the specified CSV file
712
813
The command supports:
9-
- Loading Casbin model from the built-in model.conf file or a custom file (specified via --model-file-path argument)
10-
- Using custom policy files (specified via --policy-file-path argument)
1114
- Interactive testing with format: subject action scope
1215
- Real-time enforcement results with visual feedback (✓ ALLOWED / ✗ DENIED)
1316
- Display of loaded policies, role assignments, and action grouping rules
1417
1518
Example usage:
16-
python manage.py enforcement --policy-file-path /path/to/authz.policy
19+
# Use policies from database with default model
20+
python manage.py lms enforcement
1721
18-
python manage.py enforcement --policy-file-path /path/to/authz.policy --model-file-path /path/to/model.conf
22+
# Use custom model and policy files
23+
python manage.py lms enforcement -m /path/to/model.conf -p /path/to/policies.csv
1924
2025
Example test input:
21-
user^alice act^read org^OpenedX
26+
>>> alice view_library_team lib:OpenedX:CSPROB
27+
✓ ALLOWED: alice view_library_team lib:OpenedX:CSPROB
28+
>>> bob manage_library_team lib:DemoX:LIB1
29+
✗ DENIED: bob manage_library_team lib:DemoX:LIB1
2230
"""
2331

2432
import argparse
2533
import os
2634

27-
import casbin
35+
from casbin import Enforcer
36+
from casbin.util.log import disabled_logging
2837
from django.core.management.base import BaseCommand, CommandError
2938

30-
from openedx_authz import ROOT_DIRECTORY
39+
from openedx_authz import api
40+
from openedx_authz.api.data import ActionData, ScopeData, UserData
41+
from openedx_authz.engine.enforcer import AuthzEnforcer
3142

3243

3344
class Command(BaseCommand):
3445
"""
3546
Django management command for interactive Casbin enforcement testing.
3647
37-
This command loads a Casbin model configuration and user-specified policy file
38-
to create an enforcer instance, then provides an interactive shell for testing
39-
authorization requests in real-time with immediate feedback.
48+
This command provides two operational modes for testing authorization:
49+
50+
1. Database mode (default): Uses AuthzEnforcer with policies from the database.
51+
This is the default behavior when no arguments are provided.
52+
53+
2. File mode: Uses a custom Casbin enforcer with policies from files.
54+
Activated when --policy-file-path and/or --model-file-path are provided.
55+
56+
The command provides an interactive shell for testing authorization requests
57+
in real-time with immediate feedback.
4058
"""
4159

4260
help = (
43-
"Interactive mode for testing Casbin enforcement policies using a custom model file and"
44-
"a custom policy file. Provides real-time authorization testing with format: subject action scope. "
45-
"Use --policy-file-path to specify the policy file location. "
46-
"Use --model-file-path to specify the model file location. "
61+
"Interactive mode for testing Casbin enforcement policies. By default, uses "
62+
"AuthzEnforcer with policies from the database. Use --policy-file-path and "
63+
"--model-file-path to test with custom files instead. "
64+
"Format: subject action scope."
4765
)
4866

67+
def __init__(self, *args, **kwargs):
68+
"""Initialize the command with required attributes."""
69+
super().__init__(*args, **kwargs)
70+
self._custom_enforcer = None
71+
4972
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
5073
"""Add command-line arguments to the argument parser.
5174
5275
Args:
5376
parser (argparse.ArgumentParser): The Django argument parser instance to configure.
5477
"""
5578
parser.add_argument(
79+
"-p",
5680
"--policy-file-path",
5781
type=str,
58-
required=True,
59-
help="Path to the Casbin policy file (supports CSV format with policies, roles, and action grouping)",
82+
default=None,
83+
help=(
84+
"Path to the Casbin policy CSV file. When provided, switches to file mode using a "
85+
"custom enforcer instead of the database. Supports CSV format with policies, roles, "
86+
"and action grouping."
87+
),
6088
)
6189
parser.add_argument(
90+
"-m",
6291
"--model-file-path",
6392
type=str,
64-
required=False,
65-
help="Path to the Casbin model file. If not provided, the default model.conf file will be used.",
93+
default=None,
94+
help=(
95+
"Path to the Casbin model configuration file. When provided, switches to file mode "
96+
"using a custom enforcer instead of the database. If not specified in file mode, "
97+
"uses the default model.conf."
98+
),
6699
)
67100

68101
def handle(self, *args, **options):
69102
"""Execute the enforcement testing command.
70103
71-
Loads the Casbin model and policy files, creates an enforcer instance,
72-
displays configuration summary, and starts the interactive testing mode.
104+
Determines the operational mode based on provided arguments and creates the
105+
appropriate enforcer instance, then starts the interactive testing mode.
106+
107+
Operational modes:
108+
- Database mode: Uses AuthzEnforcer with policies from database (default)
109+
- File mode: Uses custom Enforcer with policies from files (when files provided)
73110
74111
Args:
75112
*args: Positional command arguments (unused).
76-
**options: Command options including `policy_file_path` and `model_file_path`.
113+
**options: Command options including ``--policy-file-path`` and ``--model-file-path``.
114+
"""
115+
policy_file_path = options["policy_file_path"]
116+
model_file_path = options["model_file_path"]
117+
118+
use_file_mode = policy_file_path is not None and model_file_path is not None
119+
120+
if use_file_mode:
121+
self._handle_file_mode(policy_file_path, model_file_path)
122+
else:
123+
self._handle_database_mode()
124+
125+
def _handle_database_mode(self) -> None:
126+
"""Handle enforcement testing using AuthzEnforcer with database policies.
127+
128+
Uses the AuthzEnforcer singleton with policies loaded from the database.
129+
This is the default mode when no custom files are provided.
77130
78131
Raises:
79-
CommandError: If model or policy files are not found or enforcer creation fails.
132+
CommandError: If enforcer creation or policy loading fails.
80133
"""
81-
model_file_path = self._get_file_path("model.conf") or options["model_file_path"]
82-
policy_file_path = options["policy_file_path"]
134+
try:
135+
enforcer = AuthzEnforcer.get_enforcer()
136+
enforcer.load_policy()
137+
disabled_logging()
138+
139+
self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement (Database Mode)"))
140+
self.stdout.write("Using AuthzEnforcer with policies from database")
141+
self.stdout.write("")
142+
143+
self._display_loaded_policies(enforcer)
144+
self._run_interactive_mode()
145+
except Exception as e:
146+
raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e
147+
148+
def _handle_file_mode(self, policy_file_path: str, model_file_path: str) -> None:
149+
"""Handle enforcement testing using custom Enforcer with file-based policies.
150+
151+
Creates a custom Casbin Enforcer instance using the specified model and policy files.
152+
This mode is useful for testing policies before loading them into the database.
153+
154+
Args:
155+
policy_file_path (str): Path to the policy CSV file.
156+
model_file_path (str): Path to the model configuration file.
83157
158+
Raises:
159+
CommandError: If required files are not found or enforcer creation fails.
160+
"""
84161
if not os.path.isfile(model_file_path):
85162
raise CommandError(f"Model file not found: {model_file_path}")
86163
if not os.path.isfile(policy_file_path):
87164
raise CommandError(f"Policy file not found: {policy_file_path}")
88165

89-
self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement"))
90-
self.stdout.write(f"Model file path: {model_file_path}")
91-
self.stdout.write(f"Policy file path: {policy_file_path}")
92-
self.stdout.write("")
93-
94166
try:
95-
enforcer = casbin.Enforcer(model_file_path, policy_file_path)
96-
self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully"))
97-
98-
policies = enforcer.get_policy()
99-
roles = enforcer.get_grouping_policy()
100-
action_grouping = enforcer.get_named_grouping_policy("g2")
167+
enforcer = Enforcer(model_file_path, policy_file_path)
101168

102-
self.stdout.write(f"✓ Loaded {len(policies)} policies")
103-
self.stdout.write(f"✓ Loaded {len(roles)} role assignments")
104-
self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules")
169+
self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement (File Mode)"))
170+
self.stdout.write(f"Model file: {model_file_path}")
171+
self.stdout.write(f"Policy file: {policy_file_path}")
105172
self.stdout.write("")
106173

107-
self._run_interactive_mode(enforcer)
108-
174+
self._custom_enforcer = enforcer
175+
self._display_loaded_policies(enforcer)
176+
self._run_interactive_mode()
109177
except Exception as e:
110178
raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e
111179

112-
def _get_file_path(self, file_name: str) -> str:
113-
"""Construct the full file path for a configuration file.
180+
def _display_loaded_policies(self, enforcer: Enforcer) -> None:
181+
"""Display statistics about loaded policies, roles, and action grouping.
114182
115183
Args:
116-
file_name (str): The name of the configuration file (e.g., 'model.conf').
117-
118-
Returns:
119-
str: The absolute path to the configuration file in the engine/config directory.
184+
enforcer (Enforcer): The Casbin enforcer instance with loaded policies.
120185
"""
121-
return os.path.join(ROOT_DIRECTORY, "engine", "config", file_name)
186+
policies = enforcer.get_policy()
187+
roles = enforcer.get_grouping_policy()
188+
action_grouping = enforcer.get_named_grouping_policy("g2")
122189

123-
def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None:
190+
self.stdout.write(f"✓ Loaded {len(policies)} policies")
191+
self.stdout.write(f"✓ Loaded {len(roles)} role assignments")
192+
self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules")
193+
self.stdout.write("")
194+
195+
def _run_interactive_mode(self) -> None:
124196
"""Start the interactive enforcement testing shell.
125197
126198
Provides a continuous loop where users can input enforcement requests
127199
in the format 'subject action scope' and receive immediate
128200
authorization results with visual feedback.
129201
130-
Args:
131-
enforcer (casbin.Enforcer): The configured Casbin enforcer instance for testing.
132-
133202
Note:
134203
Exit the interactive mode with Ctrl+C or Ctrl+D.
135204
"""
@@ -138,7 +207,7 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None:
138207
self.stdout.write("Enter 'quit', 'exit', or 'q' to exit the interactive mode.")
139208
self.stdout.write("")
140209
self.stdout.write("Format: subject action scope")
141-
self.stdout.write("Example: user^alice act^read org^OpenedX")
210+
self.stdout.write("Example: alice view_library_team lib:OpenedX:CSPROB")
142211
self.stdout.write("")
143212

144213
while True:
@@ -151,41 +220,50 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None:
151220
if user_input.lower() in ["quit", "exit", "q"]:
152221
break
153222

154-
self._test_interactive_request(enforcer, user_input)
223+
self._test_interactive_request(user_input)
155224
except (KeyboardInterrupt, EOFError):
156225
self.stdout.write(self.style.ERROR("Exiting interactive mode..."))
157226
break
158227

159-
def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None:
228+
def _test_interactive_request(self, user_input: str) -> None:
160229
"""Process and test a single enforcement request from user input.
161230
162231
Parses the input string, validates the format, executes the enforcement
163232
check, and displays the result with appropriate styling.
164233
165234
Args:
166-
enforcer (casbin.Enforcer): The Casbin enforcer instance to use for testing.
167235
user_input (str): The user's input string in format 'subject action scope'.
168236
169237
Expected format:
170-
subject: The requesting entity (e.g., 'user^alice')
171-
action: The requested action (e.g., 'act^read')
172-
scope: The authorization context (e.g., 'org^OpenedX')
238+
subject: The requesting entity (e.g., 'alice')
239+
action: The requested action (e.g., 'view_library_team')
240+
scope: The authorization context (e.g., 'lib:OpenedX:CSPROB')
173241
"""
174242
try:
175243
parts = [part.strip() for part in user_input.split()]
176244
if len(parts) != 3:
177245
self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 3 parts, got {len(parts)}"))
178246
self.stdout.write("Format: subject action scope")
179-
self.stdout.write("Example: user^alice act^read org^OpenedX")
247+
self.stdout.write("Example: alice view_library_team lib:OpenedX:CSPROB")
180248
return
181249

182250
subject, action, scope = parts
183-
result = enforcer.enforce(subject, action, scope)
251+
252+
if self._custom_enforcer is not None:
253+
user_data = UserData(external_key=subject)
254+
action_data = ActionData(external_key=action)
255+
scope_data = ScopeData(external_key=scope)
256+
result = self._custom_enforcer.enforce(
257+
user_data.namespaced_key,
258+
action_data.namespaced_key,
259+
scope_data.namespaced_key,
260+
)
261+
else:
262+
result = api.is_user_allowed(subject, action, scope)
184263

185264
if result:
186265
self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}"))
187266
else:
188267
self.stdout.write(self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}"))
189-
190268
except (ValueError, IndexError, TypeError) as e:
191269
self.stdout.write(self.style.ERROR(f"✗ Error processing request: {str(e)}"))

0 commit comments

Comments
 (0)