diff --git a/data-tool/Makefile b/data-tool/Makefile
index 60d8af0668..57b3c18fc4 100644
--- a/data-tool/Makefile
+++ b/data-tool/Makefile
@@ -120,7 +120,11 @@ run-extract-load:
 
 run-extract-refresh:
 	. $(VENV_DIR)/bin/activate && \
-		python flows/refresh_extract_subset_flow.py --mode refresh
+		python flows/refresh_extract_subset_flow.py --mode refresh --delta-scope batch
+
+run-extract-refresh-full: ## Run extract refresh with --delta-scope full
+	. $(VENV_DIR)/bin/activate && \
+		python flows/refresh_extract_subset_flow.py --mode refresh --delta-scope full
 
 run-update-colin-ar-ind: ## Run update COLIN AR indicator flow
 	. $(VENV_DIR)/bin/activate && \
diff --git a/data-tool/flows/common/colin_queries.py b/data-tool/flows/common/colin_queries.py
index a1cec9ea61..0aeb8157b6 100644
--- a/data-tool/flows/common/colin_queries.py
+++ b/data-tool/flows/common/colin_queries.py
@@ -1,6 +1,7 @@
 from math import ceil
 import re
 
+from typing import Literal
 from sqlalchemy import text
 
 
@@ -27,12 +28,28 @@ def build_corp_list(corp_list: str, chunksize: int) -> str:
     corp_list_cte = 'corp_list AS (\n'+ '\n'.join(union_lines) + '\n)'
     return ',\n'.join([*batch_ctes, corp_list_cte])
 
-def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int) -> str:
-    if not str(corp_list).strip():
-        raise ValueError('empty corp_list')
-    corp_list_ctes = build_corp_list(corp_list, chunk_size)
+def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int, scope: Literal['batch', 'full']) -> str:
+    """Build the COLIN SQL selecting events newer than `timestamp` minus a 2 hour overlap.
+
+    scope='batch' joins events to the `corp_list` CTE built from `corp_list`;
+    scope='full' joins to `corporation` and appends the frozen_cte() filter.
+    Raises ValueError for an empty `corp_list` in batch scope or an unknown scope.
+    """
+    if scope == 'batch':
+        if not str(corp_list).strip():
+            raise ValueError('empty corp_list')
+        corp_list_ctes = 'WITH ' + build_corp_list(corp_list, chunk_size) + ',\n'
+        join_ctes = 'corp_list'
+        frozen_ctes = ''
+    elif scope == 'full':
+        corp_list_ctes = 'WITH '
+        join_ctes = 'corporation'
+        frozen_ctes = frozen_cte()
+    else:
+        # An unknown scope used to fall through to an unfiltered scan of `corporation`.
+        raise ValueError(f'unsupported scope: {scope!r}')
     query = f"""
-    WITH {corp_list_ctes},
+    {corp_list_ctes}
     latest_event AS (
         SELECT e.event_id,
             e.corp_num,
@@ -44,22 +61,10 @@ def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int) -> 
                     ORDER BY e.event_timestmp DESC, e.event_id DESC
                 ) AS rn
         FROM event e
-        JOIN corp_list c
+        JOIN {join_ctes} c
             ON c.corp_num = e.corp_num
         WHERE e.event_timestmp > TIMESTAMP '{timestamp}' - INTERVAL '2' HOUR
-        -- AND NOT (
-        --     EXISTS (
-        --         SELECT 1
-        --         FROM corporation c2
-        --         WHERE c2.corp_num = e.corp_num
-        --         AND c2.corp_frozen_typ_cd = 'C'
-        --     )
-        --     AND EXISTS (
-        --         SELECT 1
-        --         FROM corp_early_adopters cea
-        --         WHERE cea.corp_num = e.corp_num
-        --     )
-        -- )
+        {frozen_ctes}
     )
     SELECT le.EVENT_ID,
         le.corp_num, le.event_typ_cd,
@@ -80,6 +85,45 @@ def get_identifiers_per_batch(mig_batch_id: int) -> str:
     WHERE mcb.mig_batch_id = {mig_batch_id}
     """
 
-def get_updated_identifiers_for_batch(timestamp: str, corp_list: str, chunk_size: int) -> str:
-    """per batch get identifiers"""
-    return get_updated_identifiers(timestamp, corp_list, 999)
\ No newline at end of file
+def unfreeze_identifiers() -> str:
+    """Return the UPDATE that nulls `corp_frozen_type_cd` for corps in the listed cprd/prod migration groups."""
+    # NOTE(review): the column is spelled `corp_frozen_type_cd` here but `corp_frozen_typ_cd` in
+    # frozen_cte(); presumably the two statements target different schemas -- confirm.
+    return """
+    UPDATE corporation AS c
+    SET corp_frozen_type_cd = NULL
+    FROM mig_group AS mg
+    JOIN mig_batch AS mb ON mb.mig_group_id = mg.id
+    JOIN mig_corp_batch AS mcb ON mcb.mig_batch_id = mb.id
+    WHERE c.corp_num = mcb.corp_num
+    -- cprd
+    and mg.name in ('group_0', 'group_1', 'group_3', 'group_4','gcp_migration_group_test','misc_group')
+    and mg.source_db = 'cprd'
+    and mg.target_environment = 'prod'
+    AND c.corp_frozen_type_cd IS NOT NULL;
+    """
+
+def frozen_cte() -> str:
+    """Return a WHERE-clause fragment (not a CTE, despite the name) for get_updated_identifiers.
+
+    It excludes events whose corp is frozen with code 'C' and also listed in corp_early_adopters.
+    """
+    return """
+        AND NOT (
+            EXISTS (
+                SELECT 1
+                FROM corporation c2
+                WHERE c2.corp_num = e.corp_num
+                AND c2.corp_frozen_typ_cd = 'C'
+            )
+            AND EXISTS (
+                SELECT 1
+                FROM corp_early_adopters cea
+                WHERE cea.corp_num = e.corp_num
+            )
+        )
+    """
+
+def get_updated_identifiers_for_batch(timestamp: str, corp_list: str, chunk_size: int, scope: Literal['batch', 'full']) -> str:
+    """Delegate to get_updated_identifiers, forwarding `chunk_size` and `scope` unchanged."""
+    return get_updated_identifiers(timestamp, corp_list, chunk_size, scope)
diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py
index 6f9dd76498..c3eedc8d6b 100644
--- a/data-tool/flows/refresh_extract_subset_flow.py
+++ b/data-tool/flows/refresh_extract_subset_flow.py
@@ -12,7 +12,7 @@ from sqlalchemy.engine import Engine
 from datetime import datetime, timezone
 
 from config import get_named_config
-from common.colin_queries import get_identifiers_per_batch, get_updated_identifiers_for_batch
+from common.colin_queries import get_identifiers_per_batch, get_updated_identifiers_for_batch, unfreeze_identifiers
 from common.init_utils import colin_oracle_init, get_config
 from common.query_utils import corpnum_to_oracle_ids, get_cutoff_timestamp_query, get_fallout_corp_nums, prune_candidates_from_account, prune_candidates_from_batch, prune_candidates_from_cp
 
@@ -120,18 +120,28 @@ def get_cuttoff_timestamp() -> datetime:
 def cleanup_extract_postgres_db() -> None:
     _reset_extract_postgres_db()
 
+@task(name='Unfreeze-Identifiers', cache_policy=NO_CACHE)
+def run_unfreeze_identifiers() -> None:
+    """Run the unfreeze_identifiers() UPDATE on the COLIN migration DB in one transaction and print the row count."""
+    cfg = get_named_config()
+    with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR).begin() as conn:
+        result = conn.execute(text(unfreeze_identifiers()))
+        print(f'Unfroze corporation rows={result.rowcount}')
+
 @task(name='Get-Updated-Identifiers-Colin', cache_policy=NO_CACHE)
-def get_updated_identifiers_colin(cutoff_timestamp: str, mig_batch_id: int, colin_oracle_engine: Engine, chunk_size: int) -> list[dict]:
+def get_updated_identifiers_colin(cutoff_timestamp: str, mig_batch_id: int, colin_oracle_engine: Engine, chunk_size: int, scope: str) -> list[dict]:
     """
     Get updated corp nums from colin with cutoff timestamp
     """
     cfg = get_named_config()
-    mig_sql = get_identifiers_per_batch(mig_batch_id)
-    with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR).connect() as conn:
-        row = conn.execute(text(mig_sql)).fetchone()
+    corp_list = ''
+    if scope == 'batch':
+        mig_sql = get_identifiers_per_batch(mig_batch_id)
+        with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR).connect() as conn:
+            row = conn.execute(text(mig_sql)).fetchone()
 
-    corp_list = corpnum_to_oracle_ids(row[0]) if row else None
-    colin_sql = get_updated_identifiers_for_batch(cutoff_timestamp, str(corp_list), chunk_size)
+        corp_list = corpnum_to_oracle_ids(row[0]) if row else None
+    colin_sql = get_updated_identifiers_for_batch(cutoff_timestamp, str(corp_list or ''), chunk_size, scope)
 
     with colin_oracle_engine.connect() as conn:
         result = conn.execute(text(colin_sql))
@@ -232,7 +242,7 @@ def run_refresh_views(mode: str = 'refresh', targets: str = 'all') -> subproces
 def extract_pull_flow(
     corp_file: str,
     mode: str = 'load',
-    chunk_size: int = 900,
+    chunk_size: int = 999,
     threads: int = 4,
     pg_fastload: bool = False,
     pg_disable_method: str = 'table_triggers',
@@ -243,6 +253,7 @@ def extract_pull_flow(
     reset_extract_postgres: bool = True,
     include_cp: bool = False,
     target_connection: str = _DEFAULT_TARGET_CONNECTION,
+    delta_scope: str = 'batch',
 ) -> None:
     """
     Generate files
@@ -252,7 +263,7 @@ def extract_pull_flow(
         print('Running in refresh mode: skipping Postgres DB reset')
     if reset_extract_postgres:
         cleanup_extract_postgres_db()
-    
+
     cutoff = get_cuttoff_timestamp()
     config = get_config()
 
@@ -260,7 +271,7 @@ def extract_pull_flow(
     # Get Identifiers
     feed_path: Path | None = None
     if mode == 'refresh':
-        updated_rows = get_updated_identifiers_colin(cutoff_timestamp=cutoff, mig_batch_id=config.MIG_BATCH_IDS, colin_oracle_engine=colin_oracle_engine, chunk_size=chunk_size)
+        updated_rows = get_updated_identifiers_colin(cutoff_timestamp=cutoff, mig_batch_id=config.MIG_BATCH_IDS, colin_oracle_engine=colin_oracle_engine, chunk_size=chunk_size, scope=delta_scope)
         print(f'Colin updated identifiers : {len(updated_rows)} rows')
         _GENERATED_DIR.mkdir(parents=True, exist_ok=True)
         feed_path = _GENERATED_DIR / f'refresh_corp_feed_{os.getpid()}.tmp'
@@ -312,12 +323,15 @@ def extract_pull_flow(
     )
     if run_result.returncode != 0:
         raise RuntimeError(f'DbSchemaCLI exited with code {run_result.returncode}')
-    
-    if refresh_views:
+
+    print('Running Unfreezing Corps.......')
+    run_unfreeze_identifiers()
+
+    if refresh_views and delta_scope == 'batch':
         refresh_result = run_refresh_views('refresh', 'all')
         if refresh_result.returncode !=0:
             raise RuntimeError(f'Refresh-Views exited with code {refresh_result.returncode}')
-    if mode == 'refresh':
+    if mode == 'refresh' and delta_scope == 'batch':
         prune_identifiers = get_fallen_identifiers(updated_corp_nums)
         prune_fallen_identifiers(prune_identifiers)
 
@@ -325,6 +339,7 @@ def extract_pull_flow(
     p = argparse.ArgumentParser(description='Run Extract-Pull flow....')
     p.add_argument('--corp_file', default='../data-tool/scripts/generated/delta_ctst.txt', help='Path to newline-delimited corp identifiers')
     p.add_argument('--mode', default='refresh', choices=('refresh', 'load'))
+    p.add_argument('--delta-scope', default='batch', choices=('batch', 'full'), help='batch: only corps in the configured migration batch; full: every corporation')
-    p.add_argument('--chunk-size', type=int, default=900, help='Max items per IN list.')
+    p.add_argument('--chunk-size', type=int, default=999, help='Max items per IN list.')
     p.add_argument('--threads', type=int, default=4, help='DBSchemaCLI transfer threads')
     p.add_argument('--pg-fastload', action='store_true', help='Enable Postgres fast-load')