Skip to content

Add FastAPI pub/sub example with real-world PGMQ usage patterns#32

Merged
jason810496 merged 18 commits intomainfrom
copilot/add-fastapi-pub-sub-example
Jan 7, 2026
Merged

Add FastAPI pub/sub example with real-world PGMQ usage patterns#32
jason810496 merged 18 commits intomainfrom
copilot/add-fastapi-pub-sub-example

Conversation

Copy link
Copy Markdown
Contributor

Copilot AI commented Jan 3, 2026

Description

Adds a production-ready FastAPI example demonstrating PGMQ integration for order management with separate producer and consumer processes.

Implementation

  • examples/fastapi_pub_sub/api.py: FastAPI server using sync operations (psycopg2) with PGMQOperation (imported as op) for message publishing in request handlers
    • Implements atomic transactions: order creation and message publishing in same transaction
    • Includes /messages endpoint to query messages from PGMQ queue
    • Supports environment variables for DATABASE_URL and QUEUE_NAME
  • examples/fastapi_pub_sub/consumer.py: Async worker using asyncpg with PGMQueue instance methods for concurrent message processing
    • Properly initialized with event loop to avoid conflicts
    • Uses simplified pgmq.read_batch() and pgmq.delete() methods (no explicit session management needed)
    • Supports environment variables for DATABASE_URL and QUEUE_NAME
  • examples/fastapi_pub_sub/README.md: Setup and usage documentation with uv installation instructions
  • examples/fastapi_pub_sub/pyproject.toml: Project configuration for uv-based dependency management

Testing

  • examples_tests/unit/: Unit test suite covering API endpoints, message publishing, and async consumer processing
  • examples_tests/integration/: Integration test suite with subprocess-based tests
    • Spins up API and consumer as separate processes
    • Creates 100 orders in parallel via API
    • Verifies consumer processes all messages
  • .github/workflows/examples.yml: CI workflow triggered by changes to examples/, examples_tests/, or pgmq_sqlalchemy/ on Python 3.9-3.12
    • Uses -vss pytest flags for better test output visibility in CI

Example Usage

# Producer (sync in API) - atomic transaction
db.flush()  # Get order ID
msg_id = op.send(queue_name, order_data, session=db, commit=False)
db.commit()  # Commit both order and message together

# Consumer (async) - using PGMQueue instance with simplified API
messages = await pgmq.read_batch(queue_name, vt=30, batch_size=10)
deleted = await pgmq.delete(queue_name, msg_id)

Notes

  • Consumer uses PGMQueue instance with proper event loop initialization and simplified method calls
  • PGMQueue methods handle session management internally, no need for explicit context managers
  • API ensures transactional consistency between order creation and message publishing
  • FastAPI uses modern lifespan context manager (no deprecated on_event)
  • Example includes pyproject.toml for uv-based dependency management
  • All tests pass; CodeQL reports no vulnerabilities

Status

  • In progress
  • Ready for review
  • Done

Checklist

  • Read the Contributing Guide
  • Passes tests
  • Linted ( we use pre-commit with ruff )
  • Updated documentation
Original prompt

Add real world scenarios in examples dir

  • add examples/fastapi_pub_sub
    • api.py
      • just define 1 Order model
      • should use get_db dependency, use psycopg2 (normal sync session)
      • when creating order, also create order using op to create a message
      • etc
    • consumer.py
      • processing order with asyncio
      • use asyncpg
      • use PGMQueue instead of op
  • We should add corresponding tests at examples_tests to ensure the expected behavior
  • Add GitHub Action that if we change any files under examples or examples_tests or pgmq_sqlalchemy. Then we should run examples_tests

💬 We'd love your input! Share your thoughts on Copilot coding agent in our 2 minute survey.

Copilot AI changed the title [WIP] Add real world scenarios in examples directory Add FastAPI pub/sub example with real-world PGMQ usage patterns Jan 3, 2026
Copilot AI requested a review from jason810496 January 3, 2026 16:42
@jason810496 jason810496 force-pushed the copilot/add-fastapi-pub-sub-example branch from 629f070 to 0a6d22e Compare January 4, 2026 03:41
Copy link
Copy Markdown
Owner

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot Resolve all the review comments

Comment thread examples/fastapi_pub_sub/api.py Outdated
Comment on lines +108 to +129
# Create order in database
db_order = Order(
customer_name=order_data.customer_name,
product_name=order_data.product_name,
quantity=order_data.quantity,
price=order_data.price,
)
db.add(db_order)
db.commit()
db.refresh(db_order)

# Publish message to PGMQ using op
message_data = {
"order_id": db_order.id,
"customer_name": db_order.customer_name,
"product_name": db_order.product_name,
"quantity": db_order.quantity,
"price": db_order.price,
"created_at": db_order.created_at.isoformat(),
}

msg_id = op.send(QUEUE_NAME, message_data, session=db, commit=True)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The op.send should be in the same transaction as creating order.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0d2e4bf. Now using db.flush() to get the order ID, then op.send() with commit=False, and finally db.commit() to commit both operations in the same transaction.

Comment thread examples/fastapi_pub_sub/consumer.py Outdated
batch_size: Number of messages to read in each batch
vt: Visibility timeout in seconds
"""
from pgmq_sqlalchemy import op
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use method of pgmq instance instead of using op in this demo.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0d2e4bf. Updated to use pgmq.read_batch_async() instead of op.read_batch_async().

Comment thread examples/fastapi_pub_sub/consumer.py Outdated
Comment on lines +70 to +72
# Read a batch of messages
async with pgmq.session_maker() as session:
messages = await op.read_batch_async(QUEUE_NAME, vt=vt, batch_size=batch_size, session=session, commit=True)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the method of pgmq instance directly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0d2e4bf. Updated to use pgmq.delete_async() instead of op.delete_async().

Comment thread examples/fastapi_pub_sub/consumer.py Outdated
Comment on lines +118 to +130
# Note: Manual PGMQueue setup to avoid event loop conflicts
# PGMQueue.__init__ tries to run a nested event loop which conflicts
# with asyncio.run(). This is a known limitation when using PGMQueue
# in an async context manager like asyncio.run().
# For proper usage, consider using PGMQOperation methods directly with sessions.
pgmq = PGMQueue.__new__(PGMQueue)
pgmq.engine = async_engine
pgmq.session_maker = async_session_maker
pgmq.is_async = True
pgmq.delay = 0
pgmq.vt = 30
pgmq.loop = None
pgmq.is_pg_partman_ext_checked = True
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the constructor of PGMQueue again. We can now provide event loop to the PGMQueue.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0d2e4bf. Now properly initializing PGMQueue with loop=asyncio.get_event_loop() to avoid event loop conflicts.

Comment thread examples/fastapi_pub_sub/consumer.py Outdated
Comment on lines +132 to +135
# Check PGMQ extension manually
async with async_session_maker() as session:
from pgmq_sqlalchemy import op
await op.check_pgmq_ext_async(session=session, commit=True)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to check by op, use method of pgmq instance

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0d2e4bf. Removed manual extension check with op - now handled automatically by PGMQueue constructor.

Comment on lines +31 to +37
## Installation

Install required dependencies:

```bash
pip install -r requirements.txt
```
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use uv to manage the dependencies instead of requirements.txt

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0d2e4bf. Updated README to use uv pip install commands instead of pip with requirements.txt.

if not order:
raise HTTPException(status_code=404, detail="Order not found")
return order

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a route to query the msg from pgmq.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0d2e4bf. Added GET /messages?limit=10 endpoint to query messages from the PGMQ queue.


from pgmq_sqlalchemy import PGMQueue


Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

place this file under examples_tests/unit

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0d2e4bf. Moved test file to examples_tests/unit/test_fastapi_pub_sub.py.

from sqlalchemy.orm import sessionmaker

from pgmq_sqlalchemy import PGMQueue

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add another test under examples_tests/integration

  1. api_instance fixture, module level, auto use, that use subprocess to spin up the api.py and should teardown the subprocess when finish.
  2. consumer_instance fixture, module level, auto use, and that use subprocess to spin up the consumer.py and should teardown the subprocess when finish.
  3. The test should create 100 order parallely by calling api
  4. The test should wait until consumer process the all the orders

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 0d2e4bf. Added examples_tests/integration/test_fastapi_integration.py with:

  • api_instance fixture that spins up API server subprocess
  • consumer_instance fixture that spins up consumer subprocess
  • Test that creates 100 orders in parallel via the API
  • Test waits for consumer to process all messages and verifies queue is empty

Copilot AI requested a review from jason810496 January 4, 2026 04:28
@jason810496 jason810496 marked this pull request as ready for review January 4, 2026 04:29
@codecov
Copy link
Copy Markdown

codecov Bot commented Jan 4, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Owner

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment thread examples/fastapi_pub_sub/consumer.py Outdated
Comment on lines +70 to +71
async with pgmq.session_maker() as session:
messages = await pgmq.read_batch_async(QUEUE_NAME, vt=vt, batch_size=batch_size, session=session, commit=True)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async with pgmq.session_maker() as session:
messages = await pgmq.read_batch_async(QUEUE_NAME, vt=vt, batch_size=batch_size, session=session, commit=True)
messages = pgmq.read_batch(QUEUE_NAME, vt=vt, batch_size=batch_size)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 01992d8. Simplified to use messages = await pgmq.read_batch(QUEUE_NAME, vt=vt, batch_size=batch_size) without explicit session management.

Comment thread examples/fastapi_pub_sub/consumer.py Outdated
Comment on lines +92 to +93
async with pgmq.session_maker() as session:
deleted = await pgmq.delete_async(QUEUE_NAME, msg_id, session=session, commit=True)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async with pgmq.session_maker() as session:
deleted = await pgmq.delete_async(QUEUE_NAME, msg_id, session=session, commit=True)
deleted = await pgmq.delete(QUEUE_NAME, msg_id)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 01992d8. Simplified to use deleted = await pgmq.delete(QUEUE_NAME, msg_id) without explicit session management.

Comment on lines +1 to +5
fastapi>=0.104.0
uvicorn>=0.24.0
psycopg2-binary>=2.9.9
asyncpg>=0.29.0
pgmq-sqlalchemy>=0.1.2
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a uv based pyproject.toml under examples/fastapi_pub_sub

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 01992d8. Added examples/fastapi_pub_sub/pyproject.toml with project metadata and dependencies for uv-based dependency management. Also updated README with installation instructions.

Comment thread .github/workflows/examples.yml Outdated

- name: Run examples tests
run: |
uv run pytest examples_tests --cov=examples --cov-report=xml:coverage-examples-py${{ matrix.python-version }}.xml -v
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot

Suggested change
uv run pytest examples_tests --cov=examples --cov-report=xml:coverage-examples-py${{ matrix.python-version }}.xml -v
uv run pytest examples_tests --cov=examples --cov-report=xml:coverage-examples-py${{ matrix.python-version }}.xml -vss

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 71a5868. Updated pytest flags to -vss for better test output visibility in CI.

Distinguish sync and async operations in PGMQueue

- Introduced a pre-commit hook to check for missing async methods in PGMQueue.
- Added scripts to identify and generate missing async methods.
- Created utility functions for AST manipulation and method transformation.
- Established configuration for project paths and console output.
@jason810496 jason810496 force-pushed the copilot/add-fastapi-pub-sub-example branch from d6fc36e to 20e2fba Compare January 5, 2026 07:50
- Update FastAPI API to improve response models and logging.
- Introduce create_orders_coordinator.py for parallel order creation.
- Enhance consumer.py with verbose logging and improved error handling.
- Modify integration tests to support new features and improve readability.
@jason810496 jason810496 merged commit 85e1740 into main Jan 7, 2026
25 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants