forked from openedx/openedx-authz
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathenforcer.py
More file actions
201 lines (151 loc) · 7.01 KB
/
enforcer.py
File metadata and controls
201 lines (151 loc) · 7.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
Core authorization enforcer for Open edX AuthZ system.
Provides a Casbin SyncedEnforcer instance with extended adapter for database policy
storage and automatic policy synchronization.
Components:
- Enforcer: Main SyncedEnforcer instance for policy evaluation
- Adapter: ExtendedAdapter for filtered database policy loading
Usage:
from openedx_authz.engine.enforcer import AuthzEnforcer
allowed = enforcer.enforce(user, resource, action)
Requires `CASBIN_MODEL` setting.
"""
import logging
from casbin import SyncedEnforcer
from casbin_adapter.enforcer import initialize_enforcer
from django.conf import settings
from openedx_authz.engine.adapter import ExtendedAdapter
from openedx_authz.engine.matcher import is_admin_or_superuser_check
def libraries_v2_enabled() -> bool:
"""Dummy toggle that is always enabled."""
return True
if getattr(settings, "SERVICE_VARIANT", None) == "cms":
try:
from cms.djangoapps.contentstore.toggles import libraries_v2_enabled
except ImportError:
# If the CMS is not available, use the dummy toggle.
pass
logger = logging.getLogger(__name__)
class AuthzEnforcer:
"""Singleton class to manage the Casbin SyncedEnforcer instance.
Ensures a single enforcer instance is created safely and configured with the
ExtendedAdapter for policy management and automatic synchronization.
There are two main use cases for this class:
1. Directly get the enforcer instance and initialize it if needed::
from openedx_authz.engine.enforcer import AuthzEnforcer
enforcer = AuthzEnforcer.get_enforcer()
allowed = enforcer.enforce(user, resource, action)
2. Instantiate the class to get the singleton enforcer instance::
from openedx_authz.engine.enforcer import AuthzEnforcer
enforcer = AuthzEnforcer()
allowed = enforcer.get_enforcer().enforce(user, resource, action)
Any of the two approaches will yield the same singleton enforcer instance.
"""
_enforcer = None
def __new__(cls):
"""Singleton pattern to ensure a single enforcer instance."""
if cls._enforcer is None:
cls._enforcer = cls._initialize_enforcer()
return cls._enforcer
@classmethod
def configure_enforcer_auto_loading(cls, auto_load_policy_interval: int = None):
"""Enable auto-load policy and auto-save on the enforcer.
This method ensures that the singleton enforcer instance is created
and ready for use.
Returns:
None
"""
if not cls._enforcer.is_auto_loading_running():
cls._enforcer.start_auto_load_policy(auto_load_policy_interval)
@classmethod
def is_auto_save_enabled(cls) -> bool:
"""Check if auto-save is currently enabled on the enforcer.
Returns:
bool: True if auto-save is enabled, False otherwise
"""
if cls._enforcer is None:
return False
return cls._enforcer._e.auto_save # pylint: disable=protected-access
@classmethod
def configure_enforcer_auto_save(cls, auto_save_policy: bool):
"""Configure auto-save on the enforcer.
This method ensures that auto-save is enabled or disabled based on
the auto_save_policy parameter.
Args:
auto_save_policy: True to enable auto-save, False to disable
Returns:
None
"""
if cls.is_auto_save_enabled() != auto_save_policy:
cls._enforcer.enable_auto_save(auto_save_policy)
@classmethod
def deactivate_enforcer(cls):
"""Deactivate the current enforcer instance, if any.
This method stops the auto-load policy thread. It can be used in testing
or when re-initialization of the enforcer is needed. IT DOES NOT
clear the singleton instance to avoid initializing it again unintentionally.
Returns:
None
"""
if cls._enforcer is not None:
try:
cls._enforcer.stop_auto_load_policy()
cls._enforcer.enable_auto_save(False)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Error stopping auto-load policy thread: {e}")
@classmethod
def configure_enforcer_auto_save_and_load(cls):
"""Enable auto-load policy and auto-save on the enforcer.
This method ensures that the singleton enforcer instance is configured
for auto-load and auto-save based on settings.
Returns:
None
"""
auto_load_policy_interval = getattr(settings, "CASBIN_AUTO_LOAD_POLICY_INTERVAL", 0)
auto_save_policy = getattr(settings, "CASBIN_AUTO_SAVE_POLICY", True)
if auto_load_policy_interval > 0:
cls.configure_enforcer_auto_loading(auto_load_policy_interval)
else:
logger.warning("CASBIN_AUTO_LOAD_POLICY_INTERVAL is not set or zero; auto-load is disabled.")
cls.configure_enforcer_auto_save(auto_save_policy)
@classmethod
def get_enforcer(cls) -> SyncedEnforcer:
"""Get the enforcer instance, creating it if needed.
Returns:
SyncedEnforcer: The singleton enforcer instance.
"""
if cls._enforcer is None:
cls._enforcer = cls._initialize_enforcer()
# HACK: This code block will only be useful when in Ulmo to deactivate
# the enforcer when the new library experience is disabled. It should be
# removed for the next release cycle.
# When replaced, we will only need to configure the enforcer here. Which
# is in charge of enabling/disabling auto-load and auto-save.
if libraries_v2_enabled():
cls.configure_enforcer_auto_save_and_load()
else:
cls.deactivate_enforcer()
return cls._enforcer
@classmethod
def _initialize_enforcer(cls) -> SyncedEnforcer:
"""
Create and configure the Casbin SyncedEnforcer instance.
This method initializes the SyncedEnforcer with the ExtendedAdapter
for database policy storage and automatic policy synchronization.
It also initializes the enforcer with the specified database alias from settings.
Returns:
SyncedEnforcer: Configured Casbin enforcer with adapter and auto-sync
"""
db_alias = getattr(settings, "CASBIN_DB_ALIAS", "default")
try:
# Initialize the enforcer with the specified database alias to set up the adapter.
# Best to lazy load it when it's first used to ensure the database is ready and avoid
# issues when the app is not fully loaded (e.g., while pulling translations, etc.).
initialize_enforcer(db_alias)
except Exception as e:
logger.error(f"Failed to initialize Casbin enforcer with DB alias '{db_alias}': {e}")
raise
adapter = ExtendedAdapter()
enforcer = SyncedEnforcer(settings.CASBIN_MODEL, adapter)
enforcer.add_function("is_staff_or_superuser", is_admin_or_superuser_check)
return enforcer