Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 100 additions & 7 deletions src/hubblenetwork/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from hubblenetwork import ready as ready_mod
from hubblenetwork import sat as sat_mod
from hubblenetwork import decrypt, UNIX_TIME, DEVICE_UPTIME
from hubblenetwork.crypto import find_time_counter_delta
from hubblenetwork.crypto import find_time_counter_delta, derive_device_id, derive_device_id_set
from hubblenetwork import cloud
from hubblenetwork import InvalidCredentialsError
from hubblenetwork.errors import BackendError
Expand Down Expand Up @@ -589,11 +589,12 @@ def finalize(self) -> None:
def _sat_packet_to_dict(
pkt: SatellitePacket,
payload_format: str = "base64",
td_stats: Optional[dict] = None,
**_: object,
) -> dict:
"""Convert a SatellitePacket to a dictionary for JSON serialization."""
ts = datetime.fromtimestamp(pkt.timestamp).strftime("%c")
return {
d = {
"device_id": pkt.device_id,
"seq_num": pkt.seq_num,
"device_type": pkt.device_type,
Expand All @@ -602,8 +603,14 @@ def _sat_packet_to_dict(
"rssi_dB": pkt.rssi_dB,
"channel_num": pkt.channel_num,
"freq_offset_hz": pkt.freq_offset_hz,
"pdu_n_corr": pkt.pdu_n_corr,
"header_n_corr": pkt.header_n_corr,
"payload": _format_payload(pkt.payload, payload_format),
}
if td_stats is not None:
d["sym_mean_ms"] = td_stats.get("sym_mean_ms")
d["gap_mean_ms"] = td_stats.get("gap_mean_ms")
return d


class _SatStreamingTablePrinter(_StreamingPrinterBase):
Expand All @@ -617,15 +624,28 @@ class _SatStreamingTablePrinter(_StreamingPrinterBase):
"RSSI_DB": 8,
"CHANNEL": 8,
"FREQ_OFFSET": 12,
"RS_CORR": 8,
"SYM_MS": 10,
"GAP_MS": 10,
"PAYLOAD": 20,
}

_HEADERS = ["DEVICE_ID", "SEQ", "TYPE", "TIME", "RSSI_DB", "CHANNEL", "FREQ_OFFSET", "PAYLOAD"]
_BASE_HEADERS = ["DEVICE_ID", "SEQ", "TYPE", "TIME", "RSSI_DB", "CHANNEL", "FREQ_OFFSET", "RS_CORR"]
_TD_HEADERS = ["SYM_MS", "GAP_MS"]

def __init__(self, payload_format: str = "base64"):
def __init__(self, payload_format: str = "base64", show_td_stats: bool = False):
super().__init__()
self._header_printed = False
self._payload_format = payload_format
self._show_td_stats = show_td_stats
self._HEADERS = self._BASE_HEADERS + (self._TD_HEADERS if show_td_stats else []) + ["PAYLOAD"]
self._sym_mean_ms: Optional[float] = None
self._gap_mean_ms: Optional[float] = None

def update_td_stats(self, td: dict) -> None:
"""Update the latest TD stats to be stamped on subsequent rows."""
self._sym_mean_ms = td.get("sym_mean_ms")
self._gap_mean_ms = td.get("gap_mean_ms")

def _format_row(self, values: List) -> str:
parts = []
Expand All @@ -650,6 +670,9 @@ def print_row(self, pkt: SatellitePacket) -> None:
self._header_printed = True

ts = datetime.fromtimestamp(pkt.timestamp).strftime("%c")
rs = "-"
if pkt.pdu_n_corr is not None and pkt.header_n_corr is not None:
rs = str(pkt.pdu_n_corr + pkt.header_n_corr)
row = [
pkt.device_id,
pkt.seq_num,
Expand All @@ -658,8 +681,12 @@ def print_row(self, pkt: SatellitePacket) -> None:
f"{pkt.rssi_dB:.1f}",
pkt.channel_num,
f"{pkt.freq_offset_hz:.1f}",
_format_payload(pkt.payload, self._payload_format),
rs,
]
if self._show_td_stats:
row.append(f"{self._sym_mean_ms:.2f}" if self._sym_mean_ms is not None else "-")
row.append(f"{self._gap_mean_ms:.2f}" if self._gap_mean_ms is not None else "-")
row.append(_format_payload(pkt.payload, self._payload_format))
click.echo(self._format_row(row))
click.echo(self._make_separator())
self._packet_count += 1
Expand Down Expand Up @@ -3130,21 +3157,43 @@ def _run_sat_scan(
output_format: str,
poll_interval: float,
payload_format: str,
key: Optional[str] = None,
days: int = 2,
pluto_uri: Optional[str] = None,
debug: bool = False,
) -> None:
"""Shared implementation for ``sat scan`` and ``sat mock-scan``."""
mode_label = "mock satellite receiver" if mock else "satellite receiver"

show_td = key is not None and output_format.lower() == "tabular"
printer_class = _SAT_STREAMING_PRINTERS.get(
output_format.lower(), _SatStreamingTablePrinter
)
printer = printer_class(payload_format=payload_format)
printer = printer_class(payload_format=payload_format, **( {"show_td_stats": True} if show_td else {}))

if debug:
sat_logger = logging.getLogger("hubblenetwork.sat")
sat_logger.setLevel(logging.DEBUG)
sat_logger.addHandler(_handler)

# Parse key and derive expected device IDs if provided.
device_id_filter: Optional[set] = None
if key:
try:
decoded_key = _parse_key(key)
except ValueError as e:
if printer.suppress_info_messages:
click.echo(json.dumps({"error": f"Invalid key: {e}"}))
else:
click.secho(f"[ERROR] Invalid key: {e}", fg="red", err=True)
sys.exit(1)
device_id_filter = derive_device_id_set(decoded_key, days=days)
if not printer.suppress_info_messages:
click.secho(
f"[INFO] Filtering to device IDs: {', '.join(sorted(device_id_filter))}",
fg="cyan", err=True,
)

# Fail fast: verify Docker is available before printing anything.
try:
sat_mod.ensure_docker_available()
Expand All @@ -3161,9 +3210,33 @@ def _run_sat_scan(
f"[INFO] Starting {mode_label}... (Press Ctrl+C to stop)"
)

# Today's integer device ID for TD mode (derived from key at today's counter).
_td_device_id: Optional[int] = None
if key and device_id_filter is not None:
from datetime import datetime, timezone as _tz
_tc = int(datetime.now(_tz.utc).timestamp()) // 86400
_td_device_id = derive_device_id(decoded_key, _tc)

_td_started = [False]
_last_td_sig: Optional[float] = [None]
_last_td_check = [0.0] # monotonic timestamp of last fetch_td_info call

def _on_status(msg: str) -> None:
if not printer.suppress_info_messages:
click.secho(f"[INFO] {msg}", fg="cyan", err=True)
# Once receiver is ready, kick off TD capture for our device.
if "ready" in msg.lower() and _td_device_id is not None and not _td_started[0]:
try:
sat_mod.start_timedomain(_td_device_id)
_td_started[0] = True
if not printer.suppress_info_messages:
click.secho(
f"[INFO] Time-domain capture started for device "
f"0x{_td_device_id:08X}",
fg="cyan", err=True,
)
except sat_mod.SatelliteError:
pass # non-fatal

_stop_msg_shown = [False]

Expand All @@ -3180,9 +3253,23 @@ def _on_interrupt(sig, frame):
try:
for pkt in sat_mod.scan(
timeout=timeout, poll_interval=poll_interval, mock=mock,
on_status=_on_status,
pluto_uri=pluto_uri, on_status=_on_status,
):
if device_id_filter is not None and pkt.device_id not in device_id_filter:
continue
printer.print_row(pkt)

# Check for updated TD stats at most once per poll cycle.
if _td_started[0] and (time.monotonic() - _last_td_check[0]) >= poll_interval:
_last_td_check[0] = time.monotonic()
td = sat_mod.fetch_td_info()
if td and td.get("sym_mean_ms") is not None:
sig = td["sym_mean_ms"]
if sig != _last_td_sig[0]:
_last_td_sig[0] = sig
if isinstance(printer, _SatStreamingTablePrinter):
printer.update_td_stats(td)

if count is not None and printer.packet_count >= count:
break
except sat_mod.DockerError as exc:
Expand Down Expand Up @@ -3232,6 +3319,12 @@ def _sat_scan_options(fn):
case_sensitive=False),
default="base64", show_default=True,
help="Encoding format for packet payload"),
click.option("--key", "-k", type=str, default=None,
help="Device key (hex or base64) — only show packets from the matching device"),
click.option("--days", "-d", type=int, default=2, show_default=True,
help="Day window around today to derive device IDs from key"),
click.option("--pluto-uri", "pluto_uri", type=str, default=None,
help="PlutoSDR URI passed to the container (e.g. usb:, ip:192.168.2.1)"),
click.option("--debug", is_flag=True, default=False,
help="Enable debug logging to stderr"),
]):
Expand Down
20 changes: 20 additions & 0 deletions src/hubblenetwork/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,26 @@ def decrypt(
return None


def derive_device_id(key: bytes, time_counter: int) -> int:
"""Derive the 32-bit device ID for key at time_counter (UTC day counter)."""
device_key = _generate_kdf_key(key, len(key), "DeviceKey", time_counter)
device_id_bytes = _generate_kdf_key(device_key, 4, "DeviceID", 0)
return int.from_bytes(device_id_bytes, "big")


def derive_device_id_set(key: bytes, days: int = 2) -> set:
"""Return the set of hex device ID strings expected for *key* over a day window.

Covers today ± *days* to account for clock skew and day boundaries.
IDs are formatted as the sdr-docker API returns them
"""
tc = int(datetime.now(timezone.utc).timestamp()) // 86400
return {
f"0x{derive_device_id(key, tc + delta):08X}"
for delta in range(-days, days + 1)
}


def find_time_counter_delta(
key: bytes, encrypted_pkt: EncryptedPacket, max_days_back: int = 365
) -> Optional[int]:
Expand Down
2 changes: 2 additions & 0 deletions src/hubblenetwork/packets.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,5 @@ class SatellitePacket:
channel_num: int
freq_offset_hz: float
payload: bytes # encrypted payload bytes (base64-decoded from API)
pdu_n_corr: Optional[int] = None # Reed-Solomon corrections on PDU (None for OOK/v-1)
header_n_corr: Optional[int] = None # Reed-Solomon corrections on header
40 changes: 39 additions & 1 deletion src/hubblenetwork/sat.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ def _packets_url(port: int = API_PORT) -> str:
return f"http://localhost:{port}/api/packets"


def _timedomain_url(port: int = API_PORT) -> str:
return f"http://localhost:{port}/api/timedomain"


def _td_info_url(port: int = API_PORT) -> str:
return f"http://localhost:{port}/api/td_info"


# ---------------------------------------------------------------------------
# Docker helpers
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -187,13 +195,38 @@ def _parse_jsonl(text: str) -> List[SatellitePacket]:
channel_num=obj["channel_num"],
freq_offset_hz=obj["freq_offset_hz"],
payload=payload,
pdu_n_corr=obj.get("pdu_n_corr"),
header_n_corr=obj.get("header_n_corr"),
)
)
except (KeyError, TypeError, json.JSONDecodeError) as exc:
logger.warning("Skipping malformed packet line: %s (%s)", line, exc)
return packets


def start_timedomain(device_id: int, port: int = API_PORT) -> None:
"""Enable time-domain capture for *device_id* on the running container."""
url = _timedomain_url(port)
try:
resp = httpx.post(url, json={"action": "start", "device_id": device_id}, timeout=5)
resp.raise_for_status()
except httpx.HTTPError as exc:
raise SatelliteError(f"Failed to start time-domain capture: {exc}")


def fetch_td_info(port: int = API_PORT) -> Optional[dict]:
"""Return the latest time-domain stats dict, or None if not yet available."""
url = _td_info_url(port)
try:
resp = httpx.get(url, timeout=5)
if resp.status_code == 404:
return None
resp.raise_for_status()
return resp.json()
except httpx.HTTPError:
return None


def fetch_packets(port: int = API_PORT) -> List[SatellitePacket]:
"""Fetch the current packet buffer from the satellite receiver API."""
url = _packets_url(port)
Expand Down Expand Up @@ -222,6 +255,7 @@ def scan(
image: str = DOCKER_IMAGE,
*,
mock: bool = False,
pluto_uri: Optional[str] = None,
on_status: Optional[Callable[[str], None]] = None,
) -> Generator[SatellitePacket, None, None]:
"""Scan for satellite packets, managing the Docker container lifecycle.
Expand All @@ -246,7 +280,11 @@ def scan(

_emit("Starting container...")
container_name = MOCK_CONTAINER_NAME if mock else CONTAINER_NAME
environment: Optional[Dict[str, str]] = {"SDR_TYPE": "mock"} if mock else None
environment: Dict[str, str] = {}
if mock:
environment["SDR_TYPE"] = "mock"
if pluto_uri is not None:
environment["PLUTO_URI"] = pluto_uri

container_id = start_container(
image=image,
Expand Down
Loading