Skip to content

Commit b24be62

Browse files
committed
Enhance SecNode API CLI and reporting features
- Added support for parsing identity contexts from a JSON file in the CLI. - Introduced new command-line arguments for identities, request budget, per-endpoint budget, and max iterations. - Implemented agent mode for running pipelines with context-aware reporting. - Updated report generation to include run metrics and suspected findings. - Enhanced vulnerability models to accommodate identity contexts and validation sources. - Refactored pipeline service to support identity variants and deduplication of test cases.
1 parent 8fcdad0 commit b24be62

6 files changed

Lines changed: 601 additions & 49 deletions

File tree

src/secnodeapi/ai/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44

55
from .generate import generate_test_cases
66
from .understand import understand_api_with_ai
7-
from .validate import validate_findings_with_ai
7+
from .validate import classify_findings, validate_findings_with_ai
88

99
__all__ = [
1010
"understand_api_with_ai",
1111
"generate_test_cases",
12+
"classify_findings",
1213
"validate_findings_with_ai",
1314
]

src/secnodeapi/ai/validate.py

Lines changed: 162 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,201 @@
11
"""
22
AI validation stage for filtering true-positive findings.
33
"""
4+
45
import asyncio
56
import json
6-
from typing import List, Optional
7+
from typing import List, Optional, Tuple
78

89
import structlog
10+
from pydantic import BaseModel, ValidationError
911

1012
from ..vulnerability_models import Finding, TestResult
11-
1213
from .llm_client import call_llm
1314

1415
logger = structlog.get_logger(__name__)
1516

1617

17-
async def _evaluate_single_result(result: TestResult) -> Optional[Finding]:
18-
"""Evaluate one test result and return a confirmed finding when applicable."""
19-
sys_prompt = (
20-
"You are a CISO and Elite AppSec Triager reviewing penetration test results. Your job is to definitively determine "
21-
"if an executed test case reveals a true positive vulnerability, or if it is a false positive / expected behavior. "
22-
"Apply strict heuristics:\n"
23-
"- 401 Unauthorized / 403 Forbidden is usually expected security behavior (NOT a vulnerability).\n"
24-
"- 500 Internal Server Error reveals a lack of robustness, potentially a DoS or injection vuln, but requires context.\n"
25-
"- 200/201 OK on an endpoint that shouldn't grant access (e.g. a BOLA or mass assignment test) is a highly probable vulnerability.\n"
26-
"- If a Rate Limit test returns 200 OK after 100 requests, RATE LIMITING IS BROKEN.\n\n"
27-
"You MUST perform a chain-of-thought analysis before concluding.\n"
28-
"Output ONLY JSON with this precise schema:\n"
29-
'{"analysis": "str (your thought process)", "is_vulnerable": bool, "cvss_score": float, "cvss_vector": "str", '
30-
'"description": "str", "remediation": "str", "confidence": float}'
18+
class AIValidationPayload(BaseModel):
19+
analysis: str
20+
is_vulnerable: bool
21+
cvss_score: float
22+
cvss_vector: str
23+
description: str
24+
remediation: str
25+
confidence: float
26+
27+
28+
def _class_keywords(result: TestResult) -> str:
29+
return " ".join(
30+
[
31+
result.test_case.owasp_category.lower(),
32+
result.test_case.name.lower(),
33+
result.test_case.description.lower(),
34+
]
3135
)
3236

33-
user_prompt = f"Test Result Context:\n{result.model_dump_json(indent=2)}"
3437

35-
try:
36-
llm_resp = await call_llm(sys_prompt, user_prompt, temperature=0.1)
37-
data = json.loads(llm_resp)
38+
def _deterministic_validate_result(result: TestResult) -> Optional[Finding]:
39+
"""Deterministic validators for high-value classes to reduce hallucinations."""
40+
category_text = _class_keywords(result)
41+
status = result.status_code
42+
is_2xx = 200 <= status < 300
43+
req_snippet = f"{result.test_case.method} {result.request_url}"
44+
resp_snippet = result.response_body[:500]
3845

39-
if data.get("is_vulnerable") and data.get("confidence", 0.0) >= 0.75:
40-
req_snippet = f"{result.test_case.method} {result.request_url}"
41-
resp_snippet = result.response_body[:500]
46+
bola_like = any(
47+
keyword in category_text
48+
for keyword in ("bola", "idor", "api1", "bfla", "api5", "broken object", "broken function")
49+
)
50+
mass_assignment_like = any(
51+
keyword in category_text for keyword in ("mass assignment", "bopla", "api3")
52+
)
53+
rate_limit_like = any(
54+
keyword in category_text for keyword in ("rate limit", "ratelimit", "api4")
55+
)
56+
57+
if bola_like and is_2xx:
58+
return Finding(
59+
test_case_id=result.test_case.id,
60+
endpoint=result.test_case.endpoint,
61+
method=result.test_case.method,
62+
vulnerability_class=result.test_case.owasp_category,
63+
cvss_score=8.2,
64+
cvss_vector="CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:N",
65+
description=(
66+
"Deterministic validator observed successful access on an authorization-focused test."
67+
),
68+
remediation="Enforce object and function authorization checks server-side.",
69+
confidence=0.9,
70+
evidence_request=req_snippet,
71+
evidence_response=resp_snippet,
72+
validation_source="deterministic",
73+
identity=result.test_case.identity,
74+
)
4275

76+
if mass_assignment_like and is_2xx and isinstance(result.test_case.body, dict):
77+
protected_fields = {"is_admin", "role", "credit_balance", "permissions"}
78+
if protected_fields.intersection(set(result.test_case.body.keys())):
4379
return Finding(
4480
test_case_id=result.test_case.id,
4581
endpoint=result.test_case.endpoint,
4682
method=result.test_case.method,
4783
vulnerability_class=result.test_case.owasp_category,
48-
cvss_score=data.get("cvss_score", 0.0),
49-
cvss_vector=data.get("cvss_vector", "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:L/I:N/A:N"),
50-
description=data.get("description", "Vulnerability detected."),
51-
remediation=data.get("remediation", "Review endpoint authorization."),
52-
confidence=data.get("confidence", 0.8),
84+
cvss_score=8.0,
85+
cvss_vector="CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:N",
86+
description=(
87+
"Deterministic validator observed accepted protected-field mutation payload."
88+
),
89+
remediation="Allowlist writable fields and reject privileged attributes at API boundary.",
90+
confidence=0.88,
5391
evidence_request=req_snippet,
5492
evidence_response=resp_snippet,
93+
validation_source="deterministic",
94+
identity=result.test_case.identity,
5595
)
96+
97+
if rate_limit_like and is_2xx:
98+
return Finding(
99+
test_case_id=result.test_case.id,
100+
endpoint=result.test_case.endpoint,
101+
method=result.test_case.method,
102+
vulnerability_class=result.test_case.owasp_category,
103+
cvss_score=6.8,
104+
cvss_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H",
105+
description=(
106+
"Rate-limit-focused test continued receiving successful responses without clear throttling."
107+
),
108+
remediation="Introduce per-principal and per-IP rate limits with enforced backoff.",
109+
confidence=0.8,
110+
evidence_request=req_snippet,
111+
evidence_response=resp_snippet,
112+
validation_source="deterministic",
113+
identity=result.test_case.identity,
114+
)
115+
116+
return None
117+
118+
119+
def _build_finding_from_ai(result: TestResult, payload: AIValidationPayload, source: str) -> Finding:
120+
req_snippet = f"{result.test_case.method} {result.request_url}"
121+
resp_snippet = result.response_body[:500]
122+
return Finding(
123+
test_case_id=result.test_case.id,
124+
endpoint=result.test_case.endpoint,
125+
method=result.test_case.method,
126+
vulnerability_class=result.test_case.owasp_category,
127+
cvss_score=payload.cvss_score,
128+
cvss_vector=payload.cvss_vector,
129+
description=payload.description,
130+
remediation=payload.remediation,
131+
confidence=payload.confidence,
132+
evidence_request=req_snippet,
133+
evidence_response=resp_snippet,
134+
validation_source=source,
135+
identity=result.test_case.identity,
136+
)
137+
138+
139+
async def _evaluate_single_result(result: TestResult) -> Tuple[Optional[Finding], Optional[Finding]]:
140+
"""Evaluate one test result and return (confirmed, suspected)."""
141+
deterministic = _deterministic_validate_result(result)
142+
if deterministic is not None:
143+
return deterministic, None
144+
145+
sys_prompt = (
146+
"You are a senior AppSec triager validating API pentest results. "
147+
"Return ONLY valid JSON with exact schema:\n"
148+
'{"analysis":"str","is_vulnerable":bool,"cvss_score":float,"cvss_vector":"str",'
149+
'"description":"str","remediation":"str","confidence":float}\n'
150+
"Never return markdown or extra text."
151+
)
152+
user_prompt = f"Test Result Context:\n{result.model_dump_json(indent=2)}"
153+
154+
try:
155+
llm_resp = await call_llm(sys_prompt, user_prompt, temperature=0.1)
156+
payload = AIValidationPayload.model_validate(json.loads(llm_resp))
157+
if not payload.is_vulnerable:
158+
return None, None
159+
if payload.confidence >= 0.75:
160+
return _build_finding_from_ai(result, payload, "ai"), None
161+
return None, _build_finding_from_ai(result, payload, "ai-suspected")
162+
except (json.JSONDecodeError, ValidationError) as e:
163+
logger.warning(
164+
"AI output schema validation failed",
165+
error=str(e),
166+
test_id=result.test_case.id,
167+
)
56168
except Exception as e:
57169
logger.warning("Failed validation evaluation", error=str(e), test_id=result.test_case.id)
58-
return None
170+
return None, None
59171

60172

61-
async def validate_findings_with_ai(results: List[TestResult]) -> List[Finding]:
62-
"""Replay AI analysis over execution results to confirm findings."""
173+
async def classify_findings(results: List[TestResult]) -> Tuple[List[Finding], List[Finding]]:
174+
"""Classify findings into deterministic/AI-confirmed and suspected buckets."""
63175
logger.info("Validating execution results with AI", count=len(results))
176+
confirmed: List[Finding] = []
177+
suspected: List[Finding] = []
64178

65-
findings: List[Finding] = []
66179
batch_size = 5
67180
for i in range(0, len(results), batch_size):
68181
batch = results[i : i + batch_size]
69182
tasks = [_evaluate_single_result(result) for result in batch]
70-
batch_findings = await asyncio.gather(*tasks, return_exceptions=True)
183+
batch_outcomes = await asyncio.gather(*tasks, return_exceptions=True)
184+
185+
for outcome in batch_outcomes:
186+
if isinstance(outcome, Exception):
187+
logger.error("Error evaluating result", exc_info=outcome)
188+
continue
189+
confirmed_finding, suspected_finding = outcome
190+
if confirmed_finding is not None:
191+
confirmed.append(confirmed_finding)
192+
if suspected_finding is not None:
193+
suspected.append(suspected_finding)
71194

72-
for finding in batch_findings:
73-
if isinstance(finding, Exception):
74-
logger.error("Error evaluating result", exc_info=finding)
75-
elif finding is not None:
76-
findings.append(finding)
195+
return confirmed, suspected
77196

78-
return findings
197+
198+
async def validate_findings_with_ai(results: List[TestResult]) -> List[Finding]:
199+
"""Backwards-compatible wrapper returning only confirmed findings."""
200+
confirmed, _ = await classify_findings(results)
201+
return confirmed

src/secnodeapi/cli.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
"""SecNode API command-line interface."""
2+
23
import argparse
34
import asyncio
45
import json
56
import warnings
67
from pathlib import Path
8+
from typing import List
79

810
import structlog
911

@@ -26,9 +28,12 @@
2628
from .config import RuntimeConfig, has_supported_provider_key
2729
from .schema_fetcher import analyze_api_structure, fetch_schema
2830
from .services.pipeline import (
31+
IdentityContext,
2932
PipelineInput,
3033
build_pipeline_artifacts,
3134
build_report,
35+
build_report_with_context,
36+
run_agent_pipeline,
3237
validate_and_retest,
3338
write_report,
3439
)
@@ -51,13 +56,63 @@ def parse_auth(auth_header: str, auth_file: str) -> dict:
5156
return headers
5257

5358

59+
def parse_identities(identities_file: str) -> List[IdentityContext]:
60+
"""Parse static identity contexts from JSON file."""
61+
if not identities_file:
62+
return []
63+
try:
64+
payload = json.loads(Path(identities_file).read_text(encoding="utf-8"))
65+
except (OSError, json.JSONDecodeError) as e:
66+
logger.error("Failed to read identities file", error=str(e))
67+
return []
68+
69+
items = payload if isinstance(payload, list) else payload.get("identities", [])
70+
identities: List[IdentityContext] = []
71+
for item in items:
72+
if not isinstance(item, dict):
73+
continue
74+
name = item.get("name")
75+
headers = item.get("headers", {})
76+
if isinstance(name, str) and isinstance(headers, dict):
77+
identities.append(IdentityContext(name=name, headers=headers))
78+
return identities
79+
80+
5481
def parse_args():
5582
parser = argparse.ArgumentParser(description="SecNode API pentesting CLI")
5683
parser.add_argument("--target", required=True, help="URL or path to OpenAPI schema")
5784
parser.add_argument("--concurrency", type=int, default=5, help="Concurrent requests")
5885
parser.add_argument("--auth-header", help="Inline header e.g. 'Authorization: Bearer <token>'")
5986
parser.add_argument("--auth-file", help="Path to JSON file with auth headers")
6087
parser.add_argument("--proxy", help="HTTP proxy to route traffic through")
88+
parser.add_argument(
89+
"--identities-file",
90+
help="Path to JSON file containing identity header sets for differential testing",
91+
)
92+
parser.add_argument(
93+
"--mode",
94+
choices=["agent", "legacy"],
95+
default="agent",
96+
help="Pipeline mode: agent loop (default) or legacy fixed pipeline",
97+
)
98+
parser.add_argument(
99+
"--request-budget",
100+
type=int,
101+
default=400,
102+
help="Maximum requests for an agent run",
103+
)
104+
parser.add_argument(
105+
"--per-endpoint-budget",
106+
type=int,
107+
default=20,
108+
help="Maximum attempts per endpoint per agent run",
109+
)
110+
parser.add_argument(
111+
"--max-iterations",
112+
type=int,
113+
default=4,
114+
help="Maximum plan/execute iterations in agent mode",
115+
)
61116
parser.add_argument(
62117
"--insecure",
63118
action="store_true",
@@ -151,10 +206,28 @@ async def _run_full_pipeline(args, pipeline_input: PipelineInput) -> None:
151206
logger.info("SecNode pipeline completed successfully", output_dir=output_dir)
152207

153208

209+
async def _run_agent_mode(args, pipeline_input: PipelineInput) -> None:
210+
api_structure, confirmed, suspected, metrics = await run_agent_pipeline(pipeline_input)
211+
report = build_report_with_context(
212+
api_structure.title,
213+
confirmed,
214+
suspected_findings=suspected,
215+
metrics=metrics,
216+
)
217+
output_dir = write_report(report, args.target)
218+
logger.info(
219+
"SecNode agent mode completed successfully",
220+
output_dir=output_dir,
221+
confirmed=len(confirmed),
222+
suspected=len(suspected),
223+
)
224+
225+
154226
async def main():
155227
args = parse_args()
156228
runtime = RuntimeConfig(insecure=args.insecure)
157229
auth_headers = parse_auth(args.auth_header, args.auth_file)
230+
identities = parse_identities(getattr(args, "identities_file", None))
158231
_require_provider_key(args.schema_only)
159232
logger.info(
160233
"Starting SecNode API audit",
@@ -168,6 +241,10 @@ async def main():
168241
auth_headers=auth_headers,
169242
proxy=args.proxy,
170243
verify_ssl=runtime.verify_ssl,
244+
identities=identities,
245+
request_budget=max(1, getattr(args, "request_budget", 400)),
246+
per_endpoint_budget=max(1, getattr(args, "per_endpoint_budget", 20)),
247+
max_iterations=max(1, getattr(args, "max_iterations", 4)),
171248
)
172249

173250
try:
@@ -178,7 +255,11 @@ async def main():
178255
await _run_dry_run(args, pipeline_input)
179256
return
180257

181-
await _run_full_pipeline(args, pipeline_input)
258+
mode = getattr(args, "mode", "legacy")
259+
if mode == "legacy":
260+
await _run_full_pipeline(args, pipeline_input)
261+
else:
262+
await _run_agent_mode(args, pipeline_input)
182263
except Exception as e:
183264
logger.error("Pipeline failed", error=str(e), exc_info=e)
184265
raise SystemExit(1) from e

0 commit comments

Comments
 (0)