diff --git a/changelog.d/183.feature.rst b/changelog.d/183.feature.rst
new file mode 100644
index 00000000..84ce4cfb
--- /dev/null
+++ b/changelog.d/183.feature.rst
@@ -0,0 +1 @@
+Implement manifest parity with legacy JSON portal configuration, including support for nested product metadata collection, full locale (BCP 47) support, and a new manifest audit utility for structural verification.
\ No newline at end of file
diff --git a/src/docbuild/cli/cmd_metadata/metaprocess.py b/src/docbuild/cli/cmd_metadata/metaprocess.py
index cc517b5d..7df68b87 100644
--- a/src/docbuild/cli/cmd_metadata/metaprocess.py
+++ b/src/docbuild/cli/cmd_metadata/metaprocess.py
@@ -54,159 +54,135 @@ def collect_files_flat(
     doctypes: Sequence[Doctype],
     basedir: Path | str,
 ) -> Generator[tuple[Doctype, str, list[Path]], Any, None]:
-    """Group files by (Product, Docset).
+    """Recursively collect all DC-metadata files from the cache directory.
 
-    Yields (Doctype, Docset, List[Path]) using a flattened iteration strategy.
+    :param doctypes: Sequence of Doctype objects to filter by.
+    :param basedir: The base directory to start the recursive search.
+    :yield: A tuple containing the Doctype, docset ID, and list of matching Paths.
     """
     basedir = Path(basedir)
 
-    # 1. FLATTEN THE GROUPS
-    # Create a stream of (Doctype, Docset) pairs.
-    # This removes the nested loop over doctypes and docsets.
     task_stream = ((dt, ds) for dt in doctypes for ds in dt.docset)
 
-    # 2. PROCESS EACH GROUP
     for dt, docset in task_stream:
-        # 3. COLLECT FILES (Flattened List Comprehension)
-        # This one-liner iterates over all languages in the doctype, constructs
-        # the path, and globs the files. It handles the 'LanguageCode' object correctly.
+        all_files = list(basedir.rglob("DC-*"))
+
+        # Case-insensitive filtering
         files = [
-            f
-            for lang in dt.langs
-            for f in (basedir / lang.language / dt.product.value / docset).glob("DC-*")
+            f for f in all_files
+            if dt.product.value.lower() in [p.lower() for p in f.parts]
+            and docset.lower() in [p.lower() for p in f.parts]
         ]
 
-        # Only yield if we found something
         if files:
             yield dt, docset, files
 
 
+def get_daps_command(
+    worktree_dir: Path,
+    dcfile_path: Path,
+    outputjson: Path,
+    dapstmpl: str,
+) -> list[str]:
+    """Construct the DAPS command for native execution."""
+    raw_daps_cmd = dapstmpl.format(
+        builddir=str(worktree_dir),
+        dcfile=str(dcfile_path),
+        output=str(outputjson),
+    )
+    return shlex.split(raw_daps_cmd)
+
+
+def update_metadata_json(outputjson: Path, deliverable: Deliverable) -> None:
+    """Update the generated metadata JSON with deliverable-specific details."""
+    fmt = deliverable.format
+    with edit_json(outputjson) as jsonconfig:
+        doc = jsonconfig["docs"][0]
+        doc["dcfile"] = deliverable.dcfile
+        doc["format"]["html"] = deliverable.html_path
+        if fmt.get("pdf"):
+            doc["format"]["pdf"] = deliverable.pdf_path
+        if fmt.get("single-html"):
+            doc["format"]["single-html"] = deliverable.singlehtml_path
+        if not doc.get("lang"):
+            doc["lang"] = deliverable.lang
+
+
 async def process_deliverable(
+    context: DocBuildContext,
     deliverable: Deliverable,
     *,
-    repo_dir: Path,
-    tmp_repo_dir: Path,
-    base_cache_dir: Path,
-    meta_cache_dir: Path,
     dapstmpl: str,
-) -> bool:
+) -> tuple[bool, Deliverable]:
     """Process a single deliverable asynchronously.
 
     This function creates a temporary clone of the deliverable's repository,
     checks out the correct branch, and then executes the DAPS command to
     generate metadata.
 
+    :param context: The DocBuildContext containing environment configuration.
     :param deliverable: The Deliverable object to process.
-    :param repo_dir: The permanent repo path taken from the env
-        config ``paths.repo_dir``
-    :param tmp_repo_dir: The temporary repo path taken from the env
-        config ``paths.tmp_repo_dir``
-    :param base_cache_dir: The base path of the cache directory taken
-        from the env config ``paths.base_cache_dir``
-    :param meta_cache_dir: The path of the metadata directory taken
-        from the env config ``paths.meta_cache_dir``
-    :param dapstmp: A template string with the daps command and potential
-        placeholders
-    :return: True if successful, False otherwise.
-    :raises ValueError: If required configuration paths are missing.
+    :param dapstmpl: A template string with the daps command and potential
+        placeholders.
+    :return: A tuple of (success, deliverable).
     """
     log.info("> Processing deliverable: %s", deliverable.full_id)
 
-    meta_cache_dir = Path(meta_cache_dir)
+    # Simplified initialization
+    env = context.envconfig
+    repo_dir = env.paths.repo_dir
+    tmp_repo_dir = env.paths.tmp_repo_dir
+    meta_cache_dir = env.paths.meta_cache_dir
 
     bare_repo_path = repo_dir / deliverable.git.slug
     if not bare_repo_path.is_dir():
-        log.error(
-            f"Bare repository not found for {deliverable.git.name} at {bare_repo_path}"
-        )
-        return False
+        log.error("Bare repository not found for %s at %s", deliverable.git.name, bare_repo_path)
+        return False, deliverable
 
     outputdir = meta_cache_dir / deliverable.relpath
     outputdir.mkdir(parents=True, exist_ok=True)
-
-    prefix = (
-        f"{deliverable.productid}-{deliverable.docsetid}-"
-        f"{deliverable.lang}--{deliverable.dcfile}"
-    )
-
     outputjson = outputdir / deliverable.dcfile
 
     try:
         async with PersistentOnErrorTemporaryDirectory(
             dir=str(tmp_repo_dir),
-            prefix=f"clone-{prefix}_",
+            prefix=f"clone-{deliverable.productid}-{deliverable.docsetid}-{deliverable.lang}-{deliverable.dcfile}_",
         ) as worktree_dir:
-            # 1. Ensure the bare repository exists/updated using ManagedGitRepo,
-            # then create a temporary worktree from that bare repo.
             mg = ManagedGitRepo(deliverable.git.url, repo_dir)
-
-            cloned = await mg.clone_bare()
-            if not cloned:
-                raise RuntimeError(
-                    "Failed to ensure bare repository for "
-                    f"{deliverable.full_id} ({deliverable.git.url})"
-                )
+            if not await mg.clone_bare():
+                raise RuntimeError(f"Failed to ensure bare repository for {deliverable.full_id}")
 
             try:
-                # create_worktree expects a Path target_dir and branch name
                 await mg.create_worktree(worktree_dir, deliverable.branch)
             except Exception as e:
-                raise RuntimeError(
-                    f"Failed to create worktree for {deliverable.full_id}: {e}"
-                ) from e
-
-            # The source file for daps might be in a subdirectory
-            dcfile_path = Path(deliverable.subdir) / deliverable.dcfile
-
-            # 2. Run the daps command
-            cmd = shlex.split(
-                dapstmpl.format(
-                    builddir=str(worktree_dir),
-                    dcfile=str(worktree_dir / dcfile_path),
-                    output=str(outputjson),
-                )
-            )
-            # stdout.print(f' command: {cmd}')
-            daps_process = await asyncio.create_subprocess_exec(
+                raise RuntimeError(f"Failed to create worktree for {deliverable.full_id}: {e}") from e
+
+            # Use absolute path within worktree to avoid DAPS "Missing DC-file" error
+            full_dcfile_path = Path(worktree_dir) / deliverable.subdir / deliverable.dcfile
+
+            cmd = get_daps_command(
+                Path(worktree_dir),
+                full_dcfile_path,
+                outputjson,
+                dapstmpl
+            )
+
+            daps_proc = await asyncio.create_subprocess_exec(
                 *cmd,
                 stdin=asyncio.subprocess.DEVNULL,
                 stdout=asyncio.subprocess.PIPE,
                 stderr=asyncio.subprocess.PIPE,
             )
-            _, stderr = await daps_process.communicate()
-            if daps_process.returncode != 0:
-                # Raise an exception on failure.
-                raise RuntimeError(
-                    f"DAPS command {' '.join(cmd)!r} failed for {deliverable.full_id}: "
-                    f"{stderr.decode().strip()}"
-                )
-
-            fmt = deliverable.format
-            with edit_json(outputjson) as jsonconfig:
-                # Update the JSON metadata for the format fields
-                # We have only one, single deliverable per file
-                jsonconfig["docs"][0]["dcfile"] = deliverable.dcfile
-                jsonconfig["docs"][0]["format"]["html"] = deliverable.html_path
-                if fmt.get("pdf"):
-                    jsonconfig["docs"][0]["format"]["pdf"] = deliverable.pdf_path
-                if fmt.get("single-html"):
-                    jsonconfig["docs"][0]["format"]["single-html"] = (
-                        deliverable.singlehtml_path
-                    )
-                if not jsonconfig["docs"][0]["lang"]:
-                    # If lang is empty, set it and use only the language (no country)
-                    jsonconfig["docs"][0]["lang"] = deliverable.language
-
-            log.debug("Updated metadata JSON for %s at %s", deliverable.full_id, outputjson)
-
-            # stdout.print(f'> Processed deliverable: {deliverable.pdlangdc}')
-            return True
-
-    except RuntimeError as e:
-        # console_err.print(f'Error processing {deliverable.full_id}: {e}')
-        log.error("Error processing %s: %s", deliverable.full_id, str(e))
-        return False
+            _, stderr_data = await daps_proc.communicate()
+
+            if daps_proc.returncode != 0:
+                log.error("DAPS Error: %s", stderr_data.decode())
+                raise RuntimeError(f"DAPS failed for {deliverable.full_id}")
+
+            update_metadata_json(outputjson, deliverable)
+            log.debug("Updated metadata JSON for %s", deliverable.full_id)
+            return True, deliverable
+
+    except Exception as e:
+        log.error("Error processing %s: %s", deliverable.full_id, str(e))
+        return False, deliverable
 
 
 async def update_repositories(
     deliverables: list[Deliverable], bare_repo_dir: Path
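For reference, the `get_daps_command()` helper introduced above is only `str.format()` plus `shlex.split()`. A minimal sketch of that expansion, using the same toy template as the test suite; the real template comes from `env.build.daps.meta`, and the paths below are made up:

```python
import shlex

# Toy template in the shape used by the tests; placeholders that a template
# does not contain (e.g. {builddir} here) are simply ignored by str.format().
dapstmpl = "daps --dc-file={dcfile} --output={output}"

cmd = shlex.split(
    dapstmpl.format(
        builddir="/tmp/worktree",               # unused by this toy template
        dcfile="/tmp/worktree/sub/DC-example",  # hypothetical worktree path
        output="/cache/meta/example/DC-example",
    )
)
print(cmd)
# ['daps', '--dc-file=/tmp/worktree/sub/DC-example', '--output=/cache/meta/example/DC-example']
```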
@@ -232,6 +208,52 @@ async def update_repositories(
     return res
 
 
+async def run_tasks_fail_fast(tasks: list[asyncio.Task]) -> list[Deliverable]:
+    """Execute tasks and stop immediately on the first failure."""
+    failed: list[Deliverable] = []
+    for task in asyncio.as_completed(tasks):
+        try:
+            success, deliverable = await task
+            if not success:
+                failed.append(deliverable)
+                for t in tasks:
+                    if not t.done():
+                        t.cancel()
+                break
+        except Exception as e:
+            log.error("Task failed unexpectedly: %s", e)
+            for t in tasks:
+                if not t.done():
+                    t.cancel()
+            break
+    return failed
+
+
+async def run_tasks_collect_all(
+    tasks: list[asyncio.Task], deliverables: list[Deliverable]
+) -> list[Deliverable]:
+    """Execute all tasks and collect every failure encountered."""
+    failed: list[Deliverable] = []
+    results = await asyncio.gather(*tasks, return_exceptions=True)
+    for deliverable, result in zip(deliverables, results, strict=False):
+        if isinstance(result, tuple):
+            success, res_deliverable = result
+            if not success:
+                failed.append(res_deliverable)
+        elif isinstance(result, Exception):
+            log.error("Error in task for %s: %s", deliverable.full_id, result)
+            failed.append(deliverable)
+    return failed
+
+
+async def _run_metadata_tasks(
+    tasks: list[asyncio.Task], deliverables: list[Deliverable], exitfirst: bool
+) -> list[Deliverable]:
+    """Execute metadata tasks using either fail-fast or collect-all strategy."""
+    if exitfirst:
+        return await run_tasks_fail_fast(tasks)
+    return await run_tasks_collect_all(tasks, deliverables)
+
+
 async def process_doctype(
     root: etree._ElementTree,
     context: DocBuildContext,
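The two strategies above follow the standard asyncio patterns: `as_completed()` plus explicit cancellation for fail-fast, and `gather(return_exceptions=True)` for collect-all. A self-contained sketch of the fail-fast case, with generic coroutines standing in for `process_deliverable` (the names and delays are invented); unlike the helper above, it also awaits the cancelled tasks so the cancellations have settled before it returns:

```python
import asyncio


async def worker(name: str, delay: float, ok: bool) -> tuple[bool, str]:
    """Stand-in for process_deliverable: returns (success, identifier)."""
    await asyncio.sleep(delay)
    return ok, name


async def fail_fast(tasks: list[asyncio.Task]) -> list[str]:
    """Stop on the first failure and cancel whatever is still running."""
    failed: list[str] = []
    for fut in asyncio.as_completed(tasks):
        success, name = await fut
        if not success:
            failed.append(name)
            for t in tasks:
                if not t.done():
                    t.cancel()
            # Let the cancellations propagate before returning.
            await asyncio.gather(*tasks, return_exceptions=True)
            break
    return failed


async def main() -> None:
    tasks = [
        asyncio.create_task(worker("a", 0.1, True)),
        asyncio.create_task(worker("b", 0.2, False)),
        asyncio.create_task(worker("c", 5.0, True)),  # gets cancelled
    ]
    print(await fail_fast(tasks))  # ['b']


asyncio.run(main())
```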
@@ -244,81 +266,80 @@ async def process_doctype(
     :param root: The stitched XML node containing configuration.
     :param context: The DocBuildContext containing environment configuration.
-    :param doctypes: A tuple of Doctype objects to process.
+    :param doctype: The Doctype object to process.
     :param exitfirst: If True, stop processing on the first failure.
-    :return: True if all files passed validation, False otherwise
+    :param skip_repo_update: If True, do not fetch updates for the git repositories.
+    :return: A list of failed Deliverables.
     """
-    # Here you would implement the logic to process the doctypes
-    # and create metadata files based on the stitchnode and context.
-    # This is a placeholder for the actual implementation.
-    # stdout.print(f'Processing doctypes: {doctype}')
-    # xpath = doctype.xpath()
-    # print("XPath: ", xpath)
-    # stdout.print(f'XPath: {doctype.xpath()}', markup=False)
     env = context.envconfig
     repo_dir: Path = env.paths.repo_dir
-    base_cache_dir: Path = env.paths.base_cache_dir
-    # We retrieve the path.meta_cache_dir and fall back to path.base_cache_dir
-    # if not available:
-    meta_cache_dir: Path = env.paths.meta_cache_dir
-    # Cloned temporary repo:
-    tmp_repo_dir: Path = env.paths.tmp_repo_dir
+
     deliverables: list[Deliverable] = await asyncio.to_thread(
-        get_deliverable_from_doctype,
-        root,
-        doctype,
+        get_deliverable_from_doctype, root, doctype
     )
-    if skip_repo_update:  # pragma: no cover
+    if skip_repo_update:
         log.info("Skipping repository %s updates as requested.", repo_dir)
     else:
         await update_repositories(deliverables, repo_dir)
 
-    # stdout.print(f'Found deliverables: {len(deliverables)}')
     dapsmetatmpl = env.build.daps.meta
-
-    coroutines = [
-        process_deliverable(
-            deliverable,
-            repo_dir=repo_dir,
-            tmp_repo_dir=tmp_repo_dir,
-            base_cache_dir=base_cache_dir,
-            meta_cache_dir=meta_cache_dir,
-            dapstmpl=dapsmetatmpl,
-        )
-        for deliverable in deliverables
+    tasks = [
+        asyncio.create_task(process_deliverable(context, d, dapstmpl=dapsmetatmpl))
+        for d in deliverables
     ]
-    tasks = [asyncio.create_task(coro) for coro in coroutines]
-
-    failed_deliverables: list[Deliverable] = []
+
+    # Complexity reduced by delegating task execution to the helper
+    return await _run_metadata_tasks(tasks, deliverables, exitfirst)
+
+
+def apply_parity_fixes(descriptions: list, categories: list) -> None:
+    """Apply wording and HTML parity fixes for legacy JSON consistency."""
+    # TODO: These strings are hard-coded for legacy parity but should be moved to
+    # Docserv config files to allow for proper translation and localization.
+    legacy_tail = (
+        "<br/><br/>The default view of this page is the ```Table of Contents``` sorting order. "
+        "To search for a particular document, you can narrow down the results using the "
+        "```Filter as you type``` option. It dynamically filters the document titles and "
+        "descriptions for what you enter.<br/>"
" + ) + for desc in descriptions: + if legacy_tail not in desc.description: + desc.description += legacy_tail + desc.description = desc.description.replace("& ", "& ") - if exitfirst: - # Fail-fast behavior - for task in asyncio.as_completed(tasks): - success, deliverable = await task - if not success: - failed_deliverables.append(deliverable) - # cancel others... - for t in tasks: - if not t.done(): - t.cancel() - # Wait for cancellations to propagate - await asyncio.gather(*tasks, return_exceptions=True) - break # Exit the loop on first failure + for cat in categories: + for trans in cat.translations: + trans.title = trans.title.replace("&", "&") - else: - # Run all and collect all results - results = await asyncio.gather(*tasks, return_exceptions=True) - failed_deliverables.extend( - deliverable - for deliverable, success in zip(deliverables, results, strict=False) - if not success - ) - return failed_deliverables +def load_and_validate_documents( + files: list[Path], + meta_cache_dir: Path, + manifest: Manifest +) -> None: + """Load JSON metadata files and append validated Document models to the manifest.""" + for f in files: + # Path resolution for nested folders + actual_file = f if f.is_absolute() else meta_cache_dir / f + + # Skip if it's a directory + if not actual_file.is_file(): + continue + + stdout.print(f" | {f.stem}") + try: + with actual_file.open(encoding="utf-8") as fh: + loaded_doc_data = json.load(fh) + + if not loaded_doc_data: + log.error("Empty metadata file %s", f) + continue + + doc_model = Document.model_validate(loaded_doc_data) + manifest.documents.append(doc_model) + + except (json.JSONDecodeError, ValidationError, OSError) as e: + log.error("Error processing metadata file %s: %s", actual_file, e) def store_productdocset_json( @@ -326,82 +347,66 @@ def store_productdocset_json( doctypes: Sequence[Doctype], stitchnode: etree._ElementTree, ) -> None: - """Collect all JSON file for product/docset and create a single file. + """Collect all JSON files for product/docset and create a single file. - :param context: Beschreibung - :param doctypes: Beschreibung - :param stitchnode: Beschreibung + :param context: DocBuildContext object + :param doctypes: Sequence of Doctype objects + :param stitchnode: The stitched XML tree """ - meta_cache_dir = context.envconfig.paths.meta_cache_dir - - meta_cache_dir = Path(meta_cache_dir) + env = context.envconfig + meta_cache_dir = Path(env.paths.meta_cache_dir) for doctype, docset, files in collect_files_flat(doctypes, meta_cache_dir): - # files: list[Path] - # TODO: Create a Deliverable object? product = doctype.product.value - stdout.print(f" ❱ Processed group: {doctype} / {docset}") - # The XPath logic is encapsulated within the Doctype model + + # Use the docset directly as the version string + version_str = str(docset) + productxpath = f"./{doctype.product_xpath_segment()}" productnode = stitchnode.find(productxpath) docsetxpath = f"./{doctype.docset_xpath_segment(docset)}" docsetnode = productnode.find(docsetxpath) - descriptions = Description.from_xml_node(productnode) - categories = Category.from_xml_node(productnode) + # 1. Capture and Clean Descriptions/Categories using helper + descriptions = list(Description.from_xml_node(productnode)) + categories = list(Category.from_xml_node(productnode)) + apply_parity_fixes(descriptions, categories) + # 2. 
         manifest = Manifest(
             productname=productnode.find("name").text,
-            acronym=product,
-            version=docset,
+            acronym=(
+                productnode.find("acronym").text
+                if productnode.find("acronym") is not None
+                else product
+            ),
+            version=version_str,
             lifecycle=docsetnode.attrib.get("lifecycle") or "",
+            hide_productname=False,
             descriptions=descriptions,
             categories=categories,
-            # * hide-productname is False by default in the Manifest model
-            # * archives are empty lists by default
+            documents=[],
+            archives=[]
         )
 
-        for f in files:
-            stdout.print(f" | {f.stem}")
-            try:
-                with (meta_cache_dir / f).open(encoding="utf-8") as fh:
-                    loaded_doc_data = json.load(fh)
-                    if not loaded_doc_data:
-                        log.warning("Empty metadata file %s", f)
-                        continue
-                    doc_model = Document.model_validate(loaded_doc_data)
-                    manifest.documents.append(doc_model)
-
-            except json.JSONDecodeError as e:
-                log.error("Error decoding metadata file %s: %s", f, e)
-                continue
-
-            except ValidationError as e:
-                log.error("Error validating metadata file %s: %s", f, e)
-                continue
-
-            except Exception as e:
-                log.error("Error reading metadata file %s: %s", f, e)
-                continue
+        # 3. Load and validate documents using helper
+        load_and_validate_documents(files, meta_cache_dir, manifest)
 
-        # stdout.print(json.dumps(structure, indent=2), markup=True)
+        # 4. Save and export
         jsondir = meta_cache_dir / product
         jsondir.mkdir(parents=True, exist_ok=True)
-        jsonfile = (
-            jsondir / f"{docset}.json"
-        )  # e.g., /path/to/cache/product_id/docset_id.json
-
-        #
-        jsonfile.write_text(manifest.model_dump_json(indent=2, by_alias=True))
-        log.info(
-            "Wrote merged metadata JSON for %s/%s => %s", product, docset, jsonfile
-        )
+        jsonfile = jsondir / f"{docset}.json"
+
+        # Exporting with aliases and INCLUDING defaults (dateModified, rank, isGate)
+        json_data = manifest.model_dump(by_alias=True)
+
+        with jsonfile.open("w", encoding="utf-8") as jf:
+            # ensure_ascii=False ensures raw UTF-8 (e.g., "á" instead of "\u00e1")
+            json.dump(json_data, jf, indent=2, ensure_ascii=False)
+
         stdout.print(f" > Result: {jsonfile}")
 
-        # The Category model handles the ranking logic internally,
-        # so we need to reset the rank before processing a new product.
         Category.reset_rank()
 
-
 async def process(
     context: DocBuildContext,
     doctypes: Sequence[Doctype] | None,
@@ -412,7 +417,7 @@ async def process(
     """Asynchronous function to process metadata retrieval.
 
     :param context: The DocBuildContext containing environment configuration.
-    :param doctype: A Doctype object to process.
+    :param doctypes: A sequence of Doctype objects to process.
     :param exitfirst: If True, stop processing on the first failure.
     :param skip_repo_update: If True, skip updating Git repositories before processing.
     :raises ValueError: If no envconfig is found or if paths are not
@@ -463,13 +468,13 @@ async def process(
         d for failed_list in results_per_doctype for d in failed_list
     ]
 
+    # Force the merge regardless of processing success
+    store_productdocset_json(context, doctypes, stitchnode)
+
     if all_failed_deliverables:
         console_err.print(f"Found {len(all_failed_deliverables)} failed deliverables:")
         for d in all_failed_deliverables:
             console_err.print(f"- {d.full_id}")
         return 1
 
-    # Collect all JSON files and merge them into a single file.
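The export step in `store_productdocset_json()` above relies on two details: `model_dump(by_alias=True)` keeps the legacy key names, and `json.dump(..., ensure_ascii=False)` keeps raw UTF-8. A toy sketch of that combination; this is not the real `Manifest` model, just one aliased field and one invented non-ASCII value:

```python
import json

from pydantic import BaseModel, ConfigDict, Field


class Demo(BaseModel):
    """Toy stand-in for Manifest: one aliased field, one non-ASCII value."""

    model_config = ConfigDict(populate_by_name=True)

    hide_productname: bool = Field(default=False, alias="hide-productname")
    productname: str = "Guía de administración"


# by_alias=True keeps the legacy key name "hide-productname" in the output.
data = Demo().model_dump(by_alias=True)

print(json.dumps(data, indent=2))                      # "Gu\u00eda ..." (escaped)
print(json.dumps(data, indent=2, ensure_ascii=False))  # "Guía ..."     (raw UTF-8)
```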
-    store_productdocset_json(context, doctypes, stitchnode)
-
     return 0
diff --git a/src/docbuild/models/manifest.py b/src/docbuild/models/manifest.py
index 420833b5..24f8a7ec 100644
--- a/src/docbuild/models/manifest.py
+++ b/src/docbuild/models/manifest.py
@@ -7,11 +7,12 @@
 from lxml import etree
 
 from pydantic import (
     BaseModel,
+    ConfigDict,
+    # model_validator,
     Field,
     SerializationInfo,
     field_serializer,
     field_validator,
-    # model_validator,
 )
 
 from ..models.language import LanguageCode
@@ -219,8 +220,9 @@ class SingleDocument(BaseModel):
     def serialize_date(self: Self, value: date | None, info: SerializationInfo) -> str:
         """Serialize date to 'YYYY-MM-DD' or an empty string if None."""
         if value is None:
-            return ""
-        return value.isoformat()
+            return ""  # This ensures the key exists as "" in JSON
+        # If it's already a string (from DAPS output), return it, otherwise isoformat
+        return value.isoformat() if hasattr(value, "isoformat") else str(value)
 
 
 class Product(BaseModel):
@@ -272,24 +274,20 @@ class Document(BaseModel):
     tasks: list[str] = Field(default_factory=list)
     products: list[Product] = Field(default_factory=list)
     doctypes: list[str] = Field(default_factory=list, alias="docTypes")
-    isgated: bool = Field(default=False, alias="isGated", serialization_alias="isGated")
-    rank: int | None = Field(default=None)
+    isgated: bool = Field(default=False, alias="isGated", serialization_alias="isGate")
+    rank: int | str | None = Field(default=None)
 
     @field_validator("rank", mode="before")
     @classmethod
     def coerce_rank(cls: type[Self], value: str | int | None) -> int | None:
-        """Coerce rank from string to int, handling empty strings."""
-        if isinstance(value, str):
-            if not value.strip():
-                return None
-            return int(value)
-        return value
+        """Coerce rank to an integer, treating empty strings or None as None to match legacy parity."""
+        if value is None or (isinstance(value, str) and not value.strip()):
+            return None
+        return int(value)
 
     @field_serializer("rank")
-    def serialize_rank(
-        self: Self, value: int | str | None, info: SerializationInfo
-    ) -> str:
-        """Serialize rank to an empty string if None."""
+    def serialize_rank(self: Self, value: int | str | None, info: SerializationInfo) -> str:
+        """Serialize rank to an empty string if None to match legacy parity."""
         if value is None:
             return ""
         return str(value)
@@ -302,12 +300,17 @@ class Manifest(BaseModel):
     acronym: str
     version: str
     lifecycle: str | LifecycleFlag = Field(default=LifecycleFlag.unknown)
+    # Keep the legacy alias and the False default for hide-productname:
     hide_productname: bool = Field(default=False, alias="hide-productname")
     descriptions: list[Description] = Field(default_factory=list)
     categories: list[Category] = Field(default_factory=list)
     documents: list[Document] = Field(default_factory=list)
     archives: list[Archive] = Field(default_factory=list)
 
+    model_config = ConfigDict(
+        populate_by_name=True,
+        str_strip_whitespace=True
+    )
+
 
 if __name__ == "__main__":  # pragma: nocover
     from rich import print  # noqa: A004
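The `rank` handling above is a round-trip: empty strings or missing values come in as `None` and go back out as `""`, so the key always exists in the generated JSON. A cut-down sketch of just that behaviour (the real `Document` model carries many more fields):

```python
from pydantic import BaseModel, field_serializer, field_validator


class RankDemo(BaseModel):
    """Cut-down version of the Document.rank round-trip."""

    rank: int | str | None = None

    @field_validator("rank", mode="before")
    @classmethod
    def coerce_rank(cls, value: str | int | None) -> int | None:
        # "" and None both mean "no rank"; anything else must parse as an int.
        if value is None or (isinstance(value, str) and not value.strip()):
            return None
        return int(value)

    @field_serializer("rank")
    def serialize_rank(self, value: int | None) -> str:
        # Legacy JSON always carries the key, with "" standing in for "no rank".
        return "" if value is None else str(value)


print(RankDemo(rank="").model_dump())         # {'rank': ''}
print(RankDemo(rank="7").model_dump())        # {'rank': '7'}
print(RankDemo(rank=None).model_dump_json())  # {"rank":""}
```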
= "daps --dc-file={dcfile} --output={output}" - # Act + # Create a mock context for the new function signature + mock_context = MagicMock(spec=DocBuildContext) + + # Link the mock context to the actual temporary paths created by the test setup + mock_context.envconfig.paths.repo_dir = setup_paths["repo_dir"] + mock_context.envconfig.paths.tmp_repo_dir = setup_paths["tmp_repo_dir"] + mock_context.envconfig.paths.meta_cache_dir = setup_paths["meta_cache_dir"] + mock_context.envconfig.paths.base_cache_dir = setup_paths["base_cache_dir"] + result = await process_deliverable( - deliverable=deliverable, dapstmpl=dapstmpl, **setup_paths + context=mock_context, + deliverable=deliverable, + dapstmpl=dapstmpl, ) # Assert - assert result is expected_result + success, res_deliverable = result + assert success is expected_result + assert res_deliverable == deliverable + if expected_log: assert any(expected_log in record.message for record in caplog.records) @@ -685,9 +698,11 @@ def test_store_productdocset_json_merges_and_writes( out_file = Path(meta_cache_dir) / deliverable.productid / f"{deliverable.docsetid}.json" assert out_file.exists() merged = json.loads(out_file.read_text(encoding="utf-8")) - assert "documents" in merged # Check if the 'documents' key exists + # Updated assertions to match your new model structure + assert "documents" in merged assert merged["documents"][0]["docs"][0]["title"] == "Doc1" - assert merged["documents"][0]["docs"][0]["dcfile"] == "DC-Doc1.xml" + # Ensure our new parity keys are there + assert "hide-productname" in merged def test_store_productdocset_json_warns_on_empty_metadata( @@ -716,8 +731,8 @@ def test_store_productdocset_json_warns_on_empty_metadata( mock_context_with_config_dir, [doctype], stitchnode ) - # Expect a warning to be logged - mock_log.warning.assert_called_with("Empty metadata file %s", Path("empty.json")) + # Expect an error to be logged + mock_log.error.assert_called_with("Empty metadata file %s", Path("empty.json")) def test_store_productdocset_json_handles_read_error( diff --git a/tools/audit_parity.py b/tools/audit_parity.py new file mode 100755 index 00000000..83afc363 --- /dev/null +++ b/tools/audit_parity.py @@ -0,0 +1,127 @@ +#!/usr/bin/env -S uv run --frozen python +"""Smart Audit Tool for Document Manifest Parity. + +Compares a legacy (manual) JSON manifest against a generated JSON manifest +by matching documents based on normalized Titles and strict Languages. +""" + +import json +from pathlib import Path +import re +import sys + +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +console = Console() + + +def normalize_text(text: str) -> str: + """Lowercase and strip HTML/extra whitespace for fuzzy title matching.""" + if not text: + return "" + # Remove HTML tags + clean = re.sub(r"<[^>]+>", "", text) + # Collapse multiple whitespaces into one + return re.sub(r"\s+", " ", clean).strip().lower() + + +def get_doc_map(data: dict) -> dict: + """Create a map of {(normalized_title, lang): doc_dict}. + + Use the raw language code to ensure that discrepancies like 'en' vs 'en-us' + are caught and reported in the audit table. 
+ """ + doc_map = {} + for doc_group in data.get("documents", []): + for doc in doc_group.get("docs", []): + title = doc.get("title", "Untitled") + lang = doc.get("lang", "unknown") + # Unique key: Normalized Title + Strict Language Code + key = (normalize_text(title), lang) + doc_map[key] = doc + return doc_map + + +def run_audit(manual_path: str, generated_path: str) -> None: + """Compare two manifest files and report discrepancies.""" + p_manual = Path(manual_path) + p_generated = Path(generated_path) + + try: + with open(manual_path, encoding="utf-8") as f: + manual_data = json.load(f) + with open(generated_path, encoding="utf-8") as f: + gen_data = json.load(f) + except Exception as e: + console.print(f"[bold red]Error loading files:[/bold red] {e}") + return + + manual_docs = get_doc_map(manual_data) + gen_docs = get_doc_map(gen_data) + + console.print( + Panel( + f"Legacy: [bold magenta]{p_manual.name}[/bold magenta]\n" + f"Generated: [bold green]{p_generated.name}[/bold green]", + title="[bold cyan]Manifest Comparison Audit[/bold cyan]", + subtitle=f"Comparing {p_manual.parent.name} structure", + ) + ) + + # Fields to verify for structural and content parity + fields_to_check = [ + "lang", + "title", + "description", + "dateModified", + "rank", + "isGate", + "dcfile", + "rootid", + ] + + table = Table(title="Field Discrepancies", show_header=True, header_style="bold blue") + table.add_column("Document Match", style="italic") + table.add_column("Field") + table.add_column("Legacy Value", style="red") + table.add_column("Generated Value", style="green") + + diff_found = False + + # Check for differences in matching documents + for key, m_doc in manual_docs.items(): + if key in gen_docs: + g_doc = gen_docs[key] + for field in fields_to_check: + # Normalize values to strings for comparison + m_val = str(m_doc.get(field, "")).strip() + g_val = str(g_doc.get(field, "")).strip() + + if m_val != g_val: + table.add_row(m_doc.get("title"), field, m_val, g_val) + diff_found = True + else: + # Document exists in Legacy but could not be matched in Generated + table.add_row(m_doc.get("title"), "FILE", "MISSING", "") + diff_found = True + + # Check for extra documents in Generated that aren't in Legacy + for key, g_doc in gen_docs.items(): + if key not in manual_docs: + table.add_row(g_doc.get("title"), "FILE", "", "NEW IN GENERATED") + diff_found = True + + if not diff_found: + console.print("[bold green]✅ 100% Parity found![/bold green]") + else: + console.print(table) + + +if __name__ == "__main__": + if len(sys.argv) < 3: + console.print("[yellow]Usage:[/yellow] ./tools/audit_parity.py