Skip to content

Commit 3edd0b1

Browse files
burmecia and Copilot
authored
feat(iceberg_fdw): [WRA-16] add schema evolution support (supabase#590)
* feat(iceberg_fdw): add schema evolution support

* feat(duckdb_fdw): update to use arrow compatibility packages for version 55.x

* Update docs/catalog/iceberg.md

Co-authored-by: Copilot <[email protected]>

---------

Co-authored-by: Copilot <[email protected]>
1 parent f6ecf02 commit 3edd0b1

12 files changed

Lines changed: 588 additions & 251 deletions

File tree

Cargo.lock

Lines changed: 335 additions & 195 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/catalog/iceberg.md

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,6 @@ values (1, 'New Record', now());
359359

360360
### Limitations for Insertion
361361

362-
- Schema evolution during insert is not supported
363362
- Only append operations are supported (no upserts)
364363
- Complex data types (nested structs, arrays, maps) have limited support
365364

@@ -371,13 +370,45 @@ When using the `create_table_if_not_exists` option, please be aware of the follo
371370
- **Partitioning**: The automatically created table will use default partitioning settings. You cannot specify custom partition or sort specifications during automatic creation.
372371
- **Identifier Fields**: The automatically created table will not have any identifier fields specified. If you need identifier fields, you must create the Iceberg table manually beforehand.
373372

373+
## Schema Evolution
374+
375+
The Iceberg FDW supports [Apache Iceberg schema evolution](https://iceberg.apache.org/spec/#schema-evolution). When columns are added to an Iceberg table after data has already been written, rows from older data files will return `NULL` for those new columns, which matches Iceberg spec behavior.
376+
377+
For example, given a table that initially has `id` and `name` columns, and later gains a `score` column:
378+
379+
```sql
380+
-- rows written before the column was added return NULL for 'score',
381+
-- while newer rows return the actual value
382+
select id, name, score from iceberg.members order by id;
383+
384+
-- id | name | score
385+
-- ----+-------+-------
386+
-- 1 | alice | NULL
387+
-- 2 | bob | NULL
388+
-- 3 | carol | 42
389+
-- 4 | dave | 99
390+
```
391+
392+
Filter pushdown on newly-added columns also works correctly:
393+
394+
```sql
395+
select name from iceberg.members where score > 50;
396+
397+
-- name
398+
-- ------
399+
-- dave
400+
```
401+
402+
!!! note
403+
404+
The foreign table definition in Postgres must include any new columns to read them. Re-run `import foreign schema` (which will refresh the `schema_id` option) or add the columns manually with `alter foreign table` and update or drop any pinned `schema_id` on the foreign table; otherwise, the FDW may still use an older schema and report `ColumnNotFound` for newly-evolved columns.
405+
374406
## Limitations
375407

376408
This section describes important limitations and considerations when using this FDW:
377409

378410
- Only supports specific data type mappings between Postgres and Iceberg
379411
- UPDATE, DELETE, and TRUNCATE operations are not supported
380-
- [Apache Iceberg schema evolution](https://iceberg.apache.org/spec/#schema-evolution) is not supported
381412
- When using Iceberg REST catalog, only supports AWS S3 (or compatible) as the storage
382413
- Materialized views using these foreign tables may fail during logical backups
383414

wrappers/Cargo.toml

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,9 @@ iceberg_fdw = [
184184
"uuid",
185185
]
186186
duckdb_fdw = [
187-
"arrow-array",
188-
"arrow-schema",
189-
"arrow-json",
187+
"arrow-array-compat",
188+
"arrow-schema-compat",
189+
"arrow-json-compat",
190190
"chrono",
191191
"duckdb",
192192
"regex",
@@ -281,10 +281,10 @@ async-compression = { version = "0.3.15", features = [
281281
"zlib",
282282
], optional = true }
283283
http = { version = "0.2", optional = true }
284-
parquet = { version = "55.1.0", features = ["async"], optional = true }
285-
arrow-array = { version = "55.1.0", optional = true }
286-
arrow-schema = { version = "55.1.0", optional = true }
287-
arrow-json = { version = "55.1.0", optional = true }
284+
parquet = { version = "57.3.0", features = ["async"], optional = true }
285+
arrow-array = { version = "57.3.0", optional = true }
286+
arrow-schema = { version = "57.3.0", optional = true }
287+
arrow-json = { version = "57.3.0", optional = true }
288288

289289
# for mssql_fdw
290290
tiberius = { version = "0.12.2", features = [
@@ -319,13 +319,19 @@ anyhow = { version = "1.0.81", optional = true }
319319
uuid = { version = "1.18.0", features = ["v7"], optional = true }
320320

321321
# for iceberg_fdw
322-
iceberg = { version = "0.6.0", optional = true }
323-
iceberg-catalog-s3tables = { git = "https://github.com/burmecia/iceberg-rust", rev = "6548db2cc02b8ecd65e698e58d372d7dfb342b9c", package="iceberg-catalog-s3tables", optional = true }
324-
iceberg-catalog-rest = { version = "0.6.0", optional = true }
322+
iceberg = { version = "0.8.0", optional = true }
323+
iceberg-catalog-s3tables = { version = "0.8.0", optional = true }
324+
iceberg-catalog-rest = { version = "0.8.0", optional = true }
325325
rust_decimal = { version = "1.37.1", optional = true }
326326

327327
# for duckdb_fdw
328328
duckdb = { version = "=1.3.2", features = ["bundled"], optional = true }
329+
# Version-pinned arrow compat deps matching duckdb's bundled arrow (55.x).
330+
# The workspace uses arrow 57.x for iceberg_fdw; using 57.x here causes type
331+
# incompatibilities because duckdb::arrow::* types implement the 55.x traits.
332+
arrow-array-compat = { package = "arrow-array", version = "55", optional = true }
333+
arrow-json-compat = { package = "arrow-json", version = "55", optional = true }
334+
arrow-schema-compat = { package = "arrow-schema", version = "55", optional = true }
329335

330336
[dev-dependencies]
331337
pgrx-tests = "=0.16.1"

wrappers/dockerfiles/s3/iceberg_seed.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,9 +324,66 @@ def create_asks_table(catalog, namespace):
324324
print(data)
325325

326326

327+
def create_schema_evolution_table(catalog, namespace):
328+
"""Creates a table and evolves its schema to test schema evolution support.
329+
330+
Step 1: Create table with initial schema (id, name).
331+
Step 2: Insert rows using the original schema.
332+
Step 3: Add a new nullable column 'score' via update_schema().
333+
Step 4: Add another new nullable column 'tag' via update_schema().
334+
Step 5: Insert rows that include the new columns.
335+
Rows inserted before the schema update should return NULL for 'score' and 'tag'.
336+
"""
337+
tblname = f"{namespace}.schema_evolution"
338+
schema = Schema(
339+
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
340+
NestedField(field_id=2, name="name", field_type=StringType(), required=False),
341+
identifier_field_ids=[1],
342+
)
343+
344+
if catalog.table_exists(tblname):
345+
catalog.purge_table(tblname)
346+
tbl = catalog.create_table(identifier=tblname, schema=schema)
347+
348+
# Insert initial rows (no 'score' or 'tag' column yet)
349+
df_v1 = pa.Table.from_pylist(
350+
[
351+
{"id": 1, "name": "alice"},
352+
{"id": 2, "name": "bob"},
353+
],
354+
schema=tbl.schema().as_arrow(),
355+
)
356+
tbl.overwrite(df_v1)
357+
358+
# Evolve schema: add 'score' (integer) column
359+
with tbl.update_schema() as update:
360+
update.add_column("score", IntegerType())
361+
362+
# Evolve schema again: add 'tag' (string) column
363+
with tbl.update_schema() as update:
364+
update.add_column("tag", StringType())
365+
366+
# Refresh table handle so it picks up the new schema
367+
tbl = catalog.load_table(tblname)
368+
369+
# Insert new rows that use the evolved schema
370+
df_v2 = pa.Table.from_pylist(
371+
[
372+
{"id": 3, "name": "carol", "score": 42, "tag": "gold"},
373+
{"id": 4, "name": "dave", "score": 99, "tag": "silver"},
374+
],
375+
schema=tbl.schema().as_arrow(),
376+
)
377+
tbl.append(df_v2)
378+
379+
data = tbl.scan().to_arrow()
380+
print(data)
381+
382+
327383
catalog.create_namespace_if_not_exists(namespace)
328384
ns = catalog.list_namespaces()
329385
tables = catalog.list_tables(namespace)
330386

331387
create_bids_table(catalog, namespace)
332388
create_asks_table(catalog, namespace)
389+
create_schema_evolution_table(catalog, namespace)

wrappers/src/fdw/duckdb_fdw/mapper.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use arrow_array::{Array, RecordBatch, array::ArrayRef};
2-
use arrow_json::ArrayWriter;
1+
use arrow_array_compat::{Array, RecordBatch, array::ArrayRef};
2+
use arrow_json_compat::ArrayWriter;
33
use duckdb::{
44
self,
55
types::{EnumType, ListType, ValueRef},

wrappers/src/fdw/duckdb_fdw/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ enum DuckdbFdwError {
2828
NumericError(#[from] pgrx::datum::numeric_support::error::Error),
2929

3030
#[error("arrow error: {0}")]
31-
ArrowError(#[from] arrow_schema::ArrowError),
31+
ArrowError(#[from] arrow_schema_compat::ArrowError),
3232

3333
#[error("uuid error: {0}")]
3434
UuidConversionError(#[from] uuid::Error),

wrappers/src/fdw/iceberg_fdw/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ This is a foreign data wrapper for [Apache Iceberg](https://iceberg.apache.org/)
1010

1111
| Version | Date | Notes |
1212
| ------- | ---------- | ---------------------------------------------------------------------- |
13+
| 0.1.5 | 2026-03-24 | Add schema evolution support |
1314
| 0.1.4 | 2025-11-21 | Add create_table_if_not_exists option and improve insertion performance |
1415
| 0.1.3 | 2025-09-20 | Add data insertion support |
1516
| 0.1.2 | 2025-07-30 | Large data set query performance improvement |
1617
| 0.1.1 | 2025-05-15 | Refactor server options passdown |
1718
| 0.1.0 | 2025-05-07 | Initial version |
18-

wrappers/src/fdw/iceberg_fdw/iceberg_fdw.rs

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
use arrow_array::{Array, RecordBatch, array::ArrayRef, builder::ArrayBuilder};
22
use futures::StreamExt;
33
use iceberg::{
4-
Catalog, NamespaceIdent, TableCreation, TableIdent,
4+
Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent,
5+
arrow::schema_to_arrow_schema,
56
expr::Predicate,
67
scan::ArrowRecordBatchStream,
7-
spec::{DataFileFormat, NestedFieldRef, PrimitiveType, Type},
8+
spec::{DataFileFormat, NestedFieldRef, PartitionKey, PrimitiveType, Type},
89
table::Table,
910
transaction::{ApplyTransactionAction, Transaction},
1011
writer::{
11-
IcebergWriter, IcebergWriterBuilder, base_writer::data_file_writer::DataFileWriterBuilder,
12-
file_writer::ParquetWriterBuilder,
12+
IcebergWriter, IcebergWriterBuilder,
13+
base_writer::data_file_writer::DataFileWriterBuilder,
14+
file_writer::{ParquetWriterBuilder, rolling_writer::RollingFileWriterBuilder},
1315
},
1416
};
15-
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
16-
use iceberg_catalog_s3tables::{S3TablesCatalog, S3TablesCatalogConfig};
17+
use iceberg_catalog_rest::{
18+
REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
19+
};
20+
use iceberg_catalog_s3tables::S3TablesCatalogBuilder;
1721
use parquet::file::properties::WriterProperties;
1822
use pgrx::pg_sys;
1923
use std::collections::{HashMap, HashSet, VecDeque};
@@ -32,7 +36,7 @@ use super::{
3236
use crate::stats;
3337

3438
#[wrappers_fdw(
35-
version = "0.1.4",
39+
version = "0.1.5",
3640
author = "Supabase",
3741
website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/iceberg_fdw",
3842
error_type = "IcebergFdwError"
@@ -200,7 +204,7 @@ impl IcebergFdw {
200204
if let Some(table) = &self.table {
201205
let metadata = table.metadata();
202206
let iceberg_schema = metadata.current_schema();
203-
let schema: arrow_schema::Schema = (iceberg_schema.as_ref()).try_into()?;
207+
let schema = schema_to_arrow_schema(iceberg_schema.as_ref())?;
204208

205209
for partition_rows in partitions.iter() {
206210
// create builder for each column
@@ -261,19 +265,26 @@ impl IcebergFdw {
261265
// get partition value from location generator
262266
let partition_value = location_generator.partition_value();
263267

264-
let parquet_writer_builder = ParquetWriterBuilder::new(
265-
WriterProperties::default(),
266-
schema.clone(),
268+
let parquet_writer_builder =
269+
ParquetWriterBuilder::new(WriterProperties::default(), schema.clone());
270+
let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
271+
parquet_writer_builder,
267272
table.file_io().clone(),
268273
location_generator,
269274
file_name_generator,
270275
);
271-
let data_file_writer_builder = DataFileWriterBuilder::new(
272-
parquet_writer_builder,
273-
partition_value,
274-
metadata.default_partition_spec().spec_id(),
275-
);
276-
let mut data_file_writer = self.rt.block_on(data_file_writer_builder.build())?;
276+
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
277+
// convert Option<Struct> partition value to Option<PartitionKey>
278+
let partition_key = partition_value.map(|pv| {
279+
PartitionKey::new(
280+
metadata.default_partition_spec().as_ref().clone(),
281+
metadata.current_schema().clone(),
282+
pv,
283+
)
284+
});
285+
let mut data_file_writer = self
286+
.rt
287+
.block_on(data_file_writer_builder.build(partition_key))?;
277288

278289
// write the record batch to Iceberg and close the writer and get
279290
// the data file
@@ -432,21 +443,20 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
432443
// 1. S3 tables
433444
// 2. REST catalog with S3 (or compatible) as backend storage
434445
let catalog: Box<dyn Catalog> =
435-
if let Some(aws_s3table_arn) = props.get("aws_s3table_bucket_arn") {
436-
let catalog_config = S3TablesCatalogConfig::builder()
437-
.table_bucket_arn(aws_s3table_arn.into())
438-
.properties(props)
439-
.build();
440-
Box::new(rt.block_on(S3TablesCatalog::new(catalog_config))?)
446+
if let Some(aws_s3table_arn) = props.get("aws_s3table_bucket_arn").cloned() {
447+
Box::new(
448+
rt.block_on(
449+
S3TablesCatalogBuilder::default()
450+
.with_table_bucket_arn(aws_s3table_arn)
451+
.load("s3tables", props),
452+
)?,
453+
)
441454
} else {
442-
let catalog_uri = require_option("catalog_uri", &props)?;
443-
let warehouse = require_option_or("warehouse", &props, "warehouse");
444-
let catalog_config = RestCatalogConfig::builder()
445-
.warehouse(warehouse.into())
446-
.uri(catalog_uri.into())
447-
.props(props)
448-
.build();
449-
Box::new(RestCatalog::new(catalog_config))
455+
let catalog_uri = require_option("catalog_uri", &props)?.to_string();
456+
let warehouse = require_option_or("warehouse", &props, "warehouse").to_string();
457+
props.insert(REST_CATALOG_PROP_URI.to_string(), catalog_uri);
458+
props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse);
459+
Box::new(rt.block_on(RestCatalogBuilder::default().load("rest", props))?)
450460
};
451461

452462
stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1);
@@ -482,7 +492,16 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
482492
let tbl_ident = TableIdent::from_strs(require_option("table", options)?.split("."))?;
483493
let table = self.rt.block_on(self.catalog.load_table(&tbl_ident))?;
484494

485-
let schema = table.metadata().current_schema();
495+
// resolve schema by id if specified (for schema evolution support), else use current
496+
let schema = if let Some(id_str) = options.get("schema_id") {
497+
let schema_id = id_str.parse::<i32>()?;
498+
table
499+
.metadata()
500+
.schema_by_id(schema_id)
501+
.ok_or_else(|| IcebergFdwError::SchemaNotFound(schema_id))?
502+
} else {
503+
table.metadata().current_schema()
504+
};
486505
for tgt_col in columns {
487506
let col_name = &tgt_col.name;
488507
let field = schema
@@ -491,7 +510,7 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
491510
self.src_fields.push(field.clone());
492511
}
493512

494-
self.predicate = try_pushdown(&table, quals)?;
513+
self.predicate = try_pushdown(schema, quals)?;
495514
self.table = table.into();
496515
self.tgt_cols = columns.to_vec();
497516

@@ -670,11 +689,12 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
670689
r#"create foreign table if not exists {} (
671690
{}
672691
)
673-
server {} options (table '{}'{})"#,
692+
server {} options (table '{}', schema_id '{}'{})"#,
674693
tbl.identifier().name,
675694
fields.join(","),
676695
stmt.server_name,
677696
tbl.identifier(),
697+
schema.schema_id(),
678698
rowid_column.unwrap_or_default(),
679699
));
680700
}

wrappers/src/fdw/iceberg_fdw/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ enum IcebergFdwError {
2727
#[error("cannot import column '{0}' data type '{1}'")]
2828
ImportColumnError(String, String),
2929

30+
#[error("schema with id {0} not found")]
31+
SchemaNotFound(i32),
32+
3033
#[error("vault error: '{0}'")]
3134
VaultError(String),
3235

wrappers/src/fdw/iceberg_fdw/pushdown.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ use super::{IcebergFdwError, IcebergFdwResult};
22
use chrono::NaiveDate;
33
use iceberg::{
44
expr::{Predicate, Reference},
5-
spec::{Datum, PrimitiveType, Type},
6-
table::Table,
5+
spec::{Datum, PrimitiveType, Schema, Type},
76
};
87
use pgrx::varlena;
98
use rust_decimal::Decimal;
@@ -98,8 +97,7 @@ fn cell_to_iceberg_datum(cell: &Cell, tgt_type: &Type) -> IcebergFdwResult<Optio
9897

9998
// try to translate quals to predicates and push them down to Iceberg,
10099
// return None if pushdown is not possible
101-
pub(super) fn try_pushdown(table: &Table, quals: &[Qual]) -> IcebergFdwResult<Option<Predicate>> {
102-
let schema = table.metadata().current_schema();
100+
pub(super) fn try_pushdown(schema: &Schema, quals: &[Qual]) -> IcebergFdwResult<Option<Predicate>> {
103101
let mut preds: Vec<Predicate> = Vec::new();
104102

105103
for qual in quals {

0 commit comments

Comments (0)