Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/code_quality.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ jobs:
uses: astral-sh/setup-uv@v7
with:
version: "0.9.0"
python-version: "3.12"
# 3.13: the cmake-format pre-commit hook is pinned to python3.13
# (cmakelang crashes under 3.14). Keeping this in sync means the hook
# resolves to the running interpreter instead of hunting PATH.
python-version: "3.13"

- name: pre-commit (cache)
uses: actions/cache@v4
Expand Down
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ repos:
rev: v0.6.13
hooks:
- id: cmake-format
# cmakelang is unmaintained and crashes under Python 3.14
# ("Cannot use capturing groups in re.Scanner"). Pin this hook's
# environment to 3.13 so it never picks up a 3.14 interpreter. The
# code_quality CI job provisions Python 3.13 to match, so the hook
# resolves to the running interpreter there.
language_version: python3.13

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.18.2
Expand Down
1 change: 1 addition & 0 deletions src/duckdb_py/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ add_library(
importer.cpp
map.cpp
path_like.cpp
python_log_storage.cpp
pyconnection.cpp
pyexpression.cpp
pyfilesystem.cpp
Expand Down
68 changes: 68 additions & 0 deletions src/duckdb_py/include/duckdb_python/python_log_storage.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// duckdb_python/python_log_storage.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/logging/log_storage.hpp"
#include "duckdb/common/map.hpp"
#include "duckdb/common/unique_ptr.hpp"

namespace duckdb {

class ColumnDataCollection;
class DatabaseInstance;

//! Scan state backing PythonLogStorage's in-memory buffers (so `duckdb_logs` can read them).
//! We define our own rather than reuse the engine's InMemoryLogStorageScanState to avoid
//! depending on whether that type's symbols are exported across platforms.
//!
//! Instances are produced by PythonLogStorage::CreateScanState and are the concrete type that
//! PythonLogStorage::InitializeScan / PythonLogStorage::Scan cast their `state` argument to.
class PythonLogStorageScanState : public LogStorageScanState {
public:
//! Binds the state to the logging table it will read; `table` is passed straight to the base.
explicit PythonLogStorageScanState(LoggingTargetTable table) : LogStorageScanState(table) {
}
~PythonLogStorageScanState() override = default;

//! Scan cursor handed to ColumnDataCollection::InitializeScan / ColumnDataCollection::Scan.
//! NOTE(review): held by value, so ColumnDataScanState must be a complete type at this point —
//! presumably pulled in transitively through log_storage.hpp; confirm.
ColumnDataScanState scan_state;
};

//! A composite log storage that does two things for every engine log entry:
//! 1. forwards it to Python's standard `logging` module (logging.getLogger("duckdb")), and
//! 2. retains it in-memory so `SELECT * FROM duckdb_logs` keeps working.
//!
//! It subclasses BufferingLogStorage with a buffer size of 1 so each entry is flushed (and
//! therefore forwarded to Python) immediately, rather than batched until a 2048-entry buffer
//! fills — engine WARNINGs are sparse and must surface inline to be useful.
//!
//! Retained rows are kept in one ColumnDataCollection per LoggingTargetTable (LOG_ENTRIES and
//! LOG_CONTEXTS). Only LOG_ENTRIES rows are forwarded to Python; LOG_CONTEXTS rows are stored only.
class PythonLogStorage : public BufferingLogStorage {
public:
//! Creates the storage for `db` and allocates the LOG_ENTRIES and LOG_CONTEXTS buffers.
explicit PythonLogStorage(DatabaseInstance &db);
//! Declared here but defined in the .cpp file: this header only forward-declares
//! ColumnDataCollection, which the unique_ptr members need as a complete type on destruction.
~PythonLogStorage() override;

//! Returns the fixed storage name "python_log_storage".
const string GetStorageName() override {
return "python_log_storage";
}

//! Single-threaded scan interface — mirrors InMemoryLogStorage so duckdb_logs can read us.
//! Returns whether `table` is enabled (IsEnabledInternal), checked while holding the storage lock.
bool CanScan(LoggingTargetTable table) override;
//! Returns a new PythonLogStorageScanState bound to `table`.
unique_ptr<LogStorageScanState> CreateScanState(LoggingTargetTable table) const override;
//! Scans the buffer of the state's table into `result` under the storage lock and returns
//! whatever ColumnDataCollection::Scan returns. `state` must be a PythonLogStorageScanState.
bool Scan(LogStorageScanState &state, DataChunk &result) const override;
//! Initializes `state`'s cursor over the buffer of its table (zero-copy disallowed), under the
//! storage lock. `state` must be a PythonLogStorageScanState.
void InitializeScan(LogStorageScanState &state) const override;

protected:
//! Stores the chunk for duckdb_logs and (for LOG_ENTRIES) forwards it to Python.
void FlushChunk(LoggingTargetTable table, DataChunk &chunk) override;
//! Clears the in-memory buffers.
void ResetAllBuffers() override;

private:
//! Returns the retained buffer for `table`; throws InternalException if no buffer exists for it.
ColumnDataCollection &GetBuffer(LoggingTargetTable table) const;
//! Forwards each row of a LOG_ENTRIES chunk to logging.getLogger("duckdb"). Never throws.
void ForwardEntriesToPython(DataChunk &chunk);

//! Retained log rows, keyed by target table; populated in the constructor.
map<LoggingTargetTable, unique_ptr<ColumnDataCollection>> log_storage_buffers;
};

} // namespace duckdb
18 changes: 18 additions & 0 deletions src/duckdb_py/pyconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "duckdb_python/numpy/numpy_type.hpp"
#include "duckdb/main/prepared_statement.hpp"
#include "duckdb_python/jupyter_progress_bar_display.hpp"
#include "duckdb_python/python_log_storage.hpp"
#include "duckdb/logging/log_manager.hpp"
#include "duckdb_python/pyfilesystem.hpp"
#include "duckdb/main/client_config.hpp"
#include "duckdb/function/table/read_csv.hpp"
Expand Down Expand Up @@ -2283,6 +2285,22 @@ shared_ptr<DuckDBPyConnection> DuckDBPyConnection::Connect(const py::object &dat
auto res = FetchOrCreateInstance(database, config);
auto &client_context = *res->con.GetConnection().context;
SetDefaultConfigArguments(client_context);
{
auto &db_instance = *res->con.GetDatabase().instance;
auto &log_manager = db_instance.GetLogManager();
auto storage = make_shared_ptr<PythonLogStorage>(db_instance);
shared_ptr<LogStorage> storage_base = storage;
// RegisterLogStorage returns false if the name is already registered on this
// DatabaseInstance. Instances are cached and shared across connections/cursors, so
// only configure logging on the first registration. SetLogStorage/SetEnableLogging/
// SetLogLevel are NOT idempotent — re-running them on every Connect() would silently
// stomp a user's explicit `SET enable_logging` / `SET logging_level` / storage choice.
if (log_manager.RegisterLogStorage("python_log_storage", storage_base)) {
log_manager.SetLogStorage(db_instance, "python_log_storage");
log_manager.SetEnableLogging(true);
log_manager.SetLogLevel(LogLevel::LOG_WARNING);
}
}
return res;
}

Expand Down
120 changes: 120 additions & 0 deletions src/duckdb_py/python_log_storage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include "duckdb_python/python_log_storage.hpp"
#include "duckdb_python/pybind11/pybind_wrapper.hpp"

#include "duckdb/common/allocator.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/types/column/column_data_collection.hpp"
#include "duckdb/common/types/data_chunk.hpp"
#include "duckdb/common/types/vector.hpp"

namespace duckdb {

// Translates an engine log-level name (the VARCHAR stored in a LOG_ENTRIES chunk) into the
// numeric level used by Python's logging module. TRACE has no Python counterpart and is folded
// into DEBUG; any name that is not recognised is reported as WARNING.
static int LevelStringToPython(const string &level_str) {
	struct LevelMapping {
		const char *engine_name;
		int python_level;
	};
	static const LevelMapping LEVEL_TABLE[] = {
	    {"TRACE", 10},   // logging.DEBUG
	    {"DEBUG", 10},   // logging.DEBUG
	    {"INFO", 20},    // logging.INFO
	    {"WARNING", 30}, // logging.WARNING
	    {"ERROR", 40},   // logging.ERROR
	    {"FATAL", 50},   // logging.CRITICAL
	};
	for (const auto &mapping : LEVEL_TABLE) {
		if (level_str == mapping.engine_name) {
			return mapping.python_level;
		}
	}
	// Unknown level name: fall back to logging.WARNING.
	return 30;
}

// Builds the storage on top of BufferingLogStorage and allocates one retained
// ColumnDataCollection per logging target table, each created with that table's schema.
PythonLogStorage::PythonLogStorage(DatabaseInstance &db) : BufferingLogStorage(db, 1, true) {
	const LoggingTargetTable retained_tables[] = {LoggingTargetTable::LOG_ENTRIES, LoggingTargetTable::LOG_CONTEXTS};
	for (const auto target : retained_tables) {
		log_storage_buffers[target] = make_uniq<ColumnDataCollection>(Allocator::DefaultAllocator(), GetSchema(target));
	}
}

// Nothing to release by hand; the buffers are owned by unique_ptr members. The definition stays
// out-of-line in this file, where ColumnDataCollection is a complete type.
PythonLogStorage::~PythonLogStorage() = default;

// Looks up the retained buffer for `table`. Throws InternalException when no buffer has been
// allocated for that table; the map itself is never modified by the lookup.
ColumnDataCollection &PythonLogStorage::GetBuffer(LoggingTargetTable table) const {
	const auto entry = log_storage_buffers.find(table);
	if (entry != log_storage_buffers.end()) {
		return *entry->second;
	}
	throw InternalException("PythonLogStorage: failed to find buffer for logging target table");
}

// Sends every row of a LOG_ENTRIES chunk to logging.getLogger("duckdb"). Never throws.
void PythonLogStorage::ForwardEntriesToPython(DataChunk &chunk) {
	// Context this runs in: engine worker threads with the GIL released, while both the
	// LogManager lock and this storage's lock are held. The logging handlers it invokes are
	// arbitrary user Python. The engine's write path has no try/catch and is reached directly
	// from query binding/execution, so an escaping exception would fail the user's query —
	// which is why everything is swallowed below.
	//
	// Caveat: since a lock is held for the duration of the call, a handler that re-enters
	// DuckDB on the same thread and produces another log entry can self-deadlock on the
	// non-recursive lock. That is outside our control (and matches the engine's own contract
	// for log storages).
	if (!Py_IsInitialized()) {
		// The interpreter is finalizing; trying to take the GIL now would crash.
		return;
	}
	try {
		py::gil_scoped_acquire gil;
		auto duckdb_logger = py::module::import("logging").attr("getLogger")("duckdb");
		// LOG_ENTRIES columns: context_id, timestamp, type, log_level (3), message (4).
		// Both log_level and message are VARCHAR, and the buffered chunk is flat.
		const auto *levels = FlatVector::GetData<string_t>(chunk.data[3]);
		const auto *messages = FlatVector::GetData<string_t>(chunk.data[4]);
		const idx_t row_count = chunk.size();
		for (idx_t row = 0; row < row_count; row++) {
			const int python_level = LevelStringToPython(levels[row].GetString());
			const string message = messages[row].GetString();
			duckdb_logger.attr("log")(python_level, message);
		}
	} catch (...) {
		// Deliberately ignored: logging must never disrupt query execution.
	}
}

// Receives one flushed chunk for `table`: appends it to the retained buffer that backs
// duckdb_logs and, for LOG_ENTRIES only, forwards its rows to Python's logging module.
// Throws InternalException (from GetBuffer) if `table` has no retained buffer.
void PythonLogStorage::FlushChunk(LoggingTargetTable table, DataChunk &chunk) {
	D_ASSERT(table == LoggingTargetTable::LOG_ENTRIES || table == LoggingTargetTable::LOG_CONTEXTS);
	// Retain the entry for duckdb_logs FIRST, so a misbehaving Python handler can never cost
	// us a stored row.
	// The lookup goes through GetBuffer instead of operator[]: operator[] would silently insert
	// a null buffer for an unexpected table (D_ASSERT is compiled out in release builds) and
	// leave that null entry behind for later scans and resets to trip over.
	GetBuffer(table).Append(chunk);
	// Forward only real log entries (not context metadata) to Python's logging module.
	if (table == LoggingTargetTable::LOG_ENTRIES) {
		ForwardEntriesToPython(chunk);
	}
}

// Resets the base class's buffers first, then empties every retained per-table collection.
void PythonLogStorage::ResetAllBuffers() {
	BufferingLogStorage::ResetAllBuffers();
	for (const auto &table_and_buffer : log_storage_buffers) {
		auto &retained = table_and_buffer.second;
		retained->Reset();
	}
}

// Reports whether `table` is enabled for this storage; the check is made under the storage lock.
bool PythonLogStorage::CanScan(LoggingTargetTable table) {
	unique_lock<mutex> guard(lock);
	const bool table_enabled = IsEnabledInternal(table);
	return table_enabled;
}

// Produces a fresh scan state bound to `table`, returned through the base-class pointer type.
unique_ptr<LogStorageScanState> PythonLogStorage::CreateScanState(LoggingTargetTable table) const {
	unique_ptr<LogStorageScanState> new_state = make_uniq<PythonLogStorageScanState>(table);
	return new_state;
}

// Scans the retained buffer of the state's table into `result` while holding the storage lock.
// `state` must be a PythonLogStorageScanState. Returns the collection's own Scan result.
bool PythonLogStorage::Scan(LogStorageScanState &state, DataChunk &result) const {
	unique_lock<mutex> guard(lock);
	auto &scan = state.Cast<PythonLogStorageScanState>();
	auto &source_buffer = GetBuffer(scan.table);
	return source_buffer.Scan(scan.scan_state, result);
}

// Prepares `state` for scanning the retained buffer of its table, with zero-copy disallowed.
// Runs under the storage lock; `state` must be a PythonLogStorageScanState.
void PythonLogStorage::InitializeScan(LogStorageScanState &state) const {
	unique_lock<mutex> guard(lock);
	auto &scan = state.Cast<PythonLogStorageScanState>();
	auto &source_buffer = GetBuffer(scan.table);
	source_buffer.InitializeScan(scan.scan_state, ColumnDataScanProperties::DISALLOW_ZERO_COPY);
}

} // namespace duckdb
Loading
Loading