Skip to content

Commit fd89402

Browse files
committed
Replace condvar busy handler with polling via sqlite3_sleep
Replace the timed_wait_t platform abstraction (pthread_cond_timedwait on POSIX, SleepConditionVariableCS on Windows) with polling using sqlite3_sleep(). This eliminates raw platform-specific threading primitives in favor of OTP and SQLite APIs. The busy handler now sleeps for short intervals (1-50ms) and checks conn->cancelled between iterations. Cancel latency is at most ~10ms (one sleep interval), which is acceptable since disconnects are measured in seconds. Changes: - Delete timed_wait_t abstraction and all platform-specific code (~124 lines) - Remove cancel_tw field from connection_t - Rewrite exqlite_busy_handler to use sqlite3_sleep() in a polling loop - Simplify exqlite_progress_handler, connection_stash_caller, exqlite_set_busy_timeout, exqlite_cancel to use volatile reads instead of locks - All 159 tests pass; linting passes - Net reduction: 166 lines (183 deleted, 17 added)
1 parent 4daf69c commit fd89402

1 file changed

Lines changed: 17 additions & 183 deletions

File tree

c_src/sqlite3_nif.c

Lines changed: 17 additions & 183 deletions
Original file line numberDiff line numberDiff line change
@@ -10,118 +10,6 @@
1010
#include <erl_nif.h>
1111
#include <sqlite3.h>
1212

13-
// Platform timed-wait abstraction
14-
//
15-
// OTP doesn't provide enif_cond_timedwait, so we wrap pthread_cond_timedwait
16-
// (POSIX) and SleepConditionVariableCS (Windows). BEAM has two threading
17-
// backends (pthreads and Win32), so #ifdef _WIN32 covers all platforms.
18-
19-
#ifdef _WIN32
20-
#include <windows.h>
21-
22-
typedef struct
23-
{
24-
CRITICAL_SECTION cs;
25-
CONDITION_VARIABLE cv;
26-
} timed_wait_t;
27-
28-
static void
29-
tw_init(timed_wait_t* tw)
30-
{
31-
InitializeCriticalSection(&tw->cs);
32-
InitializeConditionVariable(&tw->cv);
33-
}
34-
35-
static void
36-
tw_destroy(timed_wait_t* tw)
37-
{
38-
DeleteCriticalSection(&tw->cs);
39-
// Windows CONDITION_VARIABLE has no destroy function
40-
}
41-
42-
static void
43-
tw_lock(timed_wait_t* tw)
44-
{
45-
EnterCriticalSection(&tw->cs);
46-
}
47-
static void
48-
tw_unlock(timed_wait_t* tw)
49-
{
50-
LeaveCriticalSection(&tw->cs);
51-
}
52-
53-
// Returns 0 if signalled, 1 if timed out
54-
static int
55-
tw_wait_ms(timed_wait_t* tw, int ms)
56-
{
57-
return SleepConditionVariableCS(&tw->cv, &tw->cs, (DWORD)ms) ? 0 : 1;
58-
}
59-
60-
static void
61-
tw_signal(timed_wait_t* tw)
62-
{
63-
WakeConditionVariable(&tw->cv);
64-
}
65-
66-
#else /* POSIX */
67-
#include <pthread.h>
68-
#include <time.h>
69-
#include <errno.h>
70-
71-
typedef struct
72-
{
73-
pthread_mutex_t mtx;
74-
pthread_cond_t cond;
75-
} timed_wait_t;
76-
77-
static void
78-
tw_init(timed_wait_t* tw)
79-
{
80-
pthread_mutex_init(&tw->mtx, NULL);
81-
// CLOCK_REALTIME: macOS doesn't support CLOCK_MONOTONIC for condvars
82-
pthread_cond_init(&tw->cond, NULL);
83-
}
84-
85-
static void
86-
tw_destroy(timed_wait_t* tw)
87-
{
88-
pthread_cond_destroy(&tw->cond);
89-
pthread_mutex_destroy(&tw->mtx);
90-
}
91-
92-
static void
93-
tw_lock(timed_wait_t* tw)
94-
{
95-
pthread_mutex_lock(&tw->mtx);
96-
}
97-
static void
98-
tw_unlock(timed_wait_t* tw)
99-
{
100-
pthread_mutex_unlock(&tw->mtx);
101-
}
102-
103-
// Returns 0 if signalled, 1 if timed out
104-
static int
105-
tw_wait_ms(timed_wait_t* tw, int ms)
106-
{
107-
struct timespec ts;
108-
clock_gettime(CLOCK_REALTIME, &ts);
109-
ts.tv_sec += ms / 1000;
110-
ts.tv_nsec += (ms % 1000) * 1000000L;
111-
if (ts.tv_nsec >= 1000000000L) {
112-
ts.tv_sec += 1;
113-
ts.tv_nsec -= 1000000000L;
114-
}
115-
return pthread_cond_timedwait(&tw->cond, &tw->mtx, &ts) == ETIMEDOUT ? 1 : 0;
116-
}
117-
118-
static void
119-
tw_signal(timed_wait_t* tw)
120-
{
121-
pthread_cond_signal(&tw->cond);
122-
}
123-
124-
#endif /* _WIN32 */
12513

12614
static ERL_NIF_TERM am_ok;
12715
static ERL_NIF_TERM am_error;
@@ -174,7 +62,6 @@ typedef struct connection
17462
int authorizer_deny[AUTHORIZER_DENY_SIZE];
17563

17664
// Custom busy handler state
177-
timed_wait_t cancel_tw;
17865
volatile int cancelled; // volatile for MSVC compat
17966
int busy_timeout_ms;
18067
ErlNifEnv* callback_env; // for enif_is_process_alive
@@ -429,69 +316,39 @@ exqlite_busy_handler(void* arg, int count)
429316
{
430317
connection_t* conn = (connection_t*)arg;
431318

432-
// Snapshot cancel state and timeout with lock held
433-
tw_lock(&conn->cancel_tw);
434-
int cancelled = conn->cancelled;
435-
int timeout_ms = conn->busy_timeout_ms;
319+
if (conn->cancelled)
320+
return 0;
436321

437322
// Check if the calling process is still alive
438-
if (!cancelled && conn->callback_env != NULL &&
323+
if (conn->callback_env != NULL &&
439324
!enif_is_process_alive(conn->callback_env, &conn->caller_pid)) {
440325
conn->cancelled = 1;
441-
cancelled = 1;
442-
}
443-
tw_unlock(&conn->cancel_tw);
444-
445-
// Check if already cancelled
446-
if (cancelled) {
447-
return 0; // stop retrying → SQLite returns SQLITE_BUSY
326+
return 0;
448327
}
449328

450-
// No timeout → fail immediately
451-
if (timeout_ms <= 0) {
329+
if (conn->busy_timeout_ms <= 0)
452330
return 0;
453-
}
454331

455-
// Calculate how much time we've already waited.
456-
// Use the same delay schedule as SQLite's default busy handler
457-
// for the first few retries, then 50ms waits after that.
458332
static const int delays[] = {1, 2, 5, 10, 15, 20, 25, 25, 25, 50, 50};
459-
static const int ndelay = sizeof(delays) / sizeof(delays[0]);
333+
static const int ndelay = sizeof(delays) / sizeof(delays[0]);
460334

461335
int total_waited = 0;
462-
for (int i = 0; i < count && i < ndelay; i++) {
336+
for (int i = 0; i < count && i < ndelay; i++)
463337
total_waited += delays[i];
464-
}
465-
if (count >= ndelay) {
338+
if (count >= ndelay)
466339
total_waited += (count - ndelay) * 50;
467-
}
468340

469-
if (total_waited >= timeout_ms) {
470-
return 0; // timeout exceeded
471-
}
341+
if (total_waited >= conn->busy_timeout_ms)
342+
return 0;
472343

473-
// Calculate sleep duration for this iteration
474-
int sleep_ms = (count < ndelay) ? delays[count] : 50;
475-
int remaining = timeout_ms - total_waited;
476-
if (sleep_ms > remaining) {
344+
int sleep_ms = (count < ndelay) ? delays[count] : 50;
345+
int remaining = conn->busy_timeout_ms - total_waited;
346+
if (sleep_ms > remaining)
477347
sleep_ms = remaining;
478-
}
479-
480-
// Wait on the condvar — can be woken early by cancel()
481-
tw_lock(&conn->cancel_tw);
482-
cancelled = conn->cancelled;
483-
if (!cancelled) {
484-
tw_wait_ms(&conn->cancel_tw, sleep_ms);
485-
cancelled = conn->cancelled; // snapshot again after waking
486-
}
487-
tw_unlock(&conn->cancel_tw);
488348

489-
// After waking, use the snapshot to avoid data race
490-
if (cancelled) {
491-
return 0;
492-
}
349+
sqlite3_sleep(sleep_ms);
493350

494-
return 1; // retry the operation
351+
return conn->cancelled ? 0 : 1;
495352
}
496353

497354
// Progress handler: fires every N VDBE opcodes.
@@ -500,10 +357,7 @@ static int
500357
exqlite_progress_handler(void* arg)
501358
{
502359
connection_t* conn = (connection_t*)arg;
503-
tw_lock(&conn->cancel_tw);
504-
int cancelled = conn->cancelled;
505-
tw_unlock(&conn->cancel_tw);
506-
return cancelled ? 1 : 0;
360+
return conn->cancelled ? 1 : 0;
507361
}
508362

509363
// Stash the current env + caller pid before a db operation.
@@ -513,12 +367,7 @@ connection_stash_caller(connection_t* conn, ErlNifEnv* env)
513367
{
514368
conn->callback_env = env;
515369
enif_self(env, &conn->caller_pid);
516-
517-
// Reset cancel flag for new operation while holding cancel_tw
518-
// to avoid racing with exqlite_cancel or other users of this flag.
519-
tw_lock(&conn->cancel_tw);
520370
conn->cancelled = 0;
521-
tw_unlock(&conn->cancel_tw);
522371
}
523372

524373
// Clear the stashed caller after a db operation completes.
@@ -581,16 +430,13 @@ exqlite_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
581430
conn->interrupt_mutex = NULL;
582431
memset(conn->authorizer_deny, 0, sizeof(conn->authorizer_deny));
583432

584-
// Initialize cancellable busy handler fields early so destructor can safely
585-
// call tw_destroy even if subsequent initialization steps fail.
586-
tw_init(&conn->cancel_tw);
433+
// Initialize busy handler fields
587434
conn->cancelled = 0;
588435
conn->busy_timeout_ms = 2000; // default matches sqlite3_busy_timeout(db, 2000)
589436
conn->callback_env = NULL;
590437

591438
conn->interrupt_mutex = enif_mutex_create("exqlite:interrupt");
592439
if (conn->interrupt_mutex == NULL) {
593-
// conn->db, conn->mutex, and conn->cancel_tw are set; destructor will clean them up.
594440
enif_release_resource(conn);
595441
return make_error_tuple(env, am_failed_to_create_mutex);
596442
}
@@ -1457,10 +1303,7 @@ connection_type_destructor(ErlNifEnv* env, void* arg)
14571303

14581304
// Signal cancel to wake any busy handler that might still be sleeping,
14591305
// so it returns and releases SQLite's db->mutex before we close.
1460-
tw_lock(&conn->cancel_tw);
14611306
conn->cancelled = 1;
1462-
tw_signal(&conn->cancel_tw);
1463-
tw_unlock(&conn->cancel_tw);
14641307

14651308
if (conn->db) {
14661309
sqlite3_close_v2(conn->db);
@@ -1476,8 +1319,6 @@ connection_type_destructor(ErlNifEnv* env, void* arg)
14761319
enif_mutex_destroy(conn->interrupt_mutex);
14771320
conn->interrupt_mutex = NULL;
14781321
}
1479-
1480-
tw_destroy(&conn->cancel_tw);
14811322
}
14821323

14831324
void
@@ -2019,10 +1860,7 @@ exqlite_set_busy_timeout(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
20191860
return enif_make_badarg(env);
20201861
}
20211862

2022-
// Protect write to busy_timeout_ms since busy handler reads it
2023-
tw_lock(&conn->cancel_tw);
20241863
conn->busy_timeout_ms = timeout_ms;
2025-
tw_unlock(&conn->cancel_tw);
20261864

20271865
return am_ok;
20281866
}
@@ -2045,11 +1883,7 @@ exqlite_cancel(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
20451883
return make_error_tuple(env, am_invalid_connection);
20461884
}
20471885

2048-
// Set the cancel flag — busy handler and progress handler will see this
2049-
tw_lock(&conn->cancel_tw);
20501886
conn->cancelled = 1;
2051-
tw_signal(&conn->cancel_tw);
2052-
tw_unlock(&conn->cancel_tw);
20531887

20541888
// Also interrupt VDBE execution (same as interrupt/1)
20551889
enif_mutex_lock(conn->interrupt_mutex);

0 commit comments

Comments
 (0)