Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions airflow-core/docs/howto/set-up-database.rst
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,42 @@ We recommend using the ``psycopg2`` driver and specifying it in your SqlAlchemy

postgresql+psycopg2://<user>:<password>@<host>/<db>

Async SQLAlchemy engine and the async driver
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to the synchronous engine, Airflow maintains an async SQLAlchemy engine for the metadata database (used e.g. by async API endpoints).
When ``[database] sql_alchemy_conn_async`` is not set, its URL is derived from ``sql_alchemy_conn`` using ``psycopg`` (psycopg3) as the async driver:

.. code-block:: text

postgresql+psycopg_async://<user>:<password>@<host>/<db>

psycopg3 is the default because it is safe behind transaction-mode PgBouncer (recommended for all production Postgres installations, see the note below) with no extra configuration.

@uranusjr uranusjr Jun 14, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should include a reference. (Both for why psycopg3 does and why another engine e.g. asyncpg does not.)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean mention the docs on prepared statements from both drivers, or just mention it has to do with the default setting of each driver when it comes to prepared statements?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more information to L220, L229

psycopg3 `defers statement preparation <https://www.psycopg.org/psycopg3/docs/advanced/prepare.html>`_ behind a threshold (default 5 executions) and lets you disable it entirely with ``prepare_threshold=None``.

If you need the extra throughput of ``asyncpg``, opt in by installing the ``apache-airflow-providers-postgres[asyncpg]`` extra and setting the async URL explicitly:

.. code-block:: ini

[database]
sql_alchemy_conn_async = postgresql+asyncpg://<user>:<password>@<host>/<db>

asyncpg uses `named server-side prepared statements <https://magicstack.github.io/asyncpg/current/faq.html#why-am-i-getting-prepared-statement-errors>`_, which break under transaction-mode PgBouncer.
If you run asyncpg behind transaction-mode PgBouncer, you must disable its prepared-statement caching by pointing ``sql_alchemy_connect_args_async`` at a dictionary defined in your ``airflow_local_settings.py``:

.. code-block:: python

# airflow_local_settings.py
connect_args_async = {
"statement_cache_size": 0,
"prepared_statement_cache_size": 0,
}

.. code-block:: ini

[database]
sql_alchemy_connect_args_async = airflow_local_settings.connect_args_async

Also note that since SqlAlchemy does not expose a way to target a specific schema in the database URI, you need to ensure schema ``public`` is in your Postgres user's search_path.

If you created a new Postgres account for Airflow:
Expand Down
32 changes: 32 additions & 0 deletions airflow-core/newsfragments/68496.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
Default async Postgres driver changed from asyncpg to psycopg3

When ``[database] sql_alchemy_conn_async`` is not set, Airflow derives the async metadata database URL from ``sql_alchemy_conn``. For PostgreSQL, the derived URL now uses psycopg3 (``postgresql+psycopg_async://``) instead of asyncpg (``postgresql+asyncpg://``).

**Why:** Airflow recommends running PgBouncer in front of PostgreSQL in production. asyncpg uses named server-side prepared statements, which break under transaction-mode PgBouncer unless prepared-statement caching is explicitly disabled. psycopg3 is safe behind transaction-mode PgBouncer with no extra configuration, so the default async engine now works out of the box in recommended production deployments.

The SQLite (``aiosqlite``) and MySQL (``aiomysql``) async URL derivations are unchanged, and an explicitly configured ``sql_alchemy_conn_async`` is never rewritten.

**Packaging:** the ``apache-airflow-providers-postgres`` distribution now installs ``psycopg`` (psycopg3) by default, and ``asyncpg`` is no longer installed by default. Instead, it moved to the ``asyncpg`` optional extra.

**To keep using asyncpg**, install the extra and configure the async URL explicitly:

.. code-block:: bash

pip install 'apache-airflow-providers-postgres[asyncpg]'

.. code-block:: ini

[database]
sql_alchemy_conn_async = postgresql+asyncpg://<user>:<password>@<host>/<db>

When running behind transaction-mode PgBouncer, also disable asyncpg's prepared-statement caching by pointing ``sql_alchemy_connect_args_async`` at a dict defined in ``airflow_local_settings.py``:

.. code-block:: python

# airflow_local_settings.py
connect_args_async = {"statement_cache_size": 0, "prepared_statement_cache_size": 0}

.. code-block:: ini

[database]
sql_alchemy_connect_args_async = airflow_local_settings.connect_args_async
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def load_policy_plugins(pm: pluggy.PluginManager):


def _get_async_conn_uri_from_sync(sync_uri):
AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "psycopg_async", "mysql": "aiomysql"}
"""Mapping of sync scheme to async scheme."""

scheme, rest = sync_uri.split(":", maxsplit=1)
Expand Down
30 changes: 30 additions & 0 deletions airflow-core/tests/unit/core/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,36 @@ def test_sqlite_relative_path(value, expectation):
settings.configure_orm()


@pytest.mark.parametrize(
("sync_uri", "expected_async_uri"),
[
("postgresql://user:pass@host/db", "postgresql+psycopg_async://user:pass@host/db"),
("postgresql+psycopg2://user:pass@host/db", "postgresql+psycopg_async://user:pass@host/db"),
("sqlite:////root/airflow.db", "sqlite+aiosqlite:////root/airflow.db"),
("mysql://user:pass@host/db", "mysql+aiomysql://user:pass@host/db"),
("mssql://user:pass@host/db", "mssql://user:pass@host/db"),
],
)
def test_get_async_conn_uri_from_sync(sync_uri, expected_async_uri):
from airflow import settings

assert settings._get_async_conn_uri_from_sync(sync_uri) == expected_async_uri


def test_explicit_sql_alchemy_conn_async_is_not_rewritten():
from airflow import settings

from tests_common.test_utils.config import conf_vars

explicit_async_uri = "postgresql+asyncpg://user:pass@host/db"
try:
with conf_vars({("database", "sql_alchemy_conn_async"): explicit_async_uri}):
settings.configure_vars()
assert explicit_async_uri == settings.SQL_ALCHEMY_CONN_ASYNC
finally:
settings.configure_vars()


class TestDisposeOrm:
"""Tests for dispose_orm() async engine disposal."""

Expand Down
5 changes: 3 additions & 2 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -1580,9 +1580,10 @@
"apache-airflow-providers-common-compat>=1.12.0",
"apache-airflow-providers-common-sql>=1.32.0",
"apache-airflow>=2.11.0",
"asyncpg>=0.30.0",
"psycopg2-binary>=2.9.10; python_version >= '3.13'",
"psycopg2-binary>=2.9.9; python_version < '3.13'"
"psycopg2-binary>=2.9.9; python_version < '3.13'",
"psycopg[binary]>=3.2.9; python_version < '3.14'",
"psycopg[binary]>=3.3.3; python_version >= '3.14'"
],
"devel-deps": [],
"plugins": [],
Expand Down
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json.sha256sum
Original file line number Diff line number Diff line change
@@ -1 +1 @@
93831555f2a141e481c81c147142aeb860c34ea860163ca130d045e5ecd0a83b
7398ed4e1177a69a7e8d09ee3dc07771aa525bf18d3366606281e7c1b0a13793
3 changes: 2 additions & 1 deletion providers/postgres/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ PIP package Version required
``apache-airflow-providers-common-sql`` ``>=1.32.0``
``psycopg2-binary`` ``>=2.9.9; python_version < "3.13"``
``psycopg2-binary`` ``>=2.9.10; python_version >= "3.13"``
``asyncpg`` ``>=0.30.0``
``psycopg[binary]`` ``>=3.2.9; python_version < "3.14"``
``psycopg[binary]`` ``>=3.3.3; python_version >= "3.14"``
========================================== ======================================

Cross provider package dependencies
Expand Down
9 changes: 9 additions & 0 deletions providers/postgres/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
Changelog
---------

.. note::
Breaking change (upcoming major release): ``asyncpg`` is no longer installed by default —
it moved to the ``asyncpg`` optional extra, and ``psycopg`` (psycopg3) is now installed by
default as the async metadata-database driver. The default derived async connection URL
changed from ``postgresql+asyncpg://`` to ``postgresql+psycopg_async://``, which is safe
behind transaction-mode PgBouncer with no extra configuration. To keep using asyncpg,
install ``apache-airflow-providers-postgres[asyncpg]`` and set
``[database] sql_alchemy_conn_async = postgresql+asyncpg://...`` explicitly.

6.8.0
.....

Expand Down
3 changes: 2 additions & 1 deletion providers/postgres/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ PIP package Version required
``apache-airflow-providers-common-sql`` ``>=1.32.0``
``psycopg2-binary`` ``>=2.9.9; python_version < "3.13"``
``psycopg2-binary`` ``>=2.9.10; python_version >= "3.13"``
``asyncpg`` ``>=0.30.0``
``psycopg[binary]`` ``>=3.2.9; python_version < "3.14"``
``psycopg[binary]`` ``>=3.3.3; python_version >= "3.14"``
========================================== ======================================

Cross provider package dependencies
Expand Down
13 changes: 8 additions & 5 deletions providers/postgres/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-compat>=1.12.0",
"apache-airflow-providers-common-sql>=1.32.0",
# psycopg2 remains the sync driver for now; its removal and the sync migration to
# psycopg3 are tracked at https://github.com/apache/airflow/issues/68453
"psycopg2-binary>=2.9.9; python_version < '3.13'",
"psycopg2-binary>=2.9.10; python_version >= '3.13'",
"asyncpg>=0.30.0",
"psycopg[binary]>=3.2.9; python_version < '3.14'",
"psycopg[binary]>=3.3.3; python_version >= '3.14'",
]

# The optional dependencies should be modified in place in the generated file
Expand All @@ -73,6 +76,9 @@ dependencies = [
"amazon" = [
"apache-airflow-providers-amazon>=2.6.0",
]
"asyncpg" = [
"asyncpg>=0.30.0",
]
"microsoft.azure" = [
"apache-airflow-providers-microsoft-azure>=12.8.0"
]
Expand All @@ -87,10 +93,6 @@ dependencies = [
"polars" = [
"polars>=1.26.0"
]
"psycopg" = [
"psycopg[binary]>=3.2.9; python_version < '3.14'",
"psycopg[binary]>=3.3.3; python_version >= '3.14'",
]
"sqlalchemy" = [
"sqlalchemy>=1.4.54"
]
Expand All @@ -108,6 +110,7 @@ dev = [
# Additional devel dependencies (do not remove this line and add extra development dependencies)
"apache-airflow-providers-common-sql[pandas]",
"apache-airflow-providers-common-sql[polars]",
"apache-airflow-providers-postgres[asyncpg]",
"apache-airflow-providers-postgres[sqlalchemy]"
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,14 @@


class CompatConnection(Protocol):
"""Protocol for type hinting psycopg2 and psycopg3 connection objects."""
"""Protocol for the common interface shared by psycopg2 and psycopg3 connection objects."""

def cursor(self, *args, **kwargs) -> Any: ...
def commit(self) -> None: ...
def close(self) -> None: ...

# Context manager support
def __enter__(self) -> CompatConnection: ...
def __enter__(self) -> Any: ...
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...

# Common properties
@property
def notices(self) -> list[Any]: ...

# psycopg3 specific (optional)
@property
def adapters(self) -> Any: ...

@property
def row_factory(self) -> Any: ...

# Optional method for psycopg3
def add_notice_handler(self, handler: Any) -> None: ...


class PostgresHook(DbApiHook):
"""
Expand Down
Loading
Loading