diff --git a/CHANGELOG.md b/CHANGELOG.md
index 014228132..dadb11650 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 ## [Unreleased]
 
 ### Added
+- `pw.io.postgres.read`, `pw.io.mysql.read`, and `pw.io.mssql.read` now support automated schema exploration. You can omit the `schema` parameter when initializing these database readers, and the engine will dynamically infer the target table's schema (including column types and primary keys) directly from the database catalog at startup. This enables rapid zero-configuration onboarding while maintaining static type validation. Column types without a known mapping are read as `Any`, and a warning is logged when the table has no primary key.
 - `pw.io.chroma.write` writes a Pathway table to a [Chroma](https://www.trychroma.com/) collection, keeping the collection in sync with the table as rows are added, changed, and removed. The columns are mapped onto Chroma's record fields explicitly: the optional `primary_key` becomes the record id (when omitted, the row's internal Pathway key is used instead), `embedding` the vector, the optional `document` column the stored text, and `metadata_columns` the record metadata. The collection must already exist. The server is addressed with `host`/`port` (plus `ssl`, `headers`, `tenant`, and `database` for authenticated deployments such as Chroma Cloud).
 - `pw.io.qdrant.write` writes a Pathway table to a [Qdrant](https://qdrant.tech/) collection. Each row addition is upserted as a point and each deletion removes the corresponding point, so an update replaces a point rather than duplicating it. The `vector` column supplies the point vector (`list[float]` or a 1-D `numpy.ndarray`) and every other column is stored in the point payload. If the target collection does not exist, it is created on the first write using Cosine distance with the dimension of the written vectors. The `batch_size` parameter bounds how many points are sent per request, and an optional `api_key` authenticates against Qdrant Cloud or a secured instance.
 - `pw.io.duckdb.write` writes a Pathway table into a DuckDB database file through a native, in-process connector, in either `"stream_of_changes"` or `"snapshot"` mode. Embeddings stored as `numpy` arrays or `list[float]` columns land in native `DOUBLE[]` list columns, so the result is directly searchable with DuckDB's vector-distance functions for RAG retrieval. The `detach_between_batches` option makes the writer close the database after every minibatch commit and reopen it for the next one, releasing the file lock in between — so a separate process (e.g. a query server) can read committed data with short-lived read-only connections while the pipeline keeps running; the writer retries the reopen with a backoff when a reader momentarily holds the lock.
diff --git a/python/pathway/engine.pyi b/python/pathway/engine.pyi
index 8678342ec..160c70dcd 100644
--- a/python/pathway/engine.pyi
+++ b/python/pathway/engine.pyi
@@ -1231,3 +1231,22 @@ class PyObjectWrapper(Generic[T]):
     def _create_with_serializer(
         value: T, *, serializer: PyObjectWrapperSerializer | None = None
     ) -> PyObjectWrapper[T]: ...
+
+# Catalog lookups used when a database reader is created without a schema.
+# Each returns the `(column_name, type_name, is_nullable)` triples of the table
+# and the list of its primary key column names.
+def postgres_explore_schema(
+    connection_string: str,
+    schema_name: str,
+    table_name: str,
+    ssl_mode_str: str,
+    ssl_cert_path: str | None,
+) -> tuple[list[tuple[str, str, bool]], list[str]]: ...
+def mysql_explore_schema(
+    connection_string: str,
+    table_name: str,
+) -> tuple[list[tuple[str, str, bool]], list[str]]: ...
+def mssql_explore_schema(
+    connection_string: str,
+    table_name: str,
+) -> tuple[list[tuple[str, str, bool]], list[str]]: ...
diff --git a/python/pathway/io/mssql/__init__.py b/python/pathway/io/mssql/__init__.py
index 43ea47488..3360b5ab5 100644
--- a/python/pathway/io/mssql/__init__.py
+++ b/python/pathway/io/mssql/__init__.py
@@ -2,7 +2,8 @@
 
 from __future__ import annotations
 
-from typing import Any, Iterable, Literal
+import logging
+from typing import Any, Iterable, Literal, Optional
 
 from pathway.internals import api, datasink, datasource, dtype as dt
 from pathway.internals._io_helpers import _format_output_value_fields
@@ -19,6 +20,7 @@
     init_mode_from_str,
     read_schema,
 )
+from pathway.schema import schema_builder
 
 
 def _validate_identifier(arg_name: str, value: str) -> None:
@@ -38,7 +40,7 @@ def _validate_identifier(arg_name: str, value: str) -> None:
 def read(
     connection_string: str,
     table_name: str,
-    schema: type[Schema],
+    schema: type[Schema] | None = None,
     *,
     mode: Literal["static", "streaming"] = "streaming",
     schema_name: str = "dbo",
@@ -214,6 +216,73 @@ def read(
     _validate_identifier("table_name", table_name)
     _validate_identifier("schema_name", schema_name)
 
+    if schema is None:
+        try:
+            from pathway.engine import mssql_explore_schema
+            from pathway.internals.schema import column_definition
+
+            # SQL Server type name -> Python type. Built once per call rather
+            # than once per column; unlisted types fall back to `Any`.
+            type_mapping: dict[str, Any] = {
+                "tinyint": int,
+                "smallint": int,
+                "int": int,
+                "bigint": int,
+                "bit": bool,
+                "real": float,
+                "float": float,
+                "decimal": float,
+                "numeric": float,
+                "char": str,
+                "varchar": str,
+                "nchar": str,
+                "nvarchar": str,
+                "text": str,
+                "ntext": str,
+                "uniqueidentifier": str,
+            }
+
+            full_table_name = f"{schema_name}.{table_name}"
+            columns_data, pk_columns = mssql_explore_schema(
+                connection_string, full_table_name
+            )
+            if not columns_data:
+                # An unknown table yields no catalog rows; fail here instead of
+                # silently building a schema without columns.
+                raise ValueError(
+                    f"table {full_table_name} was not found or has no columns"
+                )
+
+            pk_column_names = set(pk_columns)
+            schema_columns = {}
+            for col_name, udt_name, is_nullable in columns_data:
+                py_type = type_mapping.get(udt_name.lower(), Any)
+                if is_nullable and py_type is not Any:
+                    py_type = Optional[py_type]
+                schema_columns[col_name] = column_definition(
+                    dtype=py_type,
+                    primary_key=col_name in pk_column_names,
+                )
+
+            if not pk_columns:
+                logging.getLogger(__name__).warning(
+                    f"No primary key found for {schema_name}.{table_name} during schema exploration. "
+                    "Falling back to auto-generated row identifiers. "
+                    "This may cause issues in streaming mode if the table is not append-only."
+                )
+
+            schema_name_class = (
+                "".join(c.capitalize() for c in table_name.split("_")) + "Schema"
+            )
+            schema = schema_builder(schema_columns, name=schema_name_class)
+            logging.getLogger(__name__).info(
+                f"Derived schema for {schema_name}.{table_name}:\n{schema}"
+            )
+        except Exception as e:
+            raise RuntimeError(
+                f"Failed to explore schema automatically: {e}. Please provide an explicit schema."
+            ) from e
+
     schema, api_schema = read_schema(schema)
     primary_key_columns = schema.primary_key_columns()
 
diff --git a/python/pathway/io/mysql/__init__.py b/python/pathway/io/mysql/__init__.py
index a98817f4c..d57ce3e63 100644
--- a/python/pathway/io/mysql/__init__.py
+++ b/python/pathway/io/mysql/__init__.py
@@ -2,7 +2,8 @@
 
 from __future__ import annotations
 
-from typing import Any, Iterable, Literal
+import logging
+from typing import Any, Iterable, Literal, Optional
 
 from pathway.internals import api, datasink, datasource, dtype as dt
 from pathway.internals._io_helpers import _format_output_value_fields
@@ -18,6 +19,7 @@
     init_mode_from_str,
     read_schema,
 )
+from pathway.schema import schema_builder
 
 
 @check_arg_types
@@ -25,7 +27,7 @@
 def read(
     connection_string: str,
     table_name: str,
-    schema: type[Schema],
+    schema: type[Schema] | None = None,
     *,
     mode: Literal["static", "streaming"] = "streaming",
     server_id: int | None = None,
@@ -184,6 +186,70 @@ def read(
     """
     _check_entitlements("mysql")
 
+    if schema is None:
+        try:
+            from pathway.engine import mysql_explore_schema
+            from pathway.internals.schema import column_definition
+
+            # MySQL type name -> Python type. Built once per call rather than
+            # once per column; unlisted types fall back to `Any`.
+            type_mapping: dict[str, Any] = {
+                "tinyint": int,
+                "smallint": int,
+                "mediumint": int,
+                "int": int,
+                "bigint": int,
+                "float": float,
+                "double": float,
+                "decimal": float,
+                "char": str,
+                "varchar": str,
+                "text": str,
+                "mediumtext": str,
+                "longtext": str,
+                "json": str,
+            }
+
+            columns_data, pk_columns = mysql_explore_schema(
+                connection_string, table_name
+            )
+            if not columns_data:
+                # An unknown table yields no catalog rows; fail here instead of
+                # silently building a schema without columns.
+                raise ValueError(
+                    f"table {table_name} was not found or has no columns"
+                )
+
+            pk_column_names = set(pk_columns)
+            schema_columns = {}
+            for col_name, udt_name, is_nullable in columns_data:
+                py_type = type_mapping.get(udt_name.lower(), Any)
+                if is_nullable and py_type is not Any:
+                    py_type = Optional[py_type]
+                schema_columns[col_name] = column_definition(
+                    dtype=py_type,
+                    primary_key=col_name in pk_column_names,
+                )
+
+            if not pk_columns:
+                logging.getLogger(__name__).warning(
+                    f"No primary key found for {table_name} during schema exploration. "
+                    "Falling back to auto-generated row identifiers. "
+                    "This may cause issues in streaming mode if the table is not append-only."
+                )
+
+            schema_name_class = (
+                "".join(c.capitalize() for c in table_name.split("_")) + "Schema"
+            )
+            schema = schema_builder(schema_columns, name=schema_name_class)
+            logging.getLogger(__name__).info(
+                f"Derived schema for {table_name}:\n{schema}"
+            )
+        except Exception as e:
+            raise RuntimeError(
+                f"Failed to explore schema automatically: {e}. Please provide an explicit schema."
+            ) from e
+
     schema, api_schema = read_schema(schema)
     primary_key_columns = schema.primary_key_columns()
 
diff --git a/python/pathway/io/postgres/__init__.py b/python/pathway/io/postgres/__init__.py
index ecd97887a..2dd12294e 100644
--- a/python/pathway/io/postgres/__init__.py
+++ b/python/pathway/io/postgres/__init__.py
@@ -3,9 +3,10 @@
 from __future__ import annotations
 
 import copy
+import logging
 import urllib.parse
 import warnings
-from typing import Any, Iterable, Literal
+from typing import Any, Iterable, Literal, Optional
 
 from pathway.internals import api, datasink, datasource, dtype
 from pathway.internals._io_helpers import TLSSettings, _format_output_value_fields
@@ -21,6 +22,7 @@
     init_mode_from_str,
     read_schema,
 )
+from pathway.schema import schema_builder
 
 
 def _quote_libpq_value(value) -> str:
@@ -284,7 +286,7 @@ def _construct_replication_settings(
 def read(
     postgres_settings: dict,
     table_name: str,
-    schema: type[Schema],
+    schema: type[Schema] | None = None,
     *,
     mode: Literal["streaming", "static"] = "streaming",
     is_append_only: bool = False,
@@ -575,6 +577,77 @@ def read(
         tls_settings=tls.settings,
     )
 
+    if schema is None:
+        try:
+            from pathway.engine import postgres_explore_schema
+            from pathway.internals.schema import column_definition
+
+            # Postgres `udt_name` -> Python type. Built once per call rather
+            # than once per column; unlisted types fall back to `Any`.
+            type_mapping: dict[str, Any] = {
+                "int2": int,
+                "int4": int,
+                "int8": int,
+                "float4": float,
+                "float8": float,
+                "numeric": float,
+                "bool": bool,
+                "text": str,
+                "varchar": str,
+                "bpchar": str,
+                "char": str,
+                "uuid": str,
+                "json": str,
+                "jsonb": str,
+            }
+
+            ssl_mode = owned_postgres_settings.get("sslmode", "prefer")
+            ssl_cert_path = owned_postgres_settings.get("sslrootcert", None)
+
+            columns_data, pk_columns = postgres_explore_schema(
+                _connection_string_from_settings(owned_postgres_settings),
+                schema_name,
+                table_name,
+                ssl_mode,
+                ssl_cert_path,
+            )
+            if not columns_data:
+                # An unknown table yields no catalog rows; fail here instead of
+                # silently building a schema without columns.
+                raise ValueError(
+                    f"table {schema_name}.{table_name} was not found or has no columns"
+                )
+
+            pk_column_names = set(pk_columns)
+            schema_columns = {}
+            for col_name, udt_name, is_nullable in columns_data:
+                py_type = type_mapping.get(udt_name, Any)
+                if is_nullable and py_type is not Any:
+                    py_type = Optional[py_type]
+                schema_columns[col_name] = column_definition(
+                    dtype=py_type,
+                    primary_key=col_name in pk_column_names,
+                )
+
+            if not pk_columns:
+                logging.getLogger(__name__).warning(
+                    f"No primary key found for {schema_name}.{table_name} during schema exploration. "
+                    "Falling back to auto-generated row identifiers. "
+                    "This may cause issues in streaming mode if the table is not append-only."
+                )
+
+            schema_name_class = (
+                "".join(c.capitalize() for c in table_name.split("_")) + "Schema"
+            )
+            schema = schema_builder(schema_columns, name=schema_name_class)
+            logging.getLogger(__name__).info(
+                f"Derived schema for {schema_name}.{table_name}:\n{schema}"
+            )
+        except Exception as e:
+            raise RuntimeError(
+                f"Failed to explore schema automatically: {e}. Please provide an explicit schema."
+            ) from e
+
     schema, api_schema = read_schema(schema)
     data_format = api.DataFormat(
         format_type="transparent",
diff --git a/src/connectors/data_storage/duckdb.rs b/src/connectors/data_storage/duckdb.rs
index 6649aaf79..64cc79cf9 100644
--- a/src/connectors/data_storage/duckdb.rs
+++ b/src/connectors/data_storage/duckdb.rs
@@ -278,6 +278,7 @@ enum WritePlan {
 /// list value as a statement parameter, list/array/tuple and JSON values are
 /// bound as a JSON string and cast back into the destination type inside the
 /// generated SQL (`CAST(CAST(? AS JSON) AS DOUBLE[])`).
+#[allow(clippy::struct_excessive_bools)]
 pub struct DuckDbWriter {
     /// Path to the database file (or `:memory:`).
The connection is opened
     /// lazily, on the first non-empty flush — see
diff --git a/src/python_api.rs b/src/python_api.rs
index c509c6d22..0b9cb6aa9 100644
--- a/src/python_api.rs
+++ b/src/python_api.rs
@@ -8393,6 +8393,236 @@ fn check_entitlements(license_key: Option, entitlements: Vec) ->
     Ok(())
 }
 
+/// Column metadata as `(column_name, type_name, is_nullable)` triples in table
+/// order, paired with the names of the primary key columns.
+type ExploreSchemaResult = PyResult<(Vec<(String, String, bool)>, Vec<String>)>;
+
+/// Reads the column list and the primary key of a Postgres table from
+/// `information_schema`.
+///
+/// The reported type of a column is its `udt_name`. A table that is not found
+/// in `schema_name` produces two empty lists rather than an error.
+///
+/// # Errors
+///
+/// Raises `ValueError` when `ssl_mode_str` is not a recognized mode, when the
+/// connection cannot be opened or when a catalog query fails.
+#[pyfunction]
+#[pyo3(signature = (connection_string, schema_name, table_name, ssl_mode_str, ssl_cert_path))]
+fn postgres_explore_schema(
+    connection_string: &str,
+    schema_name: &str,
+    table_name: &str,
+    ssl_mode_str: &str,
+    ssl_cert_path: Option<String>,
+) -> ExploreSchemaResult {
+    let ssl_mode = match ssl_mode_str {
+        "disable" => SslMode::Disable,
+        "allow" => SslMode::Allow,
+        "prefer" => SslMode::Prefer,
+        "require" => SslMode::Require,
+        "verify-ca" => SslMode::VerifyCa,
+        "verify-full" => SslMode::VerifyFull,
+        _ => {
+            return Err(PyValueError::new_err(format!(
+                "Invalid ssl_mode: {ssl_mode_str}",
+            )))
+        }
+    };
+
+    let mut client = create_psql_client(connection_string, ssl_mode, ssl_cert_path)
+        .map_err(|e| PyValueError::new_err(format!("Failed to connect: {e}")))?;
+
+    let cols_query = "
+        SELECT column_name, udt_name, is_nullable
+        FROM information_schema.columns
+        WHERE table_schema = $1 AND table_name = $2
+        ORDER BY ordinal_position;
+    ";
+    let rows = client
+        .query(cols_query, &[&schema_name, &table_name])
+        .map_err(|e| PyValueError::new_err(format!("Failed to query columns: {e}")))?;
+
+    let columns: Vec<(String, String, bool)> = rows
+        .iter()
+        .map(|row| {
+            let col_name: String = row.get(0);
+            let udt_name: String = row.get(1);
+            // The catalog reports nullability as text; only `YES` means nullable.
+            let is_nullable: String = row.get(2);
+            (col_name, udt_name, is_nullable == "YES")
+        })
+        .collect();
+
+    // Constraint names are not unique across tables, so the key columns have to
+    // be matched to the constraint by table as well, not only by name.
+    let pk_query = "
+        SELECT kcu.column_name
+        FROM information_schema.table_constraints tco
+        JOIN information_schema.key_column_usage kcu
+          ON kcu.constraint_name = tco.constraint_name
+         AND kcu.constraint_schema = tco.constraint_schema
+         AND kcu.table_schema = tco.table_schema
+         AND kcu.table_name = tco.table_name
+        WHERE tco.constraint_type = 'PRIMARY KEY'
+          AND kcu.table_schema = $1
+          AND kcu.table_name = $2
+        ORDER BY kcu.ordinal_position;
+    ";
+    let pk_rows = client
+        .query(pk_query, &[&schema_name, &table_name])
+        .map_err(|e| PyValueError::new_err(format!("Failed to query primary key: {e}")))?;
+    let pk_columns: Vec<String> = pk_rows.iter().map(|row| row.get(0)).collect();
+
+    Ok((columns, pk_columns))
+}
+
+/// Reads the column list and the primary key of a `MySQL` table from
+/// `INFORMATION_SCHEMA`, looking it up in the database selected by the
+/// connection (`DATABASE()`).
+///
+/// The reported type of a column is its `DATA_TYPE`. A table that is not found
+/// produces two empty lists rather than an error.
+///
+/// # Errors
+///
+/// Raises `ValueError` when the URL cannot be parsed, when the connection
+/// cannot be opened or when a catalog query fails.
+#[pyfunction]
+#[pyo3(signature = (connection_string, table_name))]
+fn mysql_explore_schema(connection_string: &str, table_name: &str) -> ExploreSchemaResult {
+    use mysql::prelude::Queryable;
+    let opts = mysql::Opts::from_url(connection_string)
+        .map_err(|e| PyValueError::new_err(format!("Invalid MySQL URL: {e}")))?;
+    let mut conn = mysql::Conn::new(opts)
+        .map_err(|e| PyValueError::new_err(format!("Failed to connect: {e}")))?;
+
+    let cols_query = "
+        SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE
+        FROM INFORMATION_SCHEMA.COLUMNS
+        WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ?
+        ORDER BY ORDINAL_POSITION;
+    ";
+    let rows: Vec<(String, String, String)> = conn
+        .exec(cols_query, (table_name,))
+        .map_err(|e| PyValueError::new_err(format!("Failed to query columns: {e}")))?;
+    let columns: Vec<(String, String, bool)> = rows
+        .into_iter()
+        .map(|(name, data_type, nullable)| (name, data_type, nullable == "YES"))
+        .collect();
+
+    let pk_query = "
+        SELECT kcu.COLUMN_NAME
+        FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tco
+        JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
+          ON kcu.CONSTRAINT_NAME = tco.CONSTRAINT_NAME
+         AND kcu.CONSTRAINT_SCHEMA = tco.CONSTRAINT_SCHEMA
+         AND kcu.TABLE_NAME = tco.TABLE_NAME
+        WHERE tco.CONSTRAINT_TYPE = 'PRIMARY KEY'
+          AND kcu.TABLE_SCHEMA = DATABASE()
+          AND kcu.TABLE_NAME = ?
+        ORDER BY kcu.ORDINAL_POSITION;
+    ";
+    let pk_columns: Vec<String> = conn
+        .exec(pk_query, (table_name,))
+        .map_err(|e| PyValueError::new_err(format!("Failed to query primary key: {e}")))?;
+
+    Ok((columns, pk_columns))
+}
+
+/// Reads the column list and the primary key of a SQL Server table from the
+/// `sys` catalog views.
+///
+/// `table_name` is resolved with `OBJECT_ID`. A table that is not found
+/// produces two empty lists rather than an error.
+///
+/// # Errors
+///
+/// Raises `ValueError` when the connection string cannot be parsed, when the
+/// connection cannot be opened, when a catalog query fails or when the catalog
+/// returns a column without a name or a type.
+#[pyfunction]
+#[pyo3(signature = (connection_string, table_name))]
+fn mssql_explore_schema(connection_string: &str, table_name: &str) -> ExploreSchemaResult {
+    let rt = crate::async_runtime::create_async_tokio_runtime()
+        .map_err(|e| PyValueError::new_err(format!("Failed to create tokio runtime: {e}")))?;
+    rt.block_on(async {
+        use tokio::net::TcpStream;
+        use tokio_util::compat::TokioAsyncWriteCompatExt;
+
+        let config = tiberius::Config::from_ado_string(connection_string)
+            .map_err(|e| PyValueError::new_err(format!("Invalid MSSQL config: {e}")))?;
+
+        let tcp = TcpStream::connect(config.get_addr())
+            .await
+            .map_err(|e| PyValueError::new_err(format!("Failed to connect to TCP: {e}")))?;
+
+        // Best effort: a failure to set the option is deliberately ignored.
+        let _ = tcp.set_nodelay(true);
+
+        let mut client = tiberius::Client::connect(config, tcp.compat_write())
+            .await
+            .map_err(|e| PyValueError::new_err(format!("Failed to connect to MSSQL: {e}")))?;
+
+        let cols_query = "
+            SELECT c.name AS column_name, t.name AS data_type, c.is_nullable
+            FROM sys.columns c
+            INNER JOIN sys.types t ON c.user_type_id = t.user_type_id
+            WHERE c.object_id = OBJECT_ID(@P1)
+            ORDER BY c.column_id;
+        ";
+        let stream = client
+            .query(cols_query, &[&table_name])
+            .await
+            .map_err(|e| PyValueError::new_err(format!("Failed to query columns: {e}")))?;
+        let rows = stream
+            .into_first_result()
+            .await
+            .map_err(|e| PyValueError::new_err(format!("Failed to fetch columns: {e}")))?;
+
+        let mut columns = Vec::with_capacity(rows.len());
+        for row in &rows {
+            let col_name: &str = row.get(0).ok_or_else(|| {
+                PyValueError::new_err("Catalog returned a column without a name")
+            })?;
+            let data_type: &str = row.get(1).ok_or_else(|| {
+                PyValueError::new_err(format!("Catalog returned no type for column {col_name}"))
+            })?;
+            // Unknown nullability is treated as nullable: an optional type still
+            // accepts every value, a non-optional one would reject a NULL.
+            let is_nullable: bool = row.get(2).unwrap_or(true);
+            columns.push((col_name.to_string(), data_type.to_string(), is_nullable));
+        }
+
+        let pk_query = "
+            SELECT c.name AS column_name
+            FROM sys.indexes i
+            INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
+            INNER JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
+            WHERE i.is_primary_key = 1 AND i.object_id = OBJECT_ID(@P1)
+            ORDER BY ic.key_ordinal;
+        ";
+        let stream = client
+            .query(pk_query, &[&table_name])
+            .await
+            .map_err(|e| PyValueError::new_err(format!("Failed to query primary key: {e}")))?;
+        let pk_rows = stream
+            .into_first_result()
+            .await
+            .map_err(|e| PyValueError::new_err(format!("Failed to fetch primary key: {e}")))?;
+
+        let mut pk_columns = Vec::with_capacity(pk_rows.len());
+        for row in &pk_rows {
+            let col_name: &str = row.get(0).ok_or_else(|| {
+                PyValueError::new_err("Catalog returned a primary key column without a name")
+            })?;
+            pk_columns.push(col_name.to_string());
+        }
+
+        Ok((columns, pk_columns))
+    })
+}
+
 #[pymodule]
 #[pyo3(name = "engine")]
 fn engine(_py: Python<'_>, m: &Bound) -> PyResult<()> {
@@ -8471,6 +8701,10 @@ fn engine(_py: Python<'_>, m: &Bound) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+
+    m.add_function(wrap_pyfunction!(postgres_explore_schema, m)?)?;
+    m.add_function(wrap_pyfunction!(mysql_explore_schema, m)?)?;
+    m.add_function(wrap_pyfunction!(mssql_explore_schema, m)?)?;
     m.add_class::()?;
     m.add_class::()?;