Skip to content

Commit 8fcf18f

Browse files
committed
Enhance documentation and examples for FastAPI Pub/Sub integration with PGMQ
- Added a new FastAPI Pub/Sub example documentation. - Updated README to include links to transaction usage and FastAPI example. - Improved getting started guide with FastAPI Pub/Sub reference. - Refactored example scripts to use configurable API port. - Enhanced integration tests for FastAPI example.
1 parent 85e1740 commit 8fcf18f

7 files changed

Lines changed: 302 additions & 78 deletions

File tree

README.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,21 @@ More flexible [PGMQ Postgres extension](https://github.com/tembo-io/pgmq) Python
1919
* [Getting Started](#getting-started)
2020
* [Postgres Setup](#postgres-setup)
2121
* [Usage](#usage)
22+
* [Transaction Usage](#transaction-usage)
23+
* [FastAPI Pub/Sub Example with tests](#fastapi-pubsub-example)
2224
* [Issue/ Contributing / Development](#issue-contributing--development)
2325
* [TODO](#todo)
2426

2527

2628
## Features
2729

2830
- Supports **async** and **sync** `engines` and `sessionmakers`, or built from `dsn`.
29-
- **Automatically** creates `pgmq` (or `pg_partman`) extension on the database if not exists.
3031
- Supports **all postgres DBAPIs supported by sqlalchemy**.
3132
> e.g. `psycopg`, `psycopg2`, `asyncpg` .. <br>
3233
> See [SQLAlchemy Postgresql Dialects](https://docs.sqlalchemy.org/en/20/dialects/postgresql.html)
3334
- **Transaction-friendly operations** via the `op` module for combining PGMQ with your business logic in the same transaction.
35+
- [Fully tested across all **supported DBAPIs** in both **async** and **sync** modes](https://github.com/jason810496/pgmq-sqlalchemy/actions/workflows/codecov.yml).
36+
- Battle-tested with **[real-world FastAPI Pub/Sub examples](./examples/fastapi_pub_sub/README.md)** and **[corresponding tests](https://github.com/jason810496/pgmq-sqlalchemy/actions/workflows/examples.yml)**.
3437

3538
## Installation
3639

@@ -153,7 +156,21 @@ with SessionLocal() as session:
153156

154157
> See [Transaction Usage Documentation](https://pgmq-sqlalchemy.readthedocs.io/en/latest/getting-started.html#using-transaction-friendly-operations) for more examples.
155158
159+
### FastAPI Pub/Sub Example with tests
160+
161+
See the [FastAPI Pub/Sub Example](./examples/fastapi_pub_sub/README.md) for a complete example of using `pgmq-sqlalchemy` in a FastAPI application with asynchronous message consumption and tests.
162+
156163
## Issue/ Contributing / Development
157164

158165
Welcome to open an issue or pull request ! <br>
159166
See [`Development` on Online Document](https://pgmq-sqlalchemy.readthedocs.io/en/latest/) or [CONTRIBUTING.md](.github/CONTRIBUTING.md) for more information.
167+
168+
## TODO
169+
170+
- [ ] [Alembic](https://alembic.sqlalchemy.org/en/latest/) compatible migration scripts for PGMQ extension and schema setup, upgrade, downgrade.
171+
- [ ] Compatibility tests with PGMQ across different PGMQ versions.
172+
- [ ] More examples
173+
- [ ] Smoothen contributing process with custom script for one step setup
174+
- [ ] Mypy strict type checking
175+
- [ ] Enable more ruff rules
176+
- [ ] Drop Python 3.9 support in next minor release
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
FastAPI Pub/Sub Example with PGMQ
2+
=================================
3+
4+
.. note::
5+
6+
You can find the complete code for this example in `examples/fastapi_pub_sub <https://github.com/jason810496/pgmq-sqlalchemy/tree/main/examples/fastapi_pub_sub>`_.
7+
8+
This example demonstrates a real-world scenario of using PGMQ with
9+
FastAPI for an order management system. It shows how to:
10+
11+
- Use PGMQ with FastAPI and sync SQLAlchemy sessions (psycopg2)
12+
- Publish messages using ``PGMQOperation`` (``op``) in a web API
13+
- Consume messages asynchronously using ``PGMQueue`` with ``asyncpg``
14+
15+
16+
Scenario
17+
--------
18+
19+
1. Client creates an order via the API.
20+
2. The API saves the order to the database and publishes a message to
21+
PGMQ within the same transaction.
22+
3. The consumer polls the queue, processes messages, deletes the
23+
message on success, or leaves it for retry on failure.
24+
25+
- We **simulate** that if ``msg_id % 6 == 0``, it will fail twice
26+
before succeeding, and if ``msg_id % 2 == 0``, it will fail once
27+
before succeeding.
28+
29+
30+
Architecture
31+
------------
32+
33+
- **API Server** (``api.py``): FastAPI application that creates orders
34+
and publishes them to PGMQ
35+
36+
- Uses sync database driver (psycopg2)
37+
- Uses ``PGMQOperation`` (imported as ``op``) for publishing messages
38+
- Provides REST endpoints for creating and retrieving orders
39+
40+
- ``POST /orders`` – Create a new order
41+
- ``GET /orders/{order_id}`` – Get order by ID
42+
- ``GET /health`` – Health check endpoint
43+
44+
- **Consumer** (``consumer.py``): Async worker that processes orders
45+
from the queue
46+
47+
- Uses async database driver (``asyncpg``)
48+
- Uses ``PGMQueue`` class for reading messages
49+
- Processes messages concurrently with ``asyncio``
50+
51+
- **Create Orders Script** (``create_orders_coordinator.py``): Script
52+
to create multiple orders in parallel via the API for testing
53+
54+
55+
Prerequisites
56+
-------------
57+
58+
- PostgreSQL with PGMQ extension installed
59+
- Python 3.10 or higher
60+
61+
Quick setup::
62+
63+
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
64+
65+
Install dependencies from the project root with ``uv``::
66+
67+
cd /path/to/pgmq-sqlalchemy
68+
uv ync --group dev
69+
70+
71+
Running the Example with Pytest
72+
--------------------------------
73+
74+
You can run the entire example (API server, consumer, and order
75+
creation) using the provided pytest script
76+
77+
.. code-block:: bash
78+
79+
cd /path/to/pgmq-sqlalchemy
80+
uv run pytest ./examples_tests/integration/test_fastapi_integration.py -ss
81+
82+
This command starts the API server and consumer, and creates orders
83+
automatically. You will see **logs from all three processes in real time
84+
in the terminal**
85+
86+
.. code-block:: text
87+
88+
========================================================================================================================= test session starts ==========================================================================================================================
89+
platform darwin -- Python 3.12.12, pytest-7.4.4, pluggy-1.6.0
90+
rootdir: /Users/jason/pgmq-sqlalchemy
91+
configfile: pyproject.toml
92+
plugins: asyncio-0.23.8, anyio-4.12.0, xdist-3.8.0, lazy-fixture-0.6.3, cov-7.0.0
93+
asyncio: mode=Mode.AUTO
94+
collected 1 item
95+
96+
examples_tests/integration/test_fastapi_integration.py
97+
examples_tests/integration/test_fastapi_integration.py
98+
Starting API process...
99+
API process started with PID: 95531
100+
Starting Consumer process...
101+
Consumer process started with PID: 95532
102+
Starting Create Orders process...
103+
Create Orders process started with PID: 95533
104+
105+
106+
╭───────────────────────────────── API process: 12653 ─────────────────────────────────╮╭────────────────────────────── Consumer process: 12654 ───────────────────────────────╮╭──────────────────────────── Create Orders process: 12655 ────────────────────────────╮
107+
│ Starting API process... ││ Starting Consumer process... ││ Starting Create Orders process... │
108+
│ INFO: 127.0.0.1:54358 - "GET /health HTTP/1.1" 200 OK ││ [2026-01-07 19:48:03][INFO] - Starting consumer for queue: order_queue ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │
109+
│ INFO: Started server process [12653] ││ [2026-01-07 19:48:03][INFO] - Batch size: 30, Visibility timeout: 10s ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │
110+
│ INFO: 127.0.0.1:54359 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:04][INFO] - Received 30 messages ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │
111+
│ INFO: Waiting for application startup. ││ [2026-01-07 19:48:05][INFO] - Order 111 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │
112+
│ INFO: 127.0.0.1:54360 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 102 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: GET http://localhost:8000/health │
113+
│ INFO: Application startup complete. ││ [2026-01-07 19:48:05][INFO] - Order 116 processed fail at first try ││ "HTTP/1.1 200 OK" │
114+
│ INFO: 127.0.0.1:54368 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 122 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - API Server is ready! │
115+
│ INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit) ││ [2026-01-07 19:48:05][INFO] - Order 130 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │
116+
│ INFO: 127.0.0.1:54363 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 104 processed fail at first try ││ "HTTP/1.1 201 Created" │
117+
│ INFO: 127.0.0.1:54362 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 119 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │
118+
│ INFO: 127.0.0.1:54369 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 108 processed fail at first try ││ "HTTP/1.1 201 Created" │
119+
│ INFO: 127.0.0.1:54364 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 118 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │
120+
│ INFO: 127.0.0.1:54366 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 127 processed fail at first try ││ "HTTP/1.1 201 Created" │
121+
│ INFO: 127.0.0.1:54372 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 109 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │
122+
│ INFO: 127.0.0.1:54367 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 128 processed fail at first try ││ "HTTP/1.1 201 Created" │
123+
│ INFO: 127.0.0.1:54370 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 112 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │
124+
│ INFO: 127.0.0.1:54373 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 124 processed fail at first try ││ "HTTP/1.1 201 Created" │
125+
│ INFO: 127.0.0.1:54365 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 113 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │
126+
│ INFO: 127.0.0.1:54371 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 110 processing failed, will retry later ││ "HTTP/1.1 201 Created" │
127+
│ INFO: 127.0.0.1:54361 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 102 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │
128+
│ INFO: 127.0.0.1:54376 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 116 processing failed, will retry later ││ "HTTP/1.1 201 Created" │
129+
│ INFO: 127.0.0.1:54380 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 122 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │
130+
│ INFO: 127.0.0.1:54377 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 130 processing failed, will retry later ││ "HTTP/1.1 201 Created" │
131+
│ INFO: 127.0.0.1:54379 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 104 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │
132+
│ INFO: 127.0.0.1:54393 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 120 processing failed, will retry later ││ "HTTP/1.1 201 Created" │
133+
│ INFO: 127.0.0.1:54394 - "POST /orders HTTP/1.1" 201 Created ││ ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │
134+
╰──────────────────────────────────────────────────────────────────────────────────────╯╰──────────────────────────────────────────────────────────────────────────────────────╯╰──────────────────────────────────────────────────────────────────────────────────────╯
135+
136+
Configuration
137+
-------------
138+
139+
You can modify the following environment variables before running the
140+
example:
141+
142+
- ``DATABASE_URL``: PostgreSQL connection string
143+
- ``QUEUE_NAME``: Name of the PGMQ queue (default: ``"order_queue"``)
144+
- ``BATCH_SIZE``: Number of messages to process in each batch
145+
(``consumer.py``)
146+
- ``VT``: Visibility timeout in seconds (``consumer.py``)
147+
- ``API_PORT``: Port for the FastAPI server (default: ``8000``)
148+
149+
150+
How It Works
151+
------------
152+
153+
1. When an order is created via the API:
154+
155+
- The order is saved to the database and published to PGMQ **within
156+
the same transaction** by using ``op.send()``.
157+
- The message contains order details.
158+
159+
2. The consumer:
160+
161+
- Continuously polls the queue for new messages.
162+
- Processes messages concurrently using ``asyncio``.
163+
- Deletes successfully processed messages.
164+
- Leaves failed messages in the queue for retry.
165+
166+
167+
Testing
168+
-------
169+
170+
This example is covered by an integration test located at
171+
`examples_tests/integration/test_fastapi_integration.py <https://github.com/jason810496/pgmq-sqlalchemy/blob/main/examples_tests/integration/test_fastapi_integration.py>`_. It is run
172+
end to end with ``pytest`` for every pull request to ensure correctness
173+
and reliability. The GitHub Actions workflow configuration for running
174+
the tests is located in `.github/workflows/examples.yml <https://github.com/jason810496/pgmq-sqlalchemy/blob/main/.github/workflows/examples.yml>`_.
175+
176+
You can refer to that workflow file for more details on how to set up
177+
and run the tests.
178+

doc/getting-started.rst

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ Or using **Docker Compose** to start Postgres with ``PGMQ`` extension:
2121
``docker-compose.yml``:
2222
.. code-block:: yaml
2323
24-
version: '3.8'
2524
services:
2625
pgmq_postgres:
2726
container_name: pgmq_postgres
@@ -264,4 +263,12 @@ Combining business logic with PGMQ operations in a single transaction:
264263
See `API Reference <api-reference>`_ for the complete list of available operations in the ``op`` module.
265264

266265

266+
FastAPI Pub/Sub Example
267+
-----------------------
268+
269+
For a complete, real-world example that combines ``PGMQOperation`` (``op``)
270+
with FastAPI and ``PGMQueue`` for asynchronous consumption, see
271+
`FastAPI Pub/Sub Example with PGMQ <example-with-fastapi-pub-sub>`_.
272+
273+
267274

doc/index.rst

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ Features
2626
--------
2727

2828
* Supports **async** and **sync** ``engines``, ``sessionmakers``, or directly constructed from ``dsn``.
29-
* **Automatically** creates ``pgmq`` extension on the database if not exists.
3029
* Supports all Postgres DBAPIs supported by ``SQLAlchemy``.
3130
* Examples: ``psycopg``, ``psycopg2``, ``asyncpg``
3231
* See `SQLAlchemy Postgresql Dialects <https://docs.sqlalchemy.org/en/20/dialects/postgresql.html>`_
33-
32+
* **Transaction-friendly operations** via the `op` module for combining PGMQ with your business logic in the same transaction.
33+
* `Fully tested across all supported DBAPIs in both async and sync modes <https://github.com/jason810496/pgmq-sqlalchemy/actions/workflows/codecov.yml>`_.
34+
* Battle-tested with `real-world FastAPI Pub/Sub examples <https://github.com/jason810496/pgmq-sqlalchemy/tree/main/examples/fastapi_pub_sub/README.md>`_ and `corresponding tests <https://github.com/jason810496/pgmq-sqlalchemy/actions/workflows/examples.yml>`_.
3435

3536
Table of Contents
3637
-----------------
@@ -41,6 +42,7 @@ Table of Contents
4142
self
4243
installation
4344
getting-started
45+
example-with-fastapi-pub-sub
4446
api-reference
4547
development
4648
release

0 commit comments

Comments
 (0)