11"""
22Django management command for interactive Casbin enforcement testing.
33
4- This command creates a Casbin enforcer using the model.conf configuration and a
5- user-specified policy file, then provides an interactive mode for testing
6- authorization enforcement requests.
4+ This command provides an interactive mode for testing authorization enforcement
5+ requests with two operational modes:
6+
7+ 1. **Database mode (default)**: Uses AuthzEnforcer with policies from the database
8+
9+ 2. **File mode**: Uses a custom Casbin enforcer with policies from files
10+ - Activated when --policy-file-path and --model-file-path are provided
11+ - Reads policies directly from the specified CSV file
712
813The command supports:
9- - Loading Casbin model from the built-in model.conf file or a custom file (specified via --model-file-path argument)
10- - Using custom policy files (specified via --policy-file-path argument)
1114- Interactive testing with format: subject action scope
1215- Real-time enforcement results with visual feedback (✓ ALLOWED / ✗ DENIED)
1316- Display of loaded policies, role assignments, and action grouping rules
1417
1518Example usage:
16- python manage.py enforcement --policy-file-path /path/to/authz.policy
19+ # Use policies from database with default model
20+ python manage.py lms enforcement
1721
18- python manage.py enforcement --policy-file-path /path/to/authz.policy --model-file-path /path/to/model.conf
22+ # Use custom model and policy files
23+ python manage.py lms enforcement -m /path/to/model.conf -p /path/to/policies.csv
1924
2025Example test input:
21- user^alice act^read org^OpenedX
26+ >>> alice view_library_team lib:OpenedX:CSPROB
27+ ✓ ALLOWED: alice view_library_team lib:OpenedX:CSPROB
28+ >>> bob manage_library_team lib:DemoX:LIB1
29+ ✗ DENIED: bob manage_library_team lib:DemoX:LIB1
2230"""
2331
2432import argparse
2533import os
2634
27- import casbin
35+ from casbin import Enforcer
36+ from casbin .util .log import disabled_logging
2837from django .core .management .base import BaseCommand , CommandError
2938
30- from openedx_authz import ROOT_DIRECTORY
39+ from openedx_authz import api
40+ from openedx_authz .api .data import ActionData , ScopeData , UserData
41+ from openedx_authz .engine .enforcer import AuthzEnforcer
3142
3243
3344class Command (BaseCommand ):
3445 """
3546 Django management command for interactive Casbin enforcement testing.
3647
37- This command loads a Casbin model configuration and user-specified policy file
38- to create an enforcer instance, then provides an interactive shell for testing
39- authorization requests in real-time with immediate feedback.
48+ This command provides two operational modes for testing authorization:
49+
50+ 1. Database mode (default): Uses AuthzEnforcer with policies from the database.
51+ This is the default behavior when no arguments are provided.
52+
53+ 2. File mode: Uses a custom Casbin enforcer with policies from files.
54+ Activated when --policy-file-path and/or --model-file-path are provided.
55+
56+ The command provides an interactive shell for testing authorization requests
57+ in real-time with immediate feedback.
4058 """
4159
4260 help = (
43- "Interactive mode for testing Casbin enforcement policies using a custom model file and "
44- "a custom policy file. Provides real-time authorization testing with format: subject action scope. "
45- "Use --policy -file-path to specify the policy file location . "
46- "Use --model-file-path to specify the model file location. "
61+ "Interactive mode for testing Casbin enforcement policies. By default, uses "
62+ "AuthzEnforcer with policies from the database. Use --policy-file-path and "
63+ "--model -file-path to test with custom files instead . "
64+ "Format: subject action scope. "
4765 )
4866
67+ def __init__ (self , * args , ** kwargs ):
68+ """Initialize the command with required attributes."""
69+ super ().__init__ (* args , ** kwargs )
70+ self ._custom_enforcer = None
71+
4972 def add_arguments (self , parser : argparse .ArgumentParser ) -> None :
5073 """Add command-line arguments to the argument parser.
5174
5275 Args:
5376 parser (argparse.ArgumentParser): The Django argument parser instance to configure.
5477 """
5578 parser .add_argument (
79+ "-p" ,
5680 "--policy-file-path" ,
5781 type = str ,
58- required = True ,
59- help = "Path to the Casbin policy file (supports CSV format with policies, roles, and action grouping)" ,
82+ default = None ,
83+ help = (
84+ "Path to the Casbin policy CSV file. When provided, switches to file mode using a "
85+ "custom enforcer instead of the database. Supports CSV format with policies, roles, "
86+ "and action grouping."
87+ ),
6088 )
6189 parser .add_argument (
90+ "-m" ,
6291 "--model-file-path" ,
6392 type = str ,
64- required = False ,
65- help = "Path to the Casbin model file. If not provided, the default model.conf file will be used." ,
93+ default = None ,
94+ help = (
95+ "Path to the Casbin model configuration file. When provided, switches to file mode "
96+ "using a custom enforcer instead of the database. If not specified in file mode, "
97+ "uses the default model.conf."
98+ ),
6699 )
67100
68101 def handle (self , * args , ** options ):
69102 """Execute the enforcement testing command.
70103
71- Loads the Casbin model and policy files, creates an enforcer instance,
72- displays configuration summary, and starts the interactive testing mode.
104+ Determines the operational mode based on provided arguments and creates the
105+ appropriate enforcer instance, then starts the interactive testing mode.
106+
107+ Operational modes:
108+ - Database mode: Uses AuthzEnforcer with policies from database (default)
109+ - File mode: Uses custom Enforcer with policies from files (when files provided)
73110
74111 Args:
75112 *args: Positional command arguments (unused).
76- **options: Command options including `policy_file_path` and `model_file_path`.
113+ **options: Command options including ``--policy-file-path`` and ``--model-file-path``.
114+ """
115+ policy_file_path = options ["policy_file_path" ]
116+ model_file_path = options ["model_file_path" ]
117+
118+ use_file_mode = policy_file_path is not None and model_file_path is not None
119+
120+ if use_file_mode :
121+ self ._handle_file_mode (policy_file_path , model_file_path )
122+ else :
123+ self ._handle_database_mode ()
124+
125+ def _handle_database_mode (self ) -> None :
126+ """Handle enforcement testing using AuthzEnforcer with database policies.
127+
128+ Uses the AuthzEnforcer singleton with policies loaded from the database.
129+ This is the default mode when no custom files are provided.
77130
78131 Raises:
79- CommandError: If model or policy files are not found or enforcer creation fails.
132+ CommandError: If enforcer creation or policy loading fails.
80133 """
81- model_file_path = self ._get_file_path ("model.conf" ) or options ["model_file_path" ]
82- policy_file_path = options ["policy_file_path" ]
134+ try :
135+ enforcer = AuthzEnforcer .get_enforcer ()
136+ enforcer .load_policy ()
137+ disabled_logging ()
138+
139+ self .stdout .write (self .style .SUCCESS ("Casbin Interactive Enforcement (Database Mode)" ))
140+ self .stdout .write ("Using AuthzEnforcer with policies from database" )
141+ self .stdout .write ("" )
142+
143+ self ._display_loaded_policies (enforcer )
144+ self ._run_interactive_mode ()
145+ except Exception as e :
146+ raise CommandError (f"Error creating Casbin enforcer: { str (e )} " ) from e
147+
148+ def _handle_file_mode (self , policy_file_path : str , model_file_path : str ) -> None :
149+ """Handle enforcement testing using custom Enforcer with file-based policies.
150+
151+ Creates a custom Casbin Enforcer instance using the specified model and policy files.
152+ This mode is useful for testing policies before loading them into the database.
153+
154+ Args:
155+ policy_file_path (str): Path to the policy CSV file.
156+ model_file_path (str): Path to the model configuration file.
83157
158+ Raises:
159+ CommandError: If required files are not found or enforcer creation fails.
160+ """
84161 if not os .path .isfile (model_file_path ):
85162 raise CommandError (f"Model file not found: { model_file_path } " )
86163 if not os .path .isfile (policy_file_path ):
87164 raise CommandError (f"Policy file not found: { policy_file_path } " )
88165
89- self .stdout .write (self .style .SUCCESS ("Casbin Interactive Enforcement" ))
90- self .stdout .write (f"Model file path: { model_file_path } " )
91- self .stdout .write (f"Policy file path: { policy_file_path } " )
92- self .stdout .write ("" )
93-
94166 try :
95- enforcer = casbin .Enforcer (model_file_path , policy_file_path )
96- self .stdout .write (self .style .SUCCESS ("Casbin enforcer created successfully" ))
97-
98- policies = enforcer .get_policy ()
99- roles = enforcer .get_grouping_policy ()
100- action_grouping = enforcer .get_named_grouping_policy ("g2" )
167+ enforcer = Enforcer (model_file_path , policy_file_path )
101168
102- self .stdout .write (f"✓ Loaded { len ( policies ) } policies" )
103- self .stdout .write (f"✓ Loaded { len ( roles ) } role assignments " )
104- self .stdout .write (f"✓ Loaded { len ( action_grouping ) } action grouping rules " )
169+ self .stdout .write (self . style . SUCCESS ( "Casbin Interactive Enforcement (File Mode)" ) )
170+ self .stdout .write (f"Model file: { model_file_path } " )
171+ self .stdout .write (f"Policy file: { policy_file_path } " )
105172 self .stdout .write ("" )
106173
107- self ._run_interactive_mode (enforcer )
108-
174+ self ._custom_enforcer = enforcer
175+ self ._display_loaded_policies (enforcer )
176+ self ._run_interactive_mode ()
109177 except Exception as e :
110178 raise CommandError (f"Error creating Casbin enforcer: { str (e )} " ) from e
111179
112- def _get_file_path (self , file_name : str ) -> str :
113- """Construct the full file path for a configuration file .
180+ def _display_loaded_policies (self , enforcer : Enforcer ) -> None :
181+ """Display statistics about loaded policies, roles, and action grouping .
114182
115183 Args:
116- file_name (str): The name of the configuration file (e.g., 'model.conf').
117-
118- Returns:
119- str: The absolute path to the configuration file in the engine/config directory.
184+ enforcer (Enforcer): The Casbin enforcer instance with loaded policies.
120185 """
121- return os .path .join (ROOT_DIRECTORY , "engine" , "config" , file_name )
186+ policies = enforcer .get_policy ()
187+ roles = enforcer .get_grouping_policy ()
188+ action_grouping = enforcer .get_named_grouping_policy ("g2" )
122189
123- def _run_interactive_mode (self , enforcer : casbin .Enforcer ) -> None :
190+ self .stdout .write (f"✓ Loaded { len (policies )} policies" )
191+ self .stdout .write (f"✓ Loaded { len (roles )} role assignments" )
192+ self .stdout .write (f"✓ Loaded { len (action_grouping )} action grouping rules" )
193+ self .stdout .write ("" )
194+
195+ def _run_interactive_mode (self ) -> None :
124196 """Start the interactive enforcement testing shell.
125197
126198 Provides a continuous loop where users can input enforcement requests
127199 in the format 'subject action scope' and receive immediate
128200 authorization results with visual feedback.
129201
130- Args:
131- enforcer (casbin.Enforcer): The configured Casbin enforcer instance for testing.
132-
133202 Note:
134203 Exit the interactive mode with Ctrl+C or Ctrl+D.
135204 """
@@ -138,7 +207,7 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None:
138207 self .stdout .write ("Enter 'quit', 'exit', or 'q' to exit the interactive mode." )
139208 self .stdout .write ("" )
140209 self .stdout .write ("Format: subject action scope" )
141- self .stdout .write ("Example: user^ alice act^read org^ OpenedX" )
210+ self .stdout .write ("Example: alice view_library_team lib: OpenedX:CSPROB " )
142211 self .stdout .write ("" )
143212
144213 while True :
@@ -151,41 +220,50 @@ def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None:
151220 if user_input .lower () in ["quit" , "exit" , "q" ]:
152221 break
153222
154- self ._test_interactive_request (enforcer , user_input )
223+ self ._test_interactive_request (user_input )
155224 except (KeyboardInterrupt , EOFError ):
156225 self .stdout .write (self .style .ERROR ("Exiting interactive mode..." ))
157226 break
158227
159- def _test_interactive_request (self , enforcer : casbin . Enforcer , user_input : str ) -> None :
228+ def _test_interactive_request (self , user_input : str ) -> None :
160229 """Process and test a single enforcement request from user input.
161230
162231 Parses the input string, validates the format, executes the enforcement
163232 check, and displays the result with appropriate styling.
164233
165234 Args:
166- enforcer (casbin.Enforcer): The Casbin enforcer instance to use for testing.
167235 user_input (str): The user's input string in format 'subject action scope'.
168236
169237 Expected format:
170- subject: The requesting entity (e.g., 'user^ alice')
171- action: The requested action (e.g., 'act^read ')
172- scope: The authorization context (e.g., 'org^ OpenedX')
238+ subject: The requesting entity (e.g., 'alice')
239+ action: The requested action (e.g., 'view_library_team ')
240+ scope: The authorization context (e.g., 'lib: OpenedX:CSPROB ')
173241 """
174242 try :
175243 parts = [part .strip () for part in user_input .split ()]
176244 if len (parts ) != 3 :
177245 self .stdout .write (self .style .ERROR (f"✗ Invalid format. Expected 3 parts, got { len (parts )} " ))
178246 self .stdout .write ("Format: subject action scope" )
179- self .stdout .write ("Example: user^ alice act^read org^ OpenedX" )
247+ self .stdout .write ("Example: alice view_library_team lib: OpenedX:CSPROB " )
180248 return
181249
182250 subject , action , scope = parts
183- result = enforcer .enforce (subject , action , scope )
251+
252+ if self ._custom_enforcer is not None :
253+ user_data = UserData (external_key = subject )
254+ action_data = ActionData (external_key = action )
255+ scope_data = ScopeData (external_key = scope )
256+ result = self ._custom_enforcer .enforce (
257+ user_data .namespaced_key ,
258+ action_data .namespaced_key ,
259+ scope_data .namespaced_key ,
260+ )
261+ else :
262+ result = api .is_user_allowed (subject , action , scope )
184263
185264 if result :
186265 self .stdout .write (self .style .SUCCESS (f"✓ ALLOWED: { subject } { action } { scope } " ))
187266 else :
188267 self .stdout .write (self .style .ERROR (f"✗ DENIED: { subject } { action } { scope } " ))
189-
190268 except (ValueError , IndexError , TypeError ) as e :
191269 self .stdout .write (self .style .ERROR (f"✗ Error processing request: { str (e )} " ))
0 commit comments