From 424257a8f3541fec134cb4d025a2659700bff0c6 Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Thu, 11 Jun 2026 09:48:08 -0700 Subject: [PATCH 1/4] 33562 - Add Unfreeze Logic and Delta Full Set --- data-tool/flows/common/colin_queries.py | 47 ++++++++++++++----- .../flows/refresh_extract_subset_flow.py | 11 +++-- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/data-tool/flows/common/colin_queries.py b/data-tool/flows/common/colin_queries.py index a1cec9ea61..cc338f7943 100644 --- a/data-tool/flows/common/colin_queries.py +++ b/data-tool/flows/common/colin_queries.py @@ -31,6 +31,7 @@ def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int) -> if not str(corp_list).strip(): raise ValueError('empty corp_list') corp_list_ctes = build_corp_list(corp_list, chunk_size) + frozen_ctes = frozen_cte() query = f""" WITH {corp_list_ctes}, latest_event AS ( @@ -47,19 +48,7 @@ def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int) -> JOIN corp_list c ON c.corp_num = e.corp_num WHERE e.event_timestmp > TIMESTAMP '{timestamp}' - INTERVAL '2' HOUR - -- AND NOT ( - -- EXISTS ( - -- SELECT 1 - -- FROM corporation c2 - -- WHERE c2.corp_num = e.corp_num - -- AND c2.corp_frozen_typ_cd = 'C' - -- ) - -- AND EXISTS ( - -- SELECT 1 - -- FROM corp_early_adopters cea - -- WHERE cea.corp_num = e.corp_num - -- ) - -- ) + {frozen_ctes} ) SELECT le.EVENT_ID, le.corp_num, le.event_typ_cd, @@ -80,6 +69,38 @@ def get_identifiers_per_batch(mig_batch_id: int) -> str: WHERE mcb.mig_batch_id = {mig_batch_id} """ +def unfreeze_identifiers() -> str: + return f""" + UPDATE corporation AS c + SET corp_frozen_type_cd = NULL + FROM mig_group AS mg + JOIN mig_batch AS mb ON mb.mig_group_id = mg.id + JOIN mig_corp_batch AS mcb ON mcb.mig_batch_id = mb.id + WHERE c.corp_num = mcb.corp_num + -- cprd + and mg.name in ('group_0', 'group_1', 'group_3', 'group_4','gcp_migration_group_test','misc_group') + and mg.source_db = 'cprd' + and mg.target_environment = 'prod' + AND c.corp_frozen_type_cd IS NOT NULL; + """ + +def frozen_cte() -> str: + return f""" + AND NOT ( + EXISTS ( + SELECT 1 + FROM corporation c2 + WHERE c2.corp_num = e.corp_num + AND c2.corp_frozen_typ_cd = 'C' + ) + AND EXISTS ( + SELECT 1 + FROM corp_early_adopters cea + WHERE cea.corp_num = e.corp_num + ) + ) + """ + def get_updated_identifiers_for_batch(timestamp: str, corp_list: str, chunk_size: int) -> str: """per batch get identifiers""" return get_updated_identifiers(timestamp, corp_list, 999) \ No newline at end of file diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 6f9dd76498..7ff0fb82b0 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -12,7 +12,7 @@ from sqlalchemy.engine import Engine from datetime import datetime, timezone from config import get_named_config -from common.colin_queries import get_identifiers_per_batch, get_updated_identifiers_for_batch +from common.colin_queries import get_identifiers_per_batch, get_updated_identifiers_for_batch, unfreeze_identifiers from common.init_utils import colin_oracle_init, get_config from common.query_utils import corpnum_to_oracle_ids, get_cutoff_timestamp_query, get_fallout_corp_nums, prune_candidates_from_account, prune_candidates_from_batch, prune_candidates_from_cp @@ -252,7 +252,7 @@ def extract_pull_flow( print('Running in refresh mode: skipping Postgres DB reset') if reset_extract_postgres: cleanup_extract_postgres_db() - + cutoff = get_cuttoff_timestamp() config = get_config() @@ -312,7 +312,12 @@ def extract_pull_flow( ) if run_result.returncode != 0: raise RuntimeError(f'DbSchemaCLI exited with code {run_result.returncode}') - + + print('Running Unfreezing Corps.......') + unfreeze_after_freeze_flow = unfreeze_identifiers() + if unfreeze_after_freeze_flow.returncode !=0: + raise RuntimeError(f'Unfreezing process of corps after freeze flow exited with code {unfreeze_after_freeze_flow.returncode}') + if refresh_views: refresh_result = run_refresh_views('refresh', 'all') if refresh_result.returncode !=0: From 0ecd5e73d47d544365d22a81482117ab771a2f1f Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Thu, 11 Jun 2026 12:55:12 -0700 Subject: [PATCH 2/4] added scope --- data-tool/flows/common/colin_queries.py | 20 +++++++++++-------- .../flows/refresh_extract_subset_flow.py | 19 ++++++++++-------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/data-tool/flows/common/colin_queries.py b/data-tool/flows/common/colin_queries.py index cc338f7943..9c346199db 100644 --- a/data-tool/flows/common/colin_queries.py +++ b/data-tool/flows/common/colin_queries.py @@ -1,6 +1,7 @@ from math import ceil import re +from typing import Literal from sqlalchemy import text @@ -27,13 +28,16 @@ def build_corp_list(corp_list: str, chunksize: int) -> str: corp_list_cte = 'corp_list AS (\n'+ '\n'.join(union_lines) + '\n)' return ',\n'.join([*batch_ctes, corp_list_cte]) -def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int) -> str: - if not str(corp_list).strip(): - raise ValueError('empty corp_list') - corp_list_ctes = build_corp_list(corp_list, chunk_size) - frozen_ctes = frozen_cte() +def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int, scope: Literal['batch', 'full']) -> str: + corp_list_ctes = 'WITH ' + if scope == 'batch': + if not str(corp_list).strip(): + raise ValueError('empty corp_list') + corp_list_ctes += build_corp_list(corp_list, chunk_size) + ',\n' + if scope == 'full': + frozen_ctes = frozen_cte() query = f""" - WITH {corp_list_ctes}, + {corp_list_ctes} latest_event AS ( SELECT e.event_id, e.corp_num, @@ -101,6 +105,6 @@ def frozen_cte() -> str: ) """ -def get_updated_identifiers_for_batch(timestamp: str, corp_list: str, chunk_size: int) -> str: +def get_updated_identifiers_for_batch(timestamp: str, corp_list: str, chunk_size: int, scope: str) -> str: """per batch get identifiers""" - return get_updated_identifiers(timestamp, corp_list, 999) \ No newline at end of file + return get_updated_identifiers(timestamp, corp_list, chunk_size, scope) \ No newline at end of file diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 7ff0fb82b0..6217231e5c 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -121,17 +121,18 @@ def cleanup_extract_postgres_db() -> None: _reset_extract_postgres_db() @task(name='Get-Updated-Identifiers-Colin', cache_policy=NO_CACHE) -def get_updated_identifiers_colin(cutoff_timestamp: str, mig_batch_id: int, colin_oracle_engine: Engine, chunk_size: int) -> list[dict]: +def get_updated_identifiers_colin(cutoff_timestamp: str, mig_batch_id: int, colin_oracle_engine: Engine, chunk_size: int, scope: str) -> list[dict]: """ Get updated corp nums from colin with cutoff timestamp """ cfg = get_named_config() - mig_sql = get_identifiers_per_batch(mig_batch_id) - with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR).connect() as conn: - row = conn.execute(text(mig_sql)).fetchone() + if scope == 'batch': + mig_sql = get_identifiers_per_batch(mig_batch_id) + with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR).connect() as conn: + row = conn.execute(text(mig_sql)).fetchone() - corp_list = corpnum_to_oracle_ids(row[0]) if row else None - colin_sql = get_updated_identifiers_for_batch(cutoff_timestamp, str(corp_list), chunk_size) + corp_list = corpnum_to_oracle_ids(row[0]) if row else None + colin_sql = get_updated_identifiers_for_batch(cutoff_timestamp, str(corp_list), chunk_size, scope) with colin_oracle_engine.connect() as conn: result = conn.execute(text(colin_sql)) @@ -232,7 +233,7 @@ def run_refresh_views(mode: str = 'refresh', targets: str = 'all') -> subproces def extract_pull_flow( corp_file: str, mode: str = 'load', - chunk_size: int = 900, + chunk_size: int = 999, threads: int = 4, pg_fastload: bool = False, pg_disable_method: str = 'table_triggers', @@ -243,6 +244,7 @@ def extract_pull_flow( reset_extract_postgres: bool = True, include_cp: bool = False, target_connection: str = _DEFAULT_TARGET_CONNECTION, + delta_scope: str = 'batch' ) -> None: """ Generate files @@ -260,7 +262,7 @@ def extract_pull_flow( # Get Identifiers feed_path: Path | None = None if mode == 'refresh': - updated_rows = get_updated_identifiers_colin(cutoff_timestamp=cutoff, mig_batch_id=config.MIG_BATCH_IDS, colin_oracle_engine=colin_oracle_engine, chunk_size=chunk_size) + updated_rows = get_updated_identifiers_colin(cutoff_timestamp=cutoff, mig_batch_id=config.MIG_BATCH_IDS, colin_oracle_engine=colin_oracle_engine, chunk_size=chunk_size, scope=delta_scope) print(f'Colin updated identifiers : {len(updated_rows)} rows') _GENERATED_DIR.mkdir(parents=True, exist_ok=True) feed_path = _GENERATED_DIR / f'refresh_corp_feed_{os.getpid()}.tmp' @@ -330,6 +332,7 @@ def extract_pull_flow( p = argparse.ArgumentParser(description='Run Extract-Pull flow....') p.add_argument('--corp_file', default='../data-tool/scripts/generated/delta_ctst.txt', help='Path to newline-delimited corp identifiers') p.add_argument('--mode', default='refresh', choices=('refresh', 'load')) + p.add_argument('--delta-scope', default='batch', choices=('batch', 'full')) p.add_argument('--chunk-size', type=int, default=900, help='Max items per IN list.') p.add_argument('--threads', type=int, default=4, help='DBSchemaCLI transfer threads') p.add_argument('--pg-fastload', action='store_true', help='Enable Postgres fast-load') From f072e10ab36738885ae07d5d569ce91a7d3d756c Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Thu, 11 Jun 2026 13:24:21 -0700 Subject: [PATCH 3/4] added edge case fixes --- data-tool/Makefile | 7 ++++++- data-tool/flows/common/colin_queries.py | 5 ++++- data-tool/flows/refresh_extract_subset_flow.py | 14 +++++++++++--- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/data-tool/Makefile b/data-tool/Makefile index 60d8af0668..57b3c18fc4 100644 --- a/data-tool/Makefile +++ b/data-tool/Makefile @@ -120,7 +120,12 @@ run-extract-load: run-extract-refresh: . $(VENV_DIR)/bin/activate && \ - python flows/refresh_extract_subset_flow.py --mode refresh + python flows/refresh_extract_subset_flow.py --mode refresh --delta-scope batch + +run-extract-refresh-full: + . $(VENV_DIR)/bin/activate && \ + python flows/refresh_extract_subset_flow.py --mode refresh --delta-scope full + run-update-colin-ar-ind: ## Run update COLIN AR indicator flow . $(VENV_DIR)/bin/activate && \ diff --git a/data-tool/flows/common/colin_queries.py b/data-tool/flows/common/colin_queries.py index 9c346199db..0aeb8157b6 100644 --- a/data-tool/flows/common/colin_queries.py +++ b/data-tool/flows/common/colin_queries.py @@ -30,7 +30,10 @@ def build_corp_list(corp_list: str, chunksize: int) -> str: def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int, scope: Literal['batch', 'full']) -> str: corp_list_ctes = 'WITH ' + frozen_ctes = '' + join_ctes = 'corporation' if scope == 'batch': + join_ctes = 'corp_list' if not str(corp_list).strip(): raise ValueError('empty corp_list') corp_list_ctes += build_corp_list(corp_list, chunk_size) + ',\n' @@ -49,7 +52,7 @@ def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int, sco ORDER BY e.event_timestmp DESC, e.event_id DESC ) AS rn FROM event e - JOIN corp_list c + JOIN {join_ctes} c ON c.corp_num = e.corp_num WHERE e.event_timestmp > TIMESTAMP '{timestamp}' - INTERVAL '2' HOUR {frozen_ctes} diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 6217231e5c..47fb4c956b 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -120,19 +120,27 @@ def get_cuttoff_timestamp() -> datetime: def cleanup_extract_postgres_db() -> None: _reset_extract_postgres_db() +@task(name='Unfreeze-Identifiers', cache_policy=NO_CACHE) +def run_unfreeze_identifiers() -> None: + cfg = get_named_config() + with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR).begin() as conn: + result = conn.execute(text(unfreeze_identifiers())) + print(f'Unfroze corporation rows={result.rowcount}') + @task(name='Get-Updated-Identifiers-Colin', cache_policy=NO_CACHE) def get_updated_identifiers_colin(cutoff_timestamp: str, mig_batch_id: int, colin_oracle_engine: Engine, chunk_size: int, scope: str) -> list[dict]: """ Get updated corp nums from colin with cutoff timestamp """ cfg = get_named_config() + corp_list = '' if scope == 'batch': mig_sql = get_identifiers_per_batch(mig_batch_id) with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR).connect() as conn: row = conn.execute(text(mig_sql)).fetchone() corp_list = corpnum_to_oracle_ids(row[0]) if row else None - colin_sql = get_updated_identifiers_for_batch(cutoff_timestamp, str(corp_list), chunk_size, scope) + colin_sql = get_updated_identifiers_for_batch(cutoff_timestamp, str(corp_list or ''), chunk_size, scope) with colin_oracle_engine.connect() as conn: result = conn.execute(text(colin_sql)) @@ -320,11 +328,11 @@ def extract_pull_flow( if unfreeze_after_freeze_flow.returncode !=0: raise RuntimeError(f'Unfreezing process of corps after freeze flow exited with code {unfreeze_after_freeze_flow.returncode}') - if refresh_views: + if refresh_views and delta_scope == 'batch': refresh_result = run_refresh_views('refresh', 'all') if refresh_result.returncode !=0: raise RuntimeError(f'Refresh-Views exited with code {refresh_result.returncode}') - if mode == 'refresh': + if mode == 'refresh' and delta_scope == 'batch': prune_identifiers = get_fallen_identifiers(updated_corp_nums) prune_fallen_identifiers(prune_identifiers) From 34bc9987de99faf0c009f6f77bd4483533400dbf Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Thu, 11 Jun 2026 13:32:58 -0700 Subject: [PATCH 4/4] added unfreeze task --- data-tool/flows/refresh_extract_subset_flow.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 47fb4c956b..c3eedc8d6b 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -324,9 +324,7 @@ def extract_pull_flow( raise RuntimeError(f'DbSchemaCLI exited with code {run_result.returncode}') print('Running Unfreezing Corps.......') - unfreeze_after_freeze_flow = unfreeze_identifiers() - if unfreeze_after_freeze_flow.returncode !=0: - raise RuntimeError(f'Unfreezing process of corps after freeze flow exited with code {unfreeze_after_freeze_flow.returncode}') + run_unfreeze_identifiers() if refresh_views and delta_scope == 'batch': refresh_result = run_refresh_views('refresh', 'all')