@@ -79,6 +79,11 @@ static void rewrite_table_impl(char *relschema_src, char *relname_src,
7979 char * relname_new , char * relschema_dst ,
8080 char * relname_dst );
8181static Relation get_identity_index (Relation rel_dst , Relation rel_src );
82+ static partitions_hash * get_partitions (Relation rel_src , Relation rel_dst ,
83+ int * nparts ,
84+ Relation * * parts_dst_p ,
85+ ScanKey * ident_key_p ,
86+ int * ident_key_nentries );
8287static int index_cat_info_compare (const void * arg1 , const void * arg2 );
8388
8489/* The WAL segment being decoded. */
@@ -831,11 +836,10 @@ rewrite_table_impl(char *relschema_src, char *relname_src,
831836 RangeVar * relrv ;
832837 Relation rel_src ,
833838 rel_dst ;
834- PartitionDesc part_desc = NULL ;
835839 Oid ident_idx_src ;
836840 Oid relid_src ;
837841 Relation ident_index = NULL ;
838- ScanKey ident_key = NULL ;
842+ ScanKey ident_key ;
839843 TupleTableSlot * ind_slot = NULL ;
840844 int i ,
841845 ident_key_nentries = 0 ;
@@ -852,6 +856,7 @@ rewrite_table_impl(char *relschema_src, char *relname_src,
852856 IndexCatInfo * ind_info ;
853857 bool source_finalized ;
854858 Relation * parts_dst = NULL ;
859+ int nparts ;
855860 partitions_hash * partitions = NULL ;
856861 TupleConversionMapExt * conv_map ;
857862 EState * estate ;
@@ -979,31 +984,6 @@ rewrite_table_impl(char *relschema_src, char *relname_src,
979984 (errcode (ERRCODE_WRONG_OBJECT_TYPE ),
980985 errmsg ("\"%s\" is not a permanent table" , relname_dst )));
981986
982- /*
983- * Are we going to route the data into partitions?
984- */
985- if (rel_dst -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE )
986- {
987- #if PG_VERSION_NUM >= 140000
988- part_desc = RelationGetPartitionDesc (rel_dst , true);
989- #else
990- part_desc = RelationGetPartitionDesc (rel_dst );
991- #endif
992- if (part_desc -> nparts == 0 )
993- ereport (ERROR ,
994- (errmsg ("table \"%s\" has no partitions" , relname_dst )));
995-
996- /*
997- * It's probably not necessary to lock the partitions in exclusive
998- * mode, but we'll need to open them later. Simply use the exclusive
999- * lock instead of trying to determine the minimum lock level needed.
1000- */
1001- parts_dst = (Relation * ) palloc (part_desc -> nparts * sizeof (Relation ));
1002- for (i = 0 ; i < part_desc -> nparts ; i ++ )
1003- parts_dst [i ] = table_open (part_desc -> oids [i ],
1004- AccessExclusiveLock );
1005- }
1006-
1007987 /*
1008988 * Build a "historic snapshot", i.e. one that reflect the table state at
1009989 * the moment the snapshot builder reached SNAPBUILD_CONSISTENT state.
@@ -1027,64 +1007,12 @@ rewrite_table_impl(char *relschema_src, char *relname_src,
10271007 conv_map = convert_tuples_by_name_ext (CreateTupleDescCopy (RelationGetDescr (rel_src )),
10281008 RelationGetDescr (rel_dst ));
10291009
1030- /* TODO Move this branch into a function. */
1031- if (part_desc )
1032- {
1033- /*
1034- * Pointers to identity indexes will be looked up by the partition
1035- * relation OID.
1036- */
1037- partitions = partitions_create (CurrentMemoryContext , 8 , NULL );
1038-
1039- /*
1040- * Gather partition information that we'll need later. It happens here
1041- * because it's a good opportunity to check the partition tuple
1042- * descriptors and identity indexes before the initial load starts. (The
1043- * load does not need those indexes, but it'd be unfortunate to find out
1044- * incorrect or missing identity index after the initial load has been
1045- * performed.)
1046- */
1047- for (i = 0 ; i < part_desc -> nparts ; i ++ )
1048- {
1049- Relation partition = parts_dst [i ];
1050- PartitionEntry * entry ;
1051- bool found ;
1052-
1053- /* Info on partitions. */
1054- entry = partitions_insert (partitions , RelationGetRelid (partition ),
1055- & found );
1056- Assert (!found );
1057-
1058- /*
1059- * Identity of the rows of a foreign table is hard to implement for
1060- * foreign tables. We'd hit the problem below, but it's clearer to
1061- * report the problem this way.
1062- */
1063- if (partition -> rd_rel -> relkind == RELKIND_FOREIGN_TABLE )
1064- ereport (ERROR ,
1065- (errmsg ("\"%s\" is a foreign table" ,
1066- RelationGetRelationName (partition ))));
1067-
1068- entry -> ident_index = get_identity_index (partition , rel_src );
1069- entry -> ind_slot = table_slot_create (partition , NULL );
1070- entry -> conv_map =
1071- convert_tuples_by_name_ext (RelationGetDescr (rel_dst ),
1072- RelationGetDescr (partition ));
1073- /* Expect many insertions. */
1074- entry -> bistate = GetBulkInsertState ();
1075-
1076- /*
1077- * Build scan key that we'll use to look for rows to be updated /
1078- * deleted during logical decoding.
1079- *
1080- * As all the partitions have the same definition of the identity
1081- * index, there should only be a single identity key.
1082- */
1083- if (ident_key == NULL )
1084- ident_key = build_identity_key (entry -> ident_index ,
1085- & ident_key_nentries );
1086- }
1087- }
1010+ /*
1011+ * Are we going to route the data into partitions?
1012+ */
1013+ if (rel_dst -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE )
1014+ partitions = get_partitions (rel_src , rel_dst , & nparts , & parts_dst ,
1015+ & ident_key , & ident_key_nentries );
10881016 else
10891017 {
10901018 ident_index = get_identity_index (rel_dst , rel_src );
@@ -1314,11 +1242,11 @@ rewrite_table_impl(char *relschema_src, char *relname_src,
13141242 */
13151243 table_close (rel_dst , AccessExclusiveLock );
13161244
1317- if (part_desc )
1245+ if (partitions )
13181246 {
13191247 close_partitions (partitions );
13201248
1321- for (i = 0 ; i < part_desc -> nparts ; i ++ )
1249+ for (i = 0 ; i < nparts ; i ++ )
13221250 table_close (parts_dst [i ], AccessExclusiveLock );
13231251 pfree (parts_dst );
13241252 }
@@ -1407,6 +1335,103 @@ get_identity_index(Relation rel_dst, Relation rel_src)
14071335 return index_dst ;
14081336}
14091337
1338+ /*
1339+ * Retrieve information needed to apply DML commands to partitioned table.
1340+ */
1341+ static partitions_hash *
1342+ get_partitions (Relation rel_src , Relation rel_dst , int * nparts ,
1343+ Relation * * parts_dst_p , ScanKey * ident_key_p ,
1344+ int * ident_key_nentries )
1345+ {
1346+ partitions_hash * partitions ;
1347+ Relation * parts_dst ;
1348+ ScanKey ident_key = NULL ;
1349+ PartitionDesc part_desc ;
1350+
1351+ #if PG_VERSION_NUM >= 140000
1352+ part_desc = RelationGetPartitionDesc (rel_dst , true);
1353+ #else
1354+ part_desc = RelationGetPartitionDesc (rel_dst );
1355+ #endif
1356+ if (part_desc -> nparts == 0 )
1357+ ereport (ERROR ,
1358+ (errmsg ("table \"%s\" has no partitions" ,
1359+ RelationGetRelationName (rel_dst ))));
1360+
1361+ /*
1362+ * It's probably not necessary to lock the partitions in exclusive mode,
1363+ * but we'll need to open them later. Simply use the exclusive lock
1364+ * instead of trying to determine the minimum lock level needed.
1365+ */
1366+ parts_dst = (Relation * ) palloc (part_desc -> nparts * sizeof (Relation ));
1367+ for (int i = 0 ; i < part_desc -> nparts ; i ++ )
1368+ parts_dst [i ] = table_open (part_desc -> oids [i ],
1369+ AccessExclusiveLock );
1370+
1371+ /*
1372+ * Pointers to identity indexes will be looked up by the partition
1373+ * relation OID.
1374+ */
1375+ partitions = partitions_create (CurrentMemoryContext , 8 , NULL );
1376+
1377+ /*
1378+ * Gather partition information that we'll need later. It happens here
1379+ * because it's a good opportunity to check the partition tuple
1380+ * descriptors and identity indexes before the initial load starts. (The
1381+ * load does not need those indexes, but it'd be unfortunate to find out
1382+ * incorrect or missing identity index after the initial load has been
1383+ * performed.)
1384+ */
1385+ for (int i = 0 ; i < part_desc -> nparts ; i ++ )
1386+ {
1387+ Relation partition = parts_dst [i ];
1388+ PartitionEntry * entry ;
1389+ bool found ;
1390+
1391+ /* Info on partitions. */
1392+ entry = partitions_insert (partitions , RelationGetRelid (partition ),
1393+ & found );
1394+ Assert (!found );
1395+
1396+ /*
1397+ * Identity of the rows of a foreign table is hard to implement for
1398+ * foreign tables. We'd hit the problem below, but it's clearer to
1399+ * report the problem this way.
1400+ */
1401+ if (partition -> rd_rel -> relkind == RELKIND_FOREIGN_TABLE )
1402+ ereport (ERROR ,
1403+ (errmsg ("\"%s\" is a foreign table" ,
1404+ RelationGetRelationName (partition ))));
1405+
1406+ entry -> ident_index = get_identity_index (partition , rel_src );
1407+ entry -> ind_slot = table_slot_create (partition , NULL );
1408+ entry -> conv_map =
1409+ convert_tuples_by_name_ext (RelationGetDescr (rel_dst ),
1410+ RelationGetDescr (partition ));
1411+ /* Expect many insertions. */
1412+ entry -> bistate = GetBulkInsertState ();
1413+
1414+ /*
1415+ * Build scan key that we'll use to look for rows to be updated /
1416+ * deleted during logical decoding.
1417+ *
1418+ * As all the partitions have the same definition of the identity
1419+ * index, there should only be a single identity key.
1420+ */
1421+ if (ident_key == NULL )
1422+ {
1423+ ident_key = build_identity_key (entry -> ident_index ,
1424+ ident_key_nentries );
1425+ * ident_key_p = ident_key ;
1426+ }
1427+ }
1428+
1429+ * nparts = part_desc -> nparts ;
1430+ * parts_dst_p = parts_dst ;
1431+
1432+ return partitions ;
1433+ }
1434+
14101435static int
14111436index_cat_info_compare (const void * arg1 , const void * arg2 )
14121437{
0 commit comments