Skip to content

Commit 02c64db

Browse files
author
avandras
committed
Show primary position LSN and lag on standby leaders
1 parent 53f3b28 commit 02c64db

5 files changed

Lines changed: 46 additions & 11 deletions

File tree

patroni/ctl.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
from collections import defaultdict
3131
from contextlib import contextmanager
3232
from enum import Enum
33+
from functools import reduce
34+
from operator import ior
3335
from typing import Any, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union
3436
from urllib.parse import urlparse
3537

@@ -1517,7 +1519,7 @@ def _do_site_switchover(cluster_name: str, group: Optional[int],
15171519

15181520
if scheduled is None and not force:
15191521
next_hour = (datetime.datetime.now() + datetime.timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M')
1520-
scheduled = click.prompt('When should the switchover take place (e.g. ' + next_hour + ' ) ',
1522+
scheduled = click.prompt('When should the switchover take place (e.g. ' + next_hour + ') ',
15211523
type=str, default='now')
15221524

15231525
scheduled_at = parse_scheduled(scheduled)
@@ -1746,6 +1748,18 @@ def get_cluster_service_info(cluster: Dict[str, Any]) -> List[str]:
17461748
info += f" ({standby_config['host']}:{standby_config.get('port', 5432)})"
17471749
service_info.append(info)
17481750

1751+
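# latest_end_lsn (and lag_to_primary) are only set in the standby leader's member dict, so we merge all
# member dicts to find them; keys shared by several members are overwritten by later members in the merge,
# so only these leader-only keys are meaningful in the merged dict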
1752+
r = {}
1753+
if 'members' in cluster:
1754+
r = reduce(ior, cluster['members'], {})
1755+
if 'latest_end_lsn' in r:
1756+
lag_to_primary = r['lag_to_primary']
1757+
lag_to_primary = round(lag_to_primary / 1024 / 1024) if isinstance(lag_to_primary, int) \
1758+
else '' if lag_to_primary == 'unknown' else lag_to_primary
1759+
info = (f"The latest known LSN of the primary instance is {r['latest_end_lsn']}, "
1760+
f"the replication lag is {lag_to_primary} MB")
1761+
service_info.append(info)
1762+
17491763
if cluster.get('pause'):
17501764
service_info.append('Maintenance mode: on')
17511765

patroni/dcs/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ def receive_lsn(self) -> Optional[int]:
337337
def replay_lsn(self) -> Optional[int]:
338338
return parse_int(self.data.get('replay_lsn'))
339339

340+
@property
341+
def latest_end_lsn(self) -> Optional[int]:
342+
return parse_int(self.data.get('latest_end_lsn'))
343+
340344
@property
341345
def multisite(self) -> Optional[Dict[str, Any]]:
342346
"""The ``multisite`` dict of the member if multisite is on."""

patroni/ha.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,14 +477,16 @@ def touch_member(self) -> bool:
477477
and data['state'] in [PostgresqlState.RUNNING, PostgresqlState.RESTARTING,
478478
PostgresqlState.STARTING]:
479479
try:
480-
timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn =\
480+
timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn, latest_end_lsn =\
481481
self.state_handler.timeline_wal_position()
482482
data['xlog_location'] = self._last_wal_lsn = wal_position
483483
if not timeline: # running as a standby
484484
if replay_lsn:
485485
data['replay_lsn'] = replay_lsn
486486
if receive_lsn:
487487
data['receive_lsn'] = receive_lsn
488+
if latest_end_lsn:
489+
data['latest_end_lsn'] = latest_end_lsn
488490
replication_state = self.state_handler.replication_state()
489491
if replication_state:
490492
data['replication_state'] = replication_state

patroni/postgresql/__init__.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ def cluster_info_query(self) -> str:
250250

251251
written_lsn = ("pg_catalog.pg_wal_lsn_diff(written_lsn, '0/0')::bigint"
252252
if self._major_version >= 130000 else "NULL")
253-
extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END, {0}, slot_name, "
253+
extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END, {0}, "
254+
"pg_catalog.pg_wal_lsn_diff(latest_end_lsn, '0/0')::bigint, slot_name, "
254255
"conninfo, status, {1} FROM pg_catalog.pg_stat_get_wal_receiver()").format(written_lsn, extra)
255256
if self.role == PostgresqlRole.STANDBY_LEADER:
256257
extra = "timeline_id" + extra + ", pg_catalog.pg_control_checkpoint()"
@@ -491,7 +492,7 @@ def _cluster_info_state_get(self, name: str) -> Optional[Any]:
491492
result = self._is_leader_retry(self._query, self.cluster_info_query)[0]
492493
cluster_info_state = dict(zip(['timeline', 'wal_position', 'replay_lsn',
493494
'receive_lsn', 'replay_paused', 'pg_control_timeline',
494-
'received_tli', 'write_location', 'slot_name', 'conninfo',
495+
'received_tli', 'write_location', 'latest_end_lsn', 'slot_name', 'conninfo',
495496
'receiver_state', 'restore_command', 'slots', 'synchronous_commit',
496497
'synchronous_standby_names', 'pg_stat_replication'], result))
497498
if self._should_query_slots and self.can_advance_slots:
@@ -511,6 +512,9 @@ def _cluster_info_state_get(self, name: str) -> Optional[Any]:
511512
def replay_lsn(self) -> Optional[int]:
512513
return self._cluster_info_state_get('replay_lsn')
513514

515+
def latest_end_lsn(self) -> Optional[int]:
516+
return self._cluster_info_state_get('latest_end_lsn')
517+
514518
def receive_lsn(self) -> Optional[int]:
515519
write = self._cluster_info_state_get('write_location')
516520
received = self._cluster_info_state_get('receive_lsn')
@@ -1252,12 +1256,16 @@ def _wal_position(is_primary: bool, wal_position: int,
12521256
receive_lsn: Optional[int], replay_lsn: Optional[int]) -> int:
12531257
return wal_position if is_primary else max(receive_lsn or 0, replay_lsn or 0)
12541258

1255-
def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int], Optional[int]]:
1259+
def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int], Optional[int], Optional[int]]:
12561260
"""Get timeline and various wal positions.
12571261
1258-
:returns: a tuple composed of 5 integers representing timeline, ``pg_current_wal_lsn()`` position
1259-
on primary/the biggest value among receive and replay LSN on replicas, ``pg_control_checkpoint()``
1260-
value on a standby leader, receive and replay LSN on replicas (if available).
1262+
:returns: a tuple of 6 values (integers, or ``None`` where a value is not available) representing
1263+
* timeline,
1264+
* ``pg_current_wal_lsn()`` position on the primary
1265+
or the biggest value among receive and replay LSN on replicas,
1266+
* the ``timeline_id`` reported by ``pg_control_checkpoint()`` on a standby leader,
1267+
* receive and replay LSN on replicas (if available),
1268+
* latest_end_lsn (the last known LSN on the primary)
12611269
"""
12621270
# This method could be called from different threads (simultaneously with some other `_query` calls).
12631271
# If it is called not from main thread we will create a new cursor to execute statement.
@@ -1266,14 +1274,16 @@ def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int],
12661274
wal_position = self._cluster_info_state_get('wal_position') or 0
12671275
replay_lsn = self.replay_lsn()
12681276
receive_lsn = self.receive_lsn()
1277+
latest_end_lsn = self.latest_end_lsn()
12691278
pg_control_timeline = self._cluster_info_state_get('pg_control_timeline')
12701279
else:
1271-
timeline, wal_position, replay_lsn, receive_lsn, _, pg_control_timeline, _, write_location = \
1272-
self._query(self.cluster_info_query)[0][:8]
1280+
timeline, wal_position, replay_lsn, receive_lsn, _, \
1281+
pg_control_timeline, _, write_location, latest_end_lsn = \
1282+
self._query(self.cluster_info_query)[0][:9]
12731283
receive_lsn = max(receive_lsn or 0, write_location or 0)
12741284

12751285
wal_position = self._wal_position(bool(timeline), wal_position, receive_lsn, replay_lsn)
1276-
return timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn
1286+
return timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn, latest_end_lsn
12771287

12781288
def postmaster_start_time(self) -> Optional[str]:
12791289
try:

patroni/utils.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,11 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:
995995
else:
996996
member[lag_type] = 0
997997
member[lsn_type] = format_lsn(lsn)
998+
elif m.name == leader_name and (config.is_standby_cluster or multisite_active):
999+
latest_end_lsn = getattr(m, 'latest_end_lsn')
1000+
member['latest_end_lsn'] = format_lsn(latest_end_lsn)
1001+
if member['latest_end_lsn']:
1002+
member['lag_to_primary'] = latest_end_lsn - cluster_lsn
9981003

9991004
ret['members'].append(member)
10001005

0 commit comments

Comments
 (0)