Skip to content

Commit e8e72ee

Browse files
committed
feat(recon): Implement pre-generation active API discovery
- Added support for active probing before test case generation - Extracts dynamically discovered undocumented endpoints into the baseline SchemaStructure - Allows the AI to view these undocumented endpoints natively
1 parent 0d83b45 commit e8e72ee

6 files changed

Lines changed: 232 additions & 48 deletions

File tree

src/secnodeapi/ai/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44

55
from .generate import generate_test_cases
66
from .understand import understand_api_with_ai
7-
from .validate import classify_findings, validate_findings_with_ai
7+
from .validate import classify_findings, validate_findings_with_ai, deduplicate_findings_with_ai
88

99
__all__ = [
1010
"understand_api_with_ai",
1111
"generate_test_cases",
1212
"classify_findings",
1313
"validate_findings_with_ai",
14+
"deduplicate_findings_with_ai",
1415
]

src/secnodeapi/ai/llm_client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,14 @@ async def call_llm(system_prompt: str, user_prompt: str, temperature: float = 0.
3939
completion_kwargs["api_base"] = api_base
4040
elif provider == "nebius":
4141
# Nebius is OpenAI-compatible and requires its specific base URL
42-
completion_kwargs["api_base"] = "https://api.tokenfactory.nebius.com/v1/"
42+
# LiteLLM requires 'openai/' prefix for compatible endpoints
43+
if "/" in model:
44+
_, model_name = model.split("/", 1)
45+
completion_kwargs["model"] = f"openai/{model_name}"
46+
47+
api_base = os.getenv("NEBIUS_API_BASE", "https://api.tokenfactory.nebius.com/v1/").strip()
48+
if api_base:
49+
completion_kwargs["api_base"] = api_base
4350
completion_kwargs["api_key"] = os.getenv("NEBIUS_API_KEY", "")
4451

4552
response = await acompletion(

src/secnodeapi/ai/validate.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,3 +208,77 @@ async def validate_findings_with_ai(results: List[TestResult]) -> List[Finding]:
208208
"""Backwards-compatible wrapper returning only confirmed findings."""
209209
confirmed, _ = await classify_findings(results)
210210
return confirmed
211+
212+
213+
class AIDeduplicationCluster(BaseModel):
    """One group of findings the LLM judged to share the exact same root cause."""

    # Free-text justification produced by the model for audit logging.
    reasoning: str
    # test_case_id of every Finding assigned to this cluster.
    test_case_ids: List[str]
216+
217+
class AIDeduplicationPayload(BaseModel):
    """Top-level JSON payload expected back from the deduplication LLM call."""

    # Each input finding is supposed to appear in exactly one cluster.
    clusters: List[AIDeduplicationCluster]
219+
220+
221+
async def deduplicate_findings_with_ai(findings: List[Finding]) -> List[Finding]:
    """Analyze confirmed findings to merge duplicates sharing the exact same root cause.

    Sends a compact summary of every finding to the LLM, asks it to cluster
    findings by root cause, and keeps one representative per cluster (highest
    CVSS, then confidence). Findings the model fails to place in any cluster
    are re-appended so this step can only shrink the report, never silently
    lose a confirmed finding. On any LLM or parsing failure the input list is
    returned unchanged.

    Args:
        findings: Confirmed findings to deduplicate.

    Returns:
        A (possibly shorter) list of findings; never longer than the input.
    """
    # Nothing to merge with zero or one finding — skip the LLM round-trip.
    if len(findings) <= 1:
        return findings

    logger.info("Running AI deduplication analysis", count=len(findings))

    # Only ship the fields the model needs to judge root-cause equality.
    summarized_findings = [
        {
            "test_case_id": f.test_case_id,
            "endpoint": f.endpoint,
            "method": f.method,
            "vulnerability_class": f.vulnerability_class,
            "description": f.description,
            "remediation": f.remediation,
        }
        for f in findings
    ]

    sys_prompt = (
        "You are a senior AppSec engineer triaging a list of validated API vulnerabilities. "
        "Your goal is to deduplicate findings that represent the EXACT same underlying root cause and require the exact same fix. "
        "For example, multiple BOLA findings on `/api/users/1` and `/api/users/2` share the exact same root cause. "
        "Group them into clusters. "
        "Return ONLY valid JSON with the exact schema:\n"
        '{"clusters": [{"reasoning": "str explaining why these represent the same root cause", "test_case_ids": ["id1", "id2"]}]}\n'
        "Every single finding from the input MUST belong to exactly one cluster. If a finding is unique, it should be in a cluster by itself."
    )
    user_prompt = f"Validated Findings to cluster:\n{json.dumps(summarized_findings, indent=2)}"

    try:
        llm_resp = await call_llm(sys_prompt, user_prompt, temperature=0.1)
        payload = AIDeduplicationPayload.model_validate(json.loads(llm_resp))

        finding_map = {f.test_case_id: f for f in findings}
        deduplicated: List[Finding] = []
        # Track which ids any cluster has consumed, so a hallucinated duplicate
        # membership (same id in two clusters) cannot emit a finding twice.
        covered_ids = set()

        for cluster in payload.clusters:
            cluster_findings = [
                finding_map[tid]
                for tid in cluster.test_case_ids
                if tid in finding_map and tid not in covered_ids
            ]
            if not cluster_findings:
                continue
            covered_ids.update(f.test_case_id for f in cluster_findings)

            # Keep the finding with the highest CVSS, then confidence.
            # NOTE(review): assumes cvss_score/confidence are always comparable
            # (non-None) on confirmed findings — confirm against the Finding model.
            top_finding = max(cluster_findings, key=lambda x: (x.cvss_score, x.confidence))

            if len(cluster_findings) > 1:
                # Log the id actually kept (the highest-scored one), not an
                # arbitrary cluster member.
                logger.info(
                    "AI Merge Decision",
                    reasoning=cluster.reasoning,
                    merged_count=len(cluster_findings),
                    kept_id=top_finding.test_case_id,
                )

            deduplicated.append(top_finding)

        # The model may violate the "every finding in exactly one cluster" rule;
        # re-append anything it dropped so no confirmed finding is lost.
        for f in findings:
            if f.test_case_id not in covered_ids:
                deduplicated.append(f)

        if 0 < len(deduplicated) <= len(findings):
            logger.info("AI deduplication complete", original=len(findings), final=len(deduplicated))
            return deduplicated

    except (json.JSONDecodeError, ValidationError) as e:
        logger.warning("AI deduplication schema validation failed", error=str(e))
    except Exception as e:
        logger.warning("AI deduplication evaluation failed", error=str(e))

    # Fail open: return the untouched input on any error.
    return findings

src/secnodeapi/cli.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
validate_and_retest,
4141
write_report,
4242
)
43+
from .ai import deduplicate_findings_with_ai
4344
from .services.controller import ControllerService
4445
from .test_executor import execute_proactive_tests
4546

@@ -323,6 +324,9 @@ async def _run_full_pipeline(args, pipeline_input: PipelineInput) -> None:
323324
proxy=pipeline_input.proxy,
324325
verify_ssl=pipeline_input.verify_ssl,
325326
)
327+
328+
findings = await deduplicate_findings_with_ai(findings)
329+
326330
report = build_report(api_structure.title, findings)
327331
output_dir = write_report(report, args.target)
328332
logger.info("SecNode pipeline completed successfully", output_dir=output_dir)

src/secnodeapi/services/pipeline.py

Lines changed: 24 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from ..ai import (
1414
classify_findings,
15+
deduplicate_findings_with_ai,
1516
generate_test_cases,
1617
understand_api_with_ai,
1718
validate_findings_with_ai,
@@ -162,48 +163,10 @@ def _apply_identity_variants(
162163
return variants
163164

164165

165-
def _mutate_path(path: str) -> List[str]:
166-
mutations = set()
167-
stripped = path.rstrip("/")
168-
if "/v1/" in stripped:
169-
mutations.add(stripped.replace("/v1/", "/v2/", 1))
170-
if not stripped.endswith("/admin"):
171-
mutations.add(f"{stripped}/admin")
172-
if "{id}" in stripped:
173-
mutations.add(stripped.replace("{id}", "1"))
174-
mutations.add(stripped.replace("{id}", "2"))
175-
return [m for m in mutations if m and m != path]
176-
177-
178-
def _build_discovery_tests(endpoints: List[APIEndpoint]) -> List[TestCase]:
179-
discovery_tests: List[TestCase] = []
180-
for idx, endpoint in enumerate(endpoints):
181-
existing_method = endpoint.method.upper()
182-
for probe_method in ("GET", "POST", "PUT", "DELETE"):
183-
if probe_method == existing_method:
184-
continue
185-
discovery_tests.append(
186-
TestCase(
187-
id=f"DISCOVER-{idx}-{probe_method}",
188-
name="Method probing",
189-
description="Probe undocumented method support",
190-
owasp_category="API9: Improper Inventory Management",
191-
endpoint=endpoint.path,
192-
method=probe_method,
193-
)
194-
)
195-
for mutation_idx, mutated_path in enumerate(_mutate_path(endpoint.path)):
196-
discovery_tests.append(
197-
TestCase(
198-
id=f"MUTATE-{idx}-{mutation_idx}",
199-
name="Path mutation",
200-
description="Probe possible shadow or alternate API path",
201-
owasp_category="API9: Improper Inventory Management",
202-
endpoint=mutated_path,
203-
method="GET",
204-
)
205-
)
206-
return discovery_tests
166+
167+
168+
169+
207170

208171

209172
def _extract_candidate_ids(response_body: str) -> List[str]:
@@ -318,10 +281,8 @@ async def run_agent_pipeline(
318281
"""
319282
api_structure, seed_tests = await build_pipeline_artifacts(pipeline_input)
320283
identities = _resolve_identities(pipeline_input)
321-
discovery_tests = _build_discovery_tests(api_structure.endpoints)
322-
323284
queue = _deduplicate_test_cases(
324-
_apply_identity_variants(seed_tests + discovery_tests, identities)
285+
_apply_identity_variants(seed_tests, identities)
325286
)
326287
confirmed: List[Finding] = []
327288
suspected: List[Finding] = []
@@ -366,6 +327,9 @@ async def run_agent_pipeline(
366327
)
367328
queue = _deduplicate_test_cases(queue + chain_tests)
368329

330+
# Run final AI deduplication
331+
confirmed = await deduplicate_findings_with_ai(confirmed)
332+
369333
metrics = {
370334
"iterations": iteration,
371335
"requests_attempted": pipeline_input.request_budget - remaining_budget,
@@ -378,16 +342,30 @@ async def run_agent_pipeline(
378342
return api_structure, confirmed, suspected, metrics
379343

380344

345+
from .recon import perform_active_recon
346+
381347
async def build_pipeline_artifacts(
    pipeline_input: PipelineInput,
) -> tuple:
    """Run schema load, active recon, understanding, and test generation.

    Returns an ``(api_structure, tests)`` tuple. Endpoints discovered by
    active reconnaissance are merged into ``api_structure`` before AI
    understanding, so the model sees undocumented routes natively.
    """
    raw_schema = await fetch_schema(
        pipeline_input.target,
        proxy=pipeline_input.proxy,
        verify_ssl=pipeline_input.verify_ssl,
    )
    api_structure = analyze_api_structure(raw_schema)

    # Inject Active Reconnaissance
    discovered_endpoints = await perform_active_recon(api_structure, pipeline_input)
    if discovered_endpoints:
        api_structure.endpoints.extend(discovered_endpoints)

    # Deduplicate endpoints by (path, method). The method is upper-cased so
    # "get" and "GET" collapse into one key, and the FIRST occurrence wins:
    # documented schema endpoints precede recon stubs (which carry no parameter
    # metadata), so the richer definition survives a collision.
    unique_endpoints = {}
    for ep in api_structure.endpoints:
        unique_endpoints.setdefault((ep.path, ep.method.upper()), ep)
    api_structure.endpoints = list(unique_endpoints.values())

    understanding = await understand_api_with_ai(api_structure)
    # Only the first instruction set (if any) is forwarded to generation.
    instructions = (
        pipeline_input.instructions[0].model_dump()
        if pipeline_input.instructions
        else None
    )
    tests = await generate_test_cases(understanding, api_structure, instructions=instructions)
    return api_structure, tests

src/secnodeapi/services/recon.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
"""
2+
Active API Reconnaissance & Discovery Service.
3+
Probes for hidden or undocumented endpoints and parameters before AI analysis.
4+
"""
5+
6+
from typing import List, Dict, Set
7+
import structlog
8+
9+
from ..vulnerability_models import APIEndpoint, SchemaStructure, TestCase
10+
from .pipeline import PipelineInput
11+
from ..test_executor import execute_proactive_tests_detailed
12+
13+
logger = structlog.get_logger(__name__)
14+
15+
16+
def _mutate_path(path: str) -> List[str]:
17+
"""Generate potential hidden paths based on existing routes."""
18+
mutations: Set[str] = set()
19+
stripped = path.rstrip("/")
20+
if "/v1/" in stripped:
21+
mutations.add(stripped.replace("/v1/", "/v2/", 1))
22+
mutations.add(stripped.replace("/v1/", "/internal/", 1))
23+
if not stripped.endswith("/admin"):
24+
mutations.add(f"{stripped}/admin")
25+
if "{id}" in stripped:
26+
mutations.add(stripped.replace("{id}", "1"))
27+
mutations.add(stripped.replace("{id}", "2"))
28+
return [m for m in mutations if m and m != path]
29+
30+
31+
def _build_discovery_tests(endpoints: List[APIEndpoint]) -> List[TestCase]:
    """Generate exploratory test cases for undocumented methods and mutated paths."""
    discovery_tests: List[TestCase] = []

    # Collect the documented HTTP verbs per path so probes only target gaps.
    known_methods_by_path: Dict[str, Set[str]] = {}
    for endpoint in endpoints:
        known_methods_by_path.setdefault(endpoint.path, set()).add(endpoint.method.upper())

    idx = 0
    for path, known_methods in known_methods_by_path.items():
        # Probe HTTP verbs the schema does not document for this path.
        for probe_method in ("GET", "POST", "PUT", "DELETE", "PATCH"):
            if probe_method in known_methods:
                continue
            discovery_tests.append(
                TestCase(
                    id=f"DISCOVER-{idx}-{probe_method}",
                    name="Method probing",
                    description="Probe undocumented method support",
                    owasp_category="API9: Improper Inventory Management",
                    endpoint=path,
                    method=probe_method,
                )
            )
        idx += 1

        # Probe mutated variants of the path (e.g. /v2/ instead of /v1/).
        for mutation_idx, mutated_path in enumerate(_mutate_path(path)):
            discovery_tests.append(
                TestCase(
                    id=f"MUTATE-{idx}-{mutation_idx}",
                    name="Path mutation",
                    description="Probe possible shadow or alternate API path",
                    owasp_category="API9: Improper Inventory Management",
                    endpoint=mutated_path,
                    method="GET",
                )
            )
        idx += 1

    return discovery_tests
76+
77+
78+
async def perform_active_recon(
    api_structure: SchemaStructure, pipeline_input: PipelineInput
) -> List[APIEndpoint]:
    """
    Actively scan the application for undocumented endpoints and parameters.
    Returns a list of newly discovered APIEndpoints.
    """
    logger.info("Starting active reconnaissance for hidden endpoints")

    probes = _build_discovery_tests(api_structure.endpoints)
    if not probes:
        return []

    results, _ = await execute_proactive_tests_detailed(
        test_cases=probes,
        base_url=api_structure.base_url,
        concurrency=pipeline_input.concurrency,
        auth_headers=pipeline_input.auth_headers,
        proxy=pipeline_input.proxy,
        verify_ssl=pipeline_input.verify_ssl,
        max_requests=len(probes),
    )

    discovered: List[APIEndpoint] = []
    for probe_result in results:
        status = probe_result.status_code
        # 2xx means the endpoint definitely exists; 401/403/405 means it
        # exists but is restricted or rejects the probed verb; anything else
        # (notably 404) is treated as non-existent.
        exists = (200 <= status < 300) or status in (401, 403, 405)
        if not exists:
            continue

        new_endpoint = APIEndpoint(
            path=probe_result.test_case.endpoint,
            method=probe_result.test_case.method.upper(),
            operation_id=f"recon_discovered_{probe_result.test_case.method.lower()}",
            parameters=[],  # parameter inference from error bodies is future work
            auth_required=(status in (401, 403)),
            request_body_schema={"present": False},  # bodies can't be inferred from bare probes
        )
        discovered.append(new_endpoint)
        logger.info("Discovered undocumented endpoint", method=new_endpoint.method, path=new_endpoint.path, status=status)

    return discovered

0 commit comments

Comments
 (0)