Skip to content

Commit 96a0eab

Browse files
bnjjjburmecia
andauthored
scan: refresh parameter state and rescan on parameter changes (PARAM_EXTERN + PARAM_EXEC) (supabase#587)
* add support for param fingerprint to know if it changed or not during rescan Signed-off-by: Benjamin <[email protected]> * refactor: update parameter fingerprint to use String for better handling during rescan --------- Signed-off-by: Benjamin <[email protected]> Co-authored-by: Bo Lu <[email protected]>
1 parent f56b281 commit 96a0eab

1 file changed

Lines changed: 67 additions & 20 deletions

File tree

supabase-wrappers/src/scan.rs

Lines changed: 67 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ struct FdwState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
5151
values: Vec<Datum>,
5252
nulls: Vec<bool>,
5353
row: Row,
54+
// fingerprint of current parameter values to detect rescan changes
55+
param_fingerprint: String,
5456
_phantom: PhantomData<E>,
5557
}
5658

@@ -67,6 +69,7 @@ impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwState<E, W> {
6769
values: Vec::new(),
6870
nulls: Vec::new(),
6971
row: Row::new(),
72+
param_fingerprint: String::new(),
7073
_phantom: PhantomData,
7174
}
7275
}
@@ -329,8 +332,8 @@ pub(super) extern "C-unwind" fn explain_foreign_scan<
329332
}
330333
}
331334

332-
// extract paramter value and assign it to qual in scan state
333-
unsafe fn assign_paramenter_value<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
335+
// extract parameter value and assign it to qual in scan state
336+
unsafe fn assign_parameter_value<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
334337
node: *mut pg_sys::ForeignScanState,
335338
state: &mut FdwState<E, W>,
336339
) {
@@ -341,19 +344,23 @@ unsafe fn assign_paramenter_value<E: Into<ErrorReport>, W: ForeignDataWrapper<E>
341344
// assign parameter value to qual
342345
for qual in &mut state.quals.iter_mut() {
343346
if let Some(param) = &mut qual.param {
347+
let mut current_value: Option<Value> = None;
344348
match param.kind {
345349
ParamKind::PARAM_EXTERN => {
346350
// get parameter list in execution state
347351
let plist_info = (*estate).es_param_list_info;
348-
if plist_info.is_null() {
349-
continue;
350-
}
351-
let params_cnt = (*plist_info).numParams as usize;
352-
let plist = (*plist_info).params.as_slice(params_cnt);
353-
let p: pg_sys::ParamExternData = plist[param.id - 1];
354-
if let Some(cell) = Cell::from_polymorphic_datum(p.value, p.isnull, p.ptype)
355-
{
356-
qual.value = Value::Cell(cell);
352+
if !plist_info.is_null() {
353+
let params_cnt = (*plist_info).numParams as usize;
354+
if param.id > 0 && param.id <= params_cnt {
355+
let plist = (*plist_info).params.as_slice(params_cnt);
356+
let p: pg_sys::ParamExternData = plist[param.id - 1];
357+
if let Some(cell) =
358+
Cell::from_polymorphic_datum(p.value, p.isnull, p.ptype)
359+
{
360+
qual.value = Value::Cell(cell.clone());
361+
current_value = Some(Value::Cell(cell));
362+
}
363+
}
357364
}
358365
}
359366
ParamKind::PARAM_EXEC => {
@@ -370,21 +377,51 @@ unsafe fn assign_paramenter_value<E: Into<ErrorReport>, W: ForeignDataWrapper<E>
370377
) && let Some(cell) =
371378
Cell::from_polymorphic_datum(datum, isnull, param.type_oid)
372379
{
373-
let mut eval_value = param
374-
.eval_value
375-
.lock()
376-
.expect("param.eval_value should be locked");
377-
*eval_value = Some(Value::Cell(cell.clone()));
378-
qual.value = Value::Cell(cell);
380+
qual.value = Value::Cell(cell.clone());
381+
current_value = Some(Value::Cell(cell));
379382
}
380383
}
381384
_ => {}
382385
}
386+
387+
let mut eval_value = param
388+
.eval_value
389+
.lock()
390+
.expect("param.eval_value should be locked");
391+
*eval_value = current_value;
383392
}
384393
}
385394
}
386395
}
387396

397+
fn compute_param_fingerprint<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
398+
state: &FdwState<E, W>,
399+
) -> String {
400+
state
401+
.quals
402+
.iter()
403+
.filter_map(|qual| {
404+
qual.param.as_ref().map(|param| {
405+
let eval_value = match param.eval_value.lock() {
406+
Ok(value) => format!("{:?}", *value),
407+
Err(_) => "lock_error".to_string(),
408+
};
409+
format!(
410+
"{}|{}|{}|{}|{}|{}|{}",
411+
qual.field,
412+
qual.operator,
413+
qual.use_or,
414+
param.kind,
415+
param.id,
416+
param.type_oid,
417+
eval_value,
418+
)
419+
})
420+
})
421+
.collect::<Vec<_>>()
422+
.join(";")
423+
}
424+
388425
#[pg_guard]
389426
pub(super) extern "C-unwind" fn begin_foreign_scan<
390427
E: Into<ErrorReport>,
@@ -401,7 +438,8 @@ pub(super) extern "C-unwind" fn begin_foreign_scan<
401438
assert!(!state.is_null());
402439

403440
// assign parameter values to qual
404-
assign_paramenter_value(node, &mut state);
441+
assign_parameter_value(node, &mut state);
442+
state.param_fingerprint = compute_param_fingerprint(&state);
405443

406444
// begin scan if it is not EXPLAIN statement
407445
if eflags & pg_sys::EXEC_FLAG_EXPLAIN_ONLY as c_int <= 0 {
@@ -440,7 +478,7 @@ pub(super) extern "C-unwind" fn iterate_foreign_scan<
440478
let mut state = PgBox::<FdwState<E, W>>::from_pg((*node).fdw_state as _);
441479

442480
// evaluate parameter values
443-
assign_paramenter_value(node, &mut state);
481+
assign_parameter_value(node, &mut state);
444482

445483
// clear slot
446484
let slot = (*node).ss.ss_ScanTupleSlot;
@@ -497,7 +535,16 @@ pub(super) extern "C-unwind" fn re_scan_foreign_scan<
497535
let fdw_state = (*node).fdw_state as *mut FdwState<E, W>;
498536
if !fdw_state.is_null() {
499537
let mut state = PgBox::<FdwState<E, W>>::from_pg(fdw_state);
500-
let result = state.re_scan();
538+
assign_parameter_value(node, &mut state);
539+
let next_fingerprint = compute_param_fingerprint(&state);
540+
let result = if next_fingerprint != state.param_fingerprint {
541+
state.param_fingerprint = next_fingerprint;
542+
// end the active scan to release resources before restarting with new params
543+
let _ = state.end_scan();
544+
state.begin_scan()
545+
} else {
546+
state.re_scan()
547+
};
501548
if result.is_err() {
502549
drop_fdw_state(state.as_ptr());
503550
(*node).fdw_state = ptr::null::<FdwState<E, W>>() as _;

0 commit comments

Comments
 (0)