Skip to content

Commit b2c927f

Browse files
authored
fix: Remove tiled writer if submitted task doesn't exist (#1484)
Whenever a task is submitted, a TiledWriter is added to the run engine and is only removed when the task is complete. If the task never starts due to not being found, the writer would never be removed and subsequent tasks would have duplicate writers causing write errors to tiled. Catching the KeyError and removing the subscribers immediately is a workaround that works but should be removed when the internal pending tasks queue is removed. Fixes #1480
1 parent c8dce0b commit b2c927f

3 files changed

Lines changed: 59 additions & 3 deletions

File tree

src/blueapi/service/interface.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ def begin_task(
187187
if nt := active_context.numtracker:
188188
nt.set_headers(pass_through_headers or {})
189189

190+
subscribers = []
190191
if tiled_config := active_context.tiled_conf:
191192
# Tiled queries the root node, so must create an authorized client
192193
if isinstance(tiled_config.authentication, ServiceAccount):
@@ -204,6 +205,7 @@ def begin_task(
204205
tiled_writer_token = active_context.run_engine.subscribe(
205206
TiledWriter(tiled_client, batch_size=1)
206207
)
208+
subscribers.append((active_context.run_engine, tiled_writer_token))
207209

208210
def remove_callback_when_task_finished(
209211
event: WorkerEvent, correlation_id: str | None
@@ -213,15 +215,21 @@ def remove_callback_when_task_finished(
213215
and event.task_status.task_id == task.task_id
214216
and event.task_status.task_complete
215217
):
216-
active_context.run_engine.unsubscribe(tiled_writer_token)
217-
active_worker.worker_events.unsubscribe(remove_callback)
218+
for channel, token in subscribers:
219+
channel.unsubscribe(token)
218220

219221
remove_callback = active_worker.worker_events.subscribe(
220222
remove_callback_when_task_finished
221223
)
224+
subscribers.append((active_worker.worker_events, remove_callback))
222225

223226
if task.task_id is not None:
224-
active_worker.begin_task(task.task_id)
227+
try:
228+
active_worker.begin_task(task.task_id)
229+
except KeyError:
230+
for channel, token in subscribers:
231+
channel.unsubscribe(token)
232+
raise
225233
return task
226234

227235

tests/system_tests/test_blueapi_system.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,3 +617,24 @@ def on_event(event: AnyEvent) -> None:
617617
assert outcome.result.message.startswith(
618618
"403: Access policy rejects the provided access blob"
619619
)
620+
621+
622+
# Regression test for #1480
623+
def test_task_submission_after_invalid_task(client_with_stomp: BlueapiClient):
624+
with pytest.raises(KeyError):
625+
# This task hasn't been submitted so should return an error...
626+
client_with_stomp._rest.update_worker_task(WorkerTask(task_id="missing"))
627+
628+
# ...but should leave the serve in a state where it can still run tasks
629+
res = client_with_stomp.run_task(
630+
TaskRequest(
631+
name="count",
632+
params={
633+
"detectors": [
634+
"det",
635+
],
636+
},
637+
instrument_session=AUTHORIZED_INSTRUMENT_SESSION,
638+
)
639+
)
640+
assert isinstance(res.result, TaskResult)

tests/unit_tests/service/test_interface.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,33 @@ def test_begin_task_no_task_id(worker_mock: MagicMock):
242242
worker_mock.assert_not_called()
243243

244244

245+
@patch("blueapi.service.interface.from_uri")
246+
@patch("blueapi.service.interface.config")
247+
@patch("blueapi.service.interface.context")
248+
@patch("blueapi.service.interface.worker")
249+
def test_subscribers_removed_when_task_not_found(
250+
worker_mock: MagicMock,
251+
context_mock: MagicMock,
252+
config_mock: MagicMock,
253+
from_uri_mock: MagicMock,
254+
):
255+
# regression test for #1480
256+
worker = worker_mock()
257+
ctx = context_mock()
258+
worker.begin_task.side_effect = KeyError()
259+
260+
with pytest.raises(KeyError):
261+
interface.begin_task(WorkerTask(task_id="missing"))
262+
263+
ctx.run_engine.subscribe.assert_called_once()
264+
tiled_token = ctx.run_engine.subscribe()
265+
ctx.run_engine.unsubscribe.assert_called_once_with(tiled_token)
266+
267+
worker.worker_events.subscribe.assert_called_once()
268+
remove_token = worker.worker_events.subscribe()
269+
worker.worker_events.unsubscribe.assert_called_once_with(remove_token)
270+
271+
245272
@patch("blueapi.service.interface.TaskWorker.get_tasks_by_status")
246273
def test_get_tasks_by_status(get_tasks_by_status_mock: MagicMock):
247274
pending_task1 = TrackableTask(task_id="0", task=Task(name="pending_task1"))

0 commit comments

Comments
 (0)