Skip to content
17 changes: 17 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ option(FAST_TASK_ENABLE_ABORT_IF_NEVER_STARTED "Abort if task's destructor calle
option(FAST_TASK_ENABLE_PREEMPTIVE_SCHEDULER "Enables time sliced preemption for tasks, the tasks should use interrupt_unsafe_region on regions where preemption should be disabled." OFF)
option(FAST_TASK_INCLUDE_THREAD_INTERRUPT_CODE "Allows the code to stop the thread and execute custom function on top of its stack, doesn't have effect if FAST_TASK_ENABLE_PREEMPTIVE_SCHEDULER enabled" ON)

set(FAST_TASK_GUARD_PAGE_COUNT 1 CACHE STRING
"Number of PROT_NONE (Linux) / PAGE_GUARD (Windows) pages placed at the bottom of each task stack. Set to 0 to disable guard pages entirely.")
if(NOT FAST_TASK_GUARD_PAGE_COUNT MATCHES "^[0-9]+$")
message(FATAL_ERROR "FAST_TASK_GUARD_PAGE_COUNT must be a non-negative integer.")
Comment thread
Melnytskyi marked this conversation as resolved.
endif()


set(FAST_TASK_EXCEPTION_POLICY "NONE" CACHE
STRING
Expand Down Expand Up @@ -99,6 +105,8 @@ elseif(FAST_TASK_EXCEPTION_POLICY STREQUAL "PRESERVE")
target_compile_definitions(fast_task PRIVATE FT_EXCEPTION_POLICY_PRESERVE)
endif()

Comment thread
Melnytskyi marked this conversation as resolved.
target_compile_definitions(fast_task PRIVATE FT_GUARD_PAGE_COUNT=${FAST_TASK_GUARD_PAGE_COUNT})

find_package(Boost COMPONENTS context lockfree)
if(NOT Boost_FOUND)
FetchContent_Declare(
Expand Down Expand Up @@ -175,6 +183,15 @@ else()
target_compile_options(fast_task PRIVATE -Wall)
target_compile_options(fast_task PRIVATE -Wextra)
target_compile_options(fast_task PRIVATE --pedantic)

# Use real Valgrind headers when available; fall back to lightweight stubs otherwise.
find_path(VALGRIND_INCLUDE_DIR valgrind/valgrind.h)
if(VALGRIND_INCLUDE_DIR)
target_include_directories(fast_task PRIVATE ${VALGRIND_INCLUDE_DIR})
else()
target_include_directories(fast_task PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/third_party/valgrind-stubs)
endif()
endif()

if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
Expand Down
2 changes: 1 addition & 1 deletion include/task/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ namespace fast_task {

uint16_t FT_API create_bind_only_executor(uint16_t fixed_count, bool allow_implicit_start, executor_policy policy = executor_policy::default_policy);
void FT_API assign_bind_only_executor(uint16_t id, uint16_t fixed_count, bool allow_implicit_start, executor_policy policy = executor_policy::default_policy);
Comment thread
Melnytskyi marked this conversation as resolved.
void FT_API close_bind_only_executor(uint16_t id);
void FT_API close_bind_only_executor(uint16_t id, bool abort_tasks = false);
Comment thread
Melnytskyi marked this conversation as resolved.

void FT_API create_executor(size_t count = 1);
size_t FT_API total_executors();
Expand Down
1 change: 1 addition & 0 deletions src/tasks/_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ namespace fast_task {
bool in_close : 1 = false;
bool allow_implicit_start : 1 = false;
bool fixed_size : 1 = false;
bool abort_tasks_on_close : 1 = false;
scheduler::executor_policy policy = scheduler::executor_policy::default_policy;
};

Expand Down
31 changes: 28 additions & 3 deletions src/tasks/classes/task/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ namespace fast_task::scheduler {
}
}

void close_bind_only_executor(uint16_t id) {
void close_bind_only_executor(uint16_t id, bool abort_tasks) {
mutex_unify unify(glob.binded_workers_safety);
fast_task::unique_lock guard(unify);
decltype(glob.binded_workers[id].tasks) transfer_tasks;
Expand All @@ -149,6 +149,7 @@ namespace fast_task::scheduler {
if (context.in_close)
return;
context.in_close = true;
context.abort_tasks_on_close = abort_tasks;

std::swap(transfer_tasks, context.tasks);
Comment thread
Melnytskyi marked this conversation as resolved.
for (uint16_t i = 0; i < context.executors; i++) {
Expand All @@ -171,8 +172,32 @@ namespace fast_task::scheduler {
glob.binded_workers.erase(id);
}
std::shared_ptr<task> task;
while (transfer_tasks.try_dequeue(task))
transfer_task(std::move(task));
while (transfer_tasks.try_dequeue(task)) {
if (!abort_tasks) {
Comment thread
Melnytskyi marked this conversation as resolved.
transfer_task(std::move(task));
continue;
}
if (!task)
continue;

bool should_decrement = false;
{
fast_task::lock_guard task_guard(get_data(task).no_race);
if (!get_data(task).completed) {
get_data(task).completed = true;
get_data(task).end_of_life = true;
get_data(task).started = true;
should_decrement = true;
}
get_data(task).result_notify.notify_all();
}

if (should_decrement) {
--glob.executing_tasks;
fast_task::shared_lock notify_guard(glob.task_thread_safety);
glob.no_tasks_execute_notifier.notify_all_guarded();
}
}
}

void create_executor(size_t count) {
Expand Down
40 changes: 36 additions & 4 deletions src/tasks/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,20 @@ namespace fast_task {
data.started = true;
data.result_notify.notify_all();
} catch (...) {
loc.ex_ptr = std::current_exception(); //TODO pass this to the callback
loc.ex_ptr = std::current_exception();
}
if (loc.ex_ptr) {
if (data.callbacks.on_exception) {
try {
data.callbacks.on_exception(data.callbacks.get_data(), loc.ex_ptr);
loc.ex_ptr = nullptr;
Comment thread
Melnytskyi marked this conversation as resolved.
} catch (const task_cancellation& cancel) {
forceCancelCancellation(cancel);
loc.ex_ptr = nullptr;
} catch (...) {
loc.ex_ptr = std::current_exception();
}
}
fast_task::lock_guard guard(data.no_race);
data.end_of_life = true;
data.started = true;
Expand Down Expand Up @@ -738,9 +751,28 @@ namespace fast_task {
if (context.executors == 0) {
if (context.in_close) {
while (context.tasks.size_approx())
while (context.tasks.try_dequeue(loc.curr_task)) { //TODO add option to abort if there still tasks in queue
get_data(loc.curr_task).bind_to_worker_id = (uint16_t)-1;
glob.tasks.enqueue(std::move(loc.curr_task));
while (context.tasks.try_dequeue(loc.curr_task)) {
if (context.abort_tasks_on_close) {
bool should_decrement = false;
{
fast_task::lock_guard task_guard(get_data(loc.curr_task).no_race);
if (!get_data(loc.curr_task).completed) {
get_data(loc.curr_task).completed = true;
get_data(loc.curr_task).end_of_life = true;
get_data(loc.curr_task).started = true;
should_decrement = true;
}
get_data(loc.curr_task).result_notify.notify_all();
}
if (should_decrement) {
--glob.executing_tasks;
fast_task::shared_lock notify_guard(glob.task_thread_safety);
glob.no_tasks_execute_notifier.notify_all_guarded();
}
} else {
get_data(loc.curr_task).bind_to_worker_id = (uint16_t)-1;
glob.tasks.enqueue(std::move(loc.curr_task));
}
}
glob.tasks_notifier.unsafe_notify_all();
context.on_closed_notifier.notify_all();
Expand Down
Loading