Skip to content

Commit 8d542a9

Browse files
JohnCariburmecia
andauthored
fix: serialize data instead of pointer in plan_foreign_modify (supabase#548)
* fix: serialize data instead of pointer in plan_foreign_modify Fix PostgreSQL server crash when using prepared statements with foreign tables. PostgreSQL caches query plans after ~5-6 executions. The previous implementation stored a raw pointer to FdwModifyState in fdw_private, which became invalid after end_foreign_modify freed the state. Solution: - Add FdwModifyPrivate struct holding serializable reconstruction data - Serialize table OID, rowid info (not pointer) in plan_foreign_modify - Create fresh FdwModifyState in begin_foreign_modify for each execution Also includes stress test for issue supabase#482 that executes 15 parameterized INSERT operations to validate the fix. Fixes supabase#482 Related to supabase#237 * fix: serialize data instead of pointer in plan_foreign_modify Fixes PostgreSQL server crash when using prepared statements with foreign tables (issue supabase#482). The root cause was that PostgreSQL caches query plans after ~5-6 executions (generic plan optimization). The previous implementation stored a raw pointer to FdwModifyState in fdw_private, which became invalid after end_foreign_modify freed the state. Subsequent executions with cached plans dereferenced this stale pointer, causing a crash. Solution: - Add FdwModifyPrivate struct holding serializable reconstruction data - Serialize table OID, rowid info (not pointer) in plan_foreign_modify - Create fresh FdwModifyState in begin_foreign_modify for each execution This ensures each query execution gets a valid FDW instance and state, even when PostgreSQL reuses a cached query plan and skips planning. * test(clickhouse): add prepared statement stress test Validates fix for issue supabase#482 by executing 15 parameterized INSERT operations - enough to trigger PostgreSQL's generic plan caching. Before the fix, this would crash around iteration 7. * fix(clippy): inline format args in stress test Fix uninlined-format-args clippy warning by using inline variable in format string: format!("stress_{i}") instead of format!("stress_{}", i) * update Cargo.lock * format code --------- Co-authored-by: Bo Lu <[email protected]> Co-authored-by: Bo Lu <[email protected]>
1 parent 66dfb1e commit 8d542a9

3 files changed

Lines changed: 318 additions & 60 deletions

File tree

supabase-wrappers/src/modify.rs

Lines changed: 230 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use pgrx::pg_sys::panic::ErrorReport;
22
use pgrx::{
33
debug2,
4+
list::List,
45
memcxt::PgMemoryContexts,
56
pg_sys::{MemoryContext, MemoryContextData, Oid},
67
prelude::*,
78
rel::PgRelation,
89
tupdesc::PgTupleDesc,
9-
FromDatum, PgSqlErrorCode,
10+
FromDatum, IntoDatum, PgSqlErrorCode,
1011
};
1112
use std::collections::HashMap;
13+
use std::ffi::c_void;
1214
use std::marker::PhantomData;
1315
use std::os::raw::c_int;
1416
use std::ptr;
@@ -20,6 +22,154 @@ use super::memctx;
2022
use super::polyfill;
2123
use super::utils;
2224

25+
/// Serializable data for fdw_private in modify operations.
26+
/// This struct contains only data that can be safely serialized to a PostgreSQL List
27+
/// and survives query plan caching. The FdwModifyState is reconstructed from this
28+
/// data in begin_foreign_modify for each execution.
29+
struct FdwModifyPrivate {
30+
/// Foreign table OID - used to create FDW instance and fetch options
31+
foreigntableid: Oid,
32+
/// Row identifier column name
33+
rowid_name: String,
34+
/// Row identifier type OID
35+
rowid_typid: Oid,
36+
/// Update columns (pg13 only)
37+
#[cfg(feature = "pg13")]
38+
update_cols: Vec<String>,
39+
}
40+
41+
impl FdwModifyPrivate {
42+
/// Serialize FdwModifyPrivate to a PostgreSQL List of Const nodes.
43+
/// Format:
44+
/// - [0] INT4: foreigntableid as i32
45+
/// - [1] TEXT: rowid_name
46+
/// - [2] INT4: rowid_typid as i32
47+
/// - [3] INT4: update_cols_count (pg13 only)
48+
/// - [4..N] TEXT: update_cols entries (pg13 only)
49+
unsafe fn serialize_to_list(&self) -> *mut pg_sys::List {
50+
pgrx::memcx::current_context(|mcx| {
51+
let mut ret = List::<*mut c_void>::Nil;
52+
53+
// [0] foreigntableid as i32
54+
let cst = pg_sys::makeConst(
55+
pg_sys::INT4OID,
56+
-1,
57+
pg_sys::InvalidOid,
58+
4,
59+
(self.foreigntableid.to_u32() as i32).into_datum().unwrap(),
60+
false,
61+
true,
62+
);
63+
ret.unstable_push_in_context(cst as _, mcx);
64+
65+
// [1] rowid_name as TEXT
66+
let cst = pg_sys::makeConst(
67+
pg_sys::TEXTOID,
68+
-1,
69+
pg_sys::InvalidOid,
70+
-1,
71+
self.rowid_name.clone().into_datum().unwrap(),
72+
false,
73+
false,
74+
);
75+
ret.unstable_push_in_context(cst as _, mcx);
76+
77+
// [2] rowid_typid as i32
78+
let cst = pg_sys::makeConst(
79+
pg_sys::INT4OID,
80+
-1,
81+
pg_sys::InvalidOid,
82+
4,
83+
(self.rowid_typid.to_u32() as i32).into_datum().unwrap(),
84+
false,
85+
true,
86+
);
87+
ret.unstable_push_in_context(cst as _, mcx);
88+
89+
#[cfg(feature = "pg13")]
90+
{
91+
// [3] update_cols_count as i32
92+
let cst = pg_sys::makeConst(
93+
pg_sys::INT4OID,
94+
-1,
95+
pg_sys::InvalidOid,
96+
4,
97+
(self.update_cols.len() as i32).into_datum().unwrap(),
98+
false,
99+
true,
100+
);
101+
ret.unstable_push_in_context(cst as _, mcx);
102+
103+
// [4..N] update_cols as TEXT
104+
for col in &self.update_cols {
105+
let cst = pg_sys::makeConst(
106+
pg_sys::TEXTOID,
107+
-1,
108+
pg_sys::InvalidOid,
109+
-1,
110+
col.clone().into_datum().unwrap(),
111+
false,
112+
false,
113+
);
114+
ret.unstable_push_in_context(cst as _, mcx);
115+
}
116+
}
117+
118+
ret.into_ptr()
119+
})
120+
}
121+
122+
/// Deserialize FdwModifyPrivate from a PostgreSQL List of Const nodes.
123+
unsafe fn deserialize_from_list(list: *mut pg_sys::List) -> Option<Self> {
124+
pgrx::memcx::current_context(|mcx| {
125+
let list = List::<*mut c_void>::downcast_ptr_in_memcx(list, mcx)?;
126+
127+
// [0] foreigntableid
128+
let cst_ptr = *list.get(0)? as *mut pg_sys::Const;
129+
let cst = *cst_ptr;
130+
let foreigntableid_i32 = i32::from_datum(cst.constvalue, cst.constisnull)?;
131+
let foreigntableid = Oid::from(foreigntableid_i32 as u32);
132+
133+
// [1] rowid_name
134+
let cst_ptr = *list.get(1)? as *mut pg_sys::Const;
135+
let cst = *cst_ptr;
136+
let rowid_name = String::from_datum(cst.constvalue, cst.constisnull)?;
137+
138+
// [2] rowid_typid
139+
let cst_ptr = *list.get(2)? as *mut pg_sys::Const;
140+
let cst = *cst_ptr;
141+
let rowid_typid_i32 = i32::from_datum(cst.constvalue, cst.constisnull)?;
142+
let rowid_typid = Oid::from(rowid_typid_i32 as u32);
143+
144+
#[cfg(feature = "pg13")]
145+
let update_cols = {
146+
// [3] update_cols_count
147+
let cst_ptr = *list.get(3)? as *mut pg_sys::Const;
148+
let cst = *cst_ptr;
149+
let count = i32::from_datum(cst.constvalue, cst.constisnull)? as usize;
150+
151+
// [4..N] update_cols
152+
let mut cols = Vec::with_capacity(count);
153+
for i in 0..count {
154+
let cst_ptr = *list.get(4 + i)? as *mut pg_sys::Const;
155+
let cst = *cst_ptr;
156+
let col = String::from_datum(cst.constvalue, cst.constisnull)?;
157+
cols.push(col);
158+
}
159+
cols
160+
};
161+
162+
Some(FdwModifyPrivate {
163+
foreigntableid,
164+
rowid_name,
165+
rowid_typid,
166+
#[cfg(feature = "pg13")]
167+
update_cols,
168+
})
169+
})
170+
}
171+
}
172+
23173
// Fdw private state for modify
24174
struct FdwModifyState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
25175
// foreign data wrapper instance
@@ -43,20 +193,6 @@ struct FdwModifyState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
43193
}
44194

45195
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwModifyState<E, W> {
46-
unsafe fn new(foreigntableid: Oid, tmp_ctx: MemoryContext) -> Self {
47-
Self {
48-
instance: Some(instance::create_fdw_instance_from_table_id(foreigntableid)),
49-
rowid_name: String::default(),
50-
rowid_attno: 0,
51-
rowid_typid: Oid::INVALID,
52-
opts: HashMap::new(),
53-
tmp_ctx,
54-
_phantom: PhantomData,
55-
#[cfg(feature = "pg13")]
56-
update_cols: Vec::new(),
57-
}
58-
}
59-
60196
fn begin_modify(&mut self) -> Result<(), E> {
61197
if let Some(ref mut instance) = self.instance {
62198
instance.begin_modify(&self.opts)
@@ -98,8 +234,6 @@ impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwModifyState<E, W> {
98234
}
99235
}
100236

101-
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> utils::SerdeList for FdwModifyState<E, W> {}
102-
103237
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> Drop for FdwModifyState<E, W> {
104238
fn drop(&mut self) {
105239
// drop foreign data wrapper instance
@@ -208,6 +342,7 @@ pub(super) extern "C-unwind" fn add_foreign_update_targets(
208342
}
209343

210344
#[pg_guard]
345+
#[allow(clippy::extra_unused_type_parameters)]
211346
pub(super) extern "C-unwind" fn plan_foreign_modify<
212347
E: Into<ErrorReport>,
213348
W: ForeignDataWrapper<E>,
@@ -233,17 +368,7 @@ pub(super) extern "C-unwind" fn plan_foreign_modify<
233368
let rel = PgRelation::with_lock((*rte).relid, pg_sys::NoLock as _);
234369

235370
let ftable = pg_sys::GetForeignTable(rel.oid());
236-
let mut opts = options_to_hashmap((*ftable).options).report_unwrap();
237-
238-
// add additional metadata to the options
239-
opts.insert(
240-
"wrappers.fserver_oid".into(),
241-
(*ftable).serverid.to_u32().to_string(),
242-
);
243-
opts.insert(
244-
"wrappers.ftable_oid".into(),
245-
(*ftable).relid.to_u32().to_string(),
246-
);
371+
let opts = options_to_hashmap((*ftable).options).report_unwrap();
247372

248373
// check if the rowid column name is specified in table options
249374
let rowid_name = opts.get("rowid_column");
@@ -256,27 +381,17 @@ pub(super) extern "C-unwind" fn plan_foreign_modify<
256381
}
257382
let rowid_name = rowid_name.unwrap();
258383

259-
// search for rowid attribute in tuple descrition
384+
// search for rowid attribute in tuple description
260385
let tup_desc = PgTupleDesc::from_relation(&rel);
261386
for attr in tup_desc.iter().filter(|a| !a.attisdropped) {
262387
let attname = pgrx::name_data_to_str(&attr.attname);
263388
if attname == rowid_name {
264-
let ftable_id = rel.oid();
265-
266-
// create memory context for modify
267-
let ctx_name = format!("Wrappers_modify_{}", ftable_id.to_u32());
268-
let ctx = memctx::create_wrappers_memctx(&ctx_name);
269-
270-
// create modify state
271-
let mut state = FdwModifyState::<E, W>::new(ftable_id, ctx);
272-
273-
state.rowid_name = rowid_name.to_string();
274-
state.rowid_typid = attr.atttypid;
275-
state.opts = opts;
389+
let foreigntableid = rel.oid();
276390

391+
// Collect update columns for pg13
277392
#[cfg(feature = "pg13")]
278-
{
279-
// get update column list
393+
let update_cols = {
394+
let mut cols = Vec::new();
280395
let tgts: pgrx::PgList<pg_sys::TargetEntry> =
281396
pgrx::PgList::from_pg((*(*root).parse).targetList);
282397
for tgt in tgts.iter_ptr() {
@@ -285,22 +400,28 @@ pub(super) extern "C-unwind" fn plan_foreign_modify<
285400
.unwrap()
286401
.to_owned();
287402
if !(*tgt).resjunk {
288-
state.update_cols.push(col_name);
403+
cols.push(col_name);
289404
}
290405
}
291-
}
292-
293-
// box the modify state and 'serialize' state to a list, the state
294-
// pointer is stored as an integer constant in the list, so it can be
295-
// `deserialized` when executing the plan later.
296-
// Note that the state itself is not serialized to any memory contexts,
297-
// it just sits in Rust managed Box'ed memory and will be dropped when
298-
// end_foreign_modify() is called.
299-
return PgMemoryContexts::For(state.tmp_ctx).switch_to(|_| {
300-
let p = Box::leak(Box::new(state)) as *mut FdwModifyState<E, W>;
301-
let state = PgBox::<FdwModifyState<E, W>>::from_pg(p as _);
302-
FdwModifyState::serialize_to_list(state)
303-
});
406+
cols
407+
};
408+
409+
// Create FdwModifyPrivate with serializable data only.
410+
// This data will survive PostgreSQL's query plan caching because
411+
// we serialize actual data values, not pointers.
412+
let private = FdwModifyPrivate {
413+
foreigntableid,
414+
rowid_name: rowid_name.to_string(),
415+
rowid_typid: attr.atttypid,
416+
#[cfg(feature = "pg13")]
417+
update_cols,
418+
};
419+
420+
// Serialize the data to a PostgreSQL List.
421+
// The actual FdwModifyState will be created in begin_foreign_modify
422+
// for each execution, ensuring fresh state even when the query plan
423+
// is cached and this planning stage is skipped.
424+
return private.serialize_to_list();
304425
}
305426
}
306427

@@ -331,8 +452,54 @@ pub(super) extern "C-unwind" fn begin_foreign_modify<
331452
}
332453

333454
unsafe {
334-
let mut state = FdwModifyState::<E, W>::deserialize_from_list(fdw_private as _);
335-
assert!(!state.is_null());
455+
// Deserialize FdwModifyPrivate from the cached fdw_private list.
456+
// This contains the data needed to reconstruct the FdwModifyState.
457+
let private = FdwModifyPrivate::deserialize_from_list(fdw_private);
458+
if private.is_none() {
459+
report_error(
460+
PgSqlErrorCode::ERRCODE_FDW_ERROR,
461+
"invalid fdw_private data in begin_foreign_modify",
462+
);
463+
return;
464+
}
465+
let private = private.unwrap();
466+
467+
// Create a fresh memory context for this execution.
468+
// This ensures proper cleanup even when the query plan was cached.
469+
let ctx_name = format!("Wrappers_modify_{}", private.foreigntableid.to_u32());
470+
let tmp_ctx = memctx::create_wrappers_memctx(&ctx_name);
471+
472+
// Create a fresh FDW instance from the foreign table ID.
473+
// This is done here (not in plan_foreign_modify) so that we always have
474+
// a valid instance even when PostgreSQL reuses a cached query plan.
475+
let fdw_instance: W = instance::create_fdw_instance_from_table_id(private.foreigntableid);
476+
477+
// Fetch foreign table options fresh for this execution
478+
let ftable = pg_sys::GetForeignTable(private.foreigntableid);
479+
let mut opts = options_to_hashmap((*ftable).options).report_unwrap();
480+
481+
// add additional metadata to the options
482+
opts.insert(
483+
"wrappers.fserver_oid".into(),
484+
(*ftable).serverid.to_u32().to_string(),
485+
);
486+
opts.insert(
487+
"wrappers.ftable_oid".into(),
488+
(*ftable).relid.to_u32().to_string(),
489+
);
490+
491+
// Create the FdwModifyState with fresh data
492+
let mut state = FdwModifyState::<E, W> {
493+
instance: Some(fdw_instance),
494+
rowid_name: private.rowid_name,
495+
rowid_attno: 0, // Will be set below
496+
rowid_typid: private.rowid_typid,
497+
opts,
498+
tmp_ctx,
499+
_phantom: PhantomData,
500+
#[cfg(feature = "pg13")]
501+
update_cols: private.update_cols,
502+
};
336503

337504
// search for rowid attribute number
338505
#[cfg(feature = "pg13")]
@@ -343,6 +510,10 @@ pub(super) extern "C-unwind" fn begin_foreign_modify<
343510
state.rowid_attno =
344511
pg_sys::ExecFindJunkAttributeInTlist((*subplan).targetlist, rowid_name_c);
345512

513+
// Box the state and call begin_modify
514+
let state_ptr = Box::leak(Box::new(state));
515+
let mut state = PgBox::<FdwModifyState<E, W>>::from_pg(state_ptr as _);
516+
346517
let result = state.begin_modify();
347518
if result.is_err() {
348519
drop_fdw_modify_state(state.as_ptr());

wasm-wrappers/fdw/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)