Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ x-api-key: plx_xxxxx
| ------------ | ------ | -------- | ---------------------------------------------- |
| `metric` | string | Yes | Metric name (e.g., `temperature`, `motor.rpm`) |
| `value` | any | Yes | See supported value types below |
| `timestamp` | float | No | Unix timestamp (seconds). Defaults to now |
| `timestamp` | float | No | Unix timestamp in seconds (or ms if ≥ 1e12). Omit to use device time. Over WebSocket, the Python SDK applies a server-synced clock correction when omitted — see [Clock correction](#clock-correction). |
| `source_id` | string | Yes | Your source identifier |
| `tags` | object | No | Key-value labels |
| `session_id` | string | No | Group data into sessions |
Expand Down Expand Up @@ -168,18 +168,22 @@ Devices authenticate using an API key. The `source_id` in the request is the dev
// Server → Device
{
"type": "authenticated",
"source_id": "drone-01"
"source_id": "drone-01",
"server_time_ms": 1746100800000
}

// Server → Device (collision case)
{
"type": "authenticated",
"source_id": "drone-01_2"
"source_id": "drone-01_2",
"server_time_ms": 1746100800000
}
```

The SDK **adopts** whatever `source_id` the server returns and uses it for all subsequent frames, heartbeats, and reconnects. It also persists the assigned name locally so reconnects go straight to the claimed slot.

`server_time_ms` is the gateway's current Unix time in milliseconds. The Python SDK uses it to compute a clock offset (`server_time - device_time`) that is applied to every SDK-generated timestamp for the lifetime of the connection. This corrects for devices that boot without NTP or have an unreliable RTC — a common condition on embedded Linux. See [Clock correction](#clock-correction) for details and limitations.

`install_id` is a stable per-installation UUID, generated on the device's first run and saved to `~/.plexus/config.json`. It lets the server distinguish a rebooting device from a new device trying to claim an existing name. Legacy SDKs that omit `install_id` continue to work as before (the server passes the declared `source_id` through unchanged).

### Message Types (Dashboard → Device)
Expand Down Expand Up @@ -354,6 +358,9 @@ func main() {
#include <WiFi.h>
#include <HTTPClient.h>

// Call configTime(0, 0, "pool.ntp.org") in setup() before sending.
// time(nullptr) returns 0 until NTP sync completes — omit the timestamp
// field entirely if you cannot guarantee NTP sync at send time.
void sendToPlexus(const char* metric, float value) {
HTTPClient http;
http.begin("https://plexus-gateway.fly.dev/ingest");
Expand All @@ -363,7 +370,7 @@ void sendToPlexus(const char* metric, float value) {
String payload = "{\"points\":[{";
payload += "\"metric\":\"" + String(metric) + "\",";
payload += "\"value\":" + String(value) + ",";
payload += "\"timestamp\":" + String(millis() / 1000.0) + ",";
payload += "\"timestamp\":" + String(time(nullptr)) + ",";
payload += "\"source_id\":\"esp32-001\"";
payload += "}]}";

Expand Down Expand Up @@ -469,10 +476,39 @@ class MySensor(BaseSensor):
| 404 | Resource not found |
| 410 | Resource expired |

## Clock correction

Embedded devices commonly boot with a wrong system clock — no hardware RTC, NTP unreachable on first boot, or a fresh OS image whose filesystem timestamp is months in the past. Without correction, all telemetry lands at the wrong place on the timeline.

The Python SDK corrects for this automatically over WebSocket. On every connection the gateway returns `server_time_ms` in the `authenticated` frame. The SDK computes `offset = server_time - device_time` and adds it to every timestamp it generates. Data lands at the right time on the dashboard regardless of what the device clock says.

**When the correction applies:**

The offset is applied when `timestamp` is omitted (the SDK generates the time). If you pass an explicit `timestamp`, it is used as-is — the SDK cannot tell whether your value is a wall-clock time or a hardware-relative counter, so it leaves it alone.

```python
px.send("temperature", 72.5) # SDK picks time → correction applied
px.send("temperature", 72.5, timestamp=t) # your timestamp → used as-is, no correction
```

**When to pass an explicit timestamp:**
- You have a reliable wall-clock source (GPS, trusted hardware RTC, host NTP)
- You are replaying or backfilling historical data
- Your sensor provides its own wall-clock timestamp

**When to omit timestamp:**
- The device may have booted without NTP (Raspberry Pi, Jetson, field robots without network on first boot)
- You have no reliable external time source

**Known limitations:**
- The clock offset refreshes only on WebSocket reconnect. A device with a drifting RTC that stays connected for many days will accumulate uncorrected drift between reconnects proportional to the drift rate.
- HTTP transport (`transport="http"`) does not receive clock sync — timestamps default to the device clock uncorrected.
- `send_batch()` takes one shared `timestamp` for the whole batch, not per-point. For per-point timestamps, call `send()` in a loop.

## Best Practices

- **Batch points** - Send up to 100 points per request for HTTP
- **Use timestamps** - Always include accurate timestamps
- **Omit timestamp when unsure** - The Python SDK applies server-synced clock correction when `timestamp` is omitted over WebSocket; only pass an explicit timestamp when you have a reliable wall-clock source
- **Consistent source_id** - Use the same ID for each physical device/source
- **Use tags** - Label data for filtering (e.g., `{"location": "lab"}`)
- **Use sessions** - Group related data for easier analysis
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,24 @@ px.buffer_size()
px.flush_buffer()
```

## Timestamps and clock correction

By default — `px.send("temp", 72.5)` with no `timestamp` argument — the SDK picks the time itself. Over WebSocket, it synchronizes with the gateway clock on every connection, so data lands at the right place on the timeline even if the device's system clock is wrong (no NTP on first boot, stale RTC, fresh OS image).

```python
px.send("temperature", 72.5) # SDK picks time; gateway-synced over WS
px.send("temperature", 72.5, timestamp=t) # your timestamp, used as-is, no correction
```

**Pass an explicit timestamp when** you have a reliable external time source (GPS, trusted RTC, host NTP) or are replaying historical data with known timestamps.

**Omit timestamp when** the device may have booted without NTP — which is the default on Raspberry Pi, Jetson, and most embedded Linux boards without a network connection at first boot.

**Known limits:**
- Clock sync refreshes on WebSocket (re)connect. A device with a drifting RTC that stays connected for many days accumulates uncorrected drift between reconnects.
- HTTP-only transport (`transport="http"`) does not receive clock sync — timestamps default to the uncorrected device clock.
- `send_batch()` shares one timestamp across the whole batch. For per-point timestamps, call `send()` in a loop.

## Transport

By default the SDK connects over a **WebSocket** to `/ws/device` on the gateway — same wire protocol as the C SDK. This gives you:
Expand Down
48 changes: 38 additions & 10 deletions plexus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def __init__(
self.transport = transport
self._ws_url = (ws_url or get_gateway_ws_url())
self._ws = None # lazily constructed in _ensure_ws()
self._clock_offset_ms: int = 0

# Pluggable buffer backend for failed sends
if persistent_buffer:
Expand Down Expand Up @@ -175,17 +176,16 @@ def _get_session(self) -> requests.Session:
self._session.headers["User-Agent"] = f"plexus-python/{__version__}"
return self._session

@staticmethod
def _normalize_ts_ms(timestamp: Optional[float] = None) -> int:
def _normalize_ts_ms(self, timestamp: Optional[float] = None) -> int:
"""Normalize a timestamp to milliseconds.

Accepts:
- None: returns current time in ms
- float seconds (e.g. time.time()): converts to ms
- int/float ms: returned as-is
- None: returns current time in ms, corrected by server clock offset
- float seconds (e.g. time.time()): converts to ms (no offset applied)
- int/float ms: returned as-is (no offset applied)
"""
if timestamp is None:
return int(time.time() * 1000)
return int(time.time() * 1000) + self._clock_offset_ms
# Heuristic: values < 1e12 are seconds
if timestamp > 0 and timestamp < 1e12:
return int(timestamp * 1000)
Expand Down Expand Up @@ -258,6 +258,30 @@ def send(
point = self._make_point(metric, value, timestamp, tags, data_class)
return self._send_points([point])

def event(
self,
name: str,
data: FlexValue,
timestamp: Optional[float] = None,
tags: Optional[Dict[str, str]] = None,
) -> bool:
"""
Send a named event with text or structured data.

Args:
name: Event type (e.g., "fault", "state_change", "log")
data: Text or JSON-serializable value (string, dict, list, bool, number)
timestamp: Unix timestamp. If not provided, uses current time.
tags: Optional key-value tags

Example:
px.event("fault", "E-stop triggered")
px.event("state_change", {"from": "IDLE", "to": "RUNNING"})
px.event("sensor_error", {"sensor": "imu", "code": 42}, tags={"motor": "A"})
"""
point = self._make_point(name, data, timestamp, tags, data_class="event")
return self._send_points([point])

def send_batch(
self,
points: List[Tuple[str, FlexValue]],
Expand All @@ -283,8 +307,8 @@ def send_batch(
("position", {"x": 1.0, "y": 2.0}),
])
"""
ts = timestamp if timestamp is not None else time.time()
data_points = [self._make_point(m, v, ts, tags) for m, v in points]
ts_ms = self._normalize_ts_ms(timestamp)
data_points = [self._make_point(m, v, ts_ms, tags) for m, v in points]
return self._send_points(data_points)

def _ensure_ws(self):
Expand All @@ -300,10 +324,14 @@ def _ensure_ws(self):
install_id=get_install_id(),
agent_version=__version__,
on_source_id_assigned=self._on_source_id_assigned,
on_clock_synced=self._on_clock_synced,
)
self._ws.start()
return self._ws

def _on_clock_synced(self, offset_ms: int) -> None:
self._clock_offset_ms = offset_ms

def _on_source_id_assigned(self, assigned: str) -> None:
"""Callback from WebSocketTransport when the gateway returns an
auto-suffixed source_id. Persists it so subsequent runs (and the HTTP
Expand Down Expand Up @@ -555,7 +583,7 @@ def run(self, run_id: str, tags: Optional[Dict[str, str]] = None, store_frames:
"source_id": self.source_id,
"status": "started",
"tags": tags,
"timestamp": time.time(),
"timestamp": (int(time.time() * 1000) + self._clock_offset_ms) / 1000,
},
timeout=self.timeout,
)
Expand All @@ -573,7 +601,7 @@ def run(self, run_id: str, tags: Optional[Dict[str, str]] = None, store_frames:
"run_id": run_id,
"source_id": self.source_id,
"status": "ended",
"timestamp": time.time(),
"timestamp": (int(time.time() * 1000) + self._clock_offset_ms) / 1000,
},
timeout=self.timeout,
)
Expand Down
18 changes: 18 additions & 0 deletions plexus/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from __future__ import annotations

import atexit
import json
import logging
import os
Expand Down Expand Up @@ -108,6 +109,7 @@ def __init__(
platform: str = "python-sdk",
auto_reconnect: bool = True,
on_source_id_assigned: Optional[Callable[[str], None]] = None,
on_clock_synced: Optional[Callable[[int], None]] = None,
):
if not api_key:
raise ValueError("api_key required")
Expand All @@ -122,6 +124,7 @@ def __init__(
self.platform = platform
self.auto_reconnect = auto_reconnect
self._on_source_id_assigned = on_source_id_assigned
self._on_clock_synced = on_clock_synced

self._commands: Dict[str, _RegisteredCommand] = {}
self._ws: Optional[websocket.WebSocket] = None
Expand All @@ -130,6 +133,7 @@ def __init__(
self._stop = threading.Event()
self._thread: Optional[threading.Thread] = None
self._backoff_attempt = 0
self._clock_offset_ms: int = 0

# ------------------------------------------------------------------ public

Expand All @@ -155,6 +159,7 @@ def start(self) -> None:
target=self._run, name="plexus-ws", daemon=True
)
self._thread.start()
atexit.register(self.stop)

def stop(self, timeout: float = 2.0) -> None:
self._stop.set()
Expand All @@ -175,6 +180,10 @@ def wait_authenticated(self, timeout: float = AUTH_TIMEOUT_S) -> bool:
def is_authenticated(self) -> bool:
return self._authenticated.is_set()

@property
def clock_offset_ms(self) -> int:
return self._clock_offset_ms

def send_points(self, points: List[Dict[str, Any]]) -> bool:
"""Send a telemetry frame. Returns False if the socket is not
authenticated — caller is expected to fall back to HTTP."""
Expand Down Expand Up @@ -250,6 +259,15 @@ def _connect_and_serve(self) -> None:
if msg.get("type") != "authenticated":
raise RuntimeError(f"auth failed: {msg}")

server_ts = msg.get("server_time_ms")
if isinstance(server_ts, (int, float)) and server_ts > 0:
self._clock_offset_ms = int(server_ts) - int(time.time() * 1000)
if self._on_clock_synced is not None:
try:
self._on_clock_synced(self._clock_offset_ms)
except Exception as e:
logger.debug("on_clock_synced callback raised: %s", e)

# The gateway may return a different source_id if the desired name
# was already claimed by another install — adopt the assigned value
# so all subsequent frames (heartbeats, future reconnects) use it.
Expand Down
18 changes: 18 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Basic tests for plexus-python."""

import time

from plexus import __version__
from plexus.client import Plexus
from plexus.config import DEFAULT_CONFIG
Expand Down Expand Up @@ -41,3 +43,19 @@ def test_make_point_with_tags():
point = px._make_point("temperature", 72.5, tags={"sensor": "A1"})

assert point["tags"] == {"sensor": "A1"}


def test_normalize_ts_ms_applies_clock_offset():
px = Plexus(api_key="test", endpoint="http://localhost")
px._clock_offset_ms = 5000
before = int(time.time() * 1000)
ts = px._normalize_ts_ms(None)
after = int(time.time() * 1000)
assert before + 5000 <= ts <= after + 5000


def test_normalize_ts_ms_ignores_offset_for_supplied_timestamp():
px = Plexus(api_key="test", endpoint="http://localhost")
px._clock_offset_ms = 5000
ts = px._normalize_ts_ms(1_700_000_000.0)
assert ts == 1_700_000_000_000
46 changes: 46 additions & 0 deletions tests/test_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async def _handler(self, ws, path="/ws/device"):
await ws.send(json.dumps({
"type": "authenticated",
"source_id": returned_source_id,
"server_time_ms": int(time.time() * 1000),
}))
try:
async for raw in ws:
Expand Down Expand Up @@ -332,3 +333,48 @@ def test_ensure_device_path():
assert _ensure_device_path("wss://foo") == "wss://foo/ws/device"
assert _ensure_device_path("wss://foo/") == "wss://foo/ws/device"
assert _ensure_device_path("wss://foo/ws/device") == "wss://foo/ws/device"


def test_clock_offset_computed_from_authenticated_frame():
# Stub sends a server_time_ms that is 30 seconds ahead of real time.
# The transport's clock_offset_ms should be close to +30_000.
fake_offset_ms = 30_000

class _OffsetStubGateway(_StubGateway):
async def _handler(self, ws, path="/ws/device"):
self._ws = ws
raw = await ws.recv()
msg = json.loads(raw)
self.auth_frame = msg
returned_source_id = self.assigned_source_id or msg.get("source_id")
await ws.send(json.dumps({
"type": "authenticated",
"source_id": returned_source_id,
"server_time_ms": int(time.time() * 1000) + fake_offset_ms,
}))
try:
async for raw in ws:
self.received.append(json.loads(raw))
except websockets.ConnectionClosed:
return

g = _OffsetStubGateway()
g.start()
try:
seen: List[int] = []
t = WebSocketTransport(
api_key="plx_test_abc",
source_id="drone-001",
ws_url=_url(g.port),
on_clock_synced=lambda offset: seen.append(offset),
)
t.start()
try:
assert t.wait_authenticated(timeout=3)
assert abs(t.clock_offset_ms - fake_offset_ms) < 500
assert len(seen) == 1
assert abs(seen[0] - fake_offset_ms) < 500
finally:
t.stop()
finally:
g.stop()
Loading
Loading