Skip to content

Commit c9b622a

Browse files
author
Antonin Houska
committed
Map tuple to partition before INSERT / UPDATE.
This is needed because partition can have different order of columns if was attached (as opposed to CREATE TABLE ... PARTITION OF ...).
1 parent 3aa2f4e commit c9b622a

3 files changed

Lines changed: 114 additions & 48 deletions

File tree

concurrent.c

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,27 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
278278
ExecStoreHeapTuple(tup, slot, false);
279279
if (proute)
280280
{
281+
PartitionEntry *entry;
282+
281283
rri = ExecFindPartition(mtstate, mtstate->rootResultRelInfo,
282284
proute, slot, estate);
283285
rel_ins = rri->ri_RelationDesc;
284286

285-
bistate = get_partition_insert_state(partitions,
286-
RelationGetRelid(rel_ins));
287+
entry = get_partition_entry(partitions,
288+
RelationGetRelid(rel_ins));
289+
bistate = entry->bistate;
290+
291+
/*
292+
* Make sure the tuple matches the partition. The typical
293+
* problem we address here is that a partition was attached
294+
* that has a different order of columns.
295+
*/
296+
if (entry->conv_map)
297+
{
298+
tup = convert_tuple_for_dest_table(tup, entry->conv_map);
299+
ExecClearTuple(slot);
300+
ExecStoreHeapTuple(tup, slot, false);
301+
}
287302
}
288303
else
289304
{
@@ -361,8 +376,12 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
361376
{
362377
ItemPointerData ctid;
363378
List *recheck;
379+
PartitionEntry *entry;
364380

365-
/* Delete the old tuple from its partition. */
381+
/*
382+
* Cross-partition update. Delete the old tuple from its
383+
* partition.
384+
*/
366385
find_tuple_in_partition(tup_old, rri_old->ri_RelationDesc,
367386
partitions, key, nkeys, &ctid);
368387
simple_heap_delete(rri_old->ri_RelationDesc, &ctid);
@@ -372,7 +391,14 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
372391
MyWorkerTask->progress.del++;
373392
SpinLockRelease(&MyWorkerTask->mutex);
374393

375-
/* Insert the new tuple into its partition. */
394+
/*
395+
* Insert the new tuple into its partition. This might include
396+
* conversion to match the partition, see above.
397+
*/
398+
entry = get_partition_entry(partitions,
399+
RelationGetRelid(rri->ri_RelationDesc));
400+
if (entry->conv_map)
401+
tup = convert_tuple_for_dest_table(tup, entry->conv_map);
376402
ExecStoreHeapTuple(tup, slot, false);
377403
table_tuple_insert(rri->ri_RelationDesc, slot,
378404
GetCurrentCommandId(true), 0, NULL);
@@ -438,6 +464,20 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
438464
TU_UpdateIndexes update_indexes;
439465
#endif
440466

467+
if (partitions)
468+
{
469+
PartitionEntry *entry;
470+
471+
/*
472+
* Make sure the tuple matches the partition.
473+
*/
474+
entry = get_partition_entry(partitions,
475+
RelationGetRelid(rri->ri_RelationDesc));
476+
if (entry->conv_map)
477+
tup = convert_tuple_for_dest_table(tup,
478+
entry->conv_map);
479+
}
480+
441481
simple_heap_update(rri->ri_RelationDesc, &ctid, tup
442482
#if PG_VERSION_NUM >= 160000
443483
, &update_indexes

pg_rewrite.c

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,9 @@ rewrite_table_impl(char *relschema_src, char *relname_src,
10171017

10181018
entry->ident_index = get_identity_index(partition, rel_src);
10191019
entry->ind_slot = table_slot_create(partition, NULL);
1020+
entry->conv_map =
1021+
convert_tuples_by_name_ext(RelationGetDescr(rel_dst),
1022+
RelationGetDescr(partition));
10201023
/* Expect many insertions. */
10211024
entry->bistate = GetBulkInsertState();
10221025

@@ -1284,11 +1287,11 @@ rewrite_table_impl(char *relschema_src, char *relname_src,
12841287

12851288
if (part_desc)
12861289
{
1290+
close_partitions(partitions);
1291+
12871292
for (i = 0; i < part_desc->nparts; i++)
12881293
table_close(parts_dst[i], AccessExclusiveLock);
1289-
12901294
pfree(parts_dst);
1291-
close_partitions(partitions);
12921295
}
12931296
else
12941297
{
@@ -2685,20 +2688,39 @@ perform_initial_load(EState *estate, ModifyTableState *mtstate,
26852688
if (tup_out == NULL)
26862689
break;
26872690

2688-
/* Convert the tuple if needed. */
2691+
/*
2692+
* If needed, convert the tuple so it matches the destination
2693+
* table.
2694+
*/
26892695
if (conv_map)
26902696
tup_out = convert_tuple_for_dest_table(tup_out, conv_map);
26912697
ExecStoreHeapTuple(tup_out, slot_dst, false);
26922698

26932699
if (proute)
26942700
{
2701+
PartitionEntry *entry;
2702+
26952703
/* Find out which partition the tuple belongs to. */
26962704
rri = ExecFindPartition(mtstate, mtstate->rootResultRelInfo,
26972705
proute, slot_dst, estate);
26982706

26992707
rel_ins = rri->ri_RelationDesc;
2700-
bistate = get_partition_insert_state(partitions,
2701-
RelationGetRelid(rri->ri_RelationDesc));
2708+
entry = get_partition_entry(partitions,
2709+
RelationGetRelid(rri->ri_RelationDesc));
2710+
bistate = entry->bistate;
2711+
2712+
/*
2713+
* Make sure the tuple matches the partition. The typical
2714+
* problem we address here is that a partition was attached
2715+
* that has a different order of columns.
2716+
*/
2717+
if (entry->conv_map)
2718+
{
2719+
tup_out = convert_tuple_for_dest_table(tup_out,
2720+
entry->conv_map);
2721+
ExecClearTuple(slot_dst);
2722+
ExecStoreHeapTuple(tup_out, slot_dst, false);
2723+
}
27022724
}
27032725
else
27042726
{
@@ -3061,17 +3083,19 @@ close_partitions(partitions_hash *partitions)
30613083
{
30623084
index_close(entry->ident_index, AccessShareLock);
30633085
ExecDropSingleTupleTableSlot(entry->ind_slot);
3086+
if (entry->conv_map)
3087+
free_conversion_map_ext(entry->conv_map);
30643088
FreeBulkInsertState(entry->bistate);
30653089
}
30663090

30673091
partitions_destroy(partitions);
30683092
}
30693093

30703094
/*
3071-
* Find BulkInsertState for given partition.
3095+
* Find hash entry for given partition.
30723096
*/
3073-
BulkInsertState
3074-
get_partition_insert_state(partitions_hash *partitions, Oid part_oid)
3097+
PartitionEntry *
3098+
get_partition_entry(partitions_hash *partitions, Oid part_oid)
30753099
{
30763100
PartitionEntry *entry;
30773101

@@ -3080,7 +3104,7 @@ get_partition_insert_state(partitions_hash *partitions, Oid part_oid)
30803104
elog(ERROR, "bulk insert state not found for partition %u", part_oid);
30813105
Assert(entry->part_oid == part_oid);
30823106

3083-
return entry->bistate;
3107+
return entry;
30843108
}
30853109

30863110
/*

pg_rewrite.h

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -185,39 +185,6 @@ typedef struct CatalogState
185185
bool have_pk_index;
186186
} CatalogState;
187187

188-
/*
189-
* Hash table to cache partition-specific information.
190-
*/
191-
typedef struct PartitionEntry
192-
{
193-
Oid part_oid; /* key */
194-
Relation ident_index;
195-
196-
/*
197-
* Slot to retrieve tuples from identity index. Since we only allow
198-
* partitions to have exactly the same attributes as the parent table, it
199-
* should work if we used the same slot for all partitions. However it
200-
* seems cleaner if separate slots are used.
201-
*/
202-
TupleTableSlot *ind_slot;
203-
204-
/* This should make insertions into partitions more efficient. */
205-
BulkInsertState bistate;
206-
207-
char status; /* used by simplehash */
208-
} PartitionEntry;
209-
210-
#define SH_PREFIX partitions
211-
#define SH_ELEMENT_TYPE PartitionEntry
212-
#define SH_KEY_TYPE Oid
213-
#define SH_KEY part_oid
214-
#define SH_HASH_KEY(tb, key) (key)
215-
#define SH_EQUAL(tb, a, b) ((a) == (b))
216-
#define SH_SCOPE static inline
217-
#define SH_DECLARE
218-
#define SH_DEFINE
219-
#include "lib/simplehash.h"
220-
221188
/* Progress tracking. */
222189
typedef struct TaskProgress
223190
{
@@ -233,6 +200,7 @@ typedef struct TaskProgress
233200
int64 del;
234201
} TaskProgress;
235202

203+
/* TODO Remove */
236204
typedef enum WorkerTaskKind
237205
{
238206
WORKER_TASK_PARTITION
@@ -325,6 +293,40 @@ typedef struct TupleConversionMapExt
325293
* coercion. */
326294
} TupleConversionMapExt;
327295

296+
/*
297+
* Hash table to cache partition-specific information.
298+
*/
299+
typedef struct PartitionEntry
300+
{
301+
Oid part_oid; /* key */
302+
Relation ident_index;
303+
304+
/* Slot to retrieve tuples from identity index. */
305+
TupleTableSlot *ind_slot;
306+
307+
/* This should make insertions into partitions more efficient. */
308+
BulkInsertState bistate;
309+
310+
/*
311+
* Map to convert tuples that match the partitioned table so they match
312+
* this partition.
313+
*/
314+
TupleConversionMapExt *conv_map;
315+
316+
char status; /* used by simplehash */
317+
} PartitionEntry;
318+
319+
#define SH_PREFIX partitions
320+
#define SH_ELEMENT_TYPE PartitionEntry
321+
#define SH_KEY_TYPE Oid
322+
#define SH_KEY part_oid
323+
#define SH_HASH_KEY(tb, key) (key)
324+
#define SH_EQUAL(tb, a, b) ((a) == (b))
325+
#define SH_SCOPE static inline
326+
#define SH_DECLARE
327+
#define SH_DEFINE
328+
#include "lib/simplehash.h"
329+
328330
extern PGDLLEXPORT void rewrite_worker_main(Datum main_arg);
329331

330332
extern void pg_rewrite_exit_if_requested(void);
@@ -357,7 +359,7 @@ extern bool pg_rewrite_decode_concurrent_changes(LogicalDecodingContext *ctx,
357359
extern HeapTuple convert_tuple_for_dest_table(HeapTuple tuple,
358360
TupleConversionMapExt *conv_map);
359361
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
360-
extern BulkInsertState get_partition_insert_state(partitions_hash *partitions,
361-
Oid part_oid);;
362+
extern PartitionEntry *get_partition_entry(partitions_hash *partitions,
363+
Oid part_oid);;
362364
extern HeapTuple pg_rewrite_execute_attr_map_tuple(HeapTuple tuple,
363365
TupleConversionMapExt *map);

0 commit comments

Comments
 (0)