diff --git a/patroni/ctl.py b/patroni/ctl.py index a8540e698..6189a2f27 100644 --- a/patroni/ctl.py +++ b/patroni/ctl.py @@ -1517,7 +1517,7 @@ def _do_site_switchover(cluster_name: str, group: Optional[int], if scheduled is None and not force: next_hour = (datetime.datetime.now() + datetime.timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M') - scheduled = click.prompt('When should the switchover take place (e.g. ' + next_hour + ' ) ', + scheduled = click.prompt('When should the switchover take place (e.g. ' + next_hour + ') ', type=str, default='now') scheduled_at = parse_scheduled(scheduled) @@ -1741,11 +1741,28 @@ def get_cluster_service_info(cluster: Dict[str, Any]) -> List[str]: if 'multisite' in cluster: info = f"Multisite {cluster['multisite'].get('name') or ''} is {cluster['multisite']['status'].lower()}" standby_config = cluster['multisite'].get('standby_config', {}) - if standby_config and standby_config.get('host'): + replicating_states = ['streaming', 'in archive recovery'] + leader = [m for m in cluster.get('members', []) if m['role'] == 'Standby Leader'] + if standby_config and standby_config.get('host') and leader and leader[0].get('state') in replicating_states: info += f", replicating from {standby_config['leader_site']}" info += f" ({standby_config['host']}:{standby_config.get('port', 5432)})" service_info.append(info) + # latest_end_lsn (and consequently lag_to_primary) is only registered on standby leaders - we just combine all + # the member dicts to find it + r = {'latest_end_lsn': str, 'lag_to_primary': int} + if 'members' in cluster: + for m in cluster['members']: + if 'latest_end_lsn' in m and 'lag_to_primary' in m: + r.update(m) + if r['latest_end_lsn']: + lag_to_primary = r['lag_to_primary'] + lag_to_primary = round(lag_to_primary / 1024 / 1024) if isinstance(lag_to_primary, int) \ + else lag_to_primary + info = (f"The latest known LSN of the primary instance is {r['latest_end_lsn']}, " + f"the replication lag is {lag_to_primary} MB") + service_info.append(info) + if cluster.get('pause'): service_info.append('Maintenance mode: on') diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index 86b75f2bd..c8b9e6f7f 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -337,6 +337,10 @@ def receive_lsn(self) -> Optional[int]: def replay_lsn(self) -> Optional[int]: return parse_int(self.data.get('replay_lsn')) + @property + def latest_end_lsn(self) -> Optional[int]: + return parse_int(self.data.get('latest_end_lsn')) + @property def multisite(self) -> Optional[Dict[str, Any]]: """The ``multisite`` dict of the member if multisite is on.""" diff --git a/patroni/ha.py b/patroni/ha.py index 0aae61f2d..6c8247ff4 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -477,7 +477,7 @@ def touch_member(self) -> bool: and data['state'] in [PostgresqlState.RUNNING, PostgresqlState.RESTARTING, PostgresqlState.STARTING]: try: - timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn =\ + timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn, latest_end_lsn =\ self.state_handler.timeline_wal_position() data['xlog_location'] = self._last_wal_lsn = wal_position if not timeline: # running as a standby @@ -485,6 +485,8 @@ def touch_member(self) -> bool: data['replay_lsn'] = replay_lsn if receive_lsn: data['receive_lsn'] = receive_lsn + if latest_end_lsn: + data['latest_end_lsn'] = latest_end_lsn replication_state = self.state_handler.replication_state() if replication_state: data['replication_state'] = replication_state diff --git a/patroni/multisite.py b/patroni/multisite.py index dfd1f1c6f..f30d16b2c 100644 --- a/patroni/multisite.py +++ b/patroni/multisite.py @@ -202,7 +202,7 @@ def is_follower(self): return cfg is not None and 'host' in cfg def _set_standby_config(self, other: Member): - other_address = ','.join([':'.join([i, other.data['port']]) for i in other.data['host']]) + other_address = ','.join([':'.join([i, str(other.data['port'])]) for i in other.data['host'].split(',')]) logger.info(f"Setting standby config to replicate from site {other.name} ({other_address})") # TODO: add support for replication slots try: @@ -417,7 +417,7 @@ def touch_member(self): 'host': self.config['host'], 'port': self.config['port'], } - address = ','.join([':'.join([i, data['port']]) for i in data['host'].split(',')]) + address = ','.join([':'.join([i, str(data['port'])]) for i in data['host'].split(',')]) logger.info(f"Registering site {self.name} in DCS with address {address}") self.dcs.touch_member(data) diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py index 72cdf5034..7a0b25ebf 100644 --- a/patroni/postgresql/__init__.py +++ b/patroni/postgresql/__init__.py @@ -250,7 +250,8 @@ def cluster_info_query(self) -> str: written_lsn = ("pg_catalog.pg_wal_lsn_diff(written_lsn, '0/0')::bigint" if self._major_version >= 130000 else "NULL") - extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END, {0}, slot_name, " + extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END, {0}, " + "pg_catalog.pg_wal_lsn_diff(latest_end_lsn, '0/0')::bigint, slot_name, " "conninfo, status, {1} FROM pg_catalog.pg_stat_get_wal_receiver()").format(written_lsn, extra) if self.role == PostgresqlRole.STANDBY_LEADER: extra = "timeline_id" + extra + ", pg_catalog.pg_control_checkpoint()" @@ -491,9 +492,10 @@ def _cluster_info_state_get(self, name: str) -> Optional[Any]: result = self._is_leader_retry(self._query, self.cluster_info_query)[0] cluster_info_state = dict(zip(['timeline', 'wal_position', 'replay_lsn', 'receive_lsn', 'replay_paused', 'pg_control_timeline', - 'received_tli', 'write_location', 'slot_name', 'conninfo', - 'receiver_state', 'restore_command', 'slots', 'synchronous_commit', - 'synchronous_standby_names', 'pg_stat_replication'], result)) + 'received_tli', 'write_location', 'latest_end_lsn', 'slot_name', + 'conninfo', 'receiver_state', 'restore_command', 'slots', + 'synchronous_commit', 'synchronous_standby_names', + 'pg_stat_replication'], result)) if self._should_query_slots and self.can_advance_slots: cluster_info_state['slots'] =\ self.slots_handler.process_permanent_slots(cluster_info_state['slots']) @@ -511,6 +513,9 @@ def _cluster_info_state_get(self, name: str) -> Optional[Any]: def replay_lsn(self) -> Optional[int]: return self._cluster_info_state_get('replay_lsn') + def latest_end_lsn(self) -> Optional[int]: + return self._cluster_info_state_get('latest_end_lsn') + def receive_lsn(self) -> Optional[int]: write = self._cluster_info_state_get('write_location') received = self._cluster_info_state_get('receive_lsn') @@ -1252,12 +1257,16 @@ def _wal_position(is_primary: bool, wal_position: int, receive_lsn: Optional[int], replay_lsn: Optional[int]) -> int: return wal_position if is_primary else max(receive_lsn or 0, replay_lsn or 0) - def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int], Optional[int]]: + def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int], Optional[int], Optional[int]]: """Get timeline and various wal positions. - :returns: a tuple composed of 5 integers representing timeline, ``pg_current_wal_lsn()`` position - on primary/the biggest value among receive and replay LSN on replicas, ``pg_control_checkpoint()`` - value on a standby leader, receive and replay LSN on replicas (if available). + :returns: a tuple composed of 6 integers representing + * timeline, + * ``pg_current_wal_lsn()`` position on the primary + or the biggest value among receive and replay LSN on replicas, + * ``pg_control_checkpoint()`` value on a standby leader, + * receive and replay LSN on replicas (if available), + * latest_end_lsn (the last known LSN on the primary) """ # This method could be called from different threads (simultaneously with some other `_query` calls). # If it is called not from main thread we will create a new cursor to execute statement. @@ -1266,14 +1275,16 @@ def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int], wal_position = self._cluster_info_state_get('wal_position') or 0 replay_lsn = self.replay_lsn() receive_lsn = self.receive_lsn() + latest_end_lsn = self.latest_end_lsn() pg_control_timeline = self._cluster_info_state_get('pg_control_timeline') else: - timeline, wal_position, replay_lsn, receive_lsn, _, pg_control_timeline, _, write_location = \ - self._query(self.cluster_info_query)[0][:8] + timeline, wal_position, replay_lsn, receive_lsn, _, \ + pg_control_timeline, _, write_location, latest_end_lsn = \ + self._query(self.cluster_info_query)[0][:9] receive_lsn = max(receive_lsn or 0, write_location or 0) wal_position = self._wal_position(bool(timeline), wal_position, receive_lsn, replay_lsn) - return timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn + return timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn, latest_end_lsn def postmaster_start_time(self) -> Optional[str]: try: diff --git a/patroni/utils.py b/patroni/utils.py index 206996569..2785390ea 100644 --- a/patroni/utils.py +++ b/patroni/utils.py @@ -995,6 +995,11 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]: else: member[lag_type] = 0 member[lsn_type] = format_lsn(lsn) + elif m.name == leader_name and (config.is_standby_cluster or multisite_active): + latest_end_lsn = getattr(m, 'latest_end_lsn') + member['latest_end_lsn'] = format_lsn(latest_end_lsn) if latest_end_lsn else '' + if member['latest_end_lsn']: + member['lag_to_primary'] = latest_end_lsn - cluster_lsn ret['members'].append(member) diff --git a/tests/__init__.py b/tests/__init__.py index 7c745b267..903e24def 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -177,7 +177,7 @@ def execute(self, sql, *params): elif sql.startswith('WITH slots AS (SELECT slot_name, active'): self.results = [(False, True)] if self.rowcount == 1 else [] elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'): - self.results = [(1, 2, 1, 0, False, 1, 1, 1, None, None, 'streaming', '', + self.results = [(1, 2, 1, 0, False, 1, 1, 1, 4, None, None, 'streaming', '', [{"slot_name": "ls", "confirmed_flush_lsn": 12345, "restart_lsn": 12344}], 'on', 'n1', None)] elif sql.startswith('SELECT pg_catalog.pg_is_in_recovery()'): diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py index cfced2bcf..a1726970b 100644 --- a/tests/test_postgresql.py +++ b/tests/test_postgresql.py @@ -461,7 +461,7 @@ def test_promote(self, mock_popen): self.assertFalse(self.p.promote(0, task)) def test_timeline_wal_position(self): - self.assertEqual(self.p.timeline_wal_position(), (1, 2, 1, 1, 1)) + self.assertEqual(self.p.timeline_wal_position(), (1, 2, 1, 1, 1, 4)) Thread(target=self.p.timeline_wal_position).start() @patch.object(PostmasterProcess, 'from_pidfile') diff --git a/tests/test_slots.py b/tests/test_slots.py index 610e1c90f..c64607244 100644 --- a/tests/test_slots.py +++ b/tests/test_slots.py @@ -110,7 +110,7 @@ def test_process_permanent_slots(self): with patch.object(Postgresql, '_query') as mock_query: self.p.reset_cluster_info_state(None) mock_query.return_value = [( - 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None, + 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None, [{"slot_name": "ls", "type": "logical", "datoid": 5, "plugin": "b", "xmin": 105, "confirmed_flush_lsn": 12345, "catalog_xmin": 105, "restart_lsn": 12344}, {"slot_name": "blabla", "type": "physical", "datoid": None, "plugin": None, "xmin": 105, @@ -119,7 +119,7 @@ def test_process_permanent_slots(self): self.p.reset_cluster_info_state(None) mock_query.return_value = [( - 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None, + 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None, [{"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b", "xmin": 105, "confirmed_flush_lsn": 12345, "catalog_xmin": 105}])] self.assertEqual(self.p.slots(), {'postgresql0': 0})