Skip to content

Commit d1bdade

Browse files
authored
fix(clickhouse): timestamptz type formatting issue (supabase#535)
1 parent 1007644 commit d1bdade

2 files changed

Lines changed: 68 additions & 10 deletions

File tree

wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ use supabase_wrappers::prelude::*;
2828

2929
use super::{ClickHouseFdwError, ClickHouseFdwResult};
3030

31+
struct Formatter;
32+
33+
impl CellFormatter for Formatter {
34+
fn fmt_cell(&mut self, cell: &Cell) -> String {
35+
match cell {
36+
Cell::Timestamptz(tstz) => format!("parseDateTime64BestEffort('{tstz}', 6)"),
37+
_ => format!("{cell}"),
38+
}
39+
}
40+
}
41+
3142
#[derive(Debug, Clone)]
3243
struct ConvertedRow {
3344
values: Vec<Option<Cell>>,
@@ -138,7 +149,7 @@ fn convert_row_simple(
138149
// convert from Unix timestamp (seconds since 1970-01-01) to
139150
// PostgreSQL timestamp (microseconds since 2000-01-01)
140151
// difference between 1970 and 2000 epoch: 946684800 seconds
141-
let pg_raw_timestamp = (value.timestamp() - 946684800) * 1_000_000;
152+
let pg_raw_timestamp = value.timestamp_micros() - 946684800 * 1_000_000;
142153
let pg_timestamp = pgrx::datum::Timestamp::saturating_from_raw(
143154
pg_raw_timestamp as pg_sys::Timestamp,
144155
);
@@ -445,6 +456,7 @@ impl ClickHouseFdw {
445456
let mut sql = format!("select {} from {}", tgts, &table);
446457

447458
if !quals.is_empty() {
459+
let mut formatter = Formatter {};
448460
let cond = quals
449461
.iter()
450462
.filter(|q| {
@@ -455,7 +467,7 @@ impl ClickHouseFdw {
455467
};
456468
!is_param && !is_array
457469
})
458-
.map(|q| q.deparse())
470+
.map(|q| q.deparse_with_fmt(&mut formatter))
459471
.collect::<Vec<String>>()
460472
.join(" and ");
461473

@@ -850,6 +862,17 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
850862
};
851863
Ok(val)
852864
}
865+
Cell::Timestamptz(_) => {
866+
let s = c.to_string().replace('\'', "");
867+
let tm = DateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S%.6f%#z")?;
868+
let utc_tm = tm.with_timezone(&Utc);
869+
let val = if is_nullable {
870+
ChValue::Nullable(either::Either::Right(Box::new(utc_tm.into())))
871+
} else {
872+
ChValue::from(utc_tm)
873+
};
874+
Ok(val)
875+
}
853876
Cell::Uuid(v) => {
854877
let uuid = Uuid::try_parse(&v.to_string())?;
855878
let val = if is_nullable {
@@ -905,13 +928,15 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
905928
let pool = Pool::new(self.conn_str.clone());
906929
let mut client = self.rt.block_on(pool.get_handle())?;
907930

931+
let mut formatter = Formatter {};
908932
let mut sets = Vec::new();
909933
for (col, cell) in new_row.iter() {
910934
if col == &self.rowid_col {
911935
continue;
912936
}
913937
if let Some(cell) = cell {
914-
sets.push(format!("{col} = {cell}"));
938+
let formatted = formatter.fmt_cell(cell);
939+
sets.push(format!("{col} = {formatted}"));
915940
} else {
916941
sets.push(format!("{col} = null"));
917942
}

wrappers/src/fdw/clickhouse_fdw/tests.rs

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
mod tests {
44
use clickhouse_rs as ch;
55
use pgrx::prelude::*;
6-
use pgrx::{datum::Timestamp, pg_test, Uuid};
6+
use pgrx::{
7+
datum::{Timestamp, TimestampWithTimeZone},
8+
pg_test, Uuid,
9+
};
710
use supabase_wrappers::prelude::create_async_runtime;
811

912
#[pg_test]
@@ -37,7 +40,8 @@ mod tests {
3740
u16col Nullable(UInt16),
3841
i32col Nullable(Int32),
3942
u32col Nullable(UInt32),
40-
created_at DateTime('UTC')
43+
created_at DateTime('UTC'),
44+
updated_at DateTime64(6, 'Asia/Singapore')
4145
) engine = Memory",
4246
)
4347
.await
@@ -80,7 +84,8 @@ mod tests {
8084
u16col integer,
8185
i32col integer,
8286
u32col bigint,
83-
created_at timestamp
87+
created_at timestamp,
88+
updated_at timestamptz
8489
)
8590
SERVER my_clickhouse_server
8691
OPTIONS (
@@ -112,7 +117,8 @@ mod tests {
112117
u16col integer,
113118
i32col integer,
114119
u32col bigint,
115-
created_at timestamp
120+
created_at timestamp,
121+
updated_at timestamptz
116122
)
117123
SERVER my_clickhouse_server
118124
OPTIONS (
@@ -144,7 +150,8 @@ mod tests {
144150
u16col integer,
145151
i32col integer,
146152
u32col bigint,
147-
created_at timestamp
153+
created_at timestamp,
154+
updated_at timestamptz
148155
)
149156
SERVER my_clickhouse_server
150157
OPTIONS (
@@ -190,9 +197,9 @@ mod tests {
190197
c.update(
191198
"INSERT INTO test_table (id, name, amt, uid, fstr, bignum, dnum,
192199
arr_i64, arr_str, is_valid, i8col, u8col, i16col, u16col,
193-
i32col, u32col, created_at)
200+
i32col, u32col, created_at, updated_at)
194201
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13,
195-
$14, $15, $16, $17)",
202+
$14, $15, $16, $17, $18)",
196203
None,
197204
&[
198205
42.into(),
@@ -212,6 +219,9 @@ mod tests {
212219
46i32.into(),
213220
47i64.into(),
214221
Timestamp::new(2025, 5, 1, 2, 3, 4.0).into(),
222+
TimestampWithTimeZone::with_timezone(2025, 5, 1, 2, 3, 4.0, "Asia/Singapore")
223+
.unwrap()
224+
.into(),
215225
],
216226
)
217227
.unwrap();
@@ -443,6 +453,29 @@ mod tests {
443453
Some(Uuid::from_bytes([43u8; 16])),
444454
);
445455

456+
// test timestamptz data type
457+
c.update(
458+
"UPDATE test_table
459+
SET updated_at = '2025-05-01 01:02:03.112233+8'::timestamptz
460+
WHERE id = 42
461+
",
462+
None,
463+
&[],
464+
)
465+
.unwrap();
466+
assert_eq!(
467+
c.select(
468+
"SELECT id FROM test_table WHERE updated_at = '2025-05-01 01:02:03.112233+8'::timestamptz",
469+
None,
470+
&[]
471+
)
472+
.unwrap()
473+
.first()
474+
.get_one::<i64>()
475+
.unwrap(),
476+
Some(42),
477+
);
478+
446479
// test delete data in foreign table
447480
c.update("DELETE FROM test_table WHERE id = 42", None, &[])
448481
.unwrap();

0 commit comments

Comments
 (0)