From 35ce925b986df9964335a449b9b5d32cd29f2263 Mon Sep 17 00:00:00 2001 From: Adrian Edwards Date: Wed, 17 Jun 2026 16:59:33 -0400 Subject: [PATCH 1/7] remove duplicate semicolons in retry_errored_repos Signed-off-by: Adrian Edwards --- collectoss/tasks/start_tasks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/collectoss/tasks/start_tasks.py b/collectoss/tasks/start_tasks.py index 51bf25cd7..cd0d24028 100644 --- a/collectoss/tasks/start_tasks.py +++ b/collectoss/tasks/start_tasks.py @@ -359,11 +359,11 @@ def retry_errored_repos(self): f"""UPDATE collection_status SET secondary_status = '{CollectionState.SUCCESS.value}'""" f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is not NULL;""" f"""UPDATE collection_status SET core_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is not NULL;;""" + f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is not NULL;""" f"""UPDATE collection_status SET facade_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is not NULL;;""" + f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is not NULL;""" f"""UPDATE collection_status SET ml_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is not NULL;;""" + f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is not NULL;""" ) execute_sql(query) From 0ca55778bb23c15170d6a48b76fdd28f1e861b20 Mon Sep 17 00:00:00 2001 From: Adrian Edwards Date: Wed, 17 Jun 2026 17:21:39 -0400 Subject: [PATCH 2/7] use existing variable Signed-off-by: Adrian Edwards --- collectoss/tasks/start_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/collectoss/tasks/start_tasks.py b/collectoss/tasks/start_tasks.py index cd0d24028..4e37bab7a 100644 --- a/collectoss/tasks/start_tasks.py +++ b/collectoss/tasks/start_tasks.py @@ -259,7 +259,7 @@ def collection_monitor(self): enabled_collection_hooks = [] - with DatabaseSession(logger, self.app.engine) as session: + with DatabaseSession(logger, engine) as session: # Get config values for collection intervals config = SystemConfig(logger, session) From d8f6396040b3f5fb8f8c64d5b454bf24953351e3 Mon Sep 17 00:00:00 2001 From: Adrian Edwards Date: Wed, 17 Jun 2026 17:21:57 -0400 Subject: [PATCH 3/7] no need to instantiate two connections here Signed-off-by: Adrian Edwards --- collectoss/tasks/start_tasks.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/collectoss/tasks/start_tasks.py b/collectoss/tasks/start_tasks.py index 4e37bab7a..aee9ff29a 100644 --- a/collectoss/tasks/start_tasks.py +++ b/collectoss/tasks/start_tasks.py @@ -254,13 +254,14 @@ def collection_monitor(self): logger.info("Checking for repos to collect") - #Get list of enabled phases - enabled_phase_names = get_enabled_phase_names_from_config(engine, logger) - + enabled_collection_hooks = [] with DatabaseSession(logger, engine) as session: + #Get list of enabled phases + enabled_phase_names = get_enabled_phase_names_from_config_session(session, logger) + # Get config values for collection intervals config = SystemConfig(logger, session) core_interval = config.get_value('Tasks', 'core_collection_interval_days') or 15 @@ -345,6 +346,12 @@ def retry_errored_repos(self): engine = self.app.engine logger = logging.getLogger(create_collection_status_records.__name__) + + + with DatabaseSession(logger, engine) as session: + # get_newly_added_repos(session, logger, enabled_phase_names, days_until_collect_again = 1) + + #TODO: Isaac needs to normalize the status's to be abstract in the #collection_status table once collectoss dev is less unstable dev is less unstable. 
query = s.sql.text(f"""UPDATE collection_status SET secondary_status = '{CollectionState.PENDING.value}'""" From 7c65cb637da23a5bf295a43d9321505cc21effe0 Mon Sep 17 00:00:00 2001 From: Adrian Edwards Date: Thu, 18 Jun 2026 08:55:38 -0400 Subject: [PATCH 4/7] limit new collection to half of the pending capacity Signed-off-by: Adrian Edwards --- collectoss/tasks/util/collection_util.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/collectoss/tasks/util/collection_util.py b/collectoss/tasks/util/collection_util.py index c0b8d1984..4cd6ed690 100644 --- a/collectoss/tasks/util/collection_util.py +++ b/collectoss/tasks/util/collection_util.py @@ -2,6 +2,7 @@ import logging import random import datetime +import math #from celery.result import AsyncResult from celery import chain import sqlalchemy as s @@ -43,7 +44,11 @@ def get_valid_repos(self,session): if limit <= 0: return - new_collection_git_list = get_newly_added_repos(session, limit, hook=self.name) + # fill the remaining limit with half new repos and half recollected repos + # favoring recollection when the remaining limit is odd + new_collection_limit = math.floor(limit / 2) + + new_collection_git_list = get_newly_added_repos(session, new_collection_limit, hook=self.name) collection_list = [(repo_git, True) for repo_git in new_collection_git_list] self.repo_list.extend(collection_list) limit -= len(collection_list) From 7ce3be34e75ef0b680eb303cfed25a8cd0ac3128 Mon Sep 17 00:00:00 2001 From: Adrian Edwards Date: Thu, 18 Jun 2026 09:03:14 -0400 Subject: [PATCH 5/7] count total new repos in retry_errored task Signed-off-by: Adrian Edwards --- collectoss/tasks/start_tasks.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/collectoss/tasks/start_tasks.py b/collectoss/tasks/start_tasks.py index aee9ff29a..37d49a1fb 100644 --- a/collectoss/tasks/start_tasks.py +++ b/collectoss/tasks/start_tasks.py @@ -347,10 +347,25 @@ def retry_errored_repos(self): 
logger = logging.getLogger(create_collection_status_records.__name__) - with DatabaseSession(logger, engine) as session: - # get_newly_added_repos(session, logger, enabled_phase_names, days_until_collect_again = 1) + # if we have a lot of new repos to collect, we should skip retrying repos that have errored until collection is caught up. + total_new_repos = 0 + + #Get list of enabled phases + enabled_phase_names = get_enabled_phase_names_from_config_session(session, logger) + + if primary_repo_collect_phase.__name__ in enabled_phase_names: + total_new_repos += len(get_newly_added_repos(session, logger, "core")) + + if secondary_repo_collect_phase.__name__ in enabled_phase_names: + total_new_repos += len(get_newly_added_repos(session, logger, "secondary")) + + if facade_phase.__name__ in enabled_phase_names: + total_new_repos += len(get_newly_added_repos(session, logger, "facade")) + + if machine_learning_phase.__name__ in enabled_phase_names: + total_new_repos += len(get_newly_added_repos(session, logger, "ml")) #TODO: Isaac needs to normalize the status's to be abstract in the #collection_status table once collectoss dev is less unstable dev is less unstable. 
From 4fe004e89ee333bcf281f561bfda36ec81bc81bf Mon Sep 17 00:00:00 2001 From: Adrian Edwards Date: Thu, 18 Jun 2026 09:13:02 -0400 Subject: [PATCH 6/7] make error retry of repos conditional on there being no new repos Signed-off-by: Adrian Edwards --- collectoss/tasks/start_tasks.py | 48 ++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/collectoss/tasks/start_tasks.py b/collectoss/tasks/start_tasks.py index 37d49a1fb..487edb143 100644 --- a/collectoss/tasks/start_tasks.py +++ b/collectoss/tasks/start_tasks.py @@ -367,28 +367,32 @@ def retry_errored_repos(self): if machine_learning_phase.__name__ in enabled_phase_names: total_new_repos += len(get_newly_added_repos(session, logger, "ml")) - #TODO: Isaac needs to normalize the status's to be abstract in the - #collection_status table once collectoss dev is less unstable dev is less unstable. - query = s.sql.text(f"""UPDATE collection_status SET secondary_status = '{CollectionState.PENDING.value}'""" - f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is NULL;""" - f"""UPDATE collection_status SET core_status = '{CollectionState.PENDING.value}'""" - f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is NULL;""" - f"""UPDATE collection_status SET facade_status = '{CollectionState.PENDING.value}'""" - f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is NULL;""" - f"""UPDATE collection_status SET ml_status = '{CollectionState.PENDING.value}'""" - f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is NULL;""" - - f"""UPDATE collection_status SET secondary_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is not NULL;""" - f"""UPDATE collection_status SET core_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE core_status = 
'{CollectionState.ERROR.value}' and core_data_last_collected is not NULL;""" - f"""UPDATE collection_status SET facade_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is not NULL;""" - f"""UPDATE collection_status SET ml_status = '{CollectionState.SUCCESS.value}'""" - f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is not NULL;""" - ) - - execute_sql(query) + if total_new_repos == 0: + + #TODO: Isaac needs to normalize the status's to be abstract in the + #collection_status table once collectoss dev is less unstable dev is less unstable. + query = s.sql.text(f"""UPDATE collection_status SET secondary_status = '{CollectionState.PENDING.value}'""" + f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is NULL;""" + f"""UPDATE collection_status SET core_status = '{CollectionState.PENDING.value}'""" + f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is NULL;""" + f"""UPDATE collection_status SET facade_status = '{CollectionState.PENDING.value}'""" + f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is NULL;""" + f"""UPDATE collection_status SET ml_status = '{CollectionState.PENDING.value}'""" + f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is NULL;""" + + f"""UPDATE collection_status SET secondary_status = '{CollectionState.SUCCESS.value}'""" + f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is not NULL;""" + f"""UPDATE collection_status SET core_status = '{CollectionState.SUCCESS.value}'""" + f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is not NULL;""" + f"""UPDATE collection_status SET facade_status = '{CollectionState.SUCCESS.value}'""" + f""" WHERE facade_status = '{CollectionState.ERROR.value}' and 
facade_data_last_collected is not NULL;""" + f"""UPDATE collection_status SET ml_status = '{CollectionState.SUCCESS.value}'""" + f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is not NULL;""" + ) + + execute_sql(query) + else: + logger.warning(f"Skipping retry of errored repos so we dont overwhelm collection when there are {total_new_repos} new repos to collect") From d634101d5fe98f848d22cf5919383a861ef6dcb1 Mon Sep 17 00:00:00 2001 From: Adrian Edwards Date: Sun, 21 Jun 2026 00:55:44 -0400 Subject: [PATCH 7/7] call get_newly_added_repos with proper signature This adds a limit for the repo queries that is so high, it should effectively be unlimited (we are trying to count newly added repos in each enabled phase). Signed-off-by: Adrian Edwards --- collectoss/tasks/start_tasks.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/collectoss/tasks/start_tasks.py b/collectoss/tasks/start_tasks.py index 487edb143..8c402496c 100644 --- a/collectoss/tasks/start_tasks.py +++ b/collectoss/tasks/start_tasks.py @@ -355,17 +355,19 @@ def retry_errored_repos(self): #Get list of enabled phases enabled_phase_names = get_enabled_phase_names_from_config_session(session, logger) + query_limit = 1_000_000 + if primary_repo_collect_phase.__name__ in enabled_phase_names: - total_new_repos += len(get_newly_added_repos(session, logger, "core")) + total_new_repos += len(get_newly_added_repos(session, query_limit, "core")) if secondary_repo_collect_phase.__name__ in enabled_phase_names: - total_new_repos += len(get_newly_added_repos(session, logger, "secondary")) + total_new_repos += len(get_newly_added_repos(session, query_limit, "secondary")) if facade_phase.__name__ in enabled_phase_names: - total_new_repos += len(get_newly_added_repos(session, logger, "facade")) + total_new_repos += len(get_newly_added_repos(session, query_limit, "facade")) if machine_learning_phase.__name__ in enabled_phase_names: - total_new_repos += 
len(get_newly_added_repos(session, logger, "ml")) + total_new_repos += len(get_newly_added_repos(session, query_limit, "ml")) if total_new_repos == 0: