-
Notifications
You must be signed in to change notification settings - Fork 74
Expand file tree
/
Copy pathactivate_user.py
More file actions
97 lines (83 loc) · 2.91 KB
/
activate_user.py
File metadata and controls
97 lines (83 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import logging
from dataclasses import dataclass
from app.application.common.ports.transaction_manager import (
TransactionManager,
)
from app.application.common.ports.user_command_gateway import UserCommandGateway
from app.application.common.services.authorization.authorize import (
authorize,
)
from app.application.common.services.authorization.permissions import (
CanManageRole,
CanManageSubordinate,
RoleManagementContext,
UserManagementContext,
)
from app.application.common.services.current_user import CurrentUserService
from app.domain.entities.user import User
from app.domain.enums.user_role import UserRole
from app.domain.exceptions.user import UserNotFoundByUsernameError
from app.domain.services.user import UserService
from app.domain.value_objects.username.username import Username
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
@dataclass(frozen=True, slots=True)
class ActivateUserRequest:
username: str
class ActivateUserInteractor:
    """
    Use case: activate a user account.

    - Open to admins.
    - Restores a previously soft-deleted user.
    - Only super admins can activate other admins.
    """

    def __init__(
        self,
        current_user_service: CurrentUserService,
        user_command_gateway: UserCommandGateway,
        user_service: UserService,
        transaction_manager: TransactionManager,
    ):
        # Injected collaborators, stored as private attributes.
        self._transaction_manager = transaction_manager
        self._user_service = user_service
        self._user_command_gateway = user_command_gateway
        self._current_user_service = current_user_service

    async def execute(self, request_data: ActivateUserRequest) -> None:
        """
        Activate the user named in ``request_data`` and commit the change.

        Steps, in order: resolve the current user, run the role-level
        authorization check, load the target user by username, run the
        subject-vs-target authorization check, set the target active, and
        commit the transaction.

        :raises AuthenticationError:
        :raises DataMapperError:
        :raises AuthorizationError:
        :raises DomainFieldError:
        :raises UserNotFoundByUsernameError: the gateway returned no user
            for the given username
        :raises ActivationChangeNotPermittedError:
        """
        log.info("Activate user: started. Username: '%s'.", request_data.username)

        actor = await self._current_user_service.get_current_user()

        # First check: may the current user manage the plain USER role at all?
        role_permission = CanManageRole()
        role_context = RoleManagementContext(
            subject=actor,
            target_role=UserRole.USER,
        )
        authorize(role_permission, context=role_context)

        # Build the value object before hitting the gateway; the lookup is
        # requested with ``for_update=True``.
        target_username = Username(request_data.username)
        target: User | None = await self._user_command_gateway.read_by_username(
            target_username,
            for_update=True,
        )
        if target is None:
            raise UserNotFoundByUsernameError(target_username)

        # Second check: may the current user manage this particular target?
        subordinate_permission = CanManageSubordinate()
        subordinate_context = UserManagementContext(subject=actor, target=target)
        authorize(subordinate_permission, context=subordinate_context)

        self._user_service.toggle_user_activation(target, is_active=True)
        await self._transaction_manager.commit()

        log.info("Activate user: done. Username: '%s'.", target.username.value)