Skip to content

Commit aa5f81b

Browse files
Improve options handling & validation
1 parent 239ab99 commit aa5f81b

2 files changed

Lines changed: 85 additions & 18 deletions

File tree

README.md

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ CREATE SERVER my_etcd_server foreign data wrapper etcd_fdw options (connstr '127
2929
```
3030

3131
```sql
32-
CREATE foreign table test (key text, value text) server my_etcd_server options(rowid 'key');
32+
CREATE foreign table test (key text, value text) server my_etcd_server options(rowid_column 'key');
3333
```
3434

3535
```sql
@@ -99,7 +99,7 @@ Usage
9999
`etcd_fdw` accepts the following table-level options via the
100100
`CREATE FOREIGN TABLE` command.
101101

102-
- **rowid** as *string*, mandatory, no default
102+
- **rowid_column** as *string*, mandatory, no default
103103

104104
Specifies which column should be treated as the unique row identifier.
105105
Usually set to key.
@@ -119,13 +119,30 @@ Usage
119119
Read key-value data at a specific etcd revision.
120120
If 0, the latest revision is used.
121121

122-
- **range** as *string*, optional, no default
122+
- **key** as *string*, optional, no default
123123

124-
Restricts the scan to the half-open interval `[key, range)`.
125-
Example: with range `/gamma` and scan starting at `/`, the query will return keys strictly less than `/gamma`.
124+
The starting key to fetch from etcd.
126125

126+
This option defines the beginning of the range.
127+
If neither `prefix` nor `key` is specified, the FDW will default to `\0` (the lowest possible key).
127128

128-
Note: Cannot be used together with `prefix`.
129+
- **range_end** as *string*, optional, no default
130+
131+
The exclusive end of the key range. Restricts the scan to the half-open interval `[key, range_end)`.
132+
133+
All keys between key (inclusive) and range_end (exclusive) will be returned.
134+
If range_end is omitted, only the single key defined by key will be returned (unless prefix is used).
135+
136+
- **consistency** as *string*, optional, default `l`
137+
138+
Specifies the read consistency level for etcd queries.
139+
140+
141+
Linearizable(`l`), Ensures the result reflects the latest consensus state of the cluster.
142+
Linearizable reads have higher latency but guarantee fresh data.
143+
144+
Serializable(`s`), Allows serving results from a local etcd member without cluster-wide consensus.
145+
Serializable reads are faster and lighter on the cluster, but may return stale data in some cases
129146

130147
## What doesn't work
131148
etcd_fdw supports almost all kinds of CRUD operations. What doesn't work is modifying the key (which is the rowid value) directly using `UPDATE` statements.

src/lib.rs

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,20 @@ pub enum EtcdFdwError {
7474
#[error("Key {0} already exists in etcd. No duplicates allowed")]
7575
KeyAlreadyExists(String),
7676

77-
#[error("Options 'prefix' and 'range' cannot be used together")]
77+
#[error("Options 'prefix' and 'range_end' cannot be used together")]
7878
ConflictingPrefixAndRange,
7979

80+
#[error("Options 'prefix' and 'key' should not be used together")]
81+
ConflictingPrefixAndKey,
82+
8083
#[error("Key {0} doesn't exist in etcd")]
8184
KeyDoesntExist(String),
8285

8386
#[error("Invalid option '{0}' with value '{1}'")]
8487
InvalidOption(String, String),
88+
89+
#[error("{0}")]
90+
OptionsError(#[from] OptionsError),
8591
}
8692

8793
impl From<EtcdFdwError> for ErrorReport {
@@ -92,13 +98,13 @@ impl From<EtcdFdwError> for ErrorReport {
9298

9399
/// Check whether dependent options exits
94100
/// i.e username & pass, cert & key
95-
fn require_pair<T>(
96-
a: &Option<T>,
97-
b: &Option<T>,
101+
fn require_pair(
102+
a: bool,
103+
b: bool,
98104
err: EtcdFdwError,
99105
) -> Result<(), EtcdFdwError> {
100106
match (a, b) {
101-
(Some(_), None) | (None, Some(_)) => Err(err),
107+
(true, false) | (false, true) => Err(err),
102108
_ => Ok(()),
103109
}
104110
}
@@ -196,8 +202,8 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
196202

197203
// ssl_cert + ssl_key must be both present or both absent
198204
// username + password must be both present or both absent
199-
require_pair(&cert_path, &key_path, EtcdFdwError::CertKeyMismatch(()))?;
200-
require_pair(&username, &password, EtcdFdwError::UserPassMismatch(()))?;
205+
require_pair(cert_path.is_some(), key_path.is_some(), EtcdFdwError::CertKeyMismatch(()))?;
206+
require_pair(username.is_some(), password.is_some(), EtcdFdwError::UserPassMismatch(()))?;
201207

202208
config = EtcdConfig {
203209
endpoints: vec![connstr],
@@ -237,13 +243,15 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
237243
) -> Result<(), EtcdFdwError> {
238244
// parse the options defined when `CREATE FOREIGN TABLE`
239245
let prefix = _options.get("prefix").cloned();
240-
let range = _options.get("range").cloned();
246+
let range_end = _options.get("range_end").cloned();
247+
let key_start = _options.get("key").cloned();
241248
let keys_only = _options.get("keys_only").map(|v| v == "true").unwrap_or(false);
242249
let revision = _options.get("revision").and_then(|v| v.parse::<i64>().ok()).unwrap_or(0);
250+
let serializable = _options.get("consistency").map(|v| v == "s").unwrap_or(false);
243251
let mut get_options = GetOptions::new();
244252

245253
// prefix and range are mutually exclusive
246-
match (prefix.as_ref(), range.as_ref()) {
254+
match (prefix.as_ref(), range_end.as_ref()) {
247255
(Some(_), Some(_)) => {
248256
return Err(EtcdFdwError::ConflictingPrefixAndRange);
249257
}
@@ -254,7 +262,9 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
254262
get_options = get_options.with_range(r.clone());
255263
}
256264
(None, None) => {
257-
get_options = get_options.with_all_keys();
265+
if key_start.is_none() {
266+
get_options = get_options.with_all_keys();
267+
}
258268
}
259269
}
260270

@@ -270,13 +280,21 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
270280
get_options = get_options.with_revision(revision);
271281
}
272282

283+
if serializable {
284+
get_options = get_options.with_serializable();
285+
}
286+
287+
// preference order : prefix > key_start > default "\0"
288+
// samllest possible valid key '\0'
289+
let key = prefix.clone()
290+
.or_else(|| key_start.clone())
291+
.unwrap_or_else(|| String::from("\0"));
292+
273293
// Check if columns contains key and value
274294
let colnames: Vec<String> = columns.iter().map(|x| x.name.clone()).collect();
275295
self.fetch_key = colnames.contains(&String::from("key"));
276296
self.fetch_value = colnames.contains(&String::from("value"));
277297

278-
// samllest possible valid key '\0', empty string will not work with with_range
279-
let key = prefix.clone().unwrap_or_else(|| String::from("\0"));
280298
let result = self
281299
.rt
282300
.block_on(self.client.get(key, Some(get_options)));
@@ -452,6 +470,38 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
452470
// This currently also does nothing
453471
Ok(())
454472
}
473+
474+
fn validator(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) -> EtcdFdwResult<()> {
475+
if let Some(oid) = catalog {
476+
if oid == FOREIGN_SERVER_RELATION_ID {
477+
check_options_contain(&options, "connstr")?;
478+
479+
let cacert_path_exists = check_options_contain(&options, "ssl_ca").is_ok();
480+
let cert_path_exists = check_options_contain(&options, "ssl_cert").is_ok();
481+
let username_exists = check_options_contain(&options, "username").is_ok();
482+
let password_exists = check_options_contain(&options, "password").is_ok();
483+
484+
require_pair(cacert_path_exists, cert_path_exists, EtcdFdwError::CertKeyMismatch(()))?;
485+
require_pair(username_exists, password_exists, EtcdFdwError::UserPassMismatch(()))?;
486+
} else if oid == FOREIGN_TABLE_RELATION_ID {
487+
check_options_contain(&options, "rowid_column")?;
488+
489+
let prefix_exists = check_options_contain(&options, "prefix").is_ok();
490+
let rannge_exists = check_options_contain(&options, "range_end").is_ok();
491+
let key_exists = check_options_contain(&options, "key").is_ok();
492+
493+
if prefix_exists && rannge_exists {
494+
return Err(EtcdFdwError::ConflictingPrefixAndRange);
495+
}
496+
497+
if prefix_exists && key_exists {
498+
return Err(EtcdFdwError::ConflictingPrefixAndKey);
499+
}
500+
}
501+
}
502+
503+
Ok(())
504+
}
455505
}
456506

457507
#[cfg(test)]

0 commit comments

Comments
 (0)