11import asyncio
2- from typing import List , Optional
2+ from typing import List , Optional , TYPE_CHECKING
33
44from sqlalchemy import create_engine
55from sqlalchemy .orm import sessionmaker
66from sqlalchemy .ext .asyncio import create_async_engine
7+ from asgiref .sync import async_to_sync
8+
79
810from .schema import Message , QueueMetrics
911from ._types import ENGINE_TYPE , SESSION_TYPE
1416)
1517from .operation import PGMQOperation
1618
19+ if TYPE_CHECKING :
20+ from sqlalchemy .orm import Session
21+ from sqlalchemy .ext .asyncio import AsyncSession
22+
1723
1824class PGMQueue :
1925 engine : ENGINE_TYPE = None
@@ -118,47 +124,20 @@ def __init__(
118124 # create pgmq extension if not exists
119125 self ._check_pgmq_ext ()
120126
121- async def _check_pgmq_ext_async (self ) -> None :
122- """Check if the pgmq extension exists."""
123- async with self .session_maker () as session :
124- await PGMQOperation .check_pgmq_ext_async (session = session , commit = True )
125-
126- def _check_pgmq_ext_sync (self ) -> None :
127- """Check if the pgmq extension exists."""
128- with self .session_maker () as session :
129- PGMQOperation .check_pgmq_ext (session = session , commit = True )
130-
131127 def _check_pgmq_ext (self ) -> None :
132128 """Check if the pgmq extension exists."""
133- if self .is_async :
134- return self .loop .run_until_complete (self ._check_pgmq_ext_async ())
135- return self ._check_pgmq_ext_sync ()
136-
137- async def _check_pg_partman_ext_async (self ) -> None :
138- """Check if the pg_partman extension exists."""
139- async with self .session_maker () as session :
140- await PGMQOperation .check_pg_partman_ext_async (session = session , commit = True )
141-
142- def _check_pg_partman_ext_sync (self ) -> None :
143- """Check if the pg_partman extension exists."""
144- with self .session_maker () as session :
145- PGMQOperation .check_pg_partman_ext (session = session , commit = True )
129+ self ._execute_operation (PGMQOperation .check_pgmq_ext , session = None , commit = True )
146130
147131 def _check_pg_partman_ext (self ) -> None :
148132 """Check if the pg_partman extension exists."""
149- if self .is_pg_partman_ext_checked :
150- return
151- self .is_pg_partman_ext_checked
152-
153- if self .is_async :
154- return self .loop .run_until_complete (self ._check_pg_partman_ext_async ())
155- return self ._check_pg_partman_ext_sync ()
133+ self ._execute_operation (
134+ PGMQOperation .check_pg_partman_ext , session = None , commit = True
135+ )
156136
157137 def _execute_operation (
158138 self ,
159139 op_sync ,
160- op_async ,
161- session : Optional [SESSION_TYPE ],
140+ session : Optional ["Session" ],
162141 commit : bool ,
163142 * args ,
164143 ** kwargs ,
@@ -167,7 +146,6 @@ def _execute_operation(
167146
168147 Args:
169148 op_sync: The synchronous operation function from PGMQOperation
170- op_async: The asynchronous operation function from PGMQOperation
171149 session: Optional session to use (if None, creates a new one)
172150 commit: Whether to commit the transaction
173151 *args: Positional arguments to pass to the operation
@@ -176,23 +154,36 @@ def _execute_operation(
176154 Returns:
177155 The result from the operation
178156 """
179- if self .is_async :
180- if session is None :
181-
182- async def _run ():
183- async with self .session_maker () as s :
184- return await op_async (* args , session = s , commit = commit , ** kwargs )
185-
186- return self .loop .run_until_complete (_run ())
187- return self .loop .run_until_complete (
188- op_async (* args , session = session , commit = commit , ** kwargs )
189- )
190-
191157 if session is None :
192158 with self .session_maker () as s :
193159 return op_sync (* args , session = s , commit = commit , ** kwargs )
194160 return op_sync (* args , session = session , commit = commit , ** kwargs )
195161
162+ async def _execute_async_operation (
163+ self ,
164+ op_async ,
165+ session : Optional ["AsyncSession" ],
166+ commit : bool ,
167+ * args ,
168+ ** kwargs ,
169+ ):
170+ """Helper method to execute sync or async operations with session management.
171+
172+ Args:
173+ op_async: The asynchronous operation function from PGMQOperation
174+ session: Optional session to use (if None, creates a new one)
175+ commit: Whether to commit the transaction
176+ *args: Positional arguments to pass to the operation
177+ **kwargs: Keyword arguments to pass to the operation
178+
179+ Returns:
180+ The result from the operation
181+ """
182+ if session is None :
183+ async with self .session_maker () as s :
184+ return await op_async (* args , session = s , commit = commit , ** kwargs )
185+ return await op_async (* args , session = session , commit = commit , ** kwargs )
186+
196187 def create_queue (
197188 self ,
198189 queue_name : str ,
@@ -219,7 +210,6 @@ def create_queue(
219210 """
220211 return self ._execute_operation (
221212 PGMQOperation .create_queue ,
222- PGMQOperation .create_queue_async ,
223213 session ,
224214 commit ,
225215 queue_name ,
@@ -272,7 +262,6 @@ def create_partitioned_queue(
272262
273263 return self ._execute_operation (
274264 PGMQOperation .create_partitioned_queue ,
275- PGMQOperation .create_partitioned_queue_async ,
276265 session ,
277266 commit ,
278267 queue_name ,
@@ -292,7 +281,6 @@ def validate_queue_name(
292281 """
293282 return self ._execute_operation (
294283 PGMQOperation .validate_queue_name ,
295- PGMQOperation .validate_queue_name_async ,
296284 session ,
297285 commit ,
298286 queue_name ,
@@ -329,7 +317,6 @@ def drop_queue(
329317
330318 return self ._execute_operation (
331319 PGMQOperation .drop_queue ,
332- PGMQOperation .drop_queue_async ,
333320 session ,
334321 commit ,
335322 queue ,
@@ -351,7 +338,6 @@ def list_queues(
351338 """
352339 return self ._execute_operation (
353340 PGMQOperation .list_queues ,
354- PGMQOperation .list_queues_async ,
355341 session ,
356342 commit ,
357343 )
@@ -385,7 +371,6 @@ def send(
385371 """
386372 return self ._execute_operation (
387373 PGMQOperation .send ,
388- PGMQOperation .send_async ,
389374 session ,
390375 commit ,
391376 queue_name ,
@@ -416,7 +401,6 @@ def send_batch(
416401 """
417402 return self ._execute_operation (
418403 PGMQOperation .send_batch ,
419- PGMQOperation .send_batch_async ,
420404 session ,
421405 commit ,
422406 queue_name ,
@@ -496,7 +480,6 @@ def read(
496480
497481 return self ._execute_operation (
498482 PGMQOperation .read ,
499- PGMQOperation .read_async ,
500483 session ,
501484 commit ,
502485 queue_name ,
@@ -533,7 +516,6 @@ def read_batch(
533516
534517 return self ._execute_operation (
535518 PGMQOperation .read_batch ,
536- PGMQOperation .read_batch_async ,
537519 session ,
538520 commit ,
539521 queue_name ,
@@ -605,7 +587,6 @@ def read_with_poll(
605587
606588 return self ._execute_operation (
607589 PGMQOperation .read_with_poll ,
608- PGMQOperation .read_with_poll_async ,
609590 session ,
610591 commit ,
611592 queue_name ,
@@ -683,7 +664,6 @@ def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str):
683664
684665 return self ._execute_operation (
685666 PGMQOperation .set_vt ,
686- PGMQOperation .set_vt_async ,
687667 session ,
688668 commit ,
689669 queue_name ,
@@ -710,7 +690,6 @@ def pop(
710690 """
711691 return self ._execute_operation (
712692 PGMQOperation .pop ,
713- PGMQOperation .pop_async ,
714693 session ,
715694 commit ,
716695 queue_name ,
@@ -743,7 +722,6 @@ def delete(
743722 """
744723 return self ._execute_operation (
745724 PGMQOperation .delete ,
746- PGMQOperation .delete_async ,
747725 session ,
748726 commit ,
749727 queue_name ,
@@ -776,7 +754,6 @@ def delete_batch(
776754 """
777755 return self ._execute_operation (
778756 PGMQOperation .delete_batch ,
779- PGMQOperation .delete_batch_async ,
780757 session ,
781758 commit ,
782759 queue_name ,
@@ -813,7 +790,6 @@ def archive(
813790 """
814791 return self ._execute_operation (
815792 PGMQOperation .archive ,
816- PGMQOperation .archive_async ,
817793 session ,
818794 commit ,
819795 queue_name ,
@@ -843,7 +819,6 @@ def archive_batch(
843819 """
844820 return self ._execute_operation (
845821 PGMQOperation .archive_batch ,
846- PGMQOperation .archive_batch_async ,
847822 session ,
848823 commit ,
849824 queue_name ,
@@ -870,7 +845,6 @@ def purge(
870845 """
871846 return self ._execute_operation (
872847 PGMQOperation .purge ,
873- PGMQOperation .purge_async ,
874848 session ,
875849 commit ,
876850 queue_name ,
@@ -903,7 +877,6 @@ def metrics(
903877 """
904878 return self ._execute_operation (
905879 PGMQOperation .metrics ,
906- PGMQOperation .metrics_async ,
907880 session ,
908881 commit ,
909882 queue_name ,
@@ -950,7 +923,6 @@ def metrics_all(
950923 """
951924 return self ._execute_operation (
952925 PGMQOperation .metrics_all ,
953- PGMQOperation .metrics_all_async ,
954926 session ,
955927 commit ,
956928 )
0 commit comments