Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
curl -LsSf https://astral.sh/uv/install.sh | sh
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Install dependencies
run: uv sync --extra dev
run: uv sync --group postgresql-drivers --group test
- name: Start PostgreSQL
run: |
cp pgmq_postgres.template.env pgmq_postgres.env
Expand Down
74 changes: 74 additions & 0 deletions .github/workflows/examples.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
name: Examples Tests

on:
push:
branches: [main, develop]
paths:
- 'examples/**'
- 'examples_tests/**'
- 'pgmq_sqlalchemy/**'
- '.github/workflows/examples.yml'
pull_request:
branches: [main, develop]
paths:
- 'examples/**'
- 'examples_tests/**'
- 'pgmq_sqlalchemy/**'
- '.github/workflows/examples.yml'

jobs:
test-examples:
runs-on: ubuntu-latest

permissions:
contents: read

strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]

name: Test Examples (Python ${{ matrix.python-version }})

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install uv
run: |
curl -LsSf https://astral.sh/uv/install.sh | sh
echo "$HOME/.local/bin" >> $GITHUB_PATH

- name: Install dependencies
run: |
uv sync --all-groups --no-group docs

- name: Start PostgreSQL
run: |
cp pgmq_postgres.template.env pgmq_postgres.env
cp pgmq_tests.template.env pgmq_tests.env
make start-db

- name: Setup database for examples tests
run: |
docker compose exec -T pgmq_postgres psql -U postgres -c "CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;"

- name: Run examples tests
run: |
uv run pytest examples_tests --cov=examples --cov-report=xml:coverage-examples-py${{ matrix.python-version }}.xml -vss

- name: Upload coverage artifact
if: always()
uses: actions/upload-artifact@v4
with:
name: coverage-examples-py${{ matrix.python-version }}
path: coverage-examples-py${{ matrix.python-version }}.xml
retention-days: 1

- name: Cleanup
if: always()
run: |
docker compose down
124 changes: 124 additions & 0 deletions examples/fastapi_pub_sub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# FastAPI Pub/Sub Example with PGMQ

This example demonstrates a real-world scenario of using PGMQ with FastAPI for an order management system. It shows how to:

- Use PGMQ with FastAPI and sync SQLAlchemy sessions (psycopg2)
- Publish messages using `PGMQOperation` (op) in a web API
- Consume messages asynchronously using `PGMQueue` with asyncpg

## Architecture

- **API Server (api.py)**: FastAPI application that creates orders and publishes them to PGMQ
- Uses sync database driver (psycopg2)
- Uses `PGMQOperation` (imported as `op`) for publishing messages
- Provides REST endpoints for creating and retrieving orders

- **Consumer (consumer.py)**: Async worker that processes orders from the queue
- Uses async database driver (asyncpg)
- Uses `PGMQueue` class for reading messages
- Processes messages concurrently with asyncio

## Prerequisites

- PostgreSQL with PGMQ extension installed
- Python 3.9 or higher

Quick setup:
```bash
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
```

## Installation

Install required dependencies using uv with the example's pyproject.toml:

```bash
cd examples/fastapi_pub_sub
uv pip install -e .
```

Or install dependencies directly:

```bash
uv pip install fastapi uvicorn psycopg2-binary asyncpg pgmq-sqlalchemy
```

Or install from the project root with uv:

```bash
cd /path/to/pgmq-sqlalchemy
uv pip install -e ".[psycopg2-binary,asyncpg]"
```
Comment on lines +31 to +51
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use uv to manage the dependencies instead of requirements.txt

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0d2e4bf. Updated README to use uv pip install commands instead of pip with requirements.txt.


## Running the Example

### 1. Start the API Server

```bash
python api.py
```

The API will be available at http://localhost:8000

### 2. Start the Consumer

In a separate terminal:

```bash
python consumer.py
```

### 3. Create Orders

Create an order via the API:

```bash
curl -X POST "http://localhost:8000/orders" \
-H "Content-Type: application/json" \
-d '{
"customer_name": "John Doe",
"product_name": "Widget",
"quantity": 5,
"price": 29.99
}'
```

You should see:
- The API returns the created order with a message ID
- The consumer logs show the order being processed

### 4. View Order

Get an order by ID:

```bash
curl "http://localhost:8000/orders/1"
```

## API Endpoints

- `POST /orders` - Create a new order
- `GET /orders/{order_id}` - Get order by ID
- `GET /health` - Health check endpoint

## How It Works

1. When an order is created via the API:
- The order is saved to the database
- A message is published to PGMQ using `op.send()`
- The message contains order details

2. The consumer:
- Continuously polls the queue for new messages
- Processes messages concurrently using asyncio
- Deletes successfully processed messages
- Leaves failed messages in the queue for retry

## Configuration

You can modify the following constants in the files:

- `DATABASE_URL`: PostgreSQL connection string
- `QUEUE_NAME`: Name of the PGMQ queue (default: "order_queue")
- `batch_size`: Number of messages to process in each batch (consumer.py)
- `vt`: Visibility timeout in seconds (consumer.py)
Loading