Pipelines combine several filters into a single processing sequence that is applied to the dataset. A pipeline can include datafilters, columnfilters, custom stages, and more.
To use a pipeline, create a `FilterPipeline` instance and add stages to it. The available methods for adding a pipeline stage are:
- `add_datafilter` - runs a `DataFilter` class; a datafilter can be run on multiple GPU devices
- `add_columnfilter` - runs a `ColumnFilter` class
- `add_shuffle` - shuffles the dataset
- `add_deduplication` - deduplicates the dataset using the specified columns
- `add_dataframe_filter` - applies a custom filter to the dataset `DataFrame`
```python
from DPF.configs import ShardsDatasetConfig
from DPF.dataset_reader import DatasetReader
from DPF.pipelines import FilterPipeline
from DPF.filters.images.info_filter import ImageInfoFilter
from DPF.filters.images.hash_filters import PHashFilter

reader = DatasetReader()
config = ShardsDatasetConfig.from_path_and_columns(
    "examples/example_dataset",
    image_name_col='image_name',
)
processor = reader.read_from_config(config, workers=4)

pipeline = FilterPipeline("pipeline_example")
pipeline.add_datafilter(
    ImageInfoFilter,
    {'workers': 4},
    processor_run_kwargs={'return_none_on_error': True},
)
pipeline.add_dataframe_filter(lambda df: df[df['is_correct']])
pipeline.add_datafilter(PHashFilter, {'workers': 4})
pipeline.add_deduplication(["image_phash_8"])
pipeline.add_shuffle()

pipeline.run(processor)
```

Example of running a datafilter on multiple GPUs:
```python
pipeline.add_datafilter(
    LITAFilter,
    {'batch_size': 2, 'workers': 1},
    devices=['cuda:0', 'cuda:1']
)
```