Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/connectors/sources/postgres_source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ cdc_pg_replicate = []
[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
humantime = { workspace = true }
Expand Down
248 changes: 244 additions & 4 deletions core/connectors/sources/postgres_source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

use async_trait::async_trait;
use chrono::{NaiveDate, NaiveTime};
use humantime::Duration as HumanDuration;
use iggy_common::{DateTime, Utc};
use iggy_connector_sdk::{
Expand All @@ -26,6 +27,7 @@ use iggy_connector_sdk::{
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPoolOptions;
use sqlx::postgres::types::PgInterval;
use sqlx::{Column, Pool, Postgres, Row, TypeInfo};
use std::collections::HashMap;
use std::str::FromStr;
Expand Down Expand Up @@ -911,7 +913,7 @@ impl PostgresSource {
.map(|v| serde_json::Value::from(v as i64))
.unwrap_or(serde_json::Value::Null))
}
"INT4" => {
"INT4" | "OID" => {
let value: Option<i32> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Expand Down Expand Up @@ -952,14 +954,30 @@ impl PostgresSource {
.map(serde_json::Value::from)
.unwrap_or(serde_json::Value::Null))
}
"VARCHAR" | "TEXT" | "CHAR" => {
"VARCHAR" | "TEXT" | "CHAR" | "NAME" | "BPCHAR" => {
let value: Option<String> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(serde_json::Value::String)
.unwrap_or(serde_json::Value::Null))
}
"DATE" => {
let value: Option<NaiveDate> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|d| serde_json::Value::String(d.to_string()))
.unwrap_or(serde_json::Value::Null))
}
"TIME" => {
let value: Option<NaiveTime> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|t| serde_json::Value::String(t.to_string()))
.unwrap_or(serde_json::Value::Null))
}
"TIMESTAMP" | "TIMESTAMPTZ" => {
let value: Option<DateTime<Utc>> = row
.try_get(column_index)
Expand All @@ -968,6 +986,14 @@ impl PostgresSource {
.map(|dt| serde_json::Value::String(dt.to_rfc3339()))
.unwrap_or(serde_json::Value::Null))
}
"INTERVAL" => {
let value: Option<PgInterval> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|iv| serde_json::Value::String(format_pg_interval(&iv)))
.unwrap_or(serde_json::Value::Null))
}
"UUID" => {
let value: Option<Uuid> = row
.try_get(column_index)
Expand Down Expand Up @@ -995,10 +1021,130 @@ impl PostgresSource {
})
.unwrap_or(serde_json::Value::Null))
}
_ => {
let value: Option<String> = row
"_BOOL" => {
let value: Option<Vec<bool>> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|arr| {
serde_json::Value::Array(
arr.into_iter().map(serde_json::Value::Bool).collect(),
)
})
.unwrap_or(serde_json::Value::Null))
}
"_INT2" => {
let value: Option<Vec<i16>> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|arr| {
serde_json::Value::Array(
arr.into_iter()
.map(|v| serde_json::Value::from(v as i64))
.collect(),
)
})
.unwrap_or(serde_json::Value::Null))
}
"_INT4" => {
let value: Option<Vec<i32>> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|arr| {
serde_json::Value::Array(
arr.into_iter()
.map(|v| serde_json::Value::from(v as i64))
.collect(),
)
})
.unwrap_or(serde_json::Value::Null))
}
"_INT8" => {
let value: Option<Vec<i64>> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|arr| {
serde_json::Value::Array(
arr.into_iter().map(serde_json::Value::from).collect(),
)
})
.unwrap_or(serde_json::Value::Null))
}
"_FLOAT4" => {
let value: Option<Vec<f32>> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|arr| {
serde_json::Value::Array(
arr.into_iter()
.map(|v| serde_json::Value::from(v as f64))
.collect(),
)
})
.unwrap_or(serde_json::Value::Null))
}
"_FLOAT8" => {
let value: Option<Vec<f64>> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|arr| {
serde_json::Value::Array(
arr.into_iter().map(serde_json::Value::from).collect(),
)
})
.unwrap_or(serde_json::Value::Null))
}
"_TEXT" | "_VARCHAR" | "_CHAR" | "_BPCHAR" => {
let value: Option<Vec<String>> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|arr| {
serde_json::Value::Array(
arr.into_iter().map(serde_json::Value::String).collect(),
)
})
.unwrap_or(serde_json::Value::Null))
}
"_UUID" => {
let value: Option<Vec<Uuid>> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(|arr| {
serde_json::Value::Array(
arr.into_iter()
.map(|u| serde_json::Value::String(u.to_string()))
.collect(),
)
})
.unwrap_or(serde_json::Value::Null))
}
"_JSON" | "_JSONB" => {
let value: Option<Vec<serde_json::Value>> = row
.try_get(column_index)
.map_err(|_| Error::InvalidRecord)?;
Ok(value
.map(serde_json::Value::Array)
.unwrap_or(serde_json::Value::Null))
}
_ => {
let column_name = column.name();
warn!(
"Column '{column_name}' has unrecognized Postgres type '{type_name}', \
attempting String conversion"
);
let value: Option<String> = row.try_get(column_index).map_err(|e| {
error!("Failed to read column '{column_name}' (type '{type_name}'): {e}");
Error::InvalidRecordValue(format!(
"column '{column_name}' has unsupported Postgres type '{type_name}'"
))
})?;
Ok(value
.map(serde_json::Value::String)
.unwrap_or(serde_json::Value::Null))
Expand Down Expand Up @@ -1026,6 +1172,50 @@ fn format_offset_value(value: &str) -> String {
}
}

/// Renders a [`PgInterval`] as a human-readable string.
///
/// The output is built from up to four space-separated parts:
///
/// * `N year(s)` and `N mon(s)`, both derived from `interval.months`
///   (whole years are `months / 12`, the rest is `months % 12`);
/// * `N day(s)` from `interval.days`;
/// * a time part `HH:MM:SS`, with a six-digit fractional suffix when the
///   microsecond remainder is non-zero.
///
/// Zero-valued date parts are omitted. The time part is written when
/// `interval.microseconds` is non-zero, or when nothing else was written, so
/// an all-zero interval is rendered as `00:00:00`.
///
/// A negative time part is written as a single leading `-` followed by the
/// magnitude (for example `-01:01:01`), rather than signing each field.
fn format_pg_interval(interval: &PgInterval) -> String {
    // Formats one date component; the unit is plural unless the magnitude is 1.
    // `unsigned_abs` is used so that `i32::MIN` cannot overflow.
    let unit = |value: i32, name: &str| {
        let plural = if value.unsigned_abs() == 1 { "" } else { "s" };
        format!("{value} {name}{plural}")
    };

    let mut parts = Vec::new();

    let years = interval.months / 12;
    let months = interval.months % 12;

    if years != 0 {
        parts.push(unit(years, "year"));
    }
    if months != 0 {
        parts.push(unit(months, "mon"));
    }
    if interval.days != 0 {
        parts.push(unit(interval.days, "day"));
    }

    if interval.microseconds != 0 || parts.is_empty() {
        // Split the magnitude into fields and emit the sign once. Dividing the
        // signed value directly would sign every field ("-1:-1:-1") and would
        // drop the sign entirely for sub-second negative values.
        let sign = if interval.microseconds < 0 { "-" } else { "" };
        let total_us = interval.microseconds.unsigned_abs();
        let total_secs = total_us / 1_000_000;
        let remaining_us = total_us % 1_000_000;
        let hours = total_secs / 3600;
        let mins = (total_secs % 3600) / 60;
        let secs = total_secs % 60;

        if remaining_us != 0 {
            parts.push(format!(
                "{sign}{hours:02}:{mins:02}:{secs:02}.{remaining_us:06}"
            ));
        } else {
            parts.push(format!("{sign}{hours:02}:{mins:02}:{secs:02}"));
        }
    }

    parts.join(" ")
}

fn to_snake_case(input: &str) -> String {
let mut result = String::new();
let mut prev_was_uppercase = false;
Expand Down Expand Up @@ -1388,6 +1578,56 @@ mod tests {
});
}

/// An all-zero interval has no date parts, so only the time part is rendered.
#[test]
fn given_zero_interval_should_format_as_zero_time() {
    let zero = PgInterval {
        microseconds: 0,
        days: 0,
        months: 0,
    };

    let formatted = format_pg_interval(&zero);

    assert_eq!(formatted, "00:00:00");
}

/// Fourteen months are split into one year and two months, and the zero time
/// part is left out because date parts are present.
#[test]
fn given_interval_with_months_and_days_should_format_correctly() {
    let fourteen_months_three_days = PgInterval {
        microseconds: 0,
        days: 3,
        months: 14,
    };

    let formatted = format_pg_interval(&fourteen_months_three_days);

    assert_eq!(formatted, "1 year 2 mons 3 days");
}

/// 3 661 seconds (one hour, one minute, one second) are rendered as a
/// zero-padded `HH:MM:SS` string.
#[test]
fn given_interval_with_time_should_format_correctly() {
    let one_hour_one_minute_one_second = PgInterval {
        microseconds: 3_661_000_000,
        days: 0,
        months: 0,
    };

    let formatted = format_pg_interval(&one_hour_one_minute_one_second);

    assert_eq!(formatted, "01:01:01");
}

/// A sub-second remainder adds a six-digit fractional suffix to the time part.
#[test]
fn given_interval_with_microseconds_should_format_fractional_seconds() {
    let one_day_and_half_a_second = PgInterval {
        microseconds: 500_000,
        days: 1,
        months: 0,
    };

    let formatted = format_pg_interval(&one_day_and_half_a_second);

    assert_eq!(formatted, "1 day 00:00:00.500000");
}

/// Components whose value is exactly one are written without the trailing `s`.
#[test]
fn given_singular_units_should_omit_plural_suffix() {
    let thirteen_months_one_day = PgInterval {
        microseconds: 0,
        days: 1,
        months: 13,
    };

    let formatted = format_pg_interval(&thirteen_months_one_day);

    assert_eq!(formatted, "1 year 1 mon 1 day");
}

#[test]
fn state_should_be_serializable_and_deserializable() {
let original = State {
Expand Down
Loading