Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ natural.
│ └── ... # ports, enums, exceptions, etc.
├── application/... # application layer
│ ├── commands/ # write operations, business-critical reads
│ ├── commands/ # write ops, business-critical reads
│ │ ├── create_user.py # interactor
│ │ └── ... # other interactors
│ ├── queries/ # optimized read operations
Expand All @@ -444,9 +444,9 @@ natural.
│ └── ... # ports, exceptions, etc.
├── infrastructure/... # infrastructure layer
│ ├── auth_session/... # auth context (session-based)
│ ├── handlers/... # account handlers (log in, log out, sign up)
│ └── ... # adapters, persistence, exceptions, etc.
│ ├── adapters/... # port adapters
│ ├── auth/... # auth context (session-based)
│ └── ... # persistence, exceptions, etc.
├── presentation/... # presentation layer
│ └── http/ # http interface
Expand Down
179 changes: 104 additions & 75 deletions config/toml_config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

import rtoml

# Parsed TOML configuration data (possibly nested sections).
ConfigDict = dict[str, Any]
# Flat mapping of ENV_VAR-style keys to string values, ready for a dotenv file.
ExportEnv = dict[str, str]

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -55,6 +58,9 @@ def configure_logging(*, level: LoggingLevel = DEFAULT_LOG_LEVEL) -> None:
# ENVIRONMENT & PATHS


ENV_VAR_NAME: Final[str] = "APP_ENV"


class ValidEnvs(StrEnum):
"""
Values should reflect actual directory names.
Expand All @@ -76,9 +82,7 @@ class DirContents(StrEnum):
DOTENV_NAME = ".env"


ENV_VAR_NAME: Final[str] = "APP_ENV"

BASE_DIR_PATH: Final[Path] = Path(__file__).resolve().parent.parent
BASE_DIR_PATH: Final[Path] = Path(__file__).resolve().parents[1]
CONFIG_PATH: Final[Path] = BASE_DIR_PATH / "config"

ENV_TO_DIR_PATHS: Final[Mapping[ValidEnvs, Path]] = MappingProxyType({
Expand All @@ -88,7 +92,7 @@ class DirContents(StrEnum):
})


def validate_env(*, env: str | None) -> ValidEnvs:
def validate_env(env: str | None) -> ValidEnvs:
if env is None:
raise ValueError(f"{ENV_VAR_NAME} is not set.")
try:
Expand All @@ -101,19 +105,33 @@ def validate_env(*, env: str | None) -> ValidEnvs:


def get_current_env() -> ValidEnvs:
env_value = os.getenv(ENV_VAR_NAME)
return validate_env(env=env_value)
return validate_env(os.getenv(ENV_VAR_NAME))


# CONFIG READING


def load_full_config(
    env: ValidEnvs,
    dir_paths: Mapping[ValidEnvs, Path] = ENV_TO_DIR_PATHS,
    main_config: DirContents = DirContents.CONFIG_NAME,
    secrets_config: DirContents = DirContents.SECRETS_NAME,
) -> ConfigDict:
    """Read the main config for *env* and overlay secrets on top of it.

    If the secrets file is missing, a warning is logged and the main
    config is returned unchanged.
    """
    log.info("Reading config for environment: '%s'", env)
    base = read_config(env=env, config=main_config, dir_paths=dir_paths)
    try:
        secret_overlay = read_config(env=env, config=secrets_config, dir_paths=dir_paths)
    except FileNotFoundError:
        log.warning("Secrets file not found. Full config will not contain secrets.")
        return base
    return merge_dicts(dict1=base, dict2=secret_overlay)


def read_config(
*,
env: ValidEnvs,
config: DirContents,
dir_paths: Mapping[ValidEnvs, Path],
) -> dict[str, Any]:
config: DirContents,
) -> ConfigDict:
dir_path = dir_paths.get(env)
if dir_path is None:
raise FileNotFoundError(f"No directory path configured for environment: {env}")
Expand All @@ -126,7 +144,7 @@ def read_config(
return rtoml.load(file)


def merge_dicts(*, dict1: dict[str, Any], dict2: dict[str, Any]) -> dict[str, Any]:
def merge_dicts(*, dict1: ConfigDict, dict2: ConfigDict) -> ConfigDict:
result = dict1.copy()
for key, value in dict2.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
Expand All @@ -136,31 +154,68 @@ def merge_dicts(*, dict1: dict[str, Any], dict2: dict[str, Any]) -> dict[str, An
return result


def load_full_config(
*,
# EXPORT PROCESSING


EXPORT_SECTION: Final[str] = "export"
EXPORT_FIELDS_KEY: Final[str] = "fields"


def get_exported_env_variables(
env: ValidEnvs,
main_config: DirContents = DirContents.CONFIG_NAME,
secrets_config: DirContents = DirContents.SECRETS_NAME,
dir_paths: Mapping[ValidEnvs, Path] = ENV_TO_DIR_PATHS,
) -> dict[str, Any]:
log.info("Reading config for environment: '%s'", env)
config = read_config(env=env, config=main_config, dir_paths=dir_paths)
try:
secrets = read_config(env=env, config=secrets_config, dir_paths=dir_paths)
except FileNotFoundError:
log.warning("Secrets file not found. Full config will not contain secrets.")
return config
return merge_dicts(dict1=config, dict2=secrets)
) -> ExportEnv:
config = load_full_config(env=env, dir_paths=dir_paths)
export_fields = load_export_fields(env=env, dir_paths=dir_paths)
return extract_export_fields_from_config(config=config, export_fields=export_fields)


# EXPORT PROCESSING
def load_export_fields(
    env: ValidEnvs,
    dir_paths: Mapping[ValidEnvs, Path],
) -> list[str]:
    """Load and validate the dotted field paths to export for *env*.

    Raises:
        ValueError: if the export file lacks the expected section, if
            'fields' is not a list of strings, or if the list is empty.
    """
    raw = read_config(
        env=env,
        config=DirContents.EXPORT_NAME,
        dir_paths=dir_paths,
    )

    section = raw.get(EXPORT_SECTION)
    if not isinstance(section, dict):
        raise ValueError(
            f"Invalid {DirContents.EXPORT_NAME}: missing [{EXPORT_SECTION}] section"
        )

    field_list = section.get(EXPORT_FIELDS_KEY)
    # The shape check precedes the emptiness check: an empty list is a
    # valid *type* but still rejected with its own dedicated message.
    well_typed = isinstance(field_list, list) and all(
        isinstance(item, str) for item in field_list
    )
    if not well_typed:
        raise ValueError(
            f"Invalid {DirContents.EXPORT_NAME}: "
            f"'{EXPORT_FIELDS_KEY}' must be a list of strings"
        )
    if not field_list:
        raise ValueError(
            f"Invalid {DirContents.EXPORT_NAME}: '{EXPORT_FIELDS_KEY}' cannot be empty"
        )

    return field_list

def get_env_value_by_export_field(*, config: dict[str, Any], field: str) -> Any:
parts = field.split(".")

def extract_export_fields_from_config(
    config: ConfigDict,
    export_fields: list[str],
) -> ExportEnv:
    """Resolve each dotted field path against *config* and build env pairs.

    A field like "db.host" becomes the key "DB_HOST" mapped to the
    stringified value found at that path.
    """
    return {
        "_".join(segment.upper() for segment in field.split(".")): (
            get_env_value_by_export_field(config=config, field=field)
        )
        for field in export_fields
    }


def get_env_value_by_export_field(*, config: ConfigDict, field: str) -> str:
current = config
for part in parts:
if part not in current:
for part in field.split("."):
if not isinstance(current, dict) or part not in current:
raise KeyError(f"Field '{field}' not found in config")
current = current[part]

Expand All @@ -169,85 +224,59 @@ def get_env_value_by_export_field(*, config: dict[str, Any], field: str) -> Any:
f"Field '{field}' cannot be converted to string: "
f"got {type(current).__name__}",
)

try:
return str(current)
except (TypeError, ValueError) as e:
raise ValueError(f"Field '{field}' cannot be converted to string: {e!s}") from e


def extract_exported(
*,
config: dict[str, Any],
export_fields: list[str],
) -> dict[str, str]:
result: dict[str, str] = {}
for field in export_fields:
str_value = get_env_value_by_export_field(config=config, field=field)
env_key = "_".join(part.upper() for part in field.split("."))
result[env_key] = str_value
return result


def load_export_fields(*, env: ValidEnvs) -> tuple[dict[str, Any], list[str]]:
config = load_full_config(env=env)
export_data = read_config(
env=env,
config=DirContents.EXPORT_NAME,
dir_paths=ENV_TO_DIR_PATHS,
)
if "export" not in export_data or "fields" not in export_data["export"]:
raise ValueError("Invalid export.toml: missing [export] section or 'fields'")
export_fields = export_data["export"]["fields"]
return config, export_fields
# DOTENV GENERATION


# DOTENV GENERATION
def write_dotenv_file(
*,
env: ValidEnvs,
exported_fields: ExportEnv,
generated_at: datetime | None = None,
) -> None:
if generated_at is None:
generated_at = datetime.now(UTC)

dotenv_filename = f"{DirContents.DOTENV_NAME}.{env.value}"
dotenv_path = ENV_TO_DIR_PATHS[env] / dotenv_filename

def write_dotenv_file(*, env: ValidEnvs, exported_fields: dict[str, str]) -> None:
env_filename = f"{DirContents.DOTENV_NAME}.{env.value}"
env_path = ENV_TO_DIR_PATHS[env] / env_filename
header = [
"# This .env file was automatically generated by toml_config_manager.",
"# Do not edit directly. Make changes in config.toml or .secrets.toml instead.",
"# Ensure values here match those in config files.",
f"# Environment: {env}",
f"# Generated: {datetime.now(UTC).isoformat()}",
f"# Generated: {generated_at.isoformat()}",
]
body = [f"{key}={value}" for key, value in exported_fields.items()]
body.append("")

with open(env_path, "w", encoding="utf-8") as f:
with open(dotenv_path, "w", encoding="utf-8") as f:
f.write("\n".join(header + body))

try:
relative_path = env_path.relative_to(BASE_DIR_PATH)
except ValueError:
relative_path = env_path

log.info(
"Dotenv for environment '%s' was successfully generated at '%s'! ✨",
env.value,
relative_path,
str(dotenv_path.resolve()),
)


def generate_dotenv(*, env: ValidEnvs) -> None:
config, export_fields = load_export_fields(env=env)
exported_fields = extract_exported(config=config, export_fields=export_fields)
write_dotenv_file(env=env, exported_fields=exported_fields)


# ENTRY POINT


def main() -> None:
log_lvl: str = os.getenv(LOG_LEVEL_VAR_NAME, DEFAULT_LOG_LEVEL)
validated_log_lvl: LoggingLevel = validate_logging_level(level=log_lvl)
configure_logging(level=validated_log_lvl)
log_lvl_str = os.getenv(LOG_LEVEL_VAR_NAME, DEFAULT_LOG_LEVEL)
log_lvl = validate_logging_level(level=log_lvl_str)
configure_logging(level=log_lvl)

current_env = get_current_env()
generate_dotenv(env=current_env)
env = get_current_env()
exported_fields = get_exported_env_variables(env)
write_dotenv_file(env=env, exported_fields=exported_fields)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ ignore = [
#
"src/app/domain/value_objects/base.py" = ["B024", ] # abstract-base-class-without-abstract-method
"src/app/infrastructure/adapters/password_hasher_bcrypt.py" = ["E501"] # line-too-long
"src/app/infrastructure/constants.py" = ["S105"] # hardcoded-password-string
"src/app/infrastructure/auth/session/constants.py" = ["S105"] # hardcoded-password-string
"src/app/presentation/http/auth/constants.py" = ["S105"] # hardcoded-password-string
"src/app/presentation/http/exceptions/handlers.py" = ["RUF029", ] # unused-async
"scripts/dishka/plot_dependencies_data.py" = ["T201", ] # print
Expand Down
2 changes: 1 addition & 1 deletion scripts/dishka/plot_dependencies_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dishka import AsyncContainer, make_async_container

from app.setup.config.settings import AppSettings, load_settings
from app.setup.ioc.registry import get_providers
from app.setup.ioc.provider_registry import get_providers


def make_plot_data_container(settings: AppSettings) -> AsyncContainer:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from app.application.common.constants import AUTHZ_NOT_AUTHORIZED
from app.application.common.exceptions.authorization import AuthorizationError
from app.application.common.services.authorization.base import (
Permission,
PermissionContext,
)
from app.application.common.services.constants import AUTHZ_NOT_AUTHORIZED


def authorize[PC: PermissionContext](
Expand Down
5 changes: 4 additions & 1 deletion src/app/application/common/services/current_user.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import logging

from app.application.common.constants import AUTHZ_NO_CURRENT_USER, AUTHZ_NOT_AUTHORIZED
from app.application.common.exceptions.authorization import AuthorizationError
from app.application.common.ports.access_revoker import AccessRevoker
from app.application.common.ports.identity_provider import IdentityProvider
from app.application.common.ports.user_command_gateway import UserCommandGateway
from app.application.common.services.constants import (
AUTHZ_NO_CURRENT_USER,
AUTHZ_NOT_AUTHORIZED,
)
from app.domain.entities.user import User

log = logging.getLogger(__name__)
Expand Down
8 changes: 8 additions & 0 deletions src/app/infrastructure/adapters/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import Final

# Shared log/error message constants for the infrastructure adapter modules
# (used when reporting DB commit/flush/query outcomes — see the adapters that
# import them, e.g. the transaction manager and the SQLA data mapper).
DB_CONSTRAINT_VIOLATION: Final[str] = "Database constraint violation."
DB_COMMIT_DONE: Final[str] = "Commit was done."
DB_COMMIT_FAILED: Final[str] = "Commit failed."
DB_FLUSH_DONE: Final[str] = "Flush was done."
DB_FLUSH_FAILED: Final[str] = "Flush failed."
DB_QUERY_FAILED: Final[str] = "Database query failed."
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
TransactionManager,
)
from app.domain.exceptions.user import UsernameAlreadyExistsError
from app.infrastructure.adapters.types import MainAsyncSession
from app.infrastructure.constants import (
from app.infrastructure.adapters.constants import (
DB_COMMIT_DONE,
DB_COMMIT_FAILED,
DB_CONSTRAINT_VIOLATION,
DB_FLUSH_DONE,
DB_FLUSH_FAILED,
DB_QUERY_FAILED,
)
from app.infrastructure.adapters.types import MainAsyncSession
from app.infrastructure.exceptions.gateway import DataMapperError

log = logging.getLogger(__name__)
Expand Down
1 change: 1 addition & 0 deletions src/app/infrastructure/adapters/password_hasher_bcrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def hash(self, raw_password: RawPassword) -> bytes:
This issue can be resolved by applying `base64` encoding to the digest.
The resulting `base64(hmac-sha256(password, pepper))` string is then ready for bcrypt hashing.
Salt is added to this string before passing it to `bcrypt` for the final hashing step.
Inspired by: https://blog.ircmaxell.com/2015/03/security-issue-combining-bcrypt-with.html
"""
base64_hmac_password: bytes = self._add_pepper(raw_password, self._pepper)
salt: bytes = bcrypt.gensalt()
Expand Down
2 changes: 1 addition & 1 deletion src/app/infrastructure/adapters/user_data_mapper_sqla.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from app.domain.entities.user import User
from app.domain.value_objects.user_id import UserId
from app.domain.value_objects.username.username import Username
from app.infrastructure.adapters.constants import DB_QUERY_FAILED
from app.infrastructure.adapters.types import MainAsyncSession
from app.infrastructure.constants import DB_QUERY_FAILED
from app.infrastructure.exceptions.gateway import DataMapperError


Expand Down
Loading