Skip to content

Commit 3281572

Browse files
committed
tpool: fix tpool_wait returning before queued work has started
The non-stopping branch of tpool_wait's predicate checked only working_cnt, which is the number of jobs currently being executed by a worker. working_cnt is incremented when a worker dequeues a job, not when a producer enqueues one, so: tp = tpool_create(N); for (i = 0; i < M; i++) tpool_add_work(tp, job, ctx); tpool_wait(tp); /* could return immediately */ /* read ctx state here, expecting all M jobs to have run */ was free to return before any worker had picked up the first job. Producers that outpaced the workers' first dequeue would observe zero or partial completion. The worker's completion-side signal already covers the correct condition (tpool.c:139 signals working_cond only when both working_cnt == 0 and work_first == NULL), so the fix is just to widen the wait predicate to match: also wait while work_first is non-NULL. The only in-tree caller is cores/libretro-ffmpeg/ffmpeg_core.c, which uses tpool_wait to drain the pool before clearing the video buffer. All three call sites benefit from the corrected semantics; none rely on the previous early-return behaviour. Adds a regression test under libretro-common/samples/rthreads/tpool_wait_test/ covering: - tpool_add_work then tpool_wait drains all jobs - tpool_create(0) defaults to a working pool - tpool_create / tpool_destroy roundtrip is heap-clean - tpool_destroy with queued-but-unrun work is heap-clean (tpool_destroy is documented to discard such work) Hooks the new sample into the Linux libretro-common samples workflow. Built and run under SANITIZER=address,undefined like the other samples; completes in under one second on a github- hosted runner. Verified the test catches the bug: against unpatched tpool.c, test_work_executes_once and test_zero_threads_default fail with counter=0 under ASan + UBSan.
1 parent b356250 commit 3281572

4 files changed

Lines changed: 360 additions & 2 deletions

File tree

.github/workflows/Linux-libretro-common-samples.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ jobs:
7878
rpng_roundtrip_test
7979
word_wrap_overflow_test
8080
task_queue_title_error_test
81+
tpool_wait_test
8182
)
8283
8384
# Per-binary run command (overrides ./<binary> if present).

libretro-common/rthreads/tpool.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,15 @@ void tpool_wait(tpool_t *tp)
281281
{
282282
/* working_cond is dual use. It signals when we're not stopping but the
283283
* working_cnt is 0 indicating there isn't any work processing. If we
284-
* are stopping it will trigger when there aren't any threads running. */
285-
if ((!tp->stop && tp->working_cnt != 0) || (tp->stop && tp->thread_cnt != 0))
284+
* are stopping it will trigger when there aren't any threads running.
285+
*
286+
* The non-stopping branch must also wait while work_first is non-NULL:
287+
* a tpool_wait racing tpool_add_work would otherwise return before any
288+
* worker has dequeued the just-added work (working_cnt still 0). The
289+
* worker's completion path signals working_cond only when both
290+
* working_cnt == 0 and work_first == NULL, so this predicate matches
291+
* the signal exactly. */
292+
if ((!tp->stop && (tp->work_first || tp->working_cnt != 0)) || (tp->stop && tp->thread_cnt != 0))
286293
scond_wait(tp->working_cond, tp->work_mutex);
287294
else
288295
break;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
TARGET := tpool_wait_test
2+
3+
LIBRETRO_COMM_DIR := ../../..
4+
5+
# tpool.c depends on rthreads.c. Both are self-contained at the
6+
# libretro-common layer (rthreads.c is the platform abstraction over
7+
# pthreads / Win32 / etc.) so we just compile them in directly --
8+
# no other libretro-common sources are needed.
9+
SOURCES := \
10+
tpool_wait_test.c \
11+
$(LIBRETRO_COMM_DIR)/rthreads/tpool.c \
12+
$(LIBRETRO_COMM_DIR)/rthreads/rthreads.c
13+
14+
OBJS := $(SOURCES:.c=.o)
15+
16+
CFLAGS += -Wall -pedantic -std=gnu99 -g -O0 -I$(LIBRETRO_COMM_DIR)/include
17+
LDFLAGS += -lpthread
18+
19+
# rthreads.c uses clock_gettime + CLOCK_REALTIME on Linux glibc; on
20+
# older glibc those live in -lrt. Harmless on newer glibc.
21+
LDFLAGS += -lrt
22+
23+
ifneq ($(SANITIZER),)
24+
CFLAGS := -fsanitize=$(SANITIZER) -fno-omit-frame-pointer $(CFLAGS)
25+
LDFLAGS := -fsanitize=$(SANITIZER) $(LDFLAGS)
26+
endif
27+
28+
all: $(TARGET)
29+
30+
%.o: %.c
31+
$(CC) -c -o $@ $< $(CFLAGS)
32+
33+
$(TARGET): $(OBJS)
34+
$(CC) -o $@ $^ $(LDFLAGS)
35+
36+
clean:
37+
rm -f $(TARGET) $(OBJS)
38+
39+
.PHONY: clean
Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
/* Regression test for the tpool_wait predicate in
2+
* libretro-common/rthreads/tpool.c.
3+
*
4+
* Background
5+
* ----------
6+
* tpool_wait is the public "wait until all queued work has been
7+
* processed" entry point. Pre-fix, its predicate was:
8+
*
9+
* (!tp->stop && tp->working_cnt != 0) ||
10+
* (tp->stop && tp->thread_cnt != 0)
11+
*
12+
* That is, in the non-stopping case it waited only on working_cnt.
13+
* working_cnt is the number of jobs currently being executed by a
14+
* worker -- it is incremented when a worker pops a job off the
15+
* queue, not when a producer pushes one on.
16+
*
17+
* So the following sequence:
18+
*
19+
* tp = tpool_create(N);
20+
* for (i = 0; i < M; i++) tpool_add_work(tp, job, ctx);
21+
* tpool_wait(tp); // <-- can return immediately
22+
* // ctx state read here, expecting all M jobs to have run
23+
*
24+
* could return from tpool_wait before any worker had picked up the
25+
* first job: working_cnt was 0 because no worker had yet dequeued.
26+
* Callers that read shared state after tpool_wait would observe
27+
* zero or partial completion.
28+
*
29+
* Worker code already signals working_cond only when both
30+
* working_cnt == 0 AND work_first == NULL (tpool.c:139), so the
31+
* fix is to widen the wait predicate to match: also wait while
32+
* work_first is non-NULL.
33+
*
34+
* What this test asserts
35+
* ----------------------
36+
* 1. After a sequence of tpool_add_work followed by tpool_wait,
37+
* every queued job has run. Verified by an under-lock counter.
38+
* 2. tpool_create(0) defaults to a working pool (documented in
39+
* tpool.h) that completes a posted job before tpool_wait
40+
* returns.
41+
* 3. tpool_create / tpool_destroy round-trips cleanly across many
42+
* iterations (heap consistency checked by ASan/UBSan/LSan in
43+
* the workflow).
44+
* 4. tpool_destroy on a pool with queued-but-not-yet-run work does
45+
* not crash and does not corrupt the heap. tpool_destroy is
46+
* documented to discard outstanding queued work, so we do NOT
47+
* assert on the counter -- we only verify clean teardown.
48+
*
49+
* What this test does NOT assert
50+
* ------------------------------
51+
* It does not exercise tpool_wait followed by further
52+
* tpool_add_work, since that is not a documented use pattern. It
53+
* does not exercise the single-threaded fallback (HAVE_THREADS
54+
* off); tpool.c requires threads.
55+
*
56+
* How the regression is caught
57+
* ----------------------------
58+
* Without the fix, test_work_executes_once and
59+
* test_zero_threads_default both fail observably: the counter is
60+
* less than the expected job count (typically zero, since the
61+
* producer outpaces the workers' first dequeue). The test prints
62+
* a clear FAIL line and exits non-zero so the CI workflow flags
63+
* it. Built under ASan + UBSan (the workflow default), any
64+
* collateral heap or UB issue is also caught.
65+
*
66+
* The test is bounded: tight loops are sized to run inside the
67+
* workflow's per-binary 60-second timeout on a GitHub-hosted
68+
* runner. Wall-clock under ASan + UBSan is under one second.
69+
*/
70+
71+
#include <stdio.h>
72+
#include <stdlib.h>
73+
#include <string.h>
74+
75+
#include <rthreads/rthreads.h>
76+
#include <rthreads/tpool.h>
77+
78+
#define POOL_THREADS 4
79+
#define WORK_JOBS 1000
80+
#define ROUNDTRIP_ITERS 2000
81+
#define STRESS_CYCLES 200
82+
#define STRESS_JOBS 32
83+
84+
struct work_ctx
85+
{
86+
slock_t *lock;
87+
int counter;
88+
};
89+
90+
static void inc_job(void *arg)
91+
{
92+
struct work_ctx *ctx = (struct work_ctx *)arg;
93+
slock_lock(ctx->lock);
94+
ctx->counter++;
95+
slock_unlock(ctx->lock);
96+
}
97+
98+
/* -----------------------------------------------------------------
99+
* Test 1: tpool_wait correctly drains the queue.
100+
*
101+
* This is the regression case. Before the predicate fix, the
102+
* counter would commonly read 0 here: working_cnt was still 0 at
103+
* the moment tpool_wait was entered (no worker had yet dequeued
104+
* any of the just-pushed work) and the old predicate returned
105+
* immediately.
106+
* ----------------------------------------------------------------- */
107+
static int test_work_executes_once(void)
108+
{
109+
tpool_t *tp;
110+
int i;
111+
struct work_ctx ctx;
112+
int rc = 0;
113+
114+
ctx.counter = 0;
115+
ctx.lock = slock_new();
116+
if (!ctx.lock)
117+
{
118+
printf("[FAIL] test_work_executes_once: slock_new failed\n");
119+
return 1;
120+
}
121+
122+
tp = tpool_create(POOL_THREADS);
123+
if (!tp)
124+
{
125+
printf("[FAIL] test_work_executes_once: tpool_create returned NULL\n");
126+
slock_free(ctx.lock);
127+
return 1;
128+
}
129+
130+
for (i = 0; i < WORK_JOBS; i++)
131+
{
132+
if (!tpool_add_work(tp, inc_job, &ctx))
133+
{
134+
printf("[FAIL] test_work_executes_once: tpool_add_work failed at i=%d\n", i);
135+
rc = 1;
136+
break;
137+
}
138+
}
139+
140+
tpool_wait(tp);
141+
tpool_destroy(tp);
142+
143+
if (!rc)
144+
{
145+
slock_lock(ctx.lock);
146+
if (ctx.counter != WORK_JOBS)
147+
{
148+
printf("[FAIL] test_work_executes_once: counter=%d expected=%d\n",
149+
ctx.counter, WORK_JOBS);
150+
rc = 1;
151+
}
152+
else
153+
printf("[PASS] test_work_executes_once (%d jobs across %d threads)\n",
154+
WORK_JOBS, POOL_THREADS);
155+
slock_unlock(ctx.lock);
156+
}
157+
158+
slock_free(ctx.lock);
159+
return rc;
160+
}
161+
162+
/* -----------------------------------------------------------------
163+
* Test 2: tpool_create(0) gives a working default pool.
164+
*
165+
* tpool.h documents num=0 as defaulting to 2 threads. Smoke-test
166+
* that this path produces a usable pool that runs a single job to
167+
* completion before tpool_wait returns.
168+
* ----------------------------------------------------------------- */
169+
static int test_zero_threads_default(void)
170+
{
171+
tpool_t *tp;
172+
struct work_ctx ctx;
173+
int rc = 0;
174+
175+
ctx.counter = 0;
176+
ctx.lock = slock_new();
177+
if (!ctx.lock)
178+
{
179+
printf("[FAIL] test_zero_threads_default: slock_new failed\n");
180+
return 1;
181+
}
182+
183+
tp = tpool_create(0);
184+
if (!tp)
185+
{
186+
printf("[FAIL] test_zero_threads_default: tpool_create(0) returned NULL\n");
187+
slock_free(ctx.lock);
188+
return 1;
189+
}
190+
191+
if (!tpool_add_work(tp, inc_job, &ctx))
192+
{
193+
printf("[FAIL] test_zero_threads_default: tpool_add_work failed\n");
194+
rc = 1;
195+
}
196+
197+
tpool_wait(tp);
198+
tpool_destroy(tp);
199+
200+
if (!rc)
201+
{
202+
slock_lock(ctx.lock);
203+
if (ctx.counter != 1)
204+
{
205+
printf("[FAIL] test_zero_threads_default: counter=%d expected=1\n",
206+
ctx.counter);
207+
rc = 1;
208+
}
209+
else
210+
printf("[PASS] test_zero_threads_default\n");
211+
slock_unlock(ctx.lock);
212+
}
213+
214+
slock_free(ctx.lock);
215+
return rc;
216+
}
217+
218+
/* -----------------------------------------------------------------
219+
* Test 3: create/destroy round-trip with no work.
220+
*
221+
* Heap consistency check. Workers transition straight from their
222+
* initial scond_wait to the stop branch; on the workflow runner
223+
* with ASan+UBSan, any heap-buffer-overflow / use-after-free /
224+
* undefined behaviour during teardown surfaces here.
225+
* ----------------------------------------------------------------- */
226+
static int test_create_destroy_no_work(void)
227+
{
228+
int i;
229+
for (i = 0; i < ROUNDTRIP_ITERS; i++)
230+
{
231+
tpool_t *tp = tpool_create(POOL_THREADS);
232+
if (!tp)
233+
{
234+
printf("[FAIL] test_create_destroy_no_work: tpool_create returned NULL at i=%d\n", i);
235+
return 1;
236+
}
237+
tpool_destroy(tp);
238+
}
239+
printf("[PASS] test_create_destroy_no_work (%d iterations x %d threads)\n",
240+
ROUNDTRIP_ITERS, POOL_THREADS);
241+
return 0;
242+
}
243+
244+
/* -----------------------------------------------------------------
245+
* Test 4: stress -- create / push some work / destroy without
246+
* waiting.
247+
*
248+
* tpool_destroy is documented to discard outstanding queued work,
249+
* so the counter is non-deterministic and we don't check it.
250+
* What we do check is that the destroyer terminates and the heap
251+
* stays consistent across many fast cycles -- ASan/UBSan/LSan
252+
* carry the verification. This case was the one I originally
253+
* (incorrectly) flagged as a UAF in the audit; the real situation
254+
* is that scond_wait re-acquires the mutex before the destroyer
255+
* can free it, so the original code is heap-safe here. Keeping
256+
* the test in place as a guard against any future regression that
257+
* would actually break that invariant.
258+
* ----------------------------------------------------------------- */
259+
static int test_stress_destroy_with_pending(void)
260+
{
261+
int i;
262+
int j;
263+
struct work_ctx ctx;
264+
265+
ctx.counter = 0;
266+
ctx.lock = slock_new();
267+
if (!ctx.lock)
268+
{
269+
printf("[FAIL] test_stress_destroy_with_pending: slock_new failed\n");
270+
return 1;
271+
}
272+
273+
for (i = 0; i < STRESS_CYCLES; i++)
274+
{
275+
tpool_t *tp = tpool_create(POOL_THREADS);
276+
if (!tp)
277+
{
278+
printf("[FAIL] test_stress_destroy_with_pending: tpool_create returned NULL at i=%d\n", i);
279+
slock_free(ctx.lock);
280+
return 1;
281+
}
282+
for (j = 0; j < STRESS_JOBS; j++)
283+
tpool_add_work(tp, inc_job, &ctx);
284+
/* Deliberately no tpool_wait here. */
285+
tpool_destroy(tp);
286+
}
287+
288+
printf("[PASS] test_stress_destroy_with_pending (%d cycles x %d jobs, ran=%d)\n",
289+
STRESS_CYCLES, STRESS_JOBS, ctx.counter);
290+
291+
slock_free(ctx.lock);
292+
return 0;
293+
}
294+
295+
int main(void)
296+
{
297+
int failures = 0;
298+
299+
failures += test_work_executes_once();
300+
failures += test_zero_threads_default();
301+
failures += test_create_destroy_no_work();
302+
failures += test_stress_destroy_with_pending();
303+
304+
if (failures)
305+
{
306+
printf("\n%d tpool_wait regression test(s) failed\n", failures);
307+
return 1;
308+
}
309+
printf("\nAll tpool_wait regression tests passed.\n");
310+
return 0;
311+
}

0 commit comments

Comments
 (0)