From 758c65f5a5fd593d6e65147166d0f0faf89a63ec Mon Sep 17 00:00:00 2001 From: ppravdin Date: Tue, 15 Jul 2025 15:58:06 +0200 Subject: [PATCH 1/2] refactor: introduce declarative permission pattern for authorization Replace procedural authorization logic with a declarative Permission/Policy pattern. This improves code clarity, maintainability, and testability. Key changes: - Add Permission abstraction and concrete implementations (IsSelf, IsSuperior, etc.) - Add composite permissions (AnyOf, AllOf) for complex authorization rules - Extend AuthorizationService with declarative authorize() method - Refactor ChangePasswordInteractor and GrantAdminInteractor to use new pattern The old procedural methods are retained for backward compatibility. This change demonstrates Clean Architecture principles by improving the application layer abstractions without affecting other layers. --- .../application/commands/change_password.py | 18 ++--- src/app/application/commands/grant_admin.py | 9 ++- src/app/application/common/permissions.py | 78 +++++++++++++++++++ .../common/services/authorization.py | 20 ++++- 4 files changed, 110 insertions(+), 15 deletions(-) create mode 100644 src/app/application/common/permissions.py diff --git a/src/app/application/commands/change_password.py b/src/app/application/commands/change_password.py index c8a29e4..125bb86 100644 --- a/src/app/application/commands/change_password.py +++ b/src/app/application/commands/change_password.py @@ -1,7 +1,7 @@ import logging from dataclasses import dataclass -from app.application.common.exceptions.authorization import AuthorizationError +from app.application.common.permissions import AnyOf, IsSelf, IsSuperior from app.application.common.ports.transaction_manager import ( TransactionManager, ) @@ -65,16 +65,12 @@ async def __call__(self, request_data: ChangePasswordRequest) -> None: if user is None: raise UserNotFoundByUsernameError(username) - try: - self._authorization_service.authorize_for_self( - current_user.id_, - target_id=user.id_, - ) - except AuthorizationError: - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=user.role, - ) + # Declarative authorization: can change own or subordinate's password + self._authorization_service.authorize( + current_user, + AnyOf(IsSelf(), IsSuperior()), + target_user=user, + ) self._user_service.change_password(user, password) await self._transaction_manager.commit() diff --git a/src/app/application/commands/grant_admin.py b/src/app/application/commands/grant_admin.py index 5eee7e7..9c52903 100644 --- a/src/app/application/commands/grant_admin.py +++ b/src/app/application/commands/grant_admin.py @@ -1,6 +1,7 @@ import logging from dataclasses import dataclass +from app.application.common.permissions import CanManageRole from app.application.common.ports.transaction_manager import ( TransactionManager, ) @@ -56,9 +57,11 @@ async def __call__(self, request_data: GrantAdminRequest) -> None: ) current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=UserRole.ADMIN, + + # Declarative authorization: only users who can manage ADMIN role + self._authorization_service.authorize( + current_user, + CanManageRole(UserRole.ADMIN), ) username = Username(request_data.username) diff --git a/src/app/application/common/permissions.py b/src/app/application/common/permissions.py new file mode 100644 index 0000000..b02503c --- /dev/null +++ b/src/app/application/common/permissions.py @@ -0,0 +1,78 @@ +"""Permission policies for declarative authorization.""" + +from abc import ABC, abstractmethod +from typing import Any + +from app.application.common.services.authorization import SUBORDINATE_ROLES +from app.domain.entities.user import User +from app.domain.enums.user_role import UserRole + + +class Permission(ABC): + """Base class for a permission policy.""" + + @abstractmethod + def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool: + """Check if the permission is satisfied for the given user and context.""" + ... + + +class IsSelf(Permission): + """Permission that checks if the user is acting on themselves.""" + + def is_satisfied_by( + self, current_user: User, *, target_user: User, **kwargs: Any + ) -> bool: + """Check if current user is the same as target user.""" + _ = kwargs # Unused but required for interface + return current_user.id_ == target_user.id_ + + +class IsSuperior(Permission): + """Permission that checks if the user has a superior role over the target.""" + + def is_satisfied_by( + self, current_user: User, *, target_user: User, **kwargs: Any + ) -> bool: + """Check if current user's role is superior to target user's role.""" + _ = kwargs # Unused but required for interface + allowed_roles = SUBORDINATE_ROLES.get(current_user.role, set()) + return target_user.role in allowed_roles + + +class AnyOf(Permission): + """Composite permission that is satisfied if ANY sub-permission is satisfied.""" + + def __init__(self, *permissions: Permission) -> None: + """Initialize with a list of permissions to check.""" + self._permissions = permissions + + def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool: + """Check if any of the sub-permissions is satisfied.""" + return any(p.is_satisfied_by(current_user, **kwargs) for p in self._permissions) + + +class AllOf(Permission): + """Composite permission that is satisfied if ALL sub-permissions are satisfied.""" + + def __init__(self, *permissions: Permission) -> None: + """Initialize with a list of permissions to check.""" + self._permissions = permissions + + def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool: + """Check if all of the sub-permissions are satisfied.""" + return all(p.is_satisfied_by(current_user, **kwargs) for p in self._permissions) + + +class CanManageRole(Permission): + """Permission that checks if the user can manage a specific role.""" + + def __init__(self, target_role: UserRole) -> None: + """Initialize with the role to manage.""" + self._target_role = target_role + + def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool: + """Check if current user can manage the target role.""" + _ = kwargs # Unused but required for interface + allowed_roles = SUBORDINATE_ROLES.get(current_user.role, set()) + return self._target_role in allowed_roles diff --git a/src/app/application/common/services/authorization.py b/src/app/application/common/services/authorization.py index 9e80e1c..f17f0ff 100644 --- a/src/app/application/common/services/authorization.py +++ b/src/app/application/common/services/authorization.py @@ -1,11 +1,15 @@ from collections.abc import Mapping -from typing import Final +from typing import TYPE_CHECKING, Any, Final from app.application.common.constants import AUTHZ_NOT_AUTHORIZED from app.application.common.exceptions.authorization import AuthorizationError from app.domain.enums.user_role import UserRole from app.domain.value_objects.user_id import UserId +if TYPE_CHECKING: + from app.application.common.permissions import Permission + from app.domain.entities.user import User + SUBORDINATE_ROLES: Final[Mapping[UserRole, set[UserRole]]] = { UserRole.SUPER_ADMIN: {UserRole.ADMIN, UserRole.USER}, UserRole.ADMIN: {UserRole.USER}, @@ -14,6 +18,20 @@ class AuthorizationService: + def authorize( + self, + current_user: "User", + permission: "Permission", + **kwargs: Any, + ) -> None: + """ + Authorize action using declarative permission policy. + + :raises AuthorizationError: If permission is not satisfied + """ + if not permission.is_satisfied_by(current_user, **kwargs): + raise AuthorizationError(AUTHZ_NOT_AUTHORIZED) + def authorize_for_self( self, current_user_id: UserId, From 71d437c2b118dab6dd49e88a764e3de077c2bcaa Mon Sep 17 00:00:00 2001 From: ivan-borovets <130386813+ivan-borovets@users.noreply.github.com> Date: Fri, 18 Jul 2025 23:45:00 +0500 Subject: [PATCH 2/2] Migrate to declarative authz --- pyproject.toml | 4 +- .../application/commands/change_password.py | 27 +++-- src/app/application/commands/create_user.py | 20 +++- src/app/application/commands/grant_admin.py | 21 ++-- .../application/commands/inactivate_user.py | 31 +++-- .../application/commands/reactivate_user.py | 31 +++-- src/app/application/commands/revoke_admin.py | 18 ++- src/app/application/common/permissions.py | 78 ------------ .../common/services/authorization.py | 60 ---------- .../common/services/authorization/__init__.py | 0 .../services/authorization/authorize.py | 18 +++ .../common/services/authorization/base.py | 12 ++ .../services/authorization/composite.py | 12 ++ .../services/authorization/permissions.py | 53 ++++++++ .../services/authorization/role_hierarchy.py | 10 ++ src/app/application/queries/list_users.py | 20 +++- src/app/infrastructure/handlers/sign_up.py | 3 +- src/app/setup/ioc/di_providers/application.py | 2 - .../profile_password_hasher_bcrypt.py | 2 +- .../application/authz_service/__init__.py | 0 .../authz_service/permission_stubs.py | 18 +++ .../authz_service/test_authorize.py | 26 ++++ .../authz_service/test_composite.py | 21 ++++ .../authz_service/test_permissions.py | 113 ++++++++++++++++++ .../unit/application/test_authz_service.py | 74 ------------ 25 files changed, 401 insertions(+), 273 deletions(-) delete mode 100644 src/app/application/common/permissions.py delete mode 100644 src/app/application/common/services/authorization.py create mode 100644 src/app/application/common/services/authorization/__init__.py create mode 100644 src/app/application/common/services/authorization/authorize.py create mode 100644 src/app/application/common/services/authorization/base.py create mode 100644 src/app/application/common/services/authorization/composite.py create mode 100644 src/app/application/common/services/authorization/permissions.py create mode 100644 src/app/application/common/services/authorization/role_hierarchy.py create mode 100644 tests/app/unit/application/authz_service/__init__.py create mode 100644 tests/app/unit/application/authz_service/permission_stubs.py create mode 100644 tests/app/unit/application/authz_service/test_authorize.py create mode 100644 tests/app/unit/application/authz_service/test_composite.py create mode 100644 tests/app/unit/application/authz_service/test_permissions.py delete mode 100644 tests/app/unit/application/test_authz_service.py diff --git a/pyproject.toml b/pyproject.toml index 7597ff5..5118453 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -123,15 +123,13 @@ ignore = [ [tool.ruff.lint.per-file-ignores] "src/app/infrastructure/persistence_sqla/alembic/**" = ["ALL", ] "tests/**" = [ + "ARG002", # unused-method-argument "PLC2801", # unnecessary-dunder-call "PLR2004", # magic-value-comparison "PT011", # pytest-raises-too-broad "S101", # assert - "S105", # hardcoded-password-string "S106", # hardcoded-password-func-arg "S107", # hardcoded-password-default - "SLF001", # private-member-access - "UP012", # unnecessary-encode-utf8 ] # "src/app/domain/value_objects/base.py" = ["B024", ] # abstract-base-class-without-abstract-method diff --git a/src/app/application/commands/change_password.py b/src/app/application/commands/change_password.py index 125bb86..1af8012 100644 --- a/src/app/application/commands/change_password.py +++ b/src/app/application/commands/change_password.py @@ -1,12 +1,19 @@ import logging from dataclasses import dataclass -from app.application.common.permissions import AnyOf, IsSelf, IsSuperior from app.application.common.ports.transaction_manager import ( TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.composite import AnyOf +from app.application.common.services.authorization.permissions import ( + CanManageSelf, + CanManageSubordinate, + UserManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.entities.user import User from app.domain.exceptions.user import UserNotFoundByUsernameError @@ -40,13 +47,11 @@ class ChangePasswordInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -65,11 +70,15 @@ async def __call__(self, request_data: ChangePasswordRequest) -> None: if user is None: raise UserNotFoundByUsernameError(username) - # Declarative authorization: can change own or subordinate's password - self._authorization_service.authorize( - current_user, - AnyOf(IsSelf(), IsSuperior()), - target_user=user, + authorize( + AnyOf( + CanManageSelf(), + CanManageSubordinate(), + ), + context=UserManagementContext( + subject=current_user, + target=user, + ), ) self._user_service.change_password(user, password) diff --git a/src/app/application/commands/create_user.py b/src/app/application/commands/create_user.py index f424405..6e8f81e 100644 --- a/src/app/application/commands/create_user.py +++ b/src/app/application/commands/create_user.py @@ -7,7 +7,13 @@ TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.permissions import ( + CanManageRole, + RoleManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.enums.user_role import UserRole from app.domain.exceptions.user import UsernameAlreadyExistsError @@ -46,13 +52,11 @@ class CreateUserInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -64,9 +68,13 @@ async def __call__(self, request_data: CreateUserRequest) -> CreateUserResponse: ) current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=request_data.role, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=request_data.role, + ), ) username = Username(request_data.username) diff --git a/src/app/application/commands/grant_admin.py b/src/app/application/commands/grant_admin.py index 9c52903..5bebbfb 100644 --- a/src/app/application/commands/grant_admin.py +++ b/src/app/application/commands/grant_admin.py @@ -1,12 +1,17 @@ import logging from dataclasses import dataclass -from app.application.common.permissions import CanManageRole from app.application.common.ports.transaction_manager import ( TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.permissions import ( + CanManageRole, + RoleManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.entities.user import User from app.domain.enums.user_role import UserRole @@ -39,13 +44,11 @@ class GrantAdminInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -58,10 +61,12 @@ async def __call__(self, request_data: GrantAdminRequest) -> None: current_user = await self._current_user_service.get_current_user() - # Declarative authorization: only users who can manage ADMIN role - self._authorization_service.authorize( - current_user, - CanManageRole(UserRole.ADMIN), + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=UserRole.ADMIN, + ), ) username = Username(request_data.username) diff --git a/src/app/application/commands/inactivate_user.py b/src/app/application/commands/inactivate_user.py index ac91e49..f94666a 100644 --- a/src/app/application/commands/inactivate_user.py +++ b/src/app/application/commands/inactivate_user.py @@ -6,7 +6,15 @@ TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.permissions import ( + CanManageRole, + CanManageSubordinate, + RoleManagementContext, + UserManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.entities.user import User from app.domain.enums.user_role import UserRole @@ -41,14 +49,12 @@ class InactivateUserInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, access_revoker: AccessRevoker, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -61,9 +67,13 @@ async def __call__(self, request_data: InactivateUserRequest) -> None: ) current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=UserRole.USER, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=UserRole.USER, + ), ) username = Username(request_data.username) @@ -74,9 +84,12 @@ async def __call__(self, request_data: InactivateUserRequest) -> None: if user is None: raise UserNotFoundByUsernameError(username) - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=user.role, + authorize( + CanManageSubordinate(), + context=UserManagementContext( + subject=current_user, + target=user, + ), ) self._user_service.toggle_user_activation(user, is_active=False) diff --git a/src/app/application/commands/reactivate_user.py b/src/app/application/commands/reactivate_user.py index f0532c0..4f62916 100644 --- a/src/app/application/commands/reactivate_user.py +++ b/src/app/application/commands/reactivate_user.py @@ -5,7 +5,15 @@ TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.permissions import ( + CanManageRole, + CanManageSubordinate, + RoleManagementContext, + UserManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.entities.user import User from app.domain.enums.user_role import UserRole @@ -39,13 +47,11 @@ class ReactivateUserInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -57,9 +63,13 @@ async def __call__(self, request_data: ReactivateUserRequest) -> None: ) current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=UserRole.USER, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=UserRole.USER, + ), ) username = Username(request_data.username) @@ -70,9 +80,12 @@ async def __call__(self, request_data: ReactivateUserRequest) -> None: if user is None: raise UserNotFoundByUsernameError(username) - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=user.role, + authorize( + CanManageSubordinate(), + context=UserManagementContext( + subject=current_user, + target=user, + ), ) self._user_service.toggle_user_activation(user, is_active=True) diff --git a/src/app/application/commands/revoke_admin.py b/src/app/application/commands/revoke_admin.py index c75b481..f05532e 100644 --- a/src/app/application/commands/revoke_admin.py +++ b/src/app/application/commands/revoke_admin.py @@ -5,7 +5,11 @@ TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import authorize +from app.application.common.services.authorization.permissions import ( + CanManageRole, + RoleManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.entities.user import User from app.domain.enums.user_role import UserRole @@ -38,13 +42,11 @@ class RevokeAdminInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -56,9 +58,13 @@ async def __call__(self, request_data: RevokeAdminRequest) -> None: ) current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=UserRole.ADMIN, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=UserRole.ADMIN, + ), ) username = Username(request_data.username) diff --git a/src/app/application/common/permissions.py b/src/app/application/common/permissions.py deleted file mode 100644 index b02503c..0000000 --- a/src/app/application/common/permissions.py +++ /dev/null @@ -1,78 +0,0 @@ -"""Permission policies for declarative authorization.""" - -from abc import ABC, abstractmethod -from typing import Any - -from app.application.common.services.authorization import SUBORDINATE_ROLES -from app.domain.entities.user import User -from app.domain.enums.user_role import UserRole - - -class Permission(ABC): - """Base class for a permission policy.""" - - @abstractmethod - def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool: - """Check if the permission is satisfied for the given user and context.""" - ... - - -class IsSelf(Permission): - """Permission that checks if the user is acting on themselves.""" - - def is_satisfied_by( - self, current_user: User, *, target_user: User, **kwargs: Any - ) -> bool: - """Check if current user is the same as target user.""" - _ = kwargs # Unused but required for interface - return current_user.id_ == target_user.id_ - - -class IsSuperior(Permission): - """Permission that checks if the user has a superior role over the target.""" - - def is_satisfied_by( - self, current_user: User, *, target_user: User, **kwargs: Any - ) -> bool: - """Check if current user's role is superior to target user's role.""" - _ = kwargs # Unused but required for interface - allowed_roles = SUBORDINATE_ROLES.get(current_user.role, set()) - return target_user.role in allowed_roles - - -class AnyOf(Permission): - """Composite permission that is satisfied if ANY sub-permission is satisfied.""" - - def __init__(self, *permissions: Permission) -> None: - """Initialize with a list of permissions to check.""" - self._permissions = permissions - - def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool: - """Check if any of the sub-permissions is satisfied.""" - return any(p.is_satisfied_by(current_user, **kwargs) for p in self._permissions) - - -class AllOf(Permission): - """Composite permission that is satisfied if ALL sub-permissions are satisfied.""" - - def __init__(self, *permissions: Permission) -> None: - """Initialize with a list of permissions to check.""" - self._permissions = permissions - - def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool: - """Check if all of the sub-permissions are satisfied.""" - return all(p.is_satisfied_by(current_user, **kwargs) for p in self._permissions) - - -class CanManageRole(Permission): - """Permission that checks if the user can manage a specific role.""" - - def __init__(self, target_role: UserRole) -> None: - """Initialize with the role to manage.""" - self._target_role = target_role - - def is_satisfied_by(self, current_user: User, **kwargs: Any) -> bool: - """Check if current user can manage the target role.""" - _ = kwargs # Unused but required for interface - allowed_roles = SUBORDINATE_ROLES.get(current_user.role, set()) - return self._target_role in allowed_roles diff --git a/src/app/application/common/services/authorization.py b/src/app/application/common/services/authorization.py deleted file mode 100644 index f17f0ff..0000000 --- a/src/app/application/common/services/authorization.py +++ /dev/null @@ -1,60 +0,0 @@ -from collections.abc import Mapping -from typing import TYPE_CHECKING, Any, Final - -from app.application.common.constants import AUTHZ_NOT_AUTHORIZED -from app.application.common.exceptions.authorization import AuthorizationError -from app.domain.enums.user_role import UserRole -from app.domain.value_objects.user_id import UserId - -if TYPE_CHECKING: - from app.application.common.permissions import Permission - from app.domain.entities.user import User - -SUBORDINATE_ROLES: Final[Mapping[UserRole, set[UserRole]]] = { - UserRole.SUPER_ADMIN: {UserRole.ADMIN, UserRole.USER}, - UserRole.ADMIN: {UserRole.USER}, - UserRole.USER: set(), -} - - -class AuthorizationService: - def authorize( - self, - current_user: "User", - permission: "Permission", - **kwargs: Any, - ) -> None: - """ - Authorize action using declarative permission policy. - - :raises AuthorizationError: If permission is not satisfied - """ - if not permission.is_satisfied_by(current_user, **kwargs): - raise AuthorizationError(AUTHZ_NOT_AUTHORIZED) - - def authorize_for_self( - self, - current_user_id: UserId, - /, - *, - target_id: UserId, - ) -> None: - """ - :raises AuthorizationError: - """ - if current_user_id != target_id: - raise AuthorizationError(AUTHZ_NOT_AUTHORIZED) - - def authorize_for_subordinate_role( - self, - current_user_role: UserRole, - /, - *, - target_role: UserRole, - ) -> None: - """ - :raises AuthorizationError: - """ - allowed_roles = SUBORDINATE_ROLES.get(current_user_role, set()) - if target_role not in allowed_roles: - raise AuthorizationError(AUTHZ_NOT_AUTHORIZED) diff --git a/src/app/application/common/services/authorization/__init__.py b/src/app/application/common/services/authorization/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/app/application/common/services/authorization/authorize.py b/src/app/application/common/services/authorization/authorize.py new file mode 100644 index 0000000..3d93ac5 --- /dev/null +++ b/src/app/application/common/services/authorization/authorize.py @@ -0,0 +1,18 @@ +from app.application.common.constants import AUTHZ_NOT_AUTHORIZED +from app.application.common.exceptions.authorization import AuthorizationError +from app.application.common.services.authorization.base import ( + Permission, + PermissionContext, +) + + +def authorize[PC: PermissionContext]( + permission: Permission[PC], + *, + context: PC, +) -> None: + """ + :raises AuthorizationError: + """ + if not permission.is_satisfied_by(context): + raise AuthorizationError(AUTHZ_NOT_AUTHORIZED) diff --git a/src/app/application/common/services/authorization/base.py b/src/app/application/common/services/authorization/base.py new file mode 100644 index 0000000..a824a9c --- /dev/null +++ b/src/app/application/common/services/authorization/base.py @@ -0,0 +1,12 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass + + +@dataclass(frozen=True) +class PermissionContext: + pass + + +class Permission[PC: PermissionContext](ABC): + @abstractmethod + def is_satisfied_by(self, context: PC) -> bool: ... diff --git a/src/app/application/common/services/authorization/composite.py b/src/app/application/common/services/authorization/composite.py new file mode 100644 index 0000000..3ac1dae --- /dev/null +++ b/src/app/application/common/services/authorization/composite.py @@ -0,0 +1,12 @@ +from app.application.common.services.authorization.base import ( + Permission, + PermissionContext, +) + + +class AnyOf[PC: PermissionContext](Permission[PC]): + def __init__(self, *permissions: Permission[PC]) -> None: + self._permissions = permissions + + def is_satisfied_by(self, context: PC) -> bool: + return any(p.is_satisfied_by(context) for p in self._permissions) diff --git a/src/app/application/common/services/authorization/permissions.py b/src/app/application/common/services/authorization/permissions.py new file mode 100644 index 0000000..8e302f9 --- /dev/null +++ b/src/app/application/common/services/authorization/permissions.py @@ -0,0 +1,53 @@ +from collections.abc import Mapping +from dataclasses import dataclass + +from app.application.common.services.authorization.base import ( + Permission, + PermissionContext, +) +from app.application.common.services.authorization.role_hierarchy import ( + SUBORDINATE_ROLES, +) +from app.domain.entities.user import User +from app.domain.enums.user_role import UserRole + + +@dataclass(frozen=True, kw_only=True) +class UserManagementContext(PermissionContext): + subject: User + target: User + + +class CanManageSelf(Permission[UserManagementContext]): + def is_satisfied_by(self, context: UserManagementContext) -> bool: + return context.subject == context.target + + +class CanManageSubordinate(Permission[UserManagementContext]): + def __init__( + self, + role_hierarchy: Mapping[UserRole, set[UserRole]] = SUBORDINATE_ROLES, + ) -> None: + self._role_hierarchy = role_hierarchy + + def is_satisfied_by(self, context: UserManagementContext) -> bool: + allowed_roles = self._role_hierarchy.get(context.subject.role, set()) + return context.target.role in allowed_roles + + +@dataclass(frozen=True, kw_only=True) +class RoleManagementContext(PermissionContext): + subject: User + target_role: UserRole + + +class CanManageRole(Permission[RoleManagementContext]): + def __init__( + self, + role_hierarchy: Mapping[UserRole, set[UserRole]] = SUBORDINATE_ROLES, + ) -> None: + self._role_hierarchy = role_hierarchy + + def is_satisfied_by(self, context: RoleManagementContext) -> bool: + allowed_roles = self._role_hierarchy.get(context.subject.role, set()) + return context.target_role in allowed_roles diff --git a/src/app/application/common/services/authorization/role_hierarchy.py b/src/app/application/common/services/authorization/role_hierarchy.py new file mode 100644 index 0000000..cc14fd4 --- /dev/null +++ b/src/app/application/common/services/authorization/role_hierarchy.py @@ -0,0 +1,10 @@ +from collections.abc import Mapping +from typing import Final + +from app.domain.enums.user_role import UserRole + +SUBORDINATE_ROLES: Final[Mapping[UserRole, set[UserRole]]] = { + UserRole.SUPER_ADMIN: {UserRole.ADMIN, UserRole.USER}, + UserRole.ADMIN: {UserRole.USER}, + UserRole.USER: set(), +} diff --git a/src/app/application/queries/list_users.py b/src/app/application/queries/list_users.py index 6eb2b6a..e563c24 100644 --- a/src/app/application/queries/list_users.py +++ b/src/app/application/queries/list_users.py @@ -11,7 +11,13 @@ UserListParams, UserListSorting, ) -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.permissions import ( + CanManageRole, + RoleManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.enums.user_role import UserRole @@ -46,20 +52,22 @@ class ListUsersQueryService: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_query_gateway: UserQueryGateway, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_query_gateway = user_query_gateway async def __call__(self, request_data: ListUsersRequest) -> ListUsersResponse: log.info("List users: started.") current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=UserRole.USER, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=UserRole.USER, + ), ) log.debug("Retrieving list of users.") diff --git a/src/app/infrastructure/handlers/sign_up.py b/src/app/infrastructure/handlers/sign_up.py index bbb1d51..550446b 100644 --- a/src/app/infrastructure/handlers/sign_up.py +++ b/src/app/infrastructure/handlers/sign_up.py @@ -6,7 +6,6 @@ from app.application.common.ports.transaction_manager import TransactionManager from app.application.common.ports.user_command_gateway import UserCommandGateway from app.application.common.services.current_user import CurrentUserService -from app.domain.entities.user import User from app.domain.exceptions.user import UsernameAlreadyExistsError from app.domain.services.user import UserService from app.domain.value_objects.raw_password.raw_password import RawPassword @@ -69,7 +68,7 @@ async def __call__(self, request_data: SignUpRequest) -> SignUpResponse: username = Username(request_data.username) password = RawPassword(request_data.password) - user: User = self._user_service.create_user(username, password) + user = self._user_service.create_user(username, password) self._user_command_gateway.add(user) diff --git a/src/app/setup/ioc/di_providers/application.py b/src/app/setup/ioc/di_providers/application.py index fbd23c7..7a76847 100644 --- a/src/app/setup/ioc/di_providers/application.py +++ b/src/app/setup/ioc/di_providers/application.py @@ -13,7 +13,6 @@ ) from app.application.common.ports.user_command_gateway import UserCommandGateway from app.application.common.ports.user_query_gateway import UserQueryGateway -from app.application.common.services.authorization import AuthorizationService from app.application.common.services.current_user import CurrentUserService from app.application.queries.list_users import ListUsersQueryService from app.infrastructure.adapters.main_transaction_manager_sqla import ( @@ -36,7 +35,6 @@ class ApplicationProvider(Provider): # Services services = provide_all( - AuthorizationService, CurrentUserService, ) diff --git a/tests/app/performance/profile_password_hasher_bcrypt.py b/tests/app/performance/profile_password_hasher_bcrypt.py index fb1ea9b..d29f27f 100644 --- a/tests/app/performance/profile_password_hasher_bcrypt.py +++ b/tests/app/performance/profile_password_hasher_bcrypt.py @@ -20,7 +20,7 @@ def main() -> None: profiler = LineProfiler() profiler.add_function(profile_password_hashing) - profiler.runcall(profile_password_hashing, hasher) # type: ignore[no-untyped-call] + profiler.runcall(profile_password_hashing, hasher) profiler.print_stats() diff --git a/tests/app/unit/application/authz_service/__init__.py b/tests/app/unit/application/authz_service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/app/unit/application/authz_service/permission_stubs.py b/tests/app/unit/application/authz_service/permission_stubs.py new file mode 100644 index 0000000..81e06fd --- /dev/null +++ b/tests/app/unit/application/authz_service/permission_stubs.py @@ -0,0 +1,18 @@ +from app.application.common.services.authorization.base import ( + Permission, + PermissionContext, +) + + +class DummyContext(PermissionContext): + pass + + +class AlwaysAllow(Permission[DummyContext]): + def is_satisfied_by(self, context: DummyContext) -> bool: + return True + + +class AlwaysDeny(Permission[DummyContext]): + def is_satisfied_by(self, context: DummyContext) -> bool: + return False diff --git a/tests/app/unit/application/authz_service/test_authorize.py b/tests/app/unit/application/authz_service/test_authorize.py new file mode 100644 index 0000000..084a725 --- /dev/null +++ b/tests/app/unit/application/authz_service/test_authorize.py @@ -0,0 +1,26 @@ +import pytest + +from app.application.common.exceptions.authorization import AuthorizationError +from app.application.common.services.authorization.authorize import ( + authorize, +) +from tests.app.unit.application.authz_service.permission_stubs import ( + AlwaysAllow, + AlwaysDeny, + DummyContext, +) + + +def test_authorize_allows_when_permission_is_satisfied() -> None: + context = DummyContext() + permission = AlwaysAllow() + + authorize(permission, context=context) + + +def test_authorize_raises_when_permission_not_satisfied() -> None: + context = DummyContext() + permission = AlwaysDeny() + + with pytest.raises(AuthorizationError): + authorize(permission, context=context) diff --git a/tests/app/unit/application/authz_service/test_composite.py b/tests/app/unit/application/authz_service/test_composite.py new file mode 100644 index 0000000..4e2617c --- /dev/null +++ b/tests/app/unit/application/authz_service/test_composite.py @@ -0,0 +1,21 @@ +from app.application.common.services.authorization.composite import AnyOf +from tests.app.unit.application.authz_service.permission_stubs import ( + AlwaysAllow, + AlwaysDeny, + DummyContext, +) + + +def test_any_of_allows_if_at_least_one_allows() -> None: + sut = AnyOf(AlwaysDeny(), AlwaysAllow()) + assert sut.is_satisfied_by(DummyContext()) + + +def test_any_of_denies_if_all_deny() -> None: + sut = AnyOf(AlwaysDeny(), AlwaysDeny()) + assert not sut.is_satisfied_by(DummyContext()) + + +def test_any_of_empty_returns_false() -> None: + sut: AnyOf[DummyContext] = AnyOf() + assert not sut.is_satisfied_by(DummyContext()) diff --git a/tests/app/unit/application/authz_service/test_permissions.py b/tests/app/unit/application/authz_service/test_permissions.py new file mode 100644 index 0000000..3ada833 --- /dev/null +++ b/tests/app/unit/application/authz_service/test_permissions.py @@ -0,0 +1,113 @@ +import pytest + +from app.application.common.services.authorization.permissions import ( + CanManageRole, + CanManageSelf, + CanManageSubordinate, + RoleManagementContext, + UserManagementContext, +) +from app.domain.enums.user_role import UserRole +from tests.app.unit.factories.user_entity import create_user +from tests.app.unit.factories.value_objects import create_user_id + + +def test_can_manage_self() -> None: + user_id = create_user_id() + subject = create_user(user_id=user_id) + target = create_user(user_id=user_id) + context = UserManagementContext(subject=subject, target=target) + sut = CanManageSelf() + + assert sut.is_satisfied_by(context) + + +def test_cannot_manage_another_user() -> None: + subject_id = create_user_id() + subject = create_user(user_id=subject_id) + target_id = create_user_id() + target = create_user(user_id=target_id) + context = UserManagementContext(subject=subject, target=target) + sut = CanManageSelf() + + assert not sut.is_satisfied_by(context) + + +@pytest.mark.parametrize( + ("subject_role", "target_role"), + [ + (UserRole.SUPER_ADMIN, UserRole.ADMIN), + (UserRole.SUPER_ADMIN, UserRole.USER), + (UserRole.ADMIN, UserRole.USER), + ], +) +def test_can_manage_subordinate( + subject_role: UserRole, + target_role: UserRole, +) -> None: + subject = create_user(role=subject_role) + target = create_user(role=target_role) + context = UserManagementContext(subject=subject, target=target) + sut = CanManageSubordinate() + + assert sut.is_satisfied_by(context) + + +@pytest.mark.parametrize( + ("subject_role", "target_role"), + [ + (UserRole.SUPER_ADMIN, UserRole.SUPER_ADMIN), + (UserRole.ADMIN, UserRole.SUPER_ADMIN), + (UserRole.ADMIN, UserRole.ADMIN), + (UserRole.USER, UserRole.ADMIN), + ], +) +def test_cannot_manage_non_subordinate( + subject_role: UserRole, + target_role: UserRole, +) -> None: + subject = create_user(role=subject_role) + target = create_user(role=target_role) + context = UserManagementContext(subject=subject, target=target) + sut = CanManageSubordinate() + + assert not sut.is_satisfied_by(context) + + +@pytest.mark.parametrize( + ("subject_role", "target_role"), + [ + (UserRole.SUPER_ADMIN, UserRole.ADMIN), + (UserRole.SUPER_ADMIN, UserRole.USER), + (UserRole.ADMIN, UserRole.USER), + ], +) +def test_can_manage_role( + subject_role: UserRole, + target_role: UserRole, +) -> None: + subject = create_user(role=subject_role) + context = RoleManagementContext(subject=subject, target_role=target_role) + sut = CanManageRole() + + assert sut.is_satisfied_by(context) + + +@pytest.mark.parametrize( + ("subject_role", "target_role"), + [ + (UserRole.SUPER_ADMIN, UserRole.SUPER_ADMIN), + (UserRole.ADMIN, UserRole.SUPER_ADMIN), + (UserRole.ADMIN, UserRole.ADMIN), + (UserRole.USER, UserRole.ADMIN), + ], +) +def test_cannot_manage_role( + subject_role: UserRole, + target_role: UserRole, +) -> None: + subject = create_user(role=subject_role) + context = RoleManagementContext(subject=subject, target_role=target_role) + sut = CanManageRole() + + assert not sut.is_satisfied_by(context) diff --git a/tests/app/unit/application/test_authz_service.py b/tests/app/unit/application/test_authz_service.py deleted file mode 100644 index 052730a..0000000 --- a/tests/app/unit/application/test_authz_service.py +++ /dev/null @@ -1,74 +0,0 @@ -import pytest - -from app.application.common.exceptions.authorization import AuthorizationError -from app.application.common.services.authorization import AuthorizationService -from app.domain.enums.user_role import UserRole -from tests.app.unit.factories.user_entity import create_user - - -def test_user_can_act_on_himself() -> None: - sut = AuthorizationService() - user = create_user() - - sut.authorize_for_self(user.id_, target_id=user.id_) - - -def test_user_cannot_act_on_another_user() -> None: - sut = AuthorizationService() - user1 = create_user() - user2 = create_user() - - with pytest.raises(AuthorizationError): - sut.authorize_for_self(user1.id_, target_id=user2.id_) - - -@pytest.mark.parametrize( - ("superior", "subordinate"), - [ - (UserRole.SUPER_ADMIN, UserRole.ADMIN), - (UserRole.SUPER_ADMIN, UserRole.USER), - (UserRole.ADMIN, UserRole.USER), - ], -) -def test_superior_role_can_act_on_subordinate( - superior: UserRole, - subordinate: UserRole, -) -> None: - sut = AuthorizationService() - - sut.authorize_for_subordinate_role(superior, target_role=subordinate) - - -@pytest.mark.parametrize( - "role", - [ - UserRole.SUPER_ADMIN, - UserRole.ADMIN, - UserRole.USER, - ], -) -def test_peer_role_cannot_act_on_peer( - role: UserRole, -) -> None: - sut = AuthorizationService() - - with pytest.raises(AuthorizationError): - sut.authorize_for_subordinate_role(role, target_role=role) - - -@pytest.mark.parametrize( - ("inferior", "superior"), - [ - (UserRole.USER, UserRole.ADMIN), - (UserRole.USER, UserRole.SUPER_ADMIN), - (UserRole.ADMIN, UserRole.SUPER_ADMIN), - ], -) -def test_inferior_role_cannot_act_on_superior( - inferior: UserRole, - superior: UserRole, -) -> None: - sut = AuthorizationService() - - with pytest.raises(AuthorizationError): - sut.authorize_for_subordinate_role(inferior, target_role=superior)