diff --git a/README.md b/README.md index 0357106..c20d7c4 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,8 @@ More flexible [PGMQ Postgres extension](https://github.com/tembo-io/pgmq) Python * [Getting Started](#getting-started) * [Postgres Setup](#postgres-setup) * [Usage](#usage) + * [Transaction Usage](#transaction-usage) + * [FastAPI Pub/Sub Example with tests](#fastapi-pubsub-example) * [Issue/ Contributing / Development](#issue-contributing--development) * [TODO](#todo) @@ -26,11 +28,12 @@ More flexible [PGMQ Postgres extension](https://github.com/tembo-io/pgmq) Python ## Features - Supports **async** and **sync** `engines` and `sessionmakers`, or built from `dsn`. -- **Automatically** creates `pgmq` (or `pg_partman`) extension on the database if not exists. - Supports **all postgres DBAPIs supported by sqlalchemy**. > e.g. `psycopg`, `psycopg2`, `asyncpg` ..
> See [SQLAlchemy Postgresql Dialects](https://docs.sqlalchemy.org/en/20/dialects/postgresql.html) - **Transaction-friendly operations** via the `op` module for combining PGMQ with your business logic in the same transaction. +- [Fully tested across all **supported DBAPIs** in both **async** and **sync** modes](https://github.com/jason810496/pgmq-sqlalchemy/actions/workflows/codecov.yml). +- Battle-tested with **[real-world FastAPI Pub/Sub examples](./examples/fastapi_pub_sub/README.md)** and **[corresponding tests](https://github.com/jason810496/pgmq-sqlalchemy/actions/workflows/examples.yml)**. ## Installation @@ -153,7 +156,21 @@ with SessionLocal() as session: > See [Transaction Usage Documentation](https://pgmq-sqlalchemy.readthedocs.io/en/latest/getting-started.html#using-transaction-friendly-operations) for more examples. +### FastAPI Pub/Sub Example with tests + +See the [FastAPI Pub/Sub Example](./examples/fastapi_pub_sub/README.md) for a complete example of using `pgmq-sqlalchemy` in a FastAPI application with asynchronous message consumption and tests. + ## Issue/ Contributing / Development Welcome to open an issue or pull request !
See [`Development` on Online Document](https://pgmq-sqlalchemy.readthedocs.io/en/latest/) or [CONTRIBUTING.md](.github/CONTRIBUTING.md) for more information. + +## TODO + +- [ ] [Alembic](https://alembic.sqlalchemy.org/en/latest/) compatible migration scripts for PGMQ extension and schema setup, upgrade, downgrade. +- [ ] Compatibility tests with PGMQ across different PGMQ versions. +- [ ] More examples +- [ ] Smoothen contributing process with custom script for one step setup +- [ ] Mypy strict type checking +- [ ] Enable more ruff rules +- [ ] Drop Python 3.9 support in next minor release \ No newline at end of file diff --git a/doc/example-with-fastapi-pub-sub.rst b/doc/example-with-fastapi-pub-sub.rst new file mode 100644 index 0000000..b32e75e --- /dev/null +++ b/doc/example-with-fastapi-pub-sub.rst @@ -0,0 +1,178 @@ +FastAPI Pub/Sub Example with PGMQ +================================= + +.. note:: + + You can find the complete code for this example in `examples/fastapi_pub_sub `_. + +This example demonstrates a real-world scenario of using PGMQ with +FastAPI for an order management system. It shows how to: + +- Use PGMQ with FastAPI and sync SQLAlchemy sessions (psycopg2) +- Publish messages using ``PGMQOperation`` (``op``) in a web API +- Consume messages asynchronously using ``PGMQueue`` with ``asyncpg`` + + +Scenario +-------- + +1. Client creates an order via the API. +2. The API saves the order to the database and publishes a message to + PGMQ within the same transaction. +3. The consumer polls the queue, processes messages, deletes the + message on success, or leaves it for retry on failure. + + - We **simulate** that if ``msg_id % 6 == 0``, it will fail twice + before succeeding, and if ``msg_id % 2 == 0``, it will fail once + before succeeding. + + +Architecture +------------ + +- **API Server** (``api.py``): FastAPI application that creates orders + and publishes them to PGMQ + + - Uses sync database driver (psycopg2) + - Uses ``PGMQOperation`` (imported as ``op``) for publishing messages + - Provides REST endpoints for creating and retrieving orders + + - ``POST /orders`` – Create a new order + - ``GET /orders/{order_id}`` – Get order by ID + - ``GET /health`` – Health check endpoint + +- **Consumer** (``consumer.py``): Async worker that processes orders + from the queue + + - Uses async database driver (``asyncpg``) + - Uses ``PGMQueue`` class for reading messages + - Processes messages concurrently with ``asyncio`` + +- **Create Orders Script** (``create_orders_coordinator.py``): Script + to create multiple orders in parallel via the API for testing + + +Prerequisites +------------- + +- PostgreSQL with PGMQ extension installed +- Python 3.10 or higher + +Quick setup:: + + docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest + +Install dependencies from the project root with ``uv``:: + + cd /path/to/pgmq-sqlalchemy + uv ync --group dev + + +Running the Example with Pytest +-------------------------------- + +You can run the entire example (API server, consumer, and order +creation) using the provided pytest script + +.. code-block:: bash + + cd /path/to/pgmq-sqlalchemy + uv run pytest ./examples_tests/integration/test_fastapi_integration.py -ss + +This command starts the API server and consumer, and creates orders +automatically. You will see **logs from all three processes in real time +in the terminal** + +.. code-block:: text + + ========================================================================================================================= test session starts ========================================================================================================================== + platform darwin -- Python 3.12.12, pytest-7.4.4, pluggy-1.6.0 + rootdir: /Users/jason/pgmq-sqlalchemy + configfile: pyproject.toml + plugins: asyncio-0.23.8, anyio-4.12.0, xdist-3.8.0, lazy-fixture-0.6.3, cov-7.0.0 + asyncio: mode=Mode.AUTO + collected 1 item + + examples_tests/integration/test_fastapi_integration.py + examples_tests/integration/test_fastapi_integration.py + Starting API process... + API process started with PID: 95531 + Starting Consumer process... + Consumer process started with PID: 95532 + Starting Create Orders process... + Create Orders process started with PID: 95533 + + + ╭───────────────────────────────── API process: 12653 ─────────────────────────────────╮╭────────────────────────────── Consumer process: 12654 ───────────────────────────────╮╭──────────────────────────── Create Orders process: 12655 ────────────────────────────╮ + │ Starting API process... ││ Starting Consumer process... ││ Starting Create Orders process... │ + │ INFO: 127.0.0.1:54358 - "GET /health HTTP/1.1" 200 OK ││ [2026-01-07 19:48:03][INFO] - Starting consumer for queue: order_queue ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ + │ INFO: Started server process [12653] ││ [2026-01-07 19:48:03][INFO] - Batch size: 30, Visibility timeout: 10s ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ + │ INFO: 127.0.0.1:54359 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:04][INFO] - Received 30 messages ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ + │ INFO: Waiting for application startup. ││ [2026-01-07 19:48:05][INFO] - Order 111 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ + │ INFO: 127.0.0.1:54360 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 102 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: GET http://localhost:8000/health │ + │ INFO: Application startup complete. ││ [2026-01-07 19:48:05][INFO] - Order 116 processed fail at first try ││ "HTTP/1.1 200 OK" │ + │ INFO: 127.0.0.1:54368 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 122 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - API Server is ready! │ + │ INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit) ││ [2026-01-07 19:48:05][INFO] - Order 130 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ + │ INFO: 127.0.0.1:54363 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 104 processed fail at first try ││ "HTTP/1.1 201 Created" │ + │ INFO: 127.0.0.1:54362 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 119 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ + │ INFO: 127.0.0.1:54369 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 108 processed fail at first try ││ "HTTP/1.1 201 Created" │ + │ INFO: 127.0.0.1:54364 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 118 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ + │ INFO: 127.0.0.1:54366 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 127 processed fail at first try ││ "HTTP/1.1 201 Created" │ + │ INFO: 127.0.0.1:54372 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 109 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ + │ INFO: 127.0.0.1:54367 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 128 processed fail at first try ││ "HTTP/1.1 201 Created" │ + │ INFO: 127.0.0.1:54370 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 112 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ + │ INFO: 127.0.0.1:54373 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 124 processed fail at first try ││ "HTTP/1.1 201 Created" │ + │ INFO: 127.0.0.1:54365 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 113 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ + │ INFO: 127.0.0.1:54371 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 110 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ + │ INFO: 127.0.0.1:54361 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 102 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ + │ INFO: 127.0.0.1:54376 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 116 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ + │ INFO: 127.0.0.1:54380 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 122 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ + │ INFO: 127.0.0.1:54377 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 130 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ + │ INFO: 127.0.0.1:54379 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 104 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ + │ INFO: 127.0.0.1:54393 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 120 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ + │ INFO: 127.0.0.1:54394 - "POST /orders HTTP/1.1" 201 Created ││ ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ + ╰──────────────────────────────────────────────────────────────────────────────────────╯╰──────────────────────────────────────────────────────────────────────────────────────╯╰──────────────────────────────────────────────────────────────────────────────────────╯ + +Configuration +------------- + +You can modify the following environment variables before running the +example: + +- ``DATABASE_URL``: PostgreSQL connection string +- ``QUEUE_NAME``: Name of the PGMQ queue (default: ``"order_queue"``) +- ``BATCH_SIZE``: Number of messages to process in each batch + (``consumer.py``) +- ``VT``: Visibility timeout in seconds (``consumer.py``) +- ``API_PORT``: Port for the FastAPI server (default: ``8000``) + + +How It Works +------------ + +1. When an order is created via the API: + + - The order is saved to the database and published to PGMQ **within + the same transaction** by using ``op.send()``. + - The message contains order details. + +2. The consumer: + + - Continuously polls the queue for new messages. + - Processes messages concurrently using ``asyncio``. + - Deletes successfully processed messages. + - Leaves failed messages in the queue for retry. + + +Testing +------- + +This example is covered by an integration test located at +`examples_tests/integration/test_fastapi_integration.py `_. It is run +end to end with ``pytest`` for every pull request to ensure correctness +and reliability. The GitHub Actions workflow configuration for running +the tests is located in `.github/workflows/examples.yml `_. + +You can refer to that workflow file for more details on how to set up +and run the tests. + diff --git a/doc/getting-started.rst b/doc/getting-started.rst index 3e0aa6c..d8a0d40 100644 --- a/doc/getting-started.rst +++ b/doc/getting-started.rst @@ -21,7 +21,6 @@ Or using **Docker Compose** to start Postgres with ``PGMQ`` extension: ``docker-compose.yml``: .. code-block:: yaml - version: '3.8' services: pgmq_postgres: container_name: pgmq_postgres @@ -264,4 +263,12 @@ Combining business logic with PGMQ operations in a single transaction: See `API Reference `_ for the complete list of available operations in the ``op`` module. +FastAPI Pub/Sub Example +----------------------- + +For a complete, real-world example that combines ``PGMQOperation`` (``op``) +with FastAPI and ``PGMQueue`` for asynchronous consumption, see +`FastAPI Pub/Sub Example with PGMQ `_. + + \ No newline at end of file diff --git a/doc/index.rst b/doc/index.rst index 7b09d46..e8dd7e2 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -26,11 +26,12 @@ Features -------- * Supports **async** and **sync** ``engines``, ``sessionmakers``, or directly constructed from ``dsn``. -* **Automatically** creates ``pgmq`` extension on the database if not exists. * Supports all Postgres DBAPIs supported by ``SQLAlchemy``. * Examples: ``psycopg``, ``psycopg2``, ``asyncpg`` * See `SQLAlchemy Postgresql Dialects `_ - +* **Transaction-friendly operations** via the `op` module for combining PGMQ with your business logic in the same transaction. +* `Fully tested across all supported DBAPIs in both async and sync modes `_. +* Battle-tested with `real-world FastAPI Pub/Sub examples `_ and `corresponding tests `_. Table of Contents ----------------- @@ -41,6 +42,7 @@ Table of Contents self installation getting-started + example-with-fastapi-pub-sub api-reference development release \ No newline at end of file diff --git a/examples/fastapi_pub_sub/README.md b/examples/fastapi_pub_sub/README.md index 3a54757..ff7cc42 100644 --- a/examples/fastapi_pub_sub/README.md +++ b/examples/fastapi_pub_sub/README.md @@ -6,107 +6,121 @@ This example demonstrates a real-world scenario of using PGMQ with FastAPI for a - Publish messages using `PGMQOperation` (op) in a web API - Consume messages asynchronously using `PGMQueue` with asyncpg + +- **Scenario**: + 1. Client creates an order via the API. + 2. The API saves the order to the database and publishes a message to PGMQ within the same transaction. + 3. The consumer polls the queue, processes messages, deletes the message on success, or leaves it for retry on failure. + - We **simulate** that if `msg_id` modulo 6 == 0, it will fail twice before succeeding, and if `msg_id` modulo 2 == 0, it will fail once before succeeding. + ## Architecture - **API Server (api.py)**: FastAPI application that creates orders and publishes them to PGMQ - Uses sync database driver (psycopg2) - Uses `PGMQOperation` (imported as `op`) for publishing messages - Provides REST endpoints for creating and retrieving orders + - `POST /orders` - Create a new order + - `GET /orders/{order_id}` - Get order by ID + - `GET /health` - Health check endpoint - **Consumer (consumer.py)**: Async worker that processes orders from the queue - Uses async database driver (asyncpg) - Uses `PGMQueue` class for reading messages - Processes messages concurrently with asyncio +- **Create Orders Script (create_orders_coordinator.py)**: Script to create multiple orders in parallel via the API for testing + ## Prerequisites - PostgreSQL with PGMQ extension installed -- Python 3.9 or higher +- Python 3.10 or higher Quick setup: ```bash docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest ``` -## Installation - -Install required dependencies using uv with the example's pyproject.toml: +Install dependencies from the project root with uv: ```bash -cd examples/fastapi_pub_sub -uv pip install -e . -``` - -Or install dependencies directly: - -```bash -uv pip install fastapi uvicorn psycopg2-binary asyncpg pgmq-sqlalchemy +cd /path/to/pgmq-sqlalchemy +uv sync --group dev ``` -Or install from the project root with uv: +## Running the Example with Pytest Script +You can run the entire example (API server, consumer, and order creation) using the provided pytest script: ```bash cd /path/to/pgmq-sqlalchemy -uv pip install -e ".[psycopg2-binary,asyncpg]" +uv run pytest ./examples_tests/integration/test_fastapi_integration.py -ss ``` -## Running the Example - -### 1. Start the API Server - -```bash -python api.py +This command starts the API server and consumer, and creates orders automatically. You will see logs from all three processes in real time from the terminal: ``` - -The API will be available at http://localhost:8000 - -### 2. Start the Consumer - -In a separate terminal: - -```bash -python consumer.py +========================================================================================================================= test session starts ========================================================================================================================== +platform darwin -- Python 3.12.12, pytest-7.4.4, pluggy-1.6.0 +rootdir: /Users/jason/pgmq-sqlalchemy +configfile: pyproject.toml +plugins: asyncio-0.23.8, anyio-4.12.0, xdist-3.8.0, lazy-fixture-0.6.3, cov-7.0.0 +asyncio: mode=Mode.AUTO +collected 1 item + +examples_tests/integration/test_fastapi_integration.py +examples_tests/integration/test_fastapi_integration.py +Starting API process... +API process started with PID: 95531 +Starting Consumer process... +Consumer process started with PID: 95532 +Starting Create Orders process... +Create Orders process started with PID: 95533 + + +╭───────────────────────────────── API process: 12653 ─────────────────────────────────╮╭────────────────────────────── Consumer process: 12654 ───────────────────────────────╮╭──────────────────────────── Create Orders process: 12655 ────────────────────────────╮ +│ Starting API process... ││ Starting Consumer process... ││ Starting Create Orders process... │ +│ INFO: 127.0.0.1:54358 - "GET /health HTTP/1.1" 200 OK ││ [2026-01-07 19:48:03][INFO] - Starting consumer for queue: order_queue ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ +│ INFO: Started server process [12653] ││ [2026-01-07 19:48:03][INFO] - Batch size: 30, Visibility timeout: 10s ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ +│ INFO: 127.0.0.1:54359 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:04][INFO] - Received 30 messages ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ +│ INFO: Waiting for application startup. ││ [2026-01-07 19:48:05][INFO] - Order 111 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ +│ INFO: 127.0.0.1:54360 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 102 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: GET http://localhost:8000/health │ +│ INFO: Application startup complete. ││ [2026-01-07 19:48:05][INFO] - Order 116 processed fail at first try ││ "HTTP/1.1 200 OK" │ +│ INFO: 127.0.0.1:54368 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 122 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - API Server is ready! │ +│ INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit) ││ [2026-01-07 19:48:05][INFO] - Order 130 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ +│ INFO: 127.0.0.1:54363 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 104 processed fail at first try ││ "HTTP/1.1 201 Created" │ +│ INFO: 127.0.0.1:54362 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 119 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ +│ INFO: 127.0.0.1:54369 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 108 processed fail at first try ││ "HTTP/1.1 201 Created" │ +│ INFO: 127.0.0.1:54364 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 118 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ +│ INFO: 127.0.0.1:54366 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 127 processed fail at first try ││ "HTTP/1.1 201 Created" │ +│ INFO: 127.0.0.1:54372 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 109 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ +│ INFO: 127.0.0.1:54367 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 128 processed fail at first try ││ "HTTP/1.1 201 Created" │ +│ INFO: 127.0.0.1:54370 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 112 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ +│ INFO: 127.0.0.1:54373 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 124 processed fail at first try ││ "HTTP/1.1 201 Created" │ +│ INFO: 127.0.0.1:54365 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 113 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ +│ INFO: 127.0.0.1:54371 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 110 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ +│ INFO: 127.0.0.1:54361 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 102 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ +│ INFO: 127.0.0.1:54376 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 116 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ +│ INFO: 127.0.0.1:54380 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 122 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ +│ INFO: 127.0.0.1:54377 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 130 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ +│ INFO: 127.0.0.1:54379 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 104 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ +│ INFO: 127.0.0.1:54393 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 120 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ +│ INFO: 127.0.0.1:54394 - "POST /orders HTTP/1.1" 201 Created ││ ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ +╰──────────────────────────────────────────────────────────────────────────────────────╯╰──────────────────────────────────────────────────────────────────────────────────────╯╰──────────────────────────────────────────────────────────────────────────────────────╯ ``` -### 3. Create Orders - -Create an order via the API: - -```bash -curl -X POST "http://localhost:8000/orders" \ - -H "Content-Type: application/json" \ - -d '{ - "customer_name": "John Doe", - "product_name": "Widget", - "quantity": 5, - "price": 29.99 - }' -``` - -You should see: -- The API returns the created order with a message ID -- The consumer logs show the order being processed - -### 4. View Order - -Get an order by ID: - -```bash -curl "http://localhost:8000/orders/1" -``` +## Configuration -## API Endpoints +You can modify the following environment variables before running the example: -- `POST /orders` - Create a new order -- `GET /orders/{order_id}` - Get order by ID -- `GET /health` - Health check endpoint +- `DATABASE_URL`: PostgreSQL connection string +- `QUEUE_NAME`: Name of the PGMQ queue (default: "order_queue") +- `BATCH_SIZE`: Number of messages to process in each batch (consumer.py) +- `VT`: Visibility timeout in seconds (consumer.py) +- `API_PORT`: Port for the FastAPI server (default: 8000) ## How It Works 1. When an order is created via the API: - - The order is saved to the database - - A message is published to PGMQ using `op.send()` - - The message contains order details + - The order is saved to the database and published to PGMQ **within the same transaction** by using `op.send()`. + - The message contains order details. 2. The consumer: - Continuously polls the queue for new messages @@ -114,11 +128,8 @@ curl "http://localhost:8000/orders/1" - Deletes successfully processed messages - Leaves failed messages in the queue for retry -## Configuration +## Testing -You can modify the following constants in the files: +This example is covered by an integration test located at `examples_tests/integration/test_fastapi_integration.py`. It is run end to end with pytest for every pull request to ensure correctness and reliability. The GitHub Actions workflow configuration for running the tests is located in `.github/workflows/examples.yml`. -- `DATABASE_URL`: PostgreSQL connection string -- `QUEUE_NAME`: Name of the PGMQ queue (default: "order_queue") -- `batch_size`: Number of messages to process in each batch (consumer.py) -- `vt`: Visibility timeout in seconds (consumer.py) +You can refer to that workflow file for more details on how to set up and run the tests. \ No newline at end of file diff --git a/examples/fastapi_pub_sub/api.py b/examples/fastapi_pub_sub/api.py index 06b0fad..92d6a4f 100644 --- a/examples/fastapi_pub_sub/api.py +++ b/examples/fastapi_pub_sub/api.py @@ -22,6 +22,7 @@ "DATABASE_URL", "postgresql+psycopg2://postgres:postgres@localhost:5432/postgres" ) QUEUE_NAME = os.getenv("QUEUE_NAME", "order_queue") +API_PORT = int(os.getenv("API_PORT", "8000")) # SQLAlchemy setup engine = create_engine(DATABASE_URL) @@ -207,4 +208,4 @@ def health_check(): if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) + uvicorn.run(app, host="0.0.0.0", port=API_PORT) diff --git a/examples/fastapi_pub_sub/create_orders_coordinator.py b/examples/fastapi_pub_sub/create_orders_coordinator.py index 59767c5..555a8a9 100644 --- a/examples/fastapi_pub_sub/create_orders_coordinator.py +++ b/examples/fastapi_pub_sub/create_orders_coordinator.py @@ -1,5 +1,6 @@ import time import logging +import os import httpx @@ -12,20 +13,27 @@ ) logger = logging.getLogger(__name__) +API_PORT = int(os.getenv("API_PORT", "8000")) +BASE_URL = f"http://localhost:{API_PORT}" + def wait_until_api_server_to_start() -> None: # Wait for the server to start - max_attempts = 30 - for _ in range(max_attempts): + start_time = time.time() + max_wait = 30 + while True: try: - response = httpx.get("http://localhost:8000/health", timeout=1) + response = httpx.get(f"{BASE_URL}/health", timeout=1) if response.status_code == 200: logger.info("API Server is ready!") return except Exception: - time.sleep(1) + time.sleep(0.05) logger.info("API Server is not ready...") + if (time.time() - start_time) > max_wait: + break + raise RuntimeError("API server failed to start") @@ -37,7 +45,7 @@ def create_order(order_num: int): "quantity": order_num % 10 + 1, "price": 10.0 + (order_num % 50), } - response = httpx.post("http://localhost:8000/orders", json=order_data, timeout=5) + response = httpx.post(f"{BASE_URL}/orders", json=order_data, timeout=5) return ( response.status_code == 201, response.json() if response.status_code == 201 else None,