Skip to content

Commit b882525

Browse files
committed
test: add auto-load policy functionality tests
1 parent e509b6a commit b882525

1 file changed

Lines changed: 101 additions & 1 deletion

File tree

openedx_authz/tests/test_enforcer.py

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,18 @@
55
that would be used in production environments.
66
"""
77

8+
import time
9+
810
import casbin
911
from ddt import data as ddt_data
1012
from ddt import ddt
11-
from django.test import TestCase
13+
from django.conf import settings
14+
from django.test import TestCase, TransactionTestCase, override_settings
1215

1316
from openedx_authz.engine.enforcer import AuthzEnforcer
1417
from openedx_authz.engine.filter import Filter
1518
from openedx_authz.engine.utils import migrate_policy_between_enforcers
19+
from openedx_authz.tests.test_utils import make_action_key, make_role_key, make_scope_key, make_user_key
1620

1721

1822
class PolicyLoadingTestSetupMixin(TestCase):
@@ -420,3 +424,99 @@ def test_multi_scope_filtering(self):
420424
total_count = len(global_enforcer.get_policy())
421425

422426
self.assertEqual(total_count, lib_count + course_count + org_count)
427+
428+
429+
class TestAutoLoadPolicy(TransactionTestCase):
430+
"""Test cases for auto-load policy functionality.
431+
432+
Uses TransactionTestCase to avoid database locking issues with SQLite
433+
when testing concurrent access patterns.
434+
"""
435+
436+
def setUp(self):
437+
"""Set up test environment."""
438+
super().setUp()
439+
AuthzEnforcer._enforcer = None # pylint: disable=protected-access
440+
441+
def _seed_database_with_policies(self):
442+
"""Seed the database with policies from the policy file."""
443+
global_enforcer = AuthzEnforcer.get_enforcer()
444+
global_enforcer.clear_policy()
445+
446+
migrate_policy_between_enforcers(
447+
source_enforcer=casbin.Enforcer(
448+
"openedx_authz/engine/config/model.conf",
449+
"openedx_authz/engine/config/authz.policy",
450+
),
451+
target_enforcer=global_enforcer,
452+
)
453+
global_enforcer.clear_policy()
454+
455+
def _wait_for_auto_load(self) -> None:
456+
"""Wait for one auto-load cycle plus a small buffer.
457+
458+
This uses the configured interval plus a buffer to ensure
459+
the auto-load has completed.
460+
"""
461+
interval = settings.CASBIN_AUTO_LOAD_POLICY_INTERVAL
462+
# Add 50% buffer to ensure auto-load completes
463+
time.sleep(interval * 1.5)
464+
465+
@override_settings(CASBIN_AUTO_LOAD_POLICY_INTERVAL=0.5)
466+
def test_auto_load_policy_detects_changes(self):
467+
"""Test that policy changes are automatically detected without manual reload.
468+
469+
This test verifies that the SyncedEnforcer's auto-load functionality
470+
works correctly by:
471+
1. Setting a short auto-load interval (0.5 seconds)
472+
2. Seeding the database with policies
473+
3. Waiting for auto-load to populate the enforcer
474+
4. Adding a new policy via add_policy() (auto-saved to DB)
475+
5. Waiting for auto-load to detect and load the change
476+
6. Adding a role assignment via add_role_for_user_in_domain()
477+
7. Verifying both changes appear without manual reload
478+
479+
Expected result:
480+
- Seeded policies are automatically loaded from database
481+
- New policies added via add_policy() appear after auto-load interval
482+
- Role assignments added via add_role_for_user_in_domain() appear after auto-load interval
483+
- No explicit load_policy() calls are needed
484+
"""
485+
global_enforcer = AuthzEnforcer.get_enforcer()
486+
self._seed_database_with_policies()
487+
488+
# Initial policy count should be 0
489+
initial_policy_count = len(global_enforcer.get_policy())
490+
self.assertEqual(initial_policy_count, 0)
491+
self._wait_for_auto_load()
492+
493+
# After auto-load, the default policies should be loaded
494+
policies_after_auto_load = global_enforcer.get_policy()
495+
self.assertGreater(len(policies_after_auto_load), initial_policy_count)
496+
497+
# Add a new policy
498+
new_policy = [
499+
make_role_key("fake_role"),
500+
make_action_key("fake_action"),
501+
make_scope_key("lib", "*"),
502+
"allow",
503+
]
504+
global_enforcer.add_policy(*new_policy)
505+
self._wait_for_auto_load()
506+
507+
# After auto-load, the new policy should be loaded
508+
policies_after_auto_load = global_enforcer.get_policy()
509+
self.assertIn(new_policy, policies_after_auto_load)
510+
511+
# Add a new role assignment
512+
new_assignment = [
513+
make_user_key("fake_user"),
514+
make_role_key("fake_role"),
515+
make_scope_key("lib", "lib:FakeOrg:FAKELIB"),
516+
]
517+
global_enforcer.add_role_for_user_in_domain(*new_assignment)
518+
self._wait_for_auto_load()
519+
520+
# After auto-load, the new role assignment should be loaded
521+
policies_after_auto_load = global_enforcer.get_grouping_policy()
522+
self.assertIn(new_assignment, policies_after_auto_load)

0 commit comments

Comments
 (0)