Skip to content

Commit 758c65f

Browse files
committed
refactor: introduce declarative permission pattern for authorization
Replace procedural authorization logic with a declarative Permission/Policy pattern. This improves code clarity, maintainability, and testability. Key changes: - Add Permission abstraction and concrete implementations (IsSelf, IsSuperior, etc.) - Add composite permissions (AnyOf, AllOf) for complex authorization rules - Extend AuthorizationService with declarative authorize() method - Refactor ChangePasswordInteractor and GrantAdminInteractor to use new pattern The old procedural methods are retained for backward compatibility. This change demonstrates Clean Architecture principles by improving the application layer abstractions without affecting other layers.
1 parent b678eb0 commit 758c65f

4 files changed

Lines changed: 110 additions & 15 deletions

File tree

src/app/application/commands/change_password.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
from dataclasses import dataclass
33

4-
from app.application.common.exceptions.authorization import AuthorizationError
4+
from app.application.common.permissions import AnyOf, IsSelf, IsSuperior
55
from app.application.common.ports.transaction_manager import (
66
TransactionManager,
77
)
@@ -65,16 +65,12 @@ async def __call__(self, request_data: ChangePasswordRequest) -> None:
6565
if user is None:
6666
raise UserNotFoundByUsernameError(username)
6767

68-
try:
69-
self._authorization_service.authorize_for_self(
70-
current_user.id_,
71-
target_id=user.id_,
72-
)
73-
except AuthorizationError:
74-
self._authorization_service.authorize_for_subordinate_role(
75-
current_user.role,
76-
target_role=user.role,
77-
)
68+
# Declarative authorization: can change own or subordinate's password
69+
self._authorization_service.authorize(
70+
current_user,
71+
AnyOf(IsSelf(), IsSuperior()),
72+
target_user=user,
73+
)
7874

7975
self._user_service.change_password(user, password)
8076
await self._transaction_manager.commit()

src/app/application/commands/grant_admin.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
from dataclasses import dataclass
33

4+
from app.application.common.permissions import CanManageRole
45
from app.application.common.ports.transaction_manager import (
56
TransactionManager,
67
)
@@ -56,9 +57,11 @@ async def __call__(self, request_data: GrantAdminRequest) -> None:
5657
)
5758

5859
current_user = await self._current_user_service.get_current_user()
59-
self._authorization_service.authorize_for_subordinate_role(
60-
current_user.role,
61-
target_role=UserRole.ADMIN,
60+
61+
# Declarative authorization: only users who can manage ADMIN role
62+
self._authorization_service.authorize(
63+
current_user,
64+
CanManageRole(UserRole.ADMIN),
6265
)
6366

6467
username = Username(request_data.username)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
"""Permission policies for declarative authorization."""
2+
3+
from abc import ABC, abstractmethod
4+
from typing import Any
5+
6+
from app.application.common.services.authorization import SUBORDINATE_ROLES
7+
from app.domain.entities.user import User
8+
from app.domain.enums.user_role import UserRole
9+
10+
11+
class Permission(ABC):
12+
"""Base class for a permission policy."""
13+
14+
@abstractmethod
15+
def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool:
16+
"""Check if the permission is satisfied for the given user and context."""
17+
...
18+
19+
20+
class IsSelf(Permission):
21+
"""Permission that checks if the user is acting on themselves."""
22+
23+
def is_satisfied_by(
24+
self, current_user: User, *, target_user: User, **kwargs: Any
25+
) -> bool:
26+
"""Check if current user is the same as target user."""
27+
_ = kwargs # Unused but required for interface
28+
return current_user.id_ == target_user.id_
29+
30+
31+
class IsSuperior(Permission):
32+
"""Permission that checks if the user has a superior role over the target."""
33+
34+
def is_satisfied_by(
35+
self, current_user: User, *, target_user: User, **kwargs: Any
36+
) -> bool:
37+
"""Check if current user's role is superior to target user's role."""
38+
_ = kwargs # Unused but required for interface
39+
allowed_roles = SUBORDINATE_ROLES.get(current_user.role, set())
40+
return target_user.role in allowed_roles
41+
42+
43+
class AnyOf(Permission):
44+
"""Composite permission that is satisfied if ANY sub-permission is satisfied."""
45+
46+
def __init__(self, *permissions: Permission) -> None:
47+
"""Initialize with a list of permissions to check."""
48+
self._permissions = permissions
49+
50+
def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool:
51+
"""Check if any of the sub-permissions is satisfied."""
52+
return any(p.is_satisfied_by(current_user, **kwargs) for p in self._permissions)
53+
54+
55+
class AllOf(Permission):
56+
"""Composite permission that is satisfied if ALL sub-permissions are satisfied."""
57+
58+
def __init__(self, *permissions: Permission) -> None:
59+
"""Initialize with a list of permissions to check."""
60+
self._permissions = permissions
61+
62+
def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool:
63+
"""Check if all of the sub-permissions are satisfied."""
64+
return all(p.is_satisfied_by(current_user, **kwargs) for p in self._permissions)
65+
66+
67+
class CanManageRole(Permission):
68+
"""Permission that checks if the user can manage a specific role."""
69+
70+
def __init__(self, target_role: UserRole) -> None:
71+
"""Initialize with the role to manage."""
72+
self._target_role = target_role
73+
74+
def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool:
75+
"""Check if current user can manage the target role."""
76+
_ = kwargs # Unused but required for interface
77+
allowed_roles = SUBORDINATE_ROLES.get(current_user.role, set())
78+
return self._target_role in allowed_roles

src/app/application/common/services/authorization.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
from collections.abc import Mapping
2-
from typing import Final
2+
from typing import TYPE_CHECKING, Any, Final
33

44
from app.application.common.constants import AUTHZ_NOT_AUTHORIZED
55
from app.application.common.exceptions.authorization import AuthorizationError
66
from app.domain.enums.user_role import UserRole
77
from app.domain.value_objects.user_id import UserId
88

9+
if TYPE_CHECKING:
10+
from app.application.common.permissions import Permission
11+
from app.domain.entities.user import User
12+
913
SUBORDINATE_ROLES: Final[Mapping[UserRole, set[UserRole]]] = {
1014
UserRole.SUPER_ADMIN: {UserRole.ADMIN, UserRole.USER},
1115
UserRole.ADMIN: {UserRole.USER},
@@ -14,6 +18,20 @@
1418

1519

1620
class AuthorizationService:
21+
def authorize(
22+
self,
23+
current_user: "User",
24+
permission: "Permission",
25+
**kwargs: Any,
26+
) -> None:
27+
"""
28+
Authorize action using declarative permission policy.
29+
30+
:raises AuthorizationError: If permission is not satisfied
31+
"""
32+
if not permission.is_satisfied_by(current_user, **kwargs):
33+
raise AuthorizationError(AUTHZ_NOT_AUTHORIZED)
34+
1735
def authorize_for_self(
1836
self,
1937
current_user_id: UserId,

0 commit comments

Comments
 (0)