11"""Integration tests for FastAPI pub/sub example with subprocess."""
2- import asyncio
2+
33import os
44import subprocess
55import sys
66import time
77import signal
8+ import logging
9+
810import pytest
9- import requests
10- from sqlalchemy import create_engine , text
11+ import httpx
12+ from sqlalchemy import create_engine
1113from sqlalchemy .orm import sessionmaker
1214
1315from pgmq_sqlalchemy import op
1416
15-
16- @pytest .fixture (scope = "module" )
17- def examples_dir ():
18- """Return the path to the examples directory."""
19- return os .path .join (
20- os .path .dirname (os .path .dirname (os .path .dirname (__file__ ))),
21- "examples" ,
22- "fastapi_pub_sub"
23- )
17+ logger = logging .getLogger (__name__ )
2418
2519
2620@pytest .fixture (scope = "module" )
@@ -29,49 +23,35 @@ def test_queue_name():
2923 return "test_integration_order_queue"
3024
3125
32- @pytest .fixture (scope = "module" )
33- def database_url (request ):
34- """Get database URL from environment or CLI."""
35- db_name = request .config .getoption ("--db-name" )
36- if not db_name :
37- db_name = os .getenv ("SQLALCHEMY_DB" , "postgres" )
38-
39- host = os .getenv ("SQLALCHEMY_HOST" , "localhost" )
40- port = os .getenv ("SQLALCHEMY_PORT" , "5432" )
41- user = os .getenv ("SQLALCHEMY_USER" , "postgres" )
42- password = os .getenv ("SQLALCHEMY_PASSWORD" , "postgres" )
43-
44- return f"postgresql+psycopg2://{ user } :{ password } @{ host } :{ port } /{ db_name } "
45-
46-
4726@pytest .fixture (scope = "module" , autouse = True )
48- def api_instance (examples_dir , database_url , test_queue_name ):
27+ def api_instance (examples_dir , sync_database_url , test_queue_name ):
4928 """Fixture to spin up the API server as a subprocess."""
5029 # Update the API to use test queue
5130 api_py = os .path .join (examples_dir , "api.py" )
5231
5332 # Set environment variables for the subprocess
5433 env = os .environ .copy ()
55- env ["DATABASE_URL" ] = database_url
34+ env ["DATABASE_URL" ] = sync_database_url
5635 env ["QUEUE_NAME" ] = test_queue_name
5736
5837 # Start the API server
5938 process = subprocess .Popen (
6039 [sys .executable , api_py ],
61- stdout = subprocess . PIPE ,
62- stderr = subprocess . PIPE ,
40+ stdout = sys . stdout ,
41+ stderr = sys . stderr ,
6342 env = env ,
6443 preexec_fn = os .setsid if hasattr (os , 'setsid' ) else None
6544 )
45+ logger .info ("Create API Server Process" )
6646
6747 # Wait for the server to start
6848 max_attempts = 30
6949 for i in range (max_attempts ):
7050 try :
71- response = requests .get ("http://localhost:8000/health" , timeout = 1 )
51+ response = httpx .get ("http://localhost:8000/health" , timeout = 1 )
7252 if response .status_code == 200 :
7353 break
74- except requests . exceptions . RequestException :
54+ except Exception :
7555 time .sleep (1 )
7656 else :
7757 # Kill the process if it didn't start
@@ -80,7 +60,8 @@ def api_instance(examples_dir, database_url, test_queue_name):
8060 else :
8161 process .terminate ()
8262 pytest .fail ("API server failed to start" )
83-
63+
64+ logger .info ("API Server is healthy" )
8465 yield process
8566
8667 # Teardown: kill the API server
@@ -89,30 +70,32 @@ def api_instance(examples_dir, database_url, test_queue_name):
8970 else :
9071 process .terminate ()
9172 process .wait (timeout = 10 )
73+ logger .info ("Terminate API Server" )
9274
9375
9476@pytest .fixture (scope = "module" , autouse = True )
95- def consumer_instance (examples_dir , database_url , test_queue_name , api_instance ):
77+ def consumer_instance (examples_dir , async_database_url , test_queue_name ):
9678 """Fixture to spin up the consumer as a subprocess."""
9779 # Update the consumer to use test queue
9880 consumer_py = os .path .join (examples_dir , "consumer.py" )
9981
10082 # Set environment variables for the subprocess
10183 env = os .environ .copy ()
102- env ["DATABASE_URL" ] = database_url
84+ env ["DATABASE_URL" ] = async_database_url
10385 env ["QUEUE_NAME" ] = test_queue_name
10486
10587 # Start the consumer
10688 process = subprocess .Popen (
10789 [sys .executable , consumer_py ],
108- stdout = subprocess . PIPE ,
109- stderr = subprocess . PIPE ,
90+ stdout = sys . stdout ,
91+ stderr = sys . stderr ,
11092 env = env ,
11193 preexec_fn = os .setsid if hasattr (os , 'setsid' ) else None
11294 )
11395
11496 # Give the consumer some time to start
11597 time .sleep (3 )
98+ logger .info ("Create Consumer Process" )
11699
117100 yield process
118101
@@ -122,24 +105,25 @@ def consumer_instance(examples_dir, database_url, test_queue_name, api_instance)
122105 else :
123106 process .terminate ()
124107 process .wait (timeout = 10 )
108+ logger .info ("Terminate Consumer Process" )
125109
126110
127- def test_api_consumer_integration (api_instance , consumer_instance , database_url ):
111+ def test_api_consumer_integration (sync_database_url ):
128112 """Test creating 100 orders parallelly and waiting for consumer to process them all."""
129113 import concurrent .futures
130114
131115 # Create 100 orders in parallel
132116 num_orders = 100
133117
134- def create_order (order_num ):
118+ def create_order (order_num : int ):
135119 """Helper function to create a single order."""
136120 order_data = {
137121 "customer_name" : f"Customer { order_num } " ,
138122 "product_name" : f"Product { order_num } " ,
139123 "quantity" : order_num % 10 + 1 ,
140124 "price" : 10.0 + (order_num % 50 )
141125 }
142- response = requests .post ("http://localhost:8000/orders" , json = order_data , timeout = 5 )
126+ response = httpx .post ("http://localhost:8000/orders" , json = order_data , timeout = 5 )
143127 return response .status_code == 201 , response .json () if response .status_code == 201 else None
144128
145129 # Create orders in parallel
@@ -150,15 +134,17 @@ def create_order(order_num):
150134 # Check that all orders were created successfully
151135 successful_orders = sum (1 for success , _ in results if success )
152136 assert successful_orders == num_orders , f"Only { successful_orders } /{ num_orders } orders were created"
137+ logger .info ("Create %d successful orders via API Server" , successful_orders )
153138
154139 # Wait for the consumer to process all messages
155140 # Check the queue periodically until it's empty
156- engine = create_engine (database_url )
141+ engine = create_engine (sync_database_url )
157142 SessionLocal = sessionmaker (bind = engine )
158143
159144 max_wait = 120 # Wait up to 2 minutes
160145 start_time = time .time ()
161146
147+ logger .info ("Wait for Consumer to process all the orders" );
162148 while time .time () - start_time < max_wait :
163149 # Check queue metrics to see if there are any messages left
164150 with SessionLocal () as session :
@@ -167,15 +153,20 @@ def create_order(order_num):
167153
168154 try :
169155 metrics = op .metrics (test_queue , session = session , commit = True )
170- if metrics .queue_length == 0 :
171- # All messages have been processed
172- break
156+ if metrics :
157+ logger .info ("%s queue metrics: %s" , test_queue , str (metrics ))
158+ if metrics .queue_length == 0 :
159+ # All messages have been processed
160+ break
173161 except Exception as e :
174162 # Queue might not exist yet or other error
175163 print (f"Error checking metrics: { e } " )
176164
177165 time .sleep (2 )
178166 else :
167+ metrics = op .metrics (test_queue , session = session , commit = True )
168+ if metrics :
169+ logger .info ("%s queue metrics: %s" , test_queue , str (metrics ))
179170 pytest .fail (f"Consumer did not process all messages within { max_wait } seconds" )
180171
181172 # Verify that all messages were processed