diff --git a/.github/workflows/mapping.py b/.github/workflows/mapping.py index d377a408ed..38c7b1f9d0 100644 --- a/.github/workflows/mapping.py +++ b/.github/workflows/mapping.py @@ -1 +1 @@ -versions = {'etcd': '9.6', 'etcd3': '16', 'consul': '17', 'exhibitor': '12', 'raft': '14', 'kubernetes': '15'} +versions = {'etcd': '9.6', 'etcd3': '17', 'consul': '18', 'exhibitor': '12', 'raft': '14', 'kubernetes': '16'} diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 6ab22f6b8d..59387d56bc 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -20,7 +20,7 @@ jobs: os: [ubuntu, windows, macos] steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up Python 3.7 uses: actions/setup-python@v5 @@ -91,18 +91,25 @@ jobs: run: python .github/workflows/run_tests.py if: matrix.os != 'macos' + - name: Set up Python 3.14 + uses: actions/setup-python@v5 + with: + python-version: 3.14 + if: matrix.os != 'macos' + - name: Install dependencies + run: python .github/workflows/install_deps.py + if: matrix.os != 'macos' + - name: Run tests and flake8 + run: python .github/workflows/run_tests.py + if: matrix.os != 'macos' + - name: Combine coverage run: python .github/workflows/run_tests.py combine - - name: Install coveralls - run: python -m pip install coveralls - - - name: Upload Coverage - env: - COVERALLS_FLAG_NAME: unit-${{ matrix.os }} - COVERALLS_PARALLEL: 'true' - GITHUB_TOKEN: ${{ secrets.github_token }} - run: python -m coveralls --service=github + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v5 + with: + flags: unit-${{ matrix.os }} behave: runs-on: ${{ fromJson('{"ubuntu":"ubuntu-22.04","windows":"windows-latest","macos":"macos-14"}')[matrix.os] }} @@ -114,7 +121,7 @@ jobs: fail-fast: false matrix: os: [ubuntu] - python-version: [3.7, 3.13] + python-version: [3.7, 3.14] dcs: [etcd, etcd3, consul, exhibitor, kubernetes, raft] include: - os: macos @@ -128,12 +135,14 @@ jobs: dcs: etcd3 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up Python uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - uses: nolar/setup-k3d-k3s@v1 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} if: matrix.dcs == 'kubernetes' - name: Add postgresql and citus apt repo run: | @@ -164,17 +173,6 @@ jobs: run: bash <(curl -Ls https://coverage.codacy.com/get.sh) report -r cobertura.xml -l Python --partial if: ${{ env.SECRETS_AVAILABLE == 'true' }} - coveralls-finish: - name: Finalize coveralls.io - needs: unit - runs-on: ubuntu-latest - steps: - - uses: actions/setup-python@v5 - - run: python -m pip install coveralls - - run: python -m coveralls --service=github --finish - env: - GITHUB_TOKEN: ${{ secrets.github_token }} - codacy-final: name: Finalize Codacy needs: behave @@ -186,25 +184,25 @@ jobs: pyright: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - - name: Set up Python 3.13 + - name: Set up Python 3.14 uses: actions/setup-python@v5 with: - python-version: 3.13 + python-version: 3.14 - name: Install dependencies run: python -m pip install -r requirements.txt psycopg2-binary psycopg - uses: jakebailey/pyright-action@v2 with: - version: 1.1.405 + version: 1.1.408 ydiff: name: Test compatibility with the latest version of ydiff runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up Python 3.13 uses: actions/setup-python@v5 @@ -220,7 +218,7 @@ jobs: docs: runs-on: 
ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up Python 3.11 uses: actions/setup-python@v5 @@ -244,7 +242,7 @@ jobs: isort: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up Python 3.12 uses: actions/setup-python@v5 diff --git a/README.rst b/README.rst index ae4fb93265..4dc2286bd8 100644 --- a/README.rst +++ b/README.rst @@ -1,10 +1,55 @@ |Tests Status| |Coverage Status| +.. image:: docs/_static/patroni-logo.svg + :height: 128px + :width: 128px + Patroni: A Template for PostgreSQL HA with ZooKeeper, etcd or Consul -------------------------------------------------------------------- You can find a version of this documentation that is searchable and also easier to navigate at `patroni.readthedocs.io `__. +**Important!** + Running Patroni on **memory-restricted systems with Python 3.11+** + +---- + +If you run Patroni on a system with strict memory limits, for example with ``vm.overcommit_memory=2`` (recommended for PostgreSQL), and use Python 3.11 or newer, you may observe unexpected behavior: + +- Patroni appears healthy +- PostgreSQL continues to run +- Patroni **REST API becomes unresponsive** +- The operating system reports that Patroni is listening on the REST API port +- Patroni logs look normal; however, the following messages may appear once: ``Exception ignored in thread started by: ``, ``MemoryError`` +- Kernel logs may contain messages such as ``not enough memory for the allocation`` + +This behavior is caused by a `bug in Python 3.11+ `__. +Under strict memory conditions, starting a new thread may hang indefinitely when there is not enough free memory. + +Recommended solution +-------------------- + +Recent Patroni releases (4.1.1+, 4.0.8+) reduce the impact of this issue by starting all required threads early during startup, before the system is under memory pressure. + +Additional recommendations (Linux, glibc) +----------------------------------------- + +When running with ``vm.overcommit_memory=2`` (recommended for PostgreSQL), we also recommend starting Patroni with the following environment variables configured: + +- ``MALLOC_ARENA_MAX=1`` - reduces the amount of virtual memory allocated by glibc for multi-threaded + applications +- ``PG_MALLOC_ARENA_MAX=`` - resets the value of ``MALLOC_ARENA_MAX`` for PostgreSQL processes started by Patroni. + +In addition, you may tune the following Patroni configuration parameters: + +- ``thread_stack_size`` - stack size used for threads started by Patroni. Lowering this value reduces memory usage of the Patroni process. The default value set by Patroni is ``512kB``. Increase ``thread_stack_size`` if Patroni experiences stack-related crashes; otherwise the default value is sufficient. +- ``thread_pool_size`` - size of the thread pool used by Patroni for asynchronous tasks and REST API communication with other members during leader race or failsafe checks. The default value is ``5``, which is sufficient for three-node clusters. +- ``restapi.thread_pool_size`` - size of the thread pool used to process REST API requests. The default value is ``5``, allowing up to five parallel REST API requests. Note that requests involving SQL queries are effectively serialized because a single database connection is used, so increasing this value typically provides no benefit.
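For illustration, a minimal sketch of how these settings could be combined in the Patroni YAML configuration; the values shown are the defaults and are only examples, not tuned recommendations:

.. code-block:: yaml

    # patroni.yml (illustrative excerpt)
    thread_pool_size: 5        # global pool: async tasks and REST API calls to other members
    thread_stack_size: 512kB   # must be aligned to 64kB; lower values reduce memory usage
    restapi:
      thread_pool_size: 5      # pool for incoming REST API requests

The glibc-related environment variables are set outside the YAML file, for example via ``Environment=MALLOC_ARENA_MAX=1`` and ``Environment=PG_MALLOC_ARENA_MAX=`` in the systemd unit, as done in ``extras/startup-scripts/patroni.service``.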
+ +---- + +PostgreSQL High Availability and Patroni +---------------------------------------- There are many ways to run high availability with PostgreSQL; for a list, see the `PostgreSQL Documentation `__. @@ -173,5 +218,5 @@ When connecting from an application, always use a non-superuser. Patroni require .. |Tests Status| image:: https://github.com/patroni/patroni/actions/workflows/tests.yaml/badge.svg :target: https://github.com/patroni/patroni/actions/workflows/tests.yaml?query=branch%3Amaster -.. |Coverage Status| image:: https://coveralls.io/repos/patroni/patroni/badge.svg?branch=master - :target: https://coveralls.io/github/patroni/patroni?branch=master +.. |Coverage Status| image:: https://codecov.io/gh/patroni/patroni/graph/badge.svg?token=qWNJyFTeul + :target: https://codecov.io/gh/patroni/patroni diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 46bfb85dc8..1070b2839b 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -8,9 +8,12 @@ It is possible to override some of the configuration parameters defined in the P Global/Universal ---------------- - **PATRONI\_CONFIGURATION**: it is possible to set the entire configuration for the Patroni via ``PATRONI_CONFIGURATION`` environment variable. In this case any other environment variables will not be considered! +- **PATRONI\_THREAD\_POOL\_SIZE**: size of thread pool used by Patroni to execute asynchronous tasks and communicate via REST API with other members during leader race or failsafe checks. Minimal value is ``5``, default value is ``5``. +- **PATRONI\_THREAD\_STACK\_SIZE**: specifies the stack size to be used for threads started by Patroni. Value must be aligned by ``64kB``. Minimal value is ``64kB``, default value (set by Patroni) is ``512kB``. - **PATRONI\_NAME**: name of the node where the current instance of Patroni is running. Must be unique for the cluster. - **PATRONI\_NAMESPACE**: path within the configuration store where Patroni will keep information about the cluster. Default value: "/service" - **PATRONI\_SCOPE**: cluster name +- **PG\_MALLOC\_ARENA\_MAX**: custom value for ``MALLOC_ARENA_MAX`` environment variable for ``postmaster`` process. If not set, ``postmaster`` will inherit ``MALLOC_ARENA_MAX`` value. Log --- @@ -193,6 +196,7 @@ PostgreSQL REST API -------- +- **PATRONI\_RESTAPI\_THREAD\_POOL\_SIZE**: size of thread pool used by Patroni to process REST API requests. Minimal value is ``5``, default value is ``5``. - **PATRONI\_RESTAPI\_CONNECT\_ADDRESS**: IP address and port to access the REST API. - **PATRONI\_RESTAPI\_LISTEN**: IP address and port that Patroni will listen to, to provide health-check information for HAProxy. - **PATRONI\_RESTAPI\_USERNAME**: Basic-auth username to protect unsafe REST API endpoints. diff --git a/docs/_static/patroni-logo.png b/docs/_static/patroni-logo.png new file mode 100644 index 0000000000..dc964790e4 Binary files /dev/null and b/docs/_static/patroni-logo.png differ diff --git a/docs/_static/patroni-logo.svg b/docs/_static/patroni-logo.svg new file mode 100644 index 0000000000..00e9cfd9f0 --- /dev/null +++ b/docs/_static/patroni-logo.svg @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/docs/existing_data.rst b/docs/existing_data.rst index 2aa3ad26b7..c0bfbd7166 100644 --- a/docs/existing_data.rst +++ b/docs/existing_data.rst @@ -38,7 +38,7 @@ You can find below an overview of steps for converting an existing Postgres clus #. Create a YAML configuration file for Patroni. 
You can use :ref:`Patroni configuration generation and validation tooling ` for that. - * **Note (specific for the primary node):** If you have replication slots being used for replication between cluster members, then it is recommended that you enable ``use_slots`` and configure the existing replication slots as permanent via the ``slots`` configuration item. Be aware that Patroni automatically creates replication slots for replication between members, and drops replication slots that it does not recognize, when ``use_slots`` is enabled. The idea of using permanent slots here is to allow your existing slots to persist while the migration to Patroni is in progress. See :ref:`YAML Configuration Settings ` for details. + * **Note (specific for the primary node):** If you have replication slots being used for replication between cluster members, then it is recommended that you enable ``use_slots`` and configure the existing replication slots as permanent via the ``slots`` configuration item. Be aware that Patroni automatically creates replication slots for replication between members, and drops replication slots that it does not recognize, when ``use_slots`` is enabled. The idea of using permanent slots here is to allow your existing slots to persist while the migration to Patroni is in progress. See :ref:`Dynamic Configuration Settings ` for details. #. Start Patroni using the ``patroni`` systemd service unit. It automatically detects that Postgres is already running and starts monitoring the instance. @@ -47,7 +47,7 @@ You can find below an overview of steps for converting an existing Postgres clus #. Immediate restart of the standby nodes. #. Scheduled restart of the primary node within a maintenance window. -#. If you configured permanent slots in step ``1.2.``, then you should remove them from ``slots`` configuration through :ref:`patronictl edit-config cluster-name member-name ` command once the ``restart_lsn`` of the slots created by Patroni is able to catch up with the ``restart_lsn`` of the original slots for the corresponding members. By removing the slots from ``slots`` configuration you will allow Patroni to drop the original slots from your cluster once they are not needed anymore. You can find below an example query to check the ``restart_lsn`` of a couple slots, so you can compare them: +#. If you configured permanent slots in step ``1.2.``, then you should remove them from ``slots`` configuration through :ref:`patronictl edit-config cluster-name ` command once the ``restart_lsn`` of the slots created by Patroni is able to catch up with the ``restart_lsn`` of the original slots for the corresponding members. By removing the slots from ``slots`` configuration you will allow Patroni to drop the original slots from your cluster once they are not needed anymore. You can find below an example query to check the ``restart_lsn`` of a couple slots, so you can compare them: .. code-block:: sql diff --git a/docs/faq.rst b/docs/faq.rst index 4dd488db58..3fbef19b2d 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -157,6 +157,16 @@ How can I change my environment configuration? Take care to not cause a failover in the cluster! You might be interested in checking :ref:`patronictl_pause`. +How can I reduce repetitive heartbeat log lines during normal operation? + If your logs are too noisy because of repeated lines like ``Lock owner: ...`` and ``no action. I am ...``, + configure ``log.deduplicate_heartbeat_logs: true``. 
+ + You can set it either in the Patroni YAML file (:ref:`log_settings`) or with + ``PATRONI_LOG_DEDUPLICATE_HEARTBEAT_LOGS=true``. + + Keep in mind this reduces log volume by suppressing repeated heartbeat messages, but you also lose per-loop + heartbeat visibility that can help during failover diagnostics. + What occurs if I change a Postgres GUC that requires a reload? When you change the dynamic or the local configuration as explained in the previous questions, Patroni will take care of reloading the Postgres configuration for you. diff --git a/docs/ha_multi_dc.rst b/docs/ha_multi_dc.rst index 2a9bfd2766..945f51940b 100644 --- a/docs/ha_multi_dc.rst +++ b/docs/ha_multi_dc.rst @@ -4,7 +4,7 @@ HA multi datacenter =================== -The high availability of a PostgreSQL cluster deployed in multiple data centers is based on replication, which can be synchronous or asynchronous (`replication_modes `_). +The high availability of a PostgreSQL cluster deployed in multiple data centers is based on replication, which can be synchronous or asynchronous (see :ref:`replication modes `). In both cases, it is important to be clear about the following concepts: diff --git a/docs/index.rst b/docs/index.rst index a088b8d91c..90b6d58069 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -3,6 +3,53 @@ You can adapt this file completely to your liking, but it should at least contain the root `toctree` directive. +Patroni: A Template for PostgreSQL HA with ZooKeeper, etcd or Consul +==================================================================== + +.. image:: _static/patroni-logo.png + :height: 128px + :width: 128px + +.. warning:: + + Running Patroni on **memory-restricted systems with Python 3.11+** + +---- + +If you run Patroni on a system with strict memory limits, for example with ``vm.overcommit_memory=2`` (recommended for PostgreSQL), and use Python 3.11 or newer, you may observe unexpected behavior: + +- Patroni appears healthy +- PostgreSQL continues to run +- Patroni **REST API becomes unresponsive** +- The operating system reports that Patroni is listening on the REST API port +- Patroni logs look normal; however, the following messages may appear once: ``Exception ignored in thread started by: ``, ``MemoryError`` +- Kernel logs may contain messages such as ``not enough memory for the allocation`` + +This behavior is caused by a `bug in Python 3.11+ `__. +Under strict memory conditions, starting a new thread may hang indefinitely when there is not enough free memory. + +Recommended solution +==================== + +Recent Patroni releases (4.1.1+, 4.0.8+) reduce the impact of this issue by starting all required threads early during startup, before the system is under memory pressure. + +Additional recommendations (Linux, glibc) +========================================= + +When running with ``vm.overcommit_memory=2`` (recommended for PostgreSQL), we also recommend starting Patroni with the following environment variables configured: + +- ``MALLOC_ARENA_MAX=1`` - reduces the amount of virtual memory allocated by glibc for multi-threaded + applications +- ``PG_MALLOC_ARENA_MAX=`` - resets the value of ``MALLOC_ARENA_MAX`` for PostgreSQL processes started by Patroni. + +In addition, you may tune the following Patroni configuration parameters: + +- ``thread_stack_size`` - stack size used for threads started by Patroni. Lowering this value reduces memory usage of the Patroni process. The default value set by Patroni is ``512kB``.
Increase ``thread_stack_size`` if Patroni experiences stack-related crashes; otherwise the default value is sufficient. +- ``thread_pool_size`` - size of the thread pool used by Patroni for asynchronous tasks and REST API communication with other members during leader race or failsafe checks. The default value is ``5``, which is sufficient for three-node clusters. +- ``restapi.thread_pool_size`` - size of the thread pool used to process REST API requests. The default value is ``5``, allowing up to five parallel REST API requests. Note that requests involving SQL queries are effectively serialized because a single database connection is used, so increasing this value typically provides no benefit. + +---- + Introduction ============ diff --git a/docs/installation.rst b/docs/installation.rst index d2d8ba7c4a..85daab1da4 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -66,7 +66,7 @@ systemd `systemd-python` in order to use sd_notify integration all all of the above (except psycopg family) -psycopg +psycopg3 `psycopg[binary]>=3.0.0` module psycopg2 `psycopg2>=2.5.4` module diff --git a/docs/releases.rst b/docs/releases.rst index d3ddad44c3..ab660f40b1 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -3,6 +3,100 @@ Release notes ============= +Version 4.1.2 +------------- + +Released 2026-04-21 + +**Systemd support improvements** + +- Add support for ``notify-reload`` systemd unit type (Ronan Dunklau) + + Allows ``systemctl reload`` to wait until Patroni has actually processed the configuration reload by sending ``RELOADING=1`` and ``READY=1`` notifications to systemd. + +- Send ``STOPPING=1`` notification to systemd on shutdown (Alexander Kukushkin) + + Patroni now properly notifies systemd that it is shutting down, following the systemd notify protocol. + +- Do not let PostgreSQL notify systemd (Alexander Kukushkin) + + Remove ``NotifyAccess=all`` from the example systemd unit file. Filter ``NOTIFY_SOCKET`` from the environment when starting PostgreSQL so it doesn't send ``READY=1`` or ``STOPPING=1`` to systemd. When taking over a PostgreSQL that was started before Patroni and already has ``NOTIFY_SOCKET``, re-assert ``READY=1`` during PostgreSQL shutdown to counteract its ``STOPPING=1``. + + +Version 4.1.1 +------------- + +Released 2026-04-08 + +**Stability improvements** + +- Compatibility with threading changes in Python 3.11+ (Alexander Kukushkin) + + Avoid starting/stopping threads at runtime. Introduce thread pools for REST API and for executing async tasks. Allow configuring global ``thread_pool_size`` and ``restapi.thread_pool_size``. + +- Compatibility with Python 3.14 (Alexander Kukushkin) + + Run tests against Python 3.14 and fix compatibility issues. + +- Compatibility with Etcd security fixes in v3.6.9, v3.5.28, and v3.4.42 (Alexander Kukushkin) + + These Etcd releases addressed CVEs and changed behavior so cluster topology reads and lease keepalive are no longer allowed without authentication. Patroni now handles this by authenticating in member-discovery and lease-keepalive paths, re-authenticating on auth failures, and retrying requests accordingly. + +- Improvements for Etcd3 error handling (Alexander Kukushkin) + + Handle broken JSON responses, be flexible in how the JSON error is parsed, and improve reporting for etcd internal errors.
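For reference, the Etcd credentials used for this authentication come from the ``etcd3`` section of the Patroni configuration. A minimal sketch is shown below; the host addresses and credentials are placeholders, and the keys assume the standard ``etcd3`` provider settings:

.. code-block:: yaml

    # illustrative etcd3 section; replace hosts and credentials with real values
    etcd3:
      hosts:
        - 10.0.0.11:2379
        - 10.0.0.12:2379
        - 10.0.0.13:2379
      username: patroni
      password: my-secret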
+ +**Bugfixes** + +- Retry leader update on temporary Kubernetes ``403`` error (Sophia Ruan, Alexander Kukushkin) + + When the Kubernetes API temporarily returns ``403 Permission Denied`` (for example during transient RBAC issues), Patroni now verifies whether the current node still holds leadership and retries the leader update within ``retry_timeout`` instead of immediately demoting. + +- Fix issue with renaming leader node in sync mode and pause (Alexander Kukushkin) + + ``/sync`` key wasn't updated after renaming the leader node with Patroni restart in pause (without Postgres restart). It prevented Patroni from promoting after the next restart without pause. + +- Trigger ``pg_rewind`` check when the same primary increased timeline (Alexander Kukushkin) + + Such timeline increase may happen as a result of crash recovery in single-user mode + promote after taking a leader key while other replica nodes are isolated from DCS. In this case replica nodes didn't trigger ``pg_rewind`` state machine because the leader and therefore ``primary_conninfo`` didn't change. + +- Only write superuser password during ``initdb`` bootstrap if it is non-empty (Michael Banck) + + Writing an empty password during ``initdb`` bootstrap was causing issues. + +- Fix bug with ``failover_priority`` with ``synchronous_mode=on`` (Alexander Kukushkin) + + ``tag.failover_priority`` values were ignored when ``synchronous_node_count > 1``. + +- Fix bug with ``primary_conninfo`` password comparison (Alexander Kukushkin) + + Starting from PostgreSQL 10, Patroni uses passfile in ``primary_conninfo`` and failed to update the passfile after the replication password was updated in yaml-file configuration with reload. + +- Don't restart replica with ``nofailover`` tag in pause mode (Alexander Kukushkin) + + Patroni used to start a manually shut down PostgreSQL replica in pause mode when it had ``nofailover`` tag set to ``true``. + +- Fix ``check_recovery_conf()`` when PostgreSQL is in the starting state (Alexander Kukushkin) + + For PostgreSQL v12 and newer, ``pg_settings`` cannot be queried while the server is still starting and not yet accepting connections. Missing recovery parameters are now added to the internal state when writing ``postgresql.conf``. Additionally, restore the ``Postgresql.is_starting()`` check in ``Ha.is_healthiest_node()``. + +- Validate user options in dictionary format for ``initdb``/``basebackup`` (m4rrypro) + + When ``initdb`` or ``basebackup`` options were provided as a dictionary (instead of a list), the ``option_is_allowed()`` validation was bypassed, allowing blocked options to be used. + +- Allow server-side compression for ``basebackup`` option (m4rrypro) + + The ``compress`` option was completely blocked for ``basebackup``, but since PostgreSQL 15, server-side compression is useful and works transparently with plain format. Client-side compression is still rejected. + +- Don't reload PostgreSQL config while running custom bootstrap (Alexander Kukushkin) + + Custom bootstrap could be complex and involve PostgreSQL starting and stopping multiple times. Reloads of PostgreSQL config during this process could lead to unexpected behavior. + +- Check that ``postgresql.parameters`` is a dictionary (Alexander Kukushkin) + + Discard new config if ``postgresql.parameters`` is not a dictionary.
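To illustrate the last fix, ``postgresql.parameters`` is expected to be a mapping of GUC names to values; any other shape causes the new configuration to be discarded. The parameter names and values below are only examples:

.. code-block:: yaml

    # accepted: a mapping of parameter names to values
    postgresql:
      parameters:
        max_connections: 100
        shared_buffers: 1GB
    ---
    # rejected: a list instead of a mapping, so the new config is discarded
    postgresql:
      parameters:
        - max_connections=100
        - shared_buffers=1GB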
+ + Version 4.1.0 ------------- @@ -59,7 +153,7 @@ Released 2025-09-23 - Reduce log level of watchdog configuration failure (Ants Aasma) - Show the `Could not activate Linux watchdog device` log line on debug logging level, when the watchdog is configured with ``required`` mode. It was previously shown on info level. + Show the `Could not activate Linux watchdog device` log line on debug logging level, unless the watchdog is configured with ``required`` mode. It was previously shown on info level. - Take advantage of ``written_lsn`` and ``latest_end_lsn`` from ``pg_stat_wal_receiver`` (Alexander Kukushkin) diff --git a/docs/replication_modes.rst b/docs/replication_modes.rst index 27a75ce740..34d10dbf98 100644 --- a/docs/replication_modes.rst +++ b/docs/replication_modes.rst @@ -52,7 +52,7 @@ on at least two nodes, enable ``synchronous_mode_strict`` in addition to the ``synchronous_mode``. This parameter prevents Patroni from switching off the synchronous replication on the primary when no synchronous standby candidates are available. As a downside, the primary is not be available for writes -(unless the Postgres transaction explicitly turns off ``synchronous_mode``), +(unless the Postgres transaction explicitly turns off ``synchronous_commit``), blocking all client write requests until at least one synchronous replica comes up. diff --git a/docs/standby_cluster.rst b/docs/standby_cluster.rst index 5aa75e1434..d0c2333a37 100644 --- a/docs/standby_cluster.rst +++ b/docs/standby_cluster.rst @@ -80,3 +80,23 @@ standby cluster or from a standby member of the primary cluster: for that, you need to define a single host in the ``standby_cluster.host`` section. However, you need to beware that in this case ``pg_rewind`` will fail to execute on the standby cluster. + + + +.. warning:: + Member names (the ``name`` field in each node's Patroni configuration) must + be unique across the primary cluster and all standby clusters connected to it. + + Patroni sets ``synchronous_standby_names`` on the primary using member names, + which also become the ``application_name`` of each replication connection in + ``pg_stat_replication``. If a standby cluster node shares the same name as a + primary cluster member, PostgreSQL will see two connections with identical + ``application_name`` values. This ambiguity can cause PostgreSQL to satisfy + the synchronous replication requirement using the standby cluster's connection + instead of the intended primary cluster member, leading PostgreSQL to prematurely + acknowledge transactions as synchronously committed when they are not durable on the + correct standby. + + This is a silent failure — replication continues and no errors are logged, but + the cluster is effectively operating without a valid synchronous standby, which + is a potential data loss scenario if the primary fails. \ No newline at end of file diff --git a/docs/yaml_configuration.rst b/docs/yaml_configuration.rst index c27763328a..ff279dd833 100644 --- a/docs/yaml_configuration.rst +++ b/docs/yaml_configuration.rst @@ -7,6 +7,8 @@ YAML Configuration Settings Global/Universal ---------------- +- **thread\_pool\_size**: size of thread pool used by Patroni to execute asynchronous tasks and communicate via REST API with other members during leader race or failsafe checks. Minimal value is ``5``, default value is ``5``. +- **thread\_stack\_size**: specifies the stack size to be used for threads started by Patroni. Value must be aligned by ``64kB``. 
Minimal value is ``64kB``, default value (set by Patroni) is ``512kB``. - **name**: the name of the host. Must be unique for the cluster. - **namespace**: path within the configuration store where Patroni will keep information about the cluster. Default value: "/service" - **scope**: cluster name @@ -332,6 +334,7 @@ REST API -------- - **restapi**: + - **thread\_pool\_size**: size of thread pool used by Patroni to process REST API requests. Minimal value is ``5``, default value is ``5``. - **connect\_address**: IP address (or hostname) and port, to access the Patroni's :ref:`REST API `. All the members of the cluster must be able to connect to this address, so unless the Patroni setup is intended for a demo inside the localhost, this address must be a non "localhost" or loopback address (ie: "localhost" or "127.0.0.1"). It can serve as an endpoint for HTTP health checks (read below about the "listen" REST API parameter), and also for user queries (either directly or via the REST API), as well as for the health checks done by the cluster members during leader elections (for example, to determine whether the leader is still running, or if there is a node which has a WAL position that is ahead of the one doing the query; etc.) The connect_address is put in the member key in DCS, making it possible to translate the member name into the address to connect to its REST API. - **listen**: IP address (or hostname) and port that Patroni will listen to for the REST API - to provide also the same health checks and cluster messaging between the participating nodes, as described above. to provide health-check information for HAProxy (or any other load balancer capable of doing a HTTP "OPTION" or "GET" checks). - **authentication**: (optional) @@ -412,7 +415,7 @@ Tags - **nosync**: ``true`` or ``false``. If set to ``true`` the node will never be selected as a synchronous replica. - **sync_priority**: integer, controls the priority this node should have during synchronous replica selection when ``synchronous_mode`` is set to ``on``. Nodes with higher priority will be preferred over lower-priority nodes. If the ``sync_priority`` is 0 or negative - such node is not allowed to be written to ``synchronous_standby_names`` PostgreSQL parameter (similar to ``nosync: true``). Keep in mind, that this parameter has the opposite meaning to ``sync_priority`` value reported in ``pg_stat_replication`` view. - **nofailover**: ``true`` or ``false``, controls whether this node is allowed to participate in the leader race and become a leader. Defaults to ``false``, meaning this node _can_ participate in leader races. -- **failover_priority**: integer, controls the priority this node should have during failover. Nodes with higher priority will be preferred over lower-priority nodes if they received/replayed the same amount of WAL. However, nodes with higher values of receive/replay LSN are preferred regardless of their priority. If the ``failover_priority`` is 0 or negative - such node is not allowed to participate in the leader race and to become a leader (similar to ``nofailover: true``). +- **failover_priority**: integer, controls the priority this node should have during failover. Nodes with higher priority will be preferred over lower-priority nodes if they received/replayed the same amount of WAL. However, nodes with higher values of receive/replay LSN are preferred regardless of their priority. 
If the ``failover_priority`` is 0 or negative - such node is not allowed to participate in the leader race and to become a leader (similar to ``nofailover: true``). Known limitation: ``failover_priority`` currently doesn't work with :ref:`quorum-based synchronous replication `. - **nostream**: ``true`` or ``false``. If set to ``true`` the node will not use replication protocol to stream WAL. It will rely instead on archive recovery (if ``restore_command`` is configured) and ``pg_wal``/``pg_xlog`` polling. It also disables copying and synchronization of permanent logical replication slots on the node itself and all its cascading replicas. Setting this tag on primary node has no effect. .. warning:: diff --git a/extras/startup-scripts/patroni b/extras/startup-scripts/patroni index 8d90ea1c99..0f5bd4f257 100644 --- a/extras/startup-scripts/patroni +++ b/extras/startup-scripts/patroni @@ -98,7 +98,7 @@ case "$1" in --chdir `eval echo ~$USER` \ --exec $PATRONI \ --startas /bin/sh -- \ - -c "/usr/bin/env PATH=$PGPATH /usr/bin/python $PATRONI $CONF >> $LOGFILE 2>&1" + -c "/usr/bin/env PATH=$PGPATH MALLOC_ARENA_MAX=1 PG_MALLOC_ARENA_MAX= /usr/bin/python $PATRONI $CONF >> $LOGFILE 2>&1" ;; stop) diff --git a/extras/startup-scripts/patroni.service b/extras/startup-scripts/patroni.service index f67d796ec1..f904e83b0b 100644 --- a/extras/startup-scripts/patroni.service +++ b/extras/startup-scripts/patroni.service @@ -14,6 +14,12 @@ Group=postgres # Read in configuration file if it exists, otherwise proceed EnvironmentFile=-/etc/patroni_env.conf +# Limit glibc malloc arenas to reduce virtual memory. +# This is essential when running with vm.overcommit_memory=2. +Environment=MALLOC_ARENA_MAX=1 +# Reset or set another MALLOC_ARENA_MAX value when starting up postmaster. +Environment=PG_MALLOC_ARENA_MAX= + # The default is the user's home directory, and if you want to change it, you must provide an absolute path. # WorkingDirectory=/home/sameuser diff --git a/features/permanent_slots.feature b/features/permanent_slots.feature index dbea3c0013..a0e939cea7 100644 --- a/features/permanent_slots.feature +++ b/features/permanent_slots.feature @@ -78,7 +78,10 @@ Feature: permanent slots Then postgres-1 has a physical replication slot named test_physical after 10 seconds And postgres-1 has a physical replication slot named postgres_0 after 10 seconds And postgres-1 has a physical replication slot named postgres_3 after 10 seconds - When I start postgres-0 + + @pg110000 + Scenario: check permanent physical replication slot on replica after failover + Given I start postgres-0 Then postgres-0 role is the replica after 20 seconds And physical replication slot named postgres_1 on postgres-0 has no xmin value after 10 seconds # postgres_2 and postgres_3 slots are retained, but postgres_2 will still have xmin value :( diff --git a/patroni/__main__.py b/patroni/__main__.py index 60be35f0fa..a820b394c4 100644 --- a/patroni/__main__.py +++ b/patroni/__main__.py @@ -50,6 +50,7 @@ def __init__(self, config: 'Config') -> None: :param config: Patroni configuration. 
""" + from patroni import thread_pool from patroni.api import RestApiServer from patroni.dcs import get_dcs from patroni.ha import Ha @@ -59,6 +60,14 @@ def __init__(self, config: 'Config') -> None: from patroni.version import __version__ from patroni.watchdog import Watchdog + try: + thread_pool_size = max(5, int(config.get('thread_pool_size', 5))) + except Exception as e: + logger.warning('Failed to parse thread_pool_size value "%s": %r', config.get('thread_pool_size'), e) + thread_pool_size = 5 + logger.info('Patroni global thread_pool_size = %d', thread_pool_size) + thread_pool.configure_global_pool(thread_pool_size) + super(Patroni, self).__init__(config) self.version = __version__ @@ -237,6 +246,8 @@ def _shutdown(self) -> None: Shut down the REST API and the HA handler. """ + from patroni import thread_pool + try: self.api.shutdown() except Exception: @@ -250,6 +261,8 @@ def _shutdown(self) -> None: except Exception: logger.exception('Exception during Ha.shutdown') + thread_pool.get_executor().shutdown(wait=False) + def patroni_main(configfile: str) -> None: """Configure and start ``patroni`` main daemon process. diff --git a/patroni/api.py b/patroni/api.py index 5dc2f13c04..09044f9e19 100644 --- a/patroni/api.py +++ b/patroni/api.py @@ -20,7 +20,6 @@ from http.server import BaseHTTPRequestHandler, HTTPServer from ipaddress import ip_address, ip_network, IPv4Network, IPv6Network from socketserver import ThreadingMixIn -from threading import Thread from typing import Any, Callable, cast, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union from urllib.parse import parse_qs, urlparse @@ -31,6 +30,7 @@ from .dcs import Cluster from .exceptions import PostgresConnectionException, PostgresException from .postgresql.misc import postgres_version_to_int, PostgresqlRole, PostgresqlState +from .thread_pool import PatroniThreadPoolExecutor from .utils import cluster_as_json, deep_compare, enable_keepalive, parse_bool, \ parse_int, patch_config, Retry, RetryFailedError, split_host_port, tzutc, uri @@ -974,7 +974,7 @@ def do_POST_restart(self) -> None: # failed to parse the json return if request: - logger.debug("received restart request: {0}".format(request)) + logger.debug("received restart request: %s", request) if global_config.from_cluster(cluster).is_paused and 'schedule' in request: self.write_response(status_code, "Can't schedule restart in the paused state") @@ -1507,15 +1507,12 @@ def log_message(self, format: str, *args: Any) -> None: logger.debug("API thread: %s - - %s latency: %0.3f ms", self.client_address[0], format % args, latency) -class RestApiServer(ThreadingMixIn, HTTPServer, Thread): +class RestApiServer(ThreadingMixIn, HTTPServer): """Patroni REST API server. - An asynchronous thread-based HTTP server. + An asynchronous thread-pool-based HTTP server. """ - # On 3.7+ the `ThreadingMixIn` gathers all non-daemon worker threads in order to join on them at server close. - daemon_threads = True # Make worker threads "fire and forget" to prevent a memory leak. - def __init__(self, patroni: Patroni, config: Dict[str, Any]) -> None: """Establish patroni configuration for the REST API daemon. 
@@ -1532,6 +1529,13 @@ def __init__(self, patroni: Patroni, config: Dict[str, Any]) -> None: self.patroni = patroni self.__listen = None self.request_queue_size = int(config.get('request_queue_size', 5)) + try: + thread_pool_size = max(5, int(config.get('thread_pool_size', 5))) + except Exception as e: + logger.warning('Failed to parse restapi.thread_pool_size value "%s": %r', config.get('thread_pool_size'), e) + thread_pool_size = 5 + logger.info('REST API thread_pool_size = %d', thread_pool_size) + self._executor = PatroniThreadPoolExecutor(max_workers=thread_pool_size + 1, thread_name_prefix='RestAPI') self.__ssl_options: Dict[str, Any] = {} self.__ssl_serial_number = None self._received_new_cert = False @@ -1792,8 +1796,8 @@ def __initialize(self, listen: str, ssl_options: Dict[str, Any]) -> None: reloading_config = self.__listen is not None # changing config in runtime if reloading_config: - self.shutdown() - # Rely on ThreadingMixIn.server_close() to have all requests terminate before we continue + HTTPServer.shutdown(self) + # Rely on TCPServer.server_close() to have all requests terminate before we continue self.server_close() self.__listen = listen @@ -1801,7 +1805,6 @@ def __initialize(self, listen: str, ssl_options: Dict[str, Any]) -> None: self._received_new_cert = False # reset to False after reload_config() self.__httpserver_init(host, port) - Thread.__init__(self, target=self.serve_forever) self._set_fd_cloexec(self.socket) # wrap socket with ssl if 'certfile' is defined in a config.yaml @@ -1826,6 +1829,9 @@ def __initialize(self, listen: str, ssl_options: Dict[str, Any]) -> None: if reloading_config: self.start() + def start(self) -> None: + self._executor.submit(self.serve_forever) + def process_request_thread(self, request: Union[socket.socket, Tuple[bytes, socket.socket]], client_address: Tuple[str, int]) -> None: """Process a request to the REST API. @@ -1846,6 +1852,14 @@ def process_request_thread(self, request: Union[socket.socket, Tuple[bytes, sock request.do_handshake() super(RestApiServer, self).process_request_thread(request, client_address) + def process_request(self, request: Union[socket.socket, Tuple[bytes, socket.socket]], + client_address: Tuple[str, int]) -> None: + self._executor.submit(self.process_request_thread, request, client_address) + + def shutdown(self) -> None: + super(RestApiServer, self).shutdown() + self._executor.shutdown(wait=True) + def shutdown_request(self, request: Union[socket.socket, Tuple[bytes, socket.socket]]) -> None: """Shut down a request to the REST API. diff --git a/patroni/async_executor.py b/patroni/async_executor.py index f7fac02ac5..cabe69c79b 100644 --- a/patroni/async_executor.py +++ b/patroni/async_executor.py @@ -1,10 +1,11 @@ """Implement facilities for executing asynchronous tasks.""" import logging -from threading import Event, Lock, RLock, Thread +from threading import Event, Lock, RLock from types import TracebackType from typing import Any, Callable, Optional, Tuple, Type +from . import thread_pool from .postgresql.cancellable import CancellableSubprocess logger = logging.getLogger(__name__) @@ -179,12 +180,10 @@ def run(self, func: Callable[..., Any], args: Tuple[Any, ...] = ()) -> Optional[ def run_async(self, func: Callable[..., Any], args: Tuple[Any, ...] = ()) -> None: """Start an async thread that runs *func* with *args*. - :param func: function to be run. Will be passed through args to :class:`~threading.Thread` with a target of - :func:`run`. 
- :param args: arguments to be passed along to :class:`~threading.Thread` with *func*. - + :param func: function to be run. + :param args: arguments to be passed along with *func*. """ - Thread(target=self.run, args=(func, args)).start() + thread_pool.get_executor().submit(self.run, func, args) def try_run_async(self, action: str, func: Callable[..., Any], args: Tuple[Any, ...] = ()) -> Optional[str]: """Try to run an async task, if none is currently being executed. diff --git a/patroni/config.py b/patroni/config.py index 6723240aa9..7b778c3887 100644 --- a/patroni/config.py +++ b/patroni/config.py @@ -371,7 +371,7 @@ def reload_local_configuration(self) -> Optional[bool]: logger.exception('Exception when reloading local configuration from %s', self.config_file) @staticmethod - def _process_postgresql_parameters(parameters: Dict[str, Any], is_local: bool = False) -> Dict[str, Any]: + def _process_postgresql_parameters(parameters: Any, is_local: bool = False) -> Dict[str, Any]: """Process Postgres *parameters*. .. note:: @@ -403,9 +403,12 @@ def _process_postgresql_parameters(parameters: Dict[str, Any], is_local: bool = :returns: new value for ``postgresql.parameters`` after processing and validating *parameters*. """ + if not isinstance(parameters or {}, dict): + raise ConfigParseError('postgresql.parameters is not a dictionary') + pg_params: Dict[str, Any] = {} - for name, value in (parameters or {}).items(): + for name, value in cast(Dict[str, Any], parameters or {}).items(): if name not in ConfigHandler.CMDLINE_OPTIONS: pg_params[name] = value elif not is_local: @@ -487,7 +490,7 @@ def _popenv(name: str) -> Optional[str]: """ return os.environ.pop(PATRONI_ENV_PREFIX + name.upper(), None) - for param in ('name', 'namespace', 'scope'): + for param in ('name', 'namespace', 'scope', 'thread_pool_size', 'thread_stack_size'): value = _popenv(param) if value: ret[param] = value @@ -559,7 +562,7 @@ def _set_section_values(section: str, params: List[str]) -> None: if value is not None: ret[first][second] = value - for first, params in (('restapi', ('request_queue_size',)), + for first, params in (('restapi', ('request_queue_size', 'thread_pool_size')), ('log', ('max_queue_size', 'file_size', 'file_num', 'mode'))): for second in params: value = ret.get(first, {}).pop(second, None) diff --git a/patroni/ctl.py b/patroni/ctl.py index 2a2297cecd..43606220d4 100644 --- a/patroni/ctl.py +++ b/patroni/ctl.py @@ -12,7 +12,6 @@ If it is also missing in the configuration file we assume that this is just a normal Patroni cluster (not Citus). """ -import codecs import copy import datetime import difflib @@ -2344,7 +2343,7 @@ def invoke_editor(before_editing: str, cluster_name: str) -> Tuple[str, Dict[str if ret: raise PatroniCtlException("Editor exited with return code {0}".format(ret)) - with codecs.open(tmpfile, encoding='utf-8') as fd: + with open(tmpfile, encoding='utf-8') as fd: after_editing = fd.read() return after_editing, yaml.safe_load(after_editing) diff --git a/patroni/daemon.py b/patroni/daemon.py index dfc7160017..d195e6fc72 100644 --- a/patroni/daemon.py +++ b/patroni/daemon.py @@ -3,8 +3,6 @@ This module implements abstraction classes and functions for creating and managing daemon processes in Patroni. Currently it is only used for the main "Thread" of ``patroni`` and ``patroni_raft_controller`` commands. 
""" -from __future__ import print_function - import abc import argparse import logging @@ -12,7 +10,7 @@ import signal import sys -from threading import Lock +from threading import Lock, stack_size from typing import Any, Optional, Type, TYPE_CHECKING if TYPE_CHECKING: # pragma: no cover @@ -20,6 +18,18 @@ logger = logging.getLogger(__name__) +try: # pragma: no cover + from systemd import daemon # pyright: ignore + + def notify_systemd(msg: str) -> None: + daemon.notify(msg) # pyright: ignore + +except ImportError: # pragma: no cover + logger.info("Systemd integration is not supported") + + def notify_systemd(msg: str) -> None: + pass + def get_base_arg_parser() -> argparse.ArgumentParser: """Create a basic argument parser with the arguments used for both patroni and raft controller daemon. @@ -69,6 +79,7 @@ def sighup_handler(self, *_: Any) -> None: Flag the daemon as "SIGHUP received". """ self._received_sighup = True + notify_systemd("RELOADING=1") def api_sigterm(self) -> bool: """Guarantee only a single SIGTERM is being processed. @@ -137,16 +148,13 @@ def run(self) -> None: Start the logger thread and keep running execution cycles until a SIGTERM is eventually received. Also reload configuration upon receiving SIGHUP. """ - try: # pragma: no cover - from systemd import daemon # pyright: ignore - daemon.notify("READY=1") # pyright: ignore - except ImportError: # pragma: no cover - logger.info("Systemd integration is not supported") + notify_systemd("READY=1") self.logger.start() while not self.received_sigterm: if self._received_sighup: self._received_sighup = False self.reload_config(True, self.config.reload_local_configuration()) + notify_systemd("READY=1") self._run_cycle() @@ -161,6 +169,7 @@ def shutdown(self) -> None: """ with self._sigterm_lock: self._received_sigterm = True + notify_systemd("STOPPING=1") self._shutdown() self.logger.shutdown() @@ -172,11 +181,28 @@ def abstract_main(cls: Type[AbstractPatroniDaemon], configfile: str) -> None: :param configfile: """ from .config import Config, ConfigParseError + from .utils import parse_int try: config = Config(configfile) except ConfigParseError as e: sys.exit(e.value) + thread_stack_size = None + if 'thread_stack_size' in config: + thread_stack_size = parse_int(config.get('thread_stack_size'), 'B') + if thread_stack_size is None: + logger.warning('Failed to parse thread_stack_size value "%s"', config.get('thread_stack_size')) + + if thread_stack_size is None: + thread_stack_size = 524288 + logger.info('Using default value thread_stack_size=%s', thread_stack_size) + + thread_stack_size = max(65536, thread_stack_size) + try: + stack_size(thread_stack_size) + except Exception as e: + logger.warning('Failed to set threading.stack_size(%s): %r', thread_stack_size, e) + controller = cls(config) try: controller.run() diff --git a/patroni/dcs/consul.py b/patroni/dcs/consul.py index b4c29b2811..8f8b862593 100644 --- a/patroni/dcs/consul.py +++ b/patroni/dcs/consul.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import json import logging import os diff --git a/patroni/dcs/etcd.py b/patroni/dcs/etcd.py index d8986887a0..d13f031a68 100644 --- a/patroni/dcs/etcd.py +++ b/patroni/dcs/etcd.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import abc import json import logging diff --git a/patroni/dcs/etcd3.py b/patroni/dcs/etcd3.py index 081451f8d8..1389ce78d7 100644 --- a/patroni/dcs/etcd3.py +++ b/patroni/dcs/etcd3.py @@ -1,6 +1,5 @@ -from __future__ import absolute_import - import base64 +import functools import 
json import logging import os @@ -21,7 +20,8 @@ from ..collections import EMPTY_DICT from ..exceptions import DCSError, PatroniException from ..postgresql.mpp import AbstractMPP -from ..utils import deep_compare, enable_keepalive, iter_response_objects, parse_bool, RetryFailedError, USER_AGENT +from ..utils import deep_compare, enable_keepalive, iter_response_objects, \ + parse_bool, RetryFailedError, USER_AGENT, WHITESPACE_RE from . import catch_return_false_exception, Cluster, ClusterConfig, \ Failover, Leader, Member, Status, SyncState, TimelineHistory from .etcd import AbstractEtcd, AbstractEtcdClientWithFailover, catch_etcd_errors, \ @@ -73,6 +73,8 @@ class Etcd3WatchCanceled(Etcd3Exception): class Etcd3ClientError(Etcd3Exception): def __init__(self, code: Optional[int] = None, error: Optional[str] = None, status: Optional[int] = None) -> None: + if not hasattr(self, 'code'): + self.code = code if not hasattr(self, 'error'): self.error = error and error.strip() self.codeText = GRPCcodeToText.get(code) if code is not None else None @@ -80,7 +82,7 @@ def __init__(self, code: Optional[int] = None, error: Optional[str] = None, stat def __repr__(self) -> str: return "<{0} error: '{1}', code: {2}>"\ - .format(self.__class__.__name__, getattr(self, 'error', None), getattr(self, 'code', None)) + .format(self.codeText, getattr(self, 'error', None), getattr(self, 'code', None)) __str__ = __repr__ @@ -163,18 +165,18 @@ def _raise_for_data(data: Union[bytes, str, Dict[str, Any]], status_code: Option data_error: Optional[Dict[str, Any]] = data.get('error') or data.get('Error') if isinstance(data_error, dict): # streaming response status_code = data_error.get('http_code') - code: Optional[int] = data_error['grpc_code'] + code: Optional[int] = data_error.get('code') or data_error['grpc_code'] error: str = data_error['message'] else: data_code = data.get('code') or data.get('Code') if TYPE_CHECKING: # pragma: no cover assert not isinstance(data_code, dict) code = data_code - error = str(data_error) + error = str(data_error or data.get('message') or data) except Exception: error = str(data) code = GRPCCode.Unknown - err = errStringToClientError.get(error) or errCodeToClientError.get(code) or Unknown + err = errStringToClientError.get(error) or errCodeToClientError.get(code) or Etcd3ClientError return err(code, error, status_code) @@ -217,13 +219,24 @@ class Etcd3Client(AbstractEtcdClientWithFailover): ERROR_CLS = Etcd3Error def __init__(self, config: Dict[str, Any], dns_resolver: DnsCachingResolver, cache_ttl: int = 300) -> None: + self._decoder = json.JSONDecoder() self._reauthenticate = False self._token = None self._cluster_version: Tuple[int, ...] = tuple() super(Etcd3Client, self).__init__({**config, 'version_prefix': '/v3beta'}, dns_resolver, cache_ttl) + if self._use_proxies and not self._cluster_version: + kwargs = self._prepare_common_parameters(1) + self._ensure_version_prefix(self._base_uri, **kwargs) + self.authenticate_on_start() + + def authenticate_on_start(self, auth_request_func: Optional[Callable[..., Dict[str, Any]]] = None): + """Authenticate with Etcd v3 at startup and exit on invalid credentials. + :param auth_request_func: optional custom authentication request function, + if not provided, :meth:`call_rpc` will be used. 
+ """ try: - self.authenticate() + self.authenticate(auth_request_func=auth_request_func) except AuthFailed as e: logger.fatal('Etcd3 authentication failed: %r', e) sys.exit(1) @@ -245,7 +258,8 @@ def _handle_server_response(self, response: urllib3.response.HTTPResponse) -> Di data = response.data try: data = data.decode('utf-8') - ret: Dict[str, Any] = json.loads(data) + idx = WHITESPACE_RE.match(data, 0).end() # pyright: ignore [reportOptionalMemberAccess] + ret: Dict[str, Any] = self._decoder.raw_decode(data, idx)[0] header = ret.get('header', EMPTY_DICT) self._check_cluster_raft_term(header.get('cluster_id'), header.get('raft_term')) @@ -254,7 +268,7 @@ def _handle_server_response(self, response: urllib3.response.HTTPResponse) -> Di return ret except (TypeError, ValueError, UnicodeError) as e: if response.status < 400: - raise etcd.EtcdException('Server response was not valid JSON: %r' % e) + raise etcd.EtcdException("Server response '%s' was not valid JSON: %r" % (data, e)) ret = {} ex = _raise_for_data(ret or data, response.status) if isinstance(ex, Unavailable): @@ -297,26 +311,82 @@ def _prepare_get_members(self, etcd_nodes: int) -> Dict[str, Any]: self._prepare_request(kwargs, {}) return kwargs + def _do_auth_request(self, base_uri: str, kwargs: Dict[str, Any], + method: str, fields: Dict[str, Any], retry: Optional[Retry] = None) -> Dict[str, Any]: + """Special method for handling authentication when discovering cluster members. + + We can't use `call_rpc()` method for this purpose because it may cause infinite recursion. + + :param base_uri: base url for authentication request, e.g. `http://etcd:2379/v3` + :param kwargs: common request parameters, e.g. headers. + :param method: `/auth/authenticate` + :param fields: authentication fields, e.g. `{'name': 'user', 'password': 'pass'}`. + :param retry: optional retry configuration, ignored. + """ + request_kwargs = kwargs.copy() + request_kwargs['headers'] = {k: v for k, v in kwargs['headers'].items() if k != 'authorization'} + self._prepare_request(request_kwargs, fields) + response = self.http.urlopen(self._MPOST, base_uri + method, **request_kwargs) + return self._handle_server_response(response) + + def _do_member_list_request(self, url: str, retry: Optional[Retry] = None, **kwargs: Any) -> Any: + """Special method for handling member list requests. + + :param url: base url for member list request, e.g. `http://etcd:2379/v3/cluster/member/list` + :param kwargs: common request parameters, e.g. headers. + :param retry: optional retry configuration, ignored. + """ + request_kwargs = kwargs.copy() + request_kwargs['headers'] = kwargs['headers'].copy() + # We want to update headers with authentication token if it was obtained during authentication request. 
+ request_kwargs['headers'].update(self._get_headers()) + response = self.http.urlopen(self._MPOST, url, **request_kwargs) + return self._handle_server_response(response) + def _get_members(self, base_uri: str, **kwargs: Any) -> List[str]: self._ensure_version_prefix(base_uri, **kwargs) - resp = self.http.urlopen(self._MPOST, base_uri + self.version_prefix + '/cluster/member/list', **kwargs) - members = self._handle_server_response(resp)['members'] - return [url for member in members for url in member.get('clientURLs', [])] + base_uri += self.version_prefix + + retry = None + if self._update_machines_cache: + retry = Retry(deadline=self._config['retry_timeout'], max_delay=1, max_tries=-1) + # handle_auth_errors() calls retry.ensure_deadline(), which expects Retry.__call__() + # to have initialized the internal deadline state first. + retry(lambda: None) + + # custom authentication request function, because we can't use `call_rpc()` method + # for this purpose as it may cause infinite recursion + auth_request_func = functools.partial(self._do_auth_request, base_uri, kwargs) + + # if _machine_cache is empty, it means we are just starting and want to exit early if authentication fails. + if not self._machines_cache: + self.authenticate_on_start(auth_request_func) + + response = self.handle_auth_errors(Etcd3Client._do_member_list_request, base_uri + '/cluster/member/list', + auth_request_func=auth_request_func, retry=retry, **kwargs) + return [url for member in response['members'] for url in member.get('clientURLs', [])] def call_rpc(self, method: str, fields: Dict[str, Any], retry: Optional[Retry] = None) -> Dict[str, Any]: fields['retry'] = retry return self.api_execute(self.version_prefix + method, self._MPOST, fields) - def authenticate(self, *, retry: Optional[Retry] = None) -> bool: - if self._use_proxies and not self._cluster_version: - kwargs = self._prepare_common_parameters(1) - self._ensure_version_prefix(self._base_uri, **kwargs) + def authenticate(self, *, retry: Optional[Retry] = None, + auth_request_func: Optional[Callable[..., Dict[str, Any]]] = None) -> bool: + """Authenticate with the Etcd v3 cluster. + + :param retry: optional retry configuration. + :param auth_request_func: optional custom authentication request function, + if not provided, `call_rpc()` method will be used. + """ if not (self._cluster_version >= (3, 3) and self.username and self.password): return False + if not auth_request_func: + auth_request_func = self.call_rpc logger.info('Trying to authenticate on Etcd...') old_token, self._token = self._token, None try: - response = self.call_rpc('/auth/authenticate', {'name': self.username, 'password': self.password}, retry) + response = auth_request_func('/auth/authenticate', + {'name': self.username, 'password': self.password}, retry) except AuthNotEnabled: logger.info('Etcd authentication is not enabled') self._token = None @@ -328,13 +398,23 @@ def authenticate(self, *, retry: Optional[Retry] = None) -> bool: return old_token != self._token def handle_auth_errors(self: 'Etcd3Client', func: Callable[..., Any], *args: Any, + auth_request_func: Optional[Callable[..., Dict[str, Any]]] = None, retry: Optional[Retry] = None, **kwargs: Any) -> Any: + """Handle authentication errors for the given function. + + :param func: function to call. + :param args: positional arguments for the function. + :param auth_request_func: optional custom authentication request function, + if not provided, `call_rpc()` method will be used. + :param retry: optional retry configuration. 
+ :param kwargs: keyword arguments for the function. + """ reauthenticated = False exc = None while True: if self._reauthenticate: if self.username and self.password: - self.authenticate(retry=retry) + self.authenticate(retry=retry, auth_request_func=auth_request_func) self._reauthenticate = False else: msg = 'Username or password not set, authentication is not possible' @@ -378,6 +458,7 @@ def prefix(self, key: str, serializable: bool = True, *, retry: Optional[Retry] def lease_grant(self, ttl: int, *, retry: Optional[Retry] = None) -> str: return self.call_rpc('/lease/grant', {'TTL': ttl}, retry)['ID'] + @_handle_auth_errors def lease_keepalive(self, ID: str, *, retry: Optional[Retry] = None) -> Optional[str]: return self.call_rpc('/lease/keepalive', {'ID': ID}, retry).get('result', {}).get('TTL') diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 2806c9a3be..ed81a6bded 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -558,10 +558,11 @@ def __getattr__(self, func: str) -> Callable[..., Any]: func = func[:-4] + ('endpoints' if self._use_endpoints else 'config_map') def wrapper(*args: Any, **kwargs: Any) -> Any: + retriable_http_codes = self._retriable_http_codes | set(kwargs.pop('_retriable_http_codes', None) or []) try: return getattr(self._core_v1_api, func)(*args, **kwargs) except k8s_client.rest.ApiException as e: - if e.status in self._retriable_http_codes or e.headers and 'retry-after' in e.headers: + if e.status in retriable_http_codes or e.headers and 'retry-after' in e.headers: raise KubernetesRetriableException(e) raise return wrapper @@ -1187,17 +1188,18 @@ def _update_leader_with_retry(self, annotations: Dict[str, Any], resource_version: Optional[str], ips: List[str]) -> bool: retry = self._retry.copy() - def _retry(*args: Any, **kwargs: Any) -> Any: - kwargs['_retry'] = retry + def _retry_403(*args: Any, **kwargs: Any) -> Any: + kwargs.update(_retry=retry, _retriable_http_codes=frozenset([403])) return retry(*args, **kwargs) try: - return bool(self._patch_or_create(self.leader_path, annotations, resource_version, ips=ips, retry=_retry)) + return bool(self._patch_or_create(self.leader_path, annotations, resource_version, + ips=ips, retry=_retry_403)) except k8s_client.rest.ApiException as e: if e.status == 409: logger.warning('Concurrent update of %s', self.leader_path) else: - logger.exception('Permission denied' if e.status == 403 else 'Unexpected error from Kubernetes API') + logger.exception('Unexpected error from Kubernetes API') return False except (RetryFailedError, K8sException) as e: raise KubernetesError(e) @@ -1206,6 +1208,10 @@ def _retry(*args: Any, **kwargs: Any) -> Any: if not retry.ensure_deadline(1): return False # No time for retry. Tell ha.py that we have to demote due to failed update. + def _retry(*args: Any, **kwargs: Any) -> Any: + kwargs['_retry'] = retry + return retry(*args, **kwargs) + # Try to get the latest version directly from K8s API instead of relying on async cache try: kind = _retry(self._api.read_namespaced_kind, self.leader_path, self._namespace) diff --git a/patroni/ha.py b/patroni/ha.py index 806664684f..a3af2ac6e7 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -1,3 +1,4 @@ +import concurrent.futures import datetime import functools import json @@ -6,11 +7,10 @@ import time import uuid -from multiprocessing.pool import ThreadPool from threading import RLock from typing import Any, Callable, cast, Collection, Dict, List, NamedTuple, Optional, Tuple, TYPE_CHECKING, Union -from . 
import global_config, psycopg +from . import global_config, psycopg, thread_pool from .__main__ import Patroni from .async_executor import AsyncExecutor, CriticalTask from .collections import CaseInsensitiveSet @@ -250,6 +250,11 @@ def __init__(self, patroni: Patroni): # We update this value from update_lock() and touch_member() methods, because they fetch it anyway. # This value is used to notify the leader when the failsafe_mode is active without performing any queries. self._last_wal_lsn = None + # The last known value of current timeline on this standby node. + # We update this value from touch_member() and _is_healthiest_node() methods, because they fetch it anyway. + # This value is used to detect cases of timeline bump with actual leader remaining on the same node + # and trigger pg_rewind state machine. + self._last_timeline = None # Count of concurrent sync disabling requests. Value above zero means that we don't want to be synchronous # standby. Changes protected by _member_state_lock. @@ -493,6 +498,8 @@ def touch_member(self) -> bool: data['replication_state'] = replication_state # try pg_stat_wal_receiver to get the timeline timeline = self.state_handler.received_timeline() + if timeline: + self._last_timeline = timeline if not timeline: # So far the only way to get the current timeline on the standby is from # the replication connection. In order to avoid opening the replication @@ -505,6 +512,8 @@ def touch_member(self) -> bool: timeline = pg_control_timeline or self.state_handler.pg_control_timeline() else: timeline = self.state_handler.replica_cached_timeline(self._leader_timeline) or 0 + if timeline: + self._last_timeline = timeline if timeline: data['timeline'] = timeline except Exception: @@ -759,14 +768,14 @@ def follow(self, demote_reason: str, follow_reason: str, refresh: bool = True) - if refresh: self.load_cluster_from_dcs() - is_leader = self.state_handler.is_primary() + is_primary = self.state_handler.is_primary() node_to_follow = self._get_node_to_follow(self.cluster) if self.is_paused(): if not (self._rewind.is_needed and self._rewind.can_rewind_or_reinitialize_allowed)\ or self.cluster.is_unlocked(): - if is_leader: + if is_primary: self.state_handler.set_role(PostgresqlRole.PRIMARY) return 'continue to run as primary without lock' elif self.state_handler.role != PostgresqlRole.STANDBY_LEADER: @@ -774,7 +783,7 @@ def follow(self, demote_reason: str, follow_reason: str, refresh: bool = True) - if not node_to_follow: return 'no action. 
I am ({0})'.format(self.state_handler.name) - elif is_leader: + elif is_primary: if self.is_standby_cluster(): self._async_executor.try_run_async('demoting to a standby cluster', self.demote, ('demote-cluster',)) else: @@ -809,9 +818,17 @@ def follow(self, demote_reason: str, follow_reason: str, refresh: bool = True) - else: self.state_handler.follow(node_to_follow, role, do_reload=True) self._rewind.trigger_check_diverged_lsn() - elif role == PostgresqlRole.STANDBY_LEADER and self.state_handler.role != role: - self.state_handler.set_role(role) - self.state_handler.call_nowait(CallbackAction.ON_ROLE_CHANGE) + else: + if role == PostgresqlRole.STANDBY_LEADER and self.state_handler.role != role: + self.state_handler.set_role(role) + self.state_handler.call_nowait(CallbackAction.ON_ROLE_CHANGE) + + if self._last_timeline and self._leader_timeline and self._last_timeline < self._leader_timeline: + self._rewind.trigger_check_diverged_lsn() + if not self.state_handler.is_starting(): + msg = self._handle_rewind_or_reinitialize() + if msg: + return msg return follow_reason @@ -941,6 +958,14 @@ def _process_multisync_replication(self) -> None: allow_promote = current_state.sync_confirmed voters = CaseInsensitiveSet(sync.voters) + if self.state_handler.name != sync.leader: + logger.warning("Inconsistent state of /sync key detected, leader = %s doesn't match %s, " + "updating synchronous replication key", sync.leader, self.state_handler.name) + sync = self.dcs.write_sync_state(self.state_handler.name, None, 0, version=sync.version) + if not sync: + return logger.warning("Updating sync state failed") + voters = CaseInsensitiveSet() + if picked == voters and voters != allow_promote: logger.warning('Inconsistent state between synchronous_standby_names = %s and /sync = %s key ' 'detected, updating synchronous replication key...', list(allow_promote), list(voters)) @@ -1151,6 +1176,7 @@ def enforce_primary_role(self, message: str, promote_message: str) -> str: if self.state_handler.role not in (PostgresqlRole.PRIMARY, PostgresqlRole.PROMOTED): # reset failsafe state when promote self._failsafe.set_is_active(0) + self._last_timeline = None def before_promote(): self._rewind.reset_state() # make sure we will trigger checkpoint after promote @@ -1184,10 +1210,10 @@ def fetch_node_status(self, member: Member) -> _MemberStatus: def fetch_nodes_statuses(self, members: List[Member]) -> List[_MemberStatus]: if not members: return [] - pool = ThreadPool(len(members)) - results = pool.map(self.fetch_node_status, members) # Run API calls on members in parallel - pool.close() - pool.join() + + futures = [thread_pool.get_executor().submit(self.fetch_node_status, member) for member in members] + # Run API calls on members in parallel + results = [future.result() for future in concurrent.futures.as_completed(futures)] return results def update_failsafe(self, data: Dict[str, Any]) -> Union[int, str, None]: @@ -1267,11 +1293,9 @@ def check_failsafe_topology(self) -> bool: for name, url in failsafe.items() if name != self.state_handler.name] if not members: # A single node cluster return True - pool = ThreadPool(len(members)) - call_failsafe_member = functools.partial(self.call_failsafe_member, data) - results: List[_FailsafeResponse] = pool.map(call_failsafe_member, members) - pool.close() - pool.join() + + futures = [thread_pool.get_executor().submit(self.call_failsafe_member, data, member) for member in members] + results = [future.result() for future in concurrent.futures.as_completed(futures)] ret = all(r.accepted for 
r in results) if ret: # The LSN feedback will be later used to advance position of replication slots @@ -1303,14 +1327,16 @@ def _is_healthiest_node(self, members: Collection[Member], themselves as the healthiest because they received/replayed up to the same LSN, but this is totally fine. """ + cluster_timeline = self.cluster.timeline + my_timeline = self.state_handler.replica_cached_timeline(cluster_timeline) + if my_timeline: + self._last_timeline = my_timeline my_wal_position = self.state_handler.last_operation() if check_replication_lag and self.is_lagging(my_wal_position): logger.info('My wal position exceeds maximum replication lag') return False # Too far behind last reported wal position on primary if not self.is_standby_cluster() and self.check_timeline(): - cluster_timeline = self.cluster.timeline - my_timeline = self.state_handler.replica_cached_timeline(cluster_timeline) if my_timeline is None: logger.info('Can not figure out my timeline') return False @@ -1368,7 +1394,7 @@ def _is_healthiest_node(self, members: Collection[Member], leader_name, st.failover_priority, self.patroni.failover_priority) low_priority = False - if low_priority and (not self.sync_mode_is_active() or quorum_vote): + if low_priority and (not self.quorum_commit_mode_is_active() or quorum_vote): # There's a higher priority non-lagging replica logger.info( '%s has equally tolerable WAL position and priority %s, while this node has priority %s', @@ -1485,8 +1511,10 @@ def manual_failover_process_no_leader(self) -> Optional[bool]: # at this point we assume that our node is a candidate for a failover among all nodes except former leader - # exclude former leader from the list (failover.leader can be None) - members = [m for m in self.cluster.members if m.name != failover.leader] + # exclude former leader (failover.leader can be None) and non-sync nodes in case of sync replication from list + members = [m for m in self.cluster.members if m.name != failover.leader + and (not self.sync_mode_is_active() or self.cluster.sync.matches(m.name))] + return self._is_healthiest_node(members, check_replication_lag=False) def is_healthiest_node(self) -> bool: @@ -1506,6 +1534,9 @@ def is_healthiest_node(self) -> bool: if ret is not None: # continue if we just deleted the stale failover key as a leader return ret + if self.state_handler.is_starting(): # postgresql still starting up is unhealthy + return False + if self.state_handler.is_primary(): if self.is_paused(): # in pause leader is the healthiest only when no initialize or sysid matches with initialize! @@ -1556,7 +1587,7 @@ def is_healthiest_node(self) -> bool: # Special handling if synchronous mode was requested and activated (the leader in /sync is not empty) if self.sync_mode_is_active(): # In quorum commit mode we allow nodes outside of "voters" to take part in - # the leader race. They just need to get enough votes to `reach quorum + 1`. + # the leader race. They just need to get enough votes to reach `quorum + 1`. 
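+            # In the priority-based (non-quorum) synchronous mode, on the other hand, only nodes that are
+            # already listed in the /sync key may continue with the leader race.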
            if not self.is_quorum_commit_mode() and not self.cluster.sync.matches(self.state_handler.name, True):
                return False
            # pick between synchronous candidates so we minimize unnecessary failovers/demotions
@@ -1725,7 +1756,7 @@ def should_run_scheduled_action(self, action_name: str, scheduled_at: Optional[d
                    # The value is very close to now
                    time.sleep(max(delta, 0))
-                logger.info('Manual scheduled {0} at %s'.format(action_name), scheduled_at.isoformat())
+                logger.info('Manual scheduled %s at %s', action_name, scheduled_at.isoformat())
                return True
        except TypeError:
            logger.warning('Incorrect value of scheduled_at: %s', scheduled_at)
@@ -1808,9 +1839,11 @@ def process_unhealthy_cluster(self) -> str:
                return self.follow('demoted self after trying and failing to obtain lock',
                                   'following new leader after trying and failing to obtain lock')
            else:
-                # when we are doing manual failover there is no guaranty that new leader is ahead of any other node
-                # node tagged as nofailover can be ahead of the new leader either, but it is always excluded from elections
-                if bool(self.cluster.failover) or self.patroni.nofailover:
+                # When we are doing manual failover there is no guarantee that the new leader is ahead of any other node.
+                # A node tagged as nofailover can also be ahead of the new leader, but it is always excluded from elections
+                # and therefore we trigger rewind checks on it, but only if not in pause, because there is no race in pause.
+                if self.cluster.failover and self.cluster.failover.candidate or \
+                        self.patroni.nofailover and not self.is_paused():
                    self._rewind.trigger_check_diverged_lsn()
                time.sleep(2)  # Give a time to somebody to take the leader lock
diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py
index 7a0b25ebf3..edaac81962 100644
--- a/patroni/postgresql/__init__.py
+++ b/patroni/postgresql/__init__.py
@@ -19,6 +19,7 @@ from ..
import global_config, psycopg from ..async_executor import CriticalTask from ..collections import CaseInsensitiveDict, CaseInsensitiveSet, EMPTY_DICT +from ..daemon import notify_systemd from ..dcs import Cluster, Leader, Member, slot_name_from_member_name from ..exceptions import PostgresConnectionException from ..tags import Tags @@ -96,11 +97,11 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None: self._bin_dir = config.get('bin_dir') or '' self._role_lock = Lock() self.set_role(PostgresqlRole.UNINITIALIZED) + self.bootstrap = Bootstrap(self) + self.bootstrapping = False self.config = ConfigHandler(self, config) self.config.check_directories() - self.bootstrap = Bootstrap(self) - self.bootstrapping = False self.__thread_ident = current_thread().ident self.slots_handler = SlotsHandler(self) @@ -251,8 +252,9 @@ def cluster_info_query(self) -> str: written_lsn = ("pg_catalog.pg_wal_lsn_diff(written_lsn, '0/0')::bigint" if self._major_version >= 130000 else "NULL") extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END, {0}, " - "pg_catalog.pg_wal_lsn_diff(latest_end_lsn, '0/0')::bigint, slot_name, " - "conninfo, status, {1} FROM pg_catalog.pg_stat_get_wal_receiver()").format(written_lsn, extra) + "pg_catalog.pg_{2}_{3}_diff(latest_end_lsn, '0/0')::bigint, slot_name, " + "conninfo, status, {1} FROM pg_catalog.pg_stat_get_wal_receiver()" + ).format(written_lsn, extra, self.wal_name, self.lsn_name) if self.role == PostgresqlRole.STANDBY_LEADER: extra = "timeline_id" + extra + ", pg_catalog.pg_control_checkpoint()" else: @@ -904,10 +906,14 @@ def _do_stop(self, mode: str, block_callbacks: bool, checkpoint: bool, on_safepoint() return success, True - # We can skip safepoint detection if we don't have a callback + # Wait for our connection to terminate to detect that PostgreSQL started shutting down. + self._wait_for_connection_close(postmaster) + # If the stopped PostgreSQL was started before Patroni (e.g. a takeover) it may have + # had NOTIFY_SOCKET in its environment and sent STOPPING=1 to systemd on shutdown. + # Re-assert READY=1 to counteract that when NotifyAccess=all is configured. 
+ notify_systemd("READY=1") + if on_safepoint: - # Wait for our connection to terminate so we can be sure that no new connections are being initiated - self._wait_for_connection_close(postmaster) postmaster.wait_for_user_backends_to_close(stop_timeout) on_safepoint() diff --git a/patroni/postgresql/bootstrap.py b/patroni/postgresql/bootstrap.py index c4f5b8098c..3692ed6054 100644 --- a/patroni/postgresql/bootstrap.py +++ b/patroni/postgresql/bootstrap.py @@ -93,7 +93,7 @@ def option_is_allowed(name: str) -> bool: if isinstance(options, dict): for key, val in cast(Dict[str, str], options).items(): - if key and val: + if key and val and option_is_allowed(key): user_options.append('--{0}={1}'.format(key, unquote(val))) elif isinstance(options, list): for opt in cast(List[Any], options): @@ -127,7 +127,7 @@ def error_handler(e: str) -> None: if self._postgresql.config.superuser: if 'username' in self._postgresql.config.superuser: options.append('--username={0}'.format(self._postgresql.config.superuser['username'])) - if 'password' in self._postgresql.config.superuser: + if isinstance(self._postgresql.config.superuser.get('password'), str): (fd, pwfile) = tempfile.mkstemp() os.write(fd, self._postgresql.config.superuser['password'].encode('utf-8')) os.close(fd) @@ -339,9 +339,25 @@ def basebackup(self, conn_url: str, env: Dict[str, str], options: Dict[str, Any] # supports additional user-supplied options, those are not validated maxfailures = 2 ret = 1 - not_allowed_options = ('pgdata', 'format', 'wal-method', 'xlog-method', 'gzip', - 'version', 'compress', 'dbname', 'host', 'port', 'username', 'password') - user_options = self.process_user_options('basebackup', options, not_allowed_options, logger.error) + not_allowed_options = ['pgdata', 'format', 'wal-method', 'xlog-method', 'gzip', + 'version', 'dbname', 'host', 'port', 'username', 'password'] + pg_version = self._postgresql.config.pg_version + if pg_version < 150000: + not_allowed_options.append('compress') + user_options = self.process_user_options('basebackup', options, tuple(not_allowed_options), logger.error) + # Validate compress option on PG15+: only server-side compression is allowed + if pg_version >= 150000: + validated_options: List[str] = [] + for opt in user_options: + if opt.startswith('--compress='): + if opt.startswith('--compress=server'): + validated_options.append(opt) + else: + logger.error('compress option for basebackup must use server-side compression ' + '(e.g., server-gzip, server-zstd). Client-side compression is not allowed.') + else: + validated_options.append(opt) + user_options = validated_options cmd = [ self._postgresql.pgcommand("pg_basebackup"), "--pgdata=" + self._postgresql.data_dir, diff --git a/patroni/postgresql/callback_executor.py b/patroni/postgresql/callback_executor.py index 06b9f3531b..1b536090aa 100644 --- a/patroni/postgresql/callback_executor.py +++ b/patroni/postgresql/callback_executor.py @@ -5,6 +5,7 @@ from threading import Condition, Thread from typing import Any, Dict, List +from .. 
import thread_pool from .cancellable import CancellableExecutor, CancellableSubprocess logger = logging.getLogger(__name__) @@ -33,7 +34,7 @@ def call_nowait(self, cmd: List[str]) -> None: with self._lock: started = self._start_process(cmd, close_fds=True) if started and self._process is not None: - Thread(target=self._process.wait).start() + thread_pool.get_executor().submit(self._process.wait) class CallbackExecutor(CancellableExecutor, Thread): diff --git a/patroni/postgresql/cancellable.py b/patroni/postgresql/cancellable.py index ca5d3d1507..0e9e3e9de1 100644 --- a/patroni/postgresql/cancellable.py +++ b/patroni/postgresql/cancellable.py @@ -41,7 +41,7 @@ def _kill_process(self) -> None: try: self._process.suspend() # Suspend the process before getting list of children except psutil.Error as e: - logger.info('Failed to suspend the process: %s', e.msg) + logger.info('Failed to suspend the process: %s', e) try: self._process_children = self._process.children(recursive=True) diff --git a/patroni/postgresql/config.py b/patroni/postgresql/config.py index 81a64904f7..084ece4de9 100644 --- a/patroni/postgresql/config.py +++ b/patroni/postgresql/config.py @@ -856,11 +856,11 @@ def _check_primary_conninfo(self, primary_conninfo: Dict[str, Any], dbname = primary_conninfo.get('dbname') if dbname: wal_receiver_primary_conninfo['dbname'] = dbname + # pg_stat_get_wal_receiver() returns masked password, therefore + # we need to copy password value from primary_conninfo GUC. + if 'password' in primary_conninfo: + wal_receiver_primary_conninfo['password'] = primary_conninfo['password'] primary_conninfo = wal_receiver_primary_conninfo - # There could be no password in the primary_conninfo or it is masked. - # Just copy the "desired" value in order to make comparison succeed. 
- if 'password' in wanted_primary_conninfo: - primary_conninfo['password'] = wanted_primary_conninfo['password'] if 'passfile' in primary_conninfo and 'password' not in primary_conninfo \ and 'password' in wanted_primary_conninfo: @@ -1007,6 +1007,10 @@ def restart_required(name: str) -> bool: self._current_recovery_params = CaseInsensitiveDict({n: [v, restart_required(n), self._postgresql_conf] for n, v in recovery_params.items()}) + self._current_recovery_params.setdefault('recovery_min_apply_delay', ['0', False, self._postgresql_conf]) + self._current_recovery_params.update({param: ['', restart_required(param), self._postgresql_conf] + for param in self._recovery_parameters_to_compare + if param not in self._current_recovery_params}) else: with self.config_writer(self._recovery_conf) as f: self._write_recovery_params(f, recovery_params) @@ -1199,7 +1203,8 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None: conf_changed = hba_changed = ident_changed = local_connection_address_changed = False param_diff = CaseInsensitiveDict() - if self._postgresql.state == PostgresqlState.RUNNING: + if not self._postgresql.bootstrap.running_custom_bootstrap and \ + self._postgresql.state == PostgresqlState.RUNNING: changes = CaseInsensitiveDict({p: v for p, v in server_parameters.items() if p not in params_skip_changes}) changes.update({p: None for p in self._server_parameters.keys() @@ -1272,6 +1277,9 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None: proxy_addr = config.get('proxy_address') self._postgresql.proxy_url = uri('postgres', proxy_addr, self._postgresql.database) if proxy_addr else None + if self._postgresql.bootstrap.running_custom_bootstrap: + return logger.info('Skipping PostgreSQL configuration update while in custom bootstrap.') + if conf_changed or sighup: self.write_postgresql_conf() diff --git a/patroni/postgresql/mpp/citus.py b/patroni/postgresql/mpp/citus.py index 106052e44a..e872977cdc 100644 --- a/patroni/postgresql/mpp/citus.py +++ b/patroni/postgresql/mpp/citus.py @@ -403,7 +403,10 @@ def __init__(self, postgresql: 'Postgresql', config: Dict[str, Union[str, int]]) self._in_flight: Optional[PgDistTask] = None # Reference to the `PgDistTask` being changed in a transaction self._schedule_load_pg_dist_group = True # Flag that "pg_dist_group" should be queried from the database self._condition = Condition() # protects _pg_dist_group, _tasks, _in_flight, and _schedule_load_pg_dist_group + self._ready_to_run = Event() self.schedule_cache_rebuild() + if self.is_coordinator(): + self.start() def schedule_cache_rebuild(self) -> None: """Cache rebuild handler. 
@@ -468,9 +471,8 @@ def sync_meta_data(self, cluster: Cluster) -> None:
         if not self.is_coordinator():
             return
-        with self._condition:
-            if not self.is_alive():
-                self.start()
+        # notify the run() method that it should start doing its job
+        self._ready_to_run.set()
         self.add_task('after_promote', CITUS_COORDINATOR_GROUP_ID, cluster,
                       self._postgresql.name, self._postgresql.connection_string)
@@ -604,6 +606,9 @@ def process_tasks(self) -> None:
                 task.wakeup()
     def run(self) -> None:
+        # we want to postpone "start" until the first attempt to sync_meta_data
+        self._ready_to_run.wait()
+
         while True:
             try:
                 with self._condition:
@@ -683,7 +688,7 @@ def add_task(self, event: str, groupid: int, cluster: Cluster, leader_name: str,
         return task if self._add_task(task) else None
     def handle_event(self, cluster: Cluster, event: Dict[str, Any]) -> None:
-        if not self.is_alive():
+        if not self._ready_to_run.is_set():
             return
         worker = cluster.workers.get(event['group'])
diff --git a/patroni/postgresql/postmaster.py b/patroni/postgresql/postmaster.py
index 5e47251391..8a1498eeee 100644
--- a/patroni/postgresql/postmaster.py
+++ b/patroni/postgresql/postmaster.py
@@ -152,7 +152,7 @@ def signal_stop(self, mode: str, pg_ctl: str = 'pg_ctl') -> Optional[bool]:
         :returns None if signaled, True if process is already gone, False if error
         """
         if self.is_single_user:
-            logger.warning("Cannot stop server; single-user server is running (PID: {0})".format(self.pid))
+            logger.warning("Cannot stop server; single-user server is running (PID: %s)", self.pid)
             return False
         if os.name != 'posix':
             return self.pg_ctl_kill(mode, pg_ctl)
@@ -161,7 +161,7 @@ def signal_stop(self, mode: str, pg_ctl: str = 'pg_ctl') -> Optional[bool]:
         except psutil.NoSuchProcess:
             return True
         except psutil.AccessDenied as e:
-            logger.warning("Could not send stop signal to PostgreSQL (error: {0})".format(e))
+            logger.warning("Could not send stop signal to PostgreSQL: %r", e)
             return False
         return None
@@ -225,8 +225,12 @@ def start(pgcommand: str, data_dir: str, conf: str, options: List[str]) -> Optio
         # In order to make everything portable we can't use fork&exec approach here, so we will call
         # ourselves and pass list of arguments which must be used to start postgres.
         # On Windows, in order to run a side-by-side assembly the specified env must include a valid SYSTEMROOT.
-        env = {p: os.environ[p] for p in os.environ if not p.startswith(
-            PATRONI_ENV_PREFIX) and not p.startswith(KUBERNETES_ENV_PREFIX)}
+        # We also remove the NOTIFY_SOCKET environment variable so that PostgreSQL doesn't send READY=1 or STOPPING=1
+        # to systemd, because it is exclusively Patroni's responsibility to do so.
+        env = {p: os.environ[p] for p in os.environ if p != 'NOTIFY_SOCKET'
+               and not p.startswith(PATRONI_ENV_PREFIX) and not p.startswith(KUBERNETES_ENV_PREFIX)}
+        if 'PG_MALLOC_ARENA_MAX' in env:
+            env['MALLOC_ARENA_MAX'] = env.pop('PG_MALLOC_ARENA_MAX')
         try:
             proc = PostmasterProcess._from_pidfile(data_dir)
             if proc and not proc._is_postmaster_process():
diff --git a/patroni/postgresql/rewind.py b/patroni/postgresql/rewind.py
index ac21e4d7ea..72ae788de2 100644
--- a/patroni/postgresql/rewind.py
+++ b/patroni/postgresql/rewind.py
@@ -6,9 +6,10 @@ import subprocess
 from enum import IntEnum
-from threading import Lock, Thread
+from threading import Lock
 from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+from ..
import thread_pool from ..async_executor import CriticalTask from ..collections import EMPTY_DICT from ..dcs import Leader, RemoteMember @@ -322,7 +323,7 @@ def ensure_checkpoint_after_promote(self, wakeup: Callable[..., Any]) -> None: self._state = REWIND_STATUS.CHECKPOINT else: self._checkpoint_task = CriticalTask() - Thread(target=self.__checkpoint, args=(self._checkpoint_task, wakeup)).start() + thread_pool.get_executor().submit(self.__checkpoint, self._checkpoint_task, wakeup) def checkpoint_after_promote(self) -> bool: return self._state == REWIND_STATUS.CHECKPOINT diff --git a/patroni/postgresql/slots.py b/patroni/postgresql/slots.py index 683da4b6a0..61df2b6b55 100644 --- a/patroni/postgresql/slots.py +++ b/patroni/postgresql/slots.py @@ -188,7 +188,7 @@ def __init__(self, postgresql: 'Postgresql') -> None: self._force_readiness_check = False self._schedule_load_slots = False self._postgresql = postgresql - self._advance = None + self._advance = SlotsAdvanceThread(self) self._replication_slots: Dict[str, Dict[str, Any]] = {} # already existing replication slots self._logical_slots_processing_queue: Dict[str, Optional[int]] = {} self.pg_replslot_dir = os.path.join(self._postgresql.data_dir, 'pg_replslot') @@ -491,17 +491,6 @@ class instance. slots.pop(name) self._schedule_load_slots = True - def schedule_advance_slots(self, slots: Dict[str, Dict[str, int]]) -> Tuple[bool, List[str]]: - """Wrapper to ensure slots advance daemon thread is started if not already. - - :param slots: dictionary containing slot information. - - :return: tuple with the result of the scheduling of slot advancement: ``failed`` and list of slots to copy. - """ - if not self._advance: - self._advance = SlotsAdvanceThread(self) - return self._advance.schedule(slots) - def _ensure_logical_slots_replica(self, slots: Dict[str, Any]) -> List[str]: """Update logical *slots* on replicas. @@ -545,7 +534,7 @@ class instance. Slots that exist are also advanced if their ``confirmed_flush_ls for name in create_slots: slots.pop(name) - error, copy_slots = self.schedule_advance_slots(advance_slots) + error, copy_slots = self._advance.schedule(advance_slots) if error: self._schedule_load_slots = True return create_slots + copy_slots diff --git a/patroni/postgresql/validator.py b/patroni/postgresql/validator.py index c3887508bf..ff17e64f95 100644 --- a/patroni/postgresql/validator.py +++ b/patroni/postgresql/validator.py @@ -392,7 +392,7 @@ class in this module. logger.warning(str(exc)) continue - logger.debug(f'Parsing validators from file `{file}`.') + logger.debug('Parsing validators from file `%s`.', file) mapping = { 'parameters': parameters, diff --git a/patroni/psycopg.py b/patroni/psycopg.py index 7005c97ab9..8186c947a1 100644 --- a/patroni/psycopg.py +++ b/patroni/psycopg.py @@ -24,7 +24,7 @@ from psycopg2.extensions import adapt try: - from psycopg2.extensions import parse_dsn, quote_ident as _quote_ident + from psycopg2.extensions import parse_dsn, quote_ident as __quote_ident def _parse_conninfo(conninfo: str, **kwargs: Any) -> Any: """Wraps :func:`parse_dsn` function. @@ -32,13 +32,27 @@ def _parse_conninfo(conninfo: str, **kwargs: Any) -> Any: Exists only to please pyright. 
""" return parse_dsn(conninfo) - except ImportError: + except ImportError: # pragma: no cover _legacy = True def _parse_conninfo(conninfo: str, **kwargs: Any) -> Any: """Return ``None`` and rely on fallback.""" return None + def __quote_ident(value: Any, scope: Any) -> Any: + """Exists just to please pyright""" + return None + + def _quote_ident(value: Any, scope: Any) -> str: + """Quote *value* as a SQL identifier. + + :param value: value to be quoted. + :param scope: connection or cursor to evaluate the returning string into. + + :returns: *value* quoted as a SQL identifier. + """ + return __quote_ident(value, scope) + def quote_literal(value: Any, conn: Optional[Any] = None) -> str: """Quote *value* as a SQL literal. diff --git a/patroni/scripts/wale_restore.py b/patroni/scripts/wale_restore.py index 3f75964da2..c5d7aedf63 100755 --- a/patroni/scripts/wale_restore.py +++ b/patroni/scripts/wale_restore.py @@ -322,7 +322,7 @@ def create_replica_with_s3(self) -> int: logger.debug('calling: %r', cmd) exit_code = subprocess.call(cmd) except Exception as e: - logger.error('Error when fetching backup with WAL-E: {0}'.format(e)) + logger.error('Error when fetching backup with WAL-E: %r', e) return ExitCode.RETRY_LATER if (exit_code == 0 and not diff --git a/patroni/thread_pool.py b/patroni/thread_pool.py new file mode 100644 index 0000000000..6d44ab809b --- /dev/null +++ b/patroni/thread_pool.py @@ -0,0 +1,28 @@ +from concurrent.futures import ThreadPoolExecutor +from threading import Barrier +from typing import Any + + +class PatroniThreadPoolExecutor(ThreadPoolExecutor): + + def __init__(self, max_workers: int, **kwargs: Any) -> None: + super(PatroniThreadPoolExecutor, self).__init__(max_workers=max_workers, **kwargs) + + # warmup + barrier = Barrier(max_workers + 1) + for _ in range(max_workers): + self.submit(barrier.wait) + barrier.wait() + + +__executor: PatroniThreadPoolExecutor + + +def configure_global_pool(max_workers: int) -> None: + global __executor + __executor = PatroniThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='Global Pool') + + +def get_executor() -> ThreadPoolExecutor: + global __executor # noqa: F824 + return __executor diff --git a/patroni/validator.py b/patroni/validator.py index fb56daa24e..818d789370 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -883,29 +883,33 @@ class IntValidator(object): :ivar min: minimum allowed value for the setting, if any. :ivar max: maximum allowed value for the setting, if any. :ivar base_unit: the base unit to convert the value to before checking if it's within *min* and *max* range. + :ivar aligned: require value to be aligned. :ivar expected_type: the expected Python type. :ivar raise_assert: if an ``assert`` test should be performed regarding expected type and valid range. """ - def __init__(self, min: OptionalType[int] = None, max: OptionalType[int] = None, - base_unit: OptionalType[str] = None, expected_type: Any = None, raise_assert: bool = False) -> None: + def __init__(self, *, min: OptionalType[int] = None, max: OptionalType[int] = None, + base_unit: OptionalType[str] = None, aligned: OptionalType[int] = None, + expected_type: Any = None, raise_assert: bool = False) -> None: """Create an :class:`IntValidator` object with the given rules. :param min: minimum allowed value for the setting, if any. :param max: maximum allowed value for the setting, if any. :param base_unit: the base unit to convert the value to before checking if it's within *min* and *max* range. 
+        :param aligned: require the value to be a multiple of *aligned*.
         :param expected_type: the expected Python type.
         :param raise_assert: if an ``assert`` test should be performed regarding expected type and valid range.
         """
         self.min = min
         self.max = max
         self.base_unit = base_unit
+        self.aligned = aligned
         if expected_type:
             self.expected_type = expected_type
         self.raise_assert = raise_assert
     def __call__(self, value: Any) -> bool:
-        """Check if *value* is a valid integer and within the expected range.
+        """Check if *value* is a valid integer within the expected range and properly aligned if required.
         .. note::
             If ``raise_assert`` is ``True`` and *value* is not valid, then an :class:`AssertionError` will be triggered.
@@ -917,7 +921,8 @@ def __call__(self, value: Any) -> bool:
         value = parse_int(value, self.base_unit)
         ret = isinstance(value, int)\
             and (self.min is None or value >= self.min)\
-            and (self.max is None or value <= self.max)
+            and (self.max is None or value <= self.max)\
+            and (self.aligned is None or value % self.aligned == 0)
         if self.raise_assert:
             assert_(ret)
@@ -995,6 +1000,9 @@ def validate_watchdog_mode(value: Any) -> None:
 schema = Schema({
     "name": str,
     "scope": str,
+    Optional("thread_pool_size"): IntValidator(min=5, expected_type=int, raise_assert=True),
+    Optional("thread_stack_size"): IntValidator(min=65536, base_unit='B', aligned=65536,
+                                                expected_type=int, raise_assert=True),
     Optional("log"): {
         Optional("type"): EnumValidator(('plain', 'json'), case_sensitive=True, raise_assert=True),
         Optional("level"): EnumValidator(('DEBUG', 'INFO', 'WARN', 'WARNING', 'ERROR', 'FATAL', 'CRITICAL'),
@@ -1019,6 +1027,7 @@ def validate_watchdog_mode(value: Any) -> None:
         Optional("keyfile_password"): str
     },
     "restapi": {
+        Optional("thread_pool_size"): IntValidator(min=5, expected_type=int, raise_assert=True),
         "listen": validate_host_port_listen,
         "connect_address": validate_connect_address,
         Optional("authentication"): {
@@ -1050,12 +1059,12 @@ def validate_watchdog_mode(value: Any) -> None:
     Optional('member_slots_ttl'): IntValidator(min=0, base_unit='s', raise_assert=True),
     Optional("postgresql"): {
         Optional("parameters"): {
-            Optional("max_connections"): IntValidator(1, 262143, raise_assert=True),
-            Optional("max_locks_per_transaction"): IntValidator(10, 2147483647, raise_assert=True),
-            Optional("max_prepared_transactions"): IntValidator(0, 262143, raise_assert=True),
-            Optional("max_replication_slots"): IntValidator(0, 262143, raise_assert=True),
-            Optional("max_wal_senders"): IntValidator(0, 262143, raise_assert=True),
-            Optional("max_worker_processes"): IntValidator(0, 262143, raise_assert=True),
+            Optional("max_connections"): IntValidator(min=1, max=262143, raise_assert=True),
+            Optional("max_locks_per_transaction"): IntValidator(min=10, max=2147483647, raise_assert=True),
+            Optional("max_prepared_transactions"): IntValidator(min=0, max=262143, raise_assert=True),
+            Optional("max_replication_slots"): IntValidator(min=0, max=262143, raise_assert=True),
+            Optional("max_wal_senders"): IntValidator(min=0, max=262143, raise_assert=True),
+            Optional("max_worker_processes"): IntValidator(min=0, max=262143, raise_assert=True),
         },
         Optional("use_pg_rewind"): bool,
         Optional("pg_hba"): [str],
diff --git a/patroni/version.py b/patroni/version.py
index 5ee1eb6aad..0a50a059c0 100644
--- a/patroni/version.py
+++ b/patroni/version.py
@@ -2,4 +2,4 @@
 :var __version__: the current Patroni version.
""" -__version__ = '4.1.0.2' +__version__ = '4.1.2.0' diff --git a/patroni/watchdog/base.py b/patroni/watchdog/base.py index 2085f5b467..be731b98d3 100644 --- a/patroni/watchdog/base.py +++ b/patroni/watchdog/base.py @@ -28,7 +28,7 @@ def parse_mode(mode: Union[bool, str]) -> str: return MODE_AUTOMATIC else: if mode not in ['off', 'disable', 'disabled']: - logger.warning("Watchdog mode {0} not recognized, disabling watchdog".format(mode)) + logger.warning("Watchdog mode %s not recognized, disabling watchdog", mode) return MODE_OFF @@ -132,8 +132,8 @@ def _activate(self) -> bool: self.active_config = self.config if self.config.timing_slack < 0: - logger.warning('Watchdog not supported because leader TTL {0} is less than 2x loop_wait {1}' - .format(self.config.ttl, self.config.loop_wait)) + logger.warning('Watchdog not supported because leader TTL %s is less than 2x loop_wait %s', + self.config.ttl, self.config.loop_wait) self.impl = NullWatchdog() try: @@ -154,18 +154,17 @@ def _activate(self) -> bool: if self.impl.is_null: logger.error("Configuration requires watchdog, but watchdog could not be configured.") else: - logger.error("Configuration requires watchdog, but a safe watchdog timeout {0} could" - " not be configured. Watchdog timeout is {1}.".format( - self.config.timeout, actual_timeout)) + logger.error("Configuration requires watchdog, but a safe watchdog timeout %s could" + " not be configured. Watchdog timeout is %s.", self.config.timeout, actual_timeout) return False else: if not self.impl.is_null: - logger.warning("Watchdog timeout {0} seconds does not ensure safe termination within {1} seconds" - .format(actual_timeout, self.config.timeout)) + logger.warning("Watchdog timeout %s seconds does not ensure safe termination within %s seconds", + actual_timeout, self.config.timeout) if self.is_running: - logger.info("{0} activated with {1} second timeout, timing slack {2} seconds" - .format(self.impl.describe(), actual_timeout, self.config.timing_slack)) + logger.info("%s activated with %s second timeout, timing slack %s seconds", + self.impl.describe(), actual_timeout, self.config.timing_slack) else: if self.config.mode == MODE_REQUIRED: logger.error("Configuration requires watchdog, but watchdog could not be activated") @@ -180,8 +179,8 @@ def _set_timeout(self) -> Optional[int]: # Safety checks for watchdog implementations that don't support configurable timeouts actual_timeout = self.impl.get_timeout() if self.impl.is_running and actual_timeout < self.config.loop_wait: - logger.error('loop_wait of {0} seconds is too long for watchdog {1} second timeout' - .format(self.config.loop_wait, actual_timeout)) + logger.error('loop_wait of %s seconds is too long for watchdog %s second timeout', + self.config.loop_wait, actual_timeout) if self.impl.can_be_disabled: logger.info('Disabling watchdog due to unsafe timeout.') self.impl.close() @@ -200,7 +199,7 @@ def _disable(self) -> None: # Give sysadmin some extra time to clean stuff up. self.impl.keepalive() logger.warning("Watchdog implementation can't be disabled. 
System will reboot after " - "{0} seconds when watchdog times out.".format(self.impl.get_timeout())) + "%s seconds when watchdog times out.", self.impl.get_timeout()) self.impl.close() except WatchdogError as e: logger.error("Error while disabling watchdog: %s", e) @@ -223,8 +222,8 @@ def keepalive(self) -> None: if self.config.timeout != self.active_config.timeout: self.impl.set_timeout(self.config.timeout) if self.is_running: - logger.info("{0} updated with {1} second timeout, timing slack {2} seconds" - .format(self.impl.describe(), self.impl.get_timeout(), self.config.timing_slack)) + logger.info("%s updated with %s second timeout, timing slack %s seconds", + self.impl.describe(), self.impl.get_timeout(), self.config.timing_slack) self.active_config = self.config except WatchdogError as e: logger.error("Error while sending keepalive: %s", e) diff --git a/pyrightconfig.json b/pyrightconfig.json index ba4157955b..9c81c16fb4 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -19,7 +19,7 @@ "reportMissingImports": true, "reportMissingTypeStubs": false, - "pythonVersion": "3.13", + "pythonVersion": "3.14", "pythonPlatform": "All", "typeCheckingMode": "strict" diff --git a/requirements.dev.txt b/requirements.dev.txt index 520e887d18..839bce77ec 100644 --- a/requirements.dev.txt +++ b/requirements.dev.txt @@ -6,3 +6,4 @@ flake8>=3.0.0 pytest-cov pytest setuptools +isort diff --git a/requirements.docs.txt b/requirements.docs.txt index b2b15ba4cb..d5ce5218b2 100644 --- a/requirements.docs.txt +++ b/requirements.docs.txt @@ -1,7 +1,7 @@ -sphinx>=4,<8.2.0 +sphinx>=4 sphinx_rtd_theme>1 sphinxcontrib-apidoc -sphinx-github-style<1.0.3 +sphinx-github-style psycopg[binary] psycopg2-binary==2.9.9; sys_platform == "darwin" psycopg2-binary diff --git a/setup.py b/setup.py index 2e7fce4d22..0ff9d58df9 100644 --- a/setup.py +++ b/setup.py @@ -8,6 +8,7 @@ import inspect import logging import os +import re import sys from setuptools import Command, find_packages, setup @@ -50,6 +51,7 @@ 'Programming Language :: Python :: 3.11', 'Programming Language :: Python :: 3.12', 'Programming Language :: Python :: 3.13', + 'Programming Language :: Python :: 3.14', 'Programming Language :: Python :: Implementation :: CPython', ] @@ -207,7 +209,7 @@ def main(): license=LICENSE, license_files=('LICENSE',), keywords=KEYWORDS, - long_description=read('README.rst'), + long_description=re.sub(r'\n\n\.\. 
image:: docs/[^\n]*(?:\n [^\n]+)*', '', read('README.rst')), classifiers=CLASSIFIERS, packages=find_packages(exclude=['tests', 'tests.*']), package_data={MAIN_PACKAGE: [ diff --git a/tests/__init__.py b/tests/__init__.py index 903e24def1..5ca5caed46 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -241,6 +241,8 @@ def __exit__(self, *args): class MockConnect(object): + pgconn = None + connection = None server_version = 99999 autocommit = False closed = 0 @@ -280,6 +282,8 @@ class PostgresInit(unittest.TestCase): @patch('patroni.psycopg._connect', psycopg_connect) @patch('patroni.postgresql.CallbackExecutor', Mock()) + @patch('patroni.postgresql.mpp.citus.CitusHandler.start', Mock()) + @patch('patroni.postgresql.slots.SlotsAdvanceThread.start', Mock()) @patch.object(ConfigHandler, 'write_postgresql_conf', Mock()) @patch.object(ConfigHandler, 'replace_pg_hba', Mock()) @patch.object(ConfigHandler, 'replace_pg_ident', Mock()) diff --git a/tests/test_api.py b/tests/test_api.py index 1a10812767..64adc26ee4 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -5,7 +5,6 @@ from http.server import HTTPServer from io import BytesIO as IO -from socketserver import ThreadingMixIn from unittest.mock import Mock, patch, PropertyMock from patroni import global_config @@ -200,6 +199,7 @@ def __init__(self, Handler, request, config=None): @patch('ssl.SSLContext.load_cert_chain', Mock()) @patch('ssl.SSLContext.wrap_socket', Mock(return_value=0)) +@patch('patroni.api.PatroniThreadPoolExecutor', Mock()) @patch.object(HTTPServer, '__init__', Mock()) class TestRestApiHandler(unittest.TestCase): @@ -351,7 +351,9 @@ def test_do_GET_liveness(self, mock_dcs): mock_dcs.ttl.return_value = PropertyMock(30) self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /liveness HTTP/1.0')) - def test_do_GET_readiness(self): + @patch.object(MockPatroni, 'dcs') + def test_do_GET_readiness(self, mock_dcs): + mock_dcs.cluster.status.last_lsn = 5 MockRestApiServer(RestApiHandler, 'GET /readiness HTTP/1.0') with patch.object(MockHa, 'is_leader', Mock(return_value=True)): MockRestApiServer(RestApiHandler, 'GET /readiness HTTP/1.0') @@ -379,7 +381,7 @@ def patch_query(latest_lsn, received_location, replayed_location): response_mock.assert_called_with(503) # DCS not available - MockPatroni.dcs.cluster = None + mock_dcs.cluster = None with patch_query(None, None, None), \ patch.object(RestApiHandler, '_write_status_code_only') as response_mock: # Failsafe active @@ -737,12 +739,13 @@ class TestRestApiServer(unittest.TestCase): @patch('ssl.SSLContext.load_cert_chain', Mock()) @patch('ssl.SSLContext.set_ciphers', Mock()) @patch('ssl.SSLContext.wrap_socket', Mock(return_value=0)) + @patch('patroni.api.PatroniThreadPoolExecutor', Mock()) @patch.object(HTTPServer, '__init__', Mock()) def setUp(self): self.srv = MockRestApiServer(Mock(), '', {'listen': '*:8008', 'certfile': 'a', 'verify_client': 'required', 'ciphers': '!SSLv1:!SSLv2:!SSLv3:!TLSv1:!TLSv1.1', 'allowlist': ['127.0.0.1', '::1/128', '::1/zxc'], - 'allowlist_include_members': True}) + 'allowlist_include_members': True, 'thread_pool_size': 'a'}) @patch.object(HTTPServer, '__init__', Mock()) def test_reload_config(self): @@ -790,9 +793,9 @@ def __create_socket(self): pass return sock - @patch.object(ThreadingMixIn, 'process_request_thread', Mock()) - def test_process_request_thread(self): - self.srv.process_request_thread(self.__create_socket(), ('2', 54321)) + def test_process_request(self): + with patch.object(self.srv._executor, 'submit', lambda f, r, c: f(r, 
c)): + self.srv.process_request(self.__create_socket(), ('2', 54321)) @patch.object(MockRestApiServer, 'process_request', Mock(side_effect=RuntimeError)) @patch.object(MockRestApiServer, 'get_request') diff --git a/tests/test_async_executor.py b/tests/test_async_executor.py index 853d685aca..47369523cf 100644 --- a/tests/test_async_executor.py +++ b/tests/test_async_executor.py @@ -1,6 +1,5 @@ import unittest -from threading import Thread from unittest.mock import Mock, patch from patroni.async_executor import AsyncExecutor, CriticalTask @@ -11,7 +10,7 @@ class TestAsyncExecutor(unittest.TestCase): def setUp(self): self.a = AsyncExecutor(Mock(), Mock()) - @patch.object(Thread, 'start', Mock()) + @patch('patroni.thread_pool.get_executor', Mock()) def test_run_async(self): self.a.run_async(Mock(return_value=True)) diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index 7d7f1bf2fc..7b56e6c452 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -96,6 +96,7 @@ def test_create_replica_old_format(self, mock_cancellable_subprocess_call): @patch.object(CancellableSubprocess, 'call', Mock(return_value=0)) @patch.object(Postgresql, 'data_directory_empty', Mock(return_value=True)) + @patch.object(ConfigHandler, 'pg_version', PropertyMock(return_value=150000)) def test_basebackup(self): with patch('patroni.postgresql.bootstrap.logger.debug') as mock_debug: self.p.cancellable.cancel() @@ -109,6 +110,32 @@ def test_basebackup(self): ['pg_basebackup', f'--pgdata={self.p.data_dir}', '-X', 'stream', '--dbname=', '--foo=bar'], ) + # Test compress option: server-side compression should be allowed on PG15+ (issue #3532) + with patch('patroni.postgresql.bootstrap.logger.debug') as mock_debug: + self.p.cancellable.reset_is_cancelled() + self.b.basebackup("", None, {'compress': 'server-zstd'}) + mock_debug.assert_called_with( + 'calling: %r', + ['pg_basebackup', f'--pgdata={self.p.data_dir}', '-X', 'stream', '--dbname=', '--compress=server-zstd'], + ) + + # Test compress option: client-side compression should be rejected on PG15+ + with patch('patroni.postgresql.bootstrap.logger.error') as mock_error: + self.p.cancellable.reset_is_cancelled() + self.b.basebackup("", None, {'compress': 'gzip'}) + mock_error.assert_called_with( + 'compress option for basebackup must use server-side compression ' + '(e.g., server-gzip, server-zstd). Client-side compression is not allowed.' 
+ ) + + # Test compress option: should be rejected on PG < 15 + with patch('patroni.postgresql.bootstrap.logger.error') as mock_error, \ + patch.object(ConfigHandler, 'pg_version', PropertyMock(return_value=140000)): + self.p.cancellable.reset_is_cancelled() + self.b.basebackup("", None, {'compress': 'server-zstd'}) + # compress is in not_allowed_options for PG < 15, error logged via process_user_options + mock_error.assert_any_call('compress option for basebackup is not allowed') + def test__initdb(self): self.assertRaises(Exception, self.b.bootstrap, {'initdb': [{'pgdata': 'bar'}]}) self.assertRaises(Exception, self.b.bootstrap, {'initdb': [{'foo': 'bar', 1: 2}]}) @@ -166,6 +193,16 @@ def error_handler(msg): ), ['--checkpoint=fast', '--gzip', '--label=standby'], ) + # not allowed options in dict format are also filtered out (issue #3533) + self.assertEqual( + self.b.process_user_options( + 'pg_basebackup', + {'checkpoint': 'fast', 'dbname': 'dbname=postgres', 'label': 'standby'}, + ('dbname',), + print + ), + ['--checkpoint=fast', '--label=standby'], + ) @patch.object(CancellableSubprocess, 'call', Mock()) @patch.object(Postgresql, 'is_running', Mock(return_value=True)) @@ -240,7 +277,7 @@ def test_custom_bootstrap(self, mock_cancellable_subprocess_call): @patch('os.unlink', Mock()) @patch('shutil.copy', Mock()) @patch('os.path.isfile', Mock(return_value=True)) - @patch('patroni.postgresql.bootstrap.quote_ident', Mock()) + @patch('patroni.psycopg.__quote_ident', Mock(), create=True) @patch.object(Bootstrap, 'call_post_bootstrap', Mock(return_value=True)) @patch.object(Bootstrap, '_custom_bootstrap', Mock(return_value=True)) @patch.object(Postgresql, 'start', Mock(return_value=True)) @@ -271,11 +308,15 @@ def test_post_bootstrap(self): self.b.bootstrap(config) self.p.set_state(PostgresqlState.STOPPED) - self.p.reload_config({'authentication': {'superuser': {'username': 'p', 'password': 'p'}, - 'replication': {'username': 'r', 'password': 'r'}, - 'rewind': {'username': 'rw', 'password': 'rw'}}, - 'listen': '*', 'retry_timeout': 10, - 'parameters': {'wal_level': '', 'hba_file': 'foo', 'max_prepared_transactions': 10}}) + with patch('patroni.postgresql.config.logger.info') as mock_logger: + self.p.reload_config({'authentication': {'superuser': {'username': 'p', 'password': 'p'}, + 'replication': {'username': 'r', 'password': 'r'}, + 'rewind': {'username': 'rw', 'password': 'rw'}}, + 'listen': '*', 'retry_timeout': 10, + 'parameters': {'wal_level': '', 'hba_file': 'foo', 'max_prepared_transactions': 10}}) + mock_logger.assert_called_once() + self.assertEqual(mock_logger.call_args[0][0], + 'Skipping PostgreSQL configuration update while in custom bootstrap.') with patch.object(Postgresql, 'major_version', PropertyMock(return_value=110000)), \ patch.object(Postgresql, 'restart', Mock()) as mock_restart: self.b.post_bootstrap({}, task) diff --git a/tests/test_callback_executor.py b/tests/test_callback_executor.py index f7ac3b1cc3..a451b4a7a6 100644 --- a/tests/test_callback_executor.py +++ b/tests/test_callback_executor.py @@ -44,7 +44,9 @@ def test_callback_executor(self, mock_popen): mock_popen.reset_mock() mock_popen.side_effect = [Mock()] - self.assertIsNone(ce.call(['test.sh', CallbackAction.ON_RELOAD, PostgresqlRole.REPLICA, 'foo'])) - self.assertEqual(mock_popen.call_args_list[0], - mock.call(['test.sh', 'on_reload', 'replica', 'foo'], close_fds=True)) + with patch('patroni.thread_pool.get_executor') as mock_executor: + mock_executor.return_value.submit = lambda f: f() + 
self.assertIsNone(ce.call(['test.sh', CallbackAction.ON_RELOAD, PostgresqlRole.REPLICA, 'foo']))
+            self.assertEqual(mock_popen.call_args_list[0],
+                             mock.call(['test.sh', 'on_reload', 'replica', 'foo'], close_fds=True))
         ce.join()
diff --git a/tests/test_citus.py b/tests/test_citus.py
index 0389f07a35..e7cb3c6fa8 100644
--- a/tests/test_citus.py
+++ b/tests/test_citus.py
@@ -26,8 +26,8 @@ def setUp(self):
     @patch('patroni.postgresql.mpp.citus.logger.exception', Mock(side_effect=SleepException))
     @patch('patroni.postgresql.mpp.citus.logger.warning')
     @patch('patroni.postgresql.mpp.citus.PgDistTask.wait', Mock())
-    @patch.object(CitusHandler, 'is_alive', Mock(return_value=True))
     def test_run(self, mock_logger_warning):
+        self.c._ready_to_run.set()
         # `before_demote` or `before_promote` REST API calls starting a
         # transaction. We want to make sure that it finishes during
         # certain timeout. In case if it is not, we want to roll it back
@@ -43,7 +43,6 @@ def test_run(self, mock_logger_warning):
         self.assertTrue(mock_logger_warning.call_args[0][0].startswith('Rolling back transaction'))
         self.assertTrue(repr(mock_logger_warning.call_args[0][1]).startswith('PgDistTask'))

-    @patch.object(CitusHandler, 'is_alive', Mock(return_value=False))
     @patch.object(CitusHandler, 'start', Mock())
     def test_sync_meta_data(self):
         with patch.object(CitusHandler, 'is_enabled', Mock(return_value=False)):
@@ -52,9 +51,9 @@ def test_sync_meta_data(self):
     def test_handle_event(self):
         self.c.handle_event(self.cluster, {})
-        with patch.object(CitusHandler, 'is_alive', Mock(return_value=True)):
-            self.c.handle_event(self.cluster, {'type': 'after_promote', 'group': 2,
-                                               'leader': 'leader', 'timeout': 30, 'cooldown': 10})
+        self.c._ready_to_run.set()
+        self.c.handle_event(self.cluster, {'type': 'after_promote', 'group': 2,
+                                           'leader': 'leader', 'timeout': 30, 'cooldown': 10})

     def test_add_task(self):
         with patch('patroni.postgresql.mpp.citus.logger.error') as mock_logger, \
@@ -133,7 +132,7 @@ def test_on_demote(self):
     @patch('patroni.postgresql.mpp.citus.logger.error')
     @patch.object(MockCursor, 'execute', Mock(side_effect=Exception))
     def test_load_pg_dist_group(self, mock_logger):
-        # load_pg_dist_group) triggers, query fails and exception is property handled
+        # load_pg_dist_group() triggers, query fails and exception is properly handled
         self.c.process_tasks()
         self.assertTrue(self.c._schedule_load_pg_dist_group)
         mock_logger.assert_called_once()
diff --git a/tests/test_config.py b/tests/test_config.py
index 13e2febdb7..cafe4969d2 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -34,6 +34,8 @@ def test_reload_local_configuration(self):
         'PATRONI_NAME': 'postgres0',
         'PATRONI_NAMESPACE': '/patroni/',
         'PATRONI_SCOPE': 'batman2',
+        'PATRONI_THREAD_POOL_SIZE': '5',
+        'PATRONI_THREAD_STACK_SIZE': '262144',
         'PATRONI_LOGLEVEL': 'ERROR',
         'PATRONI_LOG_FORMAT': '["message", {"levelname": "level"}]',
         'PATRONI_LOG_LOGGERS': 'patroni.postmaster: WARNING, urllib3: DEBUG',
@@ -42,6 +44,7 @@ def test_reload_local_configuration(self):
         'PATRONI_CITUS_DATABASE': 'citus',
         'PATRONI_CITUS_GROUP': '0',
         'PATRONI_CITUS_HOST': '0',
+        'PATRONI_RESTAPI_THREAD_POOL_SIZE': '5',
         'PATRONI_RESTAPI_USERNAME': 'username',
         'PATRONI_RESTAPI_PASSWORD': 'password',
         'PATRONI_RESTAPI_LISTEN': '0.0.0.0:8008',
@@ -180,12 +183,20 @@ def open_mock(fname, *args, **kwargs):
                 ''')
             elif fname.endswith('00-empty.yml'):
                 return io.StringIO(u'''---''')
+            elif fname.endswith('00-nondict-parameters.yml'):
+                return io.StringIO(
+                    u'''
+
postgresql: + parameters: + - abc: 3 + ''') with patch('builtins.open', MagicMock(side_effect=open_mock)): self.assertRaises(ConfigParseError, Config, 'postgres0') mock_logger.error.assert_called_once_with( '%s does not contain a dict', 'postgres0\\00-empty.yml' if sys.platform == 'win32' else 'postgres0/00-empty.yml') + self.assertRaises(ConfigParseError, Config, '00-nondict-parameters.yml') @patch.object(Config, 'get') @patch('patroni.config.logger') diff --git a/tests/test_ctl.py b/tests/test_ctl.py index c900edab23..a5d0bf8181 100644 --- a/tests/test_ctl.py +++ b/tests/test_ctl.py @@ -850,7 +850,8 @@ def test_cluster_promote(self, mock_patch): # success mock_patch.return_value.status = 200 with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(side_effect=[standby_cluster, standby_cluster, - only_leader_cluster])): + only_leader_cluster])), \ + patch('time.sleep', Mock()): result = self.runner.invoke(ctl, ['promote-cluster', 'dummy', '--force']) assert result.exit_code == 0 diff --git a/tests/test_etcd3.py b/tests/test_etcd3.py index 0edbd0286e..7620032ba1 100644 --- a/tests/test_etcd3.py +++ b/tests/test_etcd3.py @@ -79,6 +79,40 @@ def test_authenticate(self): DnsCachingResolver()) self.assertIsNotNone(etcd3._cluster_version) + @patch.object(urllib3.PoolManager, 'urlopen') + def test_get_members_retries_auth_errors(self, mock_urlopen): + auth_requests = [] + member_list_requests = [] + + def urlopen_side_effect(method, url, **kwargs): + ret = MockResponse() + if method == 'GET' and url.endswith('/version'): + ret.content = '{"etcdserver": "3.6.10", "etcdcluster": "3.6.0"}' + elif url.endswith('/auth/authenticate'): + auth_requests.append(kwargs) + ret.content = '{{"token":"authtoken{0}"}}'.format(len(auth_requests)) + elif url.endswith('/cluster/member/list'): + member_list_requests.append(kwargs) + if 1 < len(member_list_requests) < 4: + ret.status_code = 401 + ret.content = '{"code":16,"error":"etcdserver: invalid auth token"}' + else: + ret.content = '{"members":[{"clientURLs":["http://localhost:2379"]}]}' + return ret + + mock_urlopen.side_effect = urlopen_side_effect + client = Etcd3Client({'host': '127.0.0.1', 'port': 2379, 'retry_timeout': 10, 'patronictl': True, + 'username': 'etcduser', 'password': 'etcdpassword'}, DnsCachingResolver()) + client._update_machines_cache = True + + kwargs = client._prepare_get_members(1) + self.assertEqual(client._get_members('http://127.0.0.1:2379', **kwargs), ['http://localhost:2379']) + self.assertEqual(len(auth_requests), 3) + self.assertEqual(json.loads(auth_requests[-1]['body']), {'name': 'etcduser', 'password': 'etcdpassword'}) + self.assertNotIn('authorization', auth_requests[-1]['headers']) + self.assertEqual(len(member_list_requests), 4) + self.assertEqual(member_list_requests[-1]['headers']['authorization'], 'authtoken3') + class BaseTestEtcd3(unittest.TestCase): @@ -197,13 +231,13 @@ def test__handle_server_response(self): self.assertRaises(etcd.EtcdException, self.client._handle_server_response, response) response.status_code = 400 self.assertRaises(Unknown, self.client._handle_server_response, response) - response.content = '{"error":{"grpc_code":14,"message":"","http_code":400}}' + response.content = '{"error":{"code":14,"message":"","http_code":400}}' self.assertRaises(socket.timeout, self.client._handle_server_response, response) response.content = '{"error":{"grpc_code":0,"message":"","http_code":400}}' try: self.client._handle_server_response(response) - except Unknown as e: - self.assertEqual(e.as_dict(), {'code': 2, 
'codeText': 'OK', 'error': u'', 'status': 400}) + except Etcd3ClientError as e: + self.assertEqual(e.as_dict(), {'code': 0, 'codeText': 'OK', 'error': u'', 'status': 400}) @patch.object(urllib3.PoolManager, 'urlopen') def test__ensure_version_prefix(self, mock_urlopen): diff --git a/tests/test_ha.py b/tests/test_ha.py index 5bad939358..48dd066c9f 100644 --- a/tests/test_ha.py +++ b/tests/test_ha.py @@ -22,9 +22,10 @@ from patroni.postgresql.config import ConfigHandler from patroni.postgresql.misc import PostgresqlRole, PostgresqlState from patroni.postgresql.postmaster import PostmasterProcess -from patroni.postgresql.rewind import Rewind +from patroni.postgresql.rewind import Rewind, REWIND_STATUS from patroni.postgresql.slots import SlotsHandler from patroni.postgresql.sync import _SyncState +from patroni.thread_pool import PatroniThreadPoolExecutor from patroni.utils import tzutc from patroni.watchdog import Watchdog @@ -103,6 +104,10 @@ def get_cluster_initialized_with_leader_and_failsafe(): cluster_config=ClusterConfig(1, {'failsafe_mode': True}, 1)) +def _check_timeline_and_lsn(self, *args): + self._state = REWIND_STATUS.NEED + + def get_node_status(reachable=True, in_recovery=True, dcs_last_seen=0, timeline=2, wal_position=10, nofailover=False, watchdog_failed=False, failover_priority=1, sync_priority=1): @@ -187,6 +192,7 @@ def run_async(self, func, args=()): 'Latest checkpoint location': '0/12345678', "Latest checkpoint's TimeLineID": '2'})) @patch.object(SlotsHandler, 'load_replication_slots', Mock(side_effect=Exception)) +@patch.object(ConfigHandler, 'pg_version', PropertyMock(return_value=180000)) @patch.object(ConfigHandler, 'append_pg_hba', Mock()) @patch.object(ConfigHandler, 'write_pgpass', Mock(return_value={})) @patch.object(ConfigHandler, 'write_recovery_conf', Mock()) @@ -205,8 +211,7 @@ def run_async(self, func, args=()): @patch('patroni.postgresql.polling_loop', Mock(return_value=range(1))) @patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=False)) @patch('patroni.async_executor.AsyncExecutor.run_async', run_async) -@patch('patroni.postgresql.rewind.Thread', Mock()) -@patch('patroni.postgresql.mpp.citus.CitusHandler.start', Mock()) +@patch('patroni.thread_pool.get_executor', Mock(return_value=PatroniThreadPoolExecutor(max_workers=3))) @patch('subprocess.call', Mock(return_value=0)) @patch('time.sleep', Mock()) class TestHa(PostgresInit): @@ -508,12 +513,14 @@ def test_follow_in_pause(self): self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. 
I am (postgresql0)') - @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)) + @patch.object(Rewind, '_check_timeline_and_lsn', _check_timeline_and_lsn) + @patch.object(ConfigHandler, 'check_recovery_conf', Mock(return_value=(False, False))) @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True)) def test_follow_triggers_rewind(self): self.p.is_primary = false - self.ha._rewind.trigger_check_diverged_lsn() self.ha.cluster = get_cluster_initialized_with_leader() + self.p.timeline_wal_position = Mock(return_value=(0, 1, 0, 1, 1, 0)) + self.ha._leader_timeline = 11 self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader') def test_no_dcs_connection_primary_demote(self): @@ -653,6 +660,7 @@ def test_bootstrap_release_initialize_key_on_failure(self): @patch('patroni.postgresql.mpp.citus.connect', psycopg_connect) @patch('patroni.postgresql.mpp.citus.quote_ident', Mock()) @patch.object(Postgresql, 'connection', Mock(return_value=None)) + @patch.object(Postgresql, '_wait_for_connection_close', Mock()) def test_bootstrap_release_initialize_key_on_watchdog_failure(self): self.ha.cluster = get_cluster_not_initialized_without_leader() self.e.initialize = true @@ -666,6 +674,7 @@ def test_bootstrap_release_initialize_key_on_watchdog_failure(self): ' watchdog activation failed')) @patch('patroni.psycopg.connect', psycopg_connect) + @patch.object(Postgresql, '_wait_for_connection_close', Mock()) def test_reinitialize(self): self.assertIsNotNone(self.ha.reinitialize()) @@ -792,21 +801,22 @@ def test_manual_switchover_from_leader(self): with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(nofailover=True) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') - self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not allowed to promote')) + self.assertEqual(mock_info.call_args_list[0][0][0::2], ('Member %s is %s', 'not allowed to promote')) with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(watchdog_failed=True) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') - self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not watchdog capable')) + self.assertEqual(mock_info.call_args_list[0][0][0::2], ('Member %s is %s', 'not watchdog capable')) with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(timeline=1) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') - self.assertEqual(mock_info.call_args_list[0][0], - ('Timeline %s of member %s is behind the cluster timeline %s', 1, 'leader', 2)) + self.assertEqual(mock_info.call_args_list[0][0][0], + 'Timeline %s of member %s is behind the cluster timeline %s') + self.assertEqual(mock_info.call_args_list[0][0][1::2], (1, 2)) with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(wal_position=1) self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5}) self.assertEqual(self.ha.run_cycle(), 'no action. 
I am (postgresql0), the leader with the lock') - self.assertEqual(mock_info.call_args_list[0][0], ('Member %s exceeds maximum replication lag', 'leader')) + self.assertEqual(mock_info.call_args_list[0][0][0], 'Member %s exceeds maximum replication lag') @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False)) def test_scheduled_switchover_from_leader(self): @@ -966,8 +976,7 @@ def test_manual_switchover_process_no_leader(self): with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(reachable=False) # inaccessible, in_recovery self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') - self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not reachable')) - self.assertEqual(mock_info.call_args_list[1][0], ('Member %s is %s', 'other', 'not reachable')) + self.assertEqual(mock_info.call_args_list[0][0][0::2], ('Member %s is %s', 'not reachable')) def test_manual_failover_process_no_leader_in_synchronous_mode(self): self.ha.is_synchronous_mode = true @@ -1062,6 +1071,8 @@ def test_is_healthiest_node(self): self.ha.dcs._last_failsafe = None with patch.object(Watchdog, 'is_healthy', PropertyMock(return_value=False)): self.assertFalse(self.ha.is_healthiest_node()) + with patch('patroni.postgresql.Postgresql.is_starting', return_value=True): + self.assertFalse(self.ha.is_healthiest_node()) self.ha.is_paused = true self.assertFalse(self.ha.is_healthiest_node()) @@ -1084,9 +1095,19 @@ def test__is_healthiest_node(self): self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members, leader=self.ha.old_cluster.leader)) self.ha.fetch_node_status = get_node_status(wal_position=11) # accessible, in_recovery, wal position ahead self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members)) - # in synchronous_mode consider itself healthy if the former leader is accessible in read-only and ahead of us with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)): + # in synchronous_mode consider us healthy if the former leader is accessible in read-only and ahead of us self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members)) + self.ha.cluster = get_cluster_initialized_without_leader(sync=('postgresql1', self.p.name + ',other')) + self.ha.fetch_node_status = get_node_status(failover_priority=10, wal_position=10) + # in synchronous_mode we need to respect failover_priority + with patch('patroni.ha.logger.info') as mock_info: + self.assertFalse(self.ha._is_healthiest_node(self.ha.cluster.members)) + self.assertEqual( + mock_info.call_args_list[0][0][0], + '%s has equally tolerable WAL position and priority %s, while this node has priority %s') + self.assertEqual(mock_info.call_args_list[0][0][2:], (10, 1)) + self.ha.fetch_node_status = get_node_status(wal_position=11) # accessible, in_recovery, wal position ahead self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5}) global_config.update(self.ha.cluster) with patch('patroni.postgresql.Postgresql.last_operation', return_value=1): @@ -1253,7 +1274,13 @@ def test_failed_to_update_lock_in_pause(self): 'PAUSE: continue to run as primary after failing to update leader lock in DCS') def test_postgres_unhealthy_in_pause(self): + self.p.is_primary = false self.ha.is_paused = true + with patch.object(Postgresql, 'cb_called', PropertyMock(return_value=True)), \ + patch.object(Rewind, 'trigger_check_diverged_lsn') as mock_rewind_check: + self.ha.patroni.nofailover = True + 
self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0)') + mock_rewind_check.assert_not_called() self.p.is_healthy = false self.assertEqual(self.ha.run_cycle(), 'PAUSE: postgres is not running') self.ha.has_lock = true @@ -1623,16 +1650,19 @@ def test_inconsistent_synchronous_state(self): self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'a')) self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, CaseInsensitiveSet(), CaseInsensitiveSet(), CaseInsensitiveSet('a'))) - self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) - mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock() - with patch('patroni.ha.logger.warning') as mock_logger: - self.ha.run_cycle() - mock_set_sync.assert_called_once() - self.assertTrue(mock_logger.call_args_list[0][0][0].startswith('Inconsistent state between ')) - self.ha.dcs.write_sync_state = Mock(return_value=None) - with patch('patroni.ha.logger.warning') as mock_logger: - self.ha.run_cycle() - self.assertEqual(mock_logger.call_args[0][0], 'Updating sync state failed') + for leader_name in ('other', 'leader'): + with self.subTest(leader_name=leader_name): + self.p.name = leader_name + self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) + mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock() + with patch('patroni.ha.logger.warning') as mock_logger: + self.ha.run_cycle() + mock_set_sync.assert_called_once() + self.assertTrue(mock_logger.call_args_list[0][0][0].startswith('Inconsistent state ')) + self.ha.dcs.write_sync_state = Mock(return_value=None) + with patch('patroni.ha.logger.warning') as mock_logger: + self.ha.run_cycle() + self.assertEqual(mock_logger.call_args[0][0], 'Updating sync state failed') def test_effective_tags(self): self.ha._disable_sync = True @@ -1698,9 +1728,10 @@ def test_leader_with_not_accessible_data_directory(self): @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_update_cluster_history(self): self.ha.has_lock = true - for tl in (1, 3): - self.p.get_primary_timeline = Mock(return_value=tl) - self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') + with patch('patroni.thread_pool.get_executor', Mock()): + for tl in (1, 3): + self.p.get_primary_timeline = Mock(return_value=tl) + self.assertEqual(self.ha.run_cycle(), 'no action. 
I am (postgresql0), the leader with the lock') @patch('sys.exit', return_value=1) def test_abort_join(self, exit_mock): @@ -1768,6 +1799,7 @@ def test_sysid_no_match_in_pause(self): @patch('builtins.open', mock_open()) @patch.object(ConfigHandler, 'check_recovery_conf', Mock(return_value=(False, False))) @patch.object(Postgresql, 'major_version', PropertyMock(return_value=130000)) + @patch.object(Postgresql, '_wait_for_connection_close', Mock()) @patch.object(SlotsHandler, 'sync_replication_slots', Mock(return_value=['ls'])) def test_follow_copy(self): self.ha.cluster.config.data['slots'] = {'ls': {'database': 'a', 'plugin': 'b'}} diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py index f48431a965..9b81ad2fef 100644 --- a/tests/test_kubernetes.py +++ b/tests/test_kubernetes.py @@ -109,20 +109,21 @@ def test_load_incluster_config(self): def test_refresh_token(self): with patch('os.environ', {SERVICE_HOST_ENV_NAME: 'a', SERVICE_PORT_ENV_NAME: '1'}), \ + patch('patroni.dcs.kubernetes.datetime') as mock_datetime, \ patch('os.path.isfile', Mock(side_effect=[True, True, False, True, True, True])), \ patch('builtins.open', Mock(side_effect=[ mock_open(read_data='cert')(), mock_open(read_data='a')(), mock_open()(), mock_open(read_data='b')(), mock_open(read_data='c')()])): + mock_datetime.datetime.now.side_effect = [datetime.datetime(1, 1, 1, 0, 0, 0)] * 2 + \ + [datetime.datetime(1, 1, 1, 0, 0, 1)] * 4 + [datetime.datetime(1, 1, 1, 0, 0, 2)] * 3 k8s_config.load_incluster_config(token_refresh_interval=datetime.timedelta(milliseconds=100)) self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a') - time.sleep(0.1) # token file doesn't exist self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a') # token file is empty self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a') # token refreshed self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer b') - time.sleep(0.1) # token refreshed self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer c') # no need to refresh token @@ -395,6 +396,7 @@ def test_delete_cluster(self): def test_watch(self): self.k.set_ttl(10) self.k.watch(None, 0) + self.k.event.set() self.k.watch('5', 0) def test_set_history_value(self): @@ -445,6 +447,7 @@ def test_update_leader(self, mock_patch_namespaced_endpoints): self.k._kinds._object_cache['test'].metadata.annotations['leader'] = 'p-1' self.assertFalse(self.k.update_leader(cluster, '123')) + @patch('time.sleep', Mock()) @patch.object(k8s_client.CoreV1Api, 'read_namespaced_endpoints', create=True) @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True) def test__update_leader_with_retry(self, mock_patch, mock_read): @@ -472,7 +475,13 @@ def test__update_leader_with_retry(self, mock_patch, mock_read): self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123') mock_read.side_effect = Exception self.assertFalse(self.k.update_leader(cluster, '123')) + # Ensure 403 is retried by _retry_403 + mock_patch.reset_mock() + mock_patch.side_effect = [k8s_client.rest.ApiException(403, ''), mock_namespaced_kind()] + self.assertTrue(self.k._update_leader_with_retry({}, '1', [])) + self.assertEqual(mock_patch.call_count, 2) + @patch('time.sleep', Mock()) @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', Mock(side_effect=[k8s_client.rest.ApiException(500, ''), k8s_client.rest.ApiException(502, '')]), create=True) diff --git a/tests/test_patroni.py b/tests/test_patroni.py index 72caf74028..3cdcff6d36 100644 --- 
a/tests/test_patroni.py +++ b/tests/test_patroni.py @@ -34,6 +34,12 @@ def mock_import(*args, **kwargs): def mock_import2(*args, **kwargs): + ret = Mock() + ret.__version__ = '2.8.3.dev1 a b c' + return ret + + +def mock_import3(*args, **kwargs): if args[0] == 'psycopg2': raise ImportError ret = Mock() @@ -76,6 +82,8 @@ def test_validate_config(self): @patch('pkgutil.iter_importers', Mock(return_value=[MockFrozenImporter()])) @patch('urllib3.PoolManager.request', Mock(side_effect=Exception)) @patch('sys.frozen', Mock(return_value=True), create=True) + @patch('patroni.api.PatroniThreadPoolExecutor', Mock()) + @patch('patroni.thread_pool.PatroniThreadPoolExecutor', Mock()) @patch.object(HTTPServer, '__init__', Mock()) @patch.object(etcd.Client, 'read', etcd_read) @patch.object(Thread, 'start', Mock()) @@ -111,13 +119,17 @@ def test_apply_dynamic_configuration(self): @patch('sys.argv', ['patroni.py', 'postgres0.yml']) @patch('time.sleep', Mock(side_effect=SleepException)) + @patch('patroni.daemon.stack_size', Mock(side_effect=Exception)) @patch.object(etcd.Client, 'delete', Mock()) @patch.object(AbstractEtcdClientWithFailover, '_get_machines_list', Mock(return_value=['http://remotehost:2379'])) @patch.object(Thread, 'join', Mock()) @patch.object(Postgresql, '_get_gucs', Mock(return_value={'foo': True, 'bar': True})) + @patch.object(Postgresql, '_wait_for_connection_close', Mock()) def test_patroni_patroni_main(self): with patch('subprocess.call', Mock(return_value=1)): with patch.object(Patroni, 'run', Mock(side_effect=SleepException)): + os.environ['PATRONI_THREAD_STACK_SIZE'] = 'a' + os.environ['PATRONI_THREAD_POOL_SIZE'] = 'a' os.environ['PATRONI_POSTGRESQL_DATA_DIR'] = 'data/test0' self.assertRaises(SleepException, _main) with patch.object(Patroni, 'run', Mock(side_effect=KeyboardInterrupt())): @@ -296,6 +308,8 @@ def test_check_psycopg(self): with patch('builtins.__import__', mock_import): self.assertIsNone(check_psycopg()) with patch('builtins.__import__', mock_import2): + self.assertIsNone(check_psycopg()) + with patch('builtins.__import__', mock_import3): self.assertRaises(SystemExit, check_psycopg) def test_ensure_unique_name(self): diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py index a1726970bf..eabd95a14e 100644 --- a/tests/test_postgresql.py +++ b/tests/test_postgresql.py @@ -310,6 +310,7 @@ def test_check_recovery_conf(self, mock_get_pg_settings): self.p.config.write_postgresql_conf() self.assertEqual(self.p.config.check_recovery_conf(None), (False, False)) with patch.object(Postgresql, 'primary_conninfo', Mock(return_value='host=1')): + mock_get_pg_settings.return_value['primary_conninfo'][1] = 'host=1 dbname=postgres password=a' mock_get_pg_settings.return_value['primary_slot_name'] = [ 'primary_slot_name', '', '', 'string', 'postmaster', self.p.config._postgresql_conf] self.assertEqual(self.p.config.check_recovery_conf(None), (True, True)) @@ -489,6 +490,7 @@ def test_call_nowait(self): self.assertIsNone(self.p.call_nowait(CallbackAction.ON_START)) @patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster())) + @patch.object(Postgresql, '_wait_for_connection_close', Mock()) def test_is_primary_exception(self): self.p.start() self.p.query = Mock(side_effect=psycopg.OperationalError("not supported")) diff --git a/tests/test_postmaster.py b/tests/test_postmaster.py index 1075902d8b..9929760965 100644 --- a/tests/test_postmaster.py +++ b/tests/test_postmaster.py @@ -155,6 +155,7 @@ def test_wait_for_user_backends_to_close(self, mock_wait): 
@patch('os.setsid', Mock(), create=True) @patch('multiprocessing.Process', MockProcess) @patch('multiprocessing.get_context', Mock(return_value=multiprocessing), create=True) + @patch('os.environ', {'PG_MALLOC_ARENA_MAX': ''}) @patch.object(PostmasterProcess, 'from_pid') @patch.object(PostmasterProcess, '_from_pidfile') def test_start(self, mock_frompidfile, mock_frompid, mock_popen): diff --git a/tests/test_rewind.py b/tests/test_rewind.py index 14840ad995..1366f877eb 100644 --- a/tests/test_rewind.py +++ b/tests/test_rewind.py @@ -7,16 +7,6 @@ from . import BaseTestPostgresql, MockCursor, psycopg_connect -class MockThread(object): - - def __init__(self, target, args): - self._target = target - self._args = args - - def start(self): - self._target(*self._args) - - def mock_cancellable_call(*args, **kwargs): communicate = kwargs.pop('communicate', None) if isinstance(communicate, dict): @@ -322,11 +312,13 @@ def test_maybe_clean_pg_replslot(self, mock_logger): def test_ensure_clean_shutdown(self): self.assertIsNone(self.r.ensure_clean_shutdown()) - @patch('patroni.postgresql.rewind.Thread', MockThread) + @patch('patroni.thread_pool.get_executor') @patch.object(Postgresql, 'controldata') @patch.object(Postgresql, 'checkpoint') @patch.object(Postgresql, 'get_primary_timeline') - def test_ensure_checkpoint_after_promote(self, mock_get_primary_timeline, mock_checkpoint, mock_controldata): + def test_ensure_checkpoint_after_promote(self, mock_get_primary_timeline, + mock_checkpoint, mock_controldata, mock_executor): + mock_executor.return_value.submit = lambda f, p1, p2: f(p1, p2) mock_controldata.return_value = {"Latest checkpoint's TimeLineID": 1} mock_get_primary_timeline.return_value = 1 self.r.ensure_checkpoint_after_promote(Mock()) diff --git a/tests/test_slots.py b/tests/test_slots.py index c646072445..8a364562f8 100644 --- a/tests/test_slots.py +++ b/tests/test_slots.py @@ -24,13 +24,13 @@ def tags(self): @patch('subprocess.call', Mock(return_value=0)) @patch('patroni.psycopg.connect', psycopg_connect) -@patch.object(Thread, 'start', Mock()) @patch.object(Postgresql, 'is_running', Mock(return_value=True)) class TestSlotsHandler(BaseTestPostgresql): @patch('subprocess.call', Mock(return_value=0)) @patch('os.rename', Mock()) @patch('patroni.postgresql.CallbackExecutor', Mock()) + @patch.object(Thread, 'start', Mock()) @patch.object(Postgresql, 'get_major_version', Mock(return_value=130000)) @patch.object(Postgresql, 'is_running', Mock(return_value=True)) def setUp(self): @@ -229,14 +229,14 @@ def test__ensure_logical_slots_replica(self): with patch.object(SlotsHandler, '_query', Mock(return_value=[('ls', 'logical', 1, 499, 'b', 'a', 5, 100, 500)])), \ patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)), \ - patch.object(SlotsAdvanceThread, 'schedule', Mock(return_value=(True, ['ls']))): + patch.object(SlotsAdvanceThread, 'schedule') as advance_mock: + advance_mock.return_value = (True, ['ls']) # copy invalidated slot with patch.object(psycopg.OperationalError, 'diag') as mock_diag: type(mock_diag).sqlstate = PropertyMock(return_value='58P01') self.assertEqual(self.s.sync_replication_slots(self.cluster, self.tags), ['ls']) # advance slots based on the replay lsn value - with patch.object(Postgresql, 'replay_lsn', Mock(side_effect=[200, 700, 900])), \ - patch.object(SlotsHandler, 'schedule_advance_slots') as advance_mock: + with patch.object(Postgresql, 'replay_lsn', Mock(side_effect=[200, 700, 900])): self.s.sync_replication_slots(self.cluster, self.tags) 
advance_mock.assert_called_with(dict()) self.s.sync_replication_slots(self.cluster, self.tags) @@ -278,7 +278,7 @@ def test_check_logical_slots_readiness(self): @patch.object(Postgresql, 'start', Mock(return_value=True)) @patch.object(Postgresql, 'is_primary', Mock(return_value=False)) def test_on_promote(self): - self.s.schedule_advance_slots({'foo': {'bar': 100}}) + self.s._advance.schedule({'foo': {'bar': 100}}) self.s.copy_logical_slots(self.cluster, self.tags, ['ls']) self.s.on_promote() @@ -294,11 +294,11 @@ def test_slots_advance_thread(self): patch.object(psycopg.OperationalError, 'diag') as mock_diag: for err in ('58P01', '55000'): type(mock_diag).sqlstate = PropertyMock(return_value=err) - self.s.schedule_advance_slots({'foo': {'bar': 100}}) + self.s._advance.schedule({'foo': {'bar': 100}}) self.s._advance.sync_slots() self.assertEqual(self.s._advance._copy_slots, ["bar"]) # we don't want to make attempts to advance slots that are to be copied - self.s.schedule_advance_slots({'foo': {'bar': 101}}) + self.s._advance.schedule({'foo': {'bar': 101}}) self.assertEqual(self.s._advance._scheduled, {}) self.s._advance.clean() @@ -307,7 +307,7 @@ def test_slots_advance_thread(self): self.assertRaises(Exception, self.s._advance.run) with patch.object(SlotsHandler, 'get_local_connection_cursor', Mock(side_effect=Exception)): - self.s.schedule_advance_slots({'foo': {'bar': 100}}) + self.s._advance.schedule({'foo': {'bar': 100}}) self.s._advance.sync_slots() def test_advance_physical_primary(self): diff --git a/tox.ini b/tox.ini index f54caa7e41..83a16f2cda 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [common] -python_matrix = {36,37,38,39,310,311,312,313} +python_matrix = {36,37,38,39,310,311,312,313,314} postgres_matrix = pg11: PG_MAJOR = 11 pg12: PG_MAJOR = 12 @@ -7,8 +7,10 @@ postgres_matrix = pg14: PG_MAJOR = 14 pg15: PG_MAJOR = 15 pg16: PG_MAJOR = 16 + pg17: PG_MAJOR = 17 + pg18: PG_MAJOR = 18 psycopg_deps = - py{37,38,39,310,311,312,313}-{lin,win}: psycopg[binary] + py{37,38,39,310,311,312,313,314}-{lin,win}: psycopg[binary] mac: psycopg2-binary py36: psycopg2-binary platforms = @@ -44,7 +46,7 @@ commands = flake8 {posargs:patroni tests setup.py} deps = flake8 -[testenv:py{36,37,38,39,310,311,312,313}-test-{lin,win,mac}] +[testenv:py{36,37,38,39,310,311,312,313,314}-test-{lin,win,mac}] description = Run unit tests with pytest labels = test @@ -88,7 +90,7 @@ deps = pipdeptree {[common]psycopg_deps} -[testenv:py{37,38,39,310,311,312,313}-type-{lin,mac,win}] +[testenv:py{37,38,39,310,311,312,313,314}-type-{lin,mac,win}] description = Run static type checking with pyright labels = type @@ -106,7 +108,7 @@ description = Reformat code with black deps = black commands = black {posargs:patroni tests} -[testenv:pg{12,13,14,15,16}-docker-build] +[testenv:pg{12,13,14,15,16,17,18}-docker-build] description = Build docker containers needed for testing labels = behave @@ -124,7 +126,7 @@ commands = --file features/Dockerfile allowlist_externals = docker -[testenv:pg{12,13,14,15,16}-docker-behave-{etcd,etcd3}-{lin,mac}] +[testenv:pg{12,13,14,15,16,17,18}-docker-behave-{etcd,etcd3}-{lin,mac}] description = Run behaviour tests in patroni-dev docker container setenv = etcd: DCS=etcd @@ -134,10 +136,10 @@ setenv = labels = behave depends = - pg{11,12,13,14,15,16}-docker-build + pg{11,12,13,14,15,16,17,18}-docker-build # There's a bug which affects calling multiple envs on the command line -# This should be a valid command: tox -e 'py{36,37,38,39,310,311,312,313}-behave-{env:DCS}-lin' +# This should 
be a valid command: tox -e 'py{36,37,38,39,310,311,312,313,314}-behave-{env:DCS}-lin' # Replaced with workaround, see https://github.com/tox-dev/tox/issues/2850 commands = docker run \ @@ -159,7 +161,7 @@ platform = ; win: win32 mac: darwin -[testenv:py{36,38,39,310,311,312,313}-behave-{etcd,etcd3}-{lin,win,mac}] +[testenv:py{36,38,39,310,311,312,313,314}-behave-{etcd,etcd3}-{lin,win,mac}] description = Run behaviour tests (locally with tox) deps = -r requirements.txt
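A pattern that repeats across these test changes is replacing ad-hoc Thread mocks with a patched patroni.thread_pool.get_executor whose submit() runs the submitted callable inline, so work that would normally happen on a background thread executes synchronously and can be asserted on immediately (see the changes to test_rewind.py and test_ha.py above). The snippet below is an illustrative sketch only, not part of the patch; it assumes the object returned by get_executor() exposes a submit(fn, *args) method, as the patched tests imply.

    from unittest.mock import Mock, patch

    def inline_submit(fn, *args, **kwargs):
        # Execute the callable immediately instead of handing it to a worker
        # thread, so the test can assert on its effects right away.
        return fn(*args, **kwargs)

    mock_executor = Mock()
    mock_executor.submit = inline_submit

    # Hypothetical usage: any code under test that calls
    # patroni.thread_pool.get_executor().submit(task, a, b)
    # now runs task(a, b) synchronously inside the test body.
    with patch('patroni.thread_pool.get_executor', Mock(return_value=mock_executor)):
        pass  # invoke the code under test here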