Skip to content

Commit 7d7aaa9

Browse files
committed
feat: Add support for WHERE clause push-down
1 parent c225621 commit 7d7aaa9

2 files changed

Lines changed: 137 additions & 30 deletions

File tree

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ foreign server itself.
5858
`etcd_fdw` now also supports limit offset push-down. Wherever possible,
5959
perform LIMIT operations on the remote server.
6060

61+
#### WHERE push-down
62+
`etcd_fdw` now supports WHERE clause push-down for simple key-based comparisons. Whenever possible, equality and range conditions are translated into etcd key scans, so filtering is done on the remote server.
63+
Currently supported operators: `=`, `>=`, `>`, `<=`, `<`, `BETWEEN`, and `LIKE 'prefix%'`.
64+
This behavior is consistent with the prefix, range_end, and key options in `CREATE FOREIGN TABLE`.
6165

6266
Usage
6367
-----

src/lib.rs

Lines changed: 133 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -251,26 +251,11 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
251251
let keys_only = options.get("keys_only").map(|v| v == "true").unwrap_or(false);
252252
let revision = options.get("revision").and_then(|v| v.parse::<i64>().ok()).unwrap_or(0);
253253
let serializable = options.get("consistency").map(|v| v == "s").unwrap_or(false);
254+
let mut qual_key_start: Option<String> = None;
255+
let mut qual_prefix: Option<String> = None;
256+
let mut qual_range_end: Option<String> = None;
254257
let mut get_options = GetOptions::new();
255258

256-
// prefix and range are mutually exclusive
257-
match (prefix.as_ref(), range_end.as_ref()) {
258-
(Some(_), Some(_)) => {
259-
return Err(EtcdFdwError::ConflictingPrefixAndRange);
260-
}
261-
(Some(_), None) => {
262-
get_options = get_options.with_prefix();
263-
}
264-
(None, Some(r)) => {
265-
get_options = get_options.with_range(r.clone());
266-
}
267-
(None, None) => {
268-
if key_start.is_none() {
269-
get_options = get_options.with_all_keys();
270-
}
271-
}
272-
}
273-
274259
if let Some(x) = limit {
275260
get_options = get_options.with_limit(x.count);
276261
}
@@ -287,12 +272,136 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
287272
get_options = get_options.with_serializable();
288273
}
289274

290-
// XXX Support for WHERE clause push-downs is pending
291-
// etcd doesn't have anything like WHERE clause because it
292-
// a NOSQL database.
293-
// But may be we can still support some simple WHERE
294-
// conditions like '<', '>=', 'LIKE', '=' by mapping them
295-
// to key, range_end and prefix options.
275+
// WHERE clause pushdown
276+
for q in _quals {
277+
// only pushdown "key"
278+
if q.field != "key" {
279+
continue;
280+
}
281+
282+
// extract string value
283+
let v = match &q.value {
284+
Value::Cell(Cell::String(s)) => s.clone(),
285+
_ => continue,
286+
};
287+
288+
match q.operator.as_str() {
289+
"=" => {
290+
// equal: start at v, end at v+"\0"
291+
qual_key_start = Some(v.clone());
292+
qual_range_end = Some(format!("{}\0", v));
293+
}
294+
">=" => {
295+
// greater or equal: start at v
296+
qual_key_start = Some(v.clone());
297+
}
298+
">" => {
299+
// greater than: start at v+"\0"
300+
qual_key_start = Some(format!("{}\0", v));
301+
}
302+
"<" => {
303+
// less than: end at v
304+
qual_range_end = Some(v.clone());
305+
}
306+
"<=" => {
307+
// less or equal: end at v+"\0"
308+
qual_range_end = Some(format!("{}\0", v));
309+
}
310+
"~~" => {
311+
// LIKE operator with % suffix only
312+
if let Some(pref) = v.strip_suffix('%') {
313+
qual_prefix = Some(pref.to_string());
314+
}
315+
}
316+
_ => {}
317+
}
318+
}
319+
320+
// Determine the effective prefix based on FDW and WHERE clause options
321+
// If both are present, ensure one is a prefix of the other
322+
// Otherwise, no data will be fetched
323+
// If only one is present, use that as the prefix
324+
let eff_prefix = match (prefix.as_ref(), qual_prefix.as_ref()) {
325+
(Some(_fdw), Some(_where)) => {
326+
if _where.starts_with(_fdw) {
327+
Some(_where.clone())
328+
} else if _fdw.starts_with(_where) {
329+
Some(_fdw.clone())
330+
} else {
331+
return Ok(());
332+
}
333+
}
334+
(Some(_fdw), None) => Some(_fdw.clone()),
335+
(None, Some(_where)) => Some(_where.clone()),
336+
(None, None) => None,
337+
};
338+
339+
// Determine the effective key start based on FDW and WHERE clause options
340+
// If both are present, take the larger one
341+
// Otherwise, take whichever is present
342+
// If neither is present, start from the beginning
343+
let eff_key_start = match (&qual_key_start, &key_start) {
344+
(Some(_where), Some(_fdw)) => {
345+
if _where > _fdw {
346+
_where.clone()
347+
} else {
348+
_fdw.clone()
349+
}
350+
}
351+
(Some(_where), None) => _where.clone(),
352+
(None, Some(_fdw)) => _fdw.clone(),
353+
(None, None) => "\0".to_string(), // start from the beginning
354+
};
355+
356+
// Determine the effective range end based on FDW and WHERE clause options
357+
// If both are present, take the smaller one
358+
// Otherwise, take whichever is present
359+
// If neither is present, go to the end
360+
let mut eff_range_end = match (&qual_range_end, &range_end) {
361+
(Some(_where), Some(_fdw)) => {
362+
if _where < _fdw {
363+
_where.clone()
364+
} else {
365+
_fdw.clone()
366+
}
367+
}
368+
(Some(_where), None) => _where.clone(),
369+
(None, Some(_fdw)) => _fdw.clone(),
370+
(None, None) => "\u{10FFFF}".to_string(), // go to the end
371+
};
372+
373+
// Compute range_end for prefix
374+
// If a prefix is provided, calculate the range_end by incrementing the last byte of the prefix
375+
// This ensures that the range_end is exclusive and covers all keys starting with the prefix
376+
if let Some(p) = &eff_prefix {
377+
let mut bytes = p.as_bytes().to_vec();
378+
for i in (0..bytes.len()).rev() {
379+
if bytes[i] < 0xFF {
380+
bytes[i] += 1;
381+
bytes.truncate(i + 1);
382+
let prefix_range_end = String::from_utf8(bytes).unwrap();
383+
// Ensure the calculated range_end does not exceed the effective range_end
384+
if prefix_range_end < eff_range_end {
385+
eff_range_end = prefix_range_end;
386+
}
387+
break;
388+
}
389+
}
390+
}
391+
392+
// Determine the effective key to start the scan
393+
// If a prefix is provided, use it as the base key and enable prefix-based scanning
394+
// Otherwise, use the effective key start
395+
let key = match &eff_prefix {
396+
Some(p) => {
397+
get_options = get_options.with_prefix();
398+
// Ensure the key starts from the larger of the prefix or the effective key start
399+
std::cmp::max(eff_key_start.clone(), p.clone())
400+
}
401+
None => eff_key_start.clone(),
402+
};
403+
404+
get_options = get_options.with_range(eff_range_end);
296405

297406
// sort pushdown
298407
if let Some(first_sort) = sort.first() {
@@ -311,12 +420,6 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
311420
}
312421
}
313422

314-
// preference order : prefix > key_start > default "\0"
315-
// samllest possible valid key '\0'
316-
let key = prefix.clone()
317-
.or_else(|| key_start.clone())
318-
.unwrap_or_else(|| String::from("\0"));
319-
320423
// Check if columns contains key and value
321424
let colnames: Vec<String> = columns.iter().map(|x| x.name.clone()).collect();
322425
self.fetch_key = colnames.contains(&String::from("key"));

0 commit comments

Comments
 (0)