-
Notifications
You must be signed in to change notification settings - Fork 74
Expand file tree
/
Copy pathchange_password.py
More file actions
88 lines (75 loc) · 2.8 KB
/
change_password.py
File metadata and controls
88 lines (75 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import logging
from dataclasses import dataclass
from app.application.common.ports.transaction_manager import (
TransactionManager,
)
from app.application.common.ports.user_command_gateway import UserCommandGateway
from app.application.common.services.authorization.authorize import (
authorize,
)
from app.application.common.services.authorization.composite import AnyOf
from app.application.common.services.authorization.permissions import (
CanManageSelf,
CanManageSubordinate,
UserManagementContext,
)
from app.application.common.services.current_user import CurrentUserService
from app.domain.entities.user import User
from app.domain.exceptions.user import UserNotFoundByUsernameError
from app.domain.services.user import UserService
from app.domain.value_objects.raw_password.raw_password import RawPassword
from app.domain.value_objects.username.username import Username
log = logging.getLogger(__name__)
@dataclass(frozen=True, slots=True, kw_only=True)
class ChangePasswordRequest:
username: str
password: str
class ChangePasswordInteractor:
"""
- Open to authenticated users.
- Changes the user's password.
- The current user can change their own password.
- Admins can change passwords of subordinate users.
"""
def __init__(
self,
current_user_service: CurrentUserService,
user_command_gateway: UserCommandGateway,
user_service: UserService,
transaction_manager: TransactionManager,
):
self._current_user_service = current_user_service
self._user_command_gateway = user_command_gateway
self._user_service = user_service
self._transaction_manager = transaction_manager
async def execute(self, request_data: ChangePasswordRequest) -> None:
"""
:raises AuthenticationError:
:raises DataMapperError:
:raises AuthorizationError:
:raises DomainFieldError:
:raises UserNotFoundByUsernameError:
"""
log.info("Change password: started.")
current_user = await self._current_user_service.get_current_user()
username = Username(request_data.username)
password = RawPassword(request_data.password)
user: User | None = await self._user_command_gateway.read_by_username(
username,
for_update=True,
)
if user is None:
raise UserNotFoundByUsernameError(username)
authorize(
AnyOf(
CanManageSelf(),
CanManageSubordinate(),
),
context=UserManagementContext(
subject=current_user,
target=user,
),
)
self._user_service.change_password(user, password)
await self._transaction_manager.commit()
log.info("Change password: done.")