2222
2323static void apply_concurrent_changes (EState * estate , ModifyTableState * mtstate ,
2424 struct PartitionTupleRouting * proute ,
25- Relation rel_dst ,
2625 DecodingOutputState * dstate ,
2726 ScanKey key , int nkeys ,
2827 Relation ident_index ,
29- TupleTableSlot * ind_slot ,
28+ TupleTableSlot * slot_dst_ind ,
3029 partitions_hash * partitions ,
3130 TupleConversionMapExt * conv_map );
32- static void apply_insert (Relation rel , HeapTuple tup , TupleTableSlot * slot ,
31+ static void apply_insert (HeapTuple tup , TupleTableSlot * slot ,
3332 EState * estate , ModifyTableState * mtstate ,
3433 struct PartitionTupleRouting * proute ,
3534 partitions_hash * partitions ,
3635 TupleConversionMapExt * conv_map ,
3736 BulkInsertState bistate );
38- static void apply_update_or_delete (Relation rel , HeapTuple tup ,
37+ static void apply_update_or_delete (HeapTuple tup ,
3938 HeapTuple tup_old ,
4039 ConcurrentChangeKind change_kind ,
41- TupleTableSlot * slot , EState * estate ,
40+ EState * estate ,
4241 ScanKey key , int nkeys , Relation ident_index ,
43- TupleTableSlot * ind_slot ,
42+ TupleTableSlot * slot_dst ,
43+ TupleTableSlot * slot_dst_ind ,
4444 ModifyTableState * mtstate ,
4545 struct PartitionTupleRouting * proute ,
4646 partitions_hash * partitions ,
@@ -50,7 +50,7 @@ static void find_tuple_in_partition(HeapTuple tup, Relation partition,
5050 ScanKey key , int nkeys , ItemPointer ctid );
5151static void find_tuple (HeapTuple tup , Relation rel , Relation ident_index ,
5252 ScanKey key , int nkeys , ItemPointer ctid ,
53- TupleTableSlot * ind_slot );
53+ TupleTableSlot * slot_dst_ind );
5454static bool processing_time_elapsed (struct timeval * utmost );
5555
5656static void plugin_startup (LogicalDecodingContext * ctx ,
@@ -84,10 +84,10 @@ pg_rewrite_process_concurrent_changes(EState *estate,
8484 LogicalDecodingContext * ctx ,
8585 XLogRecPtr end_of_wal ,
8686 CatalogState * cat_state ,
87- Relation rel_dst , ScanKey ident_key ,
87+ ScanKey ident_key ,
8888 int ident_key_nentries ,
8989 Relation ident_index ,
90- TupleTableSlot * ind_slot ,
90+ TupleTableSlot * slot_dst_ind ,
9191 LOCKMODE lock_held ,
9292 partitions_hash * partitions ,
9393 TupleConversionMapExt * conv_map ,
@@ -101,8 +101,10 @@ pg_rewrite_process_concurrent_changes(EState *estate,
101101 * non-partitioned one. XXX Is some refactoring needed here, such as using
102102 * an union?
103103 */
104- Assert ((ident_index && ind_slot && partitions == NULL && proute == NULL ) ||
105- (ident_index == NULL && ind_slot == NULL && partitions && proute ));
104+ Assert ((ident_index && slot_dst_ind && partitions == NULL
105+ && proute == NULL ) ||
106+ (ident_index == NULL && slot_dst_ind == NULL &&
107+ partitions && proute ));
106108
107109 dstate = (DecodingOutputState * ) ctx -> output_writer_private ;
108110 done = false;
@@ -128,9 +130,10 @@ pg_rewrite_process_concurrent_changes(EState *estate,
128130 * processing partway through. Partial cleanup of the tuplestore seems
129131 * non-trivial.
130132 */
131- apply_concurrent_changes (estate , mtstate , proute , rel_dst ,
133+ apply_concurrent_changes (estate , mtstate , proute ,
132134 dstate , ident_key , ident_key_nentries ,
133- ident_index , ind_slot , partitions , conv_map );
135+ ident_index , slot_dst_ind ,
136+ partitions , conv_map );
134137 }
135138
136139 return true;
@@ -229,17 +232,17 @@ pg_rewrite_decode_concurrent_changes(LogicalDecodingContext *ctx,
229232static void
230233apply_concurrent_changes (EState * estate , ModifyTableState * mtstate ,
231234 struct PartitionTupleRouting * proute ,
232- Relation rel_dst ,
233235 DecodingOutputState * dstate ,
234236 ScanKey key , int nkeys ,
235237 Relation ident_index ,
236- TupleTableSlot * ind_slot ,
238+ TupleTableSlot * slot_dst_ind ,
237239 partitions_hash * partitions ,
238240 TupleConversionMapExt * conv_map )
239241{
240- TupleTableSlot * slot ;
241242 BulkInsertState bistate = NULL ;
242243 HeapTuple tup_old = NULL ;
244+ Relation rel_dst ;
245+ TupleTableSlot * slot_dst ;
243246
244247 if (dstate -> nchanges == 0 )
245248 return ;
@@ -249,9 +252,12 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
249252 bistate = GetBulkInsertState ();
250253
251254 /*
252- * TupleTableSlot is needed to pass the tuple to ExecInsertIndexTuples().
255+ * Slot for the destination relation is needed even in the partitioned
256+ * case, to route changes to partitions.
253257 */
254- slot = MakeSingleTupleTableSlot (dstate -> tupdesc , & TTSOpsHeapTuple );
258+ rel_dst = mtstate -> resultRelInfo -> ri_RelationDesc ;
259+ slot_dst = MakeSingleTupleTableSlot (RelationGetDescr (rel_dst ),
260+ & TTSOpsHeapTuple );
255261
256262 /*
257263 * In case functions in the index need the active snapshot and caller
@@ -290,16 +296,16 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
290296 else if (change -> kind == CHANGE_INSERT )
291297 {
292298 Assert (tup_old == NULL );
293- apply_insert (rel_dst , tup , slot , estate , mtstate , proute ,
299+ apply_insert (tup , slot_dst , estate , mtstate , proute ,
294300 partitions , conv_map , bistate );
295301 }
296302 else if (change -> kind == CHANGE_UPDATE_NEW ||
297303 change -> kind == CHANGE_DELETE )
298304 {
299- apply_update_or_delete (rel_dst , tup , tup_old , change -> kind ,
300- slot , estate , key , nkeys , ident_index ,
301- ind_slot , mtstate , proute , partitions ,
302- conv_map );
305+ apply_update_or_delete (tup , tup_old , change -> kind ,
306+ estate , key , nkeys , ident_index ,
307+ slot_dst , slot_dst_ind , mtstate , proute ,
308+ partitions , conv_map );
303309
304310 /* The function is responsible for freeing. */
305311 if (tup_old != NULL )
@@ -326,13 +332,14 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
326332 PopActiveSnapshot ();
327333
328334 /* Cleanup. */
329- ExecDropSingleTupleTableSlot (slot );
330335 if (bistate )
331336 FreeBulkInsertState (bistate );
337+
338+ ExecDropSingleTupleTableSlot (slot_dst );
332339}
333340
334341static void
335- apply_insert (Relation rel , HeapTuple tup , TupleTableSlot * slot ,
342+ apply_insert (HeapTuple tup , TupleTableSlot * slot ,
336343 EState * estate , ModifyTableState * mtstate ,
337344 struct PartitionTupleRouting * proute ,
338345 partitions_hash * partitions , TupleConversionMapExt * conv_map ,
@@ -419,11 +426,12 @@ apply_insert(Relation rel, HeapTuple tup, TupleTableSlot *slot,
419426}
420427
421428static void
422- apply_update_or_delete (Relation rel , HeapTuple tup , HeapTuple tup_old ,
429+ apply_update_or_delete (HeapTuple tup , HeapTuple tup_old ,
423430 ConcurrentChangeKind change_kind ,
424- TupleTableSlot * slot , EState * estate ,
431+ EState * estate ,
425432 ScanKey key , int nkeys , Relation ident_index ,
426- TupleTableSlot * ind_slot ,
433+ TupleTableSlot * slot_dst ,
434+ TupleTableSlot * slot_dst_ind ,
427435 ModifyTableState * mtstate ,
428436 struct PartitionTupleRouting * proute ,
429437 partitions_hash * partitions ,
@@ -450,17 +458,17 @@ apply_update_or_delete(Relation rel, HeapTuple tup, HeapTuple tup_old,
450458 if (proute )
451459 {
452460 /* Which partition does the tuple belong to? */
453- ExecStoreHeapTuple (tup , slot , false);
461+ ExecStoreHeapTuple (tup , slot_dst , false);
454462 rri = ExecFindPartition (mtstate , mtstate -> rootResultRelInfo ,
455- proute , slot , estate );
456- ExecClearTuple (slot );
463+ proute , slot_dst , estate );
464+ ExecClearTuple (slot_dst );
457465
458466 if (change_kind == CHANGE_UPDATE_NEW && tup_old )
459467 {
460- ExecStoreHeapTuple (tup_old , slot , false);
468+ ExecStoreHeapTuple (tup_old , slot_dst , false);
461469 rri_old = ExecFindPartition (mtstate , mtstate -> rootResultRelInfo ,
462- proute , slot , estate );
463- ExecClearTuple (slot );
470+ proute , slot_dst , estate );
471+ ExecClearTuple (slot_dst );
464472 }
465473 }
466474 else
@@ -495,8 +503,8 @@ apply_update_or_delete(Relation rel, HeapTuple tup, HeapTuple tup_old,
495503 RelationGetRelid (rri -> ri_RelationDesc ));
496504 if (entry -> conv_map )
497505 tup = convert_tuple_for_dest_table (tup , entry -> conv_map );
498- ExecStoreHeapTuple (tup , slot , false);
499- table_tuple_insert (rri -> ri_RelationDesc , slot ,
506+ ExecStoreHeapTuple (tup , entry -> slot , false);
507+ table_tuple_insert (rri -> ri_RelationDesc , entry -> slot ,
500508 GetCurrentCommandId (true), 0 , NULL );
501509
502510#if PG_VERSION_NUM < 140000
@@ -507,7 +515,7 @@ apply_update_or_delete(Relation rel, HeapTuple tup, HeapTuple tup_old,
507515#if PG_VERSION_NUM >= 140000
508516 rri ,
509517#endif
510- slot ,
518+ entry -> slot ,
511519 estate ,
512520#if PG_VERSION_NUM >= 140000
513521 false , /* update */
@@ -519,7 +527,7 @@ apply_update_or_delete(Relation rel, HeapTuple tup, HeapTuple tup_old,
519527 , false /* onlySummarizing */
520528#endif
521529 );
522- ExecClearTuple (slot );
530+ ExecClearTuple (entry -> slot );
523531
524532 /* Update the progress information. */
525533 SpinLockAcquire (& MyWorkerTask -> mutex );
@@ -534,8 +542,8 @@ apply_update_or_delete(Relation rel, HeapTuple tup, HeapTuple tup_old,
534542 ItemPointerData ctid ;
535543
536544 /*
537- * Both old and new tuple are in the same partition ( or the target
538- * table is not partitioned) . Find the tuple to be updated or deleted.
545+ * Both old and new tuple are in the same partition, or the target
546+ * table is not partitioned. Find the tuple to be updated or deleted.
539547 */
540548 if (change_kind == CHANGE_UPDATE_NEW )
541549 tup_key = tup_old != NULL ? tup_old : tup ;
@@ -550,19 +558,19 @@ apply_update_or_delete(Relation rel, HeapTuple tup, HeapTuple tup_old,
550558 find_tuple_in_partition (tup_key , rri -> ri_RelationDesc ,
551559 partitions , key , nkeys , & ctid );
552560 else
553- find_tuple (tup_key , rel , ident_index , key , nkeys , & ctid ,
554- ind_slot );
561+ find_tuple (tup_key , rri -> ri_RelationDesc , ident_index , key , nkeys ,
562+ & ctid , slot_dst_ind );
555563
556564 if (change_kind == CHANGE_UPDATE_NEW )
557565 {
566+ PartitionEntry * entry = NULL ;
567+
558568#if PG_VERSION_NUM >= 160000
559569 TU_UpdateIndexes update_indexes ;
560570#endif
561571
562572 if (partitions )
563573 {
564- PartitionEntry * entry ;
565-
566574 /*
567575 * Make sure the tuple matches the partition.
568576 */
@@ -580,8 +588,11 @@ apply_update_or_delete(Relation rel, HeapTuple tup, HeapTuple tup_old,
580588 );
581589 if (!HeapTupleIsHeapOnly (tup ))
582590 {
591+ TupleTableSlot * slot ;
583592 List * recheck ;
584593
594+ slot = entry ? entry -> slot : slot_dst ;
595+
585596 ExecStoreHeapTuple (tup , slot , false);
586597
587598 /*
@@ -667,7 +678,7 @@ find_tuple_in_partition(HeapTuple tup, Relation partition,
667678 tup = tup_mapped ;
668679 }
669680 find_tuple (tup , partition , entry -> ident_index , key , nkeys , ctid ,
670- entry -> ind_slot );
681+ entry -> slot_ind );
671682 if (tup_mapped )
672683 pfree (tup_mapped );
673684}
@@ -678,7 +689,7 @@ find_tuple_in_partition(HeapTuple tup, Relation partition,
678689 */
679690static void
680691find_tuple (HeapTuple tup , Relation rel , Relation ident_index , ScanKey key ,
681- int nkeys , ItemPointer ctid , TupleTableSlot * ind_slot )
692+ int nkeys , ItemPointer ctid , TupleTableSlot * slot_dst_ind )
682693{
683694 Form_pg_index ident_form ;
684695 int2vector * ident_indkey ;
@@ -706,11 +717,11 @@ find_tuple(HeapTuple tup, Relation rel, Relation ident_index, ScanKey key,
706717 & isnull );
707718 Assert (!isnull );
708719 }
709- if (index_getnext_slot (scan , ForwardScanDirection , ind_slot ))
720+ if (index_getnext_slot (scan , ForwardScanDirection , slot_dst_ind ))
710721 {
711722 bool shouldFreeInd ;
712723
713- tup_exist = ExecFetchSlotHeapTuple (ind_slot , false,
724+ tup_exist = ExecFetchSlotHeapTuple (slot_dst_ind , false,
714725 & shouldFreeInd );
715726 /* TTSOpsBufferHeapTuple has .get_heap_tuple != NULL. */
716727 Assert (!shouldFreeInd );
@@ -935,7 +946,7 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
935946 * toast_flatten_tuple_to_datum() might be more convenient but we
936947 * don't want the decompression it does.
937948 */
938- tuple = toast_flatten_tuple (tuple , dstate -> tupdesc );
949+ tuple = toast_flatten_tuple (tuple , dstate -> tupdesc_src );
939950 flattened = true;
940951 }
941952
0 commit comments