Skip to content

Commit aca65a3

Browse files
Copilotjason810496
andcommitted
Address code review feedback: use lifespan, improve exception handling
Co-authored-by: jason810496 <[email protected]>
1 parent a256a18 commit aca65a3

3 files changed

Lines changed: 42 additions & 30 deletions

File tree

examples/fastapi_pub_sub/api.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
- Creating orders and sending them to a message queue
77
"""
88
from typing import Generator
9-
from contextlib import contextmanager
9+
from contextlib import contextmanager, asynccontextmanager
1010

1111
from fastapi import FastAPI, Depends, HTTPException
12-
from pydantic import BaseModel
12+
from pydantic import BaseModel, ConfigDict
1313
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime
1414
from sqlalchemy.orm import Session, sessionmaker, declarative_base
1515
from datetime import datetime
@@ -47,6 +47,8 @@ class OrderCreate(BaseModel):
4747

4848

4949
class OrderResponse(BaseModel):
50+
model_config = ConfigDict(from_attributes=True)
51+
5052
id: int
5153
customer_name: str
5254
product_name: str
@@ -55,40 +57,41 @@ class OrderResponse(BaseModel):
5557
created_at: datetime
5658
message_id: int
5759

58-
class Config:
59-
from_attributes = True
60-
61-
62-
# FastAPI app
63-
app = FastAPI(title="Order Management with PGMQ")
64-
65-
66-
# Database dependency
67-
def get_db() -> Generator[Session, None, None]:
68-
"""Database session dependency."""
69-
db = SessionLocal()
70-
try:
71-
yield db
72-
finally:
73-
db.close()
74-
7560

76-
@app.on_event("startup")
77-
def startup_event():
61+
# Lifespan context manager for startup/shutdown
62+
@asynccontextmanager
63+
async def lifespan(app: FastAPI):
7864
"""Initialize database tables and PGMQ queue on startup."""
79-
# Create tables if they don't exist
65+
# Startup
8066
Base.metadata.create_all(bind=engine)
8167

8268
# Initialize PGMQ queue
8369
with SessionLocal() as session:
8470
op.check_pgmq_ext(session=session, commit=True)
8571

86-
# Create queue if it doesn't exist (will not fail if exists)
72+
# Create queue if it doesn't exist
8773
try:
8874
op.create_queue(QUEUE_NAME, session=session, commit=True)
8975
except Exception:
9076
# Queue might already exist, which is fine
9177
pass
78+
79+
yield
80+
81+
# Shutdown (if needed)
82+
83+
84+
# FastAPI app with lifespan
85+
app = FastAPI(title="Order Management with PGMQ", lifespan=lifespan)
86+
87+
# Database dependency
88+
def get_db() -> Generator[Session, None, None]:
89+
"""Database session dependency."""
90+
db = SessionLocal()
91+
try:
92+
yield db
93+
finally:
94+
db.close()
9295

9396

9497
@app.post("/orders", response_model=OrderResponse, status_code=201)

examples/fastapi_pub_sub/consumer.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,11 @@ async def main():
115115
async_engine = create_async_engine(DATABASE_URL)
116116
async_session_maker = sessionmaker(bind=async_engine, class_=AsyncSession)
117117

118-
# Create PGMQueue instance manually to avoid event loop issues
118+
# Note: Manual PGMQueue setup to avoid event loop conflicts
119+
# PGMQueue.__init__ tries to run a nested event loop which conflicts
120+
# with asyncio.run(). This is a known limitation when using PGMQueue
121+
# in an async context manager like asyncio.run().
122+
# For proper usage, consider using PGMQOperation methods directly with sessions.
119123
pgmq = PGMQueue.__new__(PGMQueue)
120124
pgmq.engine = async_engine
121125
pgmq.session_maker = async_session_maker

examples_tests/test_fastapi_pub_sub.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ def setup_api_app(sync_database_url, test_queue_name):
4848
api.op.check_pgmq_ext(session=session, commit=True)
4949
try:
5050
api.op.create_queue(test_queue_name, session=session, commit=True)
51-
except Exception:
52-
pass
51+
except Exception as e:
52+
# Queue already exists from a previous test run
53+
import logging
54+
logging.warning(f"Could not create queue (may already exist): {e}")
5355

5456
yield api
5557

@@ -58,8 +60,9 @@ def setup_api_app(sync_database_url, test_queue_name):
5860
# Drop the test queue
5961
try:
6062
api.op.drop_queue(test_queue_name, session=session, commit=True)
61-
except Exception:
62-
pass
63+
except Exception as e:
64+
import logging
65+
logging.warning(f"Could not drop queue: {e}")
6366

6467
# Drop tables
6568
session.execute(text("DROP TABLE IF EXISTS orders CASCADE"))
@@ -169,8 +172,10 @@ async def test_consumer_processing(async_database_url, sync_database_url, test_q
169172
op.check_pgmq_ext(session=session, commit=True)
170173
try:
171174
op.create_queue(test_queue_name, session=session, commit=True)
172-
except Exception:
173-
pass # Queue might already exist
175+
except Exception as e:
176+
# Queue already exists from a previous test run
177+
import logging
178+
logging.warning(f"Could not create queue (may already exist): {e}")
174179

175180
test_message = {
176181
"order_id": 12345,

0 commit comments

Comments
 (0)