diff --git a/CLAUDE.md b/CLAUDE.md index 34c2b7f..fa2f707 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -63,7 +63,7 @@ The SDK uses a src layout with the main package at `src/hubblenetwork/`. Public - **`ready.py`** - Hubble Ready device provisioning (UUID 0xFCA7). Handles GATT connections, characteristic reads/writes, and the full provisioning flow (register with backend, write key/config/time). -- **`crypto.py`** - Local packet decryption. Implements AES-CTR decryption with CMAC-based key derivation (SP800_108_Counter KDF). Supports both AES-256-CTR and AES-128-CTR. `decrypt()` accepts `counter_mode` as `"UNIX_TIME"` (default, UTC day-based) or `"DEVICE_UPTIME"` (counter-based, fixed pool size 128). Exports `UNIX_TIME` and `DEVICE_UPTIME` constants. `decrypt_eax()` decrypts AES-EAX packets by iterating counters 0-127, generating candidate EIDs via AES-ECB, and using `AES.MODE_EAX` for authenticated decryption. Uses key directly (no KDF). `decrypt_satellite()` decrypts a satellite packet's payload using the same AES-CTR/CMAC scheme as `decrypt()`; satellite packets deliver `seq_num`, `auth_tag`, and encrypted payload as separate fields (not packed into one advertisement) and always use the UNIX_TIME day counter. +- **`crypto.py`** - Local packet decryption. Implements AES-CTR decryption with CMAC-based key derivation (SP800_108_Counter KDF). Supports both AES-256-CTR and AES-128-CTR. `decrypt()` accepts `counter_mode` as `"UNIX_TIME"` (default, UTC day-based) or `"DEVICE_UPTIME"` (counter-based, fixed pool size 128). Exports `UNIX_TIME` and `DEVICE_UPTIME` constants. `decrypt_eax()` decrypts AES-EAX packets by iterating counters 0-127, generating candidate EIDs via AES-ECB, and using `AES.MODE_EAX` for authenticated decryption. Uses key directly (no KDF). `decrypt_satellite()` decrypts a satellite packet's payload using the same AES-CTR/CMAC scheme as `decrypt()`; satellite packets deliver `seq_num`, `auth_tag`, and encrypted payload as separate fields (not packed into one advertisement). It accepts `counter_mode` (`"UNIX_TIME"` default, day-based; or `"DEVICE_UPTIME"`, sweeping the fixed 0-127 counter pool) just like `decrypt()`. - **`packets.py`** - Data classes: `Location`, `EncryptedPacket`, `DecryptedPacket`, `AesEaxPacket`, `UnknownPacket`. @@ -71,7 +71,7 @@ The SDK uses a src layout with the main package at `src/hubblenetwork/`. Public - **`errors.py`** - Exception hierarchy. Base `HubbleError` with specialized errors for backend, network, validation, BLE scanning, and decryption failures. -- **`cli.py`** - Click-based CLI. Command groups: `ble` (scan, detect, check-time, validate), `ready` (scan, info, read-status, read-key-info, read-config, read-time, write-key, write-config, write-time, provision), `org` (info, list-devices, get-packets, register-device, delete-device, set-device-name), `sat` (scan, mock-scan; `scan` accepts `--key`/`--days`/`--show-failed-decryption` to decrypt payloads locally). Top-level: `validate-credentials`. +- **`cli.py`** - Click-based CLI. Command groups: `ble` (scan, detect, check-time, validate), `ready` (scan, info, read-status, read-key-info, read-config, read-time, write-key, write-config, write-time, provision), `org` (info, list-devices, get-packets, register-device, delete-device, set-device-name), `sat` (scan, mock-scan; `scan` accepts `--key`/`--days`/`--counter-mode`/`--show-failed-decryption` to decrypt payloads locally — when `--key` is given without `--counter-mode`, the counter source is auto-detected (UNIX_TIME vs DEVICE_UPTIME) and announced, mirroring `ble scan`). Top-level: `validate-credentials`. - **`sat.py`** - Satellite packet scanning via PlutoSDR. Manages Docker container lifecycle (pull, start, stop) and polls the container's HTTP API for decoded packets. Requires Docker daemon running. Image: `ghcr.io/hubblenetwork/sdr-docker:latest`. diff --git a/src/hubblenetwork/cli.py b/src/hubblenetwork/cli.py index 44cc2c3..556d8a2 100644 --- a/src/hubblenetwork/cli.py +++ b/src/hubblenetwork/cli.py @@ -14,7 +14,7 @@ from dataclasses import replace from datetime import datetime from functools import partial -from typing import Optional, List +from typing import Callable, Optional, List, TypeVar from tabulate import tabulate from hubblenetwork import Organization from hubblenetwork import Device, DecryptedPacket, EncryptedPacket, decrypt_eax @@ -165,42 +165,63 @@ def _decrypt_eax_with_detect( return None -def _decrypt_ctr_with_detect( - key: bytes, - pkt: EncryptedPacket, +_T = TypeVar("_T") + +# Satellite streams have no per-packet EID to key the counter-mode cache on, so +# the detected mode lives in a single shared slot for the whole scan. +_SAT_CTR_CACHE_KEY = "mode" + + +def _detect_ctr_counter_mode( *, + decrypt_fn: Callable[..., Optional[_T]], + days: int, auto_detect: bool, fixed_counter_mode: str, - days: int, + key_len: int, cache: dict, + cache_key: object, announced: list[str], suppress_info: bool, -) -> Optional[DecryptedPacket]: - if not auto_detect: - return decrypt(key, pkt, days=days, counter_mode=fixed_counter_mode) +) -> Optional[_T]: + """Decrypt an AES-CTR packet, auto-detecting the counter source if asked. + + Shared by the BLE and satellite scan paths. ``decrypt_fn`` is a packet-bound + adapter accepting ``counter_mode`` (and, for UNIX_TIME, ``days``) and + returning the decrypted result or None. + + When ``auto_detect`` is False the ``fixed_counter_mode`` is used directly. + Otherwise the mode cached under ``cache_key`` (a BLE EID, or a per-stream + sentinel for satellite) is tried first, then UNIX_TIME and DEVICE_UPTIME are + swept; the first that succeeds is cached and announced once via ``announced``. + A ``cache_key`` of None disables caching (BLE packets without an EID). + """ - def _try(mode: str) -> Optional[DecryptedPacket]: + def _try(mode: str) -> Optional[_T]: kwargs = {"counter_mode": mode} if mode == UNIX_TIME: kwargs["days"] = days - return decrypt(key, pkt, **kwargs) + return decrypt_fn(**kwargs) + + if not auto_detect: + return _try(fixed_counter_mode) - if pkt.eid is not None: - cached = cache.get(pkt.eid) + if cache_key is not None: + cached = cache.get(cache_key) if cached is not None: result = _try(cached) - if result: + if result is not None: return result for mode in (UNIX_TIME, DEVICE_UPTIME): result = _try(mode) if result is None: continue - if pkt.eid is not None: - cache[pkt.eid] = mode + if cache_key is not None: + cache[cache_key] = mode if not announced and not suppress_info: announced.append("ctr") - variant = "AES-128-CTR" if len(key) == 16 else "AES-256-CTR" + variant = "AES-128-CTR" if key_len == 16 else "AES-256-CTR" click.secho( f"[INFO] Detected: {variant}, counter_source={mode}", fg="green", @@ -210,6 +231,61 @@ def _try(mode: str) -> Optional[DecryptedPacket]: return None +def _decrypt_ctr_with_detect( + key: bytes, + pkt: EncryptedPacket, + *, + auto_detect: bool, + fixed_counter_mode: str, + days: int, + cache: dict, + announced: list[str], + suppress_info: bool, +) -> Optional[DecryptedPacket]: + return _detect_ctr_counter_mode( + decrypt_fn=lambda **kw: decrypt(key, pkt, **kw), + days=days, + auto_detect=auto_detect, + fixed_counter_mode=fixed_counter_mode, + key_len=len(key), + cache=cache, + cache_key=pkt.eid, + announced=announced, + suppress_info=suppress_info, + ) + + +def _decrypt_satellite_with_detect( + key: bytes, + pkt, + *, + auto_detect: bool, + fixed_counter_mode: str, + days: int, + state: dict, + announced: list[str], + suppress_info: bool, +) -> Optional[bytes]: + return _detect_ctr_counter_mode( + decrypt_fn=lambda **kw: decrypt_satellite( + key, + seq_no=pkt.seq_num, + auth_tag=pkt.auth_tag, + encrypted_payload=pkt.payload, + timestamp=pkt.timestamp, + **kw, + ), + days=days, + auto_detect=auto_detect, + fixed_counter_mode=fixed_counter_mode, + key_len=len(key), + cache=state, + cache_key=_SAT_CTR_CACHE_KEY, + announced=announced, + suppress_info=suppress_info, + ) + + def _format_payload(payload, fmt: str) -> str: """Format packet payload bytes for display.""" if not isinstance(payload, bytes): @@ -3154,6 +3230,8 @@ def _run_sat_scan( debug: bool = False, key: Optional[str] = None, days: int = 2, + counter_mode: str = UNIX_TIME, + auto_detect_ctr: bool = False, show_failed_decryption: bool = False, ) -> None: """Shared implementation for ``sat scan`` and ``sat mock-scan``.""" @@ -3167,8 +3245,7 @@ def _run_sat_scan( show_decrypt_status=show_failed_decryption, ) - # Pre-decode the key if provided. Satellite packets always use the - # UNIX_TIME (day-based) counter, so only --days is relevant. + # Pre-decode the key if provided. decoded_key: Optional[bytes] = None if key: try: @@ -3179,6 +3256,14 @@ def _run_sat_scan( return raise click.ClickException(f"Invalid key: {e}") + if decoded_key is not None and auto_detect_ctr: + _announce_auto_detect( + auto_ctr=True, auto_eax=False, suppress=printer.suppress_info_messages + ) + + detected_ctr_state: dict = {} + announced: list[str] = [] + if debug: sat_logger = logging.getLogger("hubblenetwork.sat") sat_logger.setLevel(logging.DEBUG) @@ -3225,13 +3310,15 @@ def _on_interrupt(sig, frame): # With a key, the user wants only packets the key can decrypt. decrypted = None if pkt.auth_tag is not None: - decrypted = decrypt_satellite( + decrypted = _decrypt_satellite_with_detect( decoded_key, - seq_no=pkt.seq_num, - auth_tag=pkt.auth_tag, - encrypted_payload=pkt.payload, - timestamp=pkt.timestamp, + pkt, + auto_detect=auto_detect_ctr, + fixed_counter_mode=counter_mode, days=days, + state=detected_ctr_state, + announced=announced, + suppress_info=printer.suppress_info_messages, ) if decrypted is not None: printer.print_row( @@ -3306,7 +3393,8 @@ def _sat_scan_options(fn): default=None, show_default=False, help="Key to decrypt packet payloads (hex or base64, 16 or 32 bytes). " - "Satellite packets always use the UNIX_TIME counter.", + "The counter source (UNIX_TIME / DEVICE_UPTIME) is auto-detected " + "unless --counter-mode is given.", ) @click.option( "--days", @@ -3314,7 +3402,15 @@ def _sat_scan_options(fn): type=int, default=2, show_default=True, - help="Days to search around each packet's timestamp when decrypting", + help="Days to search around each packet's timestamp when decrypting " + "with the UNIX_TIME counter", +) +@click.option( + "--counter-mode", + type=click.Choice([UNIX_TIME, DEVICE_UPTIME], case_sensitive=False), + default=UNIX_TIME, + show_default=False, + help="EID counter source for decryption. Omit to auto-detect from packets.", ) @click.option( "--show-failed-decryption", @@ -3323,14 +3419,17 @@ def _sat_scan_options(fn): help="Show packets that fail decryption/authentication with the provided " "key. Adds a DECRYPT column indicating OK/FAIL.", ) -def sat_scan(**kwargs) -> None: +@click.pass_context +def sat_scan(ctx, **kwargs) -> None: """ Start the satellite receiver and stream decoded packets. Requires Docker and a PlutoSDR device connected via USB. Pass --key to decrypt packet payloads locally; packets the key cannot - decrypt are hidden unless --show-failed-decryption is given. + decrypt are hidden unless --show-failed-decryption is given. The counter + source is auto-detected (UNIX_TIME or DEVICE_UPTIME) unless --counter-mode + is given explicitly. Example: hubblenetwork sat scan --timeout 30 @@ -3338,6 +3437,23 @@ def sat_scan(**kwargs) -> None: hubblenetwork sat scan -n 5 hubblenetwork sat scan --key "a562a2f7e4c62bed52ab09633878f62b" --timeout 60 """ + key = kwargs.get("key") + counter_mode = kwargs["counter_mode"] + + def _explicit(name: str) -> bool: + return ctx.get_parameter_source(name) == click.core.ParameterSource.COMMANDLINE + + # counter_mode defaults to UNIX_TIME, so a DEVICE_UPTIME value here always + # means the user passed it explicitly (matches the ble scan/detect guards). + if counter_mode == DEVICE_UPTIME: + if not key: + raise click.UsageError("--counter-mode DEVICE_UPTIME requires --key") + if _explicit("days"): + raise click.UsageError( + "--counter-mode DEVICE_UPTIME and --days are mutually exclusive" + ) + + kwargs["auto_detect_ctr"] = key is not None and not _explicit("counter_mode") _run_sat_scan(mock=False, **kwargs) diff --git a/src/hubblenetwork/crypto.py b/src/hubblenetwork/crypto.py index db5a6f7..b1ab494 100644 --- a/src/hubblenetwork/crypto.py +++ b/src/hubblenetwork/crypto.py @@ -157,12 +157,13 @@ def _check_tag_matches( return tag == parsed.auth_tag -def decrypt( - key: bytes, - encrypted_pkt: EncryptedPacket, - days: int = 2, - counter_mode: str = UNIX_TIME, -) -> Optional[DecryptedPacket]: +def _normalize_counter_mode(counter_mode: str, days: int) -> str: + """Validate and upper-case ``counter_mode``, enforcing the DEVICE_UPTIME rule. + + Shared by :func:`decrypt` and :func:`decrypt_satellite`. Returns the + normalized mode; raises ``ValueError`` for an unknown mode or for + ``DEVICE_UPTIME`` combined with a non-default ``days``. + """ counter_mode = counter_mode.upper() if counter_mode not in _VALID_COUNTER_MODES: raise ValueError( @@ -170,6 +171,16 @@ def decrypt( ) if counter_mode == DEVICE_UPTIME and days != 2: raise ValueError("Cannot specify both counter_mode=DEVICE_UPTIME and days") + return counter_mode + + +def decrypt( + key: bytes, + encrypted_pkt: EncryptedPacket, + days: int = 2, + counter_mode: str = UNIX_TIME, +) -> Optional[DecryptedPacket]: + counter_mode = _normalize_counter_mode(counter_mode, days) parsed = ParsedPacket(encrypted_pkt) keylen = len(key) @@ -211,29 +222,38 @@ def decrypt_satellite( encrypted_payload: bytes, timestamp: Optional[float] = None, days: int = 2, + counter_mode: str = UNIX_TIME, ) -> Optional[bytes]: """Decrypt a satellite packet's encrypted customer payload. Satellite packets deliver the sequence number, 4-byte auth tag, and encrypted customer payload as separate fields, unlike BLE where they are packed into one advertisement. The underlying AES-256/128-CTR + CMAC - scheme is identical to BLE (see :func:`decrypt`), and satellite always - uses the UNIX_TIME (day-based) counter. + scheme is identical to BLE (see :func:`decrypt`). - The day counter is swept +/-``days`` around the packet's ``timestamp`` - (or the current UTC day when ``timestamp`` is None), returning the - decrypted payload for the first day whose derived auth tag matches. + The counter swept to find a matching auth tag depends on ``counter_mode``: - Returns the decrypted payload bytes, or None if no day matches (wrong - key or outside the search window). + - ``UNIX_TIME`` (default): the day counter is swept +/-``days`` around the + packet's ``timestamp`` (or the current UTC day when ``timestamp`` is None). + - ``DEVICE_UPTIME``: the device-uptime counter is swept over the fixed pool + 0-127. ``timestamp`` and ``days`` are unused in this mode. + + Returns the decrypted payload bytes for the first counter whose derived + auth tag matches, or None if none match (wrong key, wrong mode, or outside + the search window). """ + counter_mode = _normalize_counter_mode(counter_mode, days) + keylen = len(key) - if timestamp is None: - timestamp = datetime.now(timezone.utc).timestamp() - base = int(timestamp) // 86400 + if counter_mode == DEVICE_UPTIME: + candidates = range(128) + else: + if timestamp is None: + timestamp = datetime.now(timezone.utc).timestamp() + base = int(timestamp) // 86400 + candidates = (base + delta for delta in range(-days, days + 1)) - for delta in range(-days, days + 1): - time_counter = base + delta + for time_counter in candidates: daily_key = _get_encryption_key(key, time_counter, seq_no, keylen=keylen) if _get_auth_tag(daily_key, encrypted_payload) == auth_tag: nonce = _get_nonce(key, time_counter, seq_no, keylen=keylen) diff --git a/tests/test_sat_decrypt.py b/tests/test_sat_decrypt.py index a5861ed..8f229b2 100644 --- a/tests/test_sat_decrypt.py +++ b/tests/test_sat_decrypt.py @@ -11,7 +11,7 @@ from Crypto.Hash import CMAC from Crypto.Protocol.KDF import SP800_108_Counter -from hubblenetwork import decrypt_satellite +from hubblenetwork import DEVICE_UPTIME, UNIX_TIME, decrypt_satellite from hubblenetwork.cli import cli from hubblenetwork.errors import DockerError, SatelliteError from hubblenetwork.packets import SatellitePacket @@ -127,6 +127,59 @@ def test_timestamp_none_uses_today(self): ) assert out == plaintext + def test_device_uptime_roundtrip(self): + # Device-uptime packets encode the counter (0-127) as time_counter. + plaintext = b"uptime!" + ct, tag = make_encrypted_sat_payload(_MASTER_256, 11, plaintext, 5) + out = decrypt_satellite( + _MASTER_256, seq_no=11, auth_tag=tag, encrypted_payload=ct, + counter_mode=DEVICE_UPTIME, + ) + assert out == plaintext + + def test_device_uptime_aes128(self): + plaintext = b"u128" + ct, tag = make_encrypted_sat_payload(_MASTER_128, 3, plaintext, 120) + out = decrypt_satellite( + _MASTER_128, seq_no=3, auth_tag=tag, encrypted_payload=ct, + counter_mode=DEVICE_UPTIME, + ) + assert out == plaintext + + def test_device_uptime_out_of_pool_returns_none(self): + # Counter 200 is outside the fixed 0-127 pool. + ct, tag = make_encrypted_sat_payload(_MASTER_256, 1, b"far", 200) + out = decrypt_satellite( + _MASTER_256, seq_no=1, auth_tag=tag, encrypted_payload=ct, + counter_mode=DEVICE_UPTIME, + ) + assert out is None + + def test_unix_time_does_not_match_uptime_packet(self): + # A device-uptime packet (small counter) must not resolve under UNIX_TIME. + ct, tag = make_encrypted_sat_payload(_MASTER_256, 4, b"nope", 5) + out = decrypt_satellite( + _MASTER_256, seq_no=4, auth_tag=tag, encrypted_payload=ct, + timestamp=_TODAY_TC * 86400, counter_mode=UNIX_TIME, + ) + assert out is None + + def test_invalid_counter_mode_raises(self): + ct, tag = make_encrypted_sat_payload(_MASTER_256, 1, b"x", _TODAY_TC) + with pytest.raises(ValueError): + decrypt_satellite( + _MASTER_256, seq_no=1, auth_tag=tag, encrypted_payload=ct, + counter_mode="BOGUS", + ) + + def test_device_uptime_with_days_raises(self): + ct, tag = make_encrypted_sat_payload(_MASTER_256, 1, b"x", 5) + with pytest.raises(ValueError): + decrypt_satellite( + _MASTER_256, seq_no=1, auth_tag=tag, encrypted_payload=ct, + counter_mode=DEVICE_UPTIME, days=5, + ) + # --------------------------------------------------------------------------- # JSONL parsing of auth_tag @@ -162,8 +215,10 @@ def test_missing_auth_tag_is_none(self): # --------------------------------------------------------------------------- -def _make_encrypted_sat_pkt(seq_no=42, plaintext=b"data_42") -> SatellitePacket: - ct, tag = make_encrypted_sat_payload(_MASTER_256, seq_no, plaintext, _TODAY_TC) +def _make_encrypted_sat_pkt( + seq_no=42, plaintext=b"data_42", time_counter=_TODAY_TC +) -> SatellitePacket: + ct, tag = make_encrypted_sat_payload(_MASTER_256, seq_no, plaintext, time_counter) return SatellitePacket( device_id="0xBB2973BD", seq_num=seq_no, @@ -279,3 +334,75 @@ def test_decrypts_payload_json_no_status_without_flag(self, runner, monkeypatch) assert result.exit_code == 0 assert "hello" in result.output assert "decrypt_status" not in result.output + + def test_autodetect_device_uptime(self, runner, monkeypatch): + # No --counter-mode: a device-uptime packet should auto-detect and decrypt. + pkt = _make_encrypted_sat_pkt(seq_no=42, plaintext=b"up_data", time_counter=5) + import hubblenetwork.cli as cli_mod + + monkeypatch.setattr(cli_mod.sat_mod, "scan", lambda **kw: iter([pkt])) + monkeypatch.setattr(cli_mod.sat_mod, "ensure_docker_available", lambda: None) + + result = runner.invoke( + cli, + ["sat", "scan", "--key", _MASTER_256.hex(), "--timeout", "1", + "--poll-interval", "0.1", "--payload-format", "string"], + ) + assert result.exit_code == 0 + assert "up_data" in result.output + assert "Detected: AES-256-CTR, counter_source=DEVICE_UPTIME" in result.output + + def test_autodetect_unix_time_announced(self, runner, monkeypatch): + pkt = _make_encrypted_sat_pkt(seq_no=42, plaintext=b"day_data") + import hubblenetwork.cli as cli_mod + + monkeypatch.setattr(cli_mod.sat_mod, "scan", lambda **kw: iter([pkt])) + monkeypatch.setattr(cli_mod.sat_mod, "ensure_docker_available", lambda: None) + + result = runner.invoke( + cli, + ["sat", "scan", "--key", _MASTER_256.hex(), "--timeout", "1", + "--poll-interval", "0.1", "--payload-format", "string"], + ) + assert result.exit_code == 0 + assert "day_data" in result.output + assert "Detected: AES-256-CTR, counter_source=UNIX_TIME" in result.output + + def test_explicit_counter_mode_device_uptime(self, runner, monkeypatch): + pkt = _make_encrypted_sat_pkt(seq_no=42, plaintext=b"up_data", time_counter=5) + import hubblenetwork.cli as cli_mod + + monkeypatch.setattr(cli_mod.sat_mod, "scan", lambda **kw: iter([pkt])) + monkeypatch.setattr(cli_mod.sat_mod, "ensure_docker_available", lambda: None) + + result = runner.invoke( + cli, + ["sat", "scan", "--key", _MASTER_256.hex(), "--counter-mode", + "DEVICE_UPTIME", "--timeout", "1", "--poll-interval", "0.1", + "--payload-format", "string"], + ) + assert result.exit_code == 0 + assert "up_data" in result.output + # Explicit mode skips auto-detect, so no "Detected:" line. + assert "Detected:" not in result.output + + def test_device_uptime_requires_key(self, runner): + result = runner.invoke( + cli, ["sat", "scan", "--counter-mode", "DEVICE_UPTIME", "--timeout", "1"] + ) + assert result.exit_code != 0 + assert "DEVICE_UPTIME requires --key" in result.output + + def test_device_uptime_days_mutually_exclusive(self, runner): + result = runner.invoke( + cli, + ["sat", "scan", "--key", _MASTER_256.hex(), "--counter-mode", + "DEVICE_UPTIME", "--days", "3", "--timeout", "1"], + ) + assert result.exit_code != 0 + assert "mutually exclusive" in result.output + + def test_counter_mode_in_help(self, runner): + result = runner.invoke(cli, ["sat", "scan", "--help"]) + assert result.exit_code == 0 + assert "--counter-mode" in result.output