Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ The SDK uses a src layout with the main package at `src/hubblenetwork/`. Public

- **`ready.py`** - Hubble Ready device provisioning (UUID 0xFCA7). Handles GATT connections, characteristic reads/writes, and the full provisioning flow (register with backend, write key/config/time).

- **`crypto.py`** - Local packet decryption. Implements AES-CTR decryption with CMAC-based key derivation (SP800_108_Counter KDF). Supports both AES-256-CTR and AES-128-CTR. `decrypt()` accepts `counter_mode` as `"UNIX_TIME"` (default, UTC day-based) or `"DEVICE_UPTIME"` (counter-based, fixed pool size 128). Exports `UNIX_TIME` and `DEVICE_UPTIME` constants. `decrypt_eax()` decrypts AES-EAX packets by iterating counters 0-127, generating candidate EIDs via AES-ECB, and using `AES.MODE_EAX` for authenticated decryption. Uses key directly (no KDF). `decrypt_satellite()` decrypts a satellite packet's payload using the same AES-CTR/CMAC scheme as `decrypt()`; satellite packets deliver `seq_num`, `auth_tag`, and encrypted payload as separate fields (not packed into one advertisement) and always use the UNIX_TIME day counter.
- **`crypto.py`** - Local packet decryption. Implements AES-CTR decryption with CMAC-based key derivation (SP800_108_Counter KDF). Supports both AES-256-CTR and AES-128-CTR. `decrypt()` accepts `counter_mode` as `"UNIX_TIME"` (default, UTC day-based) or `"DEVICE_UPTIME"` (counter-based, fixed pool size 128). Exports `UNIX_TIME` and `DEVICE_UPTIME` constants. `decrypt_eax()` decrypts AES-EAX packets by iterating counters 0-127, generating candidate EIDs via AES-ECB, and using `AES.MODE_EAX` for authenticated decryption. Uses key directly (no KDF). `decrypt_satellite()` decrypts a satellite packet's payload using the same AES-CTR/CMAC scheme as `decrypt()`; satellite packets deliver `seq_num`, `auth_tag`, and encrypted payload as separate fields (not packed into one advertisement). It accepts `counter_mode` (`"UNIX_TIME"` default, day-based; or `"DEVICE_UPTIME"`, sweeping the fixed 0-127 counter pool) just like `decrypt()`.

- **`packets.py`** - Data classes: `Location`, `EncryptedPacket`, `DecryptedPacket`, `AesEaxPacket`, `UnknownPacket`.

- **`device.py`** - `Device` dataclass representing a registered device.

- **`errors.py`** - Exception hierarchy. Base `HubbleError` with specialized errors for backend, network, validation, BLE scanning, and decryption failures.

- **`cli.py`** - Click-based CLI. Command groups: `ble` (scan, detect, check-time, validate), `ready` (scan, info, read-status, read-key-info, read-config, read-time, write-key, write-config, write-time, provision), `org` (info, list-devices, get-packets, register-device, delete-device, set-device-name), `sat` (scan, mock-scan; `scan` accepts `--key`/`--days`/`--show-failed-decryption` to decrypt payloads locally). Top-level: `validate-credentials`.
- **`cli.py`** - Click-based CLI. Command groups: `ble` (scan, detect, check-time, validate), `ready` (scan, info, read-status, read-key-info, read-config, read-time, write-key, write-config, write-time, provision), `org` (info, list-devices, get-packets, register-device, delete-device, set-device-name), `sat` (scan, mock-scan; `scan` accepts `--key`/`--days`/`--counter-mode`/`--show-failed-decryption` to decrypt payloads locally — when `--key` is given without `--counter-mode`, the counter source is auto-detected (UNIX_TIME vs DEVICE_UPTIME) and announced, mirroring `ble scan`). Top-level: `validate-credentials`.

- **`sat.py`** - Satellite packet scanning via PlutoSDR. Manages Docker container lifecycle (pull, start, stop) and polls the container's HTTP API for decoded packets. Requires Docker daemon running. Image: `ghcr.io/hubblenetwork/sdr-docker:latest`.

Expand Down
170 changes: 143 additions & 27 deletions src/hubblenetwork/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dataclasses import replace
from datetime import datetime
from functools import partial
from typing import Optional, List
from typing import Callable, Optional, List, TypeVar
from tabulate import tabulate
from hubblenetwork import Organization
from hubblenetwork import Device, DecryptedPacket, EncryptedPacket, decrypt_eax
Expand Down Expand Up @@ -165,42 +165,63 @@ def _decrypt_eax_with_detect(
return None


def _decrypt_ctr_with_detect(
key: bytes,
pkt: EncryptedPacket,
_T = TypeVar("_T")

# Satellite streams have no per-packet EID to key the counter-mode cache on, so
# the detected mode lives in a single shared slot for the whole scan.
_SAT_CTR_CACHE_KEY = "mode"


def _detect_ctr_counter_mode(
*,
decrypt_fn: Callable[..., Optional[_T]],
days: int,
auto_detect: bool,
fixed_counter_mode: str,
days: int,
key_len: int,
cache: dict,
cache_key: object,
announced: list[str],
suppress_info: bool,
) -> Optional[DecryptedPacket]:
if not auto_detect:
return decrypt(key, pkt, days=days, counter_mode=fixed_counter_mode)
) -> Optional[_T]:
"""Decrypt an AES-CTR packet, auto-detecting the counter source if asked.

Shared by the BLE and satellite scan paths. ``decrypt_fn`` is a packet-bound
adapter accepting ``counter_mode`` (and, for UNIX_TIME, ``days``) and
returning the decrypted result or None.

When ``auto_detect`` is False the ``fixed_counter_mode`` is used directly.
Otherwise the mode cached under ``cache_key`` (a BLE EID, or a per-stream
sentinel for satellite) is tried first, then UNIX_TIME and DEVICE_UPTIME are
swept; the first that succeeds is cached and announced once via ``announced``.
A ``cache_key`` of None disables caching (BLE packets without an EID).
"""

def _try(mode: str) -> Optional[DecryptedPacket]:
def _try(mode: str) -> Optional[_T]:
kwargs = {"counter_mode": mode}
if mode == UNIX_TIME:
kwargs["days"] = days
return decrypt(key, pkt, **kwargs)
return decrypt_fn(**kwargs)

if not auto_detect:
return _try(fixed_counter_mode)

if pkt.eid is not None:
cached = cache.get(pkt.eid)
if cache_key is not None:
cached = cache.get(cache_key)
if cached is not None:
result = _try(cached)
if result:
if result is not None:
return result

for mode in (UNIX_TIME, DEVICE_UPTIME):
result = _try(mode)
if result is None:
continue
if pkt.eid is not None:
cache[pkt.eid] = mode
if cache_key is not None:
cache[cache_key] = mode
if not announced and not suppress_info:
announced.append("ctr")
variant = "AES-128-CTR" if len(key) == 16 else "AES-256-CTR"
variant = "AES-128-CTR" if key_len == 16 else "AES-256-CTR"
click.secho(
f"[INFO] Detected: {variant}, counter_source={mode}",
fg="green",
Expand All @@ -210,6 +231,61 @@ def _try(mode: str) -> Optional[DecryptedPacket]:
return None


def _decrypt_ctr_with_detect(
key: bytes,
pkt: EncryptedPacket,
*,
auto_detect: bool,
fixed_counter_mode: str,
days: int,
cache: dict,
announced: list[str],
suppress_info: bool,
) -> Optional[DecryptedPacket]:
return _detect_ctr_counter_mode(
decrypt_fn=lambda **kw: decrypt(key, pkt, **kw),
days=days,
auto_detect=auto_detect,
fixed_counter_mode=fixed_counter_mode,
key_len=len(key),
cache=cache,
cache_key=pkt.eid,
announced=announced,
suppress_info=suppress_info,
)


def _decrypt_satellite_with_detect(
key: bytes,
pkt,
*,
auto_detect: bool,
fixed_counter_mode: str,
days: int,
state: dict,
announced: list[str],
suppress_info: bool,
) -> Optional[bytes]:
return _detect_ctr_counter_mode(
decrypt_fn=lambda **kw: decrypt_satellite(
key,
seq_no=pkt.seq_num,
auth_tag=pkt.auth_tag,
encrypted_payload=pkt.payload,
timestamp=pkt.timestamp,
**kw,
),
days=days,
auto_detect=auto_detect,
fixed_counter_mode=fixed_counter_mode,
key_len=len(key),
cache=state,
cache_key=_SAT_CTR_CACHE_KEY,
announced=announced,
suppress_info=suppress_info,
)


def _format_payload(payload, fmt: str) -> str:
"""Format packet payload bytes for display."""
if not isinstance(payload, bytes):
Expand Down Expand Up @@ -3154,6 +3230,8 @@ def _run_sat_scan(
debug: bool = False,
key: Optional[str] = None,
days: int = 2,
counter_mode: str = UNIX_TIME,
auto_detect_ctr: bool = False,
show_failed_decryption: bool = False,
) -> None:
"""Shared implementation for ``sat scan`` and ``sat mock-scan``."""
Expand All @@ -3167,8 +3245,7 @@ def _run_sat_scan(
show_decrypt_status=show_failed_decryption,
)

# Pre-decode the key if provided. Satellite packets always use the
# UNIX_TIME (day-based) counter, so only --days is relevant.
# Pre-decode the key if provided.
decoded_key: Optional[bytes] = None
if key:
try:
Expand All @@ -3179,6 +3256,14 @@ def _run_sat_scan(
return
raise click.ClickException(f"Invalid key: {e}")

if decoded_key is not None and auto_detect_ctr:
_announce_auto_detect(
auto_ctr=True, auto_eax=False, suppress=printer.suppress_info_messages
)

detected_ctr_state: dict = {}
announced: list[str] = []

if debug:
sat_logger = logging.getLogger("hubblenetwork.sat")
sat_logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -3225,13 +3310,15 @@ def _on_interrupt(sig, frame):
# With a key, the user wants only packets the key can decrypt.
decrypted = None
if pkt.auth_tag is not None:
decrypted = decrypt_satellite(
decrypted = _decrypt_satellite_with_detect(
decoded_key,
seq_no=pkt.seq_num,
auth_tag=pkt.auth_tag,
encrypted_payload=pkt.payload,
timestamp=pkt.timestamp,
pkt,
auto_detect=auto_detect_ctr,
fixed_counter_mode=counter_mode,
days=days,
state=detected_ctr_state,
announced=announced,
suppress_info=printer.suppress_info_messages,
)
if decrypted is not None:
printer.print_row(
Expand Down Expand Up @@ -3306,15 +3393,24 @@ def _sat_scan_options(fn):
default=None,
show_default=False,
help="Key to decrypt packet payloads (hex or base64, 16 or 32 bytes). "
"Satellite packets always use the UNIX_TIME counter.",
"The counter source (UNIX_TIME / DEVICE_UPTIME) is auto-detected "
"unless --counter-mode is given.",
)
@click.option(
"--days",
"-d",
type=int,
default=2,
show_default=True,
help="Days to search around each packet's timestamp when decrypting",
help="Days to search around each packet's timestamp when decrypting "
"with the UNIX_TIME counter",
)
@click.option(
"--counter-mode",
type=click.Choice([UNIX_TIME, DEVICE_UPTIME], case_sensitive=False),
default=UNIX_TIME,
show_default=False,
help="EID counter source for decryption. Omit to auto-detect from packets.",
)
@click.option(
"--show-failed-decryption",
Expand All @@ -3323,21 +3419,41 @@ def _sat_scan_options(fn):
help="Show packets that fail decryption/authentication with the provided "
"key. Adds a DECRYPT column indicating OK/FAIL.",
)
def sat_scan(**kwargs) -> None:
@click.pass_context
def sat_scan(ctx, **kwargs) -> None:
"""
Start the satellite receiver and stream decoded packets.

Requires Docker and a PlutoSDR device connected via USB.

Pass --key to decrypt packet payloads locally; packets the key cannot
decrypt are hidden unless --show-failed-decryption is given.
decrypt are hidden unless --show-failed-decryption is given. The counter
source is auto-detected (UNIX_TIME or DEVICE_UPTIME) unless --counter-mode
is given explicitly.

Example:
hubblenetwork sat scan --timeout 30
hubblenetwork sat scan -o json --timeout 10
hubblenetwork sat scan -n 5
hubblenetwork sat scan --key "a562a2f7e4c62bed52ab09633878f62b" --timeout 60
"""
key = kwargs.get("key")
counter_mode = kwargs["counter_mode"]

def _explicit(name: str) -> bool:
return ctx.get_parameter_source(name) == click.core.ParameterSource.COMMANDLINE

# counter_mode defaults to UNIX_TIME, so a DEVICE_UPTIME value here always
# means the user passed it explicitly (matches the ble scan/detect guards).
if counter_mode == DEVICE_UPTIME:
if not key:
raise click.UsageError("--counter-mode DEVICE_UPTIME requires --key")
if _explicit("days"):
raise click.UsageError(
"--counter-mode DEVICE_UPTIME and --days are mutually exclusive"
)

kwargs["auto_detect_ctr"] = key is not None and not _explicit("counter_mode")
_run_sat_scan(mock=False, **kwargs)


Expand Down
56 changes: 38 additions & 18 deletions src/hubblenetwork/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,30 @@ def _check_tag_matches(
return tag == parsed.auth_tag


def decrypt(
key: bytes,
encrypted_pkt: EncryptedPacket,
days: int = 2,
counter_mode: str = UNIX_TIME,
) -> Optional[DecryptedPacket]:
def _normalize_counter_mode(counter_mode: str, days: int) -> str:
"""Validate and upper-case ``counter_mode``, enforcing the DEVICE_UPTIME rule.

Shared by :func:`decrypt` and :func:`decrypt_satellite`. Returns the
normalized mode; raises ``ValueError`` for an unknown mode or for
``DEVICE_UPTIME`` combined with a non-default ``days``.
"""
counter_mode = counter_mode.upper()
if counter_mode not in _VALID_COUNTER_MODES:
raise ValueError(
f"counter_mode must be one of {sorted(_VALID_COUNTER_MODES)}, got {counter_mode!r}"
)
if counter_mode == DEVICE_UPTIME and days != 2:
raise ValueError("Cannot specify both counter_mode=DEVICE_UPTIME and days")
return counter_mode


def decrypt(
key: bytes,
encrypted_pkt: EncryptedPacket,
days: int = 2,
counter_mode: str = UNIX_TIME,
) -> Optional[DecryptedPacket]:
counter_mode = _normalize_counter_mode(counter_mode, days)

parsed = ParsedPacket(encrypted_pkt)
keylen = len(key)
Expand Down Expand Up @@ -211,29 +222,38 @@ def decrypt_satellite(
encrypted_payload: bytes,
timestamp: Optional[float] = None,
days: int = 2,
counter_mode: str = UNIX_TIME,
) -> Optional[bytes]:
"""Decrypt a satellite packet's encrypted customer payload.

Satellite packets deliver the sequence number, 4-byte auth tag, and
encrypted customer payload as separate fields, unlike BLE where they are
packed into one advertisement. The underlying AES-256/128-CTR + CMAC
scheme is identical to BLE (see :func:`decrypt`), and satellite always
uses the UNIX_TIME (day-based) counter.
scheme is identical to BLE (see :func:`decrypt`).

The day counter is swept +/-``days`` around the packet's ``timestamp``
(or the current UTC day when ``timestamp`` is None), returning the
decrypted payload for the first day whose derived auth tag matches.
The counter swept to find a matching auth tag depends on ``counter_mode``:

Returns the decrypted payload bytes, or None if no day matches (wrong
key or outside the search window).
- ``UNIX_TIME`` (default): the day counter is swept +/-``days`` around the
packet's ``timestamp`` (or the current UTC day when ``timestamp`` is None).
- ``DEVICE_UPTIME``: the device-uptime counter is swept over the fixed pool
0-127. ``timestamp`` and ``days`` are unused in this mode.

Returns the decrypted payload bytes for the first counter whose derived
auth tag matches, or None if none match (wrong key, wrong mode, or outside
the search window).
"""
counter_mode = _normalize_counter_mode(counter_mode, days)

keylen = len(key)
if timestamp is None:
timestamp = datetime.now(timezone.utc).timestamp()
base = int(timestamp) // 86400
if counter_mode == DEVICE_UPTIME:
candidates = range(128)
else:
if timestamp is None:
timestamp = datetime.now(timezone.utc).timestamp()
base = int(timestamp) // 86400
candidates = (base + delta for delta in range(-days, days + 1))

for delta in range(-days, days + 1):
time_counter = base + delta
for time_counter in candidates:
daily_key = _get_encryption_key(key, time_counter, seq_no, keylen=keylen)
if _get_auth_tag(daily_key, encrypted_payload) == auth_tag:
nonce = _get_nonce(key, time_counter, seq_no, keylen=keylen)
Expand Down
Loading
Loading