Skip to content

Commit 7fdceca

Browse files
committed
Segregate flush and commit
1 parent 10e0c7d commit 7fdceca

9 files changed

Lines changed: 91 additions & 56 deletions

File tree

src/app/application/commands/create_user.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import TypedDict
44
from uuid import UUID
55

6+
from app.application.common.ports.flusher import Flusher
67
from app.application.common.ports.transaction_manager import (
78
TransactionManager,
89
)
@@ -52,13 +53,15 @@ class CreateUserInteractor:
5253
def __init__(
5354
self,
5455
current_user_service: CurrentUserService,
55-
user_command_gateway: UserCommandGateway,
5656
user_service: UserService,
57+
user_command_gateway: UserCommandGateway,
58+
flusher: Flusher,
5759
transaction_manager: TransactionManager,
5860
):
5961
self._current_user_service = current_user_service
60-
self._user_command_gateway = user_command_gateway
6162
self._user_service = user_service
63+
self._user_command_gateway = user_command_gateway
64+
self._flusher = flusher
6265
self._transaction_manager = transaction_manager
6366

6467
async def __call__(self, request_data: CreateUserRequest) -> CreateUserResponse:
@@ -84,7 +87,7 @@ async def __call__(self, request_data: CreateUserRequest) -> CreateUserResponse:
8487
self._user_command_gateway.add(user)
8588

8689
try:
87-
await self._transaction_manager.flush()
90+
await self._flusher.flush()
8891
except UsernameAlreadyExistsError:
8992
raise
9093

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from abc import abstractmethod
2+
from typing import Protocol
3+
4+
5+
class Flusher(Protocol):
6+
"""
7+
Interface for flushing intermediate changes during a business transaction.
8+
"""
9+
10+
@abstractmethod
11+
async def flush(self) -> None:
12+
"""
13+
Flush pending changes to validate constraints or trigger side effects.
14+
15+
:raises DataMapperError:
16+
:raises UsernameAlreadyExists:
17+
"""

src/app/application/common/ports/transaction_manager.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,15 @@
44

55
class TransactionManager(Protocol):
66
"""
7-
UOW-compatible interface for flushing and
8-
committing changes to the data source.
9-
The actual implementation of UOW can be bundled with an ORM,
10-
like SQLAlchemy's session.
7+
UoW-compatible interface for committing a business transaction.
8+
May be extended with rollback support.
9+
The implementation may be an ORM session, such as SQLAlchemy's.
1110
"""
1211

13-
@abstractmethod
14-
async def flush(self) -> None:
15-
"""
16-
Mostly to check data source constraints.
17-
18-
:raises DataMapperError:
19-
:raises UsernameAlreadyExists:
20-
"""
21-
2212
@abstractmethod
2313
async def commit(self) -> None:
2414
"""
25-
Persist changes to the data source.
15+
Commit the successful outcome of a business transaction.
2616
2717
:raises DataMapperError:
2818
"""
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import logging
2+
from collections.abc import Mapping
3+
from typing import Any, cast
4+
5+
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
6+
7+
from app.application.common.ports.flusher import Flusher
8+
from app.domain.exceptions.user import UsernameAlreadyExistsError
9+
from app.infrastructure.adapters.constants import (
10+
DB_CONSTRAINT_VIOLATION,
11+
DB_FLUSH_DONE,
12+
DB_FLUSH_FAILED,
13+
DB_QUERY_FAILED,
14+
)
15+
from app.infrastructure.adapters.types import MainAsyncSession
16+
from app.infrastructure.exceptions.gateway import DataMapperError
17+
18+
log = logging.getLogger(__name__)
19+
20+
21+
class SqlaMainFlusher(Flusher):
22+
def __init__(self, session: MainAsyncSession):
23+
self._session = session
24+
25+
async def flush(self) -> None:
26+
"""
27+
:raises DataMapperError:
28+
:raises UsernameAlreadyExists:
29+
"""
30+
try:
31+
await self._session.flush()
32+
log.debug("%s Main session.", DB_FLUSH_DONE)
33+
34+
except IntegrityError as error:
35+
if "uq_users_username" in str(error):
36+
params: Mapping[str, Any] = cast(Mapping[str, Any], error.params)
37+
username = str(params.get("username", "unknown"))
38+
raise UsernameAlreadyExistsError(username) from error
39+
40+
raise DataMapperError(DB_CONSTRAINT_VIOLATION) from error
41+
42+
except SQLAlchemyError as error:
43+
raise DataMapperError(f"{DB_QUERY_FAILED} {DB_FLUSH_FAILED}") from error

src/app/infrastructure/adapters/main_transaction_manager_sqla.py

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,13 @@
11
import logging
2-
from collections.abc import Mapping
3-
from typing import Any, cast
42

5-
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
3+
from sqlalchemy.exc import SQLAlchemyError
64

75
from app.application.common.ports.transaction_manager import (
86
TransactionManager,
97
)
10-
from app.domain.exceptions.user import UsernameAlreadyExistsError
118
from app.infrastructure.adapters.constants import (
129
DB_COMMIT_DONE,
1310
DB_COMMIT_FAILED,
14-
DB_CONSTRAINT_VIOLATION,
15-
DB_FLUSH_DONE,
16-
DB_FLUSH_FAILED,
1711
DB_QUERY_FAILED,
1812
)
1913
from app.infrastructure.adapters.types import MainAsyncSession
@@ -26,26 +20,6 @@ class SqlaMainTransactionManager(TransactionManager):
2620
def __init__(self, session: MainAsyncSession):
2721
self._session = session
2822

29-
async def flush(self) -> None:
30-
"""
31-
:raises DataMapperError:
32-
:raises UsernameAlreadyExists:
33-
"""
34-
try:
35-
await self._session.flush()
36-
log.debug("%s Main session.", DB_FLUSH_DONE)
37-
38-
except IntegrityError as error:
39-
if "uq_users_username" in str(error):
40-
params: Mapping[str, Any] = cast(Mapping[str, Any], error.params)
41-
username = str(params.get("username", "unknown"))
42-
raise UsernameAlreadyExistsError(username) from error
43-
44-
raise DataMapperError(DB_CONSTRAINT_VIOLATION) from error
45-
46-
except SQLAlchemyError as error:
47-
raise DataMapperError(f"{DB_QUERY_FAILED} {DB_FLUSH_FAILED}") from error
48-
4923
async def commit(self) -> None:
5024
"""
5125
:raises DataMapperError:

src/app/infrastructure/auth/handlers/log_in.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ class LogInHandler:
5050

5151
def __init__(
5252
self,
53-
user_command_gateway: UserCommandGateway,
5453
current_user_service: CurrentUserService,
54+
user_command_gateway: UserCommandGateway,
5555
user_service: UserService,
5656
auth_session_service: AuthSessionService,
5757
):
58-
self._user_command_gateway = user_command_gateway
5958
self._current_user_service = current_user_service
59+
self._user_command_gateway = user_command_gateway
6060
self._user_service = user_service
6161
self._auth_session_service = auth_session_service
6262

src/app/infrastructure/auth/handlers/sign_up.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import TypedDict
44
from uuid import UUID
55

6+
from app.application.common.ports.flusher import Flusher
67
from app.application.common.ports.transaction_manager import TransactionManager
78
from app.application.common.ports.user_command_gateway import UserCommandGateway
89
from app.application.common.services.current_user import CurrentUserService
@@ -48,15 +49,17 @@ class SignUpHandler:
4849

4950
def __init__(
5051
self,
51-
user_command_gateway: UserCommandGateway,
52-
transaction_manager: TransactionManager,
5352
current_user_service: CurrentUserService,
5453
user_service: UserService,
54+
user_command_gateway: UserCommandGateway,
55+
flusher: Flusher,
56+
transaction_manager: TransactionManager,
5557
):
56-
self._user_command_gateway = user_command_gateway
57-
self._transaction_manager = transaction_manager
5858
self._current_user_service = current_user_service
5959
self._user_service = user_service
60+
self._user_command_gateway = user_command_gateway
61+
self._flusher = flusher
62+
self._transaction_manager = transaction_manager
6063

6164
async def __call__(self, request_data: SignUpRequest) -> SignUpResponse:
6265
log.info("Sign up: started. Username: '%s'.", request_data.username)
@@ -75,7 +78,7 @@ async def __call__(self, request_data: SignUpRequest) -> SignUpResponse:
7578
self._user_command_gateway.add(user)
7679

7780
try:
78-
await self._transaction_manager.flush()
81+
await self._flusher.flush()
7982
except UsernameAlreadyExistsError:
8083
raise
8184

src/app/infrastructure/auth/session/ports/transaction_manager.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,15 @@ class AuthSessionTransactionManager(Protocol):
77
Defined to allow easier mocking and swapping
88
of implementations in the same layer.
99
10-
UOW-compatible interface for flushing and
11-
committing changes to the data source.
12-
The actual implementation of UOW can be bundled with an ORM,
13-
like SQLAlchemy's session.
10+
UoW-compatible interface for committing a business transaction.
11+
May be extended with rollback support.
12+
The implementation may be an ORM session, such as SQLAlchemy's.
1413
"""
1514

1615
@abstractmethod
1716
async def commit(self) -> None:
1817
"""
19-
Persist changes to the data source.
18+
Commit the successful outcome of a business transaction.
2019
2120
:raises DataMapperError:
2221
"""

src/app/setup/ioc/application.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from app.application.commands.reactivate_user import ReactivateUserInteractor
88
from app.application.commands.revoke_admin import RevokeAdminInteractor
99
from app.application.common.ports.access_revoker import AccessRevoker
10+
from app.application.common.ports.flusher import Flusher
1011
from app.application.common.ports.identity_provider import IdentityProvider
1112
from app.application.common.ports.transaction_manager import (
1213
TransactionManager,
@@ -15,6 +16,7 @@
1516
from app.application.common.ports.user_query_gateway import UserQueryGateway
1617
from app.application.common.services.current_user import CurrentUserService
1718
from app.application.queries.list_users import ListUsersQueryService
19+
from app.infrastructure.adapters.main_flusher_sqla import SqlaMainFlusher
1820
from app.infrastructure.adapters.main_transaction_manager_sqla import (
1921
SqlaMainTransactionManager,
2022
)
@@ -53,6 +55,10 @@ class ApplicationProvider(Provider):
5355
source=SqlaMainTransactionManager,
5456
provides=TransactionManager,
5557
)
58+
flusher = provide(
59+
source=SqlaMainFlusher,
60+
provides=Flusher,
61+
)
5662
user_command_gateway = provide(
5763
source=SqlaUserDataMapper,
5864
provides=UserCommandGateway,

0 commit comments

Comments
 (0)