2222
2323static void apply_concurrent_changes (EState * estate , ModifyTableState * mtstate ,
2424 struct PartitionTupleRouting * proute ,
25+ Relation rel_dst ,
2526 DecodingOutputState * dstate ,
2627 ScanKey key , int nkeys ,
28+ Relation ident_index ,
29+ TupleTableSlot * ind_slot ,
2730 partitions_hash * partitions ,
2831 TupleConversionMap * conv_map );
2932static void find_tuple_in_partition (HeapTuple tup , Relation partition ,
3033 partitions_hash * partitions ,
31- ScanKey key , int nkeys ,
32- ItemPointer ctid );
34+ ScanKey key , int nkeys , ItemPointer ctid );
35+ static void find_tuple (HeapTuple tup , Relation rel , Relation ident_index ,
36+ ScanKey key , int nkeys , ItemPointer ctid ,
37+ TupleTableSlot * ind_slot );
3338static bool processing_time_elapsed (struct timeval * utmost );
3439
3540static void plugin_startup (LogicalDecodingContext * ctx ,
@@ -65,6 +70,8 @@ pg_rewrite_process_concurrent_changes(EState *estate,
6570 CatalogState * cat_state ,
6671 Relation rel_dst , ScanKey ident_key ,
6772 int ident_key_nentries ,
73+ Relation ident_index ,
74+ TupleTableSlot * ind_slot ,
6875 LOCKMODE lock_held ,
6976 partitions_hash * partitions ,
7077 TupleConversionMap * conv_map ,
@@ -97,9 +104,9 @@ pg_rewrite_process_concurrent_changes(EState *estate,
97104 * processing partway through. Partial cleanup of the tuplestore seems
98105 * non-trivial.
99106 */
100- apply_concurrent_changes (estate , mtstate , proute ,
101- dstate , ident_key ,
102- ident_key_nentries , partitions , conv_map );
107+ apply_concurrent_changes (estate , mtstate , proute , rel_dst ,
108+ dstate , ident_key , ident_key_nentries ,
109+ ident_index , ind_slot , partitions , conv_map );
103110 }
104111
105112 return true;
@@ -198,17 +205,25 @@ pg_rewrite_decode_concurrent_changes(LogicalDecodingContext *ctx,
198205static void
199206apply_concurrent_changes (EState * estate , ModifyTableState * mtstate ,
200207 struct PartitionTupleRouting * proute ,
208+ Relation rel_dst ,
201209 DecodingOutputState * dstate ,
202210 ScanKey key , int nkeys ,
211+ Relation ident_index ,
212+ TupleTableSlot * ind_slot ,
203213 partitions_hash * partitions ,
204214 TupleConversionMap * conv_map )
205215{
206216 TupleTableSlot * slot ;
217+ BulkInsertState bistate_nonpart = NULL ;
207218 HeapTuple tup_old = NULL ;
208219
209220 if (dstate -> nchanges == 0 )
210221 return ;
211222
223+ /* See perform_initial_load() */
224+ if (proute == NULL )
225+ bistate_nonpart = GetBulkInsertState ();
226+
212227 /* TupleTableSlot is needed to pass the tuple to ExecInsertIndexTuples(). */
213228 slot = MakeSingleTupleTableSlot (dstate -> tupdesc , & TTSOpsHeapTuple );
214229
@@ -228,7 +243,7 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
228243 ConcurrentChange * change ;
229244 bool isnull [1 ];
230245 Datum values [1 ];
231- ResultRelInfo * rri ;
246+ ResultRelInfo * rri = NULL ;
232247
233248 /* Get the change from the single-column tuple. */
234249 tup_change = ExecFetchSlotHeapTuple (dstate -> tsslot , false, & shouldFree );
@@ -250,7 +265,7 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
250265 else if (change -> kind == CHANGE_INSERT )
251266 {
252267 List * recheck ;
253- Relation partition ;
268+ Relation rel_ins ;
254269 BulkInsertState bistate ;
255270
256271 Assert (tup_old == NULL );
@@ -259,14 +274,24 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
259274 if (conv_map )
260275 tup = convert_tuple_for_dest_table (tup , conv_map );
261276 ExecStoreHeapTuple (tup , slot , false);
262- rri = ExecFindPartition (mtstate , mtstate -> rootResultRelInfo ,
263- proute , slot , estate );
264- partition = rri -> ri_RelationDesc ;
277+ if (proute )
278+ {
279+ rri = ExecFindPartition (mtstate , mtstate -> rootResultRelInfo ,
280+ proute , slot , estate );
281+ rel_ins = rri -> ri_RelationDesc ;
265282
266- bistate = get_partition_insert_state (partitions ,
267- RelationGetRelid (partition ));
283+ bistate = get_partition_insert_state (partitions ,
284+ RelationGetRelid (rel_ins ));
285+ }
286+ else
287+ {
288+ /* Non-partitioned table. */
289+ rri = mtstate -> resultRelInfo ;
290+ rel_ins = rel_dst ;
291+ bistate = bistate_nonpart ;
292+ }
268293 Assert (bistate != NULL );
269- table_tuple_insert (partition , slot , GetCurrentCommandId (true), 0 ,
294+ table_tuple_insert (rel_ins , slot , GetCurrentCommandId (true), 0 ,
270295 bistate );
271296
272297#if PG_VERSION_NUM < 140000
@@ -318,7 +343,7 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
318343 ExecClearTuple (slot );
319344
320345 /* Is this a cross-partition update? */
321- if (change -> kind == CHANGE_UPDATE_NEW && tup_old )
346+ if (partitions && change -> kind == CHANGE_UPDATE_NEW && tup_old )
322347 {
323348 if (conv_map )
324349 tup_old = convert_tuple_for_dest_table (tup_old , conv_map );
@@ -385,8 +410,9 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
385410 ItemPointerData ctid ;
386411
387412 /*
388- * Both old and new tuple are in the same partition. Find the
389- * tuple to be updated or deleted.
413+ * Both old and new tuple are in the same partition (or the
414+ * target table is not partitioned). Find the tuple to be
415+ * updated or deleted.
390416 */
391417 if (change -> kind == CHANGE_UPDATE_NEW )
392418 tup_key = tup_old != NULL ? tup_old : tup ;
@@ -397,9 +423,12 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
397423 tup_key = tup ;
398424 }
399425
400- find_tuple_in_partition (tup_key , rri -> ri_RelationDesc ,
401- partitions , key , nkeys , & ctid );
402-
426+ if (partitions )
427+ find_tuple_in_partition (tup_key , rri -> ri_RelationDesc ,
428+ partitions , key , nkeys , & ctid );
429+ else
430+ find_tuple (tup_key , rel_dst , ident_index , key , nkeys ,
431+ & ctid , ind_slot );
403432
404433 if (change -> kind == CHANGE_UPDATE_NEW )
405434 {
@@ -451,6 +480,8 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
451480 }
452481 else
453482 {
483+ Assert (change -> kind == CHANGE_DELETE );
484+
454485 simple_heap_delete (rri -> ri_RelationDesc , & ctid );
455486
456487 /* Update the progress information. */
@@ -489,11 +520,13 @@ apply_concurrent_changes(EState *estate, ModifyTableState *mtstate,
489520
490521 /* Cleanup. */
491522 ExecDropSingleTupleTableSlot (slot );
523+ if (bistate_nonpart )
524+ FreeBulkInsertState (bistate_nonpart );
492525}
493526
494527/*
495- * Find tuple whose identity key is in 'tup_ind ' in partition 'partition ' and
496- * put its location into 'ctid'.
528+ * Find tuple whose identity key is passed as 'tup ' in relation 'rel ' and put
529+ * its location into 'ctid'.
497530 */
498531static void
499532find_tuple_in_partition (HeapTuple tup , Relation partition ,
@@ -502,24 +535,33 @@ find_tuple_in_partition(HeapTuple tup, Relation partition,
502535{
503536 Oid part_oid = RelationGetRelid (partition );
504537 PartitionEntry * entry ;
505- Relation ident_index ;
506- Form_pg_index ident_form ;
507- int2vector * ident_indkey ;
508- IndexScanDesc scan ;
509- int i ;
510- HeapTuple tup_exist ;
511538
512- /* Delete the old tuple from its partition. */
513539 entry = partitions_lookup (partitions , part_oid );
514540 if (entry == NULL )
515541 elog (ERROR , "identity index not found for partition %u" , part_oid );
516542 Assert (entry -> part_oid == part_oid );
517543
518- ident_index = entry -> ident_index ;
544+ find_tuple (tup , partition , entry -> ident_index , key , nkeys , ctid ,
545+ entry -> ind_slot );
546+ }
547+
548+ /*
549+ * Find tuple whose identity key is passed as 'tup' in relation 'rel' and put
550+ * its location into 'ctid'.
551+ */
552+ static void
553+ find_tuple (HeapTuple tup , Relation rel , Relation ident_index , ScanKey key ,
554+ int nkeys , ItemPointer ctid , TupleTableSlot * ind_slot )
555+ {
556+ Form_pg_index ident_form ;
557+ int2vector * ident_indkey ;
558+ IndexScanDesc scan ;
559+ int i ;
560+ HeapTuple tup_exist ;
561+
519562 ident_form = ident_index -> rd_index ;
520563 ident_indkey = & ident_form -> indkey ;
521- scan = index_beginscan (partition , ident_index , GetActiveSnapshot (), nkeys ,
522- 0 );
564+ scan = index_beginscan (rel , ident_index , GetActiveSnapshot (), nkeys , 0 );
523565 index_rescan (scan , key , nkeys , NULL , 0 );
524566
525567 /* Use the incoming tuple to finalize the scan key. */
@@ -533,15 +575,15 @@ find_tuple_in_partition(HeapTuple tup, Relation partition,
533575 attno_heap = ident_indkey -> values [i ];
534576 entry -> sk_argument = heap_getattr (tup ,
535577 attno_heap ,
536- partition -> rd_att ,
578+ rel -> rd_att ,
537579 & isnull );
538580 Assert (!isnull );
539581 }
540- if (index_getnext_slot (scan , ForwardScanDirection , entry -> ind_slot ))
582+ if (index_getnext_slot (scan , ForwardScanDirection , ind_slot ))
541583 {
542584 bool shouldFreeInd ;
543585
544- tup_exist = ExecFetchSlotHeapTuple (entry -> ind_slot , false,
586+ tup_exist = ExecFetchSlotHeapTuple (ind_slot , false,
545587 & shouldFreeInd );
546588 /* TTSOpsBufferHeapTuple has .get_heap_tuple != NULL. */
547589 Assert (!shouldFreeInd );
0 commit comments