From 7dd1521700185ed65a61c7d6ea631749cbfa60f3 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU <68415893+jason810496@users.noreply.github.com> Date: Wed, 31 Jul 2024 01:58:50 +0000 Subject: [PATCH 1/4] fix: check vt is None instead of vt or self.vt --- pgmq_sqlalchemy/queue.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index 07e8615..ea271a4 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -485,13 +485,11 @@ def send_batch( ) return self._send_batch_sync(queue_name, encode_list_to_psql(messages), delay) - def _read_sync( - self, queue_name: str, vt: Optional[int] = None - ) -> Optional[Message]: + def _read_sync(self, queue_name: str, vt: int) -> Optional[Message]: with self.session_maker() as session: row = session.execute( text("select * from pgmq.read(:queue_name,:vt,1);"), - {"queue_name": queue_name, "vt": vt or self.vt}, + {"queue_name": queue_name, "vt": vt}, ).fetchone() session.commit() if row is None: @@ -500,14 +498,12 @@ def _read_sync( msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4] ) - async def _read_async( - self, queue_name: str, vt: Optional[int] = None - ) -> Optional[Message]: + async def _read_async(self, queue_name: str, vt: int) -> Optional[Message]: async with self.session_maker() as session: row = ( await session.execute( text("select * from pgmq.read(:queue_name,:vt,1);"), - {"queue_name": queue_name, "vt": vt or self.vt}, + {"queue_name": queue_name, "vt": vt}, ) ).fetchone() await session.commit() @@ -584,15 +580,17 @@ def read(self, queue_name: str, vt: Optional[int] = None) -> Optional[Message]: def _read_batch_sync( self, queue_name: str, + vt: int, batch_size: int = 1, - vt: Optional[int] = None, ) -> Optional[List[Message]]: + if vt is None: + vt = self.vt with self.session_maker() as session: rows = session.execute( text("select * from pgmq.read(:queue_name,:vt,:batch_size);"), { "queue_name": queue_name, - "vt": vt or self.vt, + "vt": vt, "batch_size": batch_size, }, ).fetchall() @@ -613,8 +611,8 @@ def _read_batch_sync( async def _read_batch_async( self, queue_name: str, + vt: int, batch_size: int = 1, - vt: Optional[int] = None, ) -> Optional[List[Message]]: async with self.session_maker() as session: rows = ( @@ -622,7 +620,7 @@ async def _read_batch_async( text("select * from pgmq.read(:queue_name,:vt,:batch_size);"), { "queue_name": queue_name, - "vt": vt or self.vt, + "vt": vt, "batch_size": batch_size, }, ) @@ -663,6 +661,8 @@ def read_batch( msgs:List[Message] = pgmq_client.read_batch('my_queue', batch_size=10, vt=10) """ + if vt is None: + vt = self.vt if self.is_async: return self.loop.run_until_complete( self._read_batch_async(queue_name, batch_size, vt) @@ -672,7 +672,7 @@ def read_batch( def _read_with_poll_sync( self, queue_name: str, - vt: Optional[int] = None, + vt: int, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, @@ -685,7 +685,7 @@ def _read_with_poll_sync( ), { "queue_name": queue_name, - "vt": vt or self.vt, + "vt": vt, "qty": qty, "max_poll_seconds": max_poll_seconds, "poll_interval_ms": poll_interval_ms, @@ -708,7 +708,7 @@ def _read_with_poll_sync( async def _read_with_poll_async( self, queue_name: str, - vt: Optional[int] = None, + vt: int, qty: int = 1, max_poll_seconds: int = 5, poll_interval_ms: int = 100, @@ -722,7 +722,7 @@ async def _read_with_poll_async( ), { "queue_name": queue_name, - "vt": vt or self.vt, + "vt": vt, "qty": qty, "max_poll_seconds": max_poll_seconds, "poll_interval_ms": poll_interval_ms, @@ -799,6 +799,8 @@ def read_with_poll( assert len(msgs) == 3 # will read at most 3 messages (qty=3) """ + if vt is None: + vt = self.vt if self.is_async: return self.loop.run_until_complete( From 48eb171ea14398a921207f4ba663632d4a983218 Mon Sep 17 00:00:00 2001 From: jason810496 <810496@email.wlsh.tyc.edu.tw> Date: Sun, 4 Aug 2024 00:01:25 +0800 Subject: [PATCH 2/4] release: v0.1.2 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a354664..e909bf5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pgmq-sqlalchemy" -version = "0.1.1" +version = "0.1.2" description = "More flexible PGMQ Postgres extension Python client that using sqlalchemy ORM, supporting both async and sync engines, sessionmakers or built from dsn." authors = ["jason810496 <810496@email.wlsh.tyc.edu.tw>"] license = "MIT" From b36dc17fd0aefa6f1cce4624fd64b2c450feb7a3 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Sat, 27 Dec 2025 23:49:00 +0800 Subject: [PATCH 3/4] fix: install dev deps in GitHub Action, remove version in docker-compose --- .github/workflows/codecov.yml | 2 +- docker-compose.yml | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index 35fa07e..d0ae9ad 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -31,7 +31,7 @@ jobs: run: | poetry env use python${{ matrix.python-version }} - name: Install dependencies - run: poetry install --without=dev + run: poetry install --with dev - name: Start PostgreSQL run: | cp pgmq_postgres.template.env pgmq_postgres.env diff --git a/docker-compose.yml b/docker-compose.yml index ae1bf29..d00076b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.8' services: pgmq_postgres: container_name: pgmq_postgres From 6ae07ee8df6bdf7423355e5e4409787b7b1b2301 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Sun, 28 Dec 2025 11:31:10 +0800 Subject: [PATCH 4/4] Fix vt naming for set_vt --- pgmq_sqlalchemy/queue.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index ea271a4..0240d70 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -812,14 +812,12 @@ def read_with_poll( queue_name, vt, qty, max_poll_seconds, poll_interval_ms ) - def _set_vt_sync( - self, queue_name: str, msg_id: int, vt_offset: int - ) -> Optional[Message]: + def _set_vt_sync(self, queue_name: str, msg_id: int, vt: int) -> Optional[Message]: """Set the visibility timeout for a message.""" with self.session_maker() as session: row = session.execute( - text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"), - {"queue_name": queue_name, "msg_id": msg_id, "vt_offset": vt_offset}, + text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt);"), + {"queue_name": queue_name, "msg_id": msg_id, "vt": vt}, ).fetchone() session.commit() if row is None: @@ -829,17 +827,17 @@ def _set_vt_sync( ) async def _set_vt_async( - self, queue_name: str, msg_id: int, vt_offset: int + self, queue_name: str, msg_id: int, vt: int ) -> Optional[Message]: """Set the visibility timeout for a message.""" async with self.session_maker() as session: row = ( await session.execute( - text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt_offset);"), + text("select * from pgmq.set_vt(:queue_name,:msg_id,:vt);"), { "queue_name": queue_name, "msg_id": msg_id, - "vt_offset": vt_offset, + "vt": vt, }, ) ).fetchone() @@ -851,7 +849,7 @@ async def _set_vt_async( msg_id=row[0], read_ct=row[1], enqueued_at=row[2], vt=row[3], message=row[4] ) - def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Message]: + def set_vt(self, queue_name: str, msg_id: int, vt: int) -> Optional[Message]: """ .. _set_vt_method: ref:`pgmq_sqlalchemy.PGMQueue.set_vt` .. |set_vt_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.set_vt` @@ -861,7 +859,7 @@ def set_vt(self, queue_name: str, msg_id: int, vt_offset: int) -> Optional[Messa Args: queue_name (str): The name of the queue. msg_id (int): The message id. - vt_offset (int): The visibility timeout in seconds. + vt (int): The visibility timeout in seconds. Returns: |schema_message_class|_ or ``None`` if the message does not exist. @@ -904,15 +902,15 @@ def consumer_with_backoff_retry(pgmq_client: PGMQueue, queue_name: str): pgmq_client.set_vt( queue_name=query_name, msg_id=msg.msg_id, - vt_offset=_exp_backoff_retry(msg) + vt=_exp_backoff_retry(msg) ) """ if self.is_async: return self.loop.run_until_complete( - self._set_vt_async(queue_name, msg_id, vt_offset) + self._set_vt_async(queue_name, msg_id, vt) ) - return self._set_vt_sync(queue_name, msg_id, vt_offset) + return self._set_vt_sync(queue_name, msg_id, vt) def _pop_sync(self, queue_name: str) -> Optional[Message]: with self.session_maker() as session: