Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
292 changes: 217 additions & 75 deletions Source/Task/TaskQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,10 @@ HRESULT __stdcall TaskQueuePortImpl::QueueItem(
}
else
{
entry.enqueueTime = m_timer.GetAbsoluteTime(waitMs);
// Delayed callbacks are ordered by a monotonic due time so stale timer
// callbacks and wall-clock adjustments cannot make one pending entry
// masquerade as another.
entry.enqueueTime = m_timer.GetDueTime(waitMs);
RETURN_HR_IF(E_OUTOFMEMORY, !m_pendingList->push_back(entry));

// If the entry's enqueue time is < our current time,
Expand Down Expand Up @@ -955,18 +958,21 @@ void TaskQueuePortImpl::CancelPendingEntries(
_In_ ITaskQueuePortContext* portContext,
_In_ bool appendToQueue)
{
// Stop wait timer and promote pending callbacks that are used
// by the queue that invoked this termination. Other callbacks
// are placed back on the pending list.

m_timer.Cancel();
m_timerDue = UINT64_MAX;
// Only move entries owned by the terminating queue. Sibling delegates
// share this port's delayed-callback timer state, so leave m_timer and
// m_timerDue alone; if we removed the armed earliest entry, the existing
// timer simply takes one blank fire and re-arms for the next real item.
LocklessQueue<QueueEntry> entriesToAppend(*m_queueList.get());

m_pendingList->remove_if([&](auto& entry, auto address)
{
if (entry.portContext == portContext)
{
if (!appendToQueue || !AppendEntry(entry, address))
if (appendToQueue)
{
entriesToAppend.push_back(std::move(entry), address);
}
else
{
entry.portContext->Release();
m_pendingList->free_node(address);
Expand All @@ -978,7 +984,31 @@ void TaskQueuePortImpl::CancelPendingEntries(
return false;
});

SubmitPendingCallback();
while (appendToQueue)
{
QueueEntry entry = {};
uint64_t address = 0;
if (!entriesToAppend.pop_front(entry, address))
{
break;
}

if (!AppendEntry(entry, address))
{
entry.portContext->Release();
m_queueList->free_node(address);
}
}

#ifdef HC_UNITTEST_API
// Test hook: let unit tests enqueue a sibling delayed callback while this
// termination path still owns the interleaving window that used to race
// with SubmitPendingCallback().
if (auto hooks = portContext->GetQueue()->GetTestHooks(); hooks != nullptr)
{
hooks->PendingEntriesRemovedDuringTermination(portContext->GetType());
}
#endif

#ifdef _WIN32

Expand Down Expand Up @@ -1028,83 +1058,130 @@ void TaskQueuePortImpl::EraseQueue(
}
}

// Examines the pending callback list, optionally popping the entry off the
// list that matches m_timerDue, and schedules the timer for the next entry.
bool TaskQueuePortImpl::ScheduleNextPendingCallback(
// Promotes every delayed entry whose deadline has already arrived and then
// arms the timer for the next future deadline, if one remains.
//
// This replaces the older "pop exactly one entry whose enqueueTime matches the
// currently armed due time" flow. That older model made correctness depend on
// timestamps behaving like unique identities. By sweeping everything with
// enqueueTime <= now, equal-deadline siblings and stale timer callbacks both
// collapse into the same simple rule: if a callback is due, move it now; if it
// is still in the future, leave it pending and re-arm for the earliest future
// item.
void TaskQueuePortImpl::PromoteReadyPendingCallbacks(
_In_ uint64_t dueTime,
_Out_ QueueEntry& dueEntry,
_Out_ uint64_t& dueEntryNode)
_In_ uint64_t now)
{
QueueEntry nextItem = {};
bool hasDueEntry = false;
bool hasNextItem = false;
for (;;)
{
// Collect due entries locally first and only touch the active queue
// after remove_if completes. Keeping the sweep phase and the publish
// phase separate preserves the "promote all ready entries" behavior
// without asking remove_if to coexist with queue wakeups and
// cross-queue node reuse at the same time.
LocklessQueue<QueueEntry> readyEntries(*m_queueList.get());

dueEntryNode = 0;
QueueEntry nextItem = {};
bool hasNextItem = false;

m_pendingList->remove_if([&](auto& entry, auto address)
{
if (!hasDueEntry && entry.enqueueTime == dueTime)
{
dueEntry = entry;
dueEntryNode = address;
hasDueEntry = true;
return true;
}
else if (!hasNextItem || nextItem.enqueueTime > entry.enqueueTime)
m_pendingList->remove_if([&](auto& entry, auto address)
{
// remove_if works by removing items from the list and
// re-adding them if this callback returns false. If we
// are going to keep an item beyond this callback we need
// to make sure fields we're using stay valid. Only the
// port context is a risk.
// Any entry whose deadline has passed is ready right now,
// regardless of whether its timestamp aliases another entry or
// whether this timer fire is the original notification or a
// stale callback that arrived late.
if (entry.enqueueTime <= now)
{
readyEntries.push_back(std::move(entry), address);

return true;
}

if (hasNextItem)
if (!hasNextItem || nextItem.enqueueTime > entry.enqueueTime)
{
nextItem.portContext->Release();
// remove_if works by removing items from the list and
// re-adding them if this callback returns false. If we
// are going to keep an item beyond this callback we need
// to make sure fields we're using stay valid. Only the
// port context is a risk.

if (hasNextItem)
{
nextItem.portContext->Release();
}

nextItem = entry;
nextItem.portContext->AddRef();
hasNextItem = true;
}

nextItem = entry;
nextItem.portContext->AddRef();
hasNextItem = true;
}
return false;
});

return false;
});
// Publish the ready entries after the pending-list walk finishes.
QueueEntry readyEntry = {};
uint64_t readyEntryNode = 0;
while (readyEntries.pop_front(readyEntry, readyEntryNode))
{
if (!AppendEntry(readyEntry, readyEntryNode))
{
readyEntry.portContext->Release();
m_queueList->free_node(readyEntryNode);
}
}

if (hasNextItem)
{
if (nextItem.portContext->GetStatus() == TaskQueuePortStatus::Active)
if (hasNextItem)
{
while (true)
if (nextItem.portContext->GetStatus() == TaskQueuePortStatus::Active)
{
if (m_timerDue.compare_exchange_weak(dueTime, nextItem.enqueueTime))
while (true)
{
m_timer.Start(nextItem.enqueueTime);
break;
}
// Publish the earliest future deadline that survived the
// ready sweep. If another thread already armed an even
// earlier timer, leave that earlier deadline in place.
if (m_timerDue.compare_exchange_weak(dueTime, nextItem.enqueueTime))
{
m_timer.Start(nextItem.enqueueTime);
break;
}

dueTime = m_timerDue.load();
dueTime = m_timerDue.load();

if (dueTime <= nextItem.enqueueTime)
{
break;
if (dueTime <= nextItem.enqueueTime)
{
break;
}
}
}
}
else
{
// The port is no longer active. Pending entries are canceled
// when the port is terminated, but if we were iterating above
// it's possible that we removed an item while the termination was
// being processed and it got missed.
CancelPendingEntries(nextItem.portContext, true);
else
{
// The port is no longer active. Pending entries are canceled
// when the port is terminated, but if we were iterating above
// it's possible that we removed an item while the termination
// was being processed and it got missed.
CancelPendingEntries(nextItem.portContext, true);
}

nextItem.portContext->Release();
return;
}

nextItem.portContext->Release();
}
else
{
// No future entries remain in the pending list.
uint64_t noDueTime = UINT64_MAX;

#ifdef HC_UNITTEST_API
m_attachedContexts.Visit([&](ITaskQueuePortContext* portContext)
{
auto hooks = portContext->GetQueue()->GetTestHooks();
if (hooks != nullptr)
{
hooks->NoNextPendingCallbackFound(
portContext->GetType(),
dueTime);
}
});
#endif

if (m_timerDue.compare_exchange_strong(dueTime, noDueTime))
{
// Bug fix: ScheduleNextPendingCallback timer race results
Expand All @@ -1118,6 +1195,7 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback(
// See VerifyDelayedCallbackTimerRaceOnManualQueue for full
// analysis. The test hook here allows unit tests to verify
// there is no race.
#ifdef HC_UNITTEST_API
m_attachedContexts.Visit([&](ITaskQueuePortContext* portContext)
{
auto hooks = portContext->GetQueue()->GetTestHooks();
Expand All @@ -1128,24 +1206,69 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback(
noDueTime);
}
});
#endif

// A concurrent QueueItem can append a future entry after our
// sweep has already concluded there is no next item, but before
// we publish UINT64_MAX here. Instead of recursing (which has
// no tail-call guarantee and risks stack growth under sustained
// contention), loop back for a rescue sweep. If nothing landed,
// the second pass is a cheap no-op.
if (dueTime != noDueTime)
{
now = m_timer.GetCurrentTime();
dueTime = noDueTime;
continue;
}
}
}

return hasDueEntry;
return;
}
}

void TaskQueuePortImpl::SubmitPendingCallback()
{
QueueEntry dueEntry;
uint64_t dueEntryNode;

if (ScheduleNextPendingCallback(m_timerDue.load(), dueEntry, dueEntryNode))
while (true)
{
if (!AppendEntry(dueEntry, dueEntryNode))
uint64_t dueTime = m_timerDue.load();

if (dueTime == UINT64_MAX)
{
return;
}

// Threadpool timer callbacks that were already queued can still arrive
// after the timer has been retargeted. Treat the callback as advisory and
// only sweep ready entries once the currently armed monotonic deadline has
// actually arrived.
//
// Important: do not just return on an "early" callback. On Win32 the
// threadpool timer's relative wait source is not the same clock object as
// std::chrono::steady_clock, so a legitimate one-shot fire can arrive a
// little before the stored steady-clock deadline. If we drop that callback
// without re-arming the timer, the pending entry can remain stranded until
// some unrelated later timer fire or termination path happens to flush it.
//
// Also do not blindly re-arm the due time we just read. Another thread can
// publish an earlier pending entry between the load above and Start() below.
// If this stale callback then overwrites the timer with the older deadline,
// the newer earlier entry can stay stranded until the older deadline fires.
// Only re-arm when m_timerDue still matches the due time we observed.
const uint64_t now = m_timer.GetCurrentTime();
if (now < dueTime)
{
dueEntry.portContext->Release();
m_queueList->free_node(dueEntryNode);
uint64_t expectedDueTime = dueTime;
if (m_timerDue.compare_exchange_weak(expectedDueTime, dueTime))
{
m_timer.Start(dueTime);
return;
}

continue;
}

PromoteReadyPendingCallbacks(dueTime, now);
return;
}
}

Expand Down Expand Up @@ -2341,6 +2464,7 @@ STDAPI_(bool) XTaskQueueUninitialize(
return ApiRefs::WaitZeroRefs(timeoutMilliseconds);
}

#ifdef HC_UNITTEST_API
/// <summary>
/// Sets or clears test hooks on a task queue.
/// </summary>
Expand All @@ -2353,4 +2477,22 @@ STDAPI XTaskQueueSetTestHooks(
RETURN_HR_IF(E_GAMERUNTIME_INVALID_HANDLE, aq == nullptr);
aq->SetTestHooks(hooks);
return S_OK;
}
}

/// <summary>
/// Test-only API (compiled under HC_UNITTEST_API) that drives the given
/// queue port's pending-callback submission path on demand, so unit tests
/// can exercise the delayed-callback timer logic deterministically instead
/// of waiting for a real timer fire.
/// </summary>
STDAPI XTaskQueueSubmitPendingCallbackForTests(
    _In_ XTaskQueueHandle queue,
    _In_ XTaskQueuePort port
    ) noexcept
{
    // Resolve the public handle to the internal queue object; a handle that
    // does not map to a live queue is rejected up front.
    referenced_ptr<ITaskQueue> aq(GetQueue(queue));
    RETURN_HR_IF(E_GAMERUNTIME_INVALID_HANDLE, aq == nullptr);

    // Fetch the context for the requested port of this queue.
    referenced_ptr<ITaskQueuePortContext> portContext;
    RETURN_IF_FAILED(aq->GetPortContext(port, portContext.address_of()));

    // NOTE(review): the static_cast assumes every port returned by
    // GetPort() is a TaskQueuePortImpl -- confirm no other
    // implementations of the port interface exist before relying on this.
    auto* portImpl = static_cast<TaskQueuePortImpl*>(portContext->GetPort());
    portImpl->SubmitPendingCallbackForTests();
    return S_OK;
}
#endif

Loading