diff --git a/pyproject.toml b/pyproject.toml index 7597ff5..5118453 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -123,15 +123,13 @@ ignore = [ [tool.ruff.lint.per-file-ignores] "src/app/infrastructure/persistence_sqla/alembic/**" = ["ALL", ] "tests/**" = [ + "ARG002", # unused-method-argument "PLC2801", # unnecessary-dunder-call "PLR2004", # magic-value-comparison "PT011", # pytest-raises-too-broad "S101", # assert - "S105", # hardcoded-password-string "S106", # hardcoded-password-func-arg "S107", # hardcoded-password-default - "SLF001", # private-member-access - "UP012", # unnecessary-encode-utf8 ] # "src/app/domain/value_objects/base.py" = ["B024", ] # abstract-base-class-without-abstract-method diff --git a/src/app/application/commands/change_password.py b/src/app/application/commands/change_password.py index c8a29e4..1af8012 100644 --- a/src/app/application/commands/change_password.py +++ b/src/app/application/commands/change_password.py @@ -1,12 +1,19 @@ import logging from dataclasses import dataclass -from app.application.common.exceptions.authorization import AuthorizationError from app.application.common.ports.transaction_manager import ( TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.composite import AnyOf +from app.application.common.services.authorization.permissions import ( + CanManageSelf, + CanManageSubordinate, + UserManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.entities.user import User from app.domain.exceptions.user import UserNotFoundByUsernameError @@ -40,13 +47,11 @@ class ChangePasswordInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -65,16 +70,16 @@ async def __call__(self, request_data: ChangePasswordRequest) -> None: if user is None: raise UserNotFoundByUsernameError(username) - try: - self._authorization_service.authorize_for_self( - current_user.id_, - target_id=user.id_, - ) - except AuthorizationError: - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=user.role, - ) + authorize( + AnyOf( + CanManageSelf(), + CanManageSubordinate(), + ), + context=UserManagementContext( + subject=current_user, + target=user, + ), + ) self._user_service.change_password(user, password) await self._transaction_manager.commit() diff --git a/src/app/application/commands/create_user.py b/src/app/application/commands/create_user.py index f424405..6e8f81e 100644 --- a/src/app/application/commands/create_user.py +++ b/src/app/application/commands/create_user.py @@ -7,7 +7,13 @@ TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.permissions import ( + CanManageRole, + RoleManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.enums.user_role import UserRole from app.domain.exceptions.user import UsernameAlreadyExistsError @@ -46,13 +52,11 @@ class CreateUserInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -64,9 +68,13 @@ async def __call__(self, request_data: CreateUserRequest) -> CreateUserResponse: ) current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=request_data.role, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=request_data.role, + ), ) username = Username(request_data.username) diff --git a/src/app/application/commands/grant_admin.py b/src/app/application/commands/grant_admin.py index 5eee7e7..5bebbfb 100644 --- a/src/app/application/commands/grant_admin.py +++ b/src/app/application/commands/grant_admin.py @@ -5,7 +5,13 @@ TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.permissions import ( + CanManageRole, + RoleManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.entities.user import User from app.domain.enums.user_role import UserRole @@ -38,13 +44,11 @@ class GrantAdminInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -56,9 +60,13 @@ async def __call__(self, request_data: GrantAdminRequest) -> None: ) current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=UserRole.ADMIN, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=UserRole.ADMIN, + ), ) username = Username(request_data.username) diff --git a/src/app/application/commands/inactivate_user.py b/src/app/application/commands/inactivate_user.py index ac91e49..f94666a 100644 --- a/src/app/application/commands/inactivate_user.py +++ b/src/app/application/commands/inactivate_user.py @@ -6,7 +6,15 @@ TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.permissions import ( + CanManageRole, + CanManageSubordinate, + RoleManagementContext, + UserManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.entities.user import User from app.domain.enums.user_role import UserRole @@ -41,14 +49,12 @@ class InactivateUserInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, access_revoker: AccessRevoker, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -61,9 +67,13 @@ async def __call__(self, request_data: InactivateUserRequest) -> None: ) current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=UserRole.USER, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=UserRole.USER, + ), ) username = Username(request_data.username) @@ -74,9 +84,12 @@ async def __call__(self, request_data: InactivateUserRequest) -> None: if user is None: raise UserNotFoundByUsernameError(username) - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=user.role, + authorize( + CanManageSubordinate(), + context=UserManagementContext( + subject=current_user, + target=user, + ), ) self._user_service.toggle_user_activation(user, is_active=False) diff --git a/src/app/application/commands/reactivate_user.py b/src/app/application/commands/reactivate_user.py index f0532c0..4f62916 100644 --- a/src/app/application/commands/reactivate_user.py +++ b/src/app/application/commands/reactivate_user.py @@ -5,7 +5,15 @@ TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.permissions import ( + CanManageRole, + CanManageSubordinate, + RoleManagementContext, + UserManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.entities.user import User from app.domain.enums.user_role import UserRole @@ -39,13 +47,11 @@ class ReactivateUserInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -57,9 +63,13 @@ async def __call__(self, request_data: ReactivateUserRequest) -> None: ) current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=UserRole.USER, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=UserRole.USER, + ), ) username = Username(request_data.username) @@ -70,9 +80,12 @@ async def __call__(self, request_data: ReactivateUserRequest) -> None: if user is None: raise UserNotFoundByUsernameError(username) - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=user.role, + authorize( + CanManageSubordinate(), + context=UserManagementContext( + subject=current_user, + target=user, + ), ) self._user_service.toggle_user_activation(user, is_active=True) diff --git a/src/app/application/commands/revoke_admin.py b/src/app/application/commands/revoke_admin.py index c75b481..f05532e 100644 --- a/src/app/application/commands/revoke_admin.py +++ b/src/app/application/commands/revoke_admin.py @@ -5,7 +5,11 @@ TransactionManager, ) from app.application.common.ports.user_command_gateway import UserCommandGateway -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import authorize +from app.application.common.services.authorization.permissions import ( + CanManageRole, + RoleManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.entities.user import User from app.domain.enums.user_role import UserRole @@ -38,13 +42,11 @@ class RevokeAdminInteractor: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_command_gateway: UserCommandGateway, user_service: UserService, transaction_manager: TransactionManager, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_command_gateway = user_command_gateway self._user_service = user_service self._transaction_manager = transaction_manager @@ -56,9 +58,13 @@ async def __call__(self, request_data: RevokeAdminRequest) -> None: ) current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=UserRole.ADMIN, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=UserRole.ADMIN, + ), ) username = Username(request_data.username) diff --git a/src/app/application/common/services/authorization.py b/src/app/application/common/services/authorization.py deleted file mode 100644 index 9e80e1c..0000000 --- a/src/app/application/common/services/authorization.py +++ /dev/null @@ -1,42 +0,0 @@ -from collections.abc import Mapping -from typing import Final - -from app.application.common.constants import AUTHZ_NOT_AUTHORIZED -from app.application.common.exceptions.authorization import AuthorizationError -from app.domain.enums.user_role import UserRole -from app.domain.value_objects.user_id import UserId - -SUBORDINATE_ROLES: Final[Mapping[UserRole, set[UserRole]]] = { - UserRole.SUPER_ADMIN: {UserRole.ADMIN, UserRole.USER}, - UserRole.ADMIN: {UserRole.USER}, - UserRole.USER: set(), -} - - -class AuthorizationService: - def authorize_for_self( - self, - current_user_id: UserId, - /, - *, - target_id: UserId, - ) -> None: - """ - :raises AuthorizationError: - """ - if current_user_id != target_id: - raise AuthorizationError(AUTHZ_NOT_AUTHORIZED) - - def authorize_for_subordinate_role( - self, - current_user_role: UserRole, - /, - *, - target_role: UserRole, - ) -> None: - """ - :raises AuthorizationError: - """ - allowed_roles = SUBORDINATE_ROLES.get(current_user_role, set()) - if target_role not in allowed_roles: - raise AuthorizationError(AUTHZ_NOT_AUTHORIZED) diff --git a/src/app/application/common/services/authorization/__init__.py b/src/app/application/common/services/authorization/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/app/application/common/services/authorization/authorize.py b/src/app/application/common/services/authorization/authorize.py new file mode 100644 index 0000000..3d93ac5 --- /dev/null +++ b/src/app/application/common/services/authorization/authorize.py @@ -0,0 +1,18 @@ +from app.application.common.constants import AUTHZ_NOT_AUTHORIZED +from app.application.common.exceptions.authorization import AuthorizationError +from app.application.common.services.authorization.base import ( + Permission, + PermissionContext, +) + + +def authorize[PC: PermissionContext]( + permission: Permission[PC], + *, + context: PC, +) -> None: + """ + :raises AuthorizationError: + """ + if not permission.is_satisfied_by(context): + raise AuthorizationError(AUTHZ_NOT_AUTHORIZED) diff --git a/src/app/application/common/services/authorization/base.py b/src/app/application/common/services/authorization/base.py new file mode 100644 index 0000000..a824a9c --- /dev/null +++ b/src/app/application/common/services/authorization/base.py @@ -0,0 +1,12 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass + + +@dataclass(frozen=True) +class PermissionContext: + pass + + +class Permission[PC: PermissionContext](ABC): + @abstractmethod + def is_satisfied_by(self, context: PC) -> bool: ... diff --git a/src/app/application/common/services/authorization/composite.py b/src/app/application/common/services/authorization/composite.py new file mode 100644 index 0000000..3ac1dae --- /dev/null +++ b/src/app/application/common/services/authorization/composite.py @@ -0,0 +1,12 @@ +from app.application.common.services.authorization.base import ( + Permission, + PermissionContext, +) + + +class AnyOf[PC: PermissionContext](Permission[PC]): + def __init__(self, *permissions: Permission[PC]) -> None: + self._permissions = permissions + + def is_satisfied_by(self, context: PC) -> bool: + return any(p.is_satisfied_by(context) for p in self._permissions) diff --git a/src/app/application/common/services/authorization/permissions.py b/src/app/application/common/services/authorization/permissions.py new file mode 100644 index 0000000..8e302f9 --- /dev/null +++ b/src/app/application/common/services/authorization/permissions.py @@ -0,0 +1,53 @@ +from collections.abc import Mapping +from dataclasses import dataclass + +from app.application.common.services.authorization.base import ( + Permission, + PermissionContext, +) +from app.application.common.services.authorization.role_hierarchy import ( + SUBORDINATE_ROLES, +) +from app.domain.entities.user import User +from app.domain.enums.user_role import UserRole + + +@dataclass(frozen=True, kw_only=True) +class UserManagementContext(PermissionContext): + subject: User + target: User + + +class CanManageSelf(Permission[UserManagementContext]): + def is_satisfied_by(self, context: UserManagementContext) -> bool: + return context.subject == context.target + + +class CanManageSubordinate(Permission[UserManagementContext]): + def __init__( + self, + role_hierarchy: Mapping[UserRole, set[UserRole]] = SUBORDINATE_ROLES, + ) -> None: + self._role_hierarchy = role_hierarchy + + def is_satisfied_by(self, context: UserManagementContext) -> bool: + allowed_roles = self._role_hierarchy.get(context.subject.role, set()) + return context.target.role in allowed_roles + + +@dataclass(frozen=True, kw_only=True) +class RoleManagementContext(PermissionContext): + subject: User + target_role: UserRole + + +class CanManageRole(Permission[RoleManagementContext]): + def __init__( + self, + role_hierarchy: Mapping[UserRole, set[UserRole]] = SUBORDINATE_ROLES, + ) -> None: + self._role_hierarchy = role_hierarchy + + def is_satisfied_by(self, context: RoleManagementContext) -> bool: + allowed_roles = self._role_hierarchy.get(context.subject.role, set()) + return context.target_role in allowed_roles diff --git a/src/app/application/common/services/authorization/role_hierarchy.py b/src/app/application/common/services/authorization/role_hierarchy.py new file mode 100644 index 0000000..cc14fd4 --- /dev/null +++ b/src/app/application/common/services/authorization/role_hierarchy.py @@ -0,0 +1,10 @@ +from collections.abc import Mapping +from typing import Final + +from app.domain.enums.user_role import UserRole + +SUBORDINATE_ROLES: Final[Mapping[UserRole, set[UserRole]]] = { + UserRole.SUPER_ADMIN: {UserRole.ADMIN, UserRole.USER}, + UserRole.ADMIN: {UserRole.USER}, + UserRole.USER: set(), +} diff --git a/src/app/application/queries/list_users.py b/src/app/application/queries/list_users.py index 6eb2b6a..e563c24 100644 --- a/src/app/application/queries/list_users.py +++ b/src/app/application/queries/list_users.py @@ -11,7 +11,13 @@ UserListParams, UserListSorting, ) -from app.application.common.services.authorization import AuthorizationService +from app.application.common.services.authorization.authorize import ( + authorize, +) +from app.application.common.services.authorization.permissions import ( + CanManageRole, + RoleManagementContext, +) from app.application.common.services.current_user import CurrentUserService from app.domain.enums.user_role import UserRole @@ -46,20 +52,22 @@ class ListUsersQueryService: def __init__( self, current_user_service: CurrentUserService, - authorization_service: AuthorizationService, user_query_gateway: UserQueryGateway, ): self._current_user_service = current_user_service - self._authorization_service = authorization_service self._user_query_gateway = user_query_gateway async def __call__(self, request_data: ListUsersRequest) -> ListUsersResponse: log.info("List users: started.") current_user = await self._current_user_service.get_current_user() - self._authorization_service.authorize_for_subordinate_role( - current_user.role, - target_role=UserRole.USER, + + authorize( + CanManageRole(), + context=RoleManagementContext( + subject=current_user, + target_role=UserRole.USER, + ), ) log.debug("Retrieving list of users.") diff --git a/src/app/infrastructure/handlers/sign_up.py b/src/app/infrastructure/handlers/sign_up.py index bbb1d51..550446b 100644 --- a/src/app/infrastructure/handlers/sign_up.py +++ b/src/app/infrastructure/handlers/sign_up.py @@ -6,7 +6,6 @@ from app.application.common.ports.transaction_manager import TransactionManager from app.application.common.ports.user_command_gateway import UserCommandGateway from app.application.common.services.current_user import CurrentUserService -from app.domain.entities.user import User from app.domain.exceptions.user import UsernameAlreadyExistsError from app.domain.services.user import UserService from app.domain.value_objects.raw_password.raw_password import RawPassword @@ -69,7 +68,7 @@ async def __call__(self, request_data: SignUpRequest) -> SignUpResponse: username = Username(request_data.username) password = RawPassword(request_data.password) - user: User = self._user_service.create_user(username, password) + user = self._user_service.create_user(username, password) self._user_command_gateway.add(user) diff --git a/src/app/setup/ioc/di_providers/application.py b/src/app/setup/ioc/di_providers/application.py index fbd23c7..7a76847 100644 --- a/src/app/setup/ioc/di_providers/application.py +++ b/src/app/setup/ioc/di_providers/application.py @@ -13,7 +13,6 @@ ) from app.application.common.ports.user_command_gateway import UserCommandGateway from app.application.common.ports.user_query_gateway import UserQueryGateway -from app.application.common.services.authorization import AuthorizationService from app.application.common.services.current_user import CurrentUserService from app.application.queries.list_users import ListUsersQueryService from app.infrastructure.adapters.main_transaction_manager_sqla import ( @@ -36,7 +35,6 @@ class ApplicationProvider(Provider): # Services services = provide_all( - AuthorizationService, CurrentUserService, ) diff --git a/tests/app/performance/profile_password_hasher_bcrypt.py b/tests/app/performance/profile_password_hasher_bcrypt.py index fb1ea9b..d29f27f 100644 --- a/tests/app/performance/profile_password_hasher_bcrypt.py +++ b/tests/app/performance/profile_password_hasher_bcrypt.py @@ -20,7 +20,7 @@ def main() -> None: profiler = LineProfiler() profiler.add_function(profile_password_hashing) - profiler.runcall(profile_password_hashing, hasher) # type: ignore[no-untyped-call] + profiler.runcall(profile_password_hashing, hasher) profiler.print_stats() diff --git a/tests/app/unit/application/authz_service/__init__.py b/tests/app/unit/application/authz_service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/app/unit/application/authz_service/permission_stubs.py b/tests/app/unit/application/authz_service/permission_stubs.py new file mode 100644 index 0000000..81e06fd --- /dev/null +++ b/tests/app/unit/application/authz_service/permission_stubs.py @@ -0,0 +1,18 @@ +from app.application.common.services.authorization.base import ( + Permission, + PermissionContext, +) + + +class DummyContext(PermissionContext): + pass + + +class AlwaysAllow(Permission[DummyContext]): + def is_satisfied_by(self, context: DummyContext) -> bool: + return True + + +class AlwaysDeny(Permission[DummyContext]): + def is_satisfied_by(self, context: DummyContext) -> bool: + return False diff --git a/tests/app/unit/application/authz_service/test_authorize.py b/tests/app/unit/application/authz_service/test_authorize.py new file mode 100644 index 0000000..084a725 --- /dev/null +++ b/tests/app/unit/application/authz_service/test_authorize.py @@ -0,0 +1,26 @@ +import pytest + +from app.application.common.exceptions.authorization import AuthorizationError +from app.application.common.services.authorization.authorize import ( + authorize, +) +from tests.app.unit.application.authz_service.permission_stubs import ( + AlwaysAllow, + AlwaysDeny, + DummyContext, +) + + +def test_authorize_allows_when_permission_is_satisfied() -> None: + context = DummyContext() + permission = AlwaysAllow() + + authorize(permission, context=context) + + +def test_authorize_raises_when_permission_not_satisfied() -> None: + context = DummyContext() + permission = AlwaysDeny() + + with pytest.raises(AuthorizationError): + authorize(permission, context=context) diff --git a/tests/app/unit/application/authz_service/test_composite.py b/tests/app/unit/application/authz_service/test_composite.py new file mode 100644 index 0000000..4e2617c --- /dev/null +++ b/tests/app/unit/application/authz_service/test_composite.py @@ -0,0 +1,21 @@ +from app.application.common.services.authorization.composite import AnyOf +from tests.app.unit.application.authz_service.permission_stubs import ( + AlwaysAllow, + AlwaysDeny, + DummyContext, +) + + +def test_any_of_allows_if_at_least_one_allows() -> None: + sut = AnyOf(AlwaysDeny(), AlwaysAllow()) + assert sut.is_satisfied_by(DummyContext()) + + +def test_any_of_denies_if_all_deny() -> None: + sut = AnyOf(AlwaysDeny(), AlwaysDeny()) + assert not sut.is_satisfied_by(DummyContext()) + + +def test_any_of_empty_returns_false() -> None: + sut: AnyOf[DummyContext] = AnyOf() + assert not sut.is_satisfied_by(DummyContext()) diff --git a/tests/app/unit/application/authz_service/test_permissions.py b/tests/app/unit/application/authz_service/test_permissions.py new file mode 100644 index 0000000..3ada833 --- /dev/null +++ b/tests/app/unit/application/authz_service/test_permissions.py @@ -0,0 +1,113 @@ +import pytest + +from app.application.common.services.authorization.permissions import ( + CanManageRole, + CanManageSelf, + CanManageSubordinate, + RoleManagementContext, + UserManagementContext, +) +from app.domain.enums.user_role import UserRole +from tests.app.unit.factories.user_entity import create_user +from tests.app.unit.factories.value_objects import create_user_id + + +def test_can_manage_self() -> None: + user_id = create_user_id() + subject = create_user(user_id=user_id) + target = create_user(user_id=user_id) + context = UserManagementContext(subject=subject, target=target) + sut = CanManageSelf() + + assert sut.is_satisfied_by(context) + + +def test_cannot_manage_another_user() -> None: + subject_id = create_user_id() + subject = create_user(user_id=subject_id) + target_id = create_user_id() + target = create_user(user_id=target_id) + context = UserManagementContext(subject=subject, target=target) + sut = CanManageSelf() + + assert not sut.is_satisfied_by(context) + + +@pytest.mark.parametrize( + ("subject_role", "target_role"), + [ + (UserRole.SUPER_ADMIN, UserRole.ADMIN), + (UserRole.SUPER_ADMIN, UserRole.USER), + (UserRole.ADMIN, UserRole.USER), + ], +) +def test_can_manage_subordinate( + subject_role: UserRole, + target_role: UserRole, +) -> None: + subject = create_user(role=subject_role) + target = create_user(role=target_role) + context = UserManagementContext(subject=subject, target=target) + sut = CanManageSubordinate() + + assert sut.is_satisfied_by(context) + + +@pytest.mark.parametrize( + ("subject_role", "target_role"), + [ + (UserRole.SUPER_ADMIN, UserRole.SUPER_ADMIN), + (UserRole.ADMIN, UserRole.SUPER_ADMIN), + (UserRole.ADMIN, UserRole.ADMIN), + (UserRole.USER, UserRole.ADMIN), + ], +) +def test_cannot_manage_non_subordinate( + subject_role: UserRole, + target_role: UserRole, +) -> None: + subject = create_user(role=subject_role) + target = create_user(role=target_role) + context = UserManagementContext(subject=subject, target=target) + sut = CanManageSubordinate() + + assert not sut.is_satisfied_by(context) + + +@pytest.mark.parametrize( + ("subject_role", "target_role"), + [ + (UserRole.SUPER_ADMIN, UserRole.ADMIN), + (UserRole.SUPER_ADMIN, UserRole.USER), + (UserRole.ADMIN, UserRole.USER), + ], +) +def test_can_manage_role( + subject_role: UserRole, + target_role: UserRole, +) -> None: + subject = create_user(role=subject_role) + context = RoleManagementContext(subject=subject, target_role=target_role) + sut = CanManageRole() + + assert sut.is_satisfied_by(context) + + +@pytest.mark.parametrize( + ("subject_role", "target_role"), + [ + (UserRole.SUPER_ADMIN, UserRole.SUPER_ADMIN), + (UserRole.ADMIN, UserRole.SUPER_ADMIN), + (UserRole.ADMIN, UserRole.ADMIN), + (UserRole.USER, UserRole.ADMIN), + ], +) +def test_cannot_manage_role( + subject_role: UserRole, + target_role: UserRole, +) -> None: + subject = create_user(role=subject_role) + context = RoleManagementContext(subject=subject, target_role=target_role) + sut = CanManageRole() + + assert not sut.is_satisfied_by(context) diff --git a/tests/app/unit/application/test_authz_service.py b/tests/app/unit/application/test_authz_service.py deleted file mode 100644 index 052730a..0000000 --- a/tests/app/unit/application/test_authz_service.py +++ /dev/null @@ -1,74 +0,0 @@ -import pytest - -from app.application.common.exceptions.authorization import AuthorizationError -from app.application.common.services.authorization import AuthorizationService -from app.domain.enums.user_role import UserRole -from tests.app.unit.factories.user_entity import create_user - - -def test_user_can_act_on_himself() -> None: - sut = AuthorizationService() - user = create_user() - - sut.authorize_for_self(user.id_, target_id=user.id_) - - -def test_user_cannot_act_on_another_user() -> None: - sut = AuthorizationService() - user1 = create_user() - user2 = create_user() - - with pytest.raises(AuthorizationError): - sut.authorize_for_self(user1.id_, target_id=user2.id_) - - -@pytest.mark.parametrize( - ("superior", "subordinate"), - [ - (UserRole.SUPER_ADMIN, UserRole.ADMIN), - (UserRole.SUPER_ADMIN, UserRole.USER), - (UserRole.ADMIN, UserRole.USER), - ], -) -def test_superior_role_can_act_on_subordinate( - superior: UserRole, - subordinate: UserRole, -) -> None: - sut = AuthorizationService() - - sut.authorize_for_subordinate_role(superior, target_role=subordinate) - - -@pytest.mark.parametrize( - "role", - [ - UserRole.SUPER_ADMIN, - UserRole.ADMIN, - UserRole.USER, - ], -) -def test_peer_role_cannot_act_on_peer( - role: UserRole, -) -> None: - sut = AuthorizationService() - - with pytest.raises(AuthorizationError): - sut.authorize_for_subordinate_role(role, target_role=role) - - -@pytest.mark.parametrize( - ("inferior", "superior"), - [ - (UserRole.USER, UserRole.ADMIN), - (UserRole.USER, UserRole.SUPER_ADMIN), - (UserRole.ADMIN, UserRole.SUPER_ADMIN), - ], -) -def test_inferior_role_cannot_act_on_superior( - inferior: UserRole, - superior: UserRole, -) -> None: - sut = AuthorizationService() - - with pytest.raises(AuthorizationError): - sut.authorize_for_subordinate_role(inferior, target_role=superior)