diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 94714ed..04cd0d8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -38,6 +38,6 @@ jobs: - name: Install dependencies run: | pip install -r requirements.txt - pip install pytest + pip install pytest httpx - name: Run smoke tests run: pytest test_smoke.py -v diff --git a/python/secid_server.py b/python/secid_server.py index e9c47ac..24cb10b 100644 --- a/python/secid_server.py +++ b/python/secid_server.py @@ -1,26 +1,35 @@ #!/usr/bin/env python3 """SecID Server — self-hosted resolver with pluggable storage. -Usage: - python secid_server.py --registry /path/to/SecID/registry - python secid_server.py --registry /data/public/registry --registry /data/private/registry - python secid_server.py --storage redis --redis-url redis://localhost:6379 - python secid_server.py --load bulk # pre-load all entries at startup - python secid_server.py --load lazy # load on first request (default) +Two ways to use this module: + + 1. CLI: python secid_server.py --registry /path/to/SecID/registry [...] + 2. Library: from secid_server import create_app, ServerConfig + config = ServerConfig(registry_dirs=["./registry"]) + app = create_app(config) + +The factory function lets tests build a fully-configured app without +running the CLI bootstrap, and lets ASGI deployments (uvicorn, gunicorn) +construct the app from environment variables instead of argparse. Serves: GET /api/v1/resolve?secid=... — REST API (same as secid.cloudsecurityalliance.org) - /mcp — MCP endpoint (same three tools) + GET /health — health check + POST /admin/reload — reload registry data after git pull + /mcp — MCP endpoint (when `mcp` package is installed) """ +from __future__ import annotations + import argparse import json import logging import os import sys +from dataclasses import dataclass, field from typing import Optional -from fastapi import FastAPI, Query, Request +from fastapi import FastAPI, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse @@ -28,111 +37,119 @@ from registry_loader import bulk_load, SECID_TYPES from resolver import resolve -# --- CLI arguments --- - -parser = argparse.ArgumentParser(description="SecID Self-Hosted Server") -parser.add_argument( - "--registry", action="append", default=[], - help="Path to registry directory (can specify multiple for overlay). Default: ./registry", -) -parser.add_argument("--storage", default="memory", choices=["memory", "redis", "memcached", "sqlite"]) -parser.add_argument("--redis-url", default="redis://localhost:6379") -parser.add_argument("--memcached-url", default="localhost:11211") -parser.add_argument("--sqlite-path", default=":memory:") -parser.add_argument("--load", default="lazy", choices=["lazy", "bulk"]) -parser.add_argument("--host", default="0.0.0.0") -parser.add_argument("--port", type=int, default=8000) -parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"]) - -args, _ = parser.parse_known_args() - -# Default registry path -if not args.registry: - # Try common locations - for candidate in ["./registry", "../SecID/registry", os.path.expanduser("~/GitHub/CloudSecurityAlliance/SecID/registry")]: - if os.path.isdir(candidate): - args.registry = [candidate] - break - if not args.registry: - print("Error: No registry directory found. Use --registry /path/to/SecID/registry", file=sys.stderr) - sys.exit(1) - -logging.basicConfig(level=getattr(logging, args.log_level), format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) -# --- Storage + loading --- -storage_kwargs = {} -if args.storage == "redis": - storage_kwargs["url"] = args.redis_url -elif args.storage == "memcached": - storage_kwargs["url"] = args.memcached_url -elif args.storage == "sqlite": - storage_kwargs["path"] = args.sqlite_path +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- -store = create_store(args.storage, **storage_kwargs) -if args.load == "bulk": - count = bulk_load(store, args.registry) - logger.info(f"Bulk loaded {count} namespaces into {args.storage} store") -else: - logger.info(f"Lazy loading from {args.registry} with {args.storage} store") +@dataclass +class ServerConfig: + """Configuration for the SecID server. Pass to create_app(). -# --- FastAPI app --- + Attributes: + registry_dirs: List of paths to registry directories. Later + directories override earlier ones for the same namespace+type + (overlay support). + storage_type: One of "memory", "redis", "memcached", "sqlite". + storage_kwargs: Backend-specific kwargs forwarded to create_store(). + For redis/memcached: {"url": "..."}. For sqlite: {"path": "..."}. + load_mode: "lazy" (load on first request, default) or "bulk" + (load everything at startup). + """ -app = FastAPI( - title="SecID Server", - description="Self-hosted SecID resolver", - version="0.1.0", -) + registry_dirs: list[str] + storage_type: str = "memory" + storage_kwargs: dict = field(default_factory=dict) + load_mode: str = "lazy" -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_methods=["GET", "POST"], - allow_headers=["*"], -) +# --------------------------------------------------------------------------- +# App factory +# --------------------------------------------------------------------------- -@app.get("/api/v1/resolve") -async def api_resolve( - secid: str = Query(..., description="SecID string to resolve"), - parsability: Optional[str] = Query(None, description="Filter results by parsability: 'structured' or 'scraped'"), -): - """Resolve a SecID string to URLs and registry data.""" - result = resolve(store, secid, registry_dirs=args.registry) - # Filter by parsability if requested - if parsability and "results" in result: - result["results"] = [ - r for r in result["results"] - if "url" not in r or r.get("parsability") == parsability - ] - return JSONResponse(content=result) +def create_app(config: ServerConfig) -> FastAPI: + """Create a configured SecID server FastAPI app. -@app.post("/admin/reload") -async def admin_reload(): - """Reload registry data (after git pull).""" - from registry_loader import update_load - count = update_load(store, args.registry) - return {"reloaded": count} + Used by the CLI (see main()) and by tests (via fastapi.testclient.TestClient). + Has no module-level side effects, so importing this file is safe. + """ + store = create_store(config.storage_type, **config.storage_kwargs) + if config.load_mode == "bulk": + count = bulk_load(store, config.registry_dirs) + logger.info(f"Bulk loaded {count} namespaces into {config.storage_type} store") + else: + logger.info(f"Lazy loading from {config.registry_dirs} with {config.storage_type} store") -@app.get("/health") -async def health(): - """Health check.""" - key_count = len(store.keys()) - return {"status": "ok", "store": args.storage, "keys": key_count} - + app = FastAPI( + title="SecID Server", + description="Self-hosted SecID resolver", + version="0.1.0", + ) -# --- MCP Server (same three tools as SecID-Service) --- + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["GET", "POST"], + allow_headers=["*"], + ) -try: - from mcp.server.fastmcp import FastMCP + @app.get("/api/v1/resolve") + async def api_resolve( + secid: str = Query(..., description="SecID string to resolve"), + parsability: Optional[str] = Query( + None, + description="Filter results by parsability: 'structured' or 'scraped'", + ), + ): + """Resolve a SecID string to URLs and registry data.""" + result = resolve(store, secid, registry_dirs=config.registry_dirs) + if parsability and "results" in result: + result["results"] = [ + r for r in result["results"] + if "url" not in r or r.get("parsability") == parsability + ] + return JSONResponse(content=result) + + @app.post("/admin/reload") + async def admin_reload(): + """Reload registry data (after git pull).""" + from registry_loader import update_load + count = update_load(store, config.registry_dirs) + return {"reloaded": count} + + @app.get("/health") + async def health(): + """Health check — returns store type and current key count.""" + key_count = len(store.keys()) + return {"status": "ok", "store": config.storage_type, "keys": key_count} + + _try_mount_mcp(app, store, config) + return app + + +def _try_mount_mcp(app: FastAPI, store, config: ServerConfig) -> None: + """Mount /mcp endpoint if the `mcp` package is available. + + Same three tools as SecID-Service (resolve, lookup, describe). Optional + dependency so users who only need the REST API don't have to install MCP. + """ + try: + from mcp.server.fastmcp import FastMCP + except ImportError: + logger.info("MCP SDK not installed — /mcp endpoint disabled. Install with: pip install mcp") + return mcp = FastMCP( "SecID", - instructions="Self-hosted SecID resolver. Resolve, look up, and describe security knowledge identifiers.", + instructions=( + "Self-hosted SecID resolver. Resolve, look up, and describe " + "security knowledge identifiers." + ), ) @mcp.tool() @@ -145,7 +162,7 @@ def mcp_resolve(secid: str) -> str: secid:ttp/mitre.org/attack#T1059.003 → ATT&CK technique URL secid:methodology/first.org/cvss@4.0 → CVSS v4.0 specification """ - return json.dumps(resolve(store, secid, registry_dirs=args.registry), indent=2) + return json.dumps(resolve(store, secid, registry_dirs=config.registry_dirs), indent=2) @mcp.tool() def mcp_lookup(type: str, identifier: str) -> str: @@ -157,7 +174,7 @@ def mcp_lookup(type: str, identifier: str) -> str: identifier: The identifier to search for (e.g., CVE-2021-44228, CWE-79) """ secid = f"secid:{type}/{identifier}" - return json.dumps(resolve(store, secid, registry_dirs=args.registry), indent=2) + return json.dumps(resolve(store, secid, registry_dirs=config.registry_dirs), indent=2) @mcp.tool() def mcp_describe(secid: str) -> str: @@ -171,19 +188,95 @@ def mcp_describe(secid: str) -> str: hash_idx = secid.find("#") if hash_idx != -1: secid = secid[:hash_idx] - return json.dumps(resolve(store, secid, registry_dirs=args.registry), indent=2) + return json.dumps(resolve(store, secid, registry_dirs=config.registry_dirs), indent=2) - # Mount MCP at /mcp app.mount("/mcp", mcp.streamable_http_app()) logger.info("MCP endpoint available at /mcp") -except ImportError: - logger.info("MCP SDK not installed — /mcp endpoint disabled. Install with: pip install mcp") +# --------------------------------------------------------------------------- +# CLI entry point +# --------------------------------------------------------------------------- + + +def _parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="SecID Self-Hosted Server") + parser.add_argument( + "--registry", action="append", default=[], + help="Path to registry directory (can specify multiple for overlay). Default: ./registry", + ) + parser.add_argument("--storage", default="memory", choices=["memory", "redis", "memcached", "sqlite"]) + parser.add_argument("--redis-url", default="redis://localhost:6379") + parser.add_argument("--memcached-url", default="localhost:11211") + parser.add_argument("--sqlite-path", default=":memory:") + parser.add_argument("--load", default="lazy", choices=["lazy", "bulk"]) + parser.add_argument("--host", default="0.0.0.0") + parser.add_argument("--port", type=int, default=8000) + parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"]) + return parser.parse_args(argv) + + +def _resolve_registry_dirs(provided: list[str]) -> list[str]: + """If no --registry was passed, search common host-local locations. + + Returns the list to use (provided as-is if non-empty, or a single + auto-discovered path, or empty list if nothing found). + """ + if provided: + return provided + for candidate in [ + "./registry", + "../SecID/registry", + os.path.expanduser("~/GitHub/CloudSecurityAlliance/SecID/registry"), + ]: + if os.path.isdir(candidate): + return [candidate] + return [] + + +def _build_storage_kwargs(args: argparse.Namespace) -> dict: + if args.storage == "redis": + return {"url": args.redis_url} + if args.storage == "memcached": + return {"url": args.memcached_url} + if args.storage == "sqlite": + return {"path": args.sqlite_path} + return {} + + +def main(argv: Optional[list[str]] = None) -> int: + """CLI entry point. Returns exit code.""" + args = _parse_args(argv) + + registry_dirs = _resolve_registry_dirs(args.registry) + if not registry_dirs: + print( + "Error: No registry directory found. Use --registry /path/to/SecID/registry", + file=sys.stderr, + ) + return 1 + + logging.basicConfig( + level=getattr(logging, args.log_level), + format="%(asctime)s %(levelname)s %(message)s", + ) + + config = ServerConfig( + registry_dirs=registry_dirs, + storage_type=args.storage, + storage_kwargs=_build_storage_kwargs(args), + load_mode=args.load, + ) + + app = create_app(config) -if __name__ == "__main__": import uvicorn logger.info(f"Starting SecID server on {args.host}:{args.port}") - logger.info(f"Registry: {args.registry}") + logger.info(f"Registry: {registry_dirs}") logger.info(f"Storage: {args.storage}, Loading: {args.load}") uvicorn.run(app, host=args.host, port=args.port) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/python/test_smoke.py b/python/test_smoke.py index d5c830d..b4e856b 100644 --- a/python/test_smoke.py +++ b/python/test_smoke.py @@ -1,17 +1,19 @@ -"""Smoke tests — verify the core resolver modules import cleanly and basic -constants/behavior are sane. +"""Smoke tests — verify the core resolver modules import cleanly, basic +constants/behavior are sane, and the secid_server factory produces a +working FastAPI app. -This is intentionally minimal: import-level guarantees plus a couple of -sanity assertions. Full test coverage will come with the conformance suite -work tracked in SecID-Client-SDK. - -Note: deliberately does NOT import secid_server.py — that module runs -argparse + storage initialization at import time, which makes it untestable -without restructuring. A future PR will move its CLI bootstrap into a -main() function gated by `if __name__ == "__main__":`. +After the secid_server.py refactor (factory pattern), this file can now +test the HTTP layer too via fastapi.testclient.TestClient. """ -import pytest +from fastapi.testclient import TestClient + +from secid_server import ServerConfig, create_app + + +# --------------------------------------------------------------------------- +# Module imports +# --------------------------------------------------------------------------- def test_resolver_module_imports(): @@ -34,6 +36,22 @@ def test_storage_module_imports(): assert callable(create_store) +def test_secid_server_module_imports_without_side_effects(): + """secid_server.py must be importable without running argparse or + starting a server. This is the property the pre-Phase-1 refactor + delivered — broken before, working after. + """ + import secid_server + assert hasattr(secid_server, "create_app") + assert hasattr(secid_server, "ServerConfig") + assert hasattr(secid_server, "main") + + +# --------------------------------------------------------------------------- +# Type-list invariants +# --------------------------------------------------------------------------- + + def test_secid_types_canonical(): """The 10 official SecID types — frozen at v1.0, must not drift silently.""" from registry_loader import SECID_TYPES @@ -50,8 +68,7 @@ def test_secid_types_canonical(): def test_secid_types_single_source(): - """Confirm resolver.py imports SECID_TYPES from registry_loader rather - than redefining it (PR #4 dedup).""" + """resolver.py should import SECID_TYPES from registry_loader, not redefine it.""" import resolver import registry_loader assert resolver.SECID_TYPES is registry_loader.SECID_TYPES, ( @@ -61,6 +78,11 @@ def test_secid_types_single_source(): ) +# --------------------------------------------------------------------------- +# resolve() basic invariants +# --------------------------------------------------------------------------- + + def test_resolve_handles_empty_input(): """resolve() must not crash on edge inputs — minimum contract.""" from resolver import resolve @@ -69,16 +91,68 @@ def test_resolve_handles_empty_input(): store = create_store("memory") result = resolve(store, "") assert isinstance(result, dict) - # Empty input should produce an error envelope, not raise. - assert "secid_query" in result or "error" in result or "results" in result def test_resolve_handles_missing_prefix(): - """A SecID without the 'secid:' prefix is malformed; must return an error envelope.""" + """A SecID without the 'secid:' prefix is malformed; must return an envelope, not raise.""" from resolver import resolve from storage import create_store store = create_store("memory") result = resolve(store, "advisory/mitre.org/cve#CVE-2021-44228") assert isinstance(result, dict) - # Should produce SOME response, not raise. + + +# --------------------------------------------------------------------------- +# create_app() factory + HTTP endpoints +# --------------------------------------------------------------------------- + + +def _empty_app(): + """Build a minimal app with no registry data. Sufficient for HTTP-layer smoke tests.""" + config = ServerConfig(registry_dirs=[], storage_type="memory") + return create_app(config) + + +def test_create_app_factory_returns_fastapi_app(): + """The factory should produce a FastAPI app with the expected title.""" + app = _empty_app() + assert app.title == "SecID Server" + + +def test_health_endpoint_returns_ok(): + """GET /health returns 200 with status=ok and the storage type.""" + client = TestClient(_empty_app()) + response = client.get("/health") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ok" + assert data["store"] == "memory" + assert "keys" in data + + +def test_resolve_endpoint_returns_envelope_for_garbage_input(): + """GET /api/v1/resolve always returns a 200 with an envelope, even for malformed input. + This is the 'helpful over correct' contract from PRINCIPLES.md. + """ + client = TestClient(_empty_app()) + response = client.get("/api/v1/resolve?secid=this-is-not-a-valid-secid") + assert response.status_code == 200 + data = response.json() + assert isinstance(data, dict) + + +def test_resolve_endpoint_requires_secid_param(): + """GET /api/v1/resolve without ?secid= should return 422 (FastAPI validation error).""" + client = TestClient(_empty_app()) + response = client.get("/api/v1/resolve") + assert response.status_code == 422 + + +def test_create_app_no_side_effects_on_import(): + """Multiple calls to create_app must produce independent apps without leaking + state between them (no module-level state from the old non-factory shape). + """ + app_a = _empty_app() + app_b = _empty_app() + assert app_a is not app_b