Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ endif

$(BUILD)/%.o: c_src/%.c
@echo " CC $(notdir $@)"
$(CC) -c $(ERL_CFLAGS) $(CFLAGS) -o $@ $<
$(CC) -c $(ERL_CFLAGS) $(CFLAGS) -MMD -MP -o $@ $<

# Include dependency files for automatic header tracking
-include $(OBJ:.o=.d)

$(LIB_NAME): $(OBJ)
@echo " LD $(notdir $@)"
Expand All @@ -152,7 +155,7 @@ $(PREFIX) $(BUILD):
mkdir -p $@

clean:
$(RM) $(LIB_NAME) $(ARCHIVE_NAME) $(OBJ)
$(RM) $(LIB_NAME) $(ARCHIVE_NAME) $(OBJ) $(OBJ:.o=.d)

.PHONY: all clean

Expand Down
186 changes: 182 additions & 4 deletions c_src/sqlite3_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ typedef struct connection
ErlNifMutex* interrupt_mutex;
ErlNifPid update_hook_pid;
int authorizer_deny[AUTHORIZER_DENY_SIZE];

// Custom busy handler state
volatile int cancelled; // volatile for MSVC compat
int busy_timeout_ms;
ErlNifEnv* callback_env; // for enif_is_process_alive
ErlNifPid caller_pid;
} connection_t;

typedef struct statement
Expand Down Expand Up @@ -292,6 +298,81 @@ statement_release_lock(statement_t* statement)
connection_release_lock(statement->conn);
}

// ---------------------------------------------------------------------------
// Custom busy handler
//
// Replaces SQLite's default busy handler (which sleeps via sqlite3OsSleep and
// cannot be interrupted) with one that polls conn->cancelled between each
// sqlite3_sleep() call. cancel() sets the flag and calls sqlite3_interrupt()
// so disconnect() wakes within at most one sleep interval (~10ms).
// ---------------------------------------------------------------------------

static int
exqlite_busy_handler(void* arg, int count)
{
connection_t* conn = (connection_t*)arg;

if (conn->cancelled)
return 0;
Comment on lines +315 to +316
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add missing curly braces {}


// Check if the calling process is still alive
if (conn->callback_env != NULL &&
!enif_is_process_alive(conn->callback_env, &conn->caller_pid)) {
conn->cancelled = 1;
return 0;
}

if (conn->busy_timeout_ms <= 0)
return 0;

static const int delays[] = {1, 2, 5, 10, 15, 20, 25, 25, 25, 50, 50};
static const int ndelay = sizeof(delays) / sizeof(delays[0]);

int total_waited = 0;
for (int i = 0; i < count && i < ndelay; i++)
total_waited += delays[i];
if (count >= ndelay)
total_waited += (count - ndelay) * 50;

if (total_waited >= conn->busy_timeout_ms)
return 0;

int sleep_ms = (count < ndelay) ? delays[count] : 50;
int remaining = conn->busy_timeout_ms - total_waited;
if (sleep_ms > remaining)
sleep_ms = remaining;

sqlite3_sleep(sleep_ms);

return conn->cancelled ? 0 : 1;
}

// Progress handler: fires every N VDBE opcodes.
// Returns non-zero to interrupt execution when cancelled.
static int
exqlite_progress_handler(void* arg)
{
connection_t* conn = (connection_t*)arg;
return conn->cancelled ? 1 : 0;
}

// Stash the current env + caller pid before a db operation.
// Must be called while holding conn->mutex.
static inline void
connection_stash_caller(connection_t* conn, ErlNifEnv* env)
{
conn->callback_env = env;
enif_self(env, &conn->caller_pid);
conn->cancelled = 0;
}

// Clear the stashed caller after a db operation completes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Clear the stashed caller after a db operation completes.
// Clear the stashed caller after a db operation completes.
// Assumes that the `conn` has been locked for clearing the
// caller.

static inline void
connection_clear_caller(connection_t* conn)
{
conn->callback_env = NULL;
}

///
/// Opens a new SQLite database
///
Expand Down Expand Up @@ -334,8 +415,6 @@ exqlite_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
return make_error_tuple(env, am_failed_to_create_mutex);
}

sqlite3_busy_timeout(db, 2000);

conn = enif_alloc_resource(connection_type, sizeof(connection_t));
if (!conn) {
sqlite3_close_v2(db);
Expand All @@ -344,14 +423,24 @@ exqlite_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}
conn->db = db;
conn->mutex = mutex;
conn->interrupt_mutex = enif_mutex_create("exqlite:interrupt");
conn->interrupt_mutex = NULL;
memset(conn->authorizer_deny, 0, sizeof(conn->authorizer_deny));

// Initialize busy handler fields
conn->cancelled = 0;
conn->busy_timeout_ms = 2000; // default matches sqlite3_busy_timeout(db, 2000)
conn->callback_env = NULL;

conn->interrupt_mutex = enif_mutex_create("exqlite:interrupt");
if (conn->interrupt_mutex == NULL) {
// conn->db and conn->mutex are set; the destructor will clean them up.
enif_release_resource(conn);
return make_error_tuple(env, am_failed_to_create_mutex);
}

// Install our custom busy handler + progress handler
sqlite3_busy_handler(db, exqlite_busy_handler, conn);
sqlite3_progress_handler(db, 1000, exqlite_progress_handler, conn);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we are going to want 1000 to be customizable. If someone wishes to disable this for perf reasons, a -1 should be allowed per the documentation. Or if they want a higher number than 1000 depending on their application needs.


result = enif_make_resource(env, conn);
enif_release_resource(conn);

Expand Down Expand Up @@ -381,9 +470,11 @@ exqlite_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
// cases. Cases such as query timeout and connection pooling
// attempting to close the connection
connection_acquire_lock(conn);
connection_stash_caller(conn, env);

// DB is already closed, nothing to do here.
if (conn->db == NULL) {
connection_clear_caller(conn);
connection_release_lock(conn);
return am_ok;
}
Expand All @@ -392,6 +483,7 @@ exqlite_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
if (autocommit == 0) {
rc = sqlite3_exec(conn->db, "ROLLBACK;", NULL, NULL, NULL);
if (rc != SQLITE_OK) {
connection_clear_caller(conn);
connection_release_lock(conn);
return make_sqlite3_error_tuple(env, rc, conn->db);
}
Expand All @@ -417,6 +509,7 @@ exqlite_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
conn->db = NULL;
enif_mutex_unlock(conn->interrupt_mutex);

connection_clear_caller(conn);
connection_release_lock(conn);

return am_ok;
Expand Down Expand Up @@ -448,18 +541,22 @@ exqlite_execute(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}

connection_acquire_lock(conn);
connection_stash_caller(conn, env);

if (conn->db == NULL) {
connection_clear_caller(conn);
connection_release_lock(conn);
return make_error_tuple(env, am_connection_closed);
}

rc = sqlite3_exec(conn->db, (char*)bin.data, NULL, NULL, NULL);
if (rc != SQLITE_OK) {
connection_clear_caller(conn);
connection_release_lock(conn);
return make_sqlite3_error_tuple(env, rc, conn->db);
}

connection_clear_caller(conn);
connection_release_lock(conn);

return am_ok;
Expand Down Expand Up @@ -531,7 +628,9 @@ exqlite_prepare(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])

// ensure connection is not getting closed by parallel thread
connection_acquire_lock(conn);
connection_stash_caller(conn, env);
if (conn->db == NULL) {
connection_clear_caller(conn);
connection_release_lock(conn);
enif_release_resource(statement);
return make_error_tuple(env, am_connection_closed);
Expand All @@ -541,11 +640,13 @@ exqlite_prepare(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])

if (rc != SQLITE_OK) {
result = make_sqlite3_error_tuple(env, rc, conn->db);
connection_clear_caller(conn);
connection_release_lock(conn);
enif_release_resource(statement);
return result;
}

connection_clear_caller(conn);
connection_release_lock(conn);

result = enif_make_resource(env, statement);
Expand Down Expand Up @@ -814,8 +915,10 @@ exqlite_multi_step(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}

connection_acquire_lock(conn);
connection_stash_caller(conn, env);

if (statement->statement == NULL) {
connection_clear_caller(conn);
connection_release_lock(conn);
return make_error_tuple(env, am_invalid_statement);
}
Expand All @@ -828,11 +931,13 @@ exqlite_multi_step(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
switch (rc) {
case SQLITE_BUSY:
sqlite3_reset(statement->statement);
connection_clear_caller(conn);
connection_release_lock(conn);
return am_busy;

case SQLITE_DONE:
sqlite3_reset(statement->statement);
connection_clear_caller(conn);
connection_release_lock(conn);
return enif_make_tuple2(env, am_done, rows);

Expand All @@ -843,11 +948,13 @@ exqlite_multi_step(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])

default:
sqlite3_reset(statement->statement);
connection_clear_caller(conn);
connection_release_lock(conn);
return make_sqlite3_error_tuple(env, rc, conn->db);
}
}

connection_clear_caller(conn);
connection_release_lock(conn);

return enif_make_tuple2(env, am_rows, rows);
Expand Down Expand Up @@ -880,8 +987,10 @@ exqlite_step(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}

connection_acquire_lock(conn);
connection_stash_caller(conn, env);

if (statement->statement == NULL) {
connection_clear_caller(conn);
connection_release_lock(conn);
return make_error_tuple(env, am_invalid_statement);
}
Expand All @@ -890,19 +999,23 @@ exqlite_step(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
switch (rc) {
case SQLITE_ROW:
result = enif_make_tuple2(env, am_row, make_row(env, statement->statement));
connection_clear_caller(conn);
connection_release_lock(conn);
return result;
case SQLITE_BUSY:
sqlite3_reset(statement->statement);
connection_clear_caller(conn);
connection_release_lock(conn);
return am_busy;
case SQLITE_DONE:
sqlite3_reset(statement->statement);
connection_clear_caller(conn);
connection_release_lock(conn);
return am_done;
default:
sqlite3_reset(statement->statement);
result = make_sqlite3_error_tuple(env, rc, conn->db);
connection_clear_caller(conn);
connection_release_lock(conn);
return result;
}
Expand Down Expand Up @@ -1184,6 +1297,10 @@ connection_type_destructor(ErlNifEnv* env, void* arg)

connection_t* conn = (connection_t*)arg;

// Signal cancel to wake any busy handler that might still be sleeping,
// so it returns and releases SQLite's db->mutex before we close.
conn->cancelled = 1;

if (conn->db) {
sqlite3_close_v2(conn->db);
conn->db = NULL;
Expand Down Expand Up @@ -1715,6 +1832,65 @@ exqlite_interrupt(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
return am_ok;
}

///
/// Set busy timeout without destroying the custom handler.
/// (PRAGMA busy_timeout calls sqlite3_busy_timeout() which replaces handlers)
///
ERL_NIF_TERM
exqlite_set_busy_timeout(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
assert(env);

connection_t* conn = NULL;
int timeout_ms;

if (argc != 2) {
return enif_make_badarg(env);
}

if (!enif_get_resource(env, argv[0], connection_type, (void**)&conn)) {
return make_error_tuple(env, am_invalid_connection);
}

if (!enif_get_int(env, argv[1], &timeout_ms)) {
return enif_make_badarg(env);
}

conn->busy_timeout_ms = timeout_ms;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is missing an acquire for a lock on mutex


return am_ok;
}

/// Cancel: wake busy handler + interrupt VDBE.
/// Superset of interrupt/1: sets cancelled flag + calls sqlite3_interrupt().
///
ERL_NIF_TERM
exqlite_cancel(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
assert(env);

connection_t* conn = NULL;

if (argc != 1) {
return enif_make_badarg(env);
}

if (!enif_get_resource(env, argv[0], connection_type, (void**)&conn)) {
return make_error_tuple(env, am_invalid_connection);
}

conn->cancelled = 1;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar situation here, we are missing a lock on the conn being manipulated.


// Also interrupt VDBE execution (same as interrupt/1)
enif_mutex_lock(conn->interrupt_mutex);
if (conn->db != NULL) {
sqlite3_interrupt(conn->db);
}
enif_mutex_unlock(conn->interrupt_mutex);

return am_ok;
}

ERL_NIF_TERM
exqlite_errmsg(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
Expand Down Expand Up @@ -1792,6 +1968,8 @@ static ErlNifFunc nif_funcs[] = {
{"set_authorizer", 2, exqlite_set_authorizer, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"set_log_hook", 1, exqlite_set_log_hook, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"interrupt", 1, exqlite_interrupt, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"set_busy_timeout", 2, exqlite_set_busy_timeout, 0},
{"cancel", 1, exqlite_cancel, 0},
{"errmsg", 1, exqlite_errmsg},
{"errstr", 1, exqlite_errstr},
};
Expand Down
Loading
Loading