11"""
2- Django management command for testing Casbin enforcement policies .
2+ Django management command for interactive Casbin enforcement testing .
33
4- This command creates a Casbin enforcer from model.conf and auth.policy files,
5- then tests enforcement for each request in request.sample.
4+ This command creates a Casbin enforcer using the model.conf configuration and a
5+ user-specified policy file, then provides an interactive mode for testing
6+ authorization enforcement requests.
7+
8+ The command supports:
9+ - Loading Casbin model from the built-in model.conf file
10+ - Using custom policy files (specified via --policy-file-path argument)
11+ - Interactive testing with format: subject action object scope
12+ - Real-time enforcement results with visual feedback (✓ ALLOWED / ✗ DENIED)
13+ - Display of loaded policies, role assignments, and action grouping rules
14+
15+ Example usage:
16+ python manage.py lms enforcement --policy-file-path /path/to/authz.policy
17+
18+ Example test input:
19+ user:alice act:read lib:test-lib org:OpenedX
620"""
721
822import os
923
1024import casbin
1125from django .core .management .base import BaseCommand , CommandError
1226
27+ from openedx_authz import ROOT_DIRECTORY
28+
1329
1430class Command (BaseCommand ):
1531 """
16- Test Casbin enforcement policies using model.conf, auth.policy, and request.sample
32+ Django management command for interactive Casbin enforcement testing.
33+
34+ This command loads a Casbin model configuration and user-specified policy file
35+ to create an enforcer instance, then provides an interactive shell for testing
36+ authorization requests in real-time with immediate feedback.
1737 """
1838
1939 help = (
20- "Test Casbin enforcement policies using model.conf, auth.policy, and request.sample. "
21- "Supports interactive mode for custom testing."
40+ "Interactive mode for testing Casbin enforcement policies using model.conf and a custom policy file. "
41+ "Provides real-time authorization testing with format: subject action object scope. "
42+ "Use --policy-file-path to specify the policy file location."
2243 )
2344
2445 def add_arguments (self , parser ) -> None :
25- """Add the arguments to the parser."""
26- parser .add_argument (
27- "--model-file" ,
28- type = str ,
29- default = "model.conf" ,
30- help = "Path to the Casbin model configuration file (default: model.conf)" ,
31- )
32- parser .add_argument (
33- "--policy-file" ,
34- type = str ,
35- default = "authz.policy" ,
36- help = "Path to the policy CSV file (default: auth.policy)" ,
37- )
46+ """Add command-line arguments to the argument parser.
47+
48+ Args:
49+ parser: The Django argument parser instance to configure.
50+ """
3851 parser .add_argument (
39- "--request -file" ,
52+ "--policy -file-path " ,
4053 type = str ,
41- default = "request.sample" ,
42- help = "Path to the request test file (default: request.sample)" ,
43- )
44- parser .add_argument (
45- "--interactive" ,
46- action = "store_true" ,
47- help = "Run in interactive mode for enforcement requests" ,
54+ required = True ,
55+ help = "Path to the Casbin policy file (supports CSV format with policies, roles, and action grouping)" ,
4856 )
4957
5058 def handle (self , * args , ** options ):
51- """Handle the command."""
52- model_file = self . get_file_path ( options [ "model_file" ])
53- policy_file = self . get_file_path ( options [ "policy_file" ])
54- interactive_mode = options . get ( "interactive" , False )
55-
56- if not os . path . isfile ( model_file ) :
57- raise CommandError ( f"Model file not found: { model_file } " )
58- if not os . path . isfile ( policy_file ):
59- raise CommandError ( f"Policy file not found: { policy_file } " )
60-
61- if not interactive_mode :
62- request_file = self . get_file_path ( options [ "request_file" ])
63- if not os . path . isfile ( request_file ):
64- raise CommandError ( f"Request file not found: { request_file } " )
65-
66- self . stdout . write ( self . style . SUCCESS ( "=== Casbin Enforcement Testing ===" ))
67- self . stdout . write (f"Model file: { model_file } " )
68- self . stdout . write ( f"Policy file: { policy_file } " )
69- if interactive_mode :
70- self . stdout . write ( "Mode: Interactive" )
71- else :
72- request_file = self .get_file_path ( options [ "request_file" ] )
73- self .stdout .write (f"Request file: { request_file } " )
59+ """Execute the enforcement testing command.
60+
61+ Loads the Casbin model and policy files, creates an enforcer instance,
62+ displays configuration summary, and starts the interactive testing mode.
63+
64+ Args :
65+ *args: Positional command arguments (unused).
66+ **options: Command options including 'policy_file_path'.
67+
68+ Raises:
69+ CommandError: If model or policy files are not found or enforcer creation fails.
70+ """
71+ model_file_path = self . get_file_path ( "model.conf" )
72+ policy_file_path = options [ "policy_file_path" ]
73+
74+ if not os . path . isfile ( model_file_path ):
75+ raise CommandError (f"Model file not found : { model_file_path } " )
76+ if not os . path . isfile ( policy_file_path ):
77+ raise CommandError ( f"Policy file not found: { policy_file_path } " )
78+
79+ self . stdout . write ( self . style . SUCCESS ( "Casbin Interactive Enforcement" ))
80+ self .stdout . write ( f"Model file path: { model_file_path } " )
81+ self .stdout .write (f"Policy file path : { policy_file_path } " )
7482 self .stdout .write ("" )
7583
7684 try :
77- enforcer = casbin .Enforcer (model_file , policy_file )
78- self .stdout .write (self .style .SUCCESS ("✓ Casbin enforcer created successfully" ))
85+ enforcer = casbin .Enforcer (model_file_path , policy_file_path )
86+ self .stdout .write (self .style .SUCCESS ("Casbin enforcer created successfully" ))
7987
8088 policies = enforcer .get_policy ()
8189 roles = enforcer .get_grouping_policy ()
@@ -86,110 +94,80 @@ def handle(self, *args, **options):
8694 self .stdout .write (f"✓ Loaded { len (action_grouping )} action grouping rules" )
8795 self .stdout .write ("" )
8896
89- if interactive_mode :
90- self ._run_interactive_mode (enforcer )
91- else :
92- request_file = self .get_file_path (options ["request_file" ])
93- self ._process_requests (enforcer , request_file )
97+ self ._run_interactive_mode (enforcer )
9498
9599 except Exception as e :
96100 raise CommandError (f"Error creating Casbin enforcer: { str (e )} " ) from e
97101
98102 def get_file_path (self , file_name : str ) -> str :
99- """Get the file path for the given file name."""
100- return os .path .join (os .path .dirname (__file__ ), file_name )
101-
102- def _process_requests (self , enforcer : casbin .Enforcer , request_file : str ) -> None :
103- """Process each request in the request file and test enforcement."""
104- self .stdout .write (self .style .SUCCESS ("=== Processing Enforcement Requests ===" ))
105-
106- total_requests = 0
107- passed_requests = 0
108- failed_requests = 0
109-
110- with open (request_file , "r" ) as file :
111- for line_num , line in enumerate (file , 1 ):
112- line = line .strip ()
113-
114- # Skip empty lines and comments
115- if not line or line .startswith ("#" ):
116- continue
117-
118- total_requests += 1
119-
120- try :
121- # Parse request line: subject, action, object, scope, expected_result
122- parts = [part .strip () for part in line .split ("," )]
123- if len (parts ) != 5 :
124- self .stdout .write (
125- self .style .ERROR (f"Line { line_num } : Invalid format - expected 5 parts, got { len (parts )} " )
126- )
127- failed_requests += 1
128- continue
103+ """Construct the full file path for a configuration file.
129104
130- subject , action , obj , scope , expected_str = parts
131- expected_result = expected_str . lower () == "true"
105+ Args:
106+ file_name (str): The name of the configuration file (e.g., 'model.conf').
132107
133- actual_result = enforcer .enforce (subject , action , obj , scope )
108+ Returns:
109+ str: The absolute path to the configuration file in the engine/config directory.
110+ """
111+ return os .path .join (ROOT_DIRECTORY , "engine" , "config" , file_name )
134112
135- if actual_result == expected_result :
136- status = self .style .SUCCESS ("✓ PASS" )
137- passed_requests += 1
138- else :
139- status = self .style .ERROR ("✗ FAIL" )
140- failed_requests += 1
113+ def _run_interactive_mode (self , enforcer : casbin .Enforcer ) -> None :
114+ """Start the interactive enforcement testing shell.
141115
142- self .stdout .write (
143- f"{ status } Line { line_num :2d} : { subject } , { action } , { obj } , { scope } "
144- f"-> Expected: { expected_result } , Got: { actual_result } "
145- )
116+ Provides a continuous loop where users can input enforcement requests
117+ in the format 'subject action object scope' and receive immediate
118+ authorization results with visual feedback.
146119
147- except (ValueError , IndexError ) as e :
148- self .stdout .write (self .style .ERROR (f"Line { line_num } : Error processing request - { str (e )} " ))
149- failed_requests += 1
120+ Args:
121+ enforcer (casbin.Enforcer): The configured Casbin enforcer instance for testing.
150122
151- self .stdout .write ("" )
152- self .stdout .write (self .style .SUCCESS ("=== Enforcement Test Summary ===" ))
153- self .stdout .write (f"Total requests: { total_requests } " )
154- self .stdout .write (self .style .SUCCESS (f"Passed: { passed_requests } " ))
155- if failed_requests > 0 :
156- self .stdout .write (self .style .ERROR (f"Failed: { failed_requests } " ))
157- else :
158- self .stdout .write (f"Failed: { failed_requests } " )
159-
160- success_rate = (passed_requests / total_requests * 100 ) if total_requests > 0 else 0
161- self .stdout .write (f"Success rate: { success_rate :.1f} %" )
162-
163- if failed_requests == 0 :
164- self .stdout .write (self .style .SUCCESS ("All tests passed!" ))
165- else :
166- self .stdout .write (self .style .WARNING (f"⚠️ { failed_requests } test(s) failed" ))
167-
168- def _run_interactive_mode (self , enforcer : casbin .Enforcer ) -> None :
169- """Run interactive mode for testing custom enforcement requests."""
170- self .stdout .write (self .style .SUCCESS ("=== Interactive Mode ===" ))
123+ Note:
124+ Exit the interactive mode with Ctrl+C or Ctrl+D.
125+ """
126+ self .stdout .write (self .style .SUCCESS ("Interactive Mode" ))
171127 self .stdout .write ("Test custom enforcement requests interactively." )
128+ self .stdout .write ("Enter 'quit', 'exit', or 'q' to exit the interactive mode." )
129+ self .stdout .write ("" )
172130 self .stdout .write ("Format: subject action object scope" )
173131 self .stdout .write ("Example: user:alice act:read lib:test-lib org:OpenedX" )
174132 self .stdout .write ("" )
175133
176134 while True :
177135 try :
178136 user_input = input ("Enter enforcement test: " ).strip ()
137+
179138 if not user_input :
180139 continue
140+
141+ if user_input .lower () in ["quit" , "exit" , "q" ]:
142+ break
143+
181144 self ._test_interactive_request (enforcer , user_input )
182145 except (KeyboardInterrupt , EOFError ):
146+ self .stdout .write (self .style .ERROR ("Exiting interactive mode..." ))
183147 break
184148
185149 def _test_interactive_request (self , enforcer : casbin .Enforcer , user_input : str ) -> None :
186- """Test a single enforcement request from interactive input."""
150+ """Process and test a single enforcement request from user input.
151+
152+ Parses the input string, validates the format, executes the enforcement
153+ check, and displays the result with appropriate styling.
154+
155+ Args:
156+ enforcer (casbin.Enforcer): The Casbin enforcer instance to use for testing.
157+ user_input (str): The user's input string in format 'subject action object scope'.
158+
159+ Expected format:
160+ subject: The requesting entity (e.g., 'user:alice')
161+ action: The requested action (e.g., 'act:read')
162+ object: The target resource (e.g., 'lib:test-lib')
163+ scope: The authorization context (e.g., 'org:OpenedX')
164+ """
187165 try :
188166 parts = [part .strip () for part in user_input .split ()]
189167 if len (parts ) != 4 :
190168 self .stdout .write (self .style .ERROR (f"✗ Invalid format. Expected 4 parts, got { len (parts )} " ))
191- self .stdout .write (" Format: subject action object scope" )
192- self .stdout .write (" Example: user:alice act:read lib:test-lib org:OpenedX" )
169+ self .stdout .write ("Format: subject action object scope" )
170+ self .stdout .write ("Example: user:alice act:read lib:test-lib org:OpenedX" )
193171 return
194172
195173 subject , action , obj , scope = parts
0 commit comments