From ed4c727939d8b6358073b3d5a6ff85f9d3051fe5 Mon Sep 17 00:00:00 2001 From: Abhi <171412961+iapoorv01@users.noreply.github.com> Date: Fri, 3 Jul 2026 11:29:48 +0530 Subject: [PATCH 1/2] feat(io): implement automated schema exploration for SQL connectors Closes #224. This commit introduces dynamic schema exploration for pw.io.postgres.read, pw.io.mysql.read, and pw.io.mssql.read, allowing users to omit the schema parameter when initializing database readers. ### Approach Instead of adding heavy Python-level database drivers (e.g., SQLAlchemy) to query the schemas, this implementation extends the existing internal Rust connectors to extract metadata directly from INFORMATION_SCHEMA and sys. The results are mapped directly to Pathway Schema definitions via schema_builder. ### Key Changes - **Rust Backend**: Exposes postgres_explore_schema, mysql_explore_schema, and mssql_explore_schema via PyO3 in python_api.rs. These functions securely invoke standard metadata queries utilizing internal iberius, mysql, and okio-postgres connections. - **Python Connectors**: Updates __init__.py for Postgres, MySQL, and MSSQL to handle schema=None. When triggered, they fetch schema topology from the Rust backend and construct a dynamic pw.Schema mapping. - **Primary Key Handling**: Automatically explores and applies primary_key=True properties to the corresponding pw.column_definition elements. If no PK is found, the engine logs a visible warning to inform the user about potential CDC/streaming issues. - **User Visibility**: The dynamically inferred schema is logged at startup, allowing developers to easily copy it into their codebase if they require stricter type enforcement down the line. This zero-dependency approach ensures type safety parity while vastly improving the developer experience for database onboarding. --- CHANGELOG.md | 1 + python/pathway/io/mssql/__init__.py | 66 +++++++- python/pathway/io/mysql/__init__.py | 63 +++++++- python/pathway/io/postgres/__init__.py | 71 ++++++++- src/python_api.rs | 203 +++++++++++++++++++++++++ 5 files changed, 398 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 014228132..dadb11650 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] ### Added +- `pw.io.postgres.read`, `pw.io.mysql.read`, and `pw.io.mssql.read` now support automated schema exploration. You can omit the `schema` parameter when initializing these database readers, and the engine will dynamically infer the target table's schema (including column types and primary keys) directly from the database catalog at startup. This enables rapid zero-configuration onboarding while maintaining static type validation. - `pw.io.chroma.write` writes a Pathway table to a [Chroma](https://www.trychroma.com/) collection, keeping the collection in sync with the table as rows are added, changed, and removed. The columns are mapped onto Chroma's record fields explicitly: the optional `primary_key` becomes the record id (when omitted, the row's internal Pathway key is used instead), `embedding` the vector, the optional `document` column the stored text, and `metadata_columns` the record metadata. The collection must already exist. The server is addressed with `host`/`port` (plus `ssl`, `headers`, `tenant`, and `database` for authenticated deployments such as Chroma Cloud). - `pw.io.qdrant.write` writes a Pathway table to a [Qdrant](https://qdrant.tech/) collection. Each row addition is upserted as a point and each deletion removes the corresponding point, so an update replaces a point rather than duplicating it. The `vector` column supplies the point vector (`list[float]` or a 1-D `numpy.ndarray`) and every other column is stored in the point payload. If the target collection does not exist, it is created on the first write using Cosine distance with the dimension of the written vectors. The `batch_size` parameter bounds how many points are sent per request, and an optional `api_key` authenticates against Qdrant Cloud or a secured instance. - `pw.io.duckdb.write` writes a Pathway table into a DuckDB database file through a native, in-process connector, in either `"stream_of_changes"` or `"snapshot"` mode. Embeddings stored as `numpy` arrays or `list[float]` columns land in native `DOUBLE[]` list columns, so the result is directly searchable with DuckDB's vector-distance functions for RAG retrieval. The `detach_between_batches` option makes the writer close the database after every minibatch commit and reopen it for the next one, releasing the file lock in between — so a separate process (e.g. a query server) can read committed data with short-lived read-only connections while the pipeline keeps running; the writer retries the reopen with a backoff when a reader momentarily holds the lock. diff --git a/python/pathway/io/mssql/__init__.py b/python/pathway/io/mssql/__init__.py index 43ea47488..3360b5ab5 100644 --- a/python/pathway/io/mssql/__init__.py +++ b/python/pathway/io/mssql/__init__.py @@ -2,7 +2,8 @@ from __future__ import annotations -from typing import Any, Iterable, Literal +import logging +from typing import Any, Iterable, Literal, Optional from pathway.internals import api, datasink, datasource, dtype as dt from pathway.internals._io_helpers import _format_output_value_fields @@ -19,6 +20,7 @@ init_mode_from_str, read_schema, ) +from pathway.schema import schema_builder def _validate_identifier(arg_name: str, value: str) -> None: @@ -38,7 +40,7 @@ def _validate_identifier(arg_name: str, value: str) -> None: def read( connection_string: str, table_name: str, - schema: type[Schema], + schema: type[Schema] | None = None, *, mode: Literal["static", "streaming"] = "streaming", schema_name: str = "dbo", @@ -214,6 +216,66 @@ def read( _validate_identifier("table_name", table_name) _validate_identifier("schema_name", schema_name) + if schema is None: + try: + from pathway.engine import mssql_explore_schema + + full_table_name = f"{schema_name}.{table_name}" + columns_data, pk_columns = mssql_explore_schema( + connection_string, full_table_name + ) + schema_columns = {} + for col_name, udt_name, is_nullable in columns_data: + udt_name_lower = udt_name.lower() + mapping = { + "tinyint": int, + "smallint": int, + "int": int, + "bigint": int, + "bit": bool, + "real": float, + "float": float, + "decimal": float, + "numeric": float, + "char": str, + "varchar": str, + "nchar": str, + "nvarchar": str, + "text": str, + "ntext": str, + "uniqueidentifier": str, + } + py_type = mapping.get(udt_name_lower, Any) + if is_nullable and py_type is not Any: + py_type = Optional[py_type] + + is_pk = col_name in pk_columns + from pathway.internals.schema import column_definition + + schema_columns[col_name] = column_definition( + dtype=py_type, + primary_key=is_pk, + ) + + if not pk_columns: + logging.getLogger(__name__).warning( + f"No primary key found for {schema_name}.{table_name} during schema exploration. " + "Falling back to auto-generated row identifiers. " + "This may cause issues in streaming mode if the table is not append-only." + ) + + schema_name_class = ( + "".join(c.capitalize() for c in table_name.split("_")) + "Schema" + ) + schema = schema_builder(schema_columns, name=schema_name_class) + logging.getLogger(__name__).info( + f"Derived schema for {schema_name}.{table_name}:\n{schema}" + ) + except Exception as e: + raise RuntimeError( + f"Failed to explore schema automatically: {e}. Please provide an explicit schema." + ) from e + schema, api_schema = read_schema(schema) primary_key_columns = schema.primary_key_columns() diff --git a/python/pathway/io/mysql/__init__.py b/python/pathway/io/mysql/__init__.py index a98817f4c..d57ce3e63 100644 --- a/python/pathway/io/mysql/__init__.py +++ b/python/pathway/io/mysql/__init__.py @@ -2,7 +2,8 @@ from __future__ import annotations -from typing import Any, Iterable, Literal +import logging +from typing import Any, Iterable, Literal, Optional from pathway.internals import api, datasink, datasource, dtype as dt from pathway.internals._io_helpers import _format_output_value_fields @@ -18,6 +19,7 @@ init_mode_from_str, read_schema, ) +from pathway.schema import schema_builder @check_arg_types @@ -25,7 +27,7 @@ def read( connection_string: str, table_name: str, - schema: type[Schema], + schema: type[Schema] | None = None, *, mode: Literal["static", "streaming"] = "streaming", server_id: int | None = None, @@ -184,6 +186,63 @@ def read( """ _check_entitlements("mysql") + if schema is None: + try: + from pathway.engine import mysql_explore_schema + + columns_data, pk_columns = mysql_explore_schema( + connection_string, table_name + ) + schema_columns = {} + for col_name, udt_name, is_nullable in columns_data: + udt_name_lower = udt_name.lower() + mapping = { + "tinyint": int, + "smallint": int, + "mediumint": int, + "int": int, + "bigint": int, + "float": float, + "double": float, + "decimal": float, + "char": str, + "varchar": str, + "text": str, + "mediumtext": str, + "longtext": str, + "json": str, + } + py_type = mapping.get(udt_name_lower, Any) + if is_nullable and py_type is not Any: + py_type = Optional[py_type] + + is_pk = col_name in pk_columns + from pathway.internals.schema import column_definition + + schema_columns[col_name] = column_definition( + dtype=py_type, + primary_key=is_pk, + ) + + if not pk_columns: + logging.getLogger(__name__).warning( + f"No primary key found for {table_name} during schema exploration. " + "Falling back to auto-generated row identifiers. " + "This may cause issues in streaming mode if the table is not append-only." + ) + + schema_name_class = ( + "".join(c.capitalize() for c in table_name.split("_")) + "Schema" + ) + schema = schema_builder(schema_columns, name=schema_name_class) + logging.getLogger(__name__).info( + f"Derived schema for {table_name}:\n{schema}" + ) + except Exception as e: + raise RuntimeError( + f"Failed to explore schema automatically: {e}. Please provide an explicit schema." + ) from e + schema, api_schema = read_schema(schema) primary_key_columns = schema.primary_key_columns() diff --git a/python/pathway/io/postgres/__init__.py b/python/pathway/io/postgres/__init__.py index ecd97887a..2dd12294e 100644 --- a/python/pathway/io/postgres/__init__.py +++ b/python/pathway/io/postgres/__init__.py @@ -3,9 +3,10 @@ from __future__ import annotations import copy +import logging import urllib.parse import warnings -from typing import Any, Iterable, Literal +from typing import Any, Iterable, Literal, Optional from pathway.internals import api, datasink, datasource, dtype from pathway.internals._io_helpers import TLSSettings, _format_output_value_fields @@ -21,6 +22,7 @@ init_mode_from_str, read_schema, ) +from pathway.schema import schema_builder def _quote_libpq_value(value) -> str: @@ -284,7 +286,7 @@ def _construct_replication_settings( def read( postgres_settings: dict, table_name: str, - schema: type[Schema], + schema: type[Schema] | None = None, *, mode: Literal["streaming", "static"] = "streaming", is_append_only: bool = False, @@ -575,6 +577,71 @@ def read( tls_settings=tls.settings, ) + if schema is None: + try: + from pathway.engine import postgres_explore_schema + + ssl_mode = owned_postgres_settings.get("sslmode", "prefer") + ssl_cert_path = owned_postgres_settings.get("sslrootcert", None) + + columns_data, pk_columns = postgres_explore_schema( + _connection_string_from_settings(owned_postgres_settings), + schema_name, + table_name, + ssl_mode, + ssl_cert_path, + ) + + schema_columns = {} + for col_name, udt_name, is_nullable in columns_data: + # Map to pw types + mapping = { + "int2": int, + "int4": int, + "int8": int, + "float4": float, + "float8": float, + "numeric": float, + "bool": bool, + "text": str, + "varchar": str, + "bpchar": str, + "char": str, + "uuid": str, + "json": str, + "jsonb": str, + } + py_type = mapping.get(udt_name, Any) + if is_nullable and py_type is not Any: + py_type = Optional[py_type] + + is_pk = col_name in pk_columns + from pathway.internals.schema import column_definition + + schema_columns[col_name] = column_definition( + dtype=py_type, + primary_key=is_pk, + ) + + if not pk_columns: + logging.getLogger(__name__).warning( + f"No primary key found for {schema_name}.{table_name} during schema exploration. " + "Falling back to auto-generated row identifiers. " + "This may cause issues in streaming mode if the table is not append-only." + ) + + schema_name_class = ( + "".join(c.capitalize() for c in table_name.split("_")) + "Schema" + ) + schema = schema_builder(schema_columns, name=schema_name_class) + logging.getLogger(__name__).info( + f"Derived schema for {schema_name}.{table_name}:\n{schema}" + ) + except Exception as e: + raise RuntimeError( + f"Failed to explore schema automatically: {e}. Please provide an explicit schema." + ) from e + schema, api_schema = read_schema(schema) data_format = api.DataFormat( format_type="transparent", diff --git a/src/python_api.rs b/src/python_api.rs index c509c6d22..77d907f32 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -8393,6 +8393,205 @@ fn check_entitlements(license_key: Option, entitlements: Vec) -> Ok(()) } +#[pyfunction] +#[pyo3(signature = (connection_string, schema_name, table_name, ssl_mode_str, ssl_cert_path))] +fn postgres_explore_schema( + connection_string: &str, + schema_name: &str, + table_name: &str, + ssl_mode_str: &str, + ssl_cert_path: Option, +) -> PyResult<(Vec<(String, String, bool)>, Vec)> { + let ssl_mode = match ssl_mode_str { + "disable" => SslMode::Disable, + "allow" => SslMode::Allow, + "prefer" => SslMode::Prefer, + "require" => SslMode::Require, + "verify-ca" => SslMode::VerifyCa, + "verify-full" => SslMode::VerifyFull, + _ => { + return Err(PyValueError::new_err(format!( + "Invalid ssl_mode: {}", + ssl_mode_str + ))) + } + }; + + let mut client = create_psql_client(connection_string, ssl_mode, ssl_cert_path) + .map_err(|e| PyValueError::new_err(format!("Failed to connect: {e}")))?; + + let cols_query = " + SELECT column_name, udt_name, is_nullable + FROM information_schema.columns + WHERE table_schema = $1 AND table_name = $2 + ORDER BY ordinal_position; + "; + let rows = client + .query(cols_query, &[&schema_name, &table_name]) + .map_err(|e| PyValueError::new_err(format!("Failed to query columns: {e}")))?; + + let mut columns = Vec::new(); + for row in rows { + let col_name: String = row.get(0); + let udt_name: String = row.get(1); + let is_nullable_str: String = row.get(2); + let is_nullable = is_nullable_str == "YES"; + columns.push((col_name, udt_name, is_nullable)); + } + + let pk_query = " + SELECT kcu.column_name + FROM information_schema.table_constraints tco + JOIN information_schema.key_column_usage kcu + ON kcu.constraint_name = tco.constraint_name + AND kcu.constraint_schema = tco.constraint_schema + WHERE tco.constraint_type = 'PRIMARY KEY' + AND kcu.table_schema = $1 + AND kcu.table_name = $2; + "; + let pk_rows = client + .query(pk_query, &[&schema_name, &table_name]) + .map_err(|e| PyValueError::new_err(format!("Failed to query primary key: {e}")))?; + + let mut pk_columns = Vec::new(); + for row in pk_rows { + let col_name: String = row.get(0); + pk_columns.push(col_name); + } + + Ok((columns, pk_columns)) +} + +#[pyfunction] +#[pyo3(signature = (connection_string, table_name))] +fn mysql_explore_schema( + connection_string: &str, + table_name: &str, +) -> PyResult<(Vec<(String, String, bool)>, Vec)> { + use mysql::prelude::Queryable; + let opts = mysql::Opts::from_url(connection_string) + .map_err(|e| PyValueError::new_err(format!("Invalid MySQL URL: {}", e)))?; + let mut conn = mysql::Conn::new(opts) + .map_err(|e| PyValueError::new_err(format!("Failed to connect: {}", e)))?; + + let cols_query = " + SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ? + ORDER BY ORDINAL_POSITION; + "; + + let rows: Vec<(String, String, String)> = conn + .exec(cols_query, (table_name,)) + .map_err(|e| PyValueError::new_err(format!("Failed to query columns: {}", e)))?; + + let mut columns = Vec::new(); + for (col_name, data_type, is_nullable_str) in rows { + let is_nullable = is_nullable_str == "YES"; + columns.push((col_name, data_type, is_nullable)); + } + + let pk_query = " + SELECT kcu.COLUMN_NAME + FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tco + JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu + ON kcu.CONSTRAINT_NAME = tco.CONSTRAINT_NAME + AND kcu.CONSTRAINT_SCHEMA = tco.CONSTRAINT_SCHEMA + AND kcu.TABLE_NAME = tco.TABLE_NAME + WHERE tco.CONSTRAINT_TYPE = 'PRIMARY KEY' + AND kcu.TABLE_SCHEMA = DATABASE() + AND kcu.TABLE_NAME = ?; + "; + + let pk_rows: Vec = conn + .exec(pk_query, (table_name,)) + .map_err(|e| PyValueError::new_err(format!("Failed to query primary key: {}", e)))?; + + let mut pk_columns = Vec::new(); + for col_name in pk_rows { + pk_columns.push(col_name); + } + + Ok((columns, pk_columns)) +} + +#[pyfunction] +#[pyo3(signature = (connection_string, table_name))] +fn mssql_explore_schema( + connection_string: &str, + table_name: &str, +) -> PyResult<(Vec<(String, String, bool)>, Vec)> { + let rt = crate::async_runtime::create_async_tokio_runtime() + .map_err(|e| PyValueError::new_err(format!("Failed to create tokio runtime: {}", e)))?; + rt.block_on(async { + use tokio::net::TcpStream; + use tokio_util::compat::TokioAsyncWriteCompatExt; + + let config = tiberius::Config::from_ado_string(connection_string) + .map_err(|e| PyValueError::new_err(format!("Invalid MSSQL config: {}", e)))?; + + let tcp = TcpStream::connect(config.get_addr()) + .await + .map_err(|e| PyValueError::new_err(format!("Failed to connect to TCP: {}", e)))?; + + let _ = tcp.set_nodelay(true); + + let mut client = tiberius::Client::connect(config, tcp.compat_write()) + .await + .map_err(|e| PyValueError::new_err(format!("Failed to connect to MSSQL: {}", e)))?; + + let cols_query = " + SELECT c.name AS column_name, t.name AS data_type, c.is_nullable + FROM sys.columns c + INNER JOIN sys.types t ON c.user_type_id = t.user_type_id + WHERE c.object_id = OBJECT_ID(@P1) + ORDER BY c.column_id; + "; + let stream = client + .query(cols_query, &[&table_name]) + .await + .map_err(|e| PyValueError::new_err(format!("Failed to query columns: {}", e)))?; + + let rows = stream + .into_first_result() + .await + .map_err(|e| PyValueError::new_err(format!("Failed to fetch columns: {}", e)))?; + + let mut columns = Vec::new(); + for row in rows { + let col_name: &str = row.get(0).unwrap_or(""); + let data_type: &str = row.get(1).unwrap_or(""); + let is_nullable: bool = row.get(2).unwrap_or(false); + columns.push((col_name.to_string(), data_type.to_string(), is_nullable)); + } + + let pk_query = " + SELECT c.name AS column_name + FROM sys.indexes i + INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id + INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id + WHERE i.is_primary_key = 1 AND i.object_id = OBJECT_ID(@P1); + "; + let stream = client + .query(pk_query, &[&table_name]) + .await + .map_err(|e| PyValueError::new_err(format!("Failed to query primary key: {}", e)))?; + + let pk_rows = stream + .into_first_result() + .await + .map_err(|e| PyValueError::new_err(format!("Failed to fetch primary key: {}", e)))?; + + let mut pk_columns = Vec::new(); + for row in pk_rows { + let col_name: &str = row.get(0).unwrap_or(""); + pk_columns.push(col_name.to_string()); + } + + Ok((columns, pk_columns)) + }) +} + #[pymodule] #[pyo3(name = "engine")] fn engine(_py: Python<'_>, m: &Bound) -> PyResult<()> { @@ -8471,6 +8670,10 @@ fn engine(_py: Python<'_>, m: &Bound) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + + m.add_function(wrap_pyfunction!(postgres_explore_schema, m)?)?; + m.add_function(wrap_pyfunction!(mysql_explore_schema, m)?)?; + m.add_function(wrap_pyfunction!(mssql_explore_schema, m)?)?; m.add_class::()?; m.add_class::()?; From 634ab779df411b475bc5b775c572592f2902b4df Mon Sep 17 00:00:00 2001 From: Abhi <171412961+iapoorv01@users.noreply.github.com> Date: Fri, 3 Jul 2026 12:08:05 +0530 Subject: [PATCH 2/2] chore(duckdb): allow clippy::struct_excessive_bools on DuckDbWriter --- python/pathway/engine.pyi | 16 +++++++++++ src/connectors/data_storage/duckdb.rs | 1 + src/python_api.rs | 41 ++++++++++++--------------- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/python/pathway/engine.pyi b/python/pathway/engine.pyi index 8678342ec..160c70dcd 100644 --- a/python/pathway/engine.pyi +++ b/python/pathway/engine.pyi @@ -1231,3 +1231,19 @@ class PyObjectWrapper(Generic[T]): def _create_with_serializer( value: T, *, serializer: PyObjectWrapperSerializer | None = None ) -> PyObjectWrapper[T]: ... + +def postgres_explore_schema( + connection_string: str, + schema_name: str | None, + table_name: str, + ssl_mode: str, + ssl_cert_path: str | None, +) -> tuple[list[tuple[str, str, bool]], list[str]]: ... +def mysql_explore_schema( + connection_string: str, + table_name: str, +) -> tuple[list[tuple[str, str, bool]], list[str]]: ... +def mssql_explore_schema( + connection_string: str, + full_table_name: str, +) -> tuple[list[tuple[str, str, bool]], list[str]]: ... diff --git a/src/connectors/data_storage/duckdb.rs b/src/connectors/data_storage/duckdb.rs index 6649aaf79..64cc79cf9 100644 --- a/src/connectors/data_storage/duckdb.rs +++ b/src/connectors/data_storage/duckdb.rs @@ -278,6 +278,7 @@ enum WritePlan { /// list value as a statement parameter, list/array/tuple and JSON values are /// bound as a JSON string and cast back into the destination type inside the /// generated SQL (`CAST(CAST(? AS JSON) AS DOUBLE[])`). +#[allow(clippy::struct_excessive_bools)] pub struct DuckDbWriter { /// Path to the database file (or `:memory:`). The connection is opened /// lazily, on the first non-empty flush — see diff --git a/src/python_api.rs b/src/python_api.rs index 77d907f32..0b9cb6aa9 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -8393,6 +8393,8 @@ fn check_entitlements(license_key: Option, entitlements: Vec) -> Ok(()) } +type ExploreSchemaResult = PyResult<(Vec<(String, String, bool)>, Vec)>; + #[pyfunction] #[pyo3(signature = (connection_string, schema_name, table_name, ssl_mode_str, ssl_cert_path))] fn postgres_explore_schema( @@ -8401,7 +8403,7 @@ fn postgres_explore_schema( table_name: &str, ssl_mode_str: &str, ssl_cert_path: Option, -) -> PyResult<(Vec<(String, String, bool)>, Vec)> { +) -> ExploreSchemaResult { let ssl_mode = match ssl_mode_str { "disable" => SslMode::Disable, "allow" => SslMode::Allow, @@ -8411,8 +8413,7 @@ fn postgres_explore_schema( "verify-full" => SslMode::VerifyFull, _ => { return Err(PyValueError::new_err(format!( - "Invalid ssl_mode: {}", - ssl_mode_str + "Invalid ssl_mode: {ssl_mode_str}", ))) } }; @@ -8464,15 +8465,12 @@ fn postgres_explore_schema( #[pyfunction] #[pyo3(signature = (connection_string, table_name))] -fn mysql_explore_schema( - connection_string: &str, - table_name: &str, -) -> PyResult<(Vec<(String, String, bool)>, Vec)> { +fn mysql_explore_schema(connection_string: &str, table_name: &str) -> ExploreSchemaResult { use mysql::prelude::Queryable; let opts = mysql::Opts::from_url(connection_string) - .map_err(|e| PyValueError::new_err(format!("Invalid MySQL URL: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Invalid MySQL URL: {e}")))?; let mut conn = mysql::Conn::new(opts) - .map_err(|e| PyValueError::new_err(format!("Failed to connect: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Failed to connect: {e}")))?; let cols_query = " SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE @@ -8483,7 +8481,7 @@ fn mysql_explore_schema( let rows: Vec<(String, String, String)> = conn .exec(cols_query, (table_name,)) - .map_err(|e| PyValueError::new_err(format!("Failed to query columns: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Failed to query columns: {e}")))?; let mut columns = Vec::new(); for (col_name, data_type, is_nullable_str) in rows { @@ -8505,7 +8503,7 @@ fn mysql_explore_schema( let pk_rows: Vec = conn .exec(pk_query, (table_name,)) - .map_err(|e| PyValueError::new_err(format!("Failed to query primary key: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Failed to query primary key: {e}")))?; let mut pk_columns = Vec::new(); for col_name in pk_rows { @@ -8517,28 +8515,25 @@ fn mysql_explore_schema( #[pyfunction] #[pyo3(signature = (connection_string, table_name))] -fn mssql_explore_schema( - connection_string: &str, - table_name: &str, -) -> PyResult<(Vec<(String, String, bool)>, Vec)> { +fn mssql_explore_schema(connection_string: &str, table_name: &str) -> ExploreSchemaResult { let rt = crate::async_runtime::create_async_tokio_runtime() - .map_err(|e| PyValueError::new_err(format!("Failed to create tokio runtime: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Failed to create tokio runtime: {e}")))?; rt.block_on(async { use tokio::net::TcpStream; use tokio_util::compat::TokioAsyncWriteCompatExt; let config = tiberius::Config::from_ado_string(connection_string) - .map_err(|e| PyValueError::new_err(format!("Invalid MSSQL config: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Invalid MSSQL config: {e}")))?; let tcp = TcpStream::connect(config.get_addr()) .await - .map_err(|e| PyValueError::new_err(format!("Failed to connect to TCP: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Failed to connect to TCP: {e}")))?; let _ = tcp.set_nodelay(true); let mut client = tiberius::Client::connect(config, tcp.compat_write()) .await - .map_err(|e| PyValueError::new_err(format!("Failed to connect to MSSQL: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Failed to connect to MSSQL: {e}")))?; let cols_query = " SELECT c.name AS column_name, t.name AS data_type, c.is_nullable @@ -8550,12 +8545,12 @@ fn mssql_explore_schema( let stream = client .query(cols_query, &[&table_name]) .await - .map_err(|e| PyValueError::new_err(format!("Failed to query columns: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Failed to query columns: {e}")))?; let rows = stream .into_first_result() .await - .map_err(|e| PyValueError::new_err(format!("Failed to fetch columns: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Failed to fetch columns: {e}")))?; let mut columns = Vec::new(); for row in rows { @@ -8575,12 +8570,12 @@ fn mssql_explore_schema( let stream = client .query(pk_query, &[&table_name]) .await - .map_err(|e| PyValueError::new_err(format!("Failed to query primary key: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Failed to query primary key: {e}")))?; let pk_rows = stream .into_first_result() .await - .map_err(|e| PyValueError::new_err(format!("Failed to fetch primary key: {}", e)))?; + .map_err(|e| PyValueError::new_err(format!("Failed to fetch primary key: {e}")))?; let mut pk_columns = Vec::new(); for row in pk_rows {