-
Notifications
You must be signed in to change notification settings - Fork 74
Expand file tree
/
Copy pathgrant_admin.py
More file actions
84 lines (71 loc) · 2.61 KB
/
grant_admin.py
File metadata and controls
84 lines (71 loc) · 2.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import logging
from dataclasses import dataclass
from app.application.common.ports.transaction_manager import (
TransactionManager,
)
from app.application.common.ports.user_command_gateway import UserCommandGateway
from app.application.common.services.authorization.authorize import (
authorize,
)
from app.application.common.services.authorization.permissions import (
CanManageRole,
RoleManagementContext,
)
from app.application.common.services.current_user import CurrentUserService
from app.domain.entities.user import User
from app.domain.enums.user_role import UserRole
from app.domain.exceptions.user import UserNotFoundByUsernameError
from app.domain.services.user import UserService
from app.domain.value_objects.username import Username
log = logging.getLogger(__name__)
@dataclass(frozen=True, slots=True)
class GrantAdminRequest:
username: str
class GrantAdminInteractor:
"""
- Open to super admins.
- Grants admin rights to a specified user.
- Super admin rights can not be changed.
"""
def __init__(
self,
current_user_service: CurrentUserService,
user_command_gateway: UserCommandGateway,
user_service: UserService,
transaction_manager: TransactionManager,
):
self._current_user_service = current_user_service
self._user_command_gateway = user_command_gateway
self._user_service = user_service
self._transaction_manager = transaction_manager
async def execute(self, request_data: GrantAdminRequest) -> None:
"""
:raises AuthenticationError:
:raises DataMapperError:
:raises AuthorizationError:
:raises DomainFieldError:
:raises UserNotFoundByUsernameError:
:raises RoleChangeNotPermittedError:
"""
log.info(
"Grant admin: started. Username: '%s'.",
request_data.username,
)
current_user = await self._current_user_service.get_current_user()
authorize(
CanManageRole(),
context=RoleManagementContext(
subject=current_user,
target_role=UserRole.ADMIN,
),
)
username = Username(request_data.username)
user: User | None = await self._user_command_gateway.read_by_username(
username,
for_update=True,
)
if user is None:
raise UserNotFoundByUsernameError(username)
self._user_service.toggle_user_admin_role(user, is_admin=True)
await self._transaction_manager.commit()
log.info("Grant admin: done. Username: '%s'.", user.username.value)