Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 27 additions & 47 deletions Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,33 @@ and reliable way to wait for all tasks in the group to finish.

Close the given coroutine if the task group is not active.

.. method:: cancel()

Cancel the task group.

:meth:`~asyncio.Task.cancel` will be called on any tasks in the group that
aren't yet done, as well as the parent (body) of the group. This will
cause the task group context manager to exit *without*
:exc:`asyncio.CancelledError` being raised.
Comment thread
gvanrossum marked this conversation as resolved.

If :meth:`cancel` is called before entering the task group, the group will be
cancelled upon entry. This is useful for patterns where one piece of
code passes an unused :class:`asyncio.TaskGroup` instance to another in order to have
the ability to cancel anything run within the group.

:meth:`cancel` is idempotent and may be called after the task group has
already exited.

Ways to use :meth:`cancel`:

* call it from the task group body based on some condition or event
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably you want code examples for all of these?

* pass the task group instance to child tasks via :meth:`create_task`, allowing a child
task to conditionally cancel the entire entire group
* pass the task group instance or bound :meth:`cancel` method to some other task *before*
opening the task group, allowing remote cancellation
Comment thread
gvanrossum marked this conversation as resolved.

.. versionadded:: next

Example::

async def main():
Expand Down Expand Up @@ -414,53 +441,6 @@ reported by :meth:`asyncio.Task.cancelling`.
Improved handling of simultaneous internal and external cancellations
and correct preservation of cancellation counts.

Terminating a Task Group
Comment thread
gvanrossum marked this conversation as resolved.
Outdated
------------------------

While terminating a task group is not natively supported by the standard
library, termination can be achieved by adding an exception-raising task
to the task group and ignoring the raised exception:

.. code-block:: python

import asyncio
from asyncio import TaskGroup

class TerminateTaskGroup(Exception):
"""Exception raised to terminate a task group."""

async def force_terminate_task_group():
"""Used to force termination of a task group."""
raise TerminateTaskGroup()

async def job(task_id, sleep_time):
print(f'Task {task_id}: start')
await asyncio.sleep(sleep_time)
print(f'Task {task_id}: done')

async def main():
try:
async with TaskGroup() as group:
# spawn some tasks
group.create_task(job(1, 0.5))
group.create_task(job(2, 1.5))
# sleep for 1 second
await asyncio.sleep(1)
# add an exception-raising task to force the group to terminate
group.create_task(force_terminate_task_group())
except* TerminateTaskGroup:
pass

asyncio.run(main())

Expected output:

.. code-block:: text

Task 1: start
Task 2: start
Task 1: done

Sleeping
========

Expand Down
34 changes: 34 additions & 0 deletions Lib/asyncio/taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(self):
self._errors = []
self._base_error = None
self._on_completed_fut = None
self._cancel_on_enter = False

def __repr__(self):
info = ['']
Expand All @@ -62,6 +63,8 @@ async def __aenter__(self):
raise RuntimeError(
f'TaskGroup {self!r} cannot determine the parent task')
self._entered = True
if self._cancel_on_enter:
self.cancel()

return self

Expand Down Expand Up @@ -177,6 +180,10 @@ async def _aexit(self, et, exc):
finally:
exc = None

# If we wanted to raise an error, it would have been done explicitly
# above. Otherwise, either there is no error or we want to suppress
# the original error.
Comment thread
belm0 marked this conversation as resolved.
Outdated
return True
Comment thread
gvanrossum marked this conversation as resolved.

def create_task(self, coro, *, name=None, context=None):
"""Create a new task in this group and return it.
Expand Down Expand Up @@ -273,3 +280,30 @@ def _on_task_done(self, task):
self._abort()
self._parent_cancel_requested = True
self._parent_task.cancel()

def cancel(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think about supporting cancel messages here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked on Gitter, but I'm still unclear about how such a message would be accessed and surfaced.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be logged by the task that gets cancelled, or useful in debugging

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep it as-is and maybe add a message in the follow-up PR; this PR is big enough for the review.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked on Gitter, but I'm still unclear about how such a message would be accessed and surfaced.

My $0.02:

  1. Assuming that message gets passed into each task, indeed, those tasks can do something with it (like identifying who cancelled it -- this is a private protocol within an app or library).
  2. If we end up raising CancelledError out of the async with block, the same is true for whoever catches that CancelledError.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IOW, in case I wasn't clear, yes, we should add cancel message support. (Even if in the end we renamed the method to stop() -- it'll still call .cancel() on many tasks that might want to participate in such a private protocol.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having a cancel message is going to encourage some anti-patterns such as parsing the string to determine code flow (this is why we stopped raising bare strings). If I were reviewing code using a message this way, I'd ask to refactor the code to signal "I cancelled you" in some more direct way.

My sense is that this is much lower priority, and can always be reconsidered in the future.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevertheless it's a feature that most cancellable objects follow. And I don't think that anti-pattern has surfaced in practice. Why would TaskGroup be different in this respect? It is a feature third-party code uses, as a private protocol. Note that we don't even enforce that it is a string -- it can be anything, since it's just getting passed around.

For an implementation example, see Future.cancel.

"""Cancel the task group

`cancel()` will be called on any tasks in the group that aren't yet
done, as well as the parent (body) of the group. This will cause the
task group context manager to exit *without* `asyncio.CancelledError`
being raised.

If `cancel()` is called before entering the task group, the group will be
cancelled upon entry. This is useful for patterns where one piece of
code passes an unused TaskGroup instance to another in order to have
the ability to cancel anything run within the group.

`cancel()` is idempotent and may be called after the task group has
already exited.
"""
if not self._entered:
self._cancel_on_enter = True
return
if self._exiting and not self._tasks:
return
if not self._aborting:
self._abort()
if self._parent_task and not self._parent_cancel_requested:
self._parent_cancel_requested = True
self._parent_task.cancel()
88 changes: 88 additions & 0 deletions Lib/test/test_asyncio/test_taskgroups.py
Comment thread
belm0 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,94 @@ class MyKeyboardInterrupt(KeyboardInterrupt):
self.assertIsNotNone(exc)
self.assertListEqual(gc.get_referrers(exc), no_other_refs())

async def test_taskgroup_cancel_children(self):
Comment thread
belm0 marked this conversation as resolved.
# (asserting that TimeoutError is not raised)
async with asyncio.timeout(1):
async with asyncio.TaskGroup() as tg:
tg.create_task(asyncio.sleep(10))
tg.create_task(asyncio.sleep(10))
await asyncio.sleep(0)
tg.cancel()

async def test_taskgroup_cancel_body(self):
count = 0
async with asyncio.TaskGroup() as tg:
tg.cancel()
count += 1
await asyncio.sleep(0)
count += 1
self.assertEqual(count, 1)

async def test_taskgroup_cancel_idempotent(self):
count = 0
async with asyncio.TaskGroup() as tg:
tg.cancel()
tg.cancel()
count += 1
await asyncio.sleep(0)
count += 1
self.assertEqual(count, 1)

async def test_taskgroup_cancel_after_exit(self):
async with asyncio.TaskGroup() as tg:
await asyncio.sleep(0)
# (asserting that exception is not raised)
tg.cancel()

async def test_taskgroup_cancel_before_enter(self):
tg = asyncio.TaskGroup()
tg.cancel()
count = 0
async with tg:
count += 1
await asyncio.sleep(0)
count += 1
self.assertEqual(count, 1)

async def test_taskgroup_cancel_before_create_task(self):
async with asyncio.TaskGroup() as tg:
tg.cancel()
# TODO: This behavior is not ideal. We'd rather have no exception
# raised, and the child task run until the first await.
with self.assertRaises(RuntimeError):
tg.create_task(asyncio.sleep(1))

async def test_taskgroup_cancel_before_exception(self):
async def raise_exc(parent_tg: asyncio.TaskGroup):
parent_tg.cancel()
raise RuntimeError

with self.assertRaises(ExceptionGroup):
async with asyncio.TaskGroup() as tg:
tg.create_task(raise_exc(tg))
await asyncio.sleep(1)

async def test_taskgroup_cancel_after_exception(self):
async def raise_exc(parent_tg: asyncio.TaskGroup):
try:
raise RuntimeError
finally:
parent_tg.cancel()

with self.assertRaises(ExceptionGroup):
async with asyncio.TaskGroup() as tg:
tg.create_task(raise_exc(tg))
Comment thread
gvanrossum marked this conversation as resolved.
await asyncio.sleep(1)

async def test_taskgroup_body_cancel_before_exception(self):
with self.assertRaises(ExceptionGroup):
async with asyncio.TaskGroup() as tg:
tg.cancel()
raise RuntimeError

async def test_taskgroup_body_cancel_after_exception(self):
with self.assertRaises(ExceptionGroup):
async with asyncio.TaskGroup() as tg:
try:
raise RuntimeError
finally:
tg.cancel()


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add :meth:`~asyncio.TaskGroup.cancel`.
Comment thread
belm0 marked this conversation as resolved.
Outdated