diff --git a/src/node-control/README.md b/src/node-control/README.md index 3983af96..99784152 100644 --- a/src/node-control/README.md +++ b/src/node-control/README.md @@ -23,6 +23,7 @@ - [Voting Commands](#voting-commands) - [REST API Endpoints](#rest-api-endpoints) - [Configuration](#configuration) +- [Audit log](#audit-log) - [Config Structure](#config-structure) - [Section Descriptions](#section-descriptions) - [Default Config Example](#default-config-example) @@ -2521,9 +2522,18 @@ curl -X POST http://127.0.0.1:8080/v1/task/elections \ --- +## Audit log + +nodectl writes a structured audit log of domain events (elections, config +mutations, auth) to `./logs/audit.jsonl`. See [docs/audit-log.md](docs/audit-log.md) +for configuration, retention, PII handling, and log analysis. + +--- + ## Related Setup Guides - [Hashicorp Vault Dedicated Setup](./docs/hcp-vault-setup.md) - [Node Control Service Setup](./docs/nodectl-setup.md) - [Contracts automation (auto-deploy / auto-topup)](./docs/contracts-automation.md) — `automation` config, REST and CLI - [Security Guide](./docs/nodectl-security.md) — roles, token lifecycle, rate limiting, monitoring +- [Audit Log](./docs/audit-log.md) — configuration, durability, PII, log analysis diff --git a/src/node-control/commands/src/commands/nodectl/service_api_cmd.rs b/src/node-control/commands/src/commands/nodectl/service_api_cmd.rs index 7047fb92..1a8c020b 100644 --- a/src/node-control/commands/src/commands/nodectl/service_api_cmd.rs +++ b/src/node-control/commands/src/commands/nodectl/service_api_cmd.rs @@ -673,7 +673,7 @@ fn print_elections_table(body: &str) -> anyhow::Result<()> { println!("\n {} ({})\n", "Our Participants".cyan().bold(), participants.len()); println!( - " {} {} {} {} {} {} {} {} {}", + " {} {} {} {} {} {} {} {} {} {}", format!("{:<14}", "Node").cyan().bold(), format!("{:<13}", "Status").cyan().bold(), format!("{:<5}", "Pos").cyan().bold(), @@ -681,10 +681,11 @@ fn print_elections_table(body: &str) -> anyhow::Result<()> { format!("{:<15}", "Accepted TON").cyan().bold(), format!("{:<24}", "Submitted At").cyan().bold(), format!("{:<6}", "MaxF").cyan().bold(), + format!("{:<30}", "Last error").cyan().bold(), format!("{:<44}", "Pubkey").cyan().bold(), "ADNL".cyan().bold(), ); - println!(" {}", "-".repeat(148).dimmed()); + println!(" {}", "-".repeat(180).dimmed()); for p in participants { let node = binding_str(p, "node_id"); @@ -736,11 +737,14 @@ fn print_elections_table(body: &str) -> anyhow::Result<()> { .unwrap_or_else(|| "-".to_string()); let accepted_stake = binding_str(p, "accepted_stake"); + let last_error = binding_str(p, "last_error"); + let last_error_display = + if last_error == "-" { "-".to_string() } else { last_error.yellow().to_string() }; let pubkey = binding_str(p, "pubkey"); let adnl = binding_str(p, "adnl"); println!( - " {:<14} {} {:<5} {:<15} {:<15} {:<24} {:<6} {:<44} {}", + " {:<14} {} {:<5} {:<15} {:<15} {:<24} {:<6} {:<30} {:<44} {}", node, status, position, @@ -748,14 +752,13 @@ fn print_elections_table(body: &str) -> anyhow::Result<()> { display_tons_from_str(&accepted_stake), submitted_at, max_factor, + last_error_display, pubkey, adnl, ); } println!(); - print_recent_events_table(&value); - Ok(()) } diff --git a/src/node-control/docs/audit-log.md b/src/node-control/docs/audit-log.md new file mode 100644 index 00000000..3cff4a43 --- /dev/null +++ b/src/node-control/docs/audit-log.md @@ -0,0 +1,177 @@ +# Audit Log + +## What it is + +nodectl writes a structured, append-only log of domain events — elections, config +mutations, authentication, vault operations — to a newline-delimited JSON file +(`audit.jsonl`). + +The audit log is **separate from the `tracing` service log** (stderr / journald). +Use the table below to decide where to look. + +| Use case | Where | +|---|---| +| Debugging service internals, stack traces | tracing logs (`RUST_LOG`) | +| HTTP access / request logs | *(not implemented; would be tracing spans)* | +| Metrics / counters | Prometheus *(future)* | +| Domain events: who did what and when | **audit log** | + +## Out of scope + +- Per-RPC / per-request logging +- Metrics / dashboards +- Debug noise (heartbeats, cache refreshes, routine polls) +- High-frequency sources (> ~10 events/sec) +- Tamper-evidence (hash chain, signed events) — see RFC 9162 for future work + +## Event types + +Events are grouped by source: + +| Prefix | Events | +|---|---| +| `elections.*` | Key generated, stake submitted/accepted/skipped/failed/recovered, withdraw processed/failed | +| `rest_api.*` | Config updated, auth login succeeded/rejected, token rejected | +| `vault.*` | Key created / removed *(producers not wired yet)* | +| `rewards.*` | Distribution started/completed/failed, recipient skipped *(producers not wired yet)* | +| `system.*` | Service started/stopped, audit events dropped | + +Each event contains: + +- `id` — UUID v7 (sortable by creation time) +- `ts` — RFC3339 timestamp with millisecond precision (`2026-05-22T12:10:30.123Z`) +- `outcome` — `success`, `failure`, or `skipped` +- `event_type` — dotted string (e.g. `elections.stake_submitted`) +- `data` — event-specific payload (omitted when `include_payload = false`) +- `actor` — who triggered the action (`service` task or `user` identity) +- `target` — what the action was applied to (node, config, vault key, …) + +## File layout + +``` +./logs/audit.jsonl ← current file (line 0 is a system.service_started event) +./logs/audit.jsonl.1 ← most-recent rotation +./logs/audit.jsonl.2 +… +./logs/audit.jsonl.9 +``` + +The first line of every (rotated) file is a regular `system.service_started` +event whose `data` carries the service `version` and `host`. There is no +special header format — every line is a uniform JSONL event: + +```json +{"id":"019ecb64-...","ts":"2026-05-22T12:00:00.000Z","outcome":"success","event_type":"system.service_started","data":{"version":"0.7.0","host":"validator-1"},"actor":{"kind":"system"},"target":{"kind":"system"}} +``` + +Defaults: 100 MiB per file, 10 files → ~1 GiB total history. + +## Configuration + +All fields live under the `audit_log` key in the nodectl config file. +None of them require a service restart — the values are read at startup. + +| Field | Default | Description | +|---|---|---| +| `enabled` | `true` | Set to `false` to disable the audit log entirely | +| `path` | `./logs/audit.jsonl` | Path to the active log file; rotated files get `.1`…`.N` suffixes | +| `max_size_bytes` | `104857600` (100 MiB) | Rotate when the live file exceeds this size | +| `max_files` | `10` | Number of rotated files to keep (oldest is deleted on overflow) | +| `batch_interval_ms` | `1000` | How often (ms) the writer flushes a batch to disk | +| `batch_max_events` | `100` | Flush early when a batch reaches this many events | +| `queue_capacity` | `10000` | In-memory channel capacity between `record()` callers and the writer task | +| `queue_full_timeout_ms` | `250` | How long (ms) `record()` waits before dropping an event when the queue is full | +| `fsync_on_batch` | `false` | Call `fsync` after every batch — see [Durability](#durability) | +| `include_payload` | `true` | Write `data` fields; set to `false` to log event metadata only | +| `record_client_ip` | `false` | Include client IP in `rest_api.*` events — see [PII](#pii-and-retention) | +| `ip_anonymize` | `false` | Mask last IPv4 octet / last two IPv6 groups when recording IP | +| `ring_buffer_capacity` | `100` | In-memory ring for the REST read-path (see [Where it's consumed](#where-its-consumed)) | + +Example minimal override (all other fields keep their defaults): + +```json +{ + "audit_log": { + "path": "/var/log/nodectl/audit.jsonl", + "max_files": 30, + "fsync_on_batch": true + } +} +``` + +## Durability + +With `fsync_on_batch = false` (default), the kernel page cache is flushed on +the OS's own schedule. On a hard kill (`SIGKILL`) or power loss, up to +`batch_interval_ms` (~1 s) of events may be lost. + +Set `fsync_on_batch = true` for strict durability at higher disk cost (one +`fsync` per second by default; one per `batch_max_events` events at high +throughput). + +Events dropped because the writer queue is full are counted in the +`system.audit_events_dropped` event emitted on the next flush. + +## PII and retention + +Audit events may contain operator usernames, optionally client IP addresses, +and config change details. In GDPR-style regimes, IP addresses and usernames +are personal data. + +- `record_client_ip = false` (default): no IP is ever written. +- `record_client_ip = true`, `ip_anonymize = false`: full IP written. +- `record_client_ip = true`, `ip_anonymize = true`: last IPv4 octet zeroed, + last two IPv6 groups masked (`::0:0`). + +Retention is bounded by `max_size_bytes × max_files`. Tune for your policy. +Log files are **not** automatically deleted after a time-based retention period — +external tooling (logrotate, cron) is needed if you require time-based purges. + +## File permissions + +On Unix, the live file and all rotated files are created with mode `0600` +(owner read/write only). The directory is not created with any special mode — +ensure the directory itself has appropriate permissions. + +Tamper-evidence (hash chains, signed events) is **out of scope** for the +current release. Treat the audit log as protected by host trust and filesystem +ACLs, not by cryptography. + +## Where it's consumed + +`GET /v1/elections` reads from the **in-memory ring buffer** (last +`ring_buffer_capacity` events, default 100) and enriches `our_participants` +with: + +- `stake_submissions` — stake submission history from audit +- `last_error` — latest error-class event (stake skipped, stake failed, withdraw failed) + +The JSONL file on disk is **not** parsed on the hot path. + +## Analyzing the log + +```sh +# Count events by type +jq -r .event_type logs/audit.jsonl | sort | uniq -c | sort -rn + +# All events for one election round +jq 'select(.target.election_id == 1779265552)' logs/audit.jsonl + +# Failed or skipped stakes in the last file +jq 'select(.outcome == "failure" or .outcome == "skipped") + | select(.event_type | startswith("elections.stake"))' logs/audit.jsonl + +# Config mutations by a specific user +jq 'select(.event_type == "rest_api.config_updated" and .actor.id == "alice")' \ + logs/audit.jsonl + +# Tail-follow live events +tail -f logs/audit.jsonl | jq . + +# All events in a time range +jq 'select(.ts >= "2026-05-22T10:00:00Z" and .ts < "2026-05-22T11:00:00Z")' \ + logs/audit.jsonl + +# Events across rotated files (newest first) +cat logs/audit.jsonl.1 logs/audit.jsonl | jq . +``` diff --git a/src/node-control/service/src/audit/enums.rs b/src/node-control/service/src/audit/enums.rs index 727b95ea..9dec6734 100644 --- a/src/node-control/service/src/audit/enums.rs +++ b/src/node-control/service/src/audit/enums.rs @@ -87,7 +87,7 @@ pub enum AuditEventPayload { // ── system ─────────────────────────────────────────────────────────────── #[serde(rename = "system.service_started")] - SystemServiceStarted { version: String }, + SystemServiceStarted { version: String, host: String }, #[serde(rename = "system.service_stopped")] SystemServiceStopped {}, diff --git a/src/node-control/service/src/audit/event.rs b/src/node-control/service/src/audit/event.rs index d43d4299..379f578e 100644 --- a/src/node-control/service/src/audit/event.rs +++ b/src/node-control/service/src/audit/event.rs @@ -15,7 +15,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use uuid::Uuid; /// Renders timestamps as RFC3339 with millisecond precision and a trailing `Z` -/// (e.g. `2026-05-22T12:10:30.123Z`), used for `ts` and `started_at`. +/// (e.g. `2026-05-22T12:10:30.123Z`), used for the `ts` field. mod ts_millis_rfc3339 { use super::*; @@ -31,26 +31,11 @@ mod ts_millis_rfc3339 { } } -/// First JSONL line of every (rotated) audit file. Readers distinguish it from -/// events by the absence of an `event_type` field. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct AuditFileHeader { - pub schema_version: u16, - /// Logical service name, e.g. `"nodectl"`. - pub service: String, - /// Service semver. - pub service_version: String, - pub host: String, - #[serde(with = "ts_millis_rfc3339")] - pub started_at: DateTime, -} - /// A single audit record. /// /// Wire shape: `id`, `ts`, `outcome`, the flattened payload /// (`event_type` + `data`), `actor`, `target`. `severity`/`source` are derived -/// from the payload at the display layer and `schema_version` lives in -/// [`AuditFileHeader`], so none of them are stored per event. +/// from the payload at the display layer, so they are not stored per event. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct AuditEvent { /// UUID v7 — sortable by creation time. @@ -287,12 +272,12 @@ impl AuditEvent { ) } - pub fn system_service_started(version: impl Into) -> Self { + pub fn system_service_started(version: impl Into, host: impl Into) -> Self { Self::new( AuditActor::System, AuditTarget::System, AuditOutcome::Success, - AuditEventPayload::SystemServiceStarted { version: version.into() }, + AuditEventPayload::SystemServiceStarted { version: version.into(), host: host.into() }, ) } @@ -341,6 +326,31 @@ mod tests { AuditEvent { id: fixture_id(), ts: fixture_ts(), outcome, payload, actor, target } } + #[test] + fn serializes_service_started_to_expected_json() { + let event = fixed( + AuditOutcome::Success, + AuditActor::System, + AuditTarget::System, + AuditEventPayload::SystemServiceStarted { + version: "0.5.1".into(), + host: "node-host".into(), + }, + ); + assert_json_eq( + &event, + json!({ + "id": FIXTURE_ID, + "ts": FIXTURE_TS, + "outcome": "success", + "event_type": "system.service_started", + "data": { "version": "0.5.1", "host": "node-host" }, + "actor": { "kind": "system" }, + "target": { "kind": "system" } + }), + ); + } + #[test] fn serializes_stake_submitted_to_expected_json() { let event = fixed( @@ -405,30 +415,6 @@ mod tests { ); } - #[test] - fn file_header_serializes_with_millis_ts() { - let header = AuditFileHeader { - schema_version: 1, - service: "nodectl".into(), - service_version: "0.5.1".into(), - host: "node-host".into(), - started_at: fixture_ts(), - }; - let value = serde_json::to_value(&header).expect("serialize header"); - assert_eq!( - value, - json!({ - "schema_version": 1, - "service": "nodectl", - "service_version": "0.5.1", - "host": "node-host", - "started_at": FIXTURE_TS - }) - ); - // Header has no event_type — that is how readers tell it apart from events. - assert!(value.get("event_type").is_none()); - } - fn sample_event(payload: AuditEventPayload) -> AuditEvent { fixed( AuditOutcome::Success, @@ -481,7 +467,10 @@ mod tests { AuditEventPayload::RestApiTokenRejected { reason: "expired".into() }, AuditEventPayload::VaultKeyCreated {}, AuditEventPayload::VaultKeyRemoved {}, - AuditEventPayload::SystemServiceStarted { version: "0.5.0".into() }, + AuditEventPayload::SystemServiceStarted { + version: "0.5.0".into(), + host: "test-host".into(), + }, AuditEventPayload::SystemServiceStopped {}, AuditEventPayload::SystemAuditEventsDropped { dropped_events: 3, diff --git a/src/node-control/service/src/audit/factory.rs b/src/node-control/service/src/audit/factory.rs index 135c6edd..ef37d3be 100644 --- a/src/node-control/service/src/audit/factory.rs +++ b/src/node-control/service/src/audit/factory.rs @@ -45,7 +45,7 @@ mod tests { use tempfile::tempdir; fn sample_event() -> AuditEvent { - AuditEvent::system_service_started("test") + AuditEvent::system_service_started("test", "") } #[tokio::test] diff --git a/src/node-control/service/src/audit/in_memory.rs b/src/node-control/service/src/audit/in_memory.rs index fc999ffb..f2665bc1 100644 --- a/src/node-control/service/src/audit/in_memory.rs +++ b/src/node-control/service/src/audit/in_memory.rs @@ -46,7 +46,7 @@ mod tests { #[tokio::test] async fn records_and_drains_events() { let log = InMemoryAuditLog::new(); - let event = AuditEvent::system_service_started("test"); + let event = AuditEvent::system_service_started("test", ""); log.record(event.clone()).await; let drained = log.drain(); assert_eq!(drained.len(), 1); diff --git a/src/node-control/service/src/audit/jsonl_log.rs b/src/node-control/service/src/audit/jsonl_log.rs index 3d012134..d6a6a447 100644 --- a/src/node-control/service/src/audit/jsonl_log.rs +++ b/src/node-control/service/src/audit/jsonl_log.rs @@ -166,7 +166,6 @@ impl AuditLog for JsonlAuditLog { } // Ring already updated; proceed to the JSONL writer queue. - let event_id = event.id; let source = event.payload.source(); let cmd = AuditCommand::Event(Box::new(event)); @@ -206,7 +205,7 @@ mod tests { use tempfile::tempdir; fn sample_event(tag: &str) -> AuditEvent { - AuditEvent::system_service_started(tag) + AuditEvent::system_service_started(tag, "") } fn test_config(path: PathBuf) -> AuditLogConfig { diff --git a/src/node-control/service/src/audit/jsonl_writer.rs b/src/node-control/service/src/audit/jsonl_writer.rs index d3cd5b20..5859ddbd 100644 --- a/src/node-control/service/src/audit/jsonl_writer.rs +++ b/src/node-control/service/src/audit/jsonl_writer.rs @@ -6,8 +6,7 @@ * * This software is provided "AS IS", WITHOUT WARRANTY OF ANY KIND. */ -use crate::audit::{AuditEvent, AuditFileHeader, AuditLogConfig, jsonl_log::AuditInitError}; -use chrono::Utc; +use crate::audit::{AuditEvent, AuditLogConfig, jsonl_log::AuditInitError}; use std::{ sync::{ Arc, Once, @@ -16,9 +15,6 @@ use std::{ time::Duration, }; -/// Schema version stamped into the per-file [`AuditFileHeader`]. -const AUDIT_SCHEMA_VERSION: u16 = 1; - static HOSTNAME_FALLBACK_WARNED: Once = Once::new(); #[derive(Debug)] @@ -39,7 +35,7 @@ pub(crate) enum AuditCommand { } pub(crate) struct AuditWriter { - /// Host identity stamped into each new file's [`AuditFileHeader`]. + /// Host identity written as `data.host` into the first [`AuditEvent`] of each file segment. host: String, config: Arc, /// Live append handle. `None` only transiently during rotation (the old @@ -128,26 +124,22 @@ impl AuditWriter { last_dropped_seen: 0, write_delay, }; - writer.write_header_if_empty().await.map_err(AuditInitError::FileOpen)?; + writer.write_startup_event_if_new().await.map_err(AuditInitError::FileOpen)?; Ok(writer) } - fn file_header(&self) -> AuditFileHeader { - AuditFileHeader { - schema_version: AUDIT_SCHEMA_VERSION, - service: "nodectl".into(), - service_version: env!("CARGO_PKG_VERSION").into(), - host: self.host.clone(), - started_at: Utc::now(), - } - } - - async fn write_header_if_empty(&mut self) -> std::io::Result<()> { + /// Writes a `system.service_started` event as the very first line of a new file segment. + /// + /// Called on fresh open (empty file) and after rotation. Every segment starts with this + /// event so readers always know which service version and host produced the following lines, + /// with no special-case header format to distinguish. + async fn write_startup_event_if_new(&mut self) -> std::io::Result<()> { if self.current_size != 0 { return Ok(()); } use tokio::io::AsyncWriteExt; - let mut line = serde_json::to_vec(&self.file_header()) + let ev = AuditEvent::system_service_started(env!("CARGO_PKG_VERSION"), self.host.as_str()); + let mut line = serde_json::to_vec(&ev) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; line.push(b'\n'); let file = self @@ -422,7 +414,7 @@ impl AuditWriter { } self.file = Some(file); self.current_size = 0; - self.write_header_if_empty().await?; + self.write_startup_event_if_new().await?; Ok(()) } @@ -474,11 +466,11 @@ mod tests { } fn sample_event(tag: &str) -> AuditEvent { - AuditEvent::system_service_started(tag) + AuditEvent::system_service_started(tag, "") } fn large_event(payload_kb: usize) -> AuditEvent { - AuditEvent::system_service_started("x".repeat(payload_kb * 1024)) + AuditEvent::system_service_started("x".repeat(payload_kb * 1024), "") } fn test_config(dir: &Path, mut cfg: AuditLogConfig) -> AuditLogConfig { @@ -531,16 +523,24 @@ mod tests { tx.send(AuditCommand::Shutdown).await.unwrap(); } - /// Reads event lines, skipping the per-file [`AuditFileHeader`] (no `event_type`). + /// Reads all event lines, skipping only the very first `system.service_started` line + /// that the writer automatically prepends to every new file segment. Test events of + /// the same type (if any) are preserved so counting assertions stay correct. fn read_json_lines(path: &Path) -> Vec { assert!(path.exists(), "audit file missing at {}", path.display()); let content = std::fs::read_to_string(path).unwrap(); - content + let mut lines: Vec = content .lines() .filter(|line| !line.is_empty()) .map(|line| serde_json::from_str::(line).expect("valid json line")) - .filter(|value| value.get("event_type").is_some()) - .collect() + .collect(); + // Drop only the first line when it is the writer-injected startup event. + if lines.first().and_then(|v| v.get("event_type")).and_then(|t| t.as_str()) + == Some("system.service_started") + { + lines.remove(0); + } + lines } fn count_rotated_files(dir: &Path) -> usize { diff --git a/src/node-control/service/src/audit/log.rs b/src/node-control/service/src/audit/log.rs index 3ef705c0..8ba5d66d 100644 --- a/src/node-control/service/src/audit/log.rs +++ b/src/node-control/service/src/audit/log.rs @@ -28,6 +28,6 @@ mod tests { #[tokio::test] async fn noop_audit_log_record_completes() { let log = NoopAuditLog; - log.record(AuditEvent::system_service_started("test")).await; + log.record(AuditEvent::system_service_started("test", "")).await; } } diff --git a/src/node-control/service/src/audit/mod.rs b/src/node-control/service/src/audit/mod.rs index f038d9b6..7c0f71be 100644 --- a/src/node-control/service/src/audit/mod.rs +++ b/src/node-control/service/src/audit/mod.rs @@ -16,6 +16,7 @@ pub mod jsonl_log; pub mod jsonl_writer; pub mod log; pub mod participant; +pub mod projection; pub mod ring_buffer; pub use actor_builder::{AuditActorBuilder, client_ip_from_headers}; @@ -23,11 +24,15 @@ pub use common::app_config::AuditLogConfig; pub use enums::{ AuditEventPayload, AuditOutcome, AuditSeverity, AuditSource, ConfigFieldChange, StakeSkipReason, }; -pub use event::{AuditEvent, AuditFileHeader, ElectionsStakeSubmittedParams}; +pub use event::{AuditEvent, ElectionsStakeSubmittedParams}; pub use factory::AuditLogFactory; #[cfg(test)] pub use in_memory::InMemoryAuditLog; pub use jsonl_log::AuditInitError; pub use log::{AuditLog, NoopAuditLog}; pub use participant::{AuditActor, AuditTarget}; +pub use projection::{ + ElectionsProjection, collect_recent_election_ids, merge_projection_into_participants, + project_elections, +}; pub use ring_buffer::AuditEventBuffer; diff --git a/src/node-control/service/src/audit/projection.rs b/src/node-control/service/src/audit/projection.rs new file mode 100644 index 00000000..7d7d48f2 --- /dev/null +++ b/src/node-control/service/src/audit/projection.rs @@ -0,0 +1,443 @@ +/* + * Copyright (C) 2025-2026 RSquad Blockchain Lab. + * + * Licensed under the GNU General Public License v3.0. + * See the LICENSE file in the root of this repository. + * + * This software is provided "AS IS", WITHOUT WARRANTY OF ANY KIND. + */ +use crate::audit::{ + AuditEvent, AuditEventPayload, AuditOutcome, AuditSource, StakeSkipReason, + participant::AuditTarget, +}; +use chrono::{DateTime, Utc}; +use common::{ + snapshot::{OurElectionParticipant, StakeSubmission}, + time_format, + ton_utils::max_stake_factor_raw_to_multiplier, +}; +use std::collections::BTreeMap; + +/// Projected stake submission from audit events. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProjectedStakeSubmission { + pub ts: DateTime, + pub node_id: String, + pub stake: String, + pub max_factor: u32, + pub policy: String, + pub submission_time: u64, +} + +/// Projected stake skip from audit events. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProjectedStakeSkip { + pub ts: DateTime, + pub node_id: String, + pub reason: StakeSkipReason, +} + +/// Projected withdraw outcome from audit events. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProjectedWithdraw { + pub ts: DateTime, + pub node_id: String, + pub outcome: AuditOutcome, + pub msg_hash: Option, + pub error: Option, +} + +/// Projected stake failure from audit events. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProjectedStakeFailure { + pub ts: DateTime, + pub node_id: String, + pub reason: String, +} + +/// Per-node elections audit data keyed by election id. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct NodeElectionProjection { + pub stake_submissions: Vec, + pub stake_skips: Vec, + pub withdraws: Vec, + pub stake_failures: Vec, +} + +/// Aggregated elections projection from the in-memory audit ring buffer. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct ElectionsProjection { + /// `election_id` → `node_id` → per-node projection. + pub nodes: BTreeMap>, +} + +/// Collects up to `max` recent election ids: the live cycle first, then others seen in `events`. +pub fn collect_recent_election_ids( + current_election_id: Option, + events: &[AuditEvent], + max: usize, +) -> Vec { + if max == 0 { + return Vec::new(); + } + + let mut ids = Vec::new(); + let mut seen = std::collections::HashSet::new(); + + if let Some(id) = current_election_id { + seen.insert(id); + ids.push(id); + if ids.len() >= max { + return ids; + } + } + + for ev in events.iter().rev() { + let Some(election_id) = election_id_from_event(ev) else { continue }; + if seen.insert(election_id) { + ids.push(election_id); + if ids.len() >= max { + break; + } + } + } + + ids +} + +/// Builds an [`ElectionsProjection`] from audit events, keeping only `recent_election_ids`. +pub fn project_elections( + events: &[AuditEvent], + recent_election_ids: &[u64], +) -> ElectionsProjection { + let mut projection = ElectionsProjection::default(); + + for ev in events { + if ev.payload.source() != AuditSource::Elections { + continue; + } + let Some(election_id) = election_id_from_event(ev) else { continue }; + if !recent_election_ids.contains(&election_id) { + continue; + } + let node_id = node_id_from_event(ev).unwrap_or_default(); + let node = + projection.nodes.entry(election_id).or_default().entry(node_id.clone()).or_default(); + + match &ev.payload { + AuditEventPayload::ElectionsStakeSubmitted { + stake, + max_factor, + policy, + submission_time, + } => { + node.stake_submissions.push(ProjectedStakeSubmission { + ts: ev.ts, + node_id, + stake: stake.clone(), + max_factor: *max_factor, + policy: policy.clone(), + submission_time: *submission_time, + }); + } + AuditEventPayload::ElectionsStakeSkipped { reason, .. } => { + node.stake_skips.push(ProjectedStakeSkip { ts: ev.ts, node_id, reason: *reason }); + } + AuditEventPayload::ElectionsStakeFailed { reason } => { + node.stake_failures.push(ProjectedStakeFailure { + ts: ev.ts, + node_id, + reason: reason.clone(), + }); + } + AuditEventPayload::ElectionsWithdrawProcessed { msg_hash } => { + node.withdraws.push(ProjectedWithdraw { + ts: ev.ts, + node_id, + outcome: AuditOutcome::Success, + msg_hash: Some(msg_hash.clone()), + error: None, + }); + } + AuditEventPayload::ElectionsWithdrawFailed { reason } => { + node.withdraws.push(ProjectedWithdraw { + ts: ev.ts, + node_id, + outcome: AuditOutcome::Failure, + msg_hash: None, + error: Some(reason.clone()), + }); + } + _ => {} + } + } + + projection +} + +/// Merges projected audit data into live snapshot participants for `election_id`. +pub fn merge_projection_into_participants( + participants: &mut [OurElectionParticipant], + projection: &ElectionsProjection, + election_id: u64, +) { + let Some(by_node) = projection.nodes.get(&election_id) else { return }; + + for participant in participants.iter_mut() { + let Some(node_proj) = by_node.get(&participant.node_id) else { continue }; + + merge_stake_submissions(&mut participant.stake_submissions, &node_proj.stake_submissions); + + if let Some(err) = latest_error_message(node_proj) { + participant.last_error = Some(err); + } + } +} + +fn merge_stake_submissions( + existing: &mut Vec, + projected: &[ProjectedStakeSubmission], +) { + for sub in projected { + let converted = projected_to_stake_submission(sub); + let duplicate = existing + .iter() + .any(|s| s.submission_time == converted.submission_time && s.stake == converted.stake); + if !duplicate { + existing.push(converted); + } + } + existing.sort_by_key(|s| s.submission_time); +} + +fn projected_to_stake_submission(sub: &ProjectedStakeSubmission) -> StakeSubmission { + StakeSubmission { + stake: sub.stake.clone(), + max_factor: max_stake_factor_raw_to_multiplier(sub.max_factor), + submission_time: sub.submission_time, + submission_time_utc: time_format::format_ts(sub.submission_time), + } +} + +fn latest_error_message(node_proj: &NodeElectionProjection) -> Option { + let mut candidates: Vec<(DateTime, String)> = Vec::new(); + + for skip in &node_proj.stake_skips { + candidates.push((skip.ts, format!("stake skipped: {}", format_skip_reason(skip.reason)))); + } + for failure in &node_proj.stake_failures { + candidates.push((failure.ts, format!("stake failed: {}", failure.reason))); + } + for withdraw in &node_proj.withdraws { + if withdraw.outcome == AuditOutcome::Failure + && let Some(error) = &withdraw.error + { + candidates.push((withdraw.ts, format!("withdraw failed: {error}"))); + } + } + + candidates.sort_by_key(|(ts, _)| *ts); + candidates.last().map(|(_, msg)| msg.clone()) +} + +fn format_skip_reason(reason: StakeSkipReason) -> &'static str { + match reason { + StakeSkipReason::LowWalletBalance => "low_wallet_balance", + StakeSkipReason::WithdrawRequestsPending => "withdraw_requests_pending", + StakeSkipReason::PoolNotReady => "pool_not_ready", + StakeSkipReason::AdaptiveSleepingPeriod => "adaptive_sleeping_period", + StakeSkipReason::AdaptiveWaitingPeriod => "adaptive_waiting_period", + StakeSkipReason::ElectionsDisabled => "elections_disabled", + StakeSkipReason::RecoverPending => "recover_pending", + StakeSkipReason::InsufficientStakeFunds => "insufficient_stake_funds", + } +} + +pub(crate) fn election_id_from_event(ev: &AuditEvent) -> Option { + match &ev.target { + AuditTarget::Node { election_id: Some(id), .. } => Some(*id), + AuditTarget::Elections { election_id } => Some(*election_id), + _ => None, + } +} + +pub(crate) fn node_id_from_event(ev: &AuditEvent) -> Option { + match &ev.target { + AuditTarget::Node { id, .. } => Some(id.clone()), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::audit::{AuditActor, AuditEvent}; + + const ELECTION_ID: u64 = 1_779_265_552; + const NODE_ID: &str = "node-1"; + + fn elections_actor() -> AuditActor { + AuditActor::service("elections-task") + } + + #[test] + fn collect_recent_election_ids_returns_empty_when_max_is_zero() { + let events = vec![AuditEvent::elections_stake_failed( + elections_actor(), + NODE_ID, + ELECTION_ID, + "err", + )]; + assert!(collect_recent_election_ids(Some(ELECTION_ID), &events, 0).is_empty()); + assert!(collect_recent_election_ids(None, &events, 0).is_empty()); + } + + #[test] + fn collect_recent_election_ids_respects_max_after_current_id() { + let events = vec![ + AuditEvent::elections_stake_failed(elections_actor(), NODE_ID, ELECTION_ID + 1, "a"), + AuditEvent::elections_stake_failed(elections_actor(), NODE_ID, ELECTION_ID + 2, "b"), + ]; + let ids = collect_recent_election_ids(Some(ELECTION_ID), &events, 1); + assert_eq!(ids, vec![ELECTION_ID]); + } + + #[test] + fn projection_groups_events_by_election_id() { + const OTHER_ELECTION: u64 = ELECTION_ID + 100; + const NODE_B: &str = "node-2"; + + let events = vec![ + AuditEvent::elections_stake_submitted( + elections_actor(), + NODE_ID, + ELECTION_ID, + crate::audit::ElectionsStakeSubmittedParams { + stake: "100000000000".into(), + max_factor: 196_608, + policy: "split50".into(), + submission_time: 1_700_000_000, + }, + ), + AuditEvent::elections_stake_skipped( + elections_actor(), + NODE_B, + OTHER_ELECTION, + StakeSkipReason::ElectionsDisabled, + None, + None, + ), + ]; + + let projection = project_elections(&events, &[ELECTION_ID, OTHER_ELECTION]); + + let current = projection.nodes.get(&ELECTION_ID).unwrap().get(NODE_ID).unwrap(); + assert_eq!(current.stake_submissions.len(), 1); + assert!(current.stake_skips.is_empty()); + + let other = projection.nodes.get(&OTHER_ELECTION).unwrap().get(NODE_B).unwrap(); + assert!(other.stake_submissions.is_empty()); + assert_eq!(other.stake_skips.len(), 1); + assert_eq!(other.stake_skips[0].reason, StakeSkipReason::ElectionsDisabled); + } + + #[test] + fn projection_ignores_non_elections_source() { + let events = vec![ + AuditEvent::rest_api_auth_login_success( + AuditActor::user("alice", Some("admin".into()), None), + "alice", + ), + AuditEvent::elections_stake_failed(elections_actor(), NODE_ID, ELECTION_ID, "boom"), + ]; + + let projection = project_elections(&events, &[ELECTION_ID]); + + assert_eq!(projection.nodes.len(), 1); + let node = projection.nodes.get(&ELECTION_ID).unwrap().get(NODE_ID).unwrap(); + assert_eq!(node.stake_failures.len(), 1); + assert_eq!(node.stake_failures[0].reason, "boom"); + } + + #[test] + fn projection_only_includes_recent_election_ids() { + let events = vec![ + AuditEvent::elections_stake_failed(elections_actor(), NODE_ID, ELECTION_ID, "current"), + AuditEvent::elections_stake_failed( + elections_actor(), + NODE_ID, + ELECTION_ID + 1, + "excluded", + ), + ]; + + let projection = project_elections(&events, &[ELECTION_ID]); + + assert_eq!(projection.nodes.len(), 1); + let node = projection.nodes.get(&ELECTION_ID).unwrap().get(NODE_ID).unwrap(); + assert_eq!(node.stake_failures[0].reason, "current"); + } + + #[test] + fn merge_projection_enriches_participants_without_duplicates() { + let events = vec![ + AuditEvent::elections_stake_submitted( + elections_actor(), + NODE_ID, + ELECTION_ID, + crate::audit::ElectionsStakeSubmittedParams { + stake: "200000000000".into(), + max_factor: 196_608, + policy: "all".into(), + submission_time: 1_700_000_100, + }, + ), + AuditEvent::elections_stake_skipped( + elections_actor(), + NODE_ID, + ELECTION_ID, + StakeSkipReason::InsufficientStakeFunds, + Some("100".into()), + Some("50".into()), + ), + ]; + let projection = project_elections(&events, &[ELECTION_ID]); + + let mut participants = vec![OurElectionParticipant { + node_id: NODE_ID.to_string(), + stake_submissions: vec![StakeSubmission { + stake: "200000000000".into(), + max_factor: 3.0, + submission_time: 1_700_000_100, + submission_time_utc: time_format::format_ts(1_700_000_100), + }], + ..Default::default() + }]; + + merge_projection_into_participants(&mut participants, &projection, ELECTION_ID); + + assert_eq!(participants[0].stake_submissions.len(), 1); + assert_eq!( + participants[0].last_error.as_deref(), + Some("stake skipped: insufficient_stake_funds") + ); + } + + #[test] + fn merge_projection_is_noop_when_ring_projection_empty() { + let mut participants = vec![OurElectionParticipant { + node_id: NODE_ID.to_string(), + last_error: Some("existing".into()), + ..Default::default() + }]; + + merge_projection_into_participants( + &mut participants, + &ElectionsProjection::default(), + ELECTION_ID, + ); + + assert_eq!(participants[0].last_error.as_deref(), Some("existing")); + assert!(participants[0].stake_submissions.is_empty()); + } +} diff --git a/src/node-control/service/src/audit/ring_buffer.rs b/src/node-control/service/src/audit/ring_buffer.rs index 279ff97f..4352f0fe 100644 --- a/src/node-control/service/src/audit/ring_buffer.rs +++ b/src/node-control/service/src/audit/ring_buffer.rs @@ -93,10 +93,10 @@ impl AuditEventBuffer { mod tests { use super::*; use crate::audit::{AuditActor, AuditEvent, AuditSource, StakeSkipReason}; - use std::sync::Arc; + use std::sync::{Arc, Barrier}; fn ev(tag: &str) -> AuditEvent { - AuditEvent::system_service_started(tag) + AuditEvent::system_service_started(tag, "") } // ── original tests ──────────────────────────────────────────────────────── @@ -185,7 +185,7 @@ mod tests { let b = buf.clone(); handles.push(std::thread::spawn(move || { for i in 0..100u32 { - b.push(AuditEvent::system_service_started(&format!("t{t}-{i}"))); + b.push(AuditEvent::system_service_started(&format!("t{t}-{i}"), "")); } })); } @@ -207,15 +207,55 @@ mod tests { assert_eq!(buf.len(), 50); } + fn stake_skipped(node_id: &str) -> AuditEvent { + AuditEvent::elections_stake_skipped( + AuditActor::service("elections-task"), + node_id, + 1_779_265_552, + StakeSkipReason::ElectionsDisabled, + None, + None, + ) + } + #[test] - fn zero_capacity_buffer_silently_drops() { - // capacity=0 is normalised to 1 internally; no crash, snapshot is non-empty after push - let buf = AuditEventBuffer::new(0); - assert_eq!(buf.capacity(), 1, "capacity normalised to 1"); - buf.push(ev("a")); - buf.push(ev("b")); // evicts first, keeps second - let snap = buf.snapshot(); - assert_eq!(snap.len(), 1, "only the latest event is retained"); + fn push_unless_dedup_duplicate_allows_first_then_suppresses() { + let buf = AuditEventBuffer::new(10); + let first = stake_skipped("node-1"); + let second = stake_skipped("node-1"); + + assert!(buf.push_unless_dedup_duplicate(first)); + assert!(!buf.push_unless_dedup_duplicate(second)); + assert_eq!(buf.len(), 1); + } + + #[test] + fn push_unless_dedup_duplicate_always_appends_without_dedup_key() { + let buf = AuditEventBuffer::new(10); + assert!(buf.push_unless_dedup_duplicate(ev("a"))); + assert!(buf.push_unless_dedup_duplicate(ev("b"))); + assert_eq!(buf.len(), 2); + } + + #[test] + fn push_unless_dedup_duplicate_is_atomic_under_concurrency() { + let buf = AuditEventBuffer::new(10); + let barrier = Arc::new(Barrier::new(8)); + let mut handles = vec![]; + + for _ in 0..8 { + let b = buf.clone(); + let gate = barrier.clone(); + handles.push(std::thread::spawn(move || { + gate.wait(); + b.push_unless_dedup_duplicate(stake_skipped("node-1")); + })); + } + + for h in handles { + h.join().expect("thread panicked"); + } + assert_eq!(buf.len(), 1, "only one concurrent stake_skipped must be retained"); } #[test] @@ -254,4 +294,15 @@ mod tests { assert!(buf.push_unless_dedup_duplicate(different_reason)); assert_eq!(buf.len(), 2); } + + #[test] + fn zero_capacity_buffer_silently_drops() { + // capacity=0 is normalised to 1 internally; no crash, snapshot is non-empty after push + let buf = AuditEventBuffer::new(0); + assert_eq!(buf.capacity(), 1, "capacity normalised to 1"); + buf.push(ev("a")); + buf.push(ev("b")); // evicts first, keeps second + let snap = buf.snapshot(); + assert_eq!(snap.len(), 1, "only the latest event is retained"); + } } diff --git a/src/node-control/service/src/http/http_server_task.rs b/src/node-control/service/src/http/http_server_task.rs index 635d5067..3681d5a4 100644 --- a/src/node-control/service/src/http/http_server_task.rs +++ b/src/node-control/service/src/http/http_server_task.rs @@ -496,26 +496,33 @@ pub async fn v1_elections_handler( axum::extract::State(state): axum::extract::State, axum::extract::Query(query): axum::extract::Query, ) -> axum::Json { - use crate::audit::AuditSource; + use crate::audit::{ + AuditSource, collect_recent_election_ids, merge_projection_into_participants, + project_elections, + }; let include_participants = query.include_participants.unwrap_or(false); let view = state.store.get_elections_view(include_participants); + let current_election_id = view.elections.as_ref().map(|e| e.election_id); - let recent_events: Vec = state - .audit_ring - .filter_collect(|e| e.payload.source() == AuditSource::Elections) - .into_iter() - .rev() - .filter_map(|e| serde_json::to_value(e).ok()) - .collect(); + // Single snapshot of the ring; `project_elections` filters by `recent_ids` internally. + let elections_events = + state.audit_ring.filter_collect(|e| e.payload.source() == AuditSource::Elections); + let recent_ids = collect_recent_election_ids(current_election_id, &elections_events, 3); + let projection = project_elections(&elections_events, &recent_ids); + + let mut our_participants = view.our_participants; + if let Some(election_id) = current_election_id { + merge_projection_into_participants(&mut our_participants, &projection, election_id); + } axum::Json(ElectionsResponse { ok: true, result: view.elections, status: view.status, next_elections: view.next_elections, - our_participants: view.our_participants, - recent_events, + our_participants, + recent_events: Vec::new(), }) } @@ -1148,6 +1155,18 @@ mod tests { test_state_with_audit(store, runtime_cfg, elections_task, Arc::new(NoopAuditLog)).await } + async fn test_state_with_ring( + store: Arc, + runtime_cfg: Arc, + elections_task: Arc, + audit_ring: Arc, + ) -> AppState { + let mut state = + test_state_with_audit(store, runtime_cfg, elections_task, Arc::new(NoopAuditLog)).await; + state.audit_ring = audit_ring; + state + } + async fn test_state_with_audit( store: Arc, runtime_cfg: Arc, @@ -1157,6 +1176,40 @@ mod tests { test_support::build_app_state_with(runtime_cfg, audit, store, Some(elections_task)).await } + async fn test_state_with_audit_and_ring( + store: Arc, + runtime_cfg: Arc, + elections_task: Arc, + audit: Arc, + audit_ring: Arc, + ) -> AppState { + let mut state = test_state_with_audit(store, runtime_cfg, elections_task, audit).await; + state.audit_ring = audit_ring; + state + } + + const PROJECTION_ELECTION_ID: u64 = 1_779_265_552; + + fn elections_store_with_participant(participant: OurElectionParticipant) -> Arc { + let store = Arc::new(SnapshotStore::new()); + store.update_with(|s| { + s.elections_status = ElectionsStatus::Active; + s.elections = Some(ElectionsSnapshot { + election_id: PROJECTION_ELECTION_ID, + ..Default::default() + }); + s.our_participants.push(participant); + }); + store + } + + async fn call_elections_handler(state: AppState) -> serde_json::Value { + let app = routes(false, state); + let resp = app.oneshot(get_request("/v1/elections")).await.unwrap(); + assert_eq!(resp.status(), 200); + body_json(resp).await + } + fn test_app_config(policy: StakePolicy) -> Arc { test_app_config_with_bindings(policy, HashMap::new()) } @@ -1754,6 +1807,204 @@ mod tests { assert_eq!(participants[0]["position"], 5); } + #[tokio::test] + async fn handler_returns_unchanged_response_when_ring_empty() { + let store = elections_store_with_participant(OurElectionParticipant { + node_id: "node-1".to_string(), + stake_accepted: true, + stake_submissions: vec![StakeSubmission { + stake: "100".to_string(), + max_factor: 3.0, + submission_time: 12345, + submission_time_utc: "2024-01-01T00:00:00Z".to_string(), + }], + accepted_stake: Some("100".to_string()), + last_error: Some("snapshot error".to_string()), + ..Default::default() + }); + let runtime_cfg = + Arc::new(RuntimeConfigStore::from_app_config(test_app_config(StakePolicy::Minimum))); + let state = test_state_with_ring( + store, + runtime_cfg, + test_elections_task(), + crate::audit::AuditEventBuffer::new(10), + ) + .await; + + let v = call_elections_handler(state).await; + let participant = &v["our_participants"][0]; + assert_eq!(participant["node_id"], "node-1"); + assert_eq!(participant["stake_accepted"], true); + assert_eq!(participant["accepted_stake"], "100"); + assert_eq!(participant["last_error"], "snapshot error"); + assert_eq!(participant["stake_submissions"].as_array().unwrap().len(), 1); + assert!(v.get("recent_events").is_none()); + } + + #[tokio::test] + async fn handler_includes_stake_submissions_from_audit() { + use crate::audit::{AuditEvent, AuditEventBuffer}; + + let store = elections_store_with_participant(OurElectionParticipant { + node_id: "node-1".to_string(), + ..Default::default() + }); + let audit_ring = AuditEventBuffer::new(50); + audit_ring.push(AuditEvent::elections_stake_submitted( + crate::audit::AuditActor::service("elections-task"), + "node-1", + PROJECTION_ELECTION_ID, + crate::audit::ElectionsStakeSubmittedParams { + stake: "300000000000".into(), + max_factor: 196_608, + policy: "split50".into(), + submission_time: 1_700_000_000, + }, + )); + + let runtime_cfg = + Arc::new(RuntimeConfigStore::from_app_config(test_app_config(StakePolicy::Minimum))); + let state = + test_state_with_ring(store, runtime_cfg, test_elections_task(), audit_ring).await; + + let v = call_elections_handler(state).await; + let submissions = v["our_participants"][0]["stake_submissions"].as_array().unwrap(); + assert_eq!(submissions.len(), 1); + assert_eq!(submissions[0]["stake"], "300000000000"); + assert_eq!(submissions[0]["max_factor"], 3.0); + assert_eq!(submissions[0]["submission_time"], 1_700_000_000); + assert!(v["our_participants"][0].get("last_error").is_none()); + } + + #[tokio::test] + async fn handler_marks_last_error_from_stake_skipped() { + use crate::audit::{AuditEvent, AuditEventBuffer, StakeSkipReason}; + + let store = elections_store_with_participant(OurElectionParticipant { + node_id: "node-1".to_string(), + ..Default::default() + }); + let audit_ring = AuditEventBuffer::new(50); + audit_ring.push(AuditEvent::elections_stake_skipped( + crate::audit::AuditActor::service("elections-task"), + "node-1", + PROJECTION_ELECTION_ID, + StakeSkipReason::AdaptiveSleepingPeriod, + None, + None, + )); + + let runtime_cfg = + Arc::new(RuntimeConfigStore::from_app_config(test_app_config(StakePolicy::Minimum))); + let state = + test_state_with_ring(store, runtime_cfg, test_elections_task(), audit_ring).await; + + let v = call_elections_handler(state).await; + assert_eq!( + v["our_participants"][0]["last_error"].as_str(), + Some("stake skipped: adaptive_sleeping_period") + ); + } + + #[tokio::test] + async fn handler_merges_withdraw_outcomes() { + use crate::audit::{AuditEvent, AuditEventBuffer}; + + let store = elections_store_with_participant(OurElectionParticipant { + node_id: "node-1".to_string(), + ..Default::default() + }); + let audit_ring = AuditEventBuffer::new(50); + audit_ring.push(AuditEvent::elections_withdraw_processed( + crate::audit::AuditActor::service("elections-task"), + "node-1", + PROJECTION_ELECTION_ID, + "abc123", + )); + audit_ring.push(AuditEvent::elections_withdraw_failed( + crate::audit::AuditActor::service("elections-task"), + "node-1", + PROJECTION_ELECTION_ID, + "send failed", + )); + + let runtime_cfg = + Arc::new(RuntimeConfigStore::from_app_config(test_app_config(StakePolicy::Minimum))); + let state = + test_state_with_ring(store, runtime_cfg, test_elections_task(), audit_ring).await; + + let v = call_elections_handler(state).await; + assert_eq!( + v["our_participants"][0]["last_error"].as_str(), + Some("withdraw failed: send failed") + ); + } + + #[tokio::test] + async fn handler_projects_events_recorded_through_jsonl_audit_log() { + use crate::audit::{ + AuditEvent, AuditLogConfig, StakeSkipReason, jsonl_log::JsonlAuditLog, log::AuditLog, + }; + use tempfile::tempdir; + + let store = elections_store_with_participant(OurElectionParticipant { + node_id: "node-1".to_string(), + ..Default::default() + }); + + let dir = tempdir().unwrap(); + let cfg = AuditLogConfig { + path: dir.path().join("audit.jsonl"), + ring_buffer_capacity: 50, + batch_interval_ms: 60_000, + ..AuditLogConfig::default() + }; + let log = JsonlAuditLog::start(cfg).await.unwrap(); + let audit_ring = log.ring(); + + log.record(AuditEvent::elections_stake_submitted( + crate::audit::AuditActor::service("elections-task"), + "node-1", + PROJECTION_ELECTION_ID, + crate::audit::ElectionsStakeSubmittedParams { + stake: "500000000000".into(), + max_factor: 196_608, + policy: "all".into(), + submission_time: 1_700_000_500, + }, + )) + .await; + log.record(AuditEvent::elections_stake_skipped( + crate::audit::AuditActor::service("elections-task"), + "node-1", + PROJECTION_ELECTION_ID, + StakeSkipReason::PoolNotReady, + None, + None, + )) + .await; + + let runtime_cfg = + Arc::new(RuntimeConfigStore::from_app_config(test_app_config(StakePolicy::Minimum))); + let state = test_state_with_audit_and_ring( + store, + runtime_cfg, + test_elections_task(), + log, + audit_ring, + ) + .await; + + let v = call_elections_handler(state).await; + let participant = &v["our_participants"][0]; + let submissions = participant["stake_submissions"].as_array().unwrap(); + assert_eq!(submissions.len(), 1); + assert_eq!(submissions[0]["stake"], "500000000000"); + assert_eq!(participant["last_error"].as_str(), Some("stake skipped: pool_not_ready")); + assert!(v.get("recent_events").is_none()); + } + #[tokio::test] async fn validators_returns_empty_snapshot() { let store = Arc::new(SnapshotStore::new()); diff --git a/src/node/tests/test_run_net_py/test_audit_integration.py b/src/node/tests/test_run_net_py/test_audit_integration.py new file mode 100644 index 00000000..13a85049 --- /dev/null +++ b/src/node/tests/test_run_net_py/test_audit_integration.py @@ -0,0 +1,705 @@ +#!/usr/bin/env python3 +""" +Audit-log integration test suite for nodectl. + +Can be run stand-alone after run_singlehost_nodectl.py has brought up the +service, or invoked as a phase from inside that bootstrap script. + +Exit code: 0 — all required checks pass; 1 — one or more failures. + +Required env vars (or CLI flags): + CONFIG_PATH — path to the running nodectl-config.json + NODECTL_API_TOKEN — JWT token for REST calls (produced by phase 8 auth setup) + +Optional env vars: + AUDIT_LOG_PATH — override audit file path (default: read from config, + fall back to /logs/audit.jsonl) + NODECTL_REST_URL — override REST base URL (default: read from config http.bind) + +Usage: + python3 test_audit_integration.py [--config /path/to/nodectl-config.json] + [--rest-url http://127.0.0.1:8080] + [--audit-log /path/to/audit.jsonl] + [--verbose] +""" + +from __future__ import annotations + +import argparse +import datetime +import json +import os +import re +import sys +import urllib.error +import urllib.request +from pathlib import Path +from typing import Any, Optional + +# ── RFC 3339 millis with Z: 2026-05-22T12:10:30.123Z ───────────────────────── +_TS_PATTERN = re.compile( + r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$" +) + +REQUIRED_EVENT_FIELDS = ("id", "ts", "outcome", "event_type", "actor", "target") +VALID_OUTCOMES = {"success", "failure", "skipped"} + +# Canonical set of event_type values produced by the current service version. +# Events outside this set indicate schema drift (removed/renamed variant). +KNOWN_EVENT_TYPES = { + "elections.key_generated", + "elections.stake_submitted", + "elections.stake_accepted", + "elections.stake_skipped", + "elections.stake_failed", + "elections.stake_recovered", + "elections.stake_recover_failed", + "elections.withdraw_processed", + "elections.withdraw_failed", + "rewards.distribution_started", + "rewards.distribution_completed", + "rewards.distribution_failed", + "rewards.recipient_skipped", + "rest_api.config_updated", + "rest_api.auth_login_succeeded", + "rest_api.auth_login_rejected", + "rest_api.token_rejected", + "vault.key_created", + "vault.key_removed", + "system.service_started", + "system.service_stopped", + "system.audit_events_dropped", +} + + +# ══════════════════════════════════════════════════════════════════════════════ +# Tiny colour logger +# ══════════════════════════════════════════════════════════════════════════════ + +class Logger: + def __init__(self, verbose: bool = False) -> None: + self.verbose = verbose + + def _emit(self, colour: str, label: str, msg: str) -> None: + print(f"\033[{colour}m[{label}]\033[0m {msg}", flush=True) + + def pass_(self, msg: str) -> None: self._emit("32", "PASS", msg) + def fail(self, msg: str) -> None: self._emit("31", "FAIL", msg) + def skip(self, msg: str) -> None: self._emit("33", "SKIP", msg) + def info(self, msg: str) -> None: self._emit("36", "INFO", msg) + def debug(self, msg: str) -> None: + if self.verbose: + self._emit("37", "DBG ", msg) + + +# ══════════════════════════════════════════════════════════════════════════════ +# Result accumulator +# ══════════════════════════════════════════════════════════════════════════════ + +class Results: + def __init__(self) -> None: + self.passed = 0 + self.failed = 0 + self.skipped = 0 + + def record_pass(self, log: Logger, msg: str) -> None: + self.passed += 1 + log.pass_(msg) + + def record_fail(self, log: Logger, msg: str) -> None: + self.failed += 1 + log.fail(msg) + + def record_skip(self, log: Logger, msg: str) -> None: + self.skipped += 1 + log.skip(msg) + + @property + def ok(self) -> bool: + return self.failed == 0 + + def summary(self, log: Logger) -> None: + colour = "32" if self.ok else "31" + log._emit(colour, "SUM", + f"passed={self.passed} failed={self.failed} skipped={self.skipped}") + + +# ══════════════════════════════════════════════════════════════════════════════ +# Config / path resolution +# ══════════════════════════════════════════════════════════════════════════════ + +def resolve_audit_log_path(config_path: Path) -> Path: + """ + Find the audit.jsonl path using the following priority: + + 1. $AUDIT_LOG_PATH env var + 2. `audit.path` field in the config file + - absolute path used as-is + - relative path resolved against: config-dir, then CWD, first that exists + 3. Default candidates (first that exists wins): + - ./logs/audit.jsonl (service CWD — matches the nodectl default) + - /logs/audit.jsonl + """ + env_override = os.environ.get("AUDIT_LOG_PATH", "").strip() + if env_override: + return Path(env_override) + + try: + cfg = json.loads(config_path.read_text()) + audit_path_str = cfg.get("audit", {}).get("path", "") + if audit_path_str: + p = Path(audit_path_str) + if p.is_absolute(): + return p + # Relative: try config-dir first, then CWD + candidate_cfg = config_path.parent / p + if candidate_cfg.exists(): + return candidate_cfg + candidate_cwd = Path.cwd() / p + if candidate_cwd.exists(): + return candidate_cwd + return candidate_cfg # return best-guess even if missing + except Exception: + pass + + # Default: service writes to ./logs/audit.jsonl relative to its CWD. + # The bootstrap script starts nodectl from test_run_net_py/, so that + # matches /logs/audit.jsonl. Fall back to config-dir if + # the CWD candidate does not exist. + cwd_candidate = Path.cwd() / "logs" / "audit.jsonl" + if cwd_candidate.exists(): + return cwd_candidate + + cfg_candidate = config_path.parent / "logs" / "audit.jsonl" + if cfg_candidate.exists(): + return cfg_candidate + + # Neither exists — return CWD candidate so the error message is meaningful + return cwd_candidate + + +def resolve_rest_base_url(config_path: Path) -> str: + """Derive REST base URL from config http.bind or env.""" + env_override = os.environ.get("NODECTL_REST_URL", "").strip() + if env_override: + return env_override.rstrip("/") + + try: + cfg = json.loads(config_path.read_text()) + bind = str(cfg.get("http", {}).get("bind", "127.0.0.1:8080")) + except Exception: + bind = "127.0.0.1:8080" + + if bind.startswith("["): + bracket_end = bind.find("]") + host = bind[1:bracket_end] if bracket_end > 0 else "127.0.0.1" + rest = bind[bracket_end + 1:].lstrip() if bracket_end > 0 else "" + port = rest[1:] if rest.startswith(":") else "8080" + elif bind.count(":") == 1: + host, port = bind.split(":", 1) + else: + host, port = "127.0.0.1", "8080" + + if host in ("0.0.0.0", "::"): + host = "127.0.0.1" + + return (f"http://[{host}]:{port}" if ":" in host else f"http://{host}:{port}").rstrip("/") + + +def rest_get_json(base_url: str, path: str, token: str, timeout: int = 15) -> dict: + url = base_url + path + req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"}) + body = "" + with urllib.request.urlopen(req, timeout=timeout) as resp: + body = resp.read().decode(errors="replace") + return json.loads(body) + + +# ══════════════════════════════════════════════════════════════════════════════ +# JSONL parsing helpers +# ══════════════════════════════════════════════════════════════════════════════ + +def read_jsonl(path: Path) -> tuple[list[dict], list[str]]: + """ + Returns (records, errors). Each record is a parsed JSON object. + errors contains descriptions of lines that failed to parse. + """ + records: list[dict] = [] + errors: list[str] = [] + for i, raw in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1): + raw = raw.strip() + if not raw: + continue + try: + obj = json.loads(raw) + if not isinstance(obj, dict): + errors.append(f"line {i}: not a JSON object: {raw[:120]}") + else: + records.append(obj) + except json.JSONDecodeError as e: + errors.append(f"line {i}: {e.msg}: {raw[:120]}") + return records, errors + + + + +# ══════════════════════════════════════════════════════════════════════════════ +# Test suites +# ══════════════════════════════════════════════════════════════════════════════ + +class AuditFileTests: + """Tests that inspect the audit.jsonl file on disk.""" + + def __init__(self, audit_path: Path, log: Logger, results: Results) -> None: + self.path = audit_path + self.log = log + self.r = results + self._records: list[dict] = [] + self._events: list[dict] = [] + + def run_all(self) -> None: + self.log.info(f"=== Audit file checks: {self.path} ===") + + if not self._check_file_exists(): + return # nothing to test without the file + + records, errors = read_jsonl(self.path) + + if errors: + for e in errors: + self.r.record_fail(self.log, f"JSONL parse error — {e}") + else: + self.r.record_pass(self.log, "All lines parse as valid JSON objects") + + self._records = records + # Unified JSONL format: every line is an event carrying `event_type`. + # Guard against any stray line without it (e.g. a legacy header artifact). + self._events = [r for r in records if "event_type" in r] + + self._check_file_header(records) + self._check_event_fields() + self._check_ts_format() + self._check_outcome_values() + self._check_service_started_present() + self._check_no_dedup_duplicates() + self._check_actor_shape() + self._check_no_unknown_event_types() + self._check_target_shape() + + # ── individual checks ───────────────────────────────────────────────────── + + def _check_file_exists(self) -> bool: + if self.path.exists(): + size = self.path.stat().st_size + self.r.record_pass(self.log, f"audit.jsonl exists ({size} bytes)") + return True + self.r.record_fail(self.log, f"audit.jsonl not found: {self.path}") + return False + + def _check_file_header(self, records: list[dict]) -> None: + """First line must be a system.service_started event with version and host fields.""" + if not records: + self.r.record_fail(self.log, "audit.jsonl is empty — no events at all") + return + + first = records[0] + et = first.get("event_type") + if et != "system.service_started": + self.r.record_fail(self.log, + f"First line must be system.service_started, got event_type={et!r}") + return + + data = first.get("data", {}) + missing = [f for f in ("version", "host") if f not in data] + if missing: + self.r.record_fail(self.log, + f"system.service_started missing data fields: {missing}") + else: + self.r.record_pass(self.log, + f"First event is system.service_started " + f"(version={data.get('version')!r}, host={data.get('host')!r})") + + def _check_event_fields(self) -> None: + if not self._events: + self.r.record_skip(self.log, "No events in file — skipping field checks") + return + bad: list[str] = [] + for ev in self._events: + missing = [f for f in REQUIRED_EVENT_FIELDS if f not in ev] + if missing: + bad.append(f"event_type={ev.get('event_type')!r} id={ev.get('id')!r}: " + f"missing {missing}") + if bad: + for b in bad[:5]: + self.r.record_fail(self.log, f"Event missing required fields — {b}") + if len(bad) > 5: + self.r.record_fail(self.log, f" … and {len(bad) - 5} more") + else: + self.r.record_pass(self.log, + f"All {len(self._events)} events have required fields " + f"({', '.join(REQUIRED_EVENT_FIELDS)})") + + def _check_ts_format(self) -> None: + if not self._events: + return + bad: list[str] = [] + for ev in self._events: + ts = ev.get("ts", "") + if not _TS_PATTERN.match(ts): + bad.append(f"event_type={ev.get('event_type')!r}: ts={ts!r}") + if bad: + for b in bad[:5]: + self.r.record_fail(self.log, f"Bad ts format — {b}") + else: + self.r.record_pass(self.log, + f"All {len(self._events)} events have RFC3339-millis-Z timestamps") + + def _check_outcome_values(self) -> None: + if not self._events: + return + bad: list[str] = [] + for ev in self._events: + o = ev.get("outcome") + if o not in VALID_OUTCOMES: + bad.append(f"event_type={ev.get('event_type')!r}: outcome={o!r}") + if bad: + for b in bad[:5]: + self.r.record_fail(self.log, f"Invalid outcome — {b}") + else: + self.r.record_pass(self.log, + f"All {len(self._events)} events have valid outcome values") + + def _check_service_started_present(self) -> None: + found = any(ev.get("event_type") == "system.service_started" + for ev in self._events) + if found: + self.r.record_pass(self.log, "system.service_started event present") + else: + self.r.record_fail(self.log, + "system.service_started event not found in audit.jsonl " + "(expected on service startup)") + + def _check_no_unknown_event_types(self) -> None: + unknown = { + ev.get("event_type") for ev in self._events + if ev.get("event_type") not in KNOWN_EVENT_TYPES + } + if unknown: + for et in sorted(unknown): + self.r.record_fail(self.log, + f"Unknown event_type {et!r} — removed from schema or schema drift " + f"(check AuditEventPayload enum)") + else: + self.r.record_pass(self.log, + f"All {len(self._events)} events use known event_type values") + + def _check_no_dedup_duplicates(self) -> None: + """No two stake_skipped events with the same (node_id, election_id, reason).""" + skipped_events = [ + ev for ev in self._events + if ev.get("event_type") == "elections.stake_skipped" + ] + if not skipped_events: + self.r.record_skip(self.log, + "No elections.stake_skipped events — dedup check skipped") + return + + seen: set[tuple] = set() + dups: list[str] = [] + for ev in skipped_events: + target = ev.get("target", {}) + node_id = target.get("id", "") + election_id = target.get("election_id") + data = ev.get("data", {}) + reason = data.get("reason", "") + key = (node_id, election_id, reason) + if key in seen: + dups.append( + f"node_id={node_id!r} election_id={election_id} reason={reason!r}" + ) + else: + seen.add(key) + + if dups: + for d in dups[:5]: + self.r.record_fail(self.log, + f"Duplicate stake_skipped in file (dedup failed) — {d}") + else: + self.r.record_pass(self.log, + f"No duplicate stake_skipped events among {len(skipped_events)} " + f"(unique keys: {len(seen)})") + + def _check_actor_shape(self) -> None: + if not self._events: + return + bad: list[str] = [] + for ev in self._events: + actor = ev.get("actor") + if not isinstance(actor, dict): + bad.append(f"event_type={ev.get('event_type')!r}: actor is not an object") + continue + if "kind" not in actor: + bad.append(f"event_type={ev.get('event_type')!r}: actor missing 'kind'") + if bad: + for b in bad[:5]: + self.r.record_fail(self.log, f"Bad actor shape — {b}") + else: + self.r.record_pass(self.log, f"All {len(self._events)} actors have 'kind' field") + + def _check_target_shape(self) -> None: + if not self._events: + return + bad: list[str] = [] + for ev in self._events: + target = ev.get("target") + if not isinstance(target, dict): + bad.append(f"event_type={ev.get('event_type')!r}: target is not an object") + continue + if "kind" not in target: + bad.append(f"event_type={ev.get('event_type')!r}: target missing 'kind'") + if bad: + for b in bad[:5]: + self.r.record_fail(self.log, f"Bad target shape — {b}") + else: + self.r.record_pass(self.log, f"All {len(self._events)} targets have 'kind' field") + + # ── summary helpers ──────────────────────────────────────────────────────── + + def print_event_type_counts(self) -> None: + if not self._events: + return + counts: dict[str, int] = {} + for ev in self._events: + et = ev.get("event_type", "") + counts[et] = counts.get(et, 0) + 1 + self.log.info("Event type distribution:") + for et, n in sorted(counts.items()): + self.log.info(f" {n:4d} {et}") + + +class AuditRestTests: + """Tests that call GET /v1/elections and validate audit-enriched fields.""" + + def __init__( + self, + rest_url: str, + token: str, + log: Logger, + results: Results, + ) -> None: + self.rest_url = rest_url + self.token = token + self.log = log + self.r = results + + def run_all(self) -> None: + self.log.info(f"=== REST API checks: {self.rest_url}/v1/elections ===") + + data = self._fetch_elections() + if data is None: + return + + self._check_response_shape(data) + self._check_recent_events_empty(data) + self._check_participants_structure(data) + self._check_stake_submissions_no_duplicates(data) + + # ── individual checks ───────────────────────────────────────────────────── + + def _fetch_elections(self) -> Optional[dict]: + try: + data = rest_get_json(self.rest_url, "/v1/elections", self.token) + self.r.record_pass(self.log, "GET /v1/elections → HTTP 200") + return data + except urllib.error.HTTPError as e: + body = e.read().decode(errors="replace")[:400] + self.r.record_fail(self.log, + f"GET /v1/elections → HTTP {e.code}: {body}") + except urllib.error.URLError as e: + self.r.record_fail(self.log, + f"GET /v1/elections connection failed: {e.reason}") + except json.JSONDecodeError as e: + self.r.record_fail(self.log, + f"GET /v1/elections returned invalid JSON: {e.msg}") + return None + + def _check_response_shape(self, data: dict) -> None: + # Accept both {ok, result} and flat shapes (depending on version) + result = data.get("result", data) + if not isinstance(result, dict): + self.r.record_fail(self.log, + f"elections response result is not an object: {type(result)}") + return + + has_election_id = "election_id" in result or "election_id" in data + has_participants = ( + "our_participants" in result or "our_participants" in data + ) + if has_participants: + self.r.record_pass(self.log, "elections response contains 'our_participants'") + else: + self.r.record_fail(self.log, + "elections response has no 'our_participants' field") + + def _check_recent_events_empty(self, data: dict) -> None: + """recent_events must be absent or an empty list (sma-106 skips serialization).""" + result = data.get("result", data) + recent = result.get("recent_events") + if recent is None: + self.r.record_pass(self.log, + "recent_events absent from response (skip_serializing_if = empty)") + elif isinstance(recent, list) and len(recent) == 0: + self.r.record_pass(self.log, "recent_events is present but empty []") + else: + self.r.record_fail(self.log, + f"recent_events is unexpected: {str(recent)[:200]}") + + def _check_participants_structure(self, data: dict) -> None: + result = data.get("result", data) + participants = result.get("our_participants", []) + if not isinstance(participants, list): + self.r.record_fail(self.log, + f"our_participants is not a list: {type(participants)}") + return + if not participants: + self.r.record_skip(self.log, + "our_participants is empty — skipping per-participant checks") + return + + bad_last_error: list[str] = [] + for p in participants: + node_id = p.get("node_id", "?") + last_error = p.get("last_error") + if last_error is not None and not isinstance(last_error, str): + bad_last_error.append( + f"node_id={node_id!r}: last_error is not a string: {last_error!r}" + ) + + if bad_last_error: + for b in bad_last_error: + self.r.record_fail(self.log, f"Bad last_error type — {b}") + else: + with_errors = [p for p in participants if p.get("last_error")] + self.r.record_pass(self.log, + f"our_participants: {len(participants)} participant(s), " + f"{len(with_errors)} with last_error populated") + if with_errors: + for p in with_errors: + self.log.info( + f" node_id={p.get('node_id')!r} " + f"last_error={p.get('last_error')!r}" + ) + + def _check_stake_submissions_no_duplicates(self, data: dict) -> None: + result = data.get("result", data) + participants = result.get("our_participants", []) + if not isinstance(participants, list) or not participants: + return + + dups: list[str] = [] + for p in participants: + node_id = p.get("node_id", "?") + subs = p.get("stake_submissions") or [] + seen: set[tuple] = set() + for s in subs: + key = (s.get("stake"), s.get("submission_time")) + if key in seen: + dups.append( + f"node_id={node_id!r} stake={key[0]!r} time={key[1]!r}" + ) + else: + seen.add(key) + + if dups: + for d in dups[:5]: + self.r.record_fail(self.log, + f"Duplicate stake_submission in REST response — {d}") + else: + total_subs = sum(len(p.get("stake_submissions") or []) + for p in participants) + self.r.record_pass(self.log, + f"No duplicate stake_submissions across {len(participants)} " + f"participant(s) ({total_subs} total)") + + +# ══════════════════════════════════════════════════════════════════════════════ +# Entry point +# ══════════════════════════════════════════════════════════════════════════════ + +def parse_args() -> argparse.Namespace: + ap = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--config", metavar="PATH", + default=os.environ.get("CONFIG_PATH", ""), + help="Path to nodectl-config.json " + "(default: $CONFIG_PATH)") + ap.add_argument("--rest-url", metavar="URL", + default="", + help="Base REST URL, e.g. http://127.0.0.1:8080 " + "(default: derive from config http.bind or $NODECTL_REST_URL)") + ap.add_argument("--audit-log", metavar="PATH", + default="", + help="Override audit.jsonl path " + "(default: derive from config or $AUDIT_LOG_PATH)") + ap.add_argument("--verbose", "-v", action="store_true", + help="Print debug lines") + return ap.parse_args() + + +def main() -> None: + args = parse_args() + log = Logger(verbose=args.verbose) + results = Results() + + # ── Resolve config path ──────────────────────────────────────────────────── + config_path_str = args.config or os.environ.get("CONFIG_PATH", "") + if not config_path_str: + log._emit("31", "FATAL", + "CONFIG_PATH is not set. " + "Pass --config or set the CONFIG_PATH environment variable.") + sys.exit(1) + + config_path = Path(config_path_str) + if not config_path.exists(): + log._emit("31", "FATAL", f"Config file not found: {config_path}") + sys.exit(1) + + # ── Resolve audit log path ───────────────────────────────────────────────── + if args.audit_log: + audit_path = Path(args.audit_log) + elif os.environ.get("AUDIT_LOG_PATH"): + audit_path = Path(os.environ["AUDIT_LOG_PATH"]) + else: + audit_path = resolve_audit_log_path(config_path) + + # ── Resolve REST URL ─────────────────────────────────────────────────────── + rest_url = (args.rest_url or "").rstrip("/") or resolve_rest_base_url(config_path) + + ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + log.info(f"=== {ts} test_audit_integration.py ===") + log.info(f"config: {config_path}") + log.info(f"audit log: {audit_path}") + log.info(f"rest url: {rest_url}") + + # ── Suite 1: audit.jsonl file ───────────────────────────────────────────── + file_tests = AuditFileTests(audit_path, log, results) + file_tests.run_all() + file_tests.print_event_type_counts() + + # ── Suite 2: REST API ───────────────────────────────────────────────────── + token = os.environ.get("NODECTL_API_TOKEN", "").strip() + if not token: + results.record_skip(log, + "NODECTL_API_TOKEN not set — skipping all REST API checks") + else: + rest_tests = AuditRestTests(rest_url, token, log, results) + rest_tests.run_all() + + # ── Final summary ───────────────────────────────────────────────────────── + print() + results.summary(log) + sys.exit(0 if results.ok else 1) + + +if __name__ == "__main__": + main()