diff --git a/.github/workflows/code_quality.yml b/.github/workflows/code_quality.yml index 575f6f5b..522c3811 100644 --- a/.github/workflows/code_quality.yml +++ b/.github/workflows/code_quality.yml @@ -32,7 +32,10 @@ jobs: uses: astral-sh/setup-uv@v7 with: version: "0.9.0" - python-version: "3.12" + # 3.13: the cmake-format pre-commit hook is pinned to python3.13 + # (cmakelang crashes under 3.14). Keeping this in sync means the hook + # resolves to the running interpreter instead of hunting PATH. + python-version: "3.13" - name: pre-commit (cache) uses: actions/cache@v4 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a3123470..022a5257 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -29,6 +29,12 @@ repos: rev: v0.6.13 hooks: - id: cmake-format + # cmakelang is unmaintained and crashes under Python 3.14 + # ("Cannot use capturing groups in re.Scanner"). Pin this hook's + # environment to 3.13 so it never picks up a 3.14 interpreter. The + # code_quality CI job provisions Python 3.13 to match, so the hook + # resolves to the running interpreter there. + language_version: python3.13 - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.18.2 diff --git a/src/duckdb_py/CMakeLists.txt b/src/duckdb_py/CMakeLists.txt index 3d06b062..a0c65873 100644 --- a/src/duckdb_py/CMakeLists.txt +++ b/src/duckdb_py/CMakeLists.txt @@ -19,6 +19,7 @@ add_library( importer.cpp map.cpp path_like.cpp + python_log_storage.cpp pyconnection.cpp pyexpression.cpp pyfilesystem.cpp diff --git a/src/duckdb_py/duckdb_python.cpp b/src/duckdb_py/duckdb_python.cpp index d950960d..9b0a8695 100644 --- a/src/duckdb_py/duckdb_python.cpp +++ b/src/duckdb_py/duckdb_python.cpp @@ -5,6 +5,7 @@ #include "duckdb/parser/parser.hpp" #include "duckdb_python/python_objects.hpp" +#include "duckdb_python/python_log_storage.hpp" #include "duckdb_python/pyconnection/pyconnection.hpp" #include "duckdb_python/pystatement.hpp" #include "duckdb_python/pyrelation.hpp" @@ -1135,6 +1136,9 @@ PYBIND11_MODULE(DUCKDB_PYTHON_LIB_NAME, m) { // NOLINT "Tokenizes a SQL string, returning a list of (position, type) tuples that can be " "used for e.g., syntax highlighting", py::arg("query")); + m.def("_drain_log_forwarding", &PythonLogStorage::DrainForwarder, + "Block until all engine log entries queued for Python's logging module have been " + "forwarded. Forwarding is asynchronous; this is a test/synchronization aid."); py::enum_(m, "token_type", py::module_local()) .value("identifier", PySQLTokenType::PY_SQL_TOKEN_IDENTIFIER) .value("numeric_const", PySQLTokenType::PY_SQL_TOKEN_NUMERIC_CONSTANT) diff --git a/src/duckdb_py/include/duckdb_python/python_log_storage.hpp b/src/duckdb_py/include/duckdb_python/python_log_storage.hpp new file mode 100644 index 00000000..f00d6b28 --- /dev/null +++ b/src/duckdb_py/include/duckdb_python/python_log_storage.hpp @@ -0,0 +1,86 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb_python/python_log_storage.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/logging/log_storage.hpp" +#include "duckdb/common/map.hpp" +#include "duckdb/common/unique_ptr.hpp" + +namespace duckdb { + +class ColumnDataCollection; +class DatabaseInstance; + +//! Scan state backing PythonLogStorage's in-memory buffers (so `duckdb_logs` can read them). +//! We define our own rather than reuse the engine's InMemoryLogStorageScanState to avoid +//! depending on whether that type's symbols are exported across platforms. +class PythonLogStorageScanState : public LogStorageScanState { +public: + explicit PythonLogStorageScanState(LoggingTargetTable table) : LogStorageScanState(table) { + } + ~PythonLogStorageScanState() override = default; + + ColumnDataScanState scan_state; +}; + +//! A composite log storage that does two things for every engine log entry: +//! 1. forwards it to Python's standard `logging` module (logging.getLogger("duckdb")), and +//! 2. retains it in-memory so `SELECT * FROM duckdb_logs` keeps working. +//! +//! It subclasses BufferingLogStorage with a buffer size of 1 so each entry is flushed +//! immediately, rather than batched until a 2048-entry buffer fills — engine WARNINGs are +//! sparse and must surface promptly to be useful. +//! +//! Forwarding to Python is ASYNCHRONOUS. The engine calls FlushChunk while holding +//! LogManager::lock (a non-recursive mutex also taken by CreateLogger/WriteLogEntry). Acquiring +//! the GIL there would deadlock against any other thread that holds the GIL and then enters one +//! of those LogManager methods (i.e. ordinary concurrent queries). So FlushChunk only copies +//! (level, message) into a process-global queue, and a single background thread — which holds +//! no engine lock — drains it and forwards to `logging`. See python_log_storage.cpp. +class PythonLogStorage : public BufferingLogStorage { +public: + explicit PythonLogStorage(DatabaseInstance &db); + ~PythonLogStorage() override; + + const string GetStorageName() override { + return "python_log_storage"; + } + + //! Starts the process-global forwarder thread (idempotent). MUST be called with the GIL held + //! and no engine lock held — i.e. from Connect(), never from the engine log-write path. + static void EnsureForwarderStarted(); + + //! Blocks (releasing the GIL) until every queued entry has been forwarded to `logging`. + //! Forwarding is asynchronous, so callers that need to observe a just-emitted warning on the + //! Python side must drain first. Exposed to Python as `_duckdb._drain_log_forwarding` + //! for deterministic tests; harmless if the forwarder was never started. + static void DrainForwarder(); + + //! Single-threaded scan interface — mirrors InMemoryLogStorage so duckdb_logs can read us. + bool CanScan(LoggingTargetTable table) override; + unique_ptr CreateScanState(LoggingTargetTable table) const override; + bool Scan(LogStorageScanState &state, DataChunk &result) const override; + void InitializeScan(LogStorageScanState &state) const override; + +protected: + //! Stores the chunk for duckdb_logs and (for LOG_ENTRIES) queues it for async forwarding. + void FlushChunk(LoggingTargetTable table, DataChunk &chunk) override; + //! Clears the in-memory buffers. + void ResetAllBuffers() override; + +private: + ColumnDataCollection &GetBuffer(LoggingTargetTable table) const; + //! Copies each row of a LOG_ENTRIES chunk into the global forward queue. Never touches the + //! GIL or calls Python (it runs under LogManager::lock). Never throws. + void EnqueueEntriesForPython(DataChunk &chunk); + + map> log_storage_buffers; +}; + +} // namespace duckdb diff --git a/src/duckdb_py/pyconnection.cpp b/src/duckdb_py/pyconnection.cpp index 6fcbe0ac..ec2546a8 100644 --- a/src/duckdb_py/pyconnection.cpp +++ b/src/duckdb_py/pyconnection.cpp @@ -35,6 +35,8 @@ #include "duckdb_python/numpy/numpy_type.hpp" #include "duckdb/main/prepared_statement.hpp" #include "duckdb_python/jupyter_progress_bar_display.hpp" +#include "duckdb_python/python_log_storage.hpp" +#include "duckdb/logging/log_manager.hpp" #include "duckdb_python/pyfilesystem.hpp" #include "duckdb/main/client_config.hpp" #include "duckdb/function/table/read_csv.hpp" @@ -2283,6 +2285,26 @@ shared_ptr DuckDBPyConnection::Connect(const py::object &dat auto res = FetchOrCreateInstance(database, config); auto &client_context = *res->con.GetConnection().context; SetDefaultConfigArguments(client_context); + { + auto &db_instance = *res->con.GetDatabase().instance; + auto &log_manager = db_instance.GetLogManager(); + auto storage = make_shared_ptr(db_instance); + shared_ptr storage_base = storage; + // RegisterLogStorage returns false if the name is already registered on this + // DatabaseInstance. Instances are cached and shared across connections/cursors, so + // only configure logging on the first registration. SetLogStorage/SetEnableLogging/ + // SetLogLevel are NOT idempotent — re-running them on every Connect() would silently + // stomp a user's explicit `SET enable_logging` / `SET logging_level` / storage choice. + if (log_manager.RegisterLogStorage("python_log_storage", storage_base)) { + log_manager.SetLogStorage(db_instance, "python_log_storage"); + log_manager.SetEnableLogging(true); + log_manager.SetLogLevel(LogLevel::LOG_WARNING); + // Start the background thread that forwards queued entries to Python's logging + // module. We're here with the GIL held and no engine lock taken — the only safe + // place to do it (the engine log-write path holds LogManager::lock). + PythonLogStorage::EnsureForwarderStarted(); + } + } return res; } diff --git a/src/duckdb_py/python_log_storage.cpp b/src/duckdb_py/python_log_storage.cpp new file mode 100644 index 00000000..5f30a1c7 --- /dev/null +++ b/src/duckdb_py/python_log_storage.cpp @@ -0,0 +1,242 @@ +#include "duckdb_python/python_log_storage.hpp" +#include "duckdb_python/pybind11/pybind_wrapper.hpp" + +#include "duckdb/common/allocator.hpp" +#include "duckdb/common/exception.hpp" +#include "duckdb/common/types/column/column_data_collection.hpp" +#include "duckdb/common/types/data_chunk.hpp" +#include "duckdb/common/types/vector.hpp" + +#include +#include +#include + +namespace duckdb { + +// Maps the engine's textual log level (stored as VARCHAR in the LOG_ENTRIES chunk) to the +// integer levels of Python's logging module. +static int LevelStringToPython(const string &level_str) { + if (level_str == "TRACE" || level_str == "DEBUG") { + return 10; // logging.DEBUG + } + if (level_str == "INFO") { + return 20; // logging.INFO + } + if (level_str == "WARNING") { + return 30; // logging.WARNING + } + if (level_str == "ERROR") { + return 40; // logging.ERROR + } + if (level_str == "FATAL") { + return 50; // logging.CRITICAL + } + return 30; +} + +//===--------------------------------------------------------------------===// +// Asynchronous forwarder +// +// The engine invokes FlushChunk while holding LogManager::lock — a non-recursive mutex that is +// also taken by LogManager::CreateLogger / WriteLogEntry / Flush. Acquiring the GIL from inside +// that lock deadlocks: a worker thread holding the lock blocks on the GIL, while another thread +// holding the GIL blocks on the lock (e.g. via CreateLogger at the start of a concurrent query). +// We observed exactly this with two threads each running execute() on one database. +// +// So forwarding is decoupled. FlushChunk only copies plain (level, message) data into this +// process-global queue (no GIL, no Python). A single background thread drains the queue and +// forwards to logging.getLogger("duckdb") with the GIL held but NO engine lock held — breaking +// the lock-ordering cycle. One global thread (not one per DatabaseInstance) avoids spawning a +// thread per connection. The queue holds owned copies, so it is independent of any storage's +// lifetime. +//===--------------------------------------------------------------------===// +namespace { + +struct PendingLogEntry { + int level; + string message; +}; + +struct LogForwarder { + std::mutex mutex; // guards the fields below; NEVER held while acquiring the GIL + std::condition_variable cv; // forwarder waits here for work + std::condition_variable idle_cv; // drainers wait here for the queue to empty + vector queue; + bool stop = false; + bool started = false; + bool busy = false; // a batch has been dequeued but not yet forwarded + std::thread thread; +}; + +LogForwarder &GetForwarder() { + static LogForwarder forwarder; + return forwarder; +} + +void ForwarderLoop() { + auto &fwd = GetForwarder(); + while (true) { + vector batch; + { + std::unique_lock lck(fwd.mutex); + fwd.cv.wait(lck, [&fwd] { return fwd.stop || !fwd.queue.empty(); }); + if (fwd.stop && fwd.queue.empty()) { + return; + } + batch.swap(fwd.queue); + fwd.busy = true; // queue is empty again, but this batch isn't delivered yet + } + // No engine lock and no forwarder lock held here, so acquiring the GIL cannot deadlock. + if (Py_IsInitialized()) { // else interpreter is finalizing — acquiring the GIL would crash + try { + py::gil_scoped_acquire gil; + auto logging = py::module::import("logging"); + auto logger = logging.attr("getLogger")("duckdb"); + for (auto &entry : batch) { + logger.attr("log")(entry.level, entry.message); + } + } catch (...) { + // Logging must never disrupt anything. + } + } + { + std::unique_lock lck(fwd.mutex); + fwd.busy = false; + fwd.idle_cv.notify_all(); // wake any DrainForwarder() waiters + } + } +} + +// atexit callback: stop and join the forwarder while the interpreter is still alive. Runs on the +// main thread with the GIL held; the GIL is released around join() because the forwarder may be +// parked in take_gil and could not otherwise wake to observe `stop`. +void StopForwarder() { + auto &fwd = GetForwarder(); + { + std::unique_lock lck(fwd.mutex); + if (!fwd.started) { + return; + } + fwd.stop = true; + } + fwd.cv.notify_all(); + if (fwd.thread.joinable()) { + py::gil_scoped_release release; + fwd.thread.join(); + } +} + +} // namespace + +void PythonLogStorage::EnsureForwarderStarted() { + // Called from Connect() with the GIL held and no engine lock held. + auto &fwd = GetForwarder(); + { + std::unique_lock lck(fwd.mutex); + if (fwd.started) { + return; + } + fwd.started = true; + fwd.thread = std::thread(ForwarderLoop); + } + // Stop+join before interpreter finalization. Joining a GIL-blocked thread after Py_Finalize + // would crash, so we hook atexit (which runs while the interpreter is still valid). + try { + auto atexit = py::module::import("atexit"); + atexit.attr("register")(py::cpp_function([]() { StopForwarder(); })); + } catch (...) { + } +} + +void PythonLogStorage::DrainForwarder() { + auto &fwd = GetForwarder(); + // Release the GIL while waiting: the forwarder thread needs it to finish its in-flight batch + // and signal idle. Holding it here would deadlock the very thread we're waiting on. + py::gil_scoped_release release; + std::unique_lock lck(fwd.mutex); + fwd.idle_cv.wait(lck, [&fwd] { return fwd.queue.empty() && !fwd.busy; }); +} + +PythonLogStorage::PythonLogStorage(DatabaseInstance &db) : BufferingLogStorage(db, 1, true) { + log_storage_buffers[LoggingTargetTable::LOG_ENTRIES] = + make_uniq(Allocator::DefaultAllocator(), GetSchema(LoggingTargetTable::LOG_ENTRIES)); + log_storage_buffers[LoggingTargetTable::LOG_CONTEXTS] = + make_uniq(Allocator::DefaultAllocator(), GetSchema(LoggingTargetTable::LOG_CONTEXTS)); +} + +PythonLogStorage::~PythonLogStorage() { +} + +ColumnDataCollection &PythonLogStorage::GetBuffer(LoggingTargetTable table) const { + auto res = log_storage_buffers.find(table); + if (res == log_storage_buffers.end()) { + throw InternalException("PythonLogStorage: failed to find buffer for logging target table"); + } + return *res->second; +} + +void PythonLogStorage::EnqueueEntriesForPython(DataChunk &chunk) { + // Runs under LogManager::lock (and our scan lock). It MUST NOT touch the GIL or call Python: + // doing so here would deadlock against any thread that holds the GIL and then enters a + // LogManager method that needs the same lock (CreateLogger / WriteLogEntry / Flush). So we + // only copy plain data into the global queue; the forwarder thread does the Python work + // lock-free. The strings are deep-copied (GetString), so they outlive this chunk. + // + // A side benefit of decoupling: a user logging handler that raises now runs on the forwarder + // thread, where the exception is swallowed — it can never reach the engine's query path. + // + // LOG_ENTRIES schema: context_id, timestamp, type, log_level (idx 3), message (idx 4). + // log_level and message are both VARCHAR; the buffer chunk is flat. + auto level_data = FlatVector::GetData(chunk.data[3]); + auto message_data = FlatVector::GetData(chunk.data[4]); + auto &fwd = GetForwarder(); + { + std::unique_lock lck(fwd.mutex); + for (idx_t i = 0; i < chunk.size(); i++) { + fwd.queue.push_back({LevelStringToPython(level_data[i].GetString()), message_data[i].GetString()}); + } + } + fwd.cv.notify_one(); +} + +void PythonLogStorage::FlushChunk(LoggingTargetTable table, DataChunk &chunk) { + D_ASSERT(table == LoggingTargetTable::LOG_ENTRIES || table == LoggingTargetTable::LOG_CONTEXTS); + // Retain the entry for duckdb_logs FIRST, so a misbehaving Python handler can never cost + // us a stored row. + log_storage_buffers[table]->Append(chunk); + // Queue only real log entries (not context metadata) for async forwarding to logging. + if (table == LoggingTargetTable::LOG_ENTRIES) { + EnqueueEntriesForPython(chunk); + } +} + +void PythonLogStorage::ResetAllBuffers() { + BufferingLogStorage::ResetAllBuffers(); + for (const auto &buffer : log_storage_buffers) { + buffer.second->Reset(); + } +} + +bool PythonLogStorage::CanScan(LoggingTargetTable table) { + unique_lock lck(lock); + return IsEnabledInternal(table); +} + +unique_ptr PythonLogStorage::CreateScanState(LoggingTargetTable table) const { + return make_uniq(table); +} + +bool PythonLogStorage::Scan(LogStorageScanState &state, DataChunk &result) const { + unique_lock lck(lock); + auto &python_scan_state = state.Cast(); + return GetBuffer(python_scan_state.table).Scan(python_scan_state.scan_state, result); +} + +void PythonLogStorage::InitializeScan(LogStorageScanState &state) const { + unique_lock lck(lock); + auto &python_scan_state = state.Cast(); + GetBuffer(python_scan_state.table) + .InitializeScan(python_scan_state.scan_state, ColumnDataScanProperties::DISALLOW_ZERO_COPY); +} + +} // namespace duckdb diff --git a/tests/fast/test_python_log_storage.py b/tests/fast/test_python_log_storage.py new file mode 100644 index 00000000..955d640c --- /dev/null +++ b/tests/fast/test_python_log_storage.py @@ -0,0 +1,256 @@ +"""Tests for the PythonLogStorage composite log storage (issue #480). + +PythonLogStorage forwards engine log entries to Python's `logging` module AND keeps them +queryable via `SELECT * FROM duckdb_logs`. It is registered on the first connection to each +DatabaseInstance and routes WARNING+ entries to logging.getLogger("duckdb"). + +Forwarding to `logging` is ASYNCHRONOUS (a background thread drains a queue), because the engine +calls the log-write path while holding LogManager::lock and acquiring the GIL there would +deadlock. So any assertion about the `logging` channel must first call `_drain()` to wait for the +forwarder to catch up. The `duckdb_logs` table channel is synchronous and needs no drain. +""" + +import logging + +import _duckdb +import pytest + +import duckdb + +DEPRECATION_FRAGMENT = "Deprecated lambda arrow" + + +def _drain(): + """Block until the async forwarder has delivered every queued entry to `logging`.""" + _duckdb._drain_log_forwarding() + + +def _trigger_deprecation_warning(con): + """Run a query that reliably emits a single engine DUCKDB_LOG_WARNING, then drain. + + The deprecated arrow (->) lambda form warns only when lambda_syntax is DEFAULT. DEFAULT + is the current engine default but is slated to change, so we pin it to keep this exercising + the warning path across submodule bumps. + + We drain the async forwarder before returning so the entry is delivered to `logging` while + the caller's caplog handler is still attached — callers may read records after their + `with caplog` block exits. + """ + con.execute("SET lambda_syntax='DEFAULT'") + con.execute("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + _drain() + + +def _deprecation_records(caplog): + return [r for r in caplog.records if "deprecated" in r.getMessage().lower()] + + +def _duckdb_logs_deprecation_count(con): + return con.execute(f"SELECT count(*) FROM duckdb_logs WHERE message LIKE '%{DEPRECATION_FRAGMENT}%'").fetchone()[0] + + +# --------------------------------------------------------------------------- +# Channel 1: forwarding to Python's logging module +# --------------------------------------------------------------------------- + + +def test_warning_routed_to_python_logging(caplog): + with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + _trigger_deprecation_warning(con) + records = _deprecation_records(caplog) + assert records, "expected a deprecation warning routed to the 'duckdb' logger" + assert all(r.name == "duckdb" for r in records) + assert all(r.levelno == logging.WARNING for r in records) + + +def test_warning_not_emitted_for_clean_queries(caplog): + with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + con.execute("SELECT 1 + 1").fetchone() + # Assert absence of the deprecation warning specifically rather than requiring zero records + # total — an incidental connect-time warning (e.g. the macOS Rosetta notice on some + # hardware) would otherwise make this flaky. + assert not _deprecation_records(caplog) + + +def test_module_level_default_connection_forwards(caplog): + # The most common Jupyter/script pattern: no explicit connect(). The default connection is + # created lazily via Connect(), so it must register the storage too. + with caplog.at_level(logging.WARNING, logger="duckdb"): + duckdb.execute("SET lambda_syntax='DEFAULT'") + duckdb.sql("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + _drain() # this test triggers directly rather than via _trigger_deprecation_warning + assert _deprecation_records(caplog), "default connection should route warnings to logging" + + +# --------------------------------------------------------------------------- +# Channel 2: duckdb_logs stays queryable (the regression this design fixes) +# --------------------------------------------------------------------------- + + +def test_duckdb_logs_still_populated(): + con = duckdb.connect() + _trigger_deprecation_warning(con) + assert _duckdb_logs_deprecation_count(con) >= 1, "SELECT * FROM duckdb_logs must still surface engine warnings" + + +def test_single_warning_visible_immediately(): + # Guards against regressing to a batched (buffer_size=2048) storage where a lone warning + # would never flush. With buffer_size=1 it must appear after a single triggering query. + con = duckdb.connect() + assert _duckdb_logs_deprecation_count(con) == 0 + _trigger_deprecation_warning(con) + assert _duckdb_logs_deprecation_count(con) >= 1 + + +def test_duckdb_logs_schema_and_content(): + con = duckdb.connect() + _trigger_deprecation_warning(con) + row = con.execute( + "SELECT log_level, message, type, timestamp " + f"FROM duckdb_logs WHERE message LIKE '%{DEPRECATION_FRAGMENT}%' LIMIT 1" + ).fetchone() + assert row is not None + log_level, message, log_type, timestamp = row + assert log_level == "WARNING" + assert DEPRECATION_FRAGMENT in message + # `type` is a VARCHAR but may be empty for some engine warnings (the deprecation notice + # carries no log type), so only assert the schema, not non-emptiness. + assert isinstance(log_type, str) + assert timestamp is not None + + +# --------------------------------------------------------------------------- +# Both channels together +# --------------------------------------------------------------------------- + + +def test_both_channels_receive_the_same_entry(caplog): + with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + _trigger_deprecation_warning(con) + table_rows = con.execute( + f"SELECT message FROM duckdb_logs WHERE message LIKE '%{DEPRECATION_FRAGMENT}%'" + ).fetchall() + logging_records = _deprecation_records(caplog) + assert logging_records, "logging channel missing the entry" + assert table_rows, "duckdb_logs channel missing the entry" + # The message content must agree across both channels. + assert any(DEPRECATION_FRAGMENT in r.getMessage() for r in logging_records) + assert all(DEPRECATION_FRAGMENT in row[0] for row in table_rows) + + +def test_repeated_warnings_accumulate_in_both_channels(caplog): + with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + _trigger_deprecation_warning(con) + after_first = _duckdb_logs_deprecation_count(con) + records_after_first = len(_deprecation_records(caplog)) + _trigger_deprecation_warning(con) + after_second = _duckdb_logs_deprecation_count(con) + records_after_second = len(_deprecation_records(caplog)) + # No deduplication: a second occurrence is recorded again in both channels. + assert after_second > after_first + assert records_after_second > records_after_first + + +# --------------------------------------------------------------------------- +# Robustness +# --------------------------------------------------------------------------- + + +def test_raising_handler_does_not_fail_query_and_row_persists(): + # A user logging handler that raises must not disrupt anything. Forwarding runs on a + # background thread (decoupled from the query path), so the exception is swallowed there by + # the forwarder's catch(...) — the query can never see it. The entry is also stored BEFORE + # being queued, so the duckdb_logs row must still be present. We drain while the handler is + # attached so it actually fires and exercises the C++ exception safety net. + class BoomHandler(logging.Handler): + def emit(self, record): + # Intentionally raise (bare raise keeps ruff's EM101/TRY003 happy). + raise RuntimeError + + logger = logging.getLogger("duckdb") + handler = BoomHandler() + previous_level = logger.level + logger.addHandler(handler) + logger.setLevel(logging.WARNING) + try: + con = duckdb.connect() + con.execute("SET lambda_syntax='DEFAULT'") + result = con.execute("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + _drain() # force the raising handler to fire on the forwarder thread + assert result == [([2, 3, 4],)] + assert _duckdb_logs_deprecation_count(con) >= 1 + finally: + logger.removeHandler(handler) + logger.setLevel(previous_level) + + +@pytest.mark.timeout(60) +def test_concurrent_warning_queries_do_not_deadlock(): + # Regression guard. Forwarding used to acquire the GIL from inside FlushChunk, which runs + # under LogManager::lock. With two threads each running a warning-emitting query, one thread + # would hold that lock and block on the GIL while another held the GIL and blocked on the + # lock (via LogManager::CreateLogger) — a hard deadlock. Forwarding is now async, so this + # must complete quickly. pytest-timeout (configured for the suite) fails the test if it hangs. + from concurrent.futures import ThreadPoolExecutor + + def hammer(con): + cur = con.cursor() + cur.execute("SET lambda_syntax='DEFAULT'") + for _ in range(20): + cur.execute("SELECT list_transform([1, 2, 3], x -> x + 1)").fetchall() + + con = duckdb.connect() + with ThreadPoolExecutor(max_workers=4) as pool: + futures = [pool.submit(hammer, con) for _ in range(4)] + for future in futures: + future.result() + _drain() + + +def test_default_storage_configuration(): + con = duckdb.connect() + assert con.execute("SELECT current_setting('logging_storage')").fetchone()[0] == "python_log_storage" + assert con.execute("SELECT current_setting('enable_logging')").fetchone()[0] + assert con.execute("SELECT current_setting('logging_level')").fetchone()[0] == "WARNING" + + +def test_switching_to_memory_storage_disables_forwarding(caplog): + # The user escape hatch: SET logging_storage='memory' detaches our storage. Forwarding to + # logging stops, but the table path still works (now via the engine's in-memory storage). + with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + con.execute("SET logging_storage='memory'") + _trigger_deprecation_warning(con) + table_count = _duckdb_logs_deprecation_count(con) + assert table_count >= 1, "memory storage should still populate duckdb_logs" + assert not _deprecation_records(caplog), "forwarding should stop once our storage is detached" + + +def test_separate_databases_are_independent(caplog): + # Logging is per-DatabaseInstance; each fresh database registers its own storage and keeps + # its own duckdb_logs, while both forward to the shared process-wide 'duckdb' logger. + with caplog.at_level(logging.WARNING, logger="duckdb"): + con_a = duckdb.connect() + con_b = duckdb.connect() + _trigger_deprecation_warning(con_a) + assert _duckdb_logs_deprecation_count(con_a) >= 1 + assert _duckdb_logs_deprecation_count(con_b) == 0, "con_b has its own, untouched storage" + _trigger_deprecation_warning(con_b) + assert _duckdb_logs_deprecation_count(con_b) >= 1 + assert len(_deprecation_records(caplog)) >= 2 + + +def test_cursor_shares_storage(caplog): + # A cursor shares the parent's DatabaseInstance, so warnings it triggers land in the same + # duckdb_logs and route to logging. + with caplog.at_level(logging.WARNING, logger="duckdb"): + con = duckdb.connect() + cur = con.cursor() + _trigger_deprecation_warning(cur) + assert _duckdb_logs_deprecation_count(con) >= 1 + assert _duckdb_logs_deprecation_count(cur) >= 1 + assert _deprecation_records(caplog)