diff --git a/CMakeLists.txt b/CMakeLists.txt index c2e5ca7..42c1b7e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,6 +8,12 @@ option(FAST_TASK_ENABLE_ABORT_IF_NEVER_STARTED "Abort if task's destructor calle option(FAST_TASK_ENABLE_PREEMPTIVE_SCHEDULER "Enables time sliced preemption for tasks, the tasks should use interrupt_unsafe_region on regions where preemption should be disabled." OFF) option(FAST_TASK_INCLUDE_THREAD_INTERRUPT_CODE "Allows the code to stop the thread and execute custom function on top of its stack, doesn't have effect if FAST_TASK_ENABLE_PREEMPTIVE_SCHEDULER enabled" ON) +set(FAST_TASK_GUARD_PAGE_COUNT 1 CACHE STRING + "Number of PROT_NONE (Linux) / PAGE_GUARD (Windows) pages placed at the bottom of each task stack. Set to 0 to disable guard pages entirely.") +if(NOT FAST_TASK_GUARD_PAGE_COUNT MATCHES "^[0-9]+$") + message(FATAL_ERROR "FAST_TASK_GUARD_PAGE_COUNT must be a non-negative integer.") +endif() + set(FAST_TASK_EXCEPTION_POLICY "NONE" CACHE STRING @@ -99,6 +105,8 @@ elseif(FAST_TASK_EXCEPTION_POLICY STREQUAL "PRESERVE") target_compile_definitions(fast_task PRIVATE FT_EXCEPTION_POLICY_PRESERVE) endif() +target_compile_definitions(fast_task PRIVATE FT_GUARD_PAGE_COUNT=${FAST_TASK_GUARD_PAGE_COUNT}) + find_package(Boost COMPONENTS context lockfree) if(NOT Boost_FOUND) FetchContent_Declare( @@ -175,6 +183,15 @@ else() target_compile_options(fast_task PRIVATE -Wall) target_compile_options(fast_task PRIVATE -Wextra) target_compile_options(fast_task PRIVATE --pedantic) + + # Use real Valgrind headers when available; fall back to lightweight stubs otherwise. + find_path(VALGRIND_INCLUDE_DIR valgrind/valgrind.h) + if(VALGRIND_INCLUDE_DIR) + target_include_directories(fast_task PRIVATE ${VALGRIND_INCLUDE_DIR}) + else() + target_include_directories(fast_task PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/third_party/valgrind-stubs) + endif() endif() if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") diff --git a/include/task/scheduler.hpp b/include/task/scheduler.hpp index d9945e2..88e83eb 100644 --- a/include/task/scheduler.hpp +++ b/include/task/scheduler.hpp @@ -57,7 +57,7 @@ namespace fast_task { uint16_t FT_API create_bind_only_executor(uint16_t fixed_count, bool allow_implicit_start, executor_policy policy = executor_policy::default_policy); void FT_API assign_bind_only_executor(uint16_t id, uint16_t fixed_count, bool allow_implicit_start, executor_policy policy = executor_policy::default_policy); - void FT_API close_bind_only_executor(uint16_t id); + void FT_API close_bind_only_executor(uint16_t id, bool abort_tasks = false); void FT_API create_executor(size_t count = 1); size_t FT_API total_executors(); diff --git a/src/tasks/_internal.hpp b/src/tasks/_internal.hpp index cfb9ad7..60ab0a3 100644 --- a/src/tasks/_internal.hpp +++ b/src/tasks/_internal.hpp @@ -221,6 +221,7 @@ namespace fast_task { bool in_close : 1 = false; bool allow_implicit_start : 1 = false; bool fixed_size : 1 = false; + bool abort_tasks_on_close : 1 = false; scheduler::executor_policy policy = scheduler::executor_policy::default_policy; }; diff --git a/src/tasks/classes/task/scheduler.cpp b/src/tasks/classes/task/scheduler.cpp index 06e3cb8..2e19979 100644 --- a/src/tasks/classes/task/scheduler.cpp +++ b/src/tasks/classes/task/scheduler.cpp @@ -125,7 +125,7 @@ namespace fast_task::scheduler { } } - void close_bind_only_executor(uint16_t id) { + void close_bind_only_executor(uint16_t id, bool abort_tasks) { mutex_unify unify(glob.binded_workers_safety); fast_task::unique_lock guard(unify); decltype(glob.binded_workers[id].tasks) transfer_tasks; @@ -149,6 +149,7 @@ namespace fast_task::scheduler { if (context.in_close) return; context.in_close = true; + context.abort_tasks_on_close = abort_tasks; std::swap(transfer_tasks, context.tasks); for (uint16_t i = 0; i < context.executors; i++) { @@ -171,8 +172,32 @@ namespace fast_task::scheduler { glob.binded_workers.erase(id); } std::shared_ptr task; - while (transfer_tasks.try_dequeue(task)) - transfer_task(std::move(task)); + while (transfer_tasks.try_dequeue(task)) { + if (!abort_tasks) { + transfer_task(std::move(task)); + continue; + } + if (!task) + continue; + + bool should_decrement = false; + { + fast_task::lock_guard task_guard(get_data(task).no_race); + if (!get_data(task).completed) { + get_data(task).completed = true; + get_data(task).end_of_life = true; + get_data(task).started = true; + should_decrement = true; + } + get_data(task).result_notify.notify_all(); + } + + if (should_decrement) { + --glob.executing_tasks; + fast_task::shared_lock notify_guard(glob.task_thread_safety); + glob.no_tasks_execute_notifier.notify_all_guarded(); + } + } } void create_executor(size_t count) { diff --git a/src/tasks/scheduler.cpp b/src/tasks/scheduler.cpp index ee8dcf6..1f2f2ad 100644 --- a/src/tasks/scheduler.cpp +++ b/src/tasks/scheduler.cpp @@ -281,7 +281,20 @@ namespace fast_task { data.started = true; data.result_notify.notify_all(); } catch (...) { - loc.ex_ptr = std::current_exception(); //TODO pass this to the callback + loc.ex_ptr = std::current_exception(); + } + if (loc.ex_ptr) { + if (data.callbacks.on_exception) { + try { + data.callbacks.on_exception(data.callbacks.get_data(), loc.ex_ptr); + loc.ex_ptr = nullptr; + } catch (const task_cancellation& cancel) { + forceCancelCancellation(cancel); + loc.ex_ptr = nullptr; + } catch (...) { + loc.ex_ptr = std::current_exception(); + } + } fast_task::lock_guard guard(data.no_race); data.end_of_life = true; data.started = true; @@ -738,9 +751,28 @@ namespace fast_task { if (context.executors == 0) { if (context.in_close) { while (context.tasks.size_approx()) - while (context.tasks.try_dequeue(loc.curr_task)) { //TODO add option to abort if there still tasks in queue - get_data(loc.curr_task).bind_to_worker_id = (uint16_t)-1; - glob.tasks.enqueue(std::move(loc.curr_task)); + while (context.tasks.try_dequeue(loc.curr_task)) { + if (context.abort_tasks_on_close) { + bool should_decrement = false; + { + fast_task::lock_guard task_guard(get_data(loc.curr_task).no_race); + if (!get_data(loc.curr_task).completed) { + get_data(loc.curr_task).completed = true; + get_data(loc.curr_task).end_of_life = true; + get_data(loc.curr_task).started = true; + should_decrement = true; + } + get_data(loc.curr_task).result_notify.notify_all(); + } + if (should_decrement) { + --glob.executing_tasks; + fast_task::shared_lock notify_guard(glob.task_thread_safety); + glob.no_tasks_execute_notifier.notify_all_guarded(); + } + } else { + get_data(loc.curr_task).bind_to_worker_id = (uint16_t)-1; + glob.tasks.enqueue(std::move(loc.curr_task)); + } } glob.tasks_notifier.unsafe_notify_all(); context.on_closed_notifier.notify_all(); diff --git a/src/tasks/util/light_stack.cpp b/src/tasks/util/light_stack.cpp index c71ea5a..d84ef25 100644 --- a/src/tasks/util/light_stack.cpp +++ b/src/tasks/util/light_stack.cpp @@ -24,6 +24,11 @@ namespace fast_task { } #if PLATFORM_WINDOWS #include + + #ifndef FT_GUARD_PAGE_COUNT + #define FT_GUARD_PAGE_COUNT 1 + #endif + size_t page_size = []() { SYSTEM_INFO si; GetSystemInfo(&si); @@ -32,7 +37,7 @@ size_t page_size = []() { namespace fast_task { stack_context create_stack(size_t size) { - const size_t guard_page_size = page_size; + const size_t guard_page_size = page_size * FT_GUARD_PAGE_COUNT; void* vp = ::VirtualAlloc(0, size, MEM_RESERVE, PAGE_READWRITE); if (!vp) @@ -47,12 +52,15 @@ namespace fast_task { throw std::bad_alloc(); } - // create guard page so the OS can catch page faults and grow our stack +#if FT_GUARD_PAGE_COUNT > 0 + // create guard page(s) so the OS can catch stack overflows (fast-fail) pPtr -= guard_page_size; if (!VirtualAlloc(pPtr, guard_page_size, MEM_COMMIT, PAGE_READWRITE | PAGE_GUARD)) { VirtualFree(vp, size, MEM_FREE); throw std::bad_alloc(); } +#endif + stack_context sctx; sctx.size = size; sctx.sp = static_cast(vp) + sctx.size; @@ -62,10 +70,11 @@ namespace fast_task { light_stack::light_stack(size_t size) BOOST_NOEXCEPT_OR_NOTHROW : size(size) {} stack_context light_stack::allocate() { - const size_t guard_page_size = page_size; - const size_t pages = (size + guard_page_size + page_size - 1) / page_size; - // add one page at bottom that will be used as guard-page - const size_t size__ = (pages + 1) * page_size; + const size_t guard_page_size = page_size * FT_GUARD_PAGE_COUNT; + // Allocate size + guard_page_size so the usable portion is exactly 'size', + // regardless of guard page configuration (a large FT_GUARD_PAGE_COUNT would + // otherwise consume the entire requested allocation). + const size_t size__ = ((size + guard_page_size + page_size - 1) / page_size) * page_size; stack_context result; if (stack_allocations.try_dequeue(result)) { @@ -73,7 +82,10 @@ namespace fast_task { if (!flush_used_stacks) return result; else { - memset(static_cast(result.sp) - result.size, 0xCC, result.size); + auto* stack_base = static_cast(result.sp) - result.size; + const size_t clear_offset = std::min(guard_page_size, result.size); + if (clear_offset < result.size) + memset(stack_base + clear_offset, 0xCC, result.size - clear_offset); return result; } } else @@ -110,121 +122,46 @@ namespace fast_task { } } #elif PLATFORM_LINUX - #include #include #include #include #include #include + #ifndef FT_GUARD_PAGE_COUNT + #define FT_GUARD_PAGE_COUNT 1 + #endif + namespace fast_task { static const size_t page_size = boost::context::stack_traits::page_size(); - static const size_t guard_page_size = boost::context::stack_traits::page_size(); - - //void stack_growth_handler(int sig, siginfo_t* si, void* ucontext); - // - //static thread_local struct old___ { - // struct sigaction handler; - // stack_t stack; - // bool is_init = false; - // - // void init() { - // if (is_init) - // return; - // is_init = true; - // stack_t ss; - // ss.ss_sp = mmap(nullptr, SIGSTKSZ, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - // ss.ss_size = SIGSTKSZ; - // ss.ss_flags = 0; - // if (sigaltstack(&ss, &stack) == -1) { - // perror("sigaltstack"); - // exit(EXIT_FAILURE); - // } - // struct sigaction sa; - // sigemptyset(&sa.sa_mask); - // sa.sa_sigaction = stack_growth_handler; - // sa.sa_flags = SA_SIGINFO | SA_ONSTACK; - // if (sigaction(SIGSEGV, &sa, &handler) == -1) { - // perror("sigaction"); - // exit(EXIT_FAILURE); - // } - // } - // - // ~old___() { - // if (sigaltstack(&stack, &stack) == -1) - // perror("sigaltstack"); - // if (munmap(static_cast(stack.ss_sp), SIGSTKSZ) == -1) - // perror("munmap"); - // if (sigaction(SIGSEGV, &handler, NULL) == -1) - // perror("sigaction failed in library destructor"); - // } - //} old_data; - // - //void pass_handler(int sig, siginfo_t* si, void* ucontext) { - // if (old_data.handler.sa_flags & SA_SIGINFO) - // old_data.handler.sa_sigaction(sig, si, ucontext); - // else if (old_data.handler.sa_handler == SIG_DFL) { - // signal(sig, SIG_DFL); - // raise(sig); - // } else if (old_data.handler.sa_handler != SIG_IGN) - // old_data.handler.sa_handler(sig); - //} - // - //void stack_growth_handler(int sig, siginfo_t* si, void* ucontext) { - // if (!loc.curr_task) { //definitely not ours stack - // pass_handler(sig, si, ucontext); - // return; - // } else if (!get_data(loc.curr_task).data) { //avoid alloc - // pass_handler(sig, si, ucontext); - // return; - // } - // - // void* fault_addr = si->si_addr; - // void* stack_start = get_execution_data(loc.curr_task).stack_ptr; - // void* stack_end = static_cast(stack_start) + get_execution_data(loc.curr_task).stack_size; - // - // if (fault_addr >= stack_start && fault_addr < stack_end) { - // void* page_start = (void*)((uintptr_t)fault_addr & ~(page_size - 1)); - // if (mprotect(page_start, page_size, PROT_READ | PROT_WRITE) == -1) { - // if (is_debugger_attached()) { - // pass_handler(sig, si, ucontext); - // return; - // } - // psignal(sig, "mprotect failed in signal handler"); - // _exit(EXIT_FAILURE); - // } - // if (RUNNING_ON_VALGRIND) - // VALGRIND_MAKE_MEM_DEFINED(page_start, page_size); - // return; - // } else - // pass_handler(sig, si, ucontext); - //} + static const size_t guard_page_size = page_size * FT_GUARD_PAGE_COUNT; void __install_signal_handler_mem() { - //old_data.init(); + // Guard pages serve as fast-fail sentinels only; no signal handler is installed. } - //TODO create proper guard page + //create proper guard page stack_context create_stack(size_t size) { size_t total_size = std::max(size, page_size * 3); - void* vp = mmap(nullptr, total_size, /*PROT_NONE*/ PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - if (!vp) + void* vp = mmap(nullptr, total_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + if (vp == MAP_FAILED) throw std::bad_alloc(); - // needs at least 3 pages to fully construct the coroutine and switch to it - //const auto init_commit_size = page_size * 3; - //auto commit_start = static_cast(vp) + total_size - init_commit_size; - //if (mprotect(commit_start, init_commit_size, PROT_READ | PROT_WRITE) == -1) { - // munmap(vp, total_size); - // throw std::bad_alloc(); - //} +#if FT_GUARD_PAGE_COUNT > 0 + // Create PROT_NONE guard page(s) at the bottom of the stack. + // A stack overflow will trigger SIGSEGV, terminating the process fast. + if (mprotect(vp, guard_page_size, PROT_NONE) == -1) { + munmap(vp, total_size); + throw std::bad_alloc(); + } +#endif + if (RUNNING_ON_VALGRIND) { - void* stack_bottom = vp; + void* stack_bottom = static_cast(vp) + guard_page_size; void* stack_top = static_cast(vp) + total_size; get_execution_data(loc.curr_task).valgrind_stack_id = VALGRIND_STACK_REGISTER(stack_bottom, stack_top); } - //PROT_NONE already used for guard page stack_context sctx; sctx.size = size; sctx.sp = static_cast(vp) + sctx.size; @@ -246,9 +183,10 @@ namespace fast_task { light_stack::light_stack(size_t size) BOOST_NOEXCEPT_OR_NOTHROW : size(size) {} stack_context light_stack::allocate() { - const size_t pages = (size + guard_page_size + page_size - 1) / page_size; - // add one page at bottom that will be used as guard-page - const size_t size__ = (pages + 1) * page_size; + // Allocate size + guard_page_size so the usable portion is exactly 'size', + // regardless of guard page configuration (a large FT_GUARD_PAGE_COUNT would + // otherwise consume the entire requested allocation). + const size_t size__ = ((size + guard_page_size + page_size - 1) / page_size) * page_size; stack_context result; if (stack_allocations.try_dequeue(result)) { @@ -256,7 +194,10 @@ namespace fast_task { if (!flush_used_stacks) return result; else { - memset(static_cast(result.sp) - result.size, 0xCC, result.size); + auto* stack_base = static_cast(result.sp) - result.size; + const size_t clear_offset = std::min(guard_page_size, result.size); + if (clear_offset < result.size) + memset(stack_base + clear_offset, 0xCC, result.size - clear_offset); return result; } } else @@ -295,4 +236,4 @@ namespace fast_task { #else #error Unsupported platform -#endif \ No newline at end of file +#endif diff --git a/tests/scheduler/test_bind_executor.cpp b/tests/scheduler/test_bind_executor.cpp index d60922b..87c854e 100644 --- a/tests/scheduler/test_bind_executor.cpp +++ b/tests/scheduler/test_bind_executor.cpp @@ -68,3 +68,20 @@ TEST(BindExecutor, SetWorkerIdOnTask) { fast_task::scheduler::close_bind_only_executor(id); fast_task::scheduler::shut_down(); } + +TEST(BindExecutor, CloseAbortTasksAbortsQueuedAndAllowsShutdown) { + fast_task::scheduler::create_executor(2); + uint16_t id = fast_task::scheduler::create_bind_only_executor(0, false); + std::atomic queued_ran{false}; + + auto queued = std::make_shared([&] { queued_ran.store(true, std::memory_order_release); }); + queued->set_worker_id(id); + fast_task::scheduler::start(queued); + + fast_task::scheduler::close_bind_only_executor(id, true); + queued->await_task(); + EXPECT_FALSE(queued_ran.load(std::memory_order_acquire)); + + fast_task::scheduler::await_no_tasks(); + fast_task::scheduler::shut_down(); +} diff --git a/tests/stackfull/CMakeLists.txt b/tests/stackfull/CMakeLists.txt index fe6e92b..7600de5 100644 --- a/tests/stackfull/CMakeLists.txt +++ b/tests/stackfull/CMakeLists.txt @@ -1,3 +1,4 @@ add_ft_test(test_stackfull_basic test_stackfull_basic.cpp) add_ft_test(test_stackfull_sleep test_stackfull_sleep.cpp) add_ft_test(test_stackfull_sync test_stackfull_sync.cpp) +add_ft_test(test_stackfull_guard test_stackfull_guard.cpp) diff --git a/tests/stackfull/test_stackfull_guard.cpp b/tests/stackfull/test_stackfull_guard.cpp new file mode 100644 index 0000000..470390f --- /dev/null +++ b/tests/stackfull/test_stackfull_guard.cpp @@ -0,0 +1,66 @@ +// Copyright Danyil Melnytskyi 2025-Present +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include + +class StackfullGuardTest : public SchedulerFixture {}; + +// --------------------------------------------------------------------------- +// Exception-callback tests (platform independent) +// --------------------------------------------------------------------------- + +// Verify that a normal C++ exception thrown inside a task reaches the +// on_exception callback with the correct type and message. +TEST_F(StackfullGuardTest, ExceptionCallback_ReceivesCorrectException) { + std::atomic called{false}; + std::string message; + + auto t = std::make_shared( + [] { throw std::runtime_error("guard_test"); }, + [&](const std::exception_ptr& ep) { + called.store(true); + try { + std::rethrow_exception(ep); + } catch (const std::runtime_error& e) { + message = e.what(); + } + } + ); + fast_task::scheduler::start(t); + t->await_task(); + + EXPECT_TRUE(called.load()); + EXPECT_EQ(message, "guard_test"); +} + +// Verify that RAII destructors run for local objects when a normal exception +// is thrown inside a task. +TEST_F(StackfullGuardTest, ExceptionCallback_RaiiDestructorCalledOnException) { + struct DtorGuard { + std::atomic& flag; + ~DtorGuard() { flag.store(true, std::memory_order_release); } + }; + + std::atomic dtor_called{false}; + std::atomic handler_called{false}; + + auto t = std::make_shared( + [&] { + DtorGuard guard{dtor_called}; + throw std::runtime_error("raii_test"); + }, + [&](const std::exception_ptr&) { + handler_called.store(true); + } + ); + fast_task::scheduler::start(t); + t->await_task(); + + EXPECT_TRUE(handler_called.load()) << "Exception handler must be called"; + EXPECT_TRUE(dtor_called.load()) << "RAII destructor must run before handler"; +} diff --git a/third_party/valgrind-stubs/valgrind/memcheck.h b/third_party/valgrind-stubs/valgrind/memcheck.h new file mode 100644 index 0000000..78f22d5 --- /dev/null +++ b/third_party/valgrind-stubs/valgrind/memcheck.h @@ -0,0 +1,12 @@ +// Minimal memcheck stub used when the real Valgrind headers are not installed. +#ifndef VALGRIND_MEMCHECK_STUB_H +#define VALGRIND_MEMCHECK_STUB_H + +#include "valgrind.h" + +#define VALGRIND_MAKE_MEM_DEFINED(addr, size) do {} while (0) +#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size) do {} while (0) +#define VALGRIND_MAKE_MEM_NOACCESS(addr, size) do {} while (0) +#define VALGRIND_CHECK_MEM_IS_DEFINED(addr, size) ((unsigned int)0) + +#endif /* VALGRIND_MEMCHECK_STUB_H */ diff --git a/third_party/valgrind-stubs/valgrind/valgrind.h b/third_party/valgrind-stubs/valgrind/valgrind.h new file mode 100644 index 0000000..72013ed --- /dev/null +++ b/third_party/valgrind-stubs/valgrind/valgrind.h @@ -0,0 +1,12 @@ +// Minimal valgrind stub used when the real Valgrind headers are not installed. +// All macros expand to no-ops; RUNNING_ON_VALGRIND is always 0 at runtime. +#ifndef VALGRIND_STUB_H +#define VALGRIND_STUB_H + +#define RUNNING_ON_VALGRIND 0 + +#define VALGRIND_STACK_REGISTER(start, end) ((unsigned int)0) +#define VALGRIND_STACK_DEREGISTER(id) do {} while (0) +#define VALGRIND_STACK_CHANGE(id, start, end) do {} while (0) + +#endif /* VALGRIND_STUB_H */