Skip to content

Commit 1a5057f

Browse files
author
Antonin Houska
committed
Added infrastructure for progress monitoring.
The pg_rewrite_progress view is the expected user interface.
1 parent 05e170c commit 1a5057f

5 files changed

Lines changed: 312 additions & 64 deletions

File tree

README.md

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,12 @@ CREATE TABLE measurement_y2006m03 PARTITION OF measurement_aux
7474
```
7575

7676
*It's essential that both the source (`measurement`) and destination
77-
(`measurement_aux`) table have an identity index, the easiest way to ensure
77+
(`measurement_aux`) table have an identity index. The easiest way to ensure
7878
this is to create `PRIMARY KEY` or `UNIQUE` constraint. Also note that the key
7979
(i.e. column list) of the identity index of the source and destination table
8080
must be identical. The identity is needed to process data changes that
81-
applications make while data is being copied from the source to the destination
82-
table.*
81+
applications make while data is being copied from the source to the
82+
destination table.*
8383

8484
Also, unless you've set `rewrite.check_constraints` to `false`, make sure that
8585
the destination table has all the constraints that the source table has.
@@ -98,6 +98,19 @@ lock `measurement` exclusively and rename (1) `measurement` to
9898
ends up to be the partitioned table, while `measurement_old` is the original,
9999
non-partitioned table.
100100

101+
# Progress monitoring
102+
103+
If `partition_table()` takes long time to finish, you might be interested in
104+
the progress. The `pg_rewrite_progress` view shows all the pending calls of
105+
the function in the current database. The `src_table`, `dst_table` and
106+
`src_table_new` columns contain the arguments of the `partition_table()`
107+
function. `ins_initial` is the number of tuples inserted into the new table
108+
storage during the "initial load stage", i.e. the number of tuples present in
109+
the table before the processing started. On the other hand, `ins`, `upd` and
110+
`del` are the numbers of tuples inserted, updated and deleted by applications
111+
during the table processing. (These "concurrent data changes" must also be
112+
incorporated into the partitioned table, otherwise they'd get lost.)
113+
101114
# Limitations
102115

103116
Please consider the following before you try to use the function:

concurrent.c

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,6 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
205205
{
206206
TupleTableSlot *slot;
207207
HeapTuple tup_old = NULL;
208-
double ninserts,
209-
nupdates,
210-
ndeletes;
211208

212209
if (dstate->nchanges == 0)
213210
return;
@@ -221,9 +218,6 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
221218
*/
222219
PushActiveSnapshot(GetTransactionSnapshot());
223220

224-
ninserts = 0;
225-
nupdates = 0;
226-
ndeletes = 0;
227221
while (tuplestore_gettupleslot(dstate->tstore, true, false,
228222
dstate->tsslot))
229223
{
@@ -305,7 +299,10 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
305299
list_free(recheck);
306300
pfree(tup);
307301

308-
ninserts++;
302+
/* Update the progress information. */
303+
SpinLockAcquire(&MyWorkerTask->mutex);
304+
MyWorkerTask->progress.ins++;
305+
SpinLockRelease(&MyWorkerTask->mutex);
309306
}
310307
else if (change->kind == CHANGE_UPDATE_NEW ||
311308
change->kind == CHANGE_DELETE)
@@ -342,7 +339,11 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
342339
find_tuple_in_partition(tup_old, rri_old->ri_RelationDesc,
343340
partitions, key, nkeys, &ctid);
344341
simple_heap_delete(rri_old->ri_RelationDesc, &ctid);
345-
ndeletes++;
342+
343+
/* Update the progress information. */
344+
SpinLockAcquire(&MyWorkerTask->mutex);
345+
MyWorkerTask->progress.del++;
346+
SpinLockRelease(&MyWorkerTask->mutex);
346347

347348
/* Insert the new tuple into its partition. */
348349
ExecStoreHeapTuple(tup, slot, false);
@@ -370,7 +371,11 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
370371
#endif
371372
);
372373
ExecClearTuple(slot);
373-
ninserts++;
374+
375+
/* Update the progress information. */
376+
SpinLockAcquire(&MyWorkerTask->mutex);
377+
MyWorkerTask->progress.ins++;
378+
SpinLockRelease(&MyWorkerTask->mutex);
374379

375380
list_free(recheck);
376381
}
@@ -439,12 +444,19 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
439444
list_free(recheck);
440445
}
441446

442-
nupdates++;
447+
/* Update the progress information. */
448+
SpinLockAcquire(&MyWorkerTask->mutex);
449+
MyWorkerTask->progress.upd++;
450+
SpinLockRelease(&MyWorkerTask->mutex);
443451
}
444452
else
445453
{
446454
simple_heap_delete(rri->ri_RelationDesc, &ctid);
447-
ndeletes++;
455+
456+
/* Update the progress information. */
457+
SpinLockAcquire(&MyWorkerTask->mutex);
458+
MyWorkerTask->progress.del++;
459+
SpinLockRelease(&MyWorkerTask->mutex);
448460
}
449461
}
450462

@@ -470,10 +482,6 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
470482
pfree(tup_change);
471483
}
472484

473-
elog(DEBUG1,
474-
"pg_rewrite: concurrent changes applied: %.0f inserts, %.0f updates, %.0f deletes.",
475-
ninserts, nupdates, ndeletes);
476-
477485
tuplestore_clear(dstate->tstore);
478486
dstate->nchanges = 0;
479487

pg_rewrite--1.0--1.1.sql

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,26 @@ RETURNS void
1212
AS 'MODULE_PATHNAME', 'partition_table_new'
1313
LANGUAGE C
1414
STRICT;
15+
16+
CREATE FUNCTION pg_rewrite_get_task_list()
17+
RETURNS TABLE (
18+
tabschema_src name,
19+
tabname_src name,
20+
tabschema_dst name,
21+
tabname_dst name,
22+
tabname_src_new name,
23+
ins_initial bigint,
24+
ins bigint,
25+
upd bigint,
26+
del bigint)
27+
AS 'MODULE_PATHNAME', 'pg_rewrite_get_task_list'
28+
LANGUAGE C;
29+
30+
-- The column names should match the arguments of the partition_table()
31+
-- function.
32+
CREATE VIEW pg_rewrite_progress AS
33+
SELECT COALESCE(tabschema_src || '.', '') || tabname_src AS src_table,
34+
COALESCE(tabschema_dst || '.', '') || tabname_dst AS dst_table,
35+
tabname_src_new AS src_table_new,
36+
ins_initial, ins, upd, del
37+
FROM pg_rewrite_get_task_list();

0 commit comments

Comments
 (0)