Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions patroni/ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@ def _do_site_switchover(cluster_name: str, group: Optional[int],

if scheduled is None and not force:
next_hour = (datetime.datetime.now() + datetime.timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M')
scheduled = click.prompt('When should the switchover take place (e.g. ' + next_hour + ' ) ',
scheduled = click.prompt('When should the switchover take place (e.g. ' + next_hour + ') ',
type=str, default='now')

scheduled_at = parse_scheduled(scheduled)
Expand Down Expand Up @@ -1741,11 +1741,28 @@ def get_cluster_service_info(cluster: Dict[str, Any]) -> List[str]:
if 'multisite' in cluster:
info = f"Multisite {cluster['multisite'].get('name') or ''} is {cluster['multisite']['status'].lower()}"
standby_config = cluster['multisite'].get('standby_config', {})
if standby_config and standby_config.get('host'):
replicating_states = ['streaming', 'in archive recovery']
leader = [m for m in cluster.get('members', []) if m['role'] == 'Standby Leader']
if standby_config and standby_config.get('host') and leader and leader[0].get('state') in replicating_states:
info += f", replicating from {standby_config['leader_site']}"
info += f" ({standby_config['host']}:{standby_config.get('port', 5432)})"
service_info.append(info)

# latest_end_lsn (and consequently lag_to_primary) is only registered on standby leaders - we just combine all
# the member dicts to find it
r = {'latest_end_lsn': str, 'lag_to_primary': int}
if 'members' in cluster:
for m in cluster['members']:
if 'latest_end_lsn' in m and 'lag_to_primary' in m:
r.update(m)
if r['latest_end_lsn']:
lag_to_primary = r['lag_to_primary']
lag_to_primary = round(lag_to_primary / 1024 / 1024) if isinstance(lag_to_primary, int) \
else lag_to_primary
info = (f"The latest known LSN of the primary instance is {r['latest_end_lsn']}, "
f"the replication lag is {lag_to_primary} MB")
service_info.append(info)

if cluster.get('pause'):
service_info.append('Maintenance mode: on')

Expand Down
4 changes: 4 additions & 0 deletions patroni/dcs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ def receive_lsn(self) -> Optional[int]:
def replay_lsn(self) -> Optional[int]:
return parse_int(self.data.get('replay_lsn'))

@property
def latest_end_lsn(self) -> Optional[int]:
    """Last known LSN of the primary reported by this member, as an ``int``.

    Parsed from the ``latest_end_lsn`` key of the member data (published by the
    standby leader from ``pg_stat_get_wal_receiver()``); ``None`` when the key
    is absent or not parseable.
    """
    return parse_int(self.data.get('latest_end_lsn'))

@property
def multisite(self) -> Optional[Dict[str, Any]]:
"""The ``multisite`` dict of the member if multisite is on."""
Expand Down
4 changes: 3 additions & 1 deletion patroni/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,16 @@ def touch_member(self) -> bool:
and data['state'] in [PostgresqlState.RUNNING, PostgresqlState.RESTARTING,
PostgresqlState.STARTING]:
try:
timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn =\
timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn, latest_end_lsn =\
self.state_handler.timeline_wal_position()
data['xlog_location'] = self._last_wal_lsn = wal_position
if not timeline: # running as a standby
if replay_lsn:
data['replay_lsn'] = replay_lsn
if receive_lsn:
data['receive_lsn'] = receive_lsn
if latest_end_lsn:
data['latest_end_lsn'] = latest_end_lsn
replication_state = self.state_handler.replication_state()
if replication_state:
data['replication_state'] = replication_state
Expand Down
4 changes: 2 additions & 2 deletions patroni/multisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def is_follower(self):
return cfg is not None and 'host' in cfg

def _set_standby_config(self, other: Member):
other_address = ','.join([':'.join([i, other.data['port']]) for i in other.data['host']])
other_address = ','.join([':'.join([i, str(other.data['port'])]) for i in other.data['host'].split(',')])
logger.info(f"Setting standby config to replicate from site {other.name} ({other_address})")
# TODO: add support for replication slots
try:
Expand Down Expand Up @@ -417,7 +417,7 @@ def touch_member(self):
'host': self.config['host'],
'port': self.config['port'],
}
address = ','.join([':'.join([i, data['port']]) for i in data['host'].split(',')])
address = ','.join([':'.join([i, str(data['port'])]) for i in data['host'].split(',')])
logger.info(f"Registering site {self.name} in DCS with address {address}")
self.dcs.touch_member(data)

Expand Down
33 changes: 22 additions & 11 deletions patroni/postgresql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ def cluster_info_query(self) -> str:

written_lsn = ("pg_catalog.pg_wal_lsn_diff(written_lsn, '0/0')::bigint"
if self._major_version >= 130000 else "NULL")
extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END, {0}, slot_name, "
extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END, {0}, "
"pg_catalog.pg_wal_lsn_diff(latest_end_lsn, '0/0')::bigint, slot_name, "
"conninfo, status, {1} FROM pg_catalog.pg_stat_get_wal_receiver()").format(written_lsn, extra)
if self.role == PostgresqlRole.STANDBY_LEADER:
extra = "timeline_id" + extra + ", pg_catalog.pg_control_checkpoint()"
Expand Down Expand Up @@ -491,9 +492,10 @@ def _cluster_info_state_get(self, name: str) -> Optional[Any]:
result = self._is_leader_retry(self._query, self.cluster_info_query)[0]
cluster_info_state = dict(zip(['timeline', 'wal_position', 'replay_lsn',
'receive_lsn', 'replay_paused', 'pg_control_timeline',
'received_tli', 'write_location', 'slot_name', 'conninfo',
'receiver_state', 'restore_command', 'slots', 'synchronous_commit',
'synchronous_standby_names', 'pg_stat_replication'], result))
'received_tli', 'write_location', 'latest_end_lsn', 'slot_name',
'conninfo', 'receiver_state', 'restore_command', 'slots',
'synchronous_commit', 'synchronous_standby_names',
'pg_stat_replication'], result))
if self._should_query_slots and self.can_advance_slots:
cluster_info_state['slots'] =\
self.slots_handler.process_permanent_slots(cluster_info_state['slots'])
Expand All @@ -511,6 +513,9 @@ def _cluster_info_state_get(self, name: str) -> Optional[Any]:
def replay_lsn(self) -> Optional[int]:
return self._cluster_info_state_get('replay_lsn')

def latest_end_lsn(self) -> Optional[int]:
    """Return the cached ``latest_end_lsn`` value from the cluster info query.

    This is the last WAL LSN reported by ``pg_stat_get_wal_receiver()``,
    i.e. the latest known LSN on the primary; ``None`` when not available
    (e.g. not running as a replica with an active WAL receiver).
    """
    return self._cluster_info_state_get('latest_end_lsn')

def receive_lsn(self) -> Optional[int]:
write = self._cluster_info_state_get('write_location')
received = self._cluster_info_state_get('receive_lsn')
Expand Down Expand Up @@ -1252,12 +1257,16 @@ def _wal_position(is_primary: bool, wal_position: int,
receive_lsn: Optional[int], replay_lsn: Optional[int]) -> int:
return wal_position if is_primary else max(receive_lsn or 0, replay_lsn or 0)

def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int], Optional[int]]:
def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int], Optional[int], Optional[int]]:
"""Get timeline and various wal positions.

:returns: a tuple composed of 5 integers representing timeline, ``pg_current_wal_lsn()`` position
on primary/the biggest value among receive and replay LSN on replicas, ``pg_control_checkpoint()``
value on a standby leader, receive and replay LSN on replicas (if available).
:returns: a tuple composed of 6 integers representing
* timeline,
* ``pg_current_wal_lsn()`` position on the primary
or the biggest value among receive and replay LSN on replicas,
* ``pg_control_checkpoint()`` value on a standby leader,
* receive and replay LSN on replicas (if available),
* latest_end_lsn (the last known LSN on the primary)
"""
# This method could be called from different threads (simultaneously with some other `_query` calls).
# If it is called not from main thread we will create a new cursor to execute statement.
Expand All @@ -1266,14 +1275,16 @@ def timeline_wal_position(self) -> Tuple[int, int, Optional[int], Optional[int],
wal_position = self._cluster_info_state_get('wal_position') or 0
replay_lsn = self.replay_lsn()
receive_lsn = self.receive_lsn()
latest_end_lsn = self.latest_end_lsn()
pg_control_timeline = self._cluster_info_state_get('pg_control_timeline')
else:
timeline, wal_position, replay_lsn, receive_lsn, _, pg_control_timeline, _, write_location = \
self._query(self.cluster_info_query)[0][:8]
timeline, wal_position, replay_lsn, receive_lsn, _, \
pg_control_timeline, _, write_location, latest_end_lsn = \
self._query(self.cluster_info_query)[0][:9]
receive_lsn = max(receive_lsn or 0, write_location or 0)

wal_position = self._wal_position(bool(timeline), wal_position, receive_lsn, replay_lsn)
return timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn
return timeline, wal_position, pg_control_timeline, receive_lsn, replay_lsn, latest_end_lsn

def postmaster_start_time(self) -> Optional[str]:
try:
Expand Down
5 changes: 5 additions & 0 deletions patroni/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,11 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:
else:
member[lag_type] = 0
member[lsn_type] = format_lsn(lsn)
elif m.name == leader_name and (config.is_standby_cluster or multisite_active):
latest_end_lsn = getattr(m, 'latest_end_lsn')
member['latest_end_lsn'] = format_lsn(latest_end_lsn) if latest_end_lsn else ''
if member['latest_end_lsn']:
member['lag_to_primary'] = latest_end_lsn - cluster_lsn

ret['members'].append(member)

Expand Down
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def execute(self, sql, *params):
elif sql.startswith('WITH slots AS (SELECT slot_name, active'):
self.results = [(False, True)] if self.rowcount == 1 else []
elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'):
self.results = [(1, 2, 1, 0, False, 1, 1, 1, None, None, 'streaming', '',
self.results = [(1, 2, 1, 0, False, 1, 1, 1, 4, None, None, 'streaming', '',
[{"slot_name": "ls", "confirmed_flush_lsn": 12345, "restart_lsn": 12344}],
'on', 'n1', None)]
elif sql.startswith('SELECT pg_catalog.pg_is_in_recovery()'):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ def test_promote(self, mock_popen):
self.assertFalse(self.p.promote(0, task))

def test_timeline_wal_position(self):
self.assertEqual(self.p.timeline_wal_position(), (1, 2, 1, 1, 1))
self.assertEqual(self.p.timeline_wal_position(), (1, 2, 1, 1, 1, 4))
Thread(target=self.p.timeline_wal_position).start()

@patch.object(PostmasterProcess, 'from_pidfile')
Expand Down
4 changes: 2 additions & 2 deletions tests/test_slots.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def test_process_permanent_slots(self):
with patch.object(Postgresql, '_query') as mock_query:
self.p.reset_cluster_info_state(None)
mock_query.return_value = [(
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
[{"slot_name": "ls", "type": "logical", "datoid": 5, "plugin": "b", "xmin": 105,
"confirmed_flush_lsn": 12345, "catalog_xmin": 105, "restart_lsn": 12344},
{"slot_name": "blabla", "type": "physical", "datoid": None, "plugin": None, "xmin": 105,
Expand All @@ -119,7 +119,7 @@ def test_process_permanent_slots(self):

self.p.reset_cluster_info_state(None)
mock_query.return_value = [(
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
[{"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b", "xmin": 105,
"confirmed_flush_lsn": 12345, "catalog_xmin": 105}])]
self.assertEqual(self.p.slots(), {'postgresql0': 0})
Expand Down
Loading