Data acquisition system for laboratory instrumentation with high-speed buffered acquisition, real-time signal processing, and live visualization. Designed for the requirements of CERN's ATLAS Magnet R&D group.
Author: Matej Konopik, CERN, 2025-2026
The system is organized as a pub/sub pipeline where signals are the "wires" connecting devices, plugins, and services. Devices publish data as signals, plugins subscribe/publish derived signals, and isolated services consume streams for recording, visualization, and later control workloads. Each block is extensible and can be instantiated multiple times.
flowchart LR
%% Devices publish signals
subgraph Devices["Device Drivers (N)"]
D1[Driver A]
D2[Driver B]
Dn[Driver ...]
end
%% Signals as topics on the bus
subgraph Bus[Message Bus / Pub-Sub]
S1((signal: A.ch1))
S2((signal: A.ch2))
S3((signal: B.ch1))
Sn((signal: ...))
end
%% Plugins subscribe/emit derived signals
subgraph Plugins["Plugins (M)"]
P1[Plugin X]
P2[Plugin Y]
Pm[Plugin ...]
end
%% Sinks consume signals
subgraph Sinks["Sinks (K)"]
C1[CSV Sink]
U1[UI Sink]
L1[Log Sink]
end
D1 --> S1
D1 --> S2
D2 --> S3
Dn --> Sn
S1 --> P1
S2 --> P1
P1 --> S3
S3 --> P2
P2 --> Sn
S1 --> C1
S2 --> C1
S3 --> U1
Sn --> L1
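The pipeline in the diagram can be sketched as a tiny in-process topic bus. This is an illustration only, not LabDAQ's actual bus implementation: the `MiniBus` class, its methods, and the topic strings are hypothetical names chosen for this example, and delivery here is synchronous for simplicity.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List, Tuple

Callback = Callable[[str, float], None]


class MiniBus:
    """Hypothetical minimal topic bus; not the project's real bus API."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callback) -> None:
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, value: float) -> None:
        # Deliver synchronously to every subscriber of this exact topic.
        for callback in list(self._subscribers[topic]):
            callback(topic, value)


bus = MiniBus()
recorded: List[Tuple[str, float]] = []


def scale_plugin(topic: str, value: float) -> None:
    # "Plugin": consumes a raw signal and publishes a derived signal.
    bus.publish("data.A.ch1_scaled", value * 2.0)


def sink(topic: str, value: float) -> None:
    # "Sink": records every sample it is subscribed to.
    recorded.append((topic, value))


bus.subscribe("data.A.ch1", scale_plugin)
bus.subscribe("data.A.ch1", sink)
bus.subscribe("data.A.ch1_scaled", sink)

# "Device": publishes one raw sample onto the bus.
bus.publish("data.A.ch1", 1.5)
print(recorded)
```

The sink receives both the raw and the derived sample without knowing which block produced them. That decoupling is what lets devices, plugins, and sinks be added or instantiated several times independently.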
- Python 3.12+ recommended (3.10+ supported)
- NI-VISA or compatible VISA runtime
- Windows 10+
python -m venv .venv
.venv\Scripts\activate
pip install --upgrade pip
pip install -e .[ui]
To build a standalone executable, run:
scripts/build_exe.bat
The executable is written to dist/labdaq/. Copy the entire folder to run it on machines without Python.
- Launch the GUI
  From the project root directory, run:
  python -m src.ui.main_window
- Scan for Devices
  - Go to the Configuration tab
  - Click Scan for Devices
  - Connected VISA instruments will be detected
- Configure Signals
  - Add signals to map device channels
  - Add plugins to process data (Shunt Current, PT-100 Temperature, etc.)
  - Add plots for real-time visualization
- Run Acquisition
  - Go to the Home tab
  - Click Start Acquisition
  - Click Show Plot Window to view live data
LabDAQ uses YAML configuration files to define devices, signals, plugins, and data outputs.
devices:
  - id: dmm1
    resource: TCPIP0::192.168.1.10::inst0::INSTR
    driver_hint: dmm6500
signals:
  - id: voltage_rail_1
    device: dmm1
    channel: ch1
    unit: V
    read:
      mode: buffered
      rate_hz: 100.0
    options:
      range: "10.0"
      nplc: 0.1
csv_sink:
  enabled: true
  directory: data
  filename: output.csv
  flush_every: 100
  queue_size: 8192        # Optional durable intake queue override
  flush_interval_s: 1.0   # Flush tail rows at least this often
  fsync_interval_s: 1.0   # Force OS-level durability at this cadence
ui:
  enabled: true
  refresh_hz: 30
  window_seconds: 60
  max_points: 2000
  decimation: minmax
  plots:
    - title: "Voltage Rails"
      curves:
        - signal_id: voltage_rail_1
          label: "Rail 1"
          color: "#00d1b2"
Device-level heartbeat/restart behavior is configured under reliability.device:
reliability:
  device:
    heartbeat_interval_s: 1.0
    progress_timeout_s: 30.0
    max_consecutive_errors_before_restart: 5
    restart_max_attempts: 5
    restart_window_s: 600.0
    restart_delay_schedule_s: [10, 20, 30, 45, 60]
Notes:
- Supervisor watchdog (acquisition-process restart) uses staged cooldowns: 20s, 30s, 40s, 50s, 60s.
- Device workers expose health state (healthy|degraded|restarting|unavailable) to supervisor status and Debug Monitor.
- Supervisor writes both:
  - run metrics CSV (data/metrics/*_supervisor_metrics.csv)
  - run event JSONL (data/metrics/*_supervisor_events.jsonl)
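The restart settings above can be read as a sliding-window budget: at most `restart_max_attempts` restarts within `restart_window_s`, with the delay before each attempt taken from `restart_delay_schedule_s`. The sketch below shows that reading only. It is an assumption about the semantics, not the project's actual logic, and the `RestartBudget` class is a hypothetical helper.

```python
from collections import deque
from typing import Deque, List, Optional


class RestartBudget:
    """Hypothetical helper: one plausible reading of the reliability.device keys."""

    def __init__(self, max_attempts: int, window_s: float, delays_s: List[float]) -> None:
        self.max_attempts = max_attempts
        self.window_s = window_s
        self.delays_s = delays_s
        self._attempts: Deque[float] = deque()  # timestamps of recent restarts

    def next_delay(self, now: float) -> Optional[float]:
        """Return the delay before the next restart, or None if the budget is spent."""
        # Forget restarts that have fallen out of the sliding window.
        while self._attempts and now - self._attempts[0] > self.window_s:
            self._attempts.popleft()
        if len(self._attempts) >= self.max_attempts:
            return None
        # Use the schedule entry for this attempt, clamped to the last entry.
        index = min(len(self._attempts), len(self.delays_s) - 1)
        self._attempts.append(now)
        return self.delays_s[index]


budget = RestartBudget(max_attempts=5, window_s=600.0, delays_s=[10, 20, 30, 45, 60])
delays = [budget.next_delay(now=float(t)) for t in (0, 15, 40, 75, 125, 190)]
print(delays)
```

In this reading the sixth restart request inside the window is refused (`None`), which would correspond to the worker being reported as unavailable until older attempts age out of the window.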
| Option | Default | Description |
|---|---|---|
| refresh_hz | 30 | UI redraw rate (higher = more CPU/GPU) |
| window_seconds | 30 | Rolling history displayed (max 259200s = 72h) |
| max_points | 10000 | Max points stored in memory (max 500000) |
| display_points | 2000 | Points actually rendered (decimated from max_points) |
| ingest_nth | 1 | Subsample factor on ingest (1=all, 10=every 10th) |
| queue_size | 2 | IPC queue depth (1-10, small for stability) |
| decimation | minmax | Decimation algorithm (see below) |

Decimation modes:

| Mode | Best For | Description |
|---|---|---|
| minmax | Noisy/fast signals | Preserves envelope (min/max per bucket) |
| lttb | Visual fidelity | Largest Triangle Three Buckets algorithm |
| average | Smooth trends | Mean per bucket |
| nth | Simple downsampling | Every Nth point |
| raw | Slow signals | No decimation (< 100 points total) |
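
To illustrate what "min/max per bucket" means, here is a minimal pure-Python sketch of min/max decimation. It is not the code LabDAQ uses; the function name `minmax_decimate` and its bucket layout are assumptions made for this example.

```python
from typing import List, Sequence


def minmax_decimate(values: Sequence[float], n_buckets: int) -> List[float]:
    """Reduce `values` to at most 2 * n_buckets points, keeping each bucket's min and max."""
    if n_buckets <= 0:
        raise ValueError("n_buckets must be positive")
    n = len(values)
    if n <= 2 * n_buckets:
        return list(values)  # already small enough; nothing to decimate
    out: List[float] = []
    for b in range(n_buckets):
        # Integer arithmetic so the buckets tile the input exactly.
        start, stop = b * n // n_buckets, (b + 1) * n // n_buckets
        bucket = values[start:stop]
        lo_i = min(range(len(bucket)), key=bucket.__getitem__)
        hi_i = max(range(len(bucket)), key=bucket.__getitem__)
        # Emit min and max in their original time order to keep the trace shape.
        for i in sorted({lo_i, hi_i}):
            out.append(bucket[i])
    return out


signal = [0.0, 5.0, 1.0, -3.0, 2.0, 2.5, 9.0, 0.5, 1.0, 1.0, 1.0, 1.0]
print(minmax_decimate(signal, 3))  # [5.0, -3.0, 9.0, 0.5, 1.0]
```

The spikes at 5.0, -3.0, and 9.0 survive decimation, which is why this mode suits noisy or fast signals. A per-bucket mean would flatten them.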
For experiments running 24-72 hours at 1 Hz:
ui:
  enabled: true
  window_seconds: 86400   # Rolling last 24 hours; run can continue indefinitely
  max_points: 100000      # Store 100k points
  display_points: 2000    # Render only 2000 points
  ingest_nth: 2           # Trim UI load for long multi-device runs
  decimation: lttb        # Best visual quality
  refresh_hz: 5           # Lower refresh saves CPU and IPC churn
  queue_size: 8           # IPC mailbox depth for long runs
Built-in plugin functions:
- Shunt Current (shunt_current): Calculate current from voltage drop (I = V/R)
- PT-100 Temperature (pt100_temp): Convert resistance to temperature
- Scatter (voltage_current_scatter): Combine two signals for X-Y plots, originally intended for IV curves during IC measurements.
- Multiply (multiply): Multiply two signals (Power = V * I...)
- Scale (scale): Apply linear scaling (y = mx + b)
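As a rough illustration of the first two conversions in the list, the sketch below computes a shunt current and a PT-100 temperature from measured voltages. It assumes a linear PT-100 model with the nominal IEC 60751 coefficient and a result in °C. The built-in plugin may use a more accurate curve, and the function names here are hypothetical.

```python
R0_OHM = 100.0    # PT-100 resistance at 0 °C
ALPHA = 0.00385   # Assumed linear temperature coefficient (1/°C), nominal IEC 60751 value


def shunt_current(v_drop: float, r_shunt_ohm: float) -> float:
    """Current through a shunt resistor: I = V / R."""
    return v_drop / r_shunt_ohm


def pt100_temperature_c(v_sensor: float, excitation_current_a: float) -> float:
    """Temperature from the sensor voltage at a known, fixed excitation current.

    Uses the linear approximation R = R0 * (1 + ALPHA * T), which is a
    simplification and loses accuracy far from the 0-100 °C range.
    """
    resistance = v_sensor / excitation_current_a
    return (resistance / R0_OHM - 1.0) / ALPHA


current_a = shunt_current(v_drop=0.05, r_shunt_ohm=0.001)
temp_c = pt100_temperature_c(v_sensor=0.1385, excitation_current_a=0.001)
print(round(current_a, 3), round(temp_c, 3))
```

With a 1 mA excitation current, 0.1385 V corresponds to 138.5 Ω, which the linear model maps to about 100 °C.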
Example:
plugins:
  global_plugins:
    - id: temp_sensor_1
      type: pt100_temp
      inputs: [voltage_ch101]
      outputs:
        - id: temperature_1
      params:
        fixed_current: 0.001
        use_fixed_current: true
Unit tests can be run with:
python -m pytest tests/
(This still needs more work)
Benchmarks can be run with:
python -m src.cli bench configs\performance-test.yaml --duration 120 --interval 1 --out data\bench.csv
Metrics recorded:
- Message bus statistics (published/enqueued/dropped)
- Throughput (batches per second)
- Round-trip time statistics
- CPU usage and thread count
Add --tracemalloc for memory tracking.
Install py-spy:
pip install py-spy
Record a flame graph:
py-spy record -o data\profile.svg --subprocesses -- python -m src.cli run configs\performance-test.yaml
Attach to a running process:
py-spy top --pid <PID>
Interpreting results:
- Dropped messages should be near zero at steady state
- CPU usage should remain below system limits
- RTT should be stable (spikes indicate VISA/TCP issues)
- Run long tests (30-60 minutes) to check for memory leaks
LabDAQ uses a supervisor-owned acquisition lifecycle. The Main UI is a control client and does not own device workers directly.
Main UI / CLI -> Supervisor (REQ/RESP control) -> Services
Acquisition Service -> Local Bus -> Plugins -> ZMQ data/status topics
Recorder / Live Plot / future PSU -> ZMQ subscribers
- Supervisor Service: Single authority for start/stop/state of acquisition runtime.
- Acquisition Service: Owns LocalBus, AcquisitionOrchestrator, workers, and process-boundary publishers.
- Main UI: Edits config, starts/stops runs via supervisor commands, sends UI lease heartbeats.
- Recorder Service: Persists subscribed data streams to CSV and remains the durable data path.
- Live Plot Services: One process per plot group, each subscribing only to its configured signals.
The acquisition service runs in a dedicated process for isolation. An unexpected Main UI crash does not stop acquisition; intentionally closing the Main UI prompts for confirmation and then stops the run.
flowchart LR
subgraph Ctrl[Control Plane]
UI[Main UI]
CLI[CLI ctl]
SUP[Supervisor]
end
subgraph AcqProc[Acquisition Service Process]
ORCH[Acquisition Orchestrator]
subgraph DevThreads[Device Workers]
DW1[DeviceWorker 1]
DW2[DeviceWorker 2]
end
subgraph PlugThreads[Plugin Workers]
PW1[PluginWorker 1]
PW2[PluginWorker 2]
end
Bus[Local Bus<br/>Topic: data.*]
CSV[CSV Sink]
end
subgraph UIP[UI Process]
PLOT[Live Plot UI]
end
UI --> SUP
CLI --> SUP
SUP --> ORCH
ORCH --> DW1 & DW2
ORCH --> PW1 & PW2
DW1 --> Bus
DW2 --> Bus
PW1 --> Bus
PW2 --> Bus
Bus --> CSV
Bus -.->|stream| PLOT
- Operator configures system in Main UI.
- Main UI writes immutable run snapshot and sends start_run to supervisor.
- Supervisor starts acquisition service process.
- Acquisition service runs workers/plugins and writes CSV continuously.
- Main UI polls supervisor status and sends lease heartbeat while active.
All data is published to topics under data.<signal_id>.
graph TB
subgraph Config["Configuration Layer"]
YAML[YAML Config]
end
subgraph Orch["Orchestrator"]
Orchestrator[Acquisition Orchestrator]
end
subgraph Workers["Device Workers (Threads)"]
DW1[Device Worker 1]
DW2[Device Worker 2]
end
subgraph Plugins["Plugin Workers (Threads)"]
PW1[Plugin Worker 1]
end
subgraph Bus["Message Bus"]
LocalBus[Local Bus<br/>Topic: data.*]
end
subgraph Sinks["Data Sinks"]
CSV[CSV Writer]
UI[Live Plot UI<br/>Separate Process]
end
YAML --> Orchestrator
Orchestrator --> DW1 & DW2
Orchestrator --> PW1
DW1 & DW2 --> LocalBus
PW1 --> LocalBus
LocalBus --> CSV & UI
graph TD
MainProcess[Main Process]
subgraph Acquisition
DW1[DeviceWorker 1<br>Polls @ 100Hz]
DW2[DeviceWorker 2<br>Polls @ 10Hz]
end
subgraph Processing
PW1[PluginWorker<br>Process All]
PW2[PluginWorker<br>Latest Only]
end
subgraph Output
CSV[CSV Sink]
end
subgraph Visualization
UI[PyQtGraph Sink<br>Separate Process]
end
MainProcess --> DW1 & DW2
MainProcess --> PW1 & PW2
MainProcess --> CSV
DW1 & DW2 -.->|Queue| UI
PW1 & PW2 -.->|Queue| UI
sequenceDiagram
participant Device
participant DeviceWorker
participant Bus
participant PluginWorker
participant Sinks
Note over DeviceWorker: Event-based timing loop
DeviceWorker->>Device: read()
Device-->>DeviceWorker: SampleChunk
DeviceWorker->>Bus: publish(raw data)
Bus->>PluginWorker: subscribe callback
Note over PluginWorker: Buffer & process batch
PluginWorker->>PluginWorker: process(inputs)
PluginWorker->>Bus: publish(processed data)
Bus->>Sinks: subscribe callback
Sinks->>Sinks: write CSV / update UI
Main UI / CLI -> Supervisor -> Acquisition Service -> Local Bus -> Plugins/Sinks (CSV/UI)
stateDiagram-v2
[*] --> RUNNING
state RUNNING {
[*] --> READING
READING --> SUCCESS : Data OK
READING --> ERROR : Timeout/Exception
}
state RECOVERY {
[*] --> CHECK_COUNT
CHECK_COUNT --> RETRY : Errors < 5
CHECK_COUNT --> FAULT : Errors >= 5
RETRY --> BACKOFF
BACKOFF --> RUNNING
}
SUCCESS --> RUNNING : Reset Counter
ERROR --> RECOVERY : Increment Counter
FAULT --> [*] : Stop Worker
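The recovery logic in the state diagram above can be sketched as a polling loop with a consecutive-error counter. This is a simplified illustration under stated assumptions, not the actual DeviceWorker code: `run_worker`, `flaky_read`, and the `max_reads` limit are hypothetical, and the backoff sleep is omitted so the example runs instantly.

```python
from typing import Callable, List


def run_worker(read: Callable[[], float], max_errors: int = 5, max_reads: int = 100) -> List[float]:
    """Hypothetical polling loop following the RUNNING / RECOVERY / FAULT states."""
    samples: List[float] = []
    consecutive_errors = 0
    for _ in range(max_reads):
        try:
            samples.append(read())
            consecutive_errors = 0      # SUCCESS: reset counter
        except TimeoutError:
            consecutive_errors += 1     # ERROR: increment counter
            if consecutive_errors >= max_errors:
                break                   # FAULT: stop worker
            # RETRY: a real worker would wait for a backoff delay here.
    return samples


# Simulated instrument: None marks a read that times out.
outcomes = iter([1.0, None, None, 2.0, None, None, None, None, None, 3.0])


def flaky_read() -> float:
    value = next(outcomes)
    if value is None:
        raise TimeoutError("simulated instrument timeout")
    return value


samples = run_worker(flaky_read)
print(samples)  # [1.0, 2.0]
```

Two isolated timeouts are absorbed because the successful read of 2.0 resets the counter. The five consecutive timeouts that follow reach the threshold, so the worker stops before the final sample is ever read.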
Create a new file in src/drivers/:
from src.drivers.base import BaseReadDevice
from src.drivers.registry import register_driver
from src.core.models import SampleChunk, ReadMode
import time
import numpy as np

@register_driver("my_instrument")
class MyInstrumentDriver(BaseReadDevice):
    def configure_channels(self, settings):
        # Put the instrument into DC voltage measurement mode.
        self._write(":CONF:VOLT:DC")

    def read(self):
        # Query one reading and wrap it in a single-sample chunk.
        val = float(self._query(":MEAS?"))
        return {
            "ch1": SampleChunk(
                signal_id="",
                t0=time.time(),
                dt=0.0,
                values=np.array([val]),
                mode=ReadMode.SINGLE,
                meta={},
                rtt_ms=None,
            )
        }
Create a new file in src/plugins/:
from typing import Dict, List, Any
from src.core.models import SampleChunk
from src.plugins.base import BaseSignalPlugin
from src.plugins.registry import register_plugin

@register_plugin("my_plugin")
class MyPlugin(BaseSignalPlugin):
    @classmethod
    def describe(cls) -> Dict[str, Any]:
        return {
            "description": "My custom processing logic",
            "inputs": [{"label": "Input Signal", "type": "signal"}],
            "outputs": [{"label": "Output Signal", "type": "signal"}],
            "params": [
                {"name": "factor", "type": "float", "default": 1.0}
            ]
        }

    def process(self, inputs: Dict[str, SampleChunk]) -> List[SampleChunk]:
        # Look up the chunk for the first configured input signal.
        chunk = inputs.get(self.config["inputs"][0])
        if chunk is None:
            return []
        factor = float(self.params.get("factor", 1.0))
        result = chunk.values * factor
        return [self.create_output_chunk(
            values=result,
            source_chunk=chunk
        )]
Import your plugin in src/plugins/builtin.py to register it.
Distributed under the MIT License.