Commit 85e1740

Authored by Copilot and jason810496

Add FastAPI pub/sub example with real-world PGMQ usage patterns (#32)
* Initial plan
* Add FastAPI pub/sub example with tests and GitHub Action
* Fix tests for FastAPI pub/sub example
* Fix consumer to work with async event loop properly
* Address code review feedback: use lifespan, improve exception handling
* Add workflow permissions to examples GitHub Action
* Address review comments: fix transaction, use pgmq methods, add messages endpoint, reorganize tests
* Fix code review issues: move imports to top of file and remove unused imports
* Simplify consumer code and add pyproject.toml for example
* Update pytest flags to -vss for better test output visibility
* Add pre-commit hooks and scripts for async method checks in PGMQueue

  Distinguish sync and async operations in PGMQueue
  - Introduced a pre-commit hook to check for missing async methods in PGMQueue.
  - Added scripts to identify and generate missing async methods.
  - Created utility functions for AST manipulation and method transformation.
  - Established configuration for project paths and console output.
* WIP
* Add MultiSubprocessesRenderer utils
* Fix MultiSubprocessesRenderer not rendering issue
* Add timeout arg and verify stop_condition_callable
* Refactor FastAPI example and add create orders coordinator script
  - Update FastAPI API to improve response models and logging.
  - Introduce create_orders_coordinator.py for parallel order creation.
  - Enhance consumer.py with verbose logging and improved error handling.
  - Modify integration tests to support new features and improve readability.
* Remove Python 3.9 from examples workflow matrix

---------

Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: jason810496 <[email protected]>
Co-authored-by: LIU ZHE YOU <[email protected]>
1 parent (8a63206) · commit 85e1740

18 files changed: 1861 additions & 33 deletions

.github/workflows/codecov.yml

Lines changed: 1 addition & 1 deletion
```diff
@@ -33,7 +33,7 @@ jobs:
           curl -LsSf https://astral.sh/uv/install.sh | sh
           echo "$HOME/.local/bin" >> $GITHUB_PATH
       - name: Install dependencies
-        run: uv sync --extra dev
+        run: uv sync --group postgresql-drivers --group test
       - name: Start PostgreSQL
         run: |
           cp pgmq_postgres.template.env pgmq_postgres.env
```

.github/workflows/examples.yml

Lines changed: 74 additions & 0 deletions
New file:

```yaml
name: Examples Tests

on:
  push:
    branches: [main, develop]
    paths:
      - 'examples/**'
      - 'examples_tests/**'
      - 'pgmq_sqlalchemy/**'
      - '.github/workflows/examples.yml'
  pull_request:
    branches: [main, develop]
    paths:
      - 'examples/**'
      - 'examples_tests/**'
      - 'pgmq_sqlalchemy/**'
      - '.github/workflows/examples.yml'

jobs:
  test-examples:
    runs-on: ubuntu-latest

    permissions:
      contents: read

    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12"]

    name: Test Examples (Python ${{ matrix.python-version }})

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install uv
        run: |
          curl -LsSf https://astral.sh/uv/install.sh | sh
          echo "$HOME/.local/bin" >> $GITHUB_PATH

      - name: Install dependencies
        run: |
          uv sync --all-groups --no-group docs

      - name: Start PostgreSQL
        run: |
          cp pgmq_postgres.template.env pgmq_postgres.env
          cp pgmq_tests.template.env pgmq_tests.env
          make start-db

      - name: Setup database for examples tests
        run: |
          docker compose exec -T pgmq_postgres psql -U postgres -c "CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;"

      - name: Run examples tests
        run: |
          uv run pytest examples_tests --cov=examples --cov-report=xml:coverage-examples-py${{ matrix.python-version }}.xml -vss

      - name: Upload coverage artifact
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: coverage-examples-py${{ matrix.python-version }}
          path: coverage-examples-py${{ matrix.python-version }}.xml
          retention-days: 1

      - name: Cleanup
        if: always()
        run: |
          docker compose down
```

examples/fastapi_pub_sub/README.md

Lines changed: 124 additions & 0 deletions
New file:

````markdown
# FastAPI Pub/Sub Example with PGMQ

This example demonstrates a real-world scenario of using PGMQ with FastAPI for an order management system. It shows how to:

- Use PGMQ with FastAPI and sync SQLAlchemy sessions (psycopg2)
- Publish messages using `PGMQOperation` (op) in a web API
- Consume messages asynchronously using `PGMQueue` with asyncpg

## Architecture

- **API Server (api.py)**: FastAPI application that creates orders and publishes them to PGMQ
  - Uses sync database driver (psycopg2)
  - Uses `PGMQOperation` (imported as `op`) for publishing messages
  - Provides REST endpoints for creating and retrieving orders

- **Consumer (consumer.py)**: Async worker that processes orders from the queue
  - Uses async database driver (asyncpg)
  - Uses `PGMQueue` class for reading messages
  - Processes messages concurrently with asyncio

## Prerequisites

- PostgreSQL with PGMQ extension installed
- Python 3.9 or higher

Quick setup:

```bash
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
```

## Installation

Install required dependencies using uv with the example's pyproject.toml:

```bash
cd examples/fastapi_pub_sub
uv pip install -e .
```

Or install dependencies directly:

```bash
uv pip install fastapi uvicorn psycopg2-binary asyncpg pgmq-sqlalchemy
```

Or install from the project root with uv:

```bash
cd /path/to/pgmq-sqlalchemy
uv pip install -e ".[psycopg2-binary,asyncpg]"
```

## Running the Example

### 1. Start the API Server

```bash
python api.py
```

The API will be available at http://localhost:8000

### 2. Start the Consumer

In a separate terminal:

```bash
python consumer.py
```

### 3. Create Orders

Create an order via the API:

```bash
curl -X POST "http://localhost:8000/orders" \
  -H "Content-Type: application/json" \
  -d '{
    "customer_name": "John Doe",
    "product_name": "Widget",
    "quantity": 5,
    "price": 29.99
  }'
```

You should see:

- The API returns the created order with a message ID
- The consumer logs show the order being processed

### 4. View Order

Get an order by ID:

```bash
curl "http://localhost:8000/orders/1"
```

## API Endpoints

- `POST /orders` - Create a new order
- `GET /orders/{order_id}` - Get order by ID
- `GET /health` - Health check endpoint

## How It Works

1. When an order is created via the API:
   - The order is saved to the database
   - A message is published to PGMQ using `op.send()`
   - The message contains order details

2. The consumer:
   - Continuously polls the queue for new messages
   - Processes messages concurrently using asyncio
   - Deletes successfully processed messages
   - Leaves failed messages in the queue for retry

## Configuration

You can modify the following constants in the files:

- `DATABASE_URL`: PostgreSQL connection string
- `QUEUE_NAME`: Name of the PGMQ queue (default: "order_queue")
- `batch_size`: Number of messages to process in each batch (consumer.py)
- `vt`: Visibility timeout in seconds (consumer.py)
````
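The publish path the README describes (save the order, then publish its details, then return the order with a message ID) can be sketched in plain Python. This is a minimal sketch with in-memory stand-ins, not the real api.py: `FakeDB` and `FakeQueue` are hypothetical fakes replacing the SQLAlchemy session and `PGMQOperation`/`op.send()`.

```python
import json
from dataclasses import dataclass, field

# Hypothetical in-memory stand-ins for the database and the PGMQ queue.
# The real example uses a sync SQLAlchemy session and op.send() instead.
@dataclass
class FakeDB:
    rows: list = field(default_factory=list)

    def insert(self, row: dict) -> int:
        self.rows.append(row)
        return len(self.rows)  # auto-incremented order_id

@dataclass
class FakeQueue:
    sent: list = field(default_factory=list)

    def send(self, body: str) -> int:
        self.sent.append(body)
        return len(self.sent)  # message id

def create_order(db: FakeDB, queue: FakeQueue, order: dict) -> dict:
    # 1. save the order to the database
    order_id = db.insert(order)
    # 2. publish the order details to the queue (op.send() in the real api.py)
    msg_id = queue.send(json.dumps({"order_id": order_id, **order}))
    # 3. return the created order together with its message ID
    return {"order_id": order_id, "msg_id": msg_id}

db, queue = FakeDB(), FakeQueue()
result = create_order(db, queue, {"customer_name": "John Doe", "quantity": 5})
print(result)  # {'order_id': 1, 'msg_id': 1}
```

The real endpoint performs both steps inside one request handler, which is why the API response can already include the published message's ID.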

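The consumer loop the README describes (poll a batch, process messages concurrently with asyncio, delete successes, leave failures for retry) can be sketched the same way. This is an illustrative sketch, not the real consumer.py: `FakeQueue` and its `read_batch`/`delete` methods are assumed in-memory stand-ins for `PGMQueue` over asyncpg.

```python
import asyncio
from dataclasses import dataclass, field

# Hypothetical in-memory queue; the real consumer reads via PGMQueue
# with a visibility timeout (vt), which this sketch does not model.
@dataclass
class FakeQueue:
    messages: dict = field(default_factory=dict)
    next_id: int = 1

    def send(self, payload: dict) -> int:
        msg_id, self.next_id = self.next_id, self.next_id + 1
        self.messages[msg_id] = payload
        return msg_id

    def read_batch(self, batch_size: int):
        return list(self.messages.items())[:batch_size]

    def delete(self, msg_id: int) -> None:
        self.messages.pop(msg_id, None)

async def process(queue: FakeQueue, msg_id: int, payload: dict) -> None:
    # Real order processing would go here; raising leaves the message
    # in the queue, so it becomes visible again and is retried later.
    if payload.get("quantity", 0) <= 0:
        raise ValueError("invalid order")
    queue.delete(msg_id)  # acknowledge success by deleting the message

async def consume_once(queue: FakeQueue, batch_size: int = 5):
    batch = queue.read_batch(batch_size)
    # Process the whole batch concurrently; return_exceptions=True keeps
    # one failed message from aborting the rest of the batch.
    return await asyncio.gather(
        *(process(queue, mid, p) for mid, p in batch),
        return_exceptions=True,
    )

queue = FakeQueue()
queue.send({"customer_name": "John Doe", "quantity": 5})
queue.send({"customer_name": "Bad Order", "quantity": 0})
asyncio.run(consume_once(queue))
print(sorted(queue.messages))  # [2]: only the failed message remains
```

The delete-on-success / leave-on-failure pattern is what gives the consumer at-least-once semantics: a crashed or failed handler never loses the message, it just reappears after the visibility timeout.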