|
| 1 | +"""Defines the JulietTestSuiteC dataset for evaluating SAST tools on C code. |
| 2 | + |
| 3 | +This module provides the classes and logic to load the Juliet Test Suite for C/C++, |
| 4 | +which consists of C test files with known vulnerabilities. It downloads the source code |
| 5 | +from the NIST Software Assurance Reference Dataset (SARD) and parses an XML manifest |
| 6 | +to associate test files with expected results. |
| 7 | +""" |
| 8 | + |
| 9 | +import io |
| 10 | +import re |
| 11 | +import shutil |
| 12 | +import zipfile |
| 13 | +from pathlib import Path |
| 14 | +from typing import Self |
| 15 | + |
| 16 | +import requests |
| 17 | +from lxml import etree |
| 18 | + |
| 19 | +from codesectools.datasets.core.dataset import File, PrebuiltFileDataset |
| 20 | +from codesectools.shared.cwe import CWE, CWEs |
| 21 | +from codesectools.utils import CPU_COUNT |
| 22 | + |
| 23 | + |
class TestCode(File):
    """A single source file from the JulietTestSuiteC dataset."""

    def __init__(
        self,
        filepath: Path,
        content: str | bytes,
        cwes: list[CWE],
        has_vuln: bool,
    ) -> None:
        """Create a TestCode by forwarding every argument to ``File``.

        Args:
            filepath: Location of the test file.
            content: Raw file content, either text or bytes.
            cwes: The CWEs attached to this file.
            has_vuln: True when the file holds a real vulnerability, False when
                it is a false-positive test case.

        """
        # Collect the fields first so the hand-off to the base class is one call.
        file_fields = {
            "filepath": filepath,
            "content": content,
            "cwes": cwes,
            "has_vuln": has_vuln,
        }
        super().__init__(**file_fields)
| 46 | + |
| 47 | + |
class JulietTestSuiteC(PrebuiltFileDataset):
    """Represents the Juliet Test Suite for C/C++.

    This class handles downloading, extracting, and loading the C/C++ test cases
    from the Juliet Test Suite.
    """

    name = "JulietTestSuiteC"
    supported_languages = ["c"]
    license = "CC0 1.0 Universal"
    license_url = "https://data.niaid.nih.gov/resources?id=zenodo_4701386#description"

    # `bear` wraps the make invocation; the class expects a
    # `compile_commands.json` at the dataset root afterwards.
    build_command = f"bear -- make -C ./C individuals -j{CPU_COUNT}"
    prebuilt_expected = (Path("."), "compile_commands.json")
    artefacts_arg = "compile_commands.json"

    # Archive published by NIST SARD (Juliet C/C++ v1.3).
    _ARCHIVE_URL = "https://samate.nist.gov/SARD/downloads/test-suites/2017-10-01-juliet-test-suite-for-c-cplusplus-v1-3.zip"
    # Seconds to wait for the connection and between received chunks; this is
    # not a cap on the total download time.
    _DOWNLOAD_TIMEOUT = 60

    def __init__(self, lang: None | str = None) -> None:
        """Initialize the JulietTestSuiteC dataset.

        Args:
            lang: The programming language of the dataset files.
                Must be one of the supported languages.

        """
        super().__init__(lang)

    def __eq__(self, other: str | Self) -> bool:
        """Compare this dataset with another object for equality.

        Args:
            other: The object to compare with. Can be a string (dataset name)
                or another JulietTestSuiteC instance.

        Returns:
            True if the names are equal, False otherwise.

        """
        if isinstance(other, str):
            return self.name == other
        if isinstance(other, self.__class__):
            return self.name == other.name
        return False

    def __hash__(self) -> int:
        """Return a hash consistent with ``__eq__``.

        Defining ``__eq__`` alone would leave instances unhashable. Hashing the
        name keeps the dataset usable in sets and as a dict key, and makes it
        hash like the plain string it compares equal to.
        """
        return hash(self.name)

    def download_files(self: Self, test: bool = False) -> None:
        """Download and extract the dataset from the NIST SARD website.

        Downloads the zip archive, extracts its contents, and prunes the test cases
        to a smaller subset for faster processing. If in test mode, it further
        reduces the dataset to only a single CWE.

        Args:
            test: If True, reduce the number of test files for faster testing.

        Raises:
            requests.RequestException: If the download fails, times out, or the
                server answers with an HTTP error status.

        """
        response = requests.get(self._ARCHIVE_URL, timeout=self._DOWNLOAD_TIMEOUT)
        # Fail here on an HTTP error instead of handing an error page to zipfile.
        response.raise_for_status()

        with zipfile.ZipFile(io.BytesIO(response.content), "r") as zip_ref:
            zip_ref.extractall(self.directory)

        # Limit to one set for each CWE: every set directory other than "s01"
        # is renamed with a leading underscore so it no longer matches "s*".
        testcases = self.directory / "C" / "testcases"
        # Materialize the glob first: directories are renamed while looping.
        for set_dir in list(testcases.glob("CWE*/s*")):
            if set_dir.name != "s01":
                shutil.move(set_dir, set_dir.parent / f"_{set_dir.name}")

        if test:
            # Test mode keeps only the CWE835 directories.
            for cwe_dir in list(testcases.glob("CWE*")):
                if not cwe_dir.name.startswith("CWE835"):
                    shutil.rmtree(cwe_dir)

    def load_dataset(self) -> list[TestCode]:
        """Load the JulietTestSuiteC dataset from the source files.

        Parses the `manifest.xml` file to identify vulnerabilities in the C/C++
        source files and creates a `TestCode` object for each file containing a flaw.
        Manifest entries are skipped when the file is not present on disk, when
        it declares no flaw, or when the first flaw's name carries no
        ``CWE-<number>`` identifier.

        Returns:
            A list of `TestCode` objects representing the dataset.

        """
        files = []
        testcode_dir = self.directory / "C" / "testcases"
        # Index the sources by bare file name, which is how the manifest is
        # matched against them below.
        # NOTE(review): rglob also descends into the "_sNN" directories renamed
        # by download_files, so those files are still loaded — confirm intended.
        testcode_paths = {
            path.name: path
            for path in [
                *testcode_dir.rglob("CWE*.c"),
                *testcode_dir.rglob("CWE*.cpp"),
            ]
        }
        manifest = etree.parse(self.directory / "C" / "manifest.xml")
        cwe_pattern = re.compile(r"CWE-(\d+)")

        for file_tree in manifest.xpath("/container/testcase/file"):
            file_obj = testcode_paths.get(file_tree.get("path"))
            if file_obj is None:
                continue

            flaws = file_tree.xpath("flaw")
            if not flaws:
                continue

            # Only the first flaw of a file is considered; a missing "name"
            # attribute is treated as "no CWE" rather than raising TypeError.
            match = cwe_pattern.search(flaws[0].get("name") or "")
            if match is None:
                continue

            files.append(
                TestCode(
                    filepath=file_obj.relative_to(self.directory),
                    content=file_obj.read_bytes(),
                    cwes=[CWEs.from_id(int(match.group(1)))],
                    has_vuln=True,
                )
            )
        return files
0 commit comments