@@ -72,7 +72,7 @@ PG_MODULE_MAGIC;
7272#define REPL_PLUGIN_NAME "pg_rewrite"
7373
7474static void partition_table_impl (char * relschema_src , char * relname_src ,
75- char * relname_src_new , char * relschema_dst ,
75+ char * relname_new , char * relschema_dst ,
7676 char * relname_dst );
7777static int index_cat_info_compare (const void * arg1 , const void * arg2 );
7878
@@ -108,6 +108,10 @@ static void worker_shmem_request(void);
108108static void worker_shmem_startup (void );
109109static void worker_shmem_shutdown (int code , Datum arg );
110110
111+ static WorkerTask * get_task (int * idx , char * relschema , char * relname );
112+ static void initialize_worker (BackgroundWorker * worker , int task_idx );
113+ static void run_worker (BackgroundWorker * worker , WorkerTask * task );
114+
111115static void check_prerequisites (Relation rel );
112116static LogicalDecodingContext * setup_decoding (Oid relid , TupleDesc tup_desc );
113117static void decoding_cleanup (LogicalDecodingContext * ctx );
@@ -349,60 +353,16 @@ worker_shmem_shutdown(int code, Datum arg)
349353 }
350354}
351355
352- /* PG >= 14 does define this macro. */
353- #if PG_VERSION_NUM < 140000
354- #define RelationIsPermanent (relation ) \
355- ((relation)->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT)
356- #endif
357-
358356/*
359- * Start the background worker and wait until it exits .
357+ * Find a free task structure and initialize the common fields .
360358 */
361- extern Datum partition_table_new (PG_FUNCTION_ARGS );
362- PG_FUNCTION_INFO_V1 (partition_table_new );
363- Datum
364- partition_table_new (PG_FUNCTION_ARGS )
359+ static WorkerTask *
360+ get_task (int * idx , char * relschema , char * relname )
365361{
366- text * rel_src_t , * rel_src_new_t , * rel_dst_t ;
367- RangeVar * rv_src , * rv_src_new , * rv_dst ;
368- BackgroundWorker worker ;
369- BackgroundWorkerHandle * handle ;
370- pid_t pid ;
371- BgwHandleStatus status ;
372- Oid dbid , roleid ;
373- char * dbname ;
374362 int i ;
375363 WorkerTask * task = NULL ;
376364 bool found = false;
377- char * msg = NULL ;
378-
379- rel_src_t = PG_GETARG_TEXT_PP (0 );
380- rv_src = makeRangeVarFromNameList (textToQualifiedNameList (rel_src_t ));
381-
382- rel_dst_t = PG_GETARG_TEXT_PP (1 );
383- rv_dst = makeRangeVarFromNameList (textToQualifiedNameList (rel_dst_t ));
384-
385- rel_src_new_t = PG_GETARG_TEXT_PP (2 );
386- rv_src_new = makeRangeVarFromNameList (textToQualifiedNameList (rel_src_new_t ));
387-
388- if (rv_src -> catalogname || rv_dst -> catalogname || rv_src_new -> catalogname )
389- ereport (ERROR ,
390- (errmsg ("relation may only be qualified by schema, not by database" )));
391-
392- /*
393- * Technically it's possible to move the source relation to another schema
394- * but don't bother for this version.
395- */
396- if (rv_src_new -> schemaname )
397- ereport (ERROR ,
398- (errcode (ERRCODE_INVALID_NAME ),
399- (errmsg ("the new source relation name may not be qualified" ))));
400-
401-
402- dbid = MyDatabaseId ;
403- roleid = GetUserId ();
404365
405- /* Find free task structure. */
406366 for (i = 0 ; i < MAX_TASKS ; i ++ )
407367 {
408368 task = & workerTasks [i ];
@@ -428,58 +388,61 @@ partition_table_new(PG_FUNCTION_ARGS)
428388 if (!found )
429389 ereport (ERROR , (errmsg ("too many concurrent tasks in progress" )));
430390
431- worker .bgw_flags = BGWORKER_SHMEM_ACCESS |
391+ /* Finalize the task. */
392+ task -> roleid = GetUserId ();
393+ task -> exit_requested = false;
394+ if (relschema )
395+ namestrcpy (& task -> relschema , relschema );
396+ else
397+ NameStr (task -> relschema )[0 ] = '\0' ;
398+ namestrcpy (& task -> relname , relname );
399+
400+ task -> msg [0 ] = '\0' ;
401+
402+ * idx = i ;
403+ return task ;
404+ }
405+
406+ static void
407+ initialize_worker (BackgroundWorker * worker , int task_idx )
408+ {
409+ char * dbname ;
410+
411+ worker -> bgw_flags = BGWORKER_SHMEM_ACCESS |
432412 BGWORKER_BACKEND_DATABASE_CONNECTION ;
433- worker . bgw_start_time = BgWorkerStart_RecoveryFinished ;
434- worker . bgw_restart_time = BGW_NEVER_RESTART ;
435- sprintf (worker . bgw_library_name , "pg_rewrite" );
436- sprintf (worker . bgw_function_name , "rewrite_worker_main" );
413+ worker -> bgw_start_time = BgWorkerStart_RecoveryFinished ;
414+ worker -> bgw_restart_time = BGW_NEVER_RESTART ;
415+ sprintf (worker -> bgw_library_name , "pg_rewrite" );
416+ sprintf (worker -> bgw_function_name , "rewrite_worker_main" );
437417
438418 /*
439419 * XXX The function can throw ERROR but the database should really exist,
440420 * so no need to put this code in the PG_TRY block.
441421 */
442- dbname = get_database_name (dbid );
443- snprintf (worker . bgw_name , BGW_MAXLEN ,
422+ dbname = get_database_name (MyDatabaseId );
423+ snprintf (worker -> bgw_name , BGW_MAXLEN ,
444424 "pg_rewrite worker for database %s" , dbname );
445- snprintf (worker .bgw_type , BGW_MAXLEN , "pg_rewrite worker" );
446-
447- Assert (i < MAX_TASKS );
448- worker .bgw_main_arg = (Datum ) i ;
449-
450- worker .bgw_notify_pid = MyProcPid ;
451-
452- /* Finalize the task. */
453- task -> roleid = roleid ;
454- task -> exit_requested = false;
455- if (rv_src -> schemaname )
456- namestrcpy (& task -> relschema_src , rv_src -> schemaname );
457- else
458- NameStr (task -> relschema_src )[0 ] = '\0' ;
459- namestrcpy (& task -> relname_src , rv_src -> relname );
460- if (rv_dst -> schemaname )
461- namestrcpy (& task -> relschema_dst , rv_dst -> schemaname );
462- else
463- NameStr (task -> relschema_dst )[0 ] = '\0' ;
464- namestrcpy (& task -> relname_dst , rv_dst -> relname );
465- namestrcpy (& task -> relname_src_new , rv_src_new -> relname );
425+ snprintf (worker -> bgw_type , BGW_MAXLEN , "pg_rewrite worker" );
466426
467- task -> msg [0 ] = '\0' ;
427+ worker -> bgw_main_arg = (Datum ) task_idx ;
428+ worker -> bgw_notify_pid = MyProcPid ;
429+ }
468430
469- /*
470- * The worker does not reload the configuration, so we pass these GUC
471- * setting via the shared memory.
472- */
473- task -> wait_after_load = rewrite_wait_after_load ;
474- task -> check_constraints = rewrite_check_constraints ;
431+ static void
432+ run_worker (BackgroundWorker * worker , WorkerTask * task )
433+ {
434+ BackgroundWorkerHandle * handle ;
435+ BgwHandleStatus status ;
436+ pid_t pid ;
437+ char * msg = NULL ;
475438
476439 /*
477440 * Start the worker. Avoid leaking the task if the function ends due to
478441 * ERROR.
479442 */
480443 PG_TRY ();
481444 {
482- if (!RegisterDynamicBackgroundWorker (& worker , & handle ))
445+ if (!RegisterDynamicBackgroundWorker (worker , & handle ))
483446 ereport (ERROR ,
484447 (errcode (ERRCODE_INSUFFICIENT_RESOURCES ),
485448 errmsg ("could not register background process" ),
@@ -504,9 +467,7 @@ partition_table_new(PG_FUNCTION_ARGS)
504467 if (status == BGWH_STOPPED )
505468 {
506469 /* Work already done? */
507- release_task (task );
508-
509- PG_RETURN_VOID ();
470+ goto done ;
510471 }
511472 else if (status == BGWH_POSTMASTER_DIED )
512473 {
@@ -552,6 +513,7 @@ partition_table_new(PG_FUNCTION_ARGS)
552513 */
553514 Assert (status == BGWH_STOPPED );
554515
516+ done :
555517 if (strlen (task -> msg ) > 0 )
556518 msg = pstrdup (task -> msg );
557519
@@ -561,6 +523,73 @@ partition_table_new(PG_FUNCTION_ARGS)
561523 if (msg )
562524 ereport (ERROR , (errmsg ("%s" , msg )));
563525
526+ }
527+
528+ /* PG >= 14 does define this macro. */
529+ #if PG_VERSION_NUM < 140000
530+ #define RelationIsPermanent (relation ) \
531+ ((relation)->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT)
532+ #endif
533+
534+ /*
535+ * Start the background worker and wait until it exits.
536+ */
537+ extern Datum partition_table_new (PG_FUNCTION_ARGS );
538+ PG_FUNCTION_INFO_V1 (partition_table_new );
539+ Datum
540+ partition_table_new (PG_FUNCTION_ARGS )
541+ {
542+ text * rel_src_t , * rel_src_new_t , * rel_dst_t ;
543+ RangeVar * rv_src , * rv_src_new , * rv_dst ;
544+ BackgroundWorker worker ;
545+ WorkerTask * task ;
546+ int task_idx ;
547+
548+ rel_src_t = PG_GETARG_TEXT_PP (0 );
549+ rv_src = makeRangeVarFromNameList (textToQualifiedNameList (rel_src_t ));
550+
551+ rel_dst_t = PG_GETARG_TEXT_PP (1 );
552+ rv_dst = makeRangeVarFromNameList (textToQualifiedNameList (rel_dst_t ));
553+
554+ rel_src_new_t = PG_GETARG_TEXT_PP (2 );
555+ rv_src_new = makeRangeVarFromNameList (textToQualifiedNameList (rel_src_new_t ));
556+
557+ if (rv_src -> catalogname || rv_dst -> catalogname || rv_src_new -> catalogname )
558+ ereport (ERROR ,
559+ (errmsg ("relation may only be qualified by schema, not by database" )));
560+
561+ /*
562+ * Technically it's possible to move the source relation to another schema
563+ * but don't bother for this version.
564+ */
565+ if (rv_src_new -> schemaname )
566+ ereport (ERROR ,
567+ (errcode (ERRCODE_INVALID_NAME ),
568+ (errmsg ("the new source relation name may not be qualified" ))));
569+
570+ task = get_task (& task_idx , rv_src -> schemaname , rv_src -> relname );
571+ Assert (task_idx < MAX_TASKS );
572+ task -> kind = WORKER_TASK_PARTITION ;
573+
574+ /* Fill-in the partitioning specific fields. */
575+ if (rv_dst -> schemaname )
576+ namestrcpy (& task -> relschema_dst , rv_dst -> schemaname );
577+ else
578+ NameStr (task -> relschema_dst )[0 ] = '\0' ;
579+ namestrcpy (& task -> relname_dst , rv_dst -> relname );
580+ namestrcpy (& task -> relname_new , rv_src_new -> relname );
581+
582+ initialize_worker (& worker , task_idx );
583+
584+ /*
585+ * The worker does not reload the configuration, so we pass these GUC
586+ * setting via the shared memory.
587+ */
588+ task -> wait_after_load = rewrite_wait_after_load ;
589+ task -> check_constraints = rewrite_check_constraints ;
590+
591+ run_worker (& worker , task );
592+
564593 PG_RETURN_VOID ();
565594}
566595
@@ -570,7 +599,7 @@ rewrite_worker_main(Datum main_arg)
570599 Datum arg ;
571600 int i ;
572601 Oid dbid , roleid ;
573- char * relschema_src , * relname_src , * relname_src_new , * relschema_dst ,
602+ char * relschema , * relname , * relname_new , * relschema_dst ,
574603 * relname_dst ;
575604 WorkerTask * task ;
576605
@@ -597,10 +626,10 @@ rewrite_worker_main(Datum main_arg)
597626 * worker. Let's copy the arguments so that we have a consistent view -
598627 * see the explanation below.
599628 */
600- relschema_src = NameStr (task -> relschema_src );
601- relschema_src = * relschema_src != '\0' ? pstrdup (relschema_src ) : NULL ;
602- relname_src = pstrdup (NameStr (task -> relname_src ));
603- relname_src_new = pstrdup (NameStr (task -> relname_src_new ));
629+ relschema = NameStr (task -> relschema );
630+ relschema = * relschema != '\0' ? pstrdup (relschema ) : NULL ;
631+ relname = pstrdup (NameStr (task -> relname ));
632+ relname_new = pstrdup (NameStr (task -> relname_new ));
604633
605634 relschema_dst = NameStr (task -> relschema_dst );
606635 relschema_dst = * relschema_dst != '\0' ? pstrdup (relschema_dst ) : NULL ;
@@ -641,9 +670,9 @@ rewrite_worker_main(Datum main_arg)
641670 StartTransactionCommand ();
642671 PG_TRY ();
643672 {
644- partition_table_impl ( relschema_src , relname_src ,
645- relname_src_new , relschema_dst , relname_dst );
646-
673+ Assert ( MyWorkerTask -> kind == WORKER_TASK_PARTITION );
674+ partition_table_impl ( relschema , relname , relname_new ,
675+ relschema_dst , relname_dst );
647676 CommitTransactionCommand ();
648677 }
649678 PG_CATCH ();
@@ -744,7 +773,7 @@ pg_rewrite_exit_if_requested(void)
744773 */
745774static void
746775partition_table_impl (char * relschema_src , char * relname_src ,
747- char * relname_src_new , char * relschema_dst ,
776+ char * relname_new , char * relschema_dst ,
748777 char * relname_dst )
749778{
750779 RangeVar * relrv ;
@@ -1262,7 +1291,7 @@ partition_table_impl(char *relschema_src, char *relname_src,
12621291 pfree (ident_key );
12631292
12641293 /* Rename the tables. */
1265- RenameRelationInternal (relid_src , relname_src_new , false, false);
1294+ RenameRelationInternal (relid_src , relname_new , false, false);
12661295
12671296 /*
12681297 * The new relation will be renamed to the old one, so make sure renaming
@@ -3924,17 +3953,17 @@ pg_rewrite_get_task_list(PG_FUNCTION_ARGS)
39243953
39253954 memset (isnull , false, TASK_LIST_RES_ATTRS * sizeof (bool ));
39263955
3927- if (strlen (NameStr (task -> relschema_src )) > 0 )
3928- values [0 ] = NameGetDatum (& task -> relschema_src );
3956+ if (strlen (NameStr (task -> relschema )) > 0 )
3957+ values [0 ] = NameGetDatum (& task -> relschema );
39293958 else
39303959 isnull [0 ] = true;
3931- values [1 ] = NameGetDatum (& task -> relname_src );
3960+ values [1 ] = NameGetDatum (& task -> relname );
39323961 if (strlen (NameStr (task -> relschema_dst )) > 0 )
39333962 values [2 ] = NameGetDatum (& task -> relschema_dst );
39343963 else
39353964 isnull [2 ] = true;
39363965 values [3 ] = NameGetDatum (& task -> relname_dst );
3937- values [4 ] = NameGetDatum (& task -> relname_src_new );
3966+ values [4 ] = NameGetDatum (& task -> relname_new );
39383967
39393968 values [5 ] = Int64GetDatum (progress -> ins_initial );
39403969 values [6 ] = Int64GetDatum (progress -> ins );
@@ -3987,7 +4016,7 @@ pg_rewrite_get_task_list(PG_FUNCTION_ARGS)
39874016 else
39884017 isnull [2 ] = true;
39894018 values [3 ] = NameGetDatum (& task -> relname_dst );
3990- values [4 ] = NameGetDatum (& task -> relname_src_new );
4019+ values [4 ] = NameGetDatum (& task -> relname_new );
39914020
39924021 values [5 ] = Int64GetDatum (progress -> ins_initial );
39934022 values [6 ] = Int64GetDatum (progress -> ins );
0 commit comments