Skip to content

Commit b983e7f

Browse files
authored
Merge pull request #19 from cybertec-postgresql/feature/show-lag-to-primary-on-standby-leaders
Show primary position LSN and lag on standby leaders
2 parents 53f3b28 + 338b035 commit b983e7f

9 files changed

Lines changed: 59 additions & 20 deletions

File tree

patroni/ctl.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,7 +1517,7 @@ def _do_site_switchover(cluster_name: str, group: Optional[int],
15171517

15181518
if scheduled is None and not force:
15191519
next_hour = (datetime.datetime.now() + datetime.timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M')
1520-
scheduled = click.prompt('When should the switchover take place (e.g. ' + next_hour + ' ) ',
1520+
scheduled = click.prompt('When should the switchover take place (e.g. ' + next_hour + ') ',
15211521
type=str, default='now')
15221522

15231523
scheduled_at = parse_scheduled(scheduled)
@@ -1741,11 +1741,28 @@ def get_cluster_service_info(cluster: Dict[str, Any]) -> List[str]:
17411741
if 'multisite' in cluster:
17421742
info = f"Multisite {cluster['multisite'].get('name') or ''} is {cluster['multisite']['status'].lower()}"
17431743
standby_config = cluster['multisite'].get('standby_config', {})
1744-
if standby_config and standby_config.get('host'):
1744+
replicating_states = ['streaming', 'in archive recovery']
1745+
leader = [m for m in cluster.get('members', []) if m['role'] == 'Standby Leader']
1746+
if standby_config and standby_config.get('host') and leader and leader[0].get('state') in replicating_states:
17451747
info += f", replicating from {standby_config['leader_site']}"
17461748
info += f" ({standby_config['host']}:{standby_config.get('port', 5432)})"
17471749
service_info.append(info)
17481750

1751+
# latest_end_lsn (and consequently lag_to_primary) is only registered on standby leaders, so we scan all
1752+
# the member dicts and take the values from whichever member reports both keys
1753+
r = {'latest_end_lsn': str, 'lag_to_primary': int}
1754+
if 'members' in cluster:
1755+
for m in cluster['members']:
1756+
if 'latest_end_lsn' in m and 'lag_to_primary' in m:
1757+
r.update(m)
1758+
if r['latest_end_lsn']:
1759+
lag_to_primary = r['lag_to_primary']
1760+
lag_to_primary = round(lag_to_primary / 1024 / 1024) if isinstance(lag_to_primary, int) \
1761+
else lag_to_primary
1762+
info = (f"The latest known LSN of the primary instance is {r['latest_end_lsn']}, "
1763+
f"the replication lag is {lag_to_primary} MB")
1764+
service_info.append(info)
1765+
17491766
if cluster.get('pause'):
17501767
service_info.append('Maintenance mode: on')
17511768

patroni/dcs/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ def receive_lsn(self) -> Optional[int]:
337337
def replay_lsn(self) -> Optional[int]:
338338
return parse_int(self.data.get('replay_lsn'))
339339

340+
@property
341+
def latest_end_lsn(self) -> Optional[int]:
342+
return parse_int(self.data.get('latest_end_lsn'))
343+
340344
@property
341345
def multisite(self) -> Optional[Dict[str, Any]]:
342346
"""The ``multisite`` dict of the member if multisite is on."""

patroni/ha.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,14 +477,16 @@ def touch_member(self) -> bool:
477477
and data['state'] in [PostgresqlState.RUNNING, PostgresqlState.RESTARTING,
478478
PostgresqlState.STARTING]:
479479
try:
480-
timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn =\
480+
timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn, latest_end_lsn =\
481481
self.state_handler.timeline_wal_position()
482482
data['xlog_location'] = self._last_wal_lsn = wal_position
483483
if not timeline: # running as a standby
484484
if replay_lsn:
485485
data['replay_lsn'] = replay_lsn
486486
if receive_lsn:
487487
data['receive_lsn'] = receive_lsn
488+
if latest_end_lsn:
489+
data['latest_end_lsn'] = latest_end_lsn
488490
replication_state = self.state_handler.replication_state()
489491
if replication_state:
490492
data['replication_state'] = replication_state

patroni/multisite.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ def is_follower(self):
202202
return cfg is not None and 'host' in cfg
203203

204204
def _set_standby_config(self, other: Member):
205-
other_address = ','.join([':'.join([i, other.data['port']]) for i in other.data['host']])
205+
other_address = ','.join([':'.join([i, str(other.data['port'])]) for i in other.data['host'].split(',')])
206206
logger.info(f"Setting standby config to replicate from site {other.name} ({other_address})")
207207
# TODO: add support for replication slots
208208
try:
@@ -417,7 +417,7 @@ def touch_member(self):
417417
'host': self.config['host'],
418418
'port': self.config['port'],
419419
}
420-
address = ','.join([':'.join([i, data['port']]) for i in data['host'].split(',')])
420+
address = ','.join([':'.join([i, str(data['port'])]) for i in data['host'].split(',')])
421421
logger.info(f"Registering site {self.name} in DCS with address {address}")
422422
self.dcs.touch_member(data)
423423

patroni/postgresql/__init__.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ def cluster_info_query(self) -> str:
250250

251251
written_lsn = ("pg_catalog.pg_wal_lsn_diff(written_lsn, '0/0')::bigint"
252252
if self._major_version >= 130000 else "NULL")
253-
extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END, {0}, slot_name, "
253+
extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END, {0}, "
254+
"pg_catalog.pg_wal_lsn_diff(latest_end_lsn, '0/0')::bigint, slot_name, "
254255
"conninfo, status, {1} FROM pg_catalog.pg_stat_get_wal_receiver()").format(written_lsn, extra)
255256
if self.role == PostgresqlRole.STANDBY_LEADER:
256257
extra = "timeline_id" + extra + ", pg_catalog.pg_control_checkpoint()"
@@ -491,9 +492,10 @@ def _cluster_info_state_get(self, name: str) -> Optional[Any]:
491492
result = self._is_leader_retry(self._query, self.cluster_info_query)[0]
492493
cluster_info_state = dict(zip(['timeline', 'wal_position', 'replay_lsn',
493494
'receive_lsn', 'replay_paused', 'pg_control_timeline',
494-
'received_tli', 'write_location', 'slot_name', 'conninfo',
495-
'receiver_state', 'restore_command', 'slots', 'synchronous_commit',
496-
'synchronous_standby_names', 'pg_stat_replication'], result))
495+
'received_tli', 'write_location', 'latest_end_lsn', 'slot_name',
496+
'conninfo', 'receiver_state', 'restore_command', 'slots',
497+
'synchronous_commit', 'synchronous_standby_names',
498+
'pg_stat_replication'], result))
497499
if self._should_query_slots and self.can_advance_slots:
498500
cluster_info_state['slots'] =\
499501
self.slots_handler.process_permanent_slots(cluster_info_state['slots'])
@@ -511,6 +513,9 @@ def _cluster_info_state_get(self, name: str) -> Optional[Any]:
511513
def replay_lsn(self) -> Optional[int]:
512514
return self._cluster_info_state_get('replay_lsn')
513515

516+
def latest_end_lsn(self) -> Optional[int]:
517+
return self._cluster_info_state_get('latest_end_lsn')
518+
514519
def receive_lsn(self) -> Optional[int]:
515520
write = self._cluster_info_state_get('write_location')
516521
received = self._cluster_info_state_get('receive_lsn')
@@ -1252,12 +1257,16 @@ def _wal_position(is_primary: bool, wal_position: int,
12521257
receive_lsn: Optional[int], replay_lsn: Optional[int]) -> int:
12531258
return wal_position if is_primary else max(receive_lsn or 0, replay_lsn or 0)
12541259

1255-
def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int], Optional[int]]:
1260+
def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int], Optional[int], Optional[int]]:
12561261
"""Get timeline and various wal positions.
12571262
1258-
:returns: a tuple composed of 5 integers representing timeline, ``pg_current_wal_lsn()`` position
1259-
on primary/the biggest value among receive and replay LSN on replicas, ``pg_control_checkpoint()``
1260-
value on a standby leader, receive and replay LSN on replicas (if available).
1263+
:returns: a tuple of 6 values (integers; the last four may be ``None``) representing
1264+
* timeline,
1265+
* ``pg_current_wal_lsn()`` position on the primary
1266+
or the biggest value among receive and replay LSN on replicas,
1267+
* ``pg_control_checkpoint()`` value on a standby leader,
1268+
* receive and replay LSN on replicas (if available),
1269+
* ``latest_end_lsn`` as reported by the WAL receiver (the last known LSN on the primary), if available
12611270
"""
12621271
# This method could be called from different threads (simultaneously with some other `_query` calls).
12631272
# If it is called not from main thread we will create a new cursor to execute statement.
@@ -1266,14 +1275,16 @@ def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int],
12661275
wal_position = self._cluster_info_state_get('wal_position') or 0
12671276
replay_lsn = self.replay_lsn()
12681277
receive_lsn = self.receive_lsn()
1278+
latest_end_lsn = self.latest_end_lsn()
12691279
pg_control_timeline = self._cluster_info_state_get('pg_control_timeline')
12701280
else:
1271-
timeline, wal_position, replay_lsn, receive_lsn, _, pg_control_timeline, _, write_location = \
1272-
self._query(self.cluster_info_query)[0][:8]
1281+
timeline, wal_position, replay_lsn, receive_lsn, _, \
1282+
pg_control_timeline, _, write_location, latest_end_lsn = \
1283+
self._query(self.cluster_info_query)[0][:9]
12731284
receive_lsn = max(receive_lsn or 0, write_location or 0)
12741285

12751286
wal_position = self._wal_position(bool(timeline), wal_position, receive_lsn, replay_lsn)
1276-
return timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn
1287+
return timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn, latest_end_lsn
12771288

12781289
def postmaster_start_time(self) -> Optional[str]:
12791290
try:

patroni/utils.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,11 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:
995995
else:
996996
member[lag_type] = 0
997997
member[lsn_type] = format_lsn(lsn)
998+
elif m.name == leader_name and (config.is_standby_cluster or multisite_active):
999+
latest_end_lsn = getattr(m, 'latest_end_lsn')
1000+
member['latest_end_lsn'] = format_lsn(latest_end_lsn) if latest_end_lsn else ''
1001+
if member['latest_end_lsn']:
1002+
member['lag_to_primary'] = latest_end_lsn - cluster_lsn
9981003

9991004
ret['members'].append(member)
10001005

tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ def execute(self, sql, *params):
177177
elif sql.startswith('WITH slots AS (SELECT slot_name, active'):
178178
self.results = [(False, True)] if self.rowcount == 1 else []
179179
elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'):
180-
self.results = [(1, 2, 1, 0, False, 1, 1, 1, None, None, 'streaming', '',
180+
self.results = [(1, 2, 1, 0, False, 1, 1, 1, 4, None, None, 'streaming', '',
181181
[{"slot_name": "ls", "confirmed_flush_lsn": 12345, "restart_lsn": 12344}],
182182
'on', 'n1', None)]
183183
elif sql.startswith('SELECT pg_catalog.pg_is_in_recovery()'):

tests/test_postgresql.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ def test_promote(self, mock_popen):
461461
self.assertFalse(self.p.promote(0, task))
462462

463463
def test_timeline_wal_position(self):
464-
self.assertEqual(self.p.timeline_wal_position(), (1, 2, 1, 1, 1))
464+
self.assertEqual(self.p.timeline_wal_position(), (1, 2, 1, 1, 1, 4))
465465
Thread(target=self.p.timeline_wal_position).start()
466466

467467
@patch.object(PostmasterProcess, 'from_pidfile')

tests/test_slots.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def test_process_permanent_slots(self):
110110
with patch.object(Postgresql, '_query') as mock_query:
111111
self.p.reset_cluster_info_state(None)
112112
mock_query.return_value = [(
113-
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
113+
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
114114
[{"slot_name": "ls", "type": "logical", "datoid": 5, "plugin": "b", "xmin": 105,
115115
"confirmed_flush_lsn": 12345, "catalog_xmin": 105, "restart_lsn": 12344},
116116
{"slot_name": "blabla", "type": "physical", "datoid": None, "plugin": None, "xmin": 105,
@@ -119,7 +119,7 @@ def test_process_permanent_slots(self):
119119

120120
self.p.reset_cluster_info_state(None)
121121
mock_query.return_value = [(
122-
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
122+
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
123123
[{"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b", "xmin": 105,
124124
"confirmed_flush_lsn": 12345, "catalog_xmin": 105}])]
125125
self.assertEqual(self.p.slots(), {'postgresql0': 0})

0 commit comments

Comments
 (0)