diff --git a/docs/implementations/l3_lr_ssh.md b/docs/implementations/l3_lr_ssh.md index 124a4e6..012c6b6 100644 --- a/docs/implementations/l3_lr_ssh.md +++ b/docs/implementations/l3_lr_ssh.md @@ -57,6 +57,10 @@ The following examples can be used to build complex queries ```python fc.query(phase='CALVAL') ``` + - Reproc/Forward temporality (only for version 3) + ```python + fc.query(temporality='REPROC', version='3.0') + ``` ::: :::{tab-item} Periods - A time stamp diff --git a/src/fcollections/core/_filesdb.py b/src/fcollections/core/_filesdb.py index ed25a86..5feface 100644 --- a/src/fcollections/core/_filesdb.py +++ b/src/fcollections/core/_filesdb.py @@ -106,9 +106,7 @@ def _extract_parameters( ) -> dict[str, tuple[dict[str, dcs.DocstringParam], dict[str, inspect.Parameter]]]: parameters = {} parameters["reader"] = _reading_parameters(new_class.reader) - parameters["convention"] = _convention_parameters( - new_class.layouts[0].conventions[-1] - ) + parameters["convention"] = _convention_parameters(new_class.layouts) parameters["filter_builders"] = _filter_builders_parameters( new_class.filter_builders ) @@ -208,9 +206,17 @@ def _reading_parameters( def _convention_parameters( - parser: FileNameConvention, + layouts: list[Layout], ) -> tuple[dict[str, dcs.DocstringParam], dict[str, inspect.Parameter]]: - fields = parser.fields + fields_names = [] + fields = [] + for layout in layouts: + for convention in layout.conventions: + for field in convention.fields: + if field.name not in fields_names: + fields.append(field) + fields_names.append(field.name) + convention_docstring_parameters = { field.name: dcs.DocstringParam( ["param", field.name], @@ -506,6 +512,7 @@ def _files( ) predicates, kwargs = self._auto_build_predicates_and_filters(predicates, kwargs) + self._remove_unknown_layout_filters(kwargs) df = self.discoverer.to_dataframe( predicates=predicates, @@ -534,11 +541,96 @@ def _files( return df + def _remove_unknown_layout_filters(self, filters: dict[str, 
tp.Any]): + """Remove incompatible filters for the current configuration. + + If the layouts are disabled (enable_layouts=False), only the file name + convention will be used to generate metadata and apply filters. Fields + that are in the folders will not be used: the filters matching these + unused fields will be removed if given by the user, with a warning + explaining that they should not be given. + + Parameters + ---------- + filters + Mapping of filters given by the user. If the class attribute + ``enable_layouts`` is set to False, the layout-specific filters will + be removed in-place + + Warns + ----- + UserWarning + If ``enable_layouts=False`` and the input parameter ``filters`` has + layout-specific entries. + """ + if not self.enable_layouts: + layout_filters = functools.reduce( + lambda a, b: a | b, [set(layout.names) for layout in self.layouts] + ) + shared_filters = {f.name for f in self.layouts[0].conventions[-1].fields} + layout_specific_filters = layout_filters - shared_filters + layout_specific_filters &= set(filters) + + if len(layout_specific_filters) > 0: + msg = ( + "You have configured layout-specific filters (" + f"{layout_specific_filters} with `enable_layouts=False`: " + "they will be ignored" + ) + warnings.warn(msg) + + for layout_specific_filter in layout_specific_filters: + filters.pop(layout_specific_filter) + def _auto_build_predicates_and_filters( self, predicates: tp.Iterable[tp.Callable[[tuple[tp.Any, ...]], bool]], - kwargs, - ): + kwargs: dict[str, tp.Any], + ) -> tuple[tp.Callable[[tuple[tp.Any, ...]], bool], dict[str, tp.Any]]: + """Build filters using the ``filter_builders`` class attribute. + + Filter builders can generate either a simple or a complex filter. A + simple filter associates a key with a value, whereas a complex filter is + a predicate applied on the metadata record. + + Complex filters - aka predicates - will be added to the input list of + ``predicates``.
Simple filters will be added to the input simple filters + ``kwargs`` (it will be modified in place). + + Note + ---- + + Filter builders will intercept a specific value from the ``kwargs`` + filters in order to build the additional filters. This value will be removed + from the result. + + Note + ---- + + Filter builders will generate filters that work on one or multiple + values of a record. This method cannot fuse multiple simple filters that + work on the same fields, so if the user has given a filter working on + the same field as an automatically generated simple filter, an error + will be raised. + + Parameters + ---------- + predicates + Iterable of predicates. It will be converted to a list and enriched + with the configured predicates for the class. + kwargs + Simple filters. Will be modified in-place. + + Raises + ------ + ValueError + In case the user has given two incompatible filters. + + Returns + ------- + tuple[tp.Callable[[tuple[tp.Any, ...]], bool], dict[str, tp.Any]] + The enriched predicates and simple filters + """ # Auto-build declared predicates and additionnal filters. predicates = list(predicates) if self.filter_builders is not None: @@ -589,6 +681,9 @@ def _auto_build_predicates_and_filters( filters.keys(), ) + logger.debug("Removing filter %s", filter_field.name) + kwargs.pop(filter_field.name) + return predicates, kwargs def _query(self, **kwargs) -> xr_t.Dataset | None: @@ -620,6 +715,14 @@ def _query(self, **kwargs) -> xr_t.Dataset | None: ) df = self._files( + # Building a new dictionary is necessary. The input parameters will + # be modified in place by the _files method. Reading parameters that + # are also listing parameters may be removed: the copy will prevent + # the disappearance of those specific parameters. An example would + # be the L3_LR_SSH `bbox` parameter that is converted to a + # pass_number filter and removed from the listing parameters, but + # this parameter is also used by the reader to finely crop the + # datasets.
**{k: kwargs[k] for k in kwargs if k in self.listing_parameters}, unmix=True, deduplicate=True, @@ -846,6 +949,7 @@ def _filter_values(self, filter_name: str, **kwargs: tp.Any) -> set[tp.Any]: _, edited_filters = self._auto_build_predicates_and_filters( [], edited_filters ) + self._remove_unknown_layout_filters(edited_filters) return {x[0] for x in metadata_collector.discover(**edited_filters)} except LayoutMismatchError: msg = ( diff --git a/src/fcollections/core/_listing.py b/src/fcollections/core/_listing.py index dfe076a..423cb6d 100644 --- a/src/fcollections/core/_listing.py +++ b/src/fcollections/core/_listing.py @@ -896,7 +896,7 @@ def discover( stat_fields: tuple[str] = (), enable_layouts: bool = True, **filters, - ) -> pda.DataFrame: + ) -> tp.Generator[tuple[tp.Any, ...], None, None]: """ Parameters ---------- @@ -931,7 +931,15 @@ def discover( In case ``enable_layouts`` is True and a mismatch between the layouts and the actual files is detected """ - for layout in self.layouts: + layouts = [ + layout for layout in self.layouts if len(set(filters) - layout.names) == 0 + ] + logger.debug( + "Removed %d layouts to match all the input filters", + len(self.layouts) - len(layouts), + ) + + for layout in layouts: # TODO: We should also be able to give the predicates here -> need # to modify the Layout interface layout.set_filters(**filters) @@ -939,7 +947,7 @@ def discover( self.root_node.clear() if enable_layouts: logger.debug("Using layouts to speed up listing") - visitor = LayoutVisitor(self.layouts, stat_fields) + visitor = LayoutVisitor(layouts, stat_fields) records = walk(self.root_node, visitor) else: logger.debug("Full scan (not using layouts)") @@ -997,9 +1005,26 @@ def to_dataframe( layouts and the actual files is detected """ file_convention = self.layouts[0].conventions[-1] - return pda.DataFrame( - self.discover(predicates, stat_fields, enable_layouts, **filters), - columns=[f.name for f in file_convention.fields] - + ["filename"] - + 
list(stat_fields), - ) + try: + return pda.DataFrame( + self.discover(predicates, stat_fields, enable_layouts, **filters), + columns=[f.name for f in file_convention.fields] + + ["filename"] + + list(stat_fields), + ) + except LayoutMismatchError as exc: + layout_filters = functools.reduce( + lambda a, b: a | b, [set(layout.names) for layout in self.layouts] + ) + shared_filters = {f.name for f in self.layouts[0].conventions[-1].fields} + layout_specific_filters = layout_filters - shared_filters + + layout_specific_filters &= set(filters) + if len(layout_specific_filters) > 0: + note = ( + "You may have introduced incompatibilities in your " + "query by setting layout-specific filters: " + f"{str(layout_specific_filters)}." + ) + exc.add_note(note) + raise exc diff --git a/src/fcollections/implementations/_gridded_sla.py b/src/fcollections/implementations/_gridded_sla.py index 50931f9..5d8ebb5 100644 --- a/src/fcollections/implementations/_gridded_sla.py +++ b/src/fcollections/implementations/_gridded_sla.py @@ -91,6 +91,13 @@ def __init__(self): ] ) +AVISO_L4_SWOT_LAYOUT_V3 = Layout( + [ + *AVISO_L4_SWOT_LAYOUT.conventions[:2], + FileNameConventionGriddedSLA(), + ] +) + _DATASET_ID_CONVENTION = build_convention( complementary="(?Pallsat|demo-allsat-swos|allsat-demo)-l4-duacs-(?P.*)deg", complementary_fields=[ @@ -113,6 +120,7 @@ class BasicNetcdfFilesDatabaseGriddedSLA(FilesDatabase, PeriodMixin): layouts = [ Layout([FileNameConventionGriddedSLA()]), AVISO_L4_SWOT_LAYOUT, + AVISO_L4_SWOT_LAYOUT_V3, CMEMS_L4_SSHA_LAYOUT, ] reader = OpenMfDataset(XARRAY_TEMPORAL_NETCDFS) diff --git a/tests/core/test_filesdb.py b/tests/core/test_filesdb.py index 5b339d8..59072fd 100644 --- a/tests/core/test_filesdb.py +++ b/tests/core/test_filesdb.py @@ -424,6 +424,26 @@ def test_list_files_filter_builders_error( db_predicate_converter.list_files(a_number=[2, 4], c_number=2) +def test_list_files_layout_specific_arg(db_with_files_bad_layout: FilesDatabaseTest): + db = 
FilesDatabaseTest( + db_with_files_bad_layout.path, db_with_files_bad_layout.fs, enable_layouts=False + ) + + expected = pda.DataFrame( + [ + ( + np.datetime64("2025-01-01", "us"), + 2, + "/bad_layout/baz/a_002/bar/a_file_002_20250101.nc", + ) + ], + columns=["time", "a_number", "filename"], + ) + + with pytest.warns(UserWarning, match="layout-specific"): + assert expected.equals(db.list_files(a_number=2, b_string=1)) + + def test_query_empty(db_with_files: FilesDatabaseTest): assert db_with_files.query(a_number=10) is None @@ -687,6 +707,20 @@ def test_filters_value_layouts_disabled_full_scan( assert values == {1, 2} +def test_filters_value_layouts_disabled_layout_specific_arg( + db_with_files_bad_layout: FilesDatabaseTest, +): + db = FilesDatabaseTest( + db_with_files_bad_layout.path, db_with_files_bad_layout.fs, enable_layouts=False + ) + with ( + pytest.warns(PerformanceWarning, match="enabled"), + pytest.warns(UserWarning, match="layout-specific"), + ): + values = db.filter_values("a_number", b_string=2) + assert values == {1, 2} + + def test_filters_value_full_scan_flat(db_with_files: FilesDatabaseTest): with pytest.warns(PerformanceWarning, match="flat"): values = db_with_files.filter_values("a_number") diff --git a/tests/core/test_listing.py b/tests/core/test_listing.py index 2cee3b1..856e1a8 100644 --- a/tests/core/test_listing.py +++ b/tests/core/test_listing.py @@ -882,3 +882,15 @@ def test_collector_stat_fields_error( with pytest.raises(KeyError): collector, enable_layouts = collector_status collector.to_dataframe(enable_layouts=enable_layouts, stat_fields=("foo",)) + + +def test_collector_layout_mismatch( + collector_status: tuple[FileSystemMetadataCollector, bool], +): + """LayoutMismatchError message is annotated if layout-specific filters are + given.""" + collector, enable_layouts = collector_status + if not enable_layouts: + # Bad layout (dead branch) + with pytest.raises(LayoutMismatchError, match="layout-specific"): + 
collector.to_dataframe(enable_layouts=True, resolution="HR") diff --git a/tests/fixtures/_l4_ssha.py b/tests/fixtures/_l4_ssha.py index 0b8ff8d..4ffd0c6 100644 --- a/tests/fixtures/_l4_ssha.py +++ b/tests/fixtures/_l4_ssha.py @@ -58,6 +58,8 @@ def l4_ssha_files() -> list[str]: return [ "aviso/v1.0/miost/science/dt_global_allsat_phy_l4_20230728_20240912.nc", "aviso/v1.0/miost/science/dt_global_allsat_phy_l4_20230729_20240912.nc", + "aviso/v3.0/miost/dt_global_allsat_phy_l4_20230728_20240912.nc", + "aviso/v3.0/miost/dt_global_allsat_phy_l4_20230729_20240912.nc", "aviso/v0.3/4dvarnet/calval/dt_global_allsat_phy_l4_20230729_20240913.nc", "aviso/v0.3/4dvarqg/calval/dt_global_allsat_phy_l4_20230729_20240912.nc", "aviso/v0.3/4dvarqg/calval/nrt_global_allsat_phy_l4_20230729_20240912.nc", diff --git a/tests/implementations/collections/test_gridded_sla.py b/tests/implementations/collections/test_gridded_sla.py index 2c1639c..645dda4 100644 --- a/tests/implementations/collections/test_gridded_sla.py +++ b/tests/implementations/collections/test_gridded_sla.py @@ -7,7 +7,7 @@ import pytest from fsspec.implementations.local import LocalFileSystem -from fcollections.core import DirNode, FileSystemMetadataCollector +from fcollections.core import DirNode, FileSystemMetadataCollector, LayoutMismatchError from fcollections.implementations import ( AVISO_L4_SWOT_LAYOUT, CMEMS_L4_SSHA_LAYOUT, @@ -220,13 +220,13 @@ def test_generate_layout_aviso(self): @pytest.mark.parametrize( "filters, expected", [ - ({}, [0, 1, 2, 3, 4]), - ({"version": "0.3"}, [2, 3, 4]), - ({"method": "4dvarnet"}, [2]), - ({"phase": "SCIENCE"}, [0, 1]), - ({"time": np.datetime64("2023-07-29")}, [1, 2, 3, 4]), - ({"production_date": np.datetime64("2024-09-13")}, [2]), - ({"delay": Delay.NRT}, [4]), + ({}, [0, 1, 2, 3, 4, 5, 6]), + ({"version": "0.3"}, [4, 5, 6]), + ({"method": "4dvarnet"}, [4]), + ({"phase": "SCIENCE", "version": "1.0"}, [0, 1]), + ({"time": np.datetime64("2023-07-29")}, [1, 3, 4, 6]), + 
({"production_date": np.datetime64("2024-09-13")}, [4]), + ({"delay": Delay.NRT}, [6]), ], ) def test_list_layout_aviso( @@ -253,17 +253,48 @@ def test_list_layout_aviso( assert len(expected) > 0 assert expected == actual + def test_list_layout_aviso_incompatibility( + self, + l4_ssha_dir_layout_aviso: Path, + ): + root_path_str = l4_ssha_dir_layout_aviso.as_posix() + root_node = DirNode( + root_path_str, {"name": root_path_str}, LocalFileSystem(), 0 + ) + + collector = FileSystemMetadataCollector( + NetcdfFilesDatabaseGriddedSLA.layouts, root_node + ) + + with pytest.raises(LayoutMismatchError, match="layout-specific"): + collector.to_dataframe(phase="SCIENCE") + + def test_list_layout_aviso_warning( + self, + l4_ssha_dir_layout_aviso: Path, + ): + db = NetcdfFilesDatabaseGriddedSLA( + l4_ssha_dir_layout_aviso, enable_layouts=False + ) + + reference = db.list_files(sort=True) + with pytest.warns(UserWarning, match="layout-specific"): + actual = db.list_files(phase="SCIENCE", sort=True) + + assert len(reference) > 0 + assert reference.equals(actual) + @pytest.mark.parametrize( "filters, expected", [ - ({}, [8, 9, 10, 11]), + ({}, [10, 11, 12, 13]), ({"delay": Delay.DT}, [0]), ({"type": DataType.MY}, [0]), - ({"version": "202411"}, [11]), - ({"time": np.datetime64("2023-07-28")}, [8, 11]), - ({"production_date": np.datetime64("2024-09-13")}, [10]), - ({"spatial_resolution": 0.5}, [10]), - ({"temporal_resolution": "PT12H"}, [9]), + ({"version": "202411"}, [13]), + ({"time": np.datetime64("2023-07-28")}, [10, 13]), + ({"production_date": np.datetime64("2024-09-13")}, [12]), + ({"spatial_resolution": 0.5}, [12]), + ({"temporal_resolution": "PT12H"}, [11]), ], ) def test_list_layout_cmems( diff --git a/tests/implementations/collections/test_l3_lr_ssh.py b/tests/implementations/collections/test_l3_lr_ssh.py index 8986dfc..8d2999c 100644 --- a/tests/implementations/collections/test_l3_lr_ssh.py +++ b/tests/implementations/collections/test_l3_lr_ssh.py @@ -11,7 +11,12 @@ 
import xarray as xr from utils import brute_force_geographical_selection, extract_box_from_polygon -from fcollections.core import FileNameConvention, Layout, PerformanceWarning +from fcollections.core import ( + FileNameConvention, + Layout, + LayoutMismatchError, + PerformanceWarning, +) from fcollections.implementations import ( AVISO_L3_LR_SSH_LAYOUT_V2, AVISO_L3_LR_SSH_LAYOUT_V3, @@ -760,6 +765,38 @@ def test_list_swot_lr_l3_layout( map(tuple, expected.to_numpy()) ) + def test_list_layout_reproc( + self, + l3_lr_ssh_dir_empty_files_layout: Path, + ): + db = NetcdfFilesDatabaseSwotLRL3(l3_lr_ssh_dir_empty_files_layout) + files = db.list_files(temporality="FORWARD", version="2.0.1") + actual_half_orbits = sorted( + [tuple(x) for x in files[["cycle_number", "pass_number"]].to_numpy()] + ) + assert actual_half_orbits == [(10, 532)] + + def test_list_layout_reproc_incompatibility( + self, + l3_lr_ssh_dir_empty_files_layout: Path, + ): + db = NetcdfFilesDatabaseSwotLRL3(l3_lr_ssh_dir_empty_files_layout) + with pytest.raises(LayoutMismatchError, match="layout-specific"): + db.list_files(temporality="FORWARD") + + def test_list_layout_reproc_warning( + self, + l3_lr_ssh_dir_empty_files_layout: Path, + ): + db = NetcdfFilesDatabaseSwotLRL3( + l3_lr_ssh_dir_empty_files_layout, enable_layouts=False + ) + + expected = db.list_files(sort=True) + with pytest.warns(UserWarning, match="layout-specific"): + actual = db.list_files(sort=True, temporality="FORWARD") + assert expected.equals(actual) + def test_subsets(l3_lr_ssh_dir: Path): db = NetcdfFilesDatabaseSwotLRL3(l3_lr_ssh_dir)