Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## [Unreleased]

### Added
- `pw.io.postgres.read`, `pw.io.mysql.read`, and `pw.io.mssql.read` now support automated schema exploration. You can omit the `schema` parameter when initializing these database readers, and the engine will dynamically infer the target table's schema (including column types and primary keys) directly from the database catalog at startup. This enables rapid zero-configuration onboarding while maintaining static type validation.
- `pw.io.chroma.write` writes a Pathway table to a [Chroma](https://www.trychroma.com/) collection, keeping the collection in sync with the table as rows are added, changed, and removed. The columns are mapped onto Chroma's record fields explicitly: the optional `primary_key` becomes the record id (when omitted, the row's internal Pathway key is used instead), `embedding` the vector, the optional `document` column the stored text, and `metadata_columns` the record metadata. The collection must already exist. The server is addressed with `host`/`port` (plus `ssl`, `headers`, `tenant`, and `database` for authenticated deployments such as Chroma Cloud).
- `pw.io.qdrant.write` writes a Pathway table to a [Qdrant](https://qdrant.tech/) collection. Each row addition is upserted as a point and each deletion removes the corresponding point, so an update replaces a point rather than duplicating it. The `vector` column supplies the point vector (`list[float]` or a 1-D `numpy.ndarray`) and every other column is stored in the point payload. If the target collection does not exist, it is created on the first write using Cosine distance with the dimension of the written vectors. The `batch_size` parameter bounds how many points are sent per request, and an optional `api_key` authenticates against Qdrant Cloud or a secured instance.
- `pw.io.duckdb.write` writes a Pathway table into a DuckDB database file through a native, in-process connector, in either `"stream_of_changes"` or `"snapshot"` mode. Embeddings stored as `numpy` arrays or `list[float]` columns land in native `DOUBLE[]` list columns, so the result is directly searchable with DuckDB's vector-distance functions for RAG retrieval. The `detach_between_batches` option makes the writer close the database after every minibatch commit and reopen it for the next one, releasing the file lock in between — so a separate process (e.g. a query server) can read committed data with short-lived read-only connections while the pipeline keeps running; the writer retries the reopen with a backoff when a reader momentarily holds the lock.
Expand Down
16 changes: 16 additions & 0 deletions python/pathway/engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1231,3 +1231,19 @@ class PyObjectWrapper(Generic[T]):
def _create_with_serializer(
value: T, *, serializer: PyObjectWrapperSerializer | None = None
) -> PyObjectWrapper[T]: ...

def postgres_explore_schema(
connection_string: str,
schema_name: str | None,
table_name: str,
ssl_mode: str,
ssl_cert_path: str | None,
) -> tuple[list[tuple[str, str, bool]], list[str]]: ...
def mysql_explore_schema(
connection_string: str,
table_name: str,
) -> tuple[list[tuple[str, str, bool]], list[str]]: ...
def mssql_explore_schema(
connection_string: str,
full_table_name: str,
) -> tuple[list[tuple[str, str, bool]], list[str]]: ...
66 changes: 64 additions & 2 deletions python/pathway/io/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from __future__ import annotations

from typing import Any, Iterable, Literal
import logging
from typing import Any, Iterable, Literal, Optional

from pathway.internals import api, datasink, datasource, dtype as dt
from pathway.internals._io_helpers import _format_output_value_fields
Expand All @@ -19,6 +20,7 @@
init_mode_from_str,
read_schema,
)
from pathway.schema import schema_builder


def _validate_identifier(arg_name: str, value: str) -> None:
Expand All @@ -38,7 +40,7 @@ def _validate_identifier(arg_name: str, value: str) -> None:
def read(
connection_string: str,
table_name: str,
schema: type[Schema],
schema: type[Schema] | None = None,
*,
mode: Literal["static", "streaming"] = "streaming",
schema_name: str = "dbo",
Expand Down Expand Up @@ -214,6 +216,66 @@ def read(
_validate_identifier("table_name", table_name)
_validate_identifier("schema_name", schema_name)

if schema is None:
try:
from pathway.engine import mssql_explore_schema

full_table_name = f"{schema_name}.{table_name}"
columns_data, pk_columns = mssql_explore_schema(
connection_string, full_table_name
)
schema_columns = {}
for col_name, udt_name, is_nullable in columns_data:
udt_name_lower = udt_name.lower()
mapping = {
"tinyint": int,
"smallint": int,
"int": int,
"bigint": int,
"bit": bool,
"real": float,
"float": float,
"decimal": float,
"numeric": float,
"char": str,
"varchar": str,
"nchar": str,
"nvarchar": str,
"text": str,
"ntext": str,
"uniqueidentifier": str,
}
py_type = mapping.get(udt_name_lower, Any)
if is_nullable and py_type is not Any:
py_type = Optional[py_type]

is_pk = col_name in pk_columns
from pathway.internals.schema import column_definition

schema_columns[col_name] = column_definition(
dtype=py_type,
primary_key=is_pk,
)

if not pk_columns:
logging.getLogger(__name__).warning(
f"No primary key found for {schema_name}.{table_name} during schema exploration. "
"Falling back to auto-generated row identifiers. "
"This may cause issues in streaming mode if the table is not append-only."
)

schema_name_class = (
"".join(c.capitalize() for c in table_name.split("_")) + "Schema"
)
schema = schema_builder(schema_columns, name=schema_name_class)
logging.getLogger(__name__).info(
f"Derived schema for {schema_name}.{table_name}:\n{schema}"
)
except Exception as e:
raise RuntimeError(
f"Failed to explore schema automatically: {e}. Please provide an explicit schema."
) from e

schema, api_schema = read_schema(schema)

primary_key_columns = schema.primary_key_columns()
Expand Down
63 changes: 61 additions & 2 deletions python/pathway/io/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from __future__ import annotations

from typing import Any, Iterable, Literal
import logging
from typing import Any, Iterable, Literal, Optional

from pathway.internals import api, datasink, datasource, dtype as dt
from pathway.internals._io_helpers import _format_output_value_fields
Expand All @@ -18,14 +19,15 @@
init_mode_from_str,
read_schema,
)
from pathway.schema import schema_builder


@check_arg_types
@trace_user_frame
def read(
connection_string: str,
table_name: str,
schema: type[Schema],
schema: type[Schema] | None = None,
*,
mode: Literal["static", "streaming"] = "streaming",
server_id: int | None = None,
Expand Down Expand Up @@ -184,6 +186,63 @@ def read(
"""
_check_entitlements("mysql")

if schema is None:
try:
from pathway.engine import mysql_explore_schema

columns_data, pk_columns = mysql_explore_schema(
connection_string, table_name
)
schema_columns = {}
for col_name, udt_name, is_nullable in columns_data:
udt_name_lower = udt_name.lower()
mapping = {
"tinyint": int,
"smallint": int,
"mediumint": int,
"int": int,
"bigint": int,
"float": float,
"double": float,
"decimal": float,
"char": str,
"varchar": str,
"text": str,
"mediumtext": str,
"longtext": str,
"json": str,
}
py_type = mapping.get(udt_name_lower, Any)
if is_nullable and py_type is not Any:
py_type = Optional[py_type]

is_pk = col_name in pk_columns
from pathway.internals.schema import column_definition

schema_columns[col_name] = column_definition(
dtype=py_type,
primary_key=is_pk,
)

if not pk_columns:
logging.getLogger(__name__).warning(
f"No primary key found for {table_name} during schema exploration. "
"Falling back to auto-generated row identifiers. "
"This may cause issues in streaming mode if the table is not append-only."
)

schema_name_class = (
"".join(c.capitalize() for c in table_name.split("_")) + "Schema"
)
schema = schema_builder(schema_columns, name=schema_name_class)
logging.getLogger(__name__).info(
f"Derived schema for {table_name}:\n{schema}"
)
except Exception as e:
raise RuntimeError(
f"Failed to explore schema automatically: {e}. Please provide an explicit schema."
) from e

schema, api_schema = read_schema(schema)

primary_key_columns = schema.primary_key_columns()
Expand Down
71 changes: 69 additions & 2 deletions python/pathway/io/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from __future__ import annotations

import copy
import logging
import urllib.parse
import warnings
from typing import Any, Iterable, Literal
from typing import Any, Iterable, Literal, Optional

from pathway.internals import api, datasink, datasource, dtype
from pathway.internals._io_helpers import TLSSettings, _format_output_value_fields
Expand All @@ -21,6 +22,7 @@
init_mode_from_str,
read_schema,
)
from pathway.schema import schema_builder


def _quote_libpq_value(value) -> str:
Expand Down Expand Up @@ -284,7 +286,7 @@ def _construct_replication_settings(
def read(
postgres_settings: dict,
table_name: str,
schema: type[Schema],
schema: type[Schema] | None = None,
*,
mode: Literal["streaming", "static"] = "streaming",
is_append_only: bool = False,
Expand Down Expand Up @@ -575,6 +577,71 @@ def read(
tls_settings=tls.settings,
)

if schema is None:
try:
from pathway.engine import postgres_explore_schema

ssl_mode = owned_postgres_settings.get("sslmode", "prefer")
ssl_cert_path = owned_postgres_settings.get("sslrootcert", None)

columns_data, pk_columns = postgres_explore_schema(
_connection_string_from_settings(owned_postgres_settings),
schema_name,
table_name,
ssl_mode,
ssl_cert_path,
)

schema_columns = {}
for col_name, udt_name, is_nullable in columns_data:
# Map to pw types
mapping = {
"int2": int,
"int4": int,
"int8": int,
"float4": float,
"float8": float,
"numeric": float,
"bool": bool,
"text": str,
"varchar": str,
"bpchar": str,
"char": str,
"uuid": str,
"json": str,
"jsonb": str,
}
py_type = mapping.get(udt_name, Any)
if is_nullable and py_type is not Any:
py_type = Optional[py_type]

is_pk = col_name in pk_columns
from pathway.internals.schema import column_definition

schema_columns[col_name] = column_definition(
dtype=py_type,
primary_key=is_pk,
)

if not pk_columns:
logging.getLogger(__name__).warning(
f"No primary key found for {schema_name}.{table_name} during schema exploration. "
"Falling back to auto-generated row identifiers. "
"This may cause issues in streaming mode if the table is not append-only."
)

schema_name_class = (
"".join(c.capitalize() for c in table_name.split("_")) + "Schema"
)
schema = schema_builder(schema_columns, name=schema_name_class)
logging.getLogger(__name__).info(
f"Derived schema for {schema_name}.{table_name}:\n{schema}"
)
except Exception as e:
raise RuntimeError(
f"Failed to explore schema automatically: {e}. Please provide an explicit schema."
) from e

schema, api_schema = read_schema(schema)
data_format = api.DataFormat(
format_type="transparent",
Expand Down
1 change: 1 addition & 0 deletions src/connectors/data_storage/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ enum WritePlan {
/// list value as a statement parameter, list/array/tuple and JSON values are
/// bound as a JSON string and cast back into the destination type inside the
/// generated SQL (`CAST(CAST(? AS JSON) AS DOUBLE[])`).
#[allow(clippy::struct_excessive_bools)]
pub struct DuckDbWriter {
/// Path to the database file (or `:memory:`). The connection is opened
/// lazily, on the first non-empty flush — see
Expand Down
Loading
Loading