Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/implementations/l3_lr_ssh.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ The following examples can be used to build complex queries
```python
fc.query(phase='CALVAL')
```
- Reproc/Forward temporality (only for version 3)
```python
fc.query(temporality='REPROC', version='3.0')
```
:::
:::{tab-item} Periods
- A time stamp
Expand Down
118 changes: 111 additions & 7 deletions src/fcollections/core/_filesdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,7 @@ def _extract_parameters(
) -> dict[str, tuple[dict[str, dcs.DocstringParam], dict[str, inspect.Parameter]]]:
parameters = {}
parameters["reader"] = _reading_parameters(new_class.reader)
parameters["convention"] = _convention_parameters(
new_class.layouts[0].conventions[-1]
)
parameters["convention"] = _convention_parameters(new_class.layouts)
parameters["filter_builders"] = _filter_builders_parameters(
new_class.filter_builders
)
Expand Down Expand Up @@ -208,9 +206,17 @@ def _reading_parameters(


def _convention_parameters(
parser: FileNameConvention,
layouts: list[Layout],
) -> tuple[dict[str, dcs.DocstringParam], dict[str, inspect.Parameter]]:
fields = parser.fields
fields_names = []
fields = []
for layout in layouts:
for convention in layout.conventions:
for field in convention.fields:
if field.name not in fields_names:
fields.append(field)
fields_names.append(field.name)

convention_docstring_parameters = {
field.name: dcs.DocstringParam(
["param", field.name],
Expand Down Expand Up @@ -506,6 +512,7 @@ def _files(
)

predicates, kwargs = self._auto_build_predicates_and_filters(predicates, kwargs)
self._remove_unknown_layout_filters(kwargs)

df = self.discoverer.to_dataframe(
predicates=predicates,
Expand Down Expand Up @@ -534,11 +541,96 @@ def _files(

return df

def _remove_unknown_layout_filters(self, filters: dict[str, tp.Any]):
"""Remove incompatible filters for the current configuration.

If the layouts are disabled (enabled_layouts=False), only the file name
convention will be used to generate metadata and apply filters. Fields
that are in the folders will not be used: the filters matching these
unused field will be removed if given by the user, with a warning
explaining that they should not be given.

Parameters
----------
filters
Mapping of filters given by the user. If the class attribute
``enable_layouts`` is set to False, the layout-specific filters will
be removed in-place

Warns
-----
UserWarning
If ``enable_layouts=False`` and the input parameter ``filters`` has
layout-specific entries.
"""
if not self.enable_layouts:
layout_filters = functools.reduce(
lambda a, b: a | b, [set(layout.names) for layout in self.layouts]
)
shared_filters = {f.name for f in self.layouts[0].conventions[-1].fields}
layout_specific_filters = layout_filters - shared_filters
layout_specific_filters &= set(filters)

if len(layout_specific_filters) > 0:
msg = (
"You have configured layout-specific filters ("
f"{layout_specific_filters} with `enable_layouts=False`: "
"they will be ignored"
)
warnings.warn(msg)

for layout_specific_filter in layout_specific_filters:
filters.pop(layout_specific_filter)

def _auto_build_predicates_and_filters(
self,
predicates: tp.Iterable[tp.Callable[[tuple[tp.Any, ...]], bool]],
kwargs,
):
kwargs: dict[str, tp.Any],
) -> tuple[tp.Callable[[tuple[tp.Any, ...]], bool], dict[str, tp.Any]]:
"""Build filters using the ``filter_builders`` class attribute.

Filter builders can either generate either a simple or complex filter. A
simple filter associates a key with a value, whereas a complex filter is
a predicate applied on the metadata record.

Complex filters - aka predicates - will be added to the input list of
``predicates``. Simple filters will be added to the input simple filters
``kwargs`` (it will be modified in place).

Note
----

Filter builders will intercept a specific value from the ``kwargs``
filters in order to build the additional. This value will be removed
from the result.

Note
----

Filter builders will generate filters that work on one or multiple
values of a record. This method cannot fuse multiple simple filters that
work on the same fields, so if the user has given a filter working on
the same field as an automatically generated simple filter, an error
will be raised.

Parameters
----------
predicates
Iterable of predicates. It will be converted to a list and enriched
with the configured predicates for the class.
kwargs
Simple filters. Will be modified in-place.

Raises
------
ValueError
In case the user has given two incompatible filters.

Returns
-------
tuple[tp.Callable[[tuple[tp.Any, ...]], bool], dict[str, tp.Any]]
The enriched predicates and simple filters
"""
# Auto-build declared predicates and additionnal filters.
predicates = list(predicates)
if self.filter_builders is not None:
Expand Down Expand Up @@ -589,6 +681,9 @@ def _auto_build_predicates_and_filters(
filters.keys(),
)

logger.debug("Removing filter %s", filter_field.name)
kwargs.pop(filter_field.name)

return predicates, kwargs

def _query(self, **kwargs) -> xr_t.Dataset | None:
Expand Down Expand Up @@ -620,6 +715,14 @@ def _query(self, **kwargs) -> xr_t.Dataset | None:
)

df = self._files(
# Building a new dictionary is necessary. The input parameters will
# be modified in place by the _files method. Reading parameters that
# are also listing parameters may be removed: the copy will prevent
# the disappearance of thoses specific parameters. An example would
# be the L3_LR_SSH `bbox` parameter that is converted to a
# pass_number filters and removed from the listing parameters, but
# this parameter is also used by the reader to finely crop the
# datasets.
**{k: kwargs[k] for k in kwargs if k in self.listing_parameters},
unmix=True,
deduplicate=True,
Expand Down Expand Up @@ -846,6 +949,7 @@ def _filter_values(self, filter_name: str, **kwargs: tp.Any) -> set[tp.Any]:
_, edited_filters = self._auto_build_predicates_and_filters(
[], edited_filters
)
self._remove_unknown_layout_filters(edited_filters)
return {x[0] for x in metadata_collector.discover(**edited_filters)}
except LayoutMismatchError:
msg = (
Expand Down
43 changes: 34 additions & 9 deletions src/fcollections/core/_listing.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ def discover(
stat_fields: tuple[str] = (),
enable_layouts: bool = True,
**filters,
) -> pda.DataFrame:
) -> tp.Generator[tuple[tp.Any, ...], None, None]:
"""
Parameters
----------
Expand Down Expand Up @@ -931,15 +931,23 @@ def discover(
In case ``enable_layouts`` is True and a mismatch between the
layouts and the actual files is detected
"""
for layout in self.layouts:
layouts = [
layout for layout in self.layouts if len(set(filters) - layout.names) == 0
]
logger.debug(
"Removed %d layouts to match all the input filters",
len(self.layouts) - len(layouts),
)

for layout in layouts:
# TODO: We should also be able to give the predicates here -> need
# to modify the Layout interface
layout.set_filters(**filters)

self.root_node.clear()
if enable_layouts:
logger.debug("Using layouts to speed up listing")
visitor = LayoutVisitor(self.layouts, stat_fields)
visitor = LayoutVisitor(layouts, stat_fields)
records = walk(self.root_node, visitor)
else:
logger.debug("Full scan (not using layouts)")
Expand Down Expand Up @@ -997,9 +1005,26 @@ def to_dataframe(
layouts and the actual files is detected
"""
file_convention = self.layouts[0].conventions[-1]
return pda.DataFrame(
self.discover(predicates, stat_fields, enable_layouts, **filters),
columns=[f.name for f in file_convention.fields]
+ ["filename"]
+ list(stat_fields),
)
try:
return pda.DataFrame(
self.discover(predicates, stat_fields, enable_layouts, **filters),
columns=[f.name for f in file_convention.fields]
+ ["filename"]
+ list(stat_fields),
)
except LayoutMismatchError as exc:
layout_filters = functools.reduce(
lambda a, b: a | b, [set(layout.names) for layout in self.layouts]
)
shared_filters = {f.name for f in self.layouts[0].conventions[-1].fields}
layout_specific_filters = layout_filters - shared_filters

layout_specific_filters &= set(filters)
if len(layout_specific_filters) > 0:
note = (
"You may have introduced incompatibilities in your "
"query by setting layout-specific filters: "
f"{str(layout_specific_filters)}."
)
exc.add_note(note)
raise exc
8 changes: 8 additions & 0 deletions src/fcollections/implementations/_gridded_sla.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ def __init__(self):
]
)

AVISO_L4_SWOT_LAYOUT_V3 = Layout(
[
*AVISO_L4_SWOT_LAYOUT.conventions[:2],
FileNameConventionGriddedSLA(),
]
)

_DATASET_ID_CONVENTION = build_convention(
complementary="(?P<blending>allsat|demo-allsat-swos|allsat-demo)-l4-duacs-(?P<spatial_resolution>.*)deg",
complementary_fields=[
Expand All @@ -113,6 +120,7 @@ class BasicNetcdfFilesDatabaseGriddedSLA(FilesDatabase, PeriodMixin):
layouts = [
Layout([FileNameConventionGriddedSLA()]),
AVISO_L4_SWOT_LAYOUT,
AVISO_L4_SWOT_LAYOUT_V3,
CMEMS_L4_SSHA_LAYOUT,
]
reader = OpenMfDataset(XARRAY_TEMPORAL_NETCDFS)
Expand Down
34 changes: 34 additions & 0 deletions tests/core/test_filesdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,26 @@ def test_list_files_filter_builders_error(
db_predicate_converter.list_files(a_number=[2, 4], c_number=2)


def test_list_files_layout_specific_arg(db_with_files_bad_layout: FilesDatabaseTest):
db = FilesDatabaseTest(
db_with_files_bad_layout.path, db_with_files_bad_layout.fs, enable_layouts=False
)

expected = pda.DataFrame(
[
(
np.datetime64("2025-01-01", "us"),
2,
"/bad_layout/baz/a_002/bar/a_file_002_20250101.nc",
)
],
columns=["time", "a_number", "filename"],
)

with pytest.warns(UserWarning, match="layout-specific"):
assert expected.equals(db.list_files(a_number=2, b_string=1))


def test_query_empty(db_with_files: FilesDatabaseTest):
assert db_with_files.query(a_number=10) is None

Expand Down Expand Up @@ -687,6 +707,20 @@ def test_filters_value_layouts_disabled_full_scan(
assert values == {1, 2}


def test_filters_value_layouts_disabled_layout_specific_arg(
db_with_files_bad_layout: FilesDatabaseTest,
):
db = FilesDatabaseTest(
db_with_files_bad_layout.path, db_with_files_bad_layout.fs, enable_layouts=False
)
with (
pytest.warns(PerformanceWarning, match="enabled"),
pytest.warns(UserWarning, match="layout-specific"),
):
values = db.filter_values("a_number", b_string=2)
assert values == {1, 2}


def test_filters_value_full_scan_flat(db_with_files: FilesDatabaseTest):
with pytest.warns(PerformanceWarning, match="flat"):
values = db_with_files.filter_values("a_number")
Expand Down
12 changes: 12 additions & 0 deletions tests/core/test_listing.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,3 +882,15 @@ def test_collector_stat_fields_error(
with pytest.raises(KeyError):
collector, enable_layouts = collector_status
collector.to_dataframe(enable_layouts=enable_layouts, stat_fields=("foo",))


def test_collector_layout_mismatch(
collector_status: tuple[FileSystemMetadataCollector, bool],
):
"""LayoutMismatchError message is annotated if layout-specific filters are
given."""
collector, enable_layouts = collector_status
if not enable_layouts:
# Bad layout (dead branch)
with pytest.raises(LayoutMismatchError, match="layout-specific"):
collector.to_dataframe(enable_layouts=True, resolution="HR")
2 changes: 2 additions & 0 deletions tests/fixtures/_l4_ssha.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def l4_ssha_files() -> list[str]:
return [
"aviso/v1.0/miost/science/dt_global_allsat_phy_l4_20230728_20240912.nc",
"aviso/v1.0/miost/science/dt_global_allsat_phy_l4_20230729_20240912.nc",
"aviso/v3.0/miost/dt_global_allsat_phy_l4_20230728_20240912.nc",
"aviso/v3.0/miost/dt_global_allsat_phy_l4_20230729_20240912.nc",
"aviso/v0.3/4dvarnet/calval/dt_global_allsat_phy_l4_20230729_20240913.nc",
"aviso/v0.3/4dvarqg/calval/dt_global_allsat_phy_l4_20230729_20240912.nc",
"aviso/v0.3/4dvarqg/calval/nrt_global_allsat_phy_l4_20230729_20240912.nc",
Expand Down
Loading
Loading