From 287adab2b5e8067bb7aab75f105e8925fd071c5c Mon Sep 17 00:00:00 2001 From: Paul Buckley Date: Thu, 30 Apr 2026 14:49:25 -0700 Subject: [PATCH 1/4] docs: add packet generation design spec --- .../2026-04-30-packet-generation-design.md | 327 ++++++++++++++++++ 1 file changed, 327 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-30-packet-generation-design.md diff --git a/docs/superpowers/specs/2026-04-30-packet-generation-design.md b/docs/superpowers/specs/2026-04-30-packet-generation-design.md new file mode 100644 index 0000000..8c4d8f2 --- /dev/null +++ b/docs/superpowers/specs/2026-04-30-packet-generation-design.md @@ -0,0 +1,327 @@ +# Hubble Packet Generation — Design + +**Status:** Draft +**Date:** 2026-04-30 +**Owner:** Paul Buckley + +## Summary + +Add the ability to generate (encrypt) Hubble BLE advertisement packets locally from a key and a payload. This is the inverse of the existing `decrypt()` / `decrypt_eax()` flow. Generated packets can be ingested to the Hubble Cloud (`--ingest`) and printed in several formats so users can compare a synthetic packet against one observed on hardware. + +## Motivation + +Today the SDK only consumes encrypted packets (BLE scan → decrypt → render). There's no supported way to produce a well-formed encrypted packet from a key. This makes it hard to: + +- Validate end-to-end ingestion paths without real hardware in hand. +- Match a packet captured on the wire against what the spec says it *should* look like for known inputs. +- Seed the cloud with deterministic test data. + +This feature provides both an SDK primitive and a CLI command for these workflows. + +## Scope + +**In scope:** + +- AES-CTR (protocol v0) and AES-EAX (protocol v2) packet generation. +- Real EID generation (matches firmware) for both protocols, so backend lookups succeed. +- CLI command (`hubblenetwork ble generate`) wrapping the SDK with formatting and ingest options. +- `--ingest` flag posting via the existing `Organization.ingest_packet`. + +**Out of scope (filed as follow-ups):** + +- Generating unencrypted protocol v1 packets ("based on a key" doesn't apply). +- Multi-packet generation (`--count N`). +- Fixing the pre-existing bug that BLE-received `AesEaxPacket`s can't be ingested via `Organization.ingest_packet` (the ingest path reads `.payload` and currently expects the full BLE service data; on `AesEaxPacket` `.payload` is the inner ciphertext only). Tracked as a follow-up — this spec works around it by having `encrypt_eax()` return an `EncryptedPacket` carrying the full v2 bytes. +- Adding EID verification to the receive-side `decrypt()` (we now know the algorithm, but verifying isn't required for this feature). + +## Design + +### High-level architecture + +Two new pure functions in `src/hubblenetwork/crypto.py`, exported from `hubblenetwork/__init__.py`. A new `ble generate` Click subcommand in `src/hubblenetwork/cli.py` wraps them with formatting and ingest plumbing. + +``` + ┌─────────────────────────────────────┐ + │ CLI: hubblenetwork ble generate │ + │ parses flags, dispatches by │ + │ key length, formats output, │ + │ optionally ingests │ + └────────────┬────────────────────────┘ + │ + ┌────────────▼────────────────────────┐ + │ SDK: hubblenetwork.encrypt() │ + │ hubblenetwork.encrypt_eax() │ + │ pure functions, return │ + │ EncryptedPacket with full BLE │ + │ service-data bytes in .payload │ + └────────────┬────────────────────────┘ + │ reuses + ┌────────────▼────────────────────────┐ + │ Existing helpers in crypto.py: │ + │ _generate_kdf_key, _get_nonce, │ + │ _get_encryption_key, │ + │ _get_auth_tag, _generate_eid │ + └─────────────────────────────────────┘ +``` + +### SDK API + +Both functions are added to `crypto.py` next to their inverses, and re-exported through `hubblenetwork/__init__.py`. + +```python +def encrypt( + key: bytes, + payload: bytes, + *, + time_counter: Optional[int] = None, + seq_no: Optional[int] = None, + counter_mode: str = UNIX_TIME, +) -> EncryptedPacket: + """Generate an AES-CTR (protocol v0) encrypted Hubble BLE packet. + + Args: + key: 16 or 32 bytes (AES-128 or AES-256). + payload: plaintext to encrypt. Maximum length is bounded by the BLE + advertisement frame; the implementation enforces the same ceiling + firmware uses (see "Open questions"). + time_counter: For UNIX_TIME mode, the UTC day counter + (`int(time.time()) // 86400`). For DEVICE_UPTIME mode, the + device's uptime counter index. Defaults to today's UTC day + (UNIX_TIME) or 0 (DEVICE_UPTIME). + seq_no: 10-bit sequence number (0..1023). Defaults to a random value. + counter_mode: "UNIX_TIME" or "DEVICE_UPTIME". Affects only the default + of `time_counter`; the encryption math is identical. + + Returns: + EncryptedPacket whose `.payload` field is the full BLE service-data + byte string (header(2) | EID(4) | auth_tag(4) | ciphertext). + + Raises: + ValueError: invalid key length, payload too long, seq_no out of range, + or invalid counter_mode. + """ + +def encrypt_eax( + key: bytes, + payload: bytes, + *, + counter: Optional[int] = None, + nonce_salt: Optional[bytes] = None, + period_exponent: int = 0, +) -> EncryptedPacket: + """Generate an AES-EAX (protocol v2) encrypted Hubble BLE packet. + + Args: + key: 16 bytes (AES-128). + payload: 0-9 bytes plaintext to encrypt. + counter: EID counter index. Effective counter value is + `counter * 2**period_exponent`. Defaults to 0. + nonce_salt: 2 random bytes. Defaults to `secrets.token_bytes(2)`. + period_exponent: 0..15. Period = 2^n seconds. Default 0. + + Returns: + EncryptedPacket whose `.payload` field is the full BLE service-data + byte string (header(1) | nonce_salt(2) | EID_le(8) | ciphertext | tag(4)). + `protocol_version=2`, `eid` and `auth_tag` are populated from the v2 fields. + + Raises: + ValueError: invalid key length, payload too long, nonce_salt wrong size, + or period_exponent out of range. + """ +``` + +**Why both return `EncryptedPacket` (not `AesEaxPacket` for the v2 case):** The existing `Organization.ingest_packet` accepts an `EncryptedPacket` and reads `.payload` as the BLE adv bytes to base64-encode. Returning an `EncryptedPacket` from `encrypt_eax()` lets `--ingest` work without changing the cloud module. The protocol version, EID, and auth_tag are preserved in the dataclass fields so downstream code can still inspect them. (Fixing the broader EAX-ingest gap for BLE-received packets is a separate follow-up.) + +### CLI + +``` +hubblenetwork ble generate + --key required + --payload required (with --payload-format) + --payload-format hex|base64|string default: hex + [--counter-mode UNIX_TIME|DEVICE_UPTIME] CTR-only; default UNIX_TIME + [--counter N] CTR: time_counter override; EAX: counter override + [--seq-no N] CTR-only; 0..1023 + [--nonce-salt ] EAX-only; exactly 2 bytes + [--period-exponent 0..15] EAX-only; default 0 + [--ingest] POST to backend (requires HUBBLE_ORG_ID/HUBBLE_API_TOKEN) + [--format breakdown|hex|json] default: breakdown +``` + +Format selection: 32-byte key → AES-CTR; 16-byte key → AES-EAX. Same convention as the receive side and consistent with `_parse_key()`. + +Validation rejects flags that don't apply to the chosen format (e.g. `--seq-no` with a 16-byte key, `--nonce-salt` with a 32-byte key). Errors surface as `click.UsageError` so they appear consistently with the existing CLI's error path. + +### Data flow + +#### `encrypt()` — AES-CTR (v0) + +1. Validate `key` length (16 or 32) and `payload` length (≤ MAX_CTR_PAYLOAD; see "Open questions"). +2. Resolve defaults: + - `seq_no` ← random 10-bit if not provided. + - `time_counter` ← `int(time.time()) // 86400` (UNIX_TIME) or `0` (DEVICE_UPTIME) if not provided. +3. Compute encryption key: `_get_encryption_key(key, time_counter, seq_no, keylen)`. +4. Compute nonce: `_get_nonce(key, time_counter, seq_no, keylen)`. +5. Encrypt: `AES.new(enc_key, AES.MODE_CTR, nonce=nonce).encrypt(payload)` → ciphertext. +6. Compute auth tag: `_get_auth_tag(enc_key, ciphertext)` → 4 bytes. +7. Compute EID via new helper: + ```python + def _generate_ctr_eid(key, time_counter, keylen): + device_key = _generate_kdf_key(key, keylen, "DeviceKey", time_counter) + return _generate_kdf_key(device_key, 4, "DeviceID", 0) + ``` + Algorithm matches firmware `hubble_internal_device_id_get` (KBKDF chain with `DeviceKey` then `DeviceID` labels, `seq_no` hardcoded to 0 for EID derivation). +8. Pack header: `(version << 10) | (seq_no & 0x3FF)` as 2 big-endian bytes (version=0). +9. Concatenate: `header(2) | EID(4) | auth_tag(4) | ciphertext`. +10. Return `EncryptedPacket(timestamp=now, location=_FAKE_LOCATION, payload=, rssi=0, protocol_version=0, eid=, auth_tag=)`. + +#### `encrypt_eax()` — AES-EAX (v2) + +1. Validate `key` length (must be 16) and `payload` length (≤9). Validate `nonce_salt` (2 bytes) and `period_exponent` (0..15). +2. Resolve defaults: `counter` ← 0 if not provided; `nonce_salt` ← `secrets.token_bytes(2)` if not provided. +3. Compute EID using the **same formula `decrypt_eax` uses** so round-trip is guaranteed: derive `key_0 = _derive_eid_key(key, 0)` once, then `eid_block = AES_ECB(key_0, b"\x00"*11 + period_exponent.to_bytes(1,"big") + (counter * (1 << period_exponent)).to_bytes(4,"big"))`, take first 8 bytes as a big-endian uint64. (The existing `_generate_eid` helper diverges from `decrypt_eax` when `counter * 2**period_exponent ≥ 65536`. Match `decrypt_eax` here.) +4. Build nonce: `(counter * (1 << period_exponent)).to_bytes(4, "big") + nonce_salt`. +5. Encrypt + tag: + ```python + cipher = AES.new(key, AES.MODE_EAX, mac_len=4, nonce=nonce) + ciphertext, tag = cipher.encrypt(payload), cipher.digest() + ``` +6. Pack header: `(version << 2)` (version=2) → 1 byte = `0x08`. +7. Concatenate: `header(1) | nonce_salt(2) | eid_le(8) | ciphertext | tag(4)`. + - `eid_le = struct.pack(", rssi=0, protocol_version=2, eid=, auth_tag=)`. + +### CLI dispatch + +``` +parse_key(key) -> bytes (16 or 32) +parse_payload(payload, payload_format) -> bytes + +if len(key) == 32: # AES-CTR + reject EAX-only flags + pkt = encrypt(key, payload, time_counter=counter, seq_no=seq_no, counter_mode=counter_mode) +elif len(key) == 16: # AES-EAX + reject CTR-only flags + pkt = encrypt_eax(key, payload, counter=counter, nonce_salt=nonce_salt, period_exponent=period_exponent) + +render(pkt, format) +if ingest: + Organization(...).ingest_packet(pkt) + print confirmation +``` + +### Output formats + +#### `breakdown` (default) + +Human-readable, labeled fields followed by the full service-data bytes in three encodings: + +``` +Protocol: AES-EAX (v2) +Key length: 16 bytes +Counter: 0 +Period exponent: 0 (period = 1s) +Nonce salt: 0xa3 0xf1 +EID: 0x8b3c4d5e6f7a8b9c +Ciphertext: 0xde 0xad 0xbe 0xef +Auth tag: 0x12 0x34 0x56 0x78 + +Service data (19 bytes): + Hex: 08a3f19c8b7a6f5e4d3c8bdeadbeef12345678 + Spaced: 08 a3 f1 9c 8b 7a 6f 5e 4d 3c 8b de ad be ef 12 34 56 78 + Python: b'\x08\xa3\xf1\x9c\x8b\x7a\x6f\x5e\x4d\x3c\x8b\xde\xad\xbe\xef\x12\x34\x56\x78' + C array: {0x08, 0xa3, 0xf1, 0x9c, 0x8b, 0x7a, 0x6f, 0x5e, 0x4d, 0x3c, 0x8b, 0xde, 0xad, 0xbe, 0xef, 0x12, 0x34, 0x56, 0x78} +``` + +For AES-CTR the breakdown has the analogous fields (Protocol, Key length, Time counter, Counter mode, Seq no, EID (4-byte), Auth tag, Ciphertext, Service data). + +#### `hex` + +Single line of hex, no labels, suitable for scripting: + +``` +08a3f19c8b7a6f5e4d3c8bdeadbeef12345678 +``` + +#### `json` + +Machine-readable structured object: + +```json +{ + "protocol": "aes_eax", + "protocol_version": 2, + "key_length": 16, + "counter": 0, + "period_exponent": 0, + "nonce_salt": "a3f1", + "eid": "8b3c4d5e6f7a8b9c", + "ciphertext": "deadbeef", + "auth_tag": "12345678", + "service_data": "08a3f19c8b7a6f5e4d3c8bdeadbeef12345678" +} +``` + +(Analogous shape for AES-CTR with `protocol="aes_ctr"`, `time_counter`, `counter_mode`, `seq_no`.) + +### Error handling + +| Condition | Behavior | +|---|---| +| Invalid key (not 16 or 32 bytes) | `_parse_key` raises `ValueError`; CLI surfaces `click.ClickException`. | +| Payload too long for protocol (CTR > MAX_CTR_PAYLOAD or EAX > 9 bytes) | `ValueError` from SDK, `click.UsageError` from CLI. | +| `--seq-no` out of range (not 0..1023) | `click.UsageError`. | +| `--nonce-salt` not exactly 2 bytes | `click.UsageError`. | +| `--period-exponent` out of 0..15 | `click.UsageError`. | +| CTR-only flag with 16-byte key (or vice versa) | `click.UsageError` with explicit message. | +| `--counter-mode DEVICE_UPTIME` with explicit `--counter` not provided | Allowed (defaults to 0). | +| `--ingest` without `HUBBLE_ORG_ID`/`HUBBLE_API_TOKEN` | Same `_get_env_or_fail` path used by `ble scan --ingest`. | +| Backend ingest failure | Propagate `BackendError`; the CLI prints the error and exits non-zero. | + +### Testing + +New file `tests/test_packet_generation.py` covers SDK primitives. CLI coverage extends the existing `tests/test_cli_*` files (or a new `tests/test_cli_generate.py`). + +**SDK round-trip tests:** + +- `encrypt(key, payload, time_counter=T, seq_no=S)` → `decrypt(key, packet)` returns matching payload, counter=T, sequence=S. Repeat for AES-128 and AES-256 keys. +- `encrypt_eax(key, payload, counter=C, nonce_salt=N, period_exponent=E)` → `decrypt_eax(key, parse(packet), period_exponent=E)` returns matching payload. +- Determinism: same inputs (including explicit `seq_no` / `nonce_salt`) produce byte-identical output across calls. + +**EID verification tests:** + +- AES-CTR: assert `_generate_ctr_eid(key, T, keylen)` matches a known-good firmware-generated value for at least one fixed `(key, T)` pair. *(Open: need a reference vector from firmware. If not available, fall back to verifying the EID round-trips through the cloud in an integration test marked `@pytest.mark.integration`.)* +- AES-EAX: existing `_generate_eid` is already tested; add a generation test that confirms the EID embedded in the service data matches `_generate_eid(key, counter*period, period_exponent)`. + +**Format dispatch tests:** + +- 32-byte key produces protocol_version 0 service data; first byte's top 6 bits are zero. +- 16-byte key produces protocol_version 2 service data; first byte equals `0x08`. + +**Validation tests:** + +- Oversized payload, mismatched flags-vs-key-length, malformed nonce-salt all raise the right errors. + +**CLI tests (using `click.testing.CliRunner`):** + +- `hubblenetwork ble generate --key --payload ` with `--format hex` produces parseable bytes that round-trip through `ble._make_packet` and `decrypt`. +- Same for AES-EAX with `--format hex` and `decrypt_eax`. +- `--format json` produces valid JSON with all expected keys. +- `--format breakdown` includes "Protocol", "Service data", and the hex string. +- `--ingest` calls `Organization.ingest_packet` with the generated packet (mock the network). + +## Migration / compatibility + +Pure addition; no existing behavior changes. The single internal addition is `_generate_ctr_eid()` in `crypto.py`, used only by the new `encrypt()` function. No changes to `decrypt()`, `decrypt_eax()`, `org.py`, `cloud.py`, or BLE parsing. Adds `encrypt`, `encrypt_eax` to the public API surface. + +## Open questions + +- **Firmware EID test vector:** Do we have a known-good `(key, time_counter) → EID` pair from the firmware suite that we can pin in `test_packet_generation.py`? If not, the round-trip + integration test cover the algorithm but won't independently verify Python-side parity with C. (This isn't blocking — the firmware function is small and we have its source.) +- **MAX_CTR_PAYLOAD:** What's the actual max ciphertext length the firmware emits for AES-CTR adverts? Need to either grab the constant from `hubble_ble.c` or pick a conservative ceiling (e.g. 17 bytes, matching CTR overhead of 10 + ciphertext within a 27-byte service-data limit). The unencrypted-v1 ceiling of 18 bytes is documented but doesn't apply to CTR directly. Implementer should pin this from firmware before merge. + +## Follow-ups + +- Make `Organization.ingest_packet` accept `AesEaxPacket` (so BLE-received EAX packets can be ingested through the SDK). +- `--count N` for generating sequences of distinct packets. +- Optional support for protocol v1 (unencrypted) generation. +- Consider verifying the CTR EID in `decrypt()` now that we know the algorithm. From 13de2b7233ec2197e04171b94eff19c15ea28f02 Mon Sep 17 00:00:00 2001 From: Paul Buckley Date: Thu, 30 Apr 2026 14:58:57 -0700 Subject: [PATCH 2/4] docs: add packet generation implementation plan --- .../plans/2026-04-30-packet-generation.md | 1421 +++++++++++++++++ 1 file changed, 1421 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-30-packet-generation.md diff --git a/docs/superpowers/plans/2026-04-30-packet-generation.md b/docs/superpowers/plans/2026-04-30-packet-generation.md new file mode 100644 index 0000000..6a350d9 --- /dev/null +++ b/docs/superpowers/plans/2026-04-30-packet-generation.md @@ -0,0 +1,1421 @@ +# Packet Generation Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add SDK functions and a CLI command (`hubblenetwork ble generate`) that generate AES-CTR (v0) and AES-EAX (v2) Hubble BLE advertisement packets from a key + payload, with optional ingestion to the Hubble Cloud. + +**Architecture:** Two pure functions in `crypto.py` (`encrypt`, `encrypt_eax`) that mirror the existing `decrypt` / `decrypt_eax` and reuse the existing KDF / EID / nonce helpers. A new Click subcommand under the `ble` group dispatches by key length (32 → CTR, 16 → EAX), formats output (breakdown / hex / json), and optionally posts the packet via `Organization.ingest_packet`. Both encrypt functions return an `EncryptedPacket` whose `.payload` field is the full BLE service-data byte string — this lets the same struct flow through `org.ingest_packet` regardless of protocol. + +**Tech Stack:** Python 3, `pycryptodome` (already a dep), Click, pytest. Reuses existing `_generate_kdf_key`, `_get_encryption_key`, `_get_nonce`, `_get_auth_tag`, `_derive_eid_key` helpers in `crypto.py`. + +**Reference:** Spec at `docs/superpowers/specs/2026-04-30-packet-generation-design.md`. + +--- + +## File Map + +| File | Action | Purpose | +|---|---|---| +| `src/hubblenetwork/crypto.py` | Modify | Add `MAX_CTR_PAYLOAD`, `MAX_EAX_PAYLOAD`, `_generate_ctr_eid`, `encrypt`, `encrypt_eax` | +| `src/hubblenetwork/__init__.py` | Modify | Export `encrypt`, `encrypt_eax` | +| `src/hubblenetwork/cli.py` | Modify | Add `ble generate` subcommand and three output formatters | +| `tests/test_packet_generation.py` | Create | SDK round-trip + validation tests | +| `tests/test_cli_generate.py` | Create | CLI command tests | +| `README.md` | Modify | Add `ble generate` example | + +--- + +## Constants (used across tasks) + +These come from firmware (`hubble_ble.c`): + +```python +MAX_CTR_PAYLOAD = 13 # HUBBLE_BLE_MAX_DATA_LEN +MAX_EAX_PAYLOAD = 9 # documented in CLAUDE.md / packets.py docstring +``` + +--- + +## Task 1: Add CTR EID helper and payload constants + +**Files:** +- Modify: `src/hubblenetwork/crypto.py` +- Test: `tests/test_packet_generation.py` + +- [ ] **Step 1: Create the test file with the failing test** + +Create `tests/test_packet_generation.py`: + +```python +"""Tests for packet generation (encrypt / encrypt_eax).""" + +from hubblenetwork.crypto import _generate_ctr_eid + + +class TestGenerateCtrEid: + def test_returns_4_bytes(self): + key = bytes(range(32)) + eid = _generate_ctr_eid(key, time_counter=12345, keylen=32) + assert isinstance(eid, bytes) + assert len(eid) == 4 + + def test_deterministic(self): + key = bytes(range(32)) + a = _generate_ctr_eid(key, time_counter=12345, keylen=32) + b = _generate_ctr_eid(key, time_counter=12345, keylen=32) + assert a == b + + def test_changes_with_time_counter(self): + key = bytes(range(32)) + a = _generate_ctr_eid(key, time_counter=12345, keylen=32) + b = _generate_ctr_eid(key, time_counter=12346, keylen=32) + assert a != b + + def test_aes_128_key(self): + key = bytes(range(16)) + eid = _generate_ctr_eid(key, time_counter=0, keylen=16) + assert len(eid) == 4 +``` + +- [ ] **Step 2: Run the tests and verify they fail** + +Run: `pytest tests/test_packet_generation.py -v` +Expected: 4 failures, all `ImportError: cannot import name '_generate_ctr_eid'`. + +- [ ] **Step 3: Add constants and helper to `crypto.py`** + +In `src/hubblenetwork/crypto.py`, add the constants near the top (after the existing `_HUBBLE_AES_TAG_SIZE` constant): + +```python +# Maximum customer payload sizes (matches firmware HUBBLE_BLE_MAX_DATA_LEN +# and the AES-EAX 0-9 byte ciphertext cap documented in packets.py). +MAX_CTR_PAYLOAD = 13 +MAX_EAX_PAYLOAD = 9 +``` + +Then add the EID helper next to `_get_auth_tag` (around line 60): + +```python +def _generate_ctr_eid(key: bytes, time_counter: int, keylen: int) -> bytes: + """Generate the 4-byte AES-CTR EID embedded at offset 2-6 of v0 service data. + + Mirrors the firmware `hubble_internal_device_id_get` function: a two-step + KBKDF chain. First derives a per-period DeviceKey from the master key, + then derives the 4-byte DeviceID with seq_no hardcoded to 0. + """ + device_key = _generate_kdf_key(key, keylen, "DeviceKey", time_counter) + return _generate_kdf_key(device_key, 4, "DeviceID", 0) +``` + +- [ ] **Step 4: Run tests and verify they pass** + +Run: `pytest tests/test_packet_generation.py -v` +Expected: 4 passed. + +- [ ] **Step 5: Commit** + +```bash +git add src/hubblenetwork/crypto.py tests/test_packet_generation.py +git commit -m "feat(crypto): add _generate_ctr_eid helper and payload size constants" +``` + +--- + +## Task 2: Add `encrypt()` for AES-CTR (v0) + +**Files:** +- Modify: `src/hubblenetwork/crypto.py` +- Test: `tests/test_packet_generation.py` + +- [ ] **Step 1: Add round-trip test** + +Append to `tests/test_packet_generation.py`: + +```python +import pytest +from hubblenetwork.crypto import ( + encrypt, + decrypt, + UNIX_TIME, + DEVICE_UPTIME, + MAX_CTR_PAYLOAD, +) + + +class TestEncryptCtr: + def test_round_trip_aes_256(self): + key = bytes(range(32)) + plaintext = b"hello world" + pkt = encrypt(key, plaintext, time_counter=7, seq_no=42, counter_mode=DEVICE_UPTIME) + result = decrypt(key, pkt, counter_mode=DEVICE_UPTIME) + assert result is not None + assert result.payload == plaintext + assert result.counter == 7 + assert result.sequence == 42 + + def test_round_trip_aes_128(self): + key = bytes(range(16)) + plaintext = b"hi" + pkt = encrypt(key, plaintext, time_counter=3, seq_no=100, counter_mode=DEVICE_UPTIME) + result = decrypt(key, pkt, counter_mode=DEVICE_UPTIME) + assert result is not None + assert result.payload == plaintext + assert result.counter == 3 + assert result.sequence == 100 + + def test_unix_time_default(self): + """Without time_counter, defaults to today's UTC day; decrypt() default mode finds it.""" + key = bytes(range(32)) + pkt = encrypt(key, b"x", seq_no=1) + result = decrypt(key, pkt) # default UNIX_TIME, ±2 days + assert result is not None + assert result.payload == b"x" + assert result.sequence == 1 + + def test_random_seq_no_when_omitted(self): + """Without seq_no, two calls produce different bytes.""" + key = bytes(range(32)) + a = encrypt(key, b"x", time_counter=5, counter_mode=DEVICE_UPTIME) + b = encrypt(key, b"x", time_counter=5, counter_mode=DEVICE_UPTIME) + assert a.payload != b.payload + + def test_deterministic_with_explicit_inputs(self): + key = bytes(range(32)) + a = encrypt(key, b"x", time_counter=5, seq_no=7, counter_mode=DEVICE_UPTIME) + b = encrypt(key, b"x", time_counter=5, seq_no=7, counter_mode=DEVICE_UPTIME) + assert a.payload == b.payload + + def test_eid_matches_helper(self): + key = bytes(range(32)) + pkt = encrypt(key, b"x", time_counter=5, seq_no=7, counter_mode=DEVICE_UPTIME) + # Service data layout: header(2) | EID(4) | auth_tag(4) | ciphertext + eid_bytes = pkt.payload[2:6] + assert eid_bytes == _generate_ctr_eid(key, 5, keylen=32) + + def test_protocol_version_zero(self): + key = bytes(range(32)) + pkt = encrypt(key, b"x", time_counter=5, seq_no=7, counter_mode=DEVICE_UPTIME) + # Top 6 bits of byte 0 are version (0 for AES-CTR) + assert (pkt.payload[0] >> 2) == 0 + assert pkt.protocol_version == 0 + + def test_seq_no_encoded_in_header(self): + key = bytes(range(32)) + pkt = encrypt(key, b"x", time_counter=5, seq_no=0x123, counter_mode=DEVICE_UPTIME) + seq_extracted = int.from_bytes(pkt.payload[0:2], "big") & 0x3FF + assert seq_extracted == 0x123 +``` + +The `_generate_ctr_eid` import was added at the top of the file in Task 1. + +- [ ] **Step 2: Run tests and verify they fail** + +Run: `pytest tests/test_packet_generation.py::TestEncryptCtr -v` +Expected: All fail with `ImportError: cannot import name 'encrypt'`. + +- [ ] **Step 3: Implement `encrypt()` in `crypto.py`** + +Add at the bottom of `src/hubblenetwork/crypto.py` (or grouped with `decrypt`): + +```python +import secrets +import time as _time + + +def encrypt( + key: bytes, + payload: bytes, + *, + time_counter: Optional[int] = None, + seq_no: Optional[int] = None, + counter_mode: str = UNIX_TIME, +) -> EncryptedPacket: + """Generate an AES-CTR (protocol v0) encrypted Hubble BLE packet. + + The returned EncryptedPacket's `.payload` field holds the full BLE + service-data byte string: header(2) | EID(4) | auth_tag(4) | ciphertext. + """ + counter_mode = counter_mode.upper() + if counter_mode not in _VALID_COUNTER_MODES: + raise ValueError( + f"counter_mode must be one of {sorted(_VALID_COUNTER_MODES)}, got {counter_mode!r}" + ) + + keylen = len(key) + if keylen not in (16, 32): + raise ValueError(f"key must be 16 or 32 bytes, got {keylen}") + + if len(payload) > MAX_CTR_PAYLOAD: + raise ValueError( + f"payload too long for AES-CTR: {len(payload)} > {MAX_CTR_PAYLOAD}" + ) + + if seq_no is None: + seq_no = secrets.randbelow(1 << 10) + if not (0 <= seq_no < (1 << 10)): + raise ValueError(f"seq_no must be in 0..1023, got {seq_no}") + + if time_counter is None: + if counter_mode == UNIX_TIME: + time_counter = int(_time.time()) // 86400 + else: + time_counter = 0 + + enc_key = _get_encryption_key(key, time_counter, seq_no, keylen=keylen) + nonce = _get_nonce(key, time_counter, seq_no, keylen=keylen) + ciphertext = AES.new(enc_key, AES.MODE_CTR, nonce=nonce).encrypt(payload) + auth_tag = _get_auth_tag(enc_key, ciphertext) + eid_bytes = _generate_ctr_eid(key, time_counter, keylen=keylen) + + # Header: top 6 bits = version (0 for v0), bottom 10 bits = seq_no + header_int = (0 << 10) | (seq_no & 0x3FF) + header = header_int.to_bytes(2, "big") + + service_data = header + eid_bytes + auth_tag + ciphertext + + from datetime import datetime, timezone + return EncryptedPacket( + timestamp=int(datetime.now(timezone.utc).timestamp()), + location=Location(lat=90, lon=0, fake=True), + payload=service_data, + rssi=0, + protocol_version=0, + eid=int.from_bytes(eid_bytes, "big"), + auth_tag=auth_tag, + ) +``` + +Add the missing import at the top of `crypto.py`: + +```python +from .packets import EncryptedPacket, DecryptedPacket, AesEaxPacket, Location +``` + +(Replace the existing `from .packets import` line — `Location` is the only addition.) + +- [ ] **Step 4: Run tests and verify they pass** + +Run: `pytest tests/test_packet_generation.py::TestEncryptCtr -v` +Expected: 8 passed. + +- [ ] **Step 5: Commit** + +```bash +git add src/hubblenetwork/crypto.py tests/test_packet_generation.py +git commit -m "feat(crypto): add encrypt() for AES-CTR packet generation" +``` + +--- + +## Task 3: Add `encrypt_eax()` for AES-EAX (v2) + +**Files:** +- Modify: `src/hubblenetwork/crypto.py` +- Test: `tests/test_packet_generation.py` + +- [ ] **Step 1: Add round-trip test** + +Append to `tests/test_packet_generation.py`: + +```python +import struct +from hubblenetwork.crypto import encrypt_eax, decrypt_eax +from hubblenetwork import ble as ble_mod +from hubblenetwork.packets import AesEaxPacket + + +class TestEncryptEax: + KEY_128 = bytes(range(16)) + + def test_round_trip_default(self): + plaintext = b"abc" + pkt = encrypt_eax(self.KEY_128, plaintext) + # Re-parse the service data bytes back into an AesEaxPacket + parsed = ble_mod._make_packet(pkt.payload, rssi=0) + assert isinstance(parsed, AesEaxPacket) + result = decrypt_eax(self.KEY_128, parsed) + assert result is not None + assert result.payload == plaintext + + def test_round_trip_with_explicit_inputs(self): + plaintext = b"abcdefghi" # 9 bytes, max + nonce_salt = b"\xa3\xf1" + pkt = encrypt_eax( + self.KEY_128, plaintext, + counter=3, nonce_salt=nonce_salt, period_exponent=0, + ) + parsed = ble_mod._make_packet(pkt.payload, rssi=0) + assert isinstance(parsed, AesEaxPacket) + assert parsed.nonce_salt == nonce_salt + result = decrypt_eax(self.KEY_128, parsed, period_exponent=0) + assert result is not None + assert result.payload == plaintext + + def test_round_trip_with_period_exponent(self): + plaintext = b"x" + pkt = encrypt_eax( + self.KEY_128, plaintext, + counter=2, nonce_salt=b"\x00\x01", period_exponent=3, + ) + parsed = ble_mod._make_packet(pkt.payload, rssi=0) + result = decrypt_eax(self.KEY_128, parsed, period_exponent=3) + assert result is not None + assert result.payload == plaintext + + def test_random_nonce_salt_when_omitted(self): + a = encrypt_eax(self.KEY_128, b"x") + b = encrypt_eax(self.KEY_128, b"x") + assert a.payload != b.payload + + def test_deterministic_with_explicit_inputs(self): + a = encrypt_eax(self.KEY_128, b"x", counter=0, nonce_salt=b"\x00\x00") + b = encrypt_eax(self.KEY_128, b"x", counter=0, nonce_salt=b"\x00\x00") + assert a.payload == b.payload + + def test_protocol_version_two(self): + pkt = encrypt_eax(self.KEY_128, b"x", counter=0, nonce_salt=b"\x00\x00") + # Top 6 bits of byte 0 = version (2 for AES-EAX) → 0x08 + assert (pkt.payload[0] >> 2) == 2 + assert pkt.protocol_version == 2 + + def test_eid_embedded_little_endian(self): + pkt = encrypt_eax(self.KEY_128, b"", counter=0, nonce_salt=b"\x00\x00") + # offset 3-11 is the 8-byte EID, little-endian + embedded_eid = struct.unpack(" EncryptedPacket: + """Generate an AES-EAX (protocol v2) encrypted Hubble BLE packet. + + Returns an EncryptedPacket whose `.payload` is the full BLE service-data + byte string: header(1) | nonce_salt(2) | eid_le(8) | ciphertext | tag(4). + Returning EncryptedPacket (not AesEaxPacket) keeps Organization.ingest_packet + working without modification. + """ + if len(key) != 16: + raise ValueError(f"AES-EAX requires a 16-byte key, got {len(key)}") + if len(payload) > MAX_EAX_PAYLOAD: + raise ValueError( + f"payload too long for AES-EAX: {len(payload)} > {MAX_EAX_PAYLOAD}" + ) + if not (0 <= period_exponent <= 15): + raise ValueError(f"period_exponent must be 0..15, got {period_exponent}") + + if counter is None: + counter = 0 + if nonce_salt is None: + nonce_salt = secrets.token_bytes(2) + elif len(nonce_salt) != 2: + raise ValueError(f"nonce_salt must be exactly 2 bytes, got {len(nonce_salt)}") + + effective_counter = counter * (1 << period_exponent) + + # Compute EID using the SAME formula decrypt_eax uses (key_0 derived from + # counter=0). Inlined to guarantee byte-for-byte round-trip even when + # effective_counter ≥ 65536, where _generate_eid would diverge. + key_0 = _derive_eid_key(key, 0) + msg2 = ( + b"\x00" * 11 + + period_exponent.to_bytes(1, "big") + + effective_counter.to_bytes(4, "big") + ) + eid_block = AES.new(key_0, AES.MODE_ECB).encrypt(msg2) + eid_int = int.from_bytes(eid_block[0:8], "big") + + # Build the AES-EAX nonce: counter (BE 4 bytes) || nonce_salt (2 bytes) + nonce = effective_counter.to_bytes(4, "big") + nonce_salt + cipher = AES.new(key, AES.MODE_EAX, mac_len=4, nonce=nonce) + ciphertext, auth_tag = cipher.encrypt_and_digest(payload) + + # Header byte: version (top 6 bits) shifted up; bottom 2 bits unused for v2 + header = bytes([2 << 2]) + eid_le = struct.pack(" bytes: + return bytes.fromhex(hex_str) + + +class TestBleGenerateHexFormat: + def test_aes_ctr_hex_round_trips(self): + runner = CliRunner() + key_hex = "00" * 32 # 32-byte key, AES-CTR + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", key_hex, + "--payload", "deadbeef", + "--payload-format", "hex", + "--counter-mode", "DEVICE_UPTIME", + "--counter", "5", + "--seq-no", "42", + "--format", "hex", + ], + ) + assert result.exit_code == 0, result.output + hex_out = result.output.strip() + # Must be a hex string of even length + assert re.fullmatch(r"[0-9a-fA-F]+", hex_out) + raw = bytes.fromhex(hex_out) + # Round-trip: parse raw bytes → decrypt + pkt = EncryptedPacket( + timestamp=0, location=_FAKE_LOCATION, payload=raw, rssi=0, + ) + decrypted = decrypt(bytes(32), pkt, counter_mode=DEVICE_UPTIME) + assert decrypted is not None + assert decrypted.payload == bytes.fromhex("deadbeef") + assert decrypted.counter == 5 + assert decrypted.sequence == 42 + + def test_aes_eax_hex_round_trips(self): + runner = CliRunner() + key_hex = "00" * 16 # 16-byte key, AES-EAX + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", key_hex, + "--payload", "deadbeef", + "--payload-format", "hex", + "--counter", "0", + "--nonce-salt", "0001", + "--period-exponent", "0", + "--format", "hex", + ], + ) + assert result.exit_code == 0, result.output + raw = bytes.fromhex(result.output.strip()) + parsed = ble_mod._make_packet(raw, rssi=0) + assert isinstance(parsed, AesEaxPacket) + decrypted = decrypt_eax(bytes(16), parsed, period_exponent=0) + assert decrypted is not None + assert decrypted.payload == bytes.fromhex("deadbeef") + + +class TestBleGenerateValidation: + def test_seq_no_with_aes_eax_key_rejected(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 16, + "--payload", "00", + "--payload-format", "hex", + "--seq-no", "1", + "--format", "hex", + ], + ) + assert result.exit_code != 0 + assert "AES-CTR" in result.output or "16-byte" in result.output + + def test_nonce_salt_with_aes_ctr_key_rejected(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 32, + "--payload", "00", + "--payload-format", "hex", + "--nonce-salt", "0001", + "--format", "hex", + ], + ) + assert result.exit_code != 0 + assert "AES-EAX" in result.output or "32-byte" in result.output +``` + +- [ ] **Step 2: Run tests and verify they fail** + +Run: `pytest tests/test_cli_generate.py -v` +Expected: All fail with "Error: No such command 'generate'" or similar. + +- [ ] **Step 3: Add the `ble generate` command and a `_payload_to_bytes` helper to `cli.py`** + +In `src/hubblenetwork/cli.py`, find the existing `ble` group definition and add the `generate` subcommand below it. First add the helper near `_parse_key`: + +```python +def _payload_to_bytes(payload_str: str, payload_format: str) -> bytes: + """Decode a payload string into bytes per the chosen format.""" + fmt = payload_format.lower() + if fmt == "hex": + return bytes.fromhex(payload_str) + if fmt == "base64": + return base64.b64decode(payload_str, validate=True) + if fmt == "string": + return payload_str.encode("utf-8") + raise click.UsageError(f"Unknown payload format: {payload_format}") +``` + +Then add the command (place it after the `ble_scan` definition): + +```python +@ble.command("generate") +@click.option("--key", required=True, help="Encryption key (hex or base64). 16 or 32 bytes.") +@click.option("--payload", required=True, help="Plaintext payload to encrypt.") +@click.option( + "--payload-format", + type=click.Choice(["hex", "base64", "string"], case_sensitive=False), + default="hex", + show_default=True, + help="Encoding of --payload.", +) +@click.option( + "--counter-mode", + type=click.Choice([UNIX_TIME, DEVICE_UPTIME], case_sensitive=False), + default=UNIX_TIME, + show_default=True, + help="EID counter mode (AES-CTR only).", +) +@click.option("--counter", type=int, default=None, help="time_counter (CTR) or counter index (EAX).") +@click.option("--seq-no", type=int, default=None, help="10-bit sequence number (AES-CTR only).") +@click.option("--nonce-salt", default=None, help="2-byte nonce salt as hex (AES-EAX only).") +@click.option("--period-exponent", type=int, default=0, show_default=True, help="EID rotation period exponent (AES-EAX only). 0..15.") +@click.option("--ingest", is_flag=True, help="POST the generated packet to the Hubble Cloud (requires HUBBLE_ORG_ID/HUBBLE_API_TOKEN).") +@click.option( + "--format", + "output_format", + type=click.Choice(["breakdown", "hex", "json"], case_sensitive=False), + default="breakdown", + show_default=True, + help="Output format.", +) +def ble_generate( + key: str, + payload: str, + payload_format: str, + counter_mode: str, + counter: Optional[int], + seq_no: Optional[int], + nonce_salt: Optional[str], + period_exponent: int, + ingest: bool, + output_format: str, +) -> None: + """Generate a Hubble BLE encrypted advertisement packet from a key and payload.""" + from hubblenetwork.crypto import encrypt, encrypt_eax + + try: + key_bytes = _parse_key(key) + except ValueError as e: + raise click.UsageError(str(e)) + + try: + payload_bytes = _payload_to_bytes(payload, payload_format) + except (ValueError, binascii.Error) as e: + raise click.UsageError(f"Invalid payload: {e}") + + if len(key_bytes) == 32: + # AES-CTR path + if nonce_salt is not None: + raise click.UsageError("--nonce-salt is AES-EAX only; not valid with a 32-byte key.") + try: + pkt = encrypt( + key_bytes, + payload_bytes, + time_counter=counter, + seq_no=seq_no, + counter_mode=counter_mode, + ) + except ValueError as e: + raise click.UsageError(str(e)) + elif len(key_bytes) == 16: + # AES-EAX path + if seq_no is not None: + raise click.UsageError("--seq-no is AES-CTR only; not valid with a 16-byte key.") + nonce_salt_bytes: Optional[bytes] = None + if nonce_salt is not None: + try: + nonce_salt_bytes = bytes.fromhex(nonce_salt) + except ValueError as e: + raise click.UsageError(f"Invalid --nonce-salt hex: {e}") + try: + pkt = encrypt_eax( + key_bytes, + payload_bytes, + counter=counter, + nonce_salt=nonce_salt_bytes, + period_exponent=period_exponent, + ) + except ValueError as e: + raise click.UsageError(str(e)) + else: + # Defensive — _parse_key already enforces 16/32 + raise click.UsageError(f"Unsupported key length: {len(key_bytes)}") + + # Output + fmt = output_format.lower() + if fmt == "hex": + click.echo(pkt.payload.hex()) + elif fmt == "json": + click.echo(_render_json(pkt, key_bytes, counter, seq_no, nonce_salt, period_exponent, counter_mode)) + else: # breakdown — placeholder for now, replaced in Task 7 + click.echo(_render_breakdown(pkt, key_bytes, counter, seq_no, nonce_salt, period_exponent, counter_mode)) + + if ingest: + from hubblenetwork import Organization + org = Organization( + org_id=_get_env_or_fail("HUBBLE_ORG_ID"), + api_token=_get_env_or_fail("HUBBLE_API_TOKEN"), + ) + org.ingest_packet(pkt) + click.secho("[INFO] Packet ingested.", fg="green") + + +def _render_breakdown(pkt, key_bytes, counter, seq_no, nonce_salt, period_exponent, counter_mode): + """Placeholder breakdown formatter (replaced in Task 7).""" + return pkt.payload.hex() + + +def _render_json(pkt, key_bytes, counter, seq_no, nonce_salt, period_exponent, counter_mode): + """Placeholder JSON formatter (replaced in Task 8).""" + return json.dumps({"service_data": pkt.payload.hex()}) +``` + +Note: `_get_env_or_fail` already exists in the file — confirm it's defined; if not, define it as: + +```python +def _get_env_or_fail(name: str) -> str: + value = os.environ.get(name) + if not value: + raise click.UsageError(f"Missing required env var: {name}") + return value +``` + +(Search `cli.py` first; only add if missing.) + +- [ ] **Step 4: Run tests and verify they pass** + +Run: `pytest tests/test_cli_generate.py -v` +Expected: 4 passed (2 round-trip, 2 validation). + +- [ ] **Step 5: Commit** + +```bash +git add src/hubblenetwork/cli.py tests/test_cli_generate.py +git commit -m "feat(cli): add 'ble generate' command with hex output and dispatch" +``` + +--- + +## Task 7: Implement `breakdown` formatter + +**Files:** +- Modify: `src/hubblenetwork/cli.py` +- Test: `tests/test_cli_generate.py` + +- [ ] **Step 1: Add tests for breakdown output** + +Append to `tests/test_cli_generate.py`: + +```python +class TestBleGenerateBreakdownFormat: + def test_aes_ctr_breakdown_includes_fields(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 32, + "--payload", "deadbeef", + "--payload-format", "hex", + "--counter-mode", "DEVICE_UPTIME", + "--counter", "5", + "--seq-no", "42", + # no --format → default breakdown + ], + ) + assert result.exit_code == 0, result.output + out = result.output + assert "AES-CTR" in out + assert "Service data" in out + assert "Hex:" in out + assert "Spaced:" in out + assert "Python:" in out + # The hex bytes appear somewhere in the output + # (we don't assert specific values — those are tested in round-trip tests) + + def test_aes_eax_breakdown_includes_fields(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 16, + "--payload", "deadbeef", + "--payload-format", "hex", + "--counter", "0", + "--nonce-salt", "0001", + "--period-exponent", "0", + ], + ) + assert result.exit_code == 0, result.output + out = result.output + assert "AES-EAX" in out + assert "Nonce salt" in out + assert "Period exponent" in out + assert "Service data" in out +``` + +- [ ] **Step 2: Run tests and verify they fail** + +Run: `pytest tests/test_cli_generate.py::TestBleGenerateBreakdownFormat -v` +Expected: 2 failures (output doesn't contain expected strings — placeholder formatter). + +- [ ] **Step 3: Implement the breakdown formatter** + +Replace the `_render_breakdown` placeholder in `cli.py` with: + +```python +def _render_breakdown(pkt, key_bytes, counter, seq_no, nonce_salt, period_exponent, counter_mode): + raw = pkt.payload + keylen = len(key_bytes) + is_eax = (keylen == 16) + lines = [] + + if is_eax: + # Layout: header(1) | salt(2) | eid(8) | ciphertext | tag(4) + salt = raw[1:3] + eid_bytes = raw[3:11] + tag = raw[-4:] + ciphertext = raw[11:-4] + period_seconds = 1 << period_exponent + lines.append(f"Protocol: AES-EAX (v2)") + lines.append(f"Key length: {keylen} bytes") + lines.append(f"Counter: {counter if counter is not None else 0}") + lines.append(f"Period exponent: {period_exponent} (period = {period_seconds}s)") + lines.append(f"Nonce salt: {' '.join(f'0x{b:02x}' for b in salt)}") + lines.append(f"EID: 0x{pkt.eid:016x} (LE bytes: {eid_bytes.hex()})") + lines.append(f"Ciphertext: {' '.join(f'0x{b:02x}' for b in ciphertext) if ciphertext else '(empty)'}") + lines.append(f"Auth tag: {' '.join(f'0x{b:02x}' for b in tag)}") + else: + # Layout: header(2) | eid(4) | tag(4) | ciphertext + header = raw[0:2] + eid_bytes = raw[2:6] + tag = raw[6:10] + ciphertext = raw[10:] + used_seq = int.from_bytes(header, "big") & 0x3FF + lines.append(f"Protocol: AES-CTR (v0)") + lines.append(f"Key length: {keylen} bytes") + lines.append(f"Counter mode: {counter_mode.upper()}") + lines.append(f"Time counter: {counter if counter is not None else '(default)'}") + lines.append(f"Seq no: {used_seq}") + lines.append(f"EID: 0x{int.from_bytes(eid_bytes, 'big'):08x}") + lines.append(f"Auth tag: {' '.join(f'0x{b:02x}' for b in tag)}") + lines.append(f"Ciphertext: {' '.join(f'0x{b:02x}' for b in ciphertext) if ciphertext else '(empty)'}") + + lines.append("") + lines.append(f"Service data ({len(raw)} bytes):") + lines.append(f" Hex: {raw.hex()}") + lines.append(f" Spaced: {' '.join(f'{b:02x}' for b in raw)}") + py_repr = "b'" + "".join(f"\\x{b:02x}" for b in raw) + "'" + lines.append(f" Python: {py_repr}") + c_array = "{" + ", ".join(f"0x{b:02x}" for b in raw) + "}" + lines.append(f" C array: {c_array}") + return "\n".join(lines) +``` + +- [ ] **Step 4: Run tests and verify they pass** + +Run: `pytest tests/test_cli_generate.py -v` +Expected: All passed (round-trip + validation + breakdown). + +- [ ] **Step 5: Commit** + +```bash +git add src/hubblenetwork/cli.py tests/test_cli_generate.py +git commit -m "feat(cli): add breakdown output format for 'ble generate'" +``` + +--- + +## Task 8: Implement `json` formatter + +**Files:** +- Modify: `src/hubblenetwork/cli.py` +- Test: `tests/test_cli_generate.py` + +- [ ] **Step 1: Add tests for JSON output** + +Append to `tests/test_cli_generate.py`: + +```python +class TestBleGenerateJsonFormat: + def test_aes_ctr_json_shape(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 32, + "--payload", "ab", + "--payload-format", "hex", + "--counter-mode", "DEVICE_UPTIME", + "--counter", "5", + "--seq-no", "42", + "--format", "json", + ], + ) + assert result.exit_code == 0, result.output + data = json.loads(result.output.strip()) + assert data["protocol"] == "aes_ctr" + assert data["protocol_version"] == 0 + assert data["key_length"] == 32 + assert data["counter_mode"] == "DEVICE_UPTIME" + assert data["time_counter"] == 5 + assert data["seq_no"] == 42 + assert "eid" in data + assert "auth_tag" in data + assert "ciphertext" in data + assert re.fullmatch(r"[0-9a-f]+", data["service_data"]) + + def test_aes_eax_json_shape(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 16, + "--payload", "ab", + "--payload-format", "hex", + "--counter", "0", + "--nonce-salt", "0001", + "--period-exponent", "0", + "--format", "json", + ], + ) + assert result.exit_code == 0, result.output + data = json.loads(result.output.strip()) + assert data["protocol"] == "aes_eax" + assert data["protocol_version"] == 2 + assert data["key_length"] == 16 + assert data["counter"] == 0 + assert data["period_exponent"] == 0 + assert data["nonce_salt"] == "0001" + assert "eid" in data + assert "auth_tag" in data + assert "ciphertext" in data + assert "service_data" in data +``` + +- [ ] **Step 2: Run tests and verify they fail** + +Run: `pytest tests/test_cli_generate.py::TestBleGenerateJsonFormat -v` +Expected: 2 failures (placeholder JSON returns only `service_data`). + +- [ ] **Step 3: Implement the JSON formatter** + +Replace the `_render_json` placeholder in `cli.py` with: + +```python +def _render_json(pkt, key_bytes, counter, seq_no, nonce_salt, period_exponent, counter_mode): + raw = pkt.payload + keylen = len(key_bytes) + if keylen == 16: + # AES-EAX layout + salt = raw[1:3] + eid_bytes = raw[3:11] + tag = raw[-4:] + ciphertext = raw[11:-4] + return json.dumps({ + "protocol": "aes_eax", + "protocol_version": 2, + "key_length": keylen, + "counter": counter if counter is not None else 0, + "period_exponent": period_exponent, + "nonce_salt": salt.hex(), + "eid": eid_bytes.hex(), + "ciphertext": ciphertext.hex(), + "auth_tag": tag.hex(), + "service_data": raw.hex(), + }) + # AES-CTR layout + header = raw[0:2] + eid_bytes = raw[2:6] + tag = raw[6:10] + ciphertext = raw[10:] + used_seq = int.from_bytes(header, "big") & 0x3FF + return json.dumps({ + "protocol": "aes_ctr", + "protocol_version": 0, + "key_length": keylen, + "counter_mode": counter_mode.upper(), + "time_counter": counter, + "seq_no": used_seq, + "eid": eid_bytes.hex(), + "ciphertext": ciphertext.hex(), + "auth_tag": tag.hex(), + "service_data": raw.hex(), + }) +``` + +- [ ] **Step 4: Run tests and verify they pass** + +Run: `pytest tests/test_cli_generate.py -v` +Expected: All passed. + +- [ ] **Step 5: Commit** + +```bash +git add src/hubblenetwork/cli.py tests/test_cli_generate.py +git commit -m "feat(cli): add json output format for 'ble generate'" +``` + +--- + +## Task 9: Implement `--ingest` flag + +**Files:** +- Modify: (CLI already has the flag wired in Task 6 — this task adds tests) +- Test: `tests/test_cli_generate.py` + +- [ ] **Step 1: Add tests for `--ingest`** + +Append to `tests/test_cli_generate.py`: + +```python +from unittest.mock import patch, MagicMock + + +class TestBleGenerateIngest: + def test_ingest_calls_organization(self, monkeypatch): + monkeypatch.setenv("HUBBLE_ORG_ID", "test-org") + monkeypatch.setenv("HUBBLE_API_TOKEN", "test-token") + + with patch("hubblenetwork.cli.Organization") as MockOrg: + mock_instance = MagicMock() + MockOrg.return_value = mock_instance + + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 32, + "--payload", "ab", + "--payload-format", "hex", + "--counter-mode", "DEVICE_UPTIME", + "--counter", "5", + "--seq-no", "42", + "--format", "hex", + "--ingest", + ], + ) + + assert result.exit_code == 0, result.output + MockOrg.assert_called_once_with(org_id="test-org", api_token="test-token") + mock_instance.ingest_packet.assert_called_once() + # The call's first positional arg is the EncryptedPacket + ingested_pkt = mock_instance.ingest_packet.call_args[0][0] + assert isinstance(ingested_pkt, EncryptedPacket) + assert ingested_pkt.protocol_version == 0 + + def test_ingest_aes_eax(self, monkeypatch): + monkeypatch.setenv("HUBBLE_ORG_ID", "test-org") + monkeypatch.setenv("HUBBLE_API_TOKEN", "test-token") + + with patch("hubblenetwork.cli.Organization") as MockOrg: + mock_instance = MagicMock() + MockOrg.return_value = mock_instance + + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 16, + "--payload", "ab", + "--payload-format", "hex", + "--counter", "0", + "--nonce-salt", "0001", + "--format", "hex", + "--ingest", + ], + ) + + assert result.exit_code == 0, result.output + mock_instance.ingest_packet.assert_called_once() + ingested_pkt = mock_instance.ingest_packet.call_args[0][0] + assert ingested_pkt.protocol_version == 2 + + def test_ingest_without_credentials_fails(self, monkeypatch): + monkeypatch.delenv("HUBBLE_ORG_ID", raising=False) + monkeypatch.delenv("HUBBLE_API_TOKEN", raising=False) + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 32, + "--payload", "ab", + "--payload-format", "hex", + "--counter-mode", "DEVICE_UPTIME", + "--counter", "5", + "--seq-no", "42", + "--format", "hex", + "--ingest", + ], + ) + assert result.exit_code != 0 + assert "HUBBLE_ORG_ID" in result.output +``` + +- [ ] **Step 2: Add `Organization` import to `cli.py` (top-level if not already)** + +`hubblenetwork.cli` already imports `Organization` at the top of the file (`from hubblenetwork import Organization`). The `with patch("hubblenetwork.cli.Organization")` line in the test depends on this. Confirm with: + +```bash +grep "from hubblenetwork import Organization" src/hubblenetwork/cli.py +``` + +Expected: 1 match. + +If missing, add it to the imports near the top of the file. + +Then update the `ble_generate` function to use the module-level import (remove the `from hubblenetwork import Organization` inside the function body — patch needs the module-level reference): + +In `ble_generate`, replace: + +```python + if ingest: + from hubblenetwork import Organization + org = Organization( +``` + +with: + +```python + if ingest: + org = Organization( +``` + +- [ ] **Step 3: Run tests and verify they pass** + +Run: `pytest tests/test_cli_generate.py::TestBleGenerateIngest -v` +Expected: 3 passed. + +- [ ] **Step 4: Run the full test suite** + +Run: `pytest tests/test_packet_generation.py tests/test_cli_generate.py -v` +Expected: all tests pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/hubblenetwork/cli.py tests/test_cli_generate.py +git commit -m "test(cli): cover --ingest flag for 'ble generate'" +``` + +--- + +## Task 10: Update README with example + +**Files:** +- Modify: `README.md` + +- [ ] **Step 1: Locate the right section in `README.md`** + +Run: `grep -n "ble scan" /Users/paulbuckley/projects/pyhubblenetwork/README.md | head -5` +Find the section showing CLI examples (likely under a "CLI" or "Usage" heading). + +- [ ] **Step 2: Add a `ble generate` example after the `ble scan` example** + +Insert the following block immediately after the existing `ble scan` example block: + +```markdown +### Generate a packet from a key + +Produce a well-formed encrypted Hubble BLE packet from a key + payload. Useful +for matching observed hardware emissions against expected output, or for seeding +the cloud with deterministic test data. + +```bash +# AES-256-CTR (32-byte key) +hubblenetwork ble generate \ + --key "0000000000000000000000000000000000000000000000000000000000000000" \ + --payload "deadbeef" \ + --payload-format hex \ + --counter-mode DEVICE_UPTIME \ + --counter 5 \ + --seq-no 42 + +# AES-128-EAX (16-byte key) +hubblenetwork ble generate \ + --key "00000000000000000000000000000000" \ + --payload "deadbeef" \ + --payload-format hex \ + --counter 0 \ + --nonce-salt 0001 + +# Pipe the raw bytes for scripting +hubblenetwork ble generate --key ... --payload ab --payload-format hex --format hex + +# Send the generated packet to the cloud (requires HUBBLE_ORG_ID, HUBBLE_API_TOKEN) +hubblenetwork ble generate --key ... --payload ab --payload-format hex --ingest +``` +``` + +- [ ] **Step 3: Verify markdown renders correctly** + +Run: `head -200 README.md | tail -80` (or open in your editor) to spot-check formatting. + +- [ ] **Step 4: Commit** + +```bash +git add README.md +git commit -m "docs(readme): document 'ble generate' command" +``` + +--- + +## Task 11: Run full test suite + lint + +**Files:** none + +- [ ] **Step 1: Run full pytest** + +Run: `pytest` +Expected: all tests pass; no failures introduced in unrelated files. + +- [ ] **Step 2: Run ruff** + +Run: `ruff check src` +Expected: clean (no new violations). + +- [ ] **Step 3: If lint fails, fix issues and re-run** + +Common things to fix: +- Unused imports — delete them. +- Line-length warnings — wrap long strings or function signatures. + +- [ ] **Step 4: Commit any lint fixes (if needed)** + +```bash +git add src/ +git commit -m "chore: lint fixes for packet generation feature" +``` + +- [ ] **Step 5: Final verification commit message review** + +Run: `git log --oneline | head -15` +Confirm the commits read like a coherent feature series. From feb24f876facdddf96f67dc0743fcde0448e3ca1 Mon Sep 17 00:00:00 2001 From: Paul Buckley Date: Thu, 30 Apr 2026 15:31:01 -0700 Subject: [PATCH 3/4] feat(crypto,cli): add Hubble packet generation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the ability to generate AES-CTR (v0) and AES-EAX (v2) Hubble BLE advertisement packets locally from a key and payload — the inverse of the existing decrypt path. Generated packets can optionally be posted to the Hubble Cloud or printed as raw bytes for matching against hardware emissions. SDK: - encrypt(key, payload, *, time_counter, seq_no, counter_mode) — AES-CTR - encrypt_eax(key, payload, *, counter, nonce_salt, period_exponent) — AES-EAX - Both return EncryptedPacket so org.ingest_packet works for either protocol. - _generate_ctr_eid helper mirrors firmware hubble_internal_device_id_get. - EAX EID derivation matches decrypt_eax exactly (key_0 from counter=0) to guarantee byte-for-byte round-trip even at high effective counters. CLI: hubblenetwork ble generate - Flags: --key, --payload(/format), --counter-mode, --counter, --seq-no, --nonce-salt, --period-exponent, --ingest, --format breakdown|hex|json - Dispatches by key length (32 → CTR, 16 → EAX) with mutually-exclusive flag validation per protocol. - Three output formats: structured breakdown (default), raw hex, and JSON. Tests: 44 new tests covering round-trip via decrypt/decrypt_eax, validation paths, header layout, EID embedding, format dispatch, and the --ingest path (with mocked Organization). Spec: docs/superpowers/specs/2026-04-30-packet-generation-design.md Plan: docs/superpowers/plans/2026-04-30-packet-generation.md --- .github/workflows/integration-tests.yml | 6 +- README.md | 31 +++ src/hubblenetwork/__init__.py | 4 +- src/hubblenetwork/cli.py | 206 ++++++++++++++ src/hubblenetwork/crypto.py | 149 +++++++++- tests/test_cli_generate.py | 289 ++++++++++++++++++++ tests/test_packet_generation.py | 252 +++++++++++++++++ tests/test_packet_generation_integration.py | 162 +++++++++++ 8 files changed, 1094 insertions(+), 5 deletions(-) create mode 100644 tests/test_cli_generate.py create mode 100644 tests/test_packet_generation.py create mode 100644 tests/test_packet_generation_integration.py diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 8862ff6..205b752 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -31,15 +31,15 @@ jobs: HUBBLE_PROD_ORG_ID: ${{ secrets.HUBBLE_PROD_ORG_ID }} HUBBLE_PROD_API_TOKEN: ${{ secrets.HUBBLE_PROD_API_TOKEN }} run: | - pytest tests/test_cloud_integration.py::TestProdEnvironment -v --tb=short + pytest -m integration -v --tb=short - name: Run TESTING integration tests env: HUBBLE_TESTING_ORG_ID: ${{ secrets.HUBBLE_TESTING_ORG_ID }} HUBBLE_TESTING_API_TOKEN: ${{ secrets.HUBBLE_TESTING_API_TOKEN }} run: | - pytest tests/test_cloud_integration.py::TestTestingEnvironment -v --tb=short + pytest -m integration -v --tb=short - name: Run invalid credentials test run: | - pytest tests/test_cloud_integration.py::TestInvalidCredentials -v --tb=short + pytest -m integration -v --tb=short diff --git a/README.md b/README.md index 57d6245..7e5aef9 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,37 @@ hubblenetwork ble scan --key "base64key=" --counter-mode DEVICE_UPTIME # counte hubblenetwork org get-packets --payload-format string ``` +### Generate a packet from a key + +Produce a well-formed encrypted Hubble BLE packet from a key + payload. Useful +for matching observed hardware emissions against expected output, or for seeding +the cloud with deterministic test data. + +```bash +# AES-256-CTR (32-byte key) +hubblenetwork ble generate \ + --key "0000000000000000000000000000000000000000000000000000000000000000" \ + --payload "deadbeef" \ + --payload-format hex \ + --counter-mode DEVICE_UPTIME \ + --counter 5 \ + --seq-no 42 + +# AES-128-EAX (16-byte key) +hubblenetwork ble generate \ + --key "00000000000000000000000000000000" \ + --payload "deadbeef" \ + --payload-format hex \ + --counter 0 \ + --nonce-salt 0001 + +# Pipe the raw bytes for scripting +hubblenetwork ble generate --key ... --payload ab --payload-format hex --format hex + +# Send the generated packet to the cloud (requires HUBBLE_ORG_ID, HUBBLE_API_TOKEN) +hubblenetwork ble generate --key ... --payload ab --payload-format hex --ingest +``` + ### Payload format option Commands that output packet data (`ble scan`, `ble detect`, `org get-packets`) support the `--payload-format` flag to control how payloads are displayed: diff --git a/src/hubblenetwork/__init__.py b/src/hubblenetwork/__init__.py index 78475d4..03cabfb 100644 --- a/src/hubblenetwork/__init__.py +++ b/src/hubblenetwork/__init__.py @@ -12,7 +12,7 @@ from .packets import Location, EncryptedPacket, UnencryptedPacket, DecryptedPacket, SatellitePacket, AesEaxPacket, UnknownPacket from .device import Device from .org import Organization -from .crypto import decrypt, decrypt_eax, UNIX_TIME, DEVICE_UPTIME +from .crypto import decrypt, decrypt_eax, encrypt, encrypt_eax, UNIX_TIME, DEVICE_UPTIME from .errors import InvalidCredentialsError from .cloud import Credentials, Environment @@ -23,6 +23,8 @@ "sat", "decrypt", "decrypt_eax", + "encrypt", + "encrypt_eax", "UNIX_TIME", "DEVICE_UPTIME", "SatellitePacket", diff --git a/src/hubblenetwork/cli.py b/src/hubblenetwork/cli.py index 9de9e4d..58a9a8c 100644 --- a/src/hubblenetwork/cli.py +++ b/src/hubblenetwork/cli.py @@ -59,6 +59,18 @@ def _parse_key(key_str: str) -> bytes: return key_bytes +def _payload_to_bytes(payload_str: str, payload_format: str) -> bytes: + """Decode a payload string into bytes per the chosen format.""" + fmt = payload_format.lower() + if fmt == "hex": + return bytes.fromhex(payload_str) + if fmt == "base64": + return base64.b64decode(payload_str, validate=True) + if fmt == "string": + return payload_str.encode("utf-8") + raise click.UsageError(f"Unknown payload format: {payload_format}") + + def _validate_info(msg): click.secho("[INFO] ", fg="cyan", bold=True, nl=False) click.echo(msg + "... ", nl=False) @@ -1249,6 +1261,200 @@ def _explicit(name: str) -> bool: ) +@ble.command("generate") +@click.option("--key", required=True, help="Encryption key (hex or base64). 16 or 32 bytes.") +@click.option("--payload", required=True, help="Plaintext payload to encrypt.") +@click.option( + "--payload-format", + type=click.Choice(["hex", "base64", "string"], case_sensitive=False), + default="hex", + show_default=True, + help="Encoding of --payload.", +) +@click.option( + "--counter-mode", + type=click.Choice([UNIX_TIME, DEVICE_UPTIME], case_sensitive=False), + default=UNIX_TIME, + show_default=True, + help="EID counter mode (AES-CTR only).", +) +@click.option("--counter", type=int, default=None, help="time_counter (CTR) or counter index (EAX).") +@click.option("--seq-no", type=int, default=None, help="10-bit sequence number (AES-CTR only).") +@click.option("--nonce-salt", default=None, help="2-byte nonce salt as hex (AES-EAX only).") +@click.option("--period-exponent", type=int, default=0, show_default=True, help="EID rotation period exponent (AES-EAX only). 0..15.") +@click.option("--ingest", is_flag=True, help="POST the generated packet to the Hubble Cloud (requires HUBBLE_ORG_ID/HUBBLE_API_TOKEN).") +@click.option( + "--format", + "output_format", + type=click.Choice(["breakdown", "hex", "json"], case_sensitive=False), + default="breakdown", + show_default=True, + help="Output format.", +) +def ble_generate( + key: str, + payload: str, + payload_format: str, + counter_mode: str, + counter: Optional[int], + seq_no: Optional[int], + nonce_salt: Optional[str], + period_exponent: int, + ingest: bool, + output_format: str, +) -> None: + """Generate a Hubble BLE encrypted advertisement packet from a key and payload.""" + from hubblenetwork.crypto import encrypt, encrypt_eax + + try: + key_bytes = _parse_key(key) + except ValueError as e: + raise click.UsageError(str(e)) + + try: + payload_bytes = _payload_to_bytes(payload, payload_format) + except (ValueError, binascii.Error) as e: + raise click.UsageError(f"Invalid payload: {e}") + + if len(key_bytes) == 32: + if nonce_salt is not None: + raise click.UsageError("--nonce-salt is AES-EAX only; not valid with a 32-byte key.") + try: + pkt = encrypt( + key_bytes, + payload_bytes, + time_counter=counter, + seq_no=seq_no, + counter_mode=counter_mode, + ) + except ValueError as e: + raise click.UsageError(str(e)) + elif len(key_bytes) == 16: + if seq_no is not None: + raise click.UsageError("--seq-no is AES-CTR only; not valid with a 16-byte key.") + nonce_salt_bytes: Optional[bytes] = None + if nonce_salt is not None: + try: + nonce_salt_bytes = bytes.fromhex(nonce_salt) + except ValueError as e: + raise click.UsageError(f"Invalid --nonce-salt hex: {e}") + try: + pkt = encrypt_eax( + key_bytes, + payload_bytes, + counter=counter, + nonce_salt=nonce_salt_bytes, + period_exponent=period_exponent, + ) + except ValueError as e: + raise click.UsageError(str(e)) + else: + raise click.UsageError(f"Unsupported key length: {len(key_bytes)}") + + fmt = output_format.lower() + if fmt == "hex": + click.echo(pkt.payload.hex()) + elif fmt == "json": + click.echo(_render_json(pkt, key_bytes, counter, seq_no, nonce_salt, period_exponent, counter_mode)) + else: # breakdown — placeholder for now, replaced in Task 7 + click.echo(_render_breakdown(pkt, key_bytes, counter, seq_no, nonce_salt, period_exponent, counter_mode)) + + if ingest: + org = Organization( + org_id=_get_env_or_fail("HUBBLE_ORG_ID"), + api_token=_get_env_or_fail("HUBBLE_API_TOKEN"), + ) + org.ingest_packet(pkt) + click.secho("[INFO] Packet ingested.", fg="green") + + +def _render_breakdown(pkt, key_bytes, counter, seq_no, nonce_salt, period_exponent, counter_mode): + raw = pkt.payload + keylen = len(key_bytes) + is_eax = (keylen == 16) + lines = [] + + if is_eax: + # Layout: header(1) | salt(2) | eid(8) | ciphertext | tag(4) + salt = raw[1:3] + eid_bytes = raw[3:11] + tag = raw[-4:] + ciphertext = raw[11:-4] + period_seconds = 1 << period_exponent + lines.append("Protocol: AES-EAX (v2)") + lines.append(f"Key length: {keylen} bytes") + lines.append(f"Counter: {counter if counter is not None else 0}") + lines.append(f"Period exponent: {period_exponent} (period = {period_seconds}s)") + lines.append(f"Nonce salt: {' '.join(f'0x{b:02x}' for b in salt)}") + lines.append(f"EID: 0x{pkt.eid:016x} (LE bytes: {eid_bytes.hex()})") + lines.append(f"Ciphertext: {' '.join(f'0x{b:02x}' for b in ciphertext) if ciphertext else '(empty)'}") + lines.append(f"Auth tag: {' '.join(f'0x{b:02x}' for b in tag)}") + else: + # Layout: header(2) | eid(4) | tag(4) | ciphertext + header = raw[0:2] + eid_bytes = raw[2:6] + tag = raw[6:10] + ciphertext = raw[10:] + used_seq = int.from_bytes(header, "big") & 0x3FF + lines.append("Protocol: AES-CTR (v0)") + lines.append(f"Key length: {keylen} bytes") + lines.append(f"Counter mode: {counter_mode.upper()}") + lines.append(f"Time counter: {counter if counter is not None else '(default)'}") + lines.append(f"Seq no: {used_seq}") + lines.append(f"EID: 0x{int.from_bytes(eid_bytes, 'big'):08x}") + lines.append(f"Auth tag: {' '.join(f'0x{b:02x}' for b in tag)}") + lines.append(f"Ciphertext: {' '.join(f'0x{b:02x}' for b in ciphertext) if ciphertext else '(empty)'}") + + lines.append("") + lines.append(f"Service data ({len(raw)} bytes):") + lines.append(f" Hex: {raw.hex()}") + lines.append(f" Spaced: {' '.join(f'{b:02x}' for b in raw)}") + py_repr = "b'" + "".join(f"\\x{b:02x}" for b in raw) + "'" + lines.append(f" Python: {py_repr}") + c_array = "{" + ", ".join(f"0x{b:02x}" for b in raw) + "}" + lines.append(f" C array: {c_array}") + return "\n".join(lines) + + +def _render_json(pkt, key_bytes, counter, seq_no, nonce_salt, period_exponent, counter_mode): + raw = pkt.payload + keylen = len(key_bytes) + if keylen == 16: + salt = raw[1:3] + eid_bytes = raw[3:11] + tag = raw[-4:] + ciphertext = raw[11:-4] + return json.dumps({ + "protocol": "aes_eax", + "protocol_version": 2, + "key_length": keylen, + "counter": counter if counter is not None else 0, + "period_exponent": period_exponent, + "nonce_salt": salt.hex(), + "eid": eid_bytes.hex(), + "ciphertext": ciphertext.hex(), + "auth_tag": tag.hex(), + "service_data": raw.hex(), + }) + header = raw[0:2] + eid_bytes = raw[2:6] + tag = raw[6:10] + ciphertext = raw[10:] + used_seq = int.from_bytes(header, "big") & 0x3FF + return json.dumps({ + "protocol": "aes_ctr", + "protocol_version": 0, + "key_length": keylen, + "counter_mode": counter_mode.upper(), + "time_counter": counter, + "seq_no": used_seq, + "eid": eid_bytes.hex(), + "ciphertext": ciphertext.hex(), + "auth_tag": tag.hex(), + "service_data": raw.hex(), + }) + + @ble.command("check-time") @click.option( "--timeout", diff --git a/src/hubblenetwork/crypto.py b/src/hubblenetwork/crypto.py index 88ada71..8692c30 100644 --- a/src/hubblenetwork/crypto.py +++ b/src/hubblenetwork/crypto.py @@ -1,11 +1,14 @@ from __future__ import annotations +import secrets +import struct +import time as _time from typing import Optional from Crypto.Cipher import AES from Crypto.Hash import CMAC from Crypto.Protocol.KDF import SP800_108_Counter from datetime import datetime, timezone -from .packets import EncryptedPacket, DecryptedPacket, AesEaxPacket +from .packets import EncryptedPacket, DecryptedPacket, AesEaxPacket, Location UNIX_TIME = "UNIX_TIME" DEVICE_UPTIME = "DEVICE_UPTIME" @@ -14,6 +17,11 @@ _HUBBLE_AES_NONCE_SIZE = 12 _HUBBLE_AES_TAG_SIZE = 4 +# Maximum customer payload sizes (matches firmware HUBBLE_BLE_MAX_DATA_LEN +# and the AES-EAX 0-9 byte ciphertext cap documented in packets.py). +MAX_CTR_PAYLOAD = 13 +MAX_EAX_PAYLOAD = 9 + class ParsedPacket: """Parsed components from an EncryptedPacket's BLE advertisement payload.""" @@ -60,6 +68,17 @@ def _get_auth_tag(key: bytes, ciphertext: bytes) -> bytes: return computed_cmac[:_HUBBLE_AES_TAG_SIZE] +def _generate_ctr_eid(key: bytes, time_counter: int, keylen: int) -> bytes: + """Generate the 4-byte AES-CTR EID embedded at offset 2-6 of v0 service data. + + Mirrors firmware `hubble_internal_device_id_get`: a two-step KBKDF chain. + First derives a per-period DeviceKey from the master key, then derives the + 4-byte DeviceID with seq_no hardcoded to 0. + """ + device_key = _generate_kdf_key(key, keylen, "DeviceKey", time_counter) + return _generate_kdf_key(device_key, 4, "DeviceID", 0) + + def _aes_decrypt(key: bytes, session_nonce: bytes, ciphertext: bytes) -> bytes: cipher = AES.new(key, AES.MODE_CTR, nonce=session_nonce) @@ -204,6 +223,134 @@ def decrypt( return None +def encrypt( + key: bytes, + payload: bytes, + *, + time_counter: Optional[int] = None, + seq_no: Optional[int] = None, + counter_mode: str = UNIX_TIME, +) -> EncryptedPacket: + """Generate an AES-CTR (protocol v0) encrypted Hubble BLE packet. + + The returned EncryptedPacket's `.payload` field holds the full BLE + service-data byte string: header(2) | EID(4) | auth_tag(4) | ciphertext. + """ + counter_mode = counter_mode.upper() + if counter_mode not in _VALID_COUNTER_MODES: + raise ValueError( + f"counter_mode must be one of {sorted(_VALID_COUNTER_MODES)}, got {counter_mode!r}" + ) + + keylen = len(key) + if keylen not in (16, 32): + raise ValueError(f"key must be 16 or 32 bytes, got {keylen}") + + if len(payload) > MAX_CTR_PAYLOAD: + raise ValueError( + f"payload too long for AES-CTR: {len(payload)} > {MAX_CTR_PAYLOAD}" + ) + + if seq_no is None: + seq_no = secrets.randbelow(1 << 10) + if not (0 <= seq_no < (1 << 10)): + raise ValueError(f"seq_no must be in 0..1023, got {seq_no}") + + if time_counter is None: + if counter_mode == UNIX_TIME: + time_counter = int(_time.time()) // 86400 + else: + time_counter = 0 + + enc_key = _get_encryption_key(key, time_counter, seq_no, keylen=keylen) + nonce = _get_nonce(key, time_counter, seq_no, keylen=keylen) + ciphertext = AES.new(enc_key, AES.MODE_CTR, nonce=nonce).encrypt(payload) + auth_tag = _get_auth_tag(enc_key, ciphertext) + eid_bytes = _generate_ctr_eid(key, time_counter, keylen=keylen) + + # Header: top 6 bits = version (0 for v0), bottom 10 bits = seq_no + header_int = (0 << 10) | (seq_no & 0x3FF) + header = header_int.to_bytes(2, "big") + + service_data = header + eid_bytes + auth_tag + ciphertext + + return EncryptedPacket( + timestamp=int(datetime.now(timezone.utc).timestamp()), + location=Location(lat=90, lon=0, fake=True), + payload=service_data, + rssi=0, + protocol_version=0, + eid=int.from_bytes(eid_bytes, "big"), + auth_tag=auth_tag, + ) + + +def encrypt_eax( + key: bytes, + payload: bytes, + *, + counter: Optional[int] = None, + nonce_salt: Optional[bytes] = None, + period_exponent: int = 0, +) -> EncryptedPacket: + """Generate an AES-EAX (protocol v2) encrypted Hubble BLE packet. + + Returns an EncryptedPacket whose `.payload` is the full BLE service-data + byte string: header(1) | nonce_salt(2) | eid_le(8) | ciphertext | tag(4). + Returning EncryptedPacket (not AesEaxPacket) keeps Organization.ingest_packet + working without modification. + """ + if len(key) != 16: + raise ValueError(f"AES-EAX requires a 16-byte key, got {len(key)}") + if len(payload) > MAX_EAX_PAYLOAD: + raise ValueError( + f"payload too long for AES-EAX: {len(payload)} > {MAX_EAX_PAYLOAD}" + ) + if not (0 <= period_exponent <= 15): + raise ValueError(f"period_exponent must be 0..15, got {period_exponent}") + + if counter is None: + counter = 0 + if nonce_salt is None: + nonce_salt = secrets.token_bytes(2) + elif len(nonce_salt) != 2: + raise ValueError(f"nonce_salt must be exactly 2 bytes, got {len(nonce_salt)}") + + effective_counter = counter * (1 << period_exponent) + + # Compute EID using the SAME formula decrypt_eax uses (key_0 derived from + # counter=0). Inlined to guarantee byte-for-byte round-trip even when + # effective_counter ≥ 65536, where _generate_eid would diverge. + key_0 = _derive_eid_key(key, 0) + msg2 = ( + b"\x00" * 11 + + period_exponent.to_bytes(1, "big") + + effective_counter.to_bytes(4, "big") + ) + eid_block = AES.new(key_0, AES.MODE_ECB).encrypt(msg2) + eid_int = int.from_bytes(eid_block[0:8], "big") + + # AES-EAX nonce: counter (BE 4 bytes) || nonce_salt (2 bytes) + nonce = effective_counter.to_bytes(4, "big") + nonce_salt + cipher = AES.new(key, AES.MODE_EAX, mac_len=4, nonce=nonce) + ciphertext, auth_tag = cipher.encrypt_and_digest(payload) + + # Header byte: version (top 6 bits) shifted up; bottom 2 bits unused for v2 + header = bytes([2 << 2]) + eid_le = struct.pack(" Optional[int]: diff --git a/tests/test_cli_generate.py b/tests/test_cli_generate.py new file mode 100644 index 0000000..3760340 --- /dev/null +++ b/tests/test_cli_generate.py @@ -0,0 +1,289 @@ +"""Tests for `hubblenetwork ble generate` CLI command.""" + +import json +import re +from unittest.mock import patch, MagicMock +from click.testing import CliRunner + +from hubblenetwork.cli import cli +from hubblenetwork.crypto import decrypt, decrypt_eax, DEVICE_UPTIME +from hubblenetwork.packets import EncryptedPacket, AesEaxPacket, Location +from hubblenetwork import ble as ble_mod + + +_FAKE_LOCATION = Location(lat=90, lon=0, fake=True) + + +class TestBleGenerateHexFormat: + def test_aes_ctr_hex_round_trips(self): + runner = CliRunner() + key_hex = "00" * 32 + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", key_hex, + "--payload", "deadbeef", + "--payload-format", "hex", + "--counter-mode", "DEVICE_UPTIME", + "--counter", "5", + "--seq-no", "42", + "--format", "hex", + ], + ) + assert result.exit_code == 0, result.output + hex_out = result.output.strip() + assert re.fullmatch(r"[0-9a-fA-F]+", hex_out) + raw = bytes.fromhex(hex_out) + pkt = EncryptedPacket( + timestamp=0, location=_FAKE_LOCATION, payload=raw, rssi=0, + ) + decrypted = decrypt(bytes(32), pkt, counter_mode=DEVICE_UPTIME) + assert decrypted is not None + assert decrypted.payload == bytes.fromhex("deadbeef") + assert decrypted.counter == 5 + assert decrypted.sequence == 42 + + def test_aes_eax_hex_round_trips(self): + runner = CliRunner() + key_hex = "00" * 16 + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", key_hex, + "--payload", "deadbeef", + "--payload-format", "hex", + "--counter", "0", + "--nonce-salt", "0001", + "--period-exponent", "0", + "--format", "hex", + ], + ) + assert result.exit_code == 0, result.output + raw = bytes.fromhex(result.output.strip()) + parsed = ble_mod._make_packet(raw, rssi=0) + assert isinstance(parsed, AesEaxPacket) + decrypted = decrypt_eax(bytes(16), parsed, period_exponent=0) + assert decrypted is not None + assert decrypted.payload == bytes.fromhex("deadbeef") + + +class TestBleGenerateValidation: + def test_seq_no_with_aes_eax_key_rejected(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 16, + "--payload", "00", + "--payload-format", "hex", + "--seq-no", "1", + "--format", "hex", + ], + ) + assert result.exit_code != 0 + assert "AES-CTR" in result.output or "16-byte" in result.output + + def test_nonce_salt_with_aes_ctr_key_rejected(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 32, + "--payload", "00", + "--payload-format", "hex", + "--nonce-salt", "0001", + "--format", "hex", + ], + ) + assert result.exit_code != 0 + assert "AES-EAX" in result.output or "32-byte" in result.output + + +class TestBleGenerateBreakdownFormat: + def test_aes_ctr_breakdown_includes_fields(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 32, + "--payload", "deadbeef", + "--payload-format", "hex", + "--counter-mode", "DEVICE_UPTIME", + "--counter", "5", + "--seq-no", "42", + # no --format → default breakdown + ], + ) + assert result.exit_code == 0, result.output + out = result.output + assert "AES-CTR" in out + assert "Service data" in out + assert "Hex:" in out + assert "Spaced:" in out + assert "Python:" in out + + def test_aes_eax_breakdown_includes_fields(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 16, + "--payload", "deadbeef", + "--payload-format", "hex", + "--counter", "0", + "--nonce-salt", "0001", + "--period-exponent", "0", + ], + ) + assert result.exit_code == 0, result.output + out = result.output + assert "AES-EAX" in out + assert "Nonce salt" in out + assert "Period exponent" in out + assert "Service data" in out + + +class TestBleGenerateJsonFormat: + def test_aes_ctr_json_shape(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 32, + "--payload", "ab", + "--payload-format", "hex", + "--counter-mode", "DEVICE_UPTIME", + "--counter", "5", + "--seq-no", "42", + "--format", "json", + ], + ) + assert result.exit_code == 0, result.output + data = json.loads(result.output.strip()) + assert data["protocol"] == "aes_ctr" + assert data["protocol_version"] == 0 + assert data["key_length"] == 32 + assert data["counter_mode"] == "DEVICE_UPTIME" + assert data["time_counter"] == 5 + assert data["seq_no"] == 42 + assert "eid" in data + assert "auth_tag" in data + assert "ciphertext" in data + assert re.fullmatch(r"[0-9a-f]+", data["service_data"]) + + def test_aes_eax_json_shape(self): + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 16, + "--payload", "ab", + "--payload-format", "hex", + "--counter", "0", + "--nonce-salt", "0001", + "--period-exponent", "0", + "--format", "json", + ], + ) + assert result.exit_code == 0, result.output + data = json.loads(result.output.strip()) + assert data["protocol"] == "aes_eax" + assert data["protocol_version"] == 2 + assert data["key_length"] == 16 + assert data["counter"] == 0 + assert data["period_exponent"] == 0 + assert data["nonce_salt"] == "0001" + assert "eid" in data + assert "auth_tag" in data + assert "ciphertext" in data + assert "service_data" in data + + +class TestBleGenerateIngest: + def test_ingest_calls_organization(self, monkeypatch): + monkeypatch.setenv("HUBBLE_ORG_ID", "test-org") + monkeypatch.setenv("HUBBLE_API_TOKEN", "test-token") + + with patch("hubblenetwork.cli.Organization") as MockOrg: + mock_instance = MagicMock() + MockOrg.return_value = mock_instance + + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 32, + "--payload", "ab", + "--payload-format", "hex", + "--counter-mode", "DEVICE_UPTIME", + "--counter", "5", + "--seq-no", "42", + "--format", "hex", + "--ingest", + ], + ) + + assert result.exit_code == 0, result.output + MockOrg.assert_called_once_with(org_id="test-org", api_token="test-token") + mock_instance.ingest_packet.assert_called_once() + ingested_pkt = mock_instance.ingest_packet.call_args[0][0] + assert isinstance(ingested_pkt, EncryptedPacket) + assert ingested_pkt.protocol_version == 0 + + def test_ingest_aes_eax(self, monkeypatch): + monkeypatch.setenv("HUBBLE_ORG_ID", "test-org") + monkeypatch.setenv("HUBBLE_API_TOKEN", "test-token") + + with patch("hubblenetwork.cli.Organization") as MockOrg: + mock_instance = MagicMock() + MockOrg.return_value = mock_instance + + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 16, + "--payload", "ab", + "--payload-format", "hex", + "--counter", "0", + "--nonce-salt", "0001", + "--format", "hex", + "--ingest", + ], + ) + + assert result.exit_code == 0, result.output + mock_instance.ingest_packet.assert_called_once() + ingested_pkt = mock_instance.ingest_packet.call_args[0][0] + assert ingested_pkt.protocol_version == 2 + + def test_ingest_without_credentials_fails(self, monkeypatch): + monkeypatch.delenv("HUBBLE_ORG_ID", raising=False) + monkeypatch.delenv("HUBBLE_API_TOKEN", raising=False) + runner = CliRunner() + result = runner.invoke( + cli, + [ + "ble", "generate", + "--key", "00" * 32, + "--payload", "ab", + "--payload-format", "hex", + "--counter-mode", "DEVICE_UPTIME", + "--counter", "5", + "--seq-no", "42", + "--format", "hex", + "--ingest", + ], + ) + assert result.exit_code != 0 + assert "HUBBLE_ORG_ID" in result.output diff --git a/tests/test_packet_generation.py b/tests/test_packet_generation.py new file mode 100644 index 0000000..9f616b4 --- /dev/null +++ b/tests/test_packet_generation.py @@ -0,0 +1,252 @@ +"""Tests for packet generation (encrypt / encrypt_eax).""" + +from hubblenetwork.crypto import _generate_ctr_eid + + +class TestGenerateCtrEid: + def test_returns_4_bytes(self): + key = bytes(range(32)) + eid = _generate_ctr_eid(key, time_counter=12345, keylen=32) + assert isinstance(eid, bytes) + assert len(eid) == 4 + + def test_deterministic(self): + key = bytes(range(32)) + a = _generate_ctr_eid(key, time_counter=12345, keylen=32) + b = _generate_ctr_eid(key, time_counter=12345, keylen=32) + assert a == b + + def test_changes_with_time_counter(self): + key = bytes(range(32)) + a = _generate_ctr_eid(key, time_counter=12345, keylen=32) + b = _generate_ctr_eid(key, time_counter=12346, keylen=32) + assert a != b + + def test_aes_128_key(self): + key = bytes(range(16)) + eid = _generate_ctr_eid(key, time_counter=0, keylen=16) + assert len(eid) == 4 + + +import pytest +from hubblenetwork.crypto import ( + encrypt, + decrypt, + UNIX_TIME, + DEVICE_UPTIME, + MAX_CTR_PAYLOAD, +) + + +class TestEncryptCtr: + def test_round_trip_aes_256(self): + key = bytes(range(32)) + plaintext = b"hello world" + pkt = encrypt(key, plaintext, time_counter=7, seq_no=42, counter_mode=DEVICE_UPTIME) + result = decrypt(key, pkt, counter_mode=DEVICE_UPTIME) + assert result is not None + assert result.payload == plaintext + assert result.counter == 7 + assert result.sequence == 42 + + def test_round_trip_aes_128(self): + key = bytes(range(16)) + plaintext = b"hi" + pkt = encrypt(key, plaintext, time_counter=3, seq_no=100, counter_mode=DEVICE_UPTIME) + result = decrypt(key, pkt, counter_mode=DEVICE_UPTIME) + assert result is not None + assert result.payload == plaintext + assert result.counter == 3 + assert result.sequence == 100 + + def test_unix_time_default(self): + """Without time_counter, defaults to today's UTC day; decrypt() default mode finds it.""" + key = bytes(range(32)) + pkt = encrypt(key, b"x", seq_no=1) + result = decrypt(key, pkt) # default UNIX_TIME, ±2 days + assert result is not None + assert result.payload == b"x" + assert result.sequence == 1 + + def test_random_seq_no_when_omitted(self): + """Without seq_no, two calls produce different bytes.""" + key = bytes(range(32)) + a = encrypt(key, b"x", time_counter=5, counter_mode=DEVICE_UPTIME) + b = encrypt(key, b"x", time_counter=5, counter_mode=DEVICE_UPTIME) + assert a.payload != b.payload + + def test_deterministic_with_explicit_inputs(self): + key = bytes(range(32)) + a = encrypt(key, b"x", time_counter=5, seq_no=7, counter_mode=DEVICE_UPTIME) + b = encrypt(key, b"x", time_counter=5, seq_no=7, counter_mode=DEVICE_UPTIME) + assert a.payload == b.payload + + def test_eid_matches_helper(self): + key = bytes(range(32)) + pkt = encrypt(key, b"x", time_counter=5, seq_no=7, counter_mode=DEVICE_UPTIME) + # Service data layout: header(2) | EID(4) | auth_tag(4) | ciphertext + eid_bytes = pkt.payload[2:6] + assert eid_bytes == _generate_ctr_eid(key, 5, keylen=32) + + def test_protocol_version_zero(self): + key = bytes(range(32)) + pkt = encrypt(key, b"x", time_counter=5, seq_no=7, counter_mode=DEVICE_UPTIME) + # Top 6 bits of byte 0 are version (0 for AES-CTR) + assert (pkt.payload[0] >> 2) == 0 + assert pkt.protocol_version == 0 + + def test_seq_no_encoded_in_header(self): + key = bytes(range(32)) + pkt = encrypt(key, b"x", time_counter=5, seq_no=0x123, counter_mode=DEVICE_UPTIME) + seq_extracted = int.from_bytes(pkt.payload[0:2], "big") & 0x3FF + assert seq_extracted == 0x123 + + +import struct +from hubblenetwork.crypto import encrypt_eax, decrypt_eax +from hubblenetwork import ble as ble_mod +from hubblenetwork.packets import AesEaxPacket + + +class TestEncryptEax: + KEY_128 = bytes(range(16)) + + def test_round_trip_default(self): + plaintext = b"abc" + pkt = encrypt_eax(self.KEY_128, plaintext) + # Re-parse the service data bytes back into an AesEaxPacket + parsed = ble_mod._make_packet(pkt.payload, rssi=0) + assert isinstance(parsed, AesEaxPacket) + result = decrypt_eax(self.KEY_128, parsed) + assert result is not None + assert result.payload == plaintext + + def test_round_trip_with_explicit_inputs(self): + plaintext = b"abcdefghi" # 9 bytes, max + nonce_salt = b"\xa3\xf1" + pkt = encrypt_eax( + self.KEY_128, plaintext, + counter=3, nonce_salt=nonce_salt, period_exponent=0, + ) + parsed = ble_mod._make_packet(pkt.payload, rssi=0) + assert isinstance(parsed, AesEaxPacket) + assert parsed.nonce_salt == nonce_salt + result = decrypt_eax(self.KEY_128, parsed, period_exponent=0) + assert result is not None + assert result.payload == plaintext + + def test_round_trip_with_period_exponent(self): + plaintext = b"x" + pkt = encrypt_eax( + self.KEY_128, plaintext, + counter=2, nonce_salt=b"\x00\x01", period_exponent=3, + ) + parsed = ble_mod._make_packet(pkt.payload, rssi=0) + result = decrypt_eax(self.KEY_128, parsed, period_exponent=3) + assert result is not None + assert result.payload == plaintext + + def test_random_nonce_salt_when_omitted(self): + a = encrypt_eax(self.KEY_128, b"x") + b = encrypt_eax(self.KEY_128, b"x") + assert a.payload != b.payload + + def test_deterministic_with_explicit_inputs(self): + a = encrypt_eax(self.KEY_128, b"x", counter=0, nonce_salt=b"\x00\x00") + b = encrypt_eax(self.KEY_128, b"x", counter=0, nonce_salt=b"\x00\x00") + assert a.payload == b.payload + + def test_protocol_version_two(self): + pkt = encrypt_eax(self.KEY_128, b"x", counter=0, nonce_salt=b"\x00\x00") + # Top 6 bits of byte 0 = version (2 for AES-EAX) → 0x08 + assert (pkt.payload[0] >> 2) == 2 + assert pkt.protocol_version == 2 + + def test_eid_embedded_little_endian(self): + pkt = encrypt_eax(self.KEY_128, b"", counter=0, nonce_salt=b"\x00\x00") + # offset 3-11 is the 8-byte EID, little-endian + embedded_eid = struct.unpack(" tuple[str, str]: + org_id = os.environ.get("HUBBLE_TESTING_ORG_ID") or os.environ.get("HUBBLE_PROD_ORG_ID") + api_token = os.environ.get("HUBBLE_TESTING_API_TOKEN") or os.environ.get("HUBBLE_PROD_API_TOKEN") + if not org_id or not api_token: + pytest.skip("Requires HUBBLE_TESTING_* or HUBBLE_PROD_* env vars") + return org_id, api_token + + +def _wait_for_packet(org: Organization, device: Device, expected_payload: bytes): + """Poll retrieve_packets until a packet with the expected payload appears.""" + deadline = time.monotonic() + _POLL_TIMEOUT_SEC + while time.monotonic() < deadline: + for pkt in org.retrieve_packets(device, days=1): + if pkt.payload == expected_payload: + return pkt + time.sleep(_POLL_INTERVAL_SEC) + return None + + +@pytest.fixture +def org() -> Organization: + org_id, api_token = _credentials_or_skip() + return Organization(org_id=org_id, api_token=api_token) + + +def _safe_delete(org: Organization, device_id: str) -> None: + """Best-effort cleanup — don't mask the real failure if delete also fails.""" + try: + org.delete_device(device_id) + except Exception as e: # pragma: no cover - test-time cleanup + print(f"[cleanup] failed to delete device {device_id}: {e}") + + +class TestAesCtrRoundTripViaCloud: + """Register an AES-CTR device, encrypt a packet locally, ingest it, read + it back from the cloud, and verify the decrypted payload matches.""" + + def test_aes_256_ctr_unix_time(self, org): + device = org.register_device( + encryption="AES-256-CTR", + counter_source="UNIX_TIME", + ) + assert device.id and device.key and len(device.key) == 32 + try: + payload = b"int-256-" + os.urandom(4) + pkt = encrypt(device.key, payload) # default UNIX_TIME, today's UTC day + org.ingest_packet(pkt) + + decrypted = _wait_for_packet(org, device, payload) + assert decrypted is not None, ( + f"backend did not surface ingested packet within {_POLL_TIMEOUT_SEC}s" + ) + assert decrypted.payload == payload + assert decrypted.device_id == device.id + finally: + _safe_delete(org, device.id) + + def test_aes_256_ctr_device_uptime_with_explicit_counter_and_seq(self, org): + device = org.register_device( + encryption="AES-256-CTR", + counter_source="DEVICE_UPTIME", + ) + assert device.id and device.key and len(device.key) == 32 + try: + payload = b"int-uptime-" + os.urandom(4) + counter = 5 + seq_no = 42 + pkt = encrypt( + device.key, payload, + time_counter=counter, seq_no=seq_no, + counter_mode="DEVICE_UPTIME", + ) + org.ingest_packet(pkt) + + decrypted = _wait_for_packet(org, device, payload) + assert decrypted is not None + assert decrypted.payload == payload + assert decrypted.device_id == device.id + assert decrypted.counter == counter + assert decrypted.sequence == seq_no + finally: + _safe_delete(org, device.id) + + def test_aes_128_ctr_unix_time(self, org): + device = org.register_device( + encryption="AES-128-CTR", + counter_source="UNIX_TIME", + ) + assert device.id and device.key and len(device.key) == 16 + try: + payload = b"int-128-" + os.urandom(4) + pkt = encrypt(device.key, payload) + org.ingest_packet(pkt) + + decrypted = _wait_for_packet(org, device, payload) + assert decrypted is not None + assert decrypted.payload == payload + assert decrypted.device_id == device.id + finally: + _safe_delete(org, device.id) + + +class TestAesEaxRoundTripViaCloud: + """Same round-trip with an AES-128-EAX device.""" + + def test_aes_128_eax_device_uptime(self, org): + # Cloud accepts period_exponent 10..15 for AES-128-EAX/DEVICE_UPTIME. + period_exponent = 15 + device = org.register_device( + encryption="AES-128-EAX", + counter_source="DEVICE_UPTIME", + period_exponent=period_exponent, + ) + assert device.id and device.key and len(device.key) == 16 + try: + payload = b"int-eax-" + os.urandom(1) # 9 bytes max for EAX + pkt = encrypt_eax( + device.key, payload, + counter=0, period_exponent=period_exponent, + ) + org.ingest_packet(pkt) + + decrypted = _wait_for_packet(org, device, payload) + assert decrypted is not None, ( + f"backend did not surface ingested EAX packet within {_POLL_TIMEOUT_SEC}s" + ) + assert decrypted.payload == payload + assert decrypted.device_id == device.id + finally: + _safe_delete(org, device.id) From 4932da3be0d0a13d56a2b1d7e403ee0931cd97ad Mon Sep 17 00:00:00 2001 From: Paul Buckley Date: Thu, 30 Apr 2026 18:33:16 -0700 Subject: [PATCH 4/4] fix(test): shorten CTR uptime integration payload to fit 13-byte limit The b"int-uptime-" prefix (11 bytes) plus 4 random bytes produced a 15-byte payload, exceeding MAX_CTR_PAYLOAD. Shortened to b"int-up-". --- tests/test_packet_generation_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_packet_generation_integration.py b/tests/test_packet_generation_integration.py index c89a70b..f603722 100644 --- a/tests/test_packet_generation_integration.py +++ b/tests/test_packet_generation_integration.py @@ -94,7 +94,7 @@ def test_aes_256_ctr_device_uptime_with_explicit_counter_and_seq(self, org): ) assert device.id and device.key and len(device.key) == 32 try: - payload = b"int-uptime-" + os.urandom(4) + payload = b"int-up-" + os.urandom(4) counter = 5 seq_no = 42 pkt = encrypt(