Skip to content

Commit 66dfb1e

Browse files
JohnCariburmecia
and authored
fix(clickhouse): implement re_scan for nested loop joins (supabase#546)
* fix(clickhouse): implement re_scan for nested loop joins Fixes supabase#532 PostgreSQL uses Nested Loop joins when joining foreign tables. During a Nested Loop join, the inner (right) table is scanned multiple times - once for each row of the outer table. The FDW framework calls re_scan() to restart the inner scan for each outer row. The ClickHouse FDW was using the default no-op re_scan() implementation, which meant that after the first scan completed, subsequent rescans returned no data because: - is_scan_complete was still true - row_receiver channel was exhausted - No new streaming task was spawned This fix implements re_scan() to restart the async streaming: - Abort existing streaming task if running - Reset scan state flags - Reinitialize the bounded channel - Spawn new streaming task with the same SQL query - Fetch the first row to initialize the rescan Also adds integration tests for inner, left, and right joins. * code refactor and upgrade version --------- Co-authored-by: Bo Lu <[email protected]>
1 parent 71eb745 commit 66dfb1e

3 files changed

Lines changed: 204 additions & 23 deletions

File tree

wrappers/src/fdw/clickhouse_fdw/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ This is a foreign data wrapper for [ClickHouse](https://clickhouse.com/). It is
1111

1212
| Version | Date | Notes |
1313
| ------- | ---------- | ---------------------------------------------------- |
14+
| 0.1.10 | 2026-02-04 | Implement re_scan() for nested loop joins |
1415
| 0.1.9 | 2025-11-08 | Added stream_buffer_size foreign table option |
1516
| 0.1.8 | 2025-10-27 | Refactor to read rows with async streaming |
1617
| 0.1.7 | 2025-05-22 | Added more data types support |

wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ where
364364
}
365365

366366
#[wrappers_fdw(
367-
version = "0.1.9",
367+
version = "0.1.10",
368368
author = "Supabase",
369369
website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/clickhouse_fdw",
370370
error_type = "ClickHouseFdwError"
@@ -386,6 +386,7 @@ pub(crate) struct ClickHouseFdw {
386386
tgt_cols: Vec<Column>,
387387
sql_query: String,
388388
params: Vec<Qual>,
389+
stream_buffer_size: usize,
389390
}
390391

391392
impl ClickHouseFdw {
@@ -547,6 +548,33 @@ impl ClickHouseFdw {
547548
Ok(())
548549
}
549550
}
551+
552+
// Helper method to set up the streaming mechanism
553+
fn setup_streaming(&mut self) -> ClickHouseFdwResult<()> {
554+
// Create bounded channel
555+
let (tx, rx) =
556+
channel::bounded::<ClickHouseFdwResult<Option<ConvertedRow>>>(self.stream_buffer_size);
557+
self.row_receiver = Some(rx);
558+
559+
// Clone data needed by the async task
560+
let conn_str = self.conn_str.clone();
561+
let sql = self.sql_query.clone();
562+
let tgt_cols = self.tgt_cols.clone();
563+
let params = self.params.clone();
564+
let tx_clone = tx.clone();
565+
566+
// Spawn the async streaming task
567+
let streaming_task = self.rt.spawn(async move {
568+
stream_data_to_channel(conn_str, sql, tgt_cols, params, tx_clone).await;
569+
});
570+
571+
self.streaming_task = Some(streaming_task);
572+
573+
// Fetch the first row to initialize the scan
574+
self.fetch_next_row()?;
575+
576+
Ok(())
577+
}
550578
}
551579

552580
impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
@@ -578,6 +606,7 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
578606
tgt_cols: Vec::new(),
579607
sql_query: String::new(),
580608
params: Vec::new(),
609+
stream_buffer_size: 1024,
581610
})
582611
}
583612

@@ -597,34 +626,14 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
597626
self.current_row_data = None;
598627

599628
// get stream buffer size from options, with validation
600-
let stream_buffer_size = options
629+
self.stream_buffer_size = options
601630
.get("stream_buffer_size")
602631
.map(|s| s.parse::<usize>().map(|size| size.clamp(1, 100_000)))
603632
.transpose()
604633
.map_err(ClickHouseFdwError::ParseIntError)?
605634
.unwrap_or(1024);
606635

607-
// create bounded channel
608-
let (tx, rx) =
609-
channel::bounded::<ClickHouseFdwResult<Option<ConvertedRow>>>(stream_buffer_size);
610-
self.row_receiver = Some(rx);
611-
612-
// clone data needed by the async task
613-
let conn_str = self.conn_str.clone();
614-
let sql = self.sql_query.clone();
615-
let tgt_cols = self.tgt_cols.clone();
616-
let params = self.params.clone();
617-
let tx_clone = tx.clone();
618-
619-
// spawn the async streaming task
620-
let streaming_task = self.rt.spawn(async move {
621-
stream_data_to_channel(conn_str, sql, tgt_cols, params, tx_clone).await;
622-
});
623-
624-
self.streaming_task = Some(streaming_task);
625-
626-
// fetch the first row to initialize the scan
627-
self.fetch_next_row()?;
636+
self.setup_streaming()?;
628637

629638
Ok(())
630639
}
@@ -669,6 +678,24 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
669678
Ok(())
670679
}
671680

681+
fn re_scan(&mut self) -> ClickHouseFdwResult<()> {
682+
// Abort existing streaming task if running
683+
if let Some(task) = self.streaming_task.take() {
684+
task.abort();
685+
}
686+
687+
// Drop existing channel receiver
688+
self.row_receiver = None;
689+
690+
// Reset state
691+
self.is_scan_complete = false;
692+
self.current_row_data = None;
693+
694+
self.setup_streaming()?;
695+
696+
Ok(())
697+
}
698+
672699
fn begin_modify(&mut self, options: &HashMap<String, String>) -> ClickHouseFdwResult<()> {
673700
self.table = require_option("table", options)?.to_string();
674701
self.rowid_col = require_option("rowid_column", options)?.to_string();

wrappers/src/fdw/clickhouse_fdw/tests.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,4 +485,157 @@ mod tests {
485485
.is_empty());
486486
});
487487
}
488+
489+
#[pg_test]
490+
fn clickhouse_join_test() {
491+
Spi::connect_mut(|c| {
492+
let clickhouse_pool = ch::Pool::new("tcp://default:default@localhost:9000/default");
493+
494+
let rt = create_async_runtime().expect("failed to create runtime");
495+
let mut handle = rt
496+
.block_on(async { clickhouse_pool.get_handle().await })
497+
.expect("handle");
498+
499+
// Create two tables in ClickHouse for join testing
500+
rt.block_on(async {
501+
handle.execute("DROP TABLE IF EXISTS join_t1").await?;
502+
handle.execute("DROP TABLE IF EXISTS join_t2").await?;
503+
handle
504+
.execute("CREATE TABLE join_t1 (k Int16) engine = Memory")
505+
.await?;
506+
handle
507+
.execute("CREATE TABLE join_t2 (k Int16) engine = Memory")
508+
.await?;
509+
// Insert values (1, 2, 3) into each table
510+
handle
511+
.execute("INSERT INTO join_t1 VALUES (1), (2), (3)")
512+
.await?;
513+
handle
514+
.execute("INSERT INTO join_t2 VALUES (1), (2), (3)")
515+
.await
516+
})
517+
.expect("join tables in ClickHouse");
518+
519+
// Set up FDW and server
520+
c.update(
521+
r#"CREATE FOREIGN DATA WRAPPER clickhouse_wrapper
522+
HANDLER click_house_fdw_handler VALIDATOR click_house_fdw_validator"#,
523+
None,
524+
&[],
525+
)
526+
.unwrap();
527+
c.update(
528+
r#"CREATE SERVER my_clickhouse_server
529+
FOREIGN DATA WRAPPER clickhouse_wrapper
530+
OPTIONS (
531+
conn_string 'tcp://default:default@localhost:9000/default'
532+
)"#,
533+
None,
534+
&[],
535+
)
536+
.unwrap();
537+
538+
// Create foreign tables for join testing
539+
c.update(
540+
r#"
541+
CREATE FOREIGN TABLE join_t1 (
542+
k smallint
543+
)
544+
SERVER my_clickhouse_server
545+
OPTIONS (table 'join_t1')
546+
"#,
547+
None,
548+
&[],
549+
)
550+
.unwrap();
551+
c.update(
552+
r#"
553+
CREATE FOREIGN TABLE join_t2 (
554+
k smallint
555+
)
556+
SERVER my_clickhouse_server
557+
OPTIONS (table 'join_t2')
558+
"#,
559+
None,
560+
&[],
561+
)
562+
.unwrap();
563+
564+
// Test inner join - should return 3 rows, not 1
565+
let inner_join_count: i64 = c
566+
.select(
567+
"SELECT COUNT(*) FROM join_t1 JOIN join_t2 ON join_t1.k = join_t2.k",
568+
None,
569+
&[],
570+
)
571+
.unwrap()
572+
.first()
573+
.get_one::<i64>()
574+
.unwrap()
575+
.unwrap();
576+
assert_eq!(
577+
inner_join_count, 3,
578+
"Inner join should return 3 rows, got {inner_join_count}",
579+
);
580+
581+
// Verify inner join values are correct
582+
let result = c
583+
.select(
584+
"SELECT join_t1.k as k1, join_t2.k as k2 FROM join_t1 JOIN join_t2 ON join_t1.k = join_t2.k ORDER BY join_t1.k",
585+
None,
586+
&[],
587+
)
588+
.unwrap();
589+
assert_eq!(result.len(), 3, "Inner join should return 3 rows");
590+
let mut values: Vec<(i16, i16)> = Vec::new();
591+
for row in result {
592+
let k1: i16 = row.get_by_name("k1").unwrap().unwrap();
593+
let k2: i16 = row.get_by_name("k2").unwrap().unwrap();
594+
values.push((k1, k2));
595+
}
596+
assert_eq!(
597+
values,
598+
vec![(1, 1), (2, 2), (3, 3)],
599+
"Inner join values incorrect"
600+
);
601+
602+
// Test left join - should return 3 rows with correct values
603+
let left_join_count: i64 = c
604+
.select(
605+
"SELECT COUNT(*) FROM join_t1 LEFT JOIN join_t2 ON join_t1.k = join_t2.k",
606+
None,
607+
&[],
608+
)
609+
.unwrap()
610+
.first()
611+
.get_one::<i64>()
612+
.unwrap()
613+
.unwrap();
614+
assert_eq!(
615+
left_join_count, 3,
616+
"Left join should return 3 rows, got {left_join_count}",
617+
);
618+
619+
// Test right join - should return 3 rows with correct values
620+
let right_join_count: i64 = c
621+
.select(
622+
"SELECT COUNT(*) FROM join_t1 RIGHT JOIN join_t2 ON join_t1.k = join_t2.k",
623+
None,
624+
&[],
625+
)
626+
.unwrap()
627+
.first()
628+
.get_one::<i64>()
629+
.unwrap()
630+
.unwrap();
631+
assert_eq!(
632+
right_join_count, 3,
633+
"Right join should return 3 rows, got {right_join_count}",
634+
);
635+
636+
// Cleanup
637+
c.update("DROP FOREIGN TABLE join_t1", None, &[]).unwrap();
638+
c.update("DROP FOREIGN TABLE join_t2", None, &[]).unwrap();
639+
});
640+
}
488641
}

0 commit comments

Comments (0)