diff --git a/collectoss/tasks/start_tasks.py b/collectoss/tasks/start_tasks.py index 51bf25cd7..8c402496c 100644 --- a/collectoss/tasks/start_tasks.py +++ b/collectoss/tasks/start_tasks.py @@ -254,12 +254,13 @@ def collection_monitor(self): logger.info("Checking for repos to collect") - #Get list of enabled phases - enabled_phase_names = get_enabled_phase_names_from_config(engine, logger) - + enabled_collection_hooks = [] - with DatabaseSession(logger, self.app.engine) as session: + with DatabaseSession(logger, engine) as session: + + #Get list of enabled phases + enabled_phase_names = get_enabled_phase_names_from_config_session(session, logger) # Get config values for collection intervals config = SystemConfig(logger, session) @@ -345,28 +346,55 @@ def retry_errored_repos(self): engine = self.app.engine logger = logging.getLogger(create_collection_status_records.__name__) - #TODO: Isaac needs to normalize the status's to be abstract in the - #collection_status table once collectoss dev is less unstable dev is less unstable. - query = s.sql.text(f"""UPDATE collection_status SET secondary_status = '{CollectionState.PENDING.value}'""" - f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is NULL;""" - f"""UPDATE collection_status SET core_status = '{CollectionState.PENDING.value}'""" - f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is NULL;""" - f"""UPDATE collection_status SET facade_status = '{CollectionState.PENDING.value}'""" - f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is NULL;""" - f"""UPDATE collection_status SET ml_status = '{CollectionState.PENDING.value}'""" - f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is NULL;""" + + with DatabaseSession(logger, engine) as session: + + # if we have a lot of new repos to collect, we should skip retrying repos that have errored until collection is caught up. + total_new_repos = 0 + + #Get list of enabled phases + enabled_phase_names = get_enabled_phase_names_from_config_session(session, logger) + + query_limit = 1_000_000 + + if primary_repo_collect_phase.__name__ in enabled_phase_names: + total_new_repos += len(get_newly_added_repos(session, query_limit, "core")) - f"""UPDATE collection_status SET secondary_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is not NULL;""" - f"""UPDATE collection_status SET core_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is not NULL;;""" - f"""UPDATE collection_status SET facade_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is not NULL;;""" - f"""UPDATE collection_status SET ml_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is not NULL;;""" - ) + if secondary_repo_collect_phase.__name__ in enabled_phase_names: + total_new_repos += len(get_newly_added_repos(session, query_limit, "secondary")) - execute_sql(query) + if facade_phase.__name__ in enabled_phase_names: + total_new_repos += len(get_newly_added_repos(session, query_limit, "facade")) + + if machine_learning_phase.__name__ in enabled_phase_names: + total_new_repos += len(get_newly_added_repos(session, query_limit, "ml")) + + if total_new_repos == 0: + + #TODO: Isaac needs to normalize the status's to be abstract in the + #collection_status table once collectoss dev is less unstable dev is less unstable. + query = s.sql.text(f"""UPDATE collection_status SET secondary_status = '{CollectionState.PENDING.value}'""" + f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is NULL;""" + f"""UPDATE collection_status SET core_status = '{CollectionState.PENDING.value}'""" + f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is NULL;""" + f"""UPDATE collection_status SET facade_status = '{CollectionState.PENDING.value}'""" + f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is NULL;""" + f"""UPDATE collection_status SET ml_status = '{CollectionState.PENDING.value}'""" + f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is NULL;""" + + f"""UPDATE collection_status SET secondary_status = '{CollectionState.SUCCESS.value}'""" + f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is not NULL;""" + f"""UPDATE collection_status SET core_status = '{CollectionState.SUCCESS.value}'""" + f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is not NULL;""" + f"""UPDATE collection_status SET facade_status = '{CollectionState.SUCCESS.value}'""" + f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is not NULL;""" + f"""UPDATE collection_status SET ml_status = '{CollectionState.SUCCESS.value}'""" + f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is not NULL;""" + ) + + execute_sql(query) + else: + logger.warning(f"Skipping retry of errored repos so we dont overwhelm collection when there are {total_new_repos} new repos to collect") diff --git a/collectoss/tasks/util/collection_util.py b/collectoss/tasks/util/collection_util.py index c0b8d1984..4cd6ed690 100644 --- a/collectoss/tasks/util/collection_util.py +++ b/collectoss/tasks/util/collection_util.py @@ -2,6 +2,7 @@ import logging import random import datetime +import math #from celery.result import AsyncResult from celery import chain import sqlalchemy as s @@ -43,7 +44,11 @@ def get_valid_repos(self,session): if limit <= 0: return - new_collection_git_list = get_newly_added_repos(session, limit, hook=self.name) + # fill the remaining limit with half new repos and half recollected repos + # favoring recollection if there isnt an even number of repos + new_collection_limit = math.floor(limit / 2) + + new_collection_git_list = get_newly_added_repos(session, new_collection_limit, hook=self.name) collection_list = [(repo_git, True) for repo_git in new_collection_git_list] self.repo_list.extend(collection_list) limit -= len(collection_list)