From 6c7ea98af070aa6c58a5a4bf0b74ef7690c83690 Mon Sep 17 00:00:00 2001 From: Atharva Lade Date: Wed, 29 Apr 2026 11:34:48 -0500 Subject: [PATCH 1/2] fix(postgres): handle unrecognized types with explicit coverage and diagnostics Closes #3175 --- Cargo.lock | 1 + .../sources/postgres_source/Cargo.toml | 1 + .../sources/postgres_source/src/lib.rs | 248 +++++++++++++++++- 3 files changed, 246 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c213ee6513..a42f4a5959 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6742,6 +6742,7 @@ version = "0.4.0" dependencies = [ "async-trait", "base64 0.22.1", + "chrono", "dashmap", "futures", "humantime", diff --git a/core/connectors/sources/postgres_source/Cargo.toml b/core/connectors/sources/postgres_source/Cargo.toml index 0490b1a30d..2923c9f901 100644 --- a/core/connectors/sources/postgres_source/Cargo.toml +++ b/core/connectors/sources/postgres_source/Cargo.toml @@ -42,6 +42,7 @@ cdc_pg_replicate = [] [dependencies] async-trait = { workspace = true } base64 = { workspace = true } +chrono = { workspace = true } dashmap = { workspace = true } futures = { workspace = true } humantime = { workspace = true } diff --git a/core/connectors/sources/postgres_source/src/lib.rs b/core/connectors/sources/postgres_source/src/lib.rs index f01bcba3f2..1617b0e82d 100644 --- a/core/connectors/sources/postgres_source/src/lib.rs +++ b/core/connectors/sources/postgres_source/src/lib.rs @@ -18,6 +18,7 @@ */ use async_trait::async_trait; +use chrono::{NaiveDate, NaiveTime}; use humantime::Duration as HumanDuration; use iggy_common::{DateTime, Utc}; use iggy_connector_sdk::{ @@ -26,6 +27,7 @@ use iggy_connector_sdk::{ use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPoolOptions; +use sqlx::postgres::types::PgInterval; use sqlx::{Column, Pool, Postgres, Row, TypeInfo}; use std::collections::HashMap; use std::str::FromStr; @@ -911,7 +913,7 @@ impl PostgresSource { .map(|v| serde_json::Value::from(v as i64)) .unwrap_or(serde_json::Value::Null)) } - "INT4" => { + "INT4" | "OID" => { let value: Option = row .try_get(column_index) .map_err(|_| Error::InvalidRecord)?; @@ -952,7 +954,7 @@ impl PostgresSource { .map(serde_json::Value::from) .unwrap_or(serde_json::Value::Null)) } - "VARCHAR" | "TEXT" | "CHAR" => { + "VARCHAR" | "TEXT" | "CHAR" | "NAME" | "BPCHAR" => { let value: Option = row .try_get(column_index) .map_err(|_| Error::InvalidRecord)?; @@ -960,6 +962,22 @@ impl PostgresSource { .map(serde_json::Value::String) .unwrap_or(serde_json::Value::Null)) } + "DATE" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|d| serde_json::Value::String(d.to_string())) + .unwrap_or(serde_json::Value::Null)) + } + "TIME" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|t| serde_json::Value::String(t.to_string())) + .unwrap_or(serde_json::Value::Null)) + } "TIMESTAMP" | "TIMESTAMPTZ" => { let value: Option> = row .try_get(column_index) @@ -968,6 +986,14 @@ impl PostgresSource { .map(|dt| serde_json::Value::String(dt.to_rfc3339())) .unwrap_or(serde_json::Value::Null)) } + "INTERVAL" => { + let value: Option = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|iv| serde_json::Value::String(format_pg_interval(&iv))) + .unwrap_or(serde_json::Value::Null)) + } "UUID" => { let value: Option = row .try_get(column_index) @@ -995,10 +1021,130 @@ impl PostgresSource { }) .unwrap_or(serde_json::Value::Null)) } - _ => { - let value: Option = row + "_BOOL" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|arr| { + serde_json::Value::Array( + arr.into_iter().map(serde_json::Value::Bool).collect(), + ) + }) + .unwrap_or(serde_json::Value::Null)) + } + "_INT2" => { + let value: Option> = row .try_get(column_index) .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|arr| { + serde_json::Value::Array( + arr.into_iter() + .map(|v| serde_json::Value::from(v as i64)) + .collect(), + ) + }) + .unwrap_or(serde_json::Value::Null)) + } + "_INT4" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|arr| { + serde_json::Value::Array( + arr.into_iter() + .map(|v| serde_json::Value::from(v as i64)) + .collect(), + ) + }) + .unwrap_or(serde_json::Value::Null)) + } + "_INT8" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|arr| { + serde_json::Value::Array( + arr.into_iter().map(serde_json::Value::from).collect(), + ) + }) + .unwrap_or(serde_json::Value::Null)) + } + "_FLOAT4" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|arr| { + serde_json::Value::Array( + arr.into_iter() + .map(|v| serde_json::Value::from(v as f64)) + .collect(), + ) + }) + .unwrap_or(serde_json::Value::Null)) + } + "_FLOAT8" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|arr| { + serde_json::Value::Array( + arr.into_iter().map(serde_json::Value::from).collect(), + ) + }) + .unwrap_or(serde_json::Value::Null)) + } + "_TEXT" | "_VARCHAR" | "_CHAR" | "_BPCHAR" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|arr| { + serde_json::Value::Array( + arr.into_iter().map(serde_json::Value::String).collect(), + ) + }) + .unwrap_or(serde_json::Value::Null)) + } + "_UUID" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(|arr| { + serde_json::Value::Array( + arr.into_iter() + .map(|u| serde_json::Value::String(u.to_string())) + .collect(), + ) + }) + .unwrap_or(serde_json::Value::Null)) + } + "_JSON" | "_JSONB" => { + let value: Option> = row + .try_get(column_index) + .map_err(|_| Error::InvalidRecord)?; + Ok(value + .map(serde_json::Value::Array) + .unwrap_or(serde_json::Value::Null)) + } + _ => { + let column_name = column.name(); + warn!( + "Column '{column_name}' has unrecognized Postgres type '{type_name}', \ + attempting String conversion" + ); + let value: Option = row.try_get(column_index).map_err(|e| { + error!("Failed to read column '{column_name}' (type '{type_name}'): {e}"); + Error::InvalidRecordValue(format!( + "column '{column_name}' has unsupported Postgres type '{type_name}'" + )) + })?; Ok(value .map(serde_json::Value::String) .unwrap_or(serde_json::Value::Null)) @@ -1026,6 +1172,50 @@ fn format_offset_value(value: &str) -> String { } } +fn format_pg_interval(interval: &PgInterval) -> String { + let mut parts = Vec::new(); + + let years = interval.months / 12; + let months = interval.months % 12; + + if years != 0 { + parts.push(format!( + "{years} year{}", + if years.abs() != 1 { "s" } else { "" } + )); + } + if months != 0 { + parts.push(format!( + "{months} mon{}", + if months.abs() != 1 { "s" } else { "" } + )); + } + if interval.days != 0 { + parts.push(format!( + "{} day{}", + interval.days, + if interval.days.abs() != 1 { "s" } else { "" } + )); + } + if interval.microseconds != 0 || parts.is_empty() { + let total_secs = interval.microseconds / 1_000_000; + let remaining_us = (interval.microseconds % 1_000_000).unsigned_abs(); + let hours = total_secs / 3600; + let mins = (total_secs % 3600) / 60; + let secs = total_secs % 60; + if remaining_us != 0 { + parts.push(format!( + "{:02}:{:02}:{:02}.{:06}", + hours, mins, secs, remaining_us + )); + } else { + parts.push(format!("{hours:02}:{mins:02}:{secs:02}")); + } + } + + parts.join(" ") +} + fn to_snake_case(input: &str) -> String { let mut result = String::new(); let mut prev_was_uppercase = false; @@ -1388,6 +1578,56 @@ mod tests { }); } + #[test] + fn given_zero_interval_should_format_as_zero_time() { + let interval = PgInterval { + months: 0, + days: 0, + microseconds: 0, + }; + assert_eq!(format_pg_interval(&interval), "00:00:00"); + } + + #[test] + fn given_interval_with_months_and_days_should_format_correctly() { + let interval = PgInterval { + months: 14, + days: 3, + microseconds: 0, + }; + assert_eq!(format_pg_interval(&interval), "1 year 2 mons 3 days"); + } + + #[test] + fn given_interval_with_time_should_format_correctly() { + let interval = PgInterval { + months: 0, + days: 0, + microseconds: 3_661_000_000, + }; + assert_eq!(format_pg_interval(&interval), "01:01:01"); + } + + #[test] + fn given_interval_with_microseconds_should_format_fractional_seconds() { + let interval = PgInterval { + months: 0, + days: 1, + microseconds: 500_000, + }; + assert_eq!(format_pg_interval(&interval), "1 day 00:00:00.500000"); + } + + #[test] + fn given_singular_units_should_omit_plural_suffix() { + let interval = PgInterval { + months: 13, + days: 1, + microseconds: 0, + }; + assert_eq!(format_pg_interval(&interval), "1 year 1 mon 1 day"); + } + #[test] fn state_should_be_serializable_and_deserializable() { let original = State { From 73fcce078d9e2f7063788c7dc751a34d7ce577af Mon Sep 17 00:00:00 2001 From: Atharva Lade Date: Wed, 29 Apr 2026 13:03:09 -0500 Subject: [PATCH 2/2] trigger CI