|
| 1 | +FastAPI Pub/Sub Example with PGMQ |
| 2 | +================================= |
| 3 | + |
| 4 | +.. note:: |
| 5 | + |
| 6 | + You can find the complete code for this example in `examples/fastapi_pub_sub <https://github.com/jason810496/pgmq-sqlalchemy/tree/main/examples/fastapi_pub_sub>`_. |
| 7 | + |
| 8 | +This example demonstrates a real-world scenario of using PGMQ with |
| 9 | +FastAPI for an order management system. It shows how to: |
| 10 | + |
| 11 | +- Use PGMQ with FastAPI and sync SQLAlchemy sessions (psycopg2) |
| 12 | +- Publish messages using ``PGMQOperation`` (``op``) in a web API |
| 13 | +- Consume messages asynchronously using ``PGMQueue`` with ``asyncpg`` |
| 14 | + |
| 15 | + |
| 16 | +Scenario |
| 17 | +-------- |
| 18 | + |
| 19 | +1. Client creates an order via the API. |
| 20 | +2. The API saves the order to the database and publishes a message to |
| 21 | + PGMQ within the same transaction. |
| 22 | +3. The consumer polls the queue, processes messages, deletes the |
| 23 | + message on success, or leaves it for retry on failure. |
| 24 | + |
| 25 | + - We **simulate** that if ``msg_id % 6 == 0``, it will fail twice |
| 26 | + before succeeding, and if ``msg_id % 2 == 0``, it will fail once |
| 27 | + before succeeding. |
| 28 | + |
| 29 | + |
| 30 | +Architecture |
| 31 | +------------ |
| 32 | + |
| 33 | +- **API Server** (``api.py``): FastAPI application that creates orders |
| 34 | + and publishes them to PGMQ |
| 35 | + |
| 36 | + - Uses sync database driver (psycopg2) |
| 37 | + - Uses ``PGMQOperation`` (imported as ``op``) for publishing messages |
| 38 | + - Provides REST endpoints for creating and retrieving orders |
| 39 | + |
| 40 | + - ``POST /orders`` – Create a new order |
| 41 | + - ``GET /orders/{order_id}`` – Get order by ID |
| 42 | + - ``GET /health`` – Health check endpoint |
| 43 | + |
| 44 | +- **Consumer** (``consumer.py``): Async worker that processes orders |
| 45 | + from the queue |
| 46 | + |
| 47 | + - Uses async database driver (``asyncpg``) |
| 48 | + - Uses ``PGMQueue`` class for reading messages |
| 49 | + - Processes messages concurrently with ``asyncio`` |
| 50 | + |
| 51 | +- **Create Orders Script** (``create_orders_coordinator.py``): Script |
| 52 | + to create multiple orders in parallel via the API for testing |
| 53 | + |
| 54 | + |
| 55 | +Prerequisites |
| 56 | +------------- |
| 57 | + |
| 58 | +- PostgreSQL with PGMQ extension installed |
| 59 | +- Python 3.10 or higher |
| 60 | + |
| 61 | +Quick setup:: |
| 62 | + |
| 63 | + docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest |
| 64 | + |
| 65 | +Install dependencies from the project root with ``uv``:: |
| 66 | + |
| 67 | + cd /path/to/pgmq-sqlalchemy |
| 68 | + uv ync --group dev |
| 69 | + |
| 70 | + |
| 71 | +Running the Example with Pytest |
| 72 | +-------------------------------- |
| 73 | + |
| 74 | +You can run the entire example (API server, consumer, and order |
| 75 | +creation) using the provided pytest script |
| 76 | + |
| 77 | +.. code-block:: bash |
| 78 | +
|
| 79 | + cd /path/to/pgmq-sqlalchemy |
| 80 | + uv run pytest ./examples_tests/integration/test_fastapi_integration.py -ss |
| 81 | +
|
| 82 | +This command starts the API server and consumer, and creates orders |
| 83 | +automatically. You will see **logs from all three processes in real time |
| 84 | +in the terminal** |
| 85 | + |
| 86 | +.. code-block:: text |
| 87 | +
|
| 88 | + ========================================================================================================================= test session starts ========================================================================================================================== |
| 89 | + platform darwin -- Python 3.12.12, pytest-7.4.4, pluggy-1.6.0 |
| 90 | + rootdir: /Users/jason/pgmq-sqlalchemy |
| 91 | + configfile: pyproject.toml |
| 92 | + plugins: asyncio-0.23.8, anyio-4.12.0, xdist-3.8.0, lazy-fixture-0.6.3, cov-7.0.0 |
| 93 | + asyncio: mode=Mode.AUTO |
| 94 | + collected 1 item |
| 95 | +
|
| 96 | + examples_tests/integration/test_fastapi_integration.py |
| 97 | + examples_tests/integration/test_fastapi_integration.py |
| 98 | + Starting API process... |
| 99 | + API process started with PID: 95531 |
| 100 | + Starting Consumer process... |
| 101 | + Consumer process started with PID: 95532 |
| 102 | + Starting Create Orders process... |
| 103 | + Create Orders process started with PID: 95533 |
| 104 | +
|
| 105 | +
|
| 106 | + ╭───────────────────────────────── API process: 12653 ─────────────────────────────────╮╭────────────────────────────── Consumer process: 12654 ───────────────────────────────╮╭──────────────────────────── Create Orders process: 12655 ────────────────────────────╮ |
| 107 | + │ Starting API process... ││ Starting Consumer process... ││ Starting Create Orders process... │ |
| 108 | + │ INFO: 127.0.0.1:54358 - "GET /health HTTP/1.1" 200 OK ││ [2026-01-07 19:48:03][INFO] - Starting consumer for queue: order_queue ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ |
| 109 | + │ INFO: Started server process [12653] ││ [2026-01-07 19:48:03][INFO] - Batch size: 30, Visibility timeout: 10s ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ |
| 110 | + │ INFO: 127.0.0.1:54359 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:04][INFO] - Received 30 messages ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ |
| 111 | + │ INFO: Waiting for application startup. ││ [2026-01-07 19:48:05][INFO] - Order 111 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - API Server is not ready... │ |
| 112 | + │ INFO: 127.0.0.1:54360 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 102 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: GET http://localhost:8000/health │ |
| 113 | + │ INFO: Application startup complete. ││ [2026-01-07 19:48:05][INFO] - Order 116 processed fail at first try ││ "HTTP/1.1 200 OK" │ |
| 114 | + │ INFO: 127.0.0.1:54368 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 122 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - API Server is ready! │ |
| 115 | + │ INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit) ││ [2026-01-07 19:48:05][INFO] - Order 130 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ |
| 116 | + │ INFO: 127.0.0.1:54363 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 104 processed fail at first try ││ "HTTP/1.1 201 Created" │ |
| 117 | + │ INFO: 127.0.0.1:54362 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 119 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ |
| 118 | + │ INFO: 127.0.0.1:54369 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 108 processed fail at first try ││ "HTTP/1.1 201 Created" │ |
| 119 | + │ INFO: 127.0.0.1:54364 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 118 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ |
| 120 | + │ INFO: 127.0.0.1:54366 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 127 processed fail at first try ││ "HTTP/1.1 201 Created" │ |
| 121 | + │ INFO: 127.0.0.1:54372 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 109 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ |
| 122 | + │ INFO: 127.0.0.1:54367 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 128 processed fail at first try ││ "HTTP/1.1 201 Created" │ |
| 123 | + │ INFO: 127.0.0.1:54370 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 112 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ |
| 124 | + │ INFO: 127.0.0.1:54373 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 124 processed fail at first try ││ "HTTP/1.1 201 Created" │ |
| 125 | + │ INFO: 127.0.0.1:54365 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][INFO] - Order 113 processed fail at first try ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ |
| 126 | + │ INFO: 127.0.0.1:54371 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 110 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ |
| 127 | + │ INFO: 127.0.0.1:54361 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 102 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ |
| 128 | + │ INFO: 127.0.0.1:54376 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 116 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ |
| 129 | + │ INFO: 127.0.0.1:54380 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 122 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ |
| 130 | + │ INFO: 127.0.0.1:54377 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 130 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ |
| 131 | + │ INFO: 127.0.0.1:54379 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 104 processing failed, will retry later ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ |
| 132 | + │ INFO: 127.0.0.1:54393 - "POST /orders HTTP/1.1" 201 Created ││ [2026-01-07 19:48:05][WARNING] - Message 120 processing failed, will retry later ││ "HTTP/1.1 201 Created" │ |
| 133 | + │ INFO: 127.0.0.1:54394 - "POST /orders HTTP/1.1" 201 Created ││ ││ [2026-01-07 19:48:03][INFO] - HTTP Request: POST http://localhost:8000/orders │ |
| 134 | + ╰──────────────────────────────────────────────────────────────────────────────────────╯╰──────────────────────────────────────────────────────────────────────────────────────╯╰──────────────────────────────────────────────────────────────────────────────────────╯ |
| 135 | +
|
| 136 | +Configuration |
| 137 | +------------- |
| 138 | + |
| 139 | +You can modify the following environment variables before running the |
| 140 | +example: |
| 141 | + |
| 142 | +- ``DATABASE_URL``: PostgreSQL connection string |
| 143 | +- ``QUEUE_NAME``: Name of the PGMQ queue (default: ``"order_queue"``) |
| 144 | +- ``BATCH_SIZE``: Number of messages to process in each batch |
| 145 | + (``consumer.py``) |
| 146 | +- ``VT``: Visibility timeout in seconds (``consumer.py``) |
| 147 | +- ``API_PORT``: Port for the FastAPI server (default: ``8000``) |
| 148 | + |
| 149 | + |
| 150 | +How It Works |
| 151 | +------------ |
| 152 | + |
| 153 | +1. When an order is created via the API: |
| 154 | + |
| 155 | + - The order is saved to the database and published to PGMQ **within |
| 156 | + the same transaction** by using ``op.send()``. |
| 157 | + - The message contains order details. |
| 158 | + |
| 159 | +2. The consumer: |
| 160 | + |
| 161 | + - Continuously polls the queue for new messages. |
| 162 | + - Processes messages concurrently using ``asyncio``. |
| 163 | + - Deletes successfully processed messages. |
| 164 | + - Leaves failed messages in the queue for retry. |
| 165 | + |
| 166 | + |
| 167 | +Testing |
| 168 | +------- |
| 169 | + |
| 170 | +This example is covered by an integration test located at |
| 171 | +`examples_tests/integration/test_fastapi_integration.py <https://github.com/jason810496/pgmq-sqlalchemy/blob/main/examples_tests/integration/test_fastapi_integration.py>`_. It is run |
| 172 | +end to end with ``pytest`` for every pull request to ensure correctness |
| 173 | +and reliability. The GitHub Actions workflow configuration for running |
| 174 | +the tests is located in `.github/workflows/examples.yml <https://github.com/jason810496/pgmq-sqlalchemy/blob/main/.github/workflows/examples.yml>`_. |
| 175 | + |
| 176 | +You can refer to that workflow file for more details on how to set up |
| 177 | +and run the tests. |
| 178 | + |
0 commit comments