Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion data-tool/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ run-extract-load:

run-extract-refresh: ## Run extract refresh flow limited to the migration batch (--delta-scope batch)
	. $(VENV_DIR)/bin/activate && \
	python flows/refresh_extract_subset_flow.py --mode refresh --delta-scope batch

run-extract-refresh-full: ## Run extract refresh flow with --delta-scope full
. $(VENV_DIR)/bin/activate && \
python flows/refresh_extract_subset_flow.py --mode refresh --delta-scope full


run-update-colin-ar-ind: ## Run update COLIN AR indicator flow
. $(VENV_DIR)/bin/activate && \
Expand Down
70 changes: 49 additions & 21 deletions data-tool/flows/common/colin_queries.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

from math import ceil
import re
from typing import Literal

from sqlalchemy import text

Expand All @@ -27,12 +28,19 @@
corp_list_cte = 'corp_list AS (\n'+ '\n'.join(union_lines) + '\n)'
return ',\n'.join([*batch_ctes, corp_list_cte])

def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int) -> str:
if not str(corp_list).strip():
raise ValueError('empty corp_list')
corp_list_ctes = build_corp_list(corp_list, chunk_size)
def get_updated_identifiers(timestamp: str, corp_list: str, chunk_size: int, scope: Literal['batch', 'full']) -> str:
corp_list_ctes = 'WITH '
frozen_ctes = ''
join_ctes = 'corporation'
if scope == 'batch':
join_ctes = 'corp_list'
if not str(corp_list).strip():
raise ValueError('empty corp_list')
corp_list_ctes += build_corp_list(corp_list, chunk_size) + ',\n'
if scope == 'full':
frozen_ctes = frozen_cte()
query = f"""
WITH {corp_list_ctes},
{corp_list_ctes}
latest_event AS (
SELECT e.event_id,
e.corp_num,
Expand All @@ -44,22 +52,10 @@
ORDER BY e.event_timestmp DESC, e.event_id DESC
) AS rn
FROM event e
JOIN corp_list c
JOIN {join_ctes} c
ON c.corp_num = e.corp_num
WHERE e.event_timestmp > TIMESTAMP '{timestamp}' - INTERVAL '2' HOUR
-- AND NOT (
-- EXISTS (
-- SELECT 1
-- FROM corporation c2
-- WHERE c2.corp_num = e.corp_num
-- AND c2.corp_frozen_typ_cd = 'C'
-- )
-- AND EXISTS (
-- SELECT 1
-- FROM corp_early_adopters cea
-- WHERE cea.corp_num = e.corp_num
-- )
-- )
{frozen_ctes}
) SELECT le.EVENT_ID,
le.corp_num,
le.event_typ_cd,
Expand All @@ -80,6 +76,38 @@
WHERE mcb.mig_batch_id = {mig_batch_id}
"""

def get_updated_identifiers_for_batch(timestamp: str, corp_list: str, chunk_size: int) -> str:
def unfreeze_identifiers() -> str:
    """Return the SQL that clears the frozen flag on migrated corporations.

    The statement sets ``corporation.corp_frozen_type_cd`` to NULL for every
    corporation linked, through ``mig_corp_batch`` and ``mig_batch``, to one
    of the listed migration groups whose ``source_db`` is ``cprd`` and whose
    ``target_environment`` is ``prod``. Rows whose flag is already NULL are
    excluded by the final predicate.

    Returns:
        The UPDATE statement as a plain string; it takes no bind parameters.
    """
    # Nothing is interpolated, so this is a plain literal, not an f-string.
    return """
UPDATE corporation AS c
SET corp_frozen_type_cd = NULL
FROM mig_group AS mg
JOIN mig_batch AS mb ON mb.mig_group_id = mg.id
JOIN mig_corp_batch AS mcb ON mcb.mig_batch_id = mb.id
WHERE c.corp_num = mcb.corp_num
-- cprd
and mg.name in ('group_0', 'group_1', 'group_3', 'group_4','gcp_migration_group_test','misc_group')
and mg.source_db = 'cprd'
and mg.target_environment = 'prod'
AND c.corp_frozen_type_cd IS NOT NULL;
"""

Check warning on line 92 in data-tool/flows/common/colin_queries.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add replacement fields or use a normal string instead of an f-string.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ63luW20Jh2gKG5R4-B&open=AZ63luW20Jh2gKG5R4-B&pullRequest=4476

def frozen_cte() -> str:
    """Return a SQL predicate that filters out frozen early-adopter corps.

    Despite the name, the fragment is not a CTE: it is an ``AND NOT (...)``
    condition meant to be appended to an existing WHERE clause in which the
    event row is aliased ``e``. It rejects a row when both hold for
    ``e.corp_num``: a ``corporation`` row exists with
    ``corp_frozen_typ_cd = 'C'``, and a ``corp_early_adopters`` row exists.

    Returns:
        The predicate as a plain string; it takes no bind parameters.
    """
    # Nothing is interpolated, so this is a plain literal, not an f-string.
    return """
AND NOT (
EXISTS (
SELECT 1
FROM corporation c2
WHERE c2.corp_num = e.corp_num
AND c2.corp_frozen_typ_cd = 'C'
)
AND EXISTS (
SELECT 1
FROM corp_early_adopters cea
WHERE cea.corp_num = e.corp_num
)
)
"""

Check warning on line 109 in data-tool/flows/common/colin_queries.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add replacement fields or use a normal string instead of an f-string.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ63luW20Jh2gKG5R4-C&open=AZ63luW20Jh2gKG5R4-C&pullRequest=4476

def get_updated_identifiers_for_batch(timestamp: str, corp_list: str, chunk_size: int, scope: str) -> str:
    """Build the updated-identifiers query for a migration batch.

    Thin wrapper that forwards every argument, unchanged, to
    ``get_updated_identifiers`` and returns its result.

    Args:
        timestamp: Cutoff timestamp passed through to the query builder.
        corp_list: Corp identifiers passed through to the query builder.
        chunk_size: Chunk size passed through to the query builder (it is no
            longer overridden with a hard-coded value).
        scope: Delta scope passed through to the query builder.

    Returns:
        The SQL text produced by ``get_updated_identifiers``.
    """
    return get_updated_identifiers(timestamp, corp_list, chunk_size, scope)
40 changes: 27 additions & 13 deletions data-tool/flows/refresh_extract_subset_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from sqlalchemy.engine import Engine
from datetime import datetime, timezone
from config import get_named_config
from common.colin_queries import get_identifiers_per_batch, get_updated_identifiers_for_batch
from common.colin_queries import get_identifiers_per_batch, get_updated_identifiers_for_batch, unfreeze_identifiers
from common.init_utils import colin_oracle_init, get_config
from common.query_utils import corpnum_to_oracle_ids, get_cutoff_timestamp_query, get_fallout_corp_nums, prune_candidates_from_account, prune_candidates_from_batch, prune_candidates_from_cp

Expand Down Expand Up @@ -120,18 +120,27 @@
def cleanup_extract_postgres_db() -> None:
_reset_extract_postgres_db()

@task(name='Unfreeze-Identifiers', cache_policy=NO_CACHE)
def run_unfreeze_identifiers() -> None:
    """Execute the unfreeze UPDATE and print the affected row count.

    Opens an engine on ``SQLALCHEMY_DATABASE_URI_COLIN_MIGR`` from the named
    config, runs the statement returned by ``unfreeze_identifiers()`` inside
    an ``engine.begin()`` block, and prints ``result.rowcount``.
    """
    cfg = get_named_config()
    engine = create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR)
    try:
        with engine.begin() as conn:
            result = conn.execute(text(unfreeze_identifiers()))
            print(f'Unfroze corporation rows={result.rowcount}')
    finally:
        # The engine exists only for this statement; release its pooled
        # connections instead of leaving them open until garbage collection.
        engine.dispose()

@task(name='Get-Updated-Identifiers-Colin', cache_policy=NO_CACHE)
def get_updated_identifiers_colin(cutoff_timestamp: str, mig_batch_id: int, colin_oracle_engine: Engine, chunk_size: int) -> list[dict]:
def get_updated_identifiers_colin(cutoff_timestamp: str, mig_batch_id: int, colin_oracle_engine: Engine, chunk_size: int, scope: str) -> list[dict]:
"""
Get updated corp nums from colin with cutoff timestamp
"""
cfg = get_named_config()
mig_sql = get_identifiers_per_batch(mig_batch_id)
with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR).connect() as conn:
row = conn.execute(text(mig_sql)).fetchone()
corp_list = ''
if scope == 'batch':
mig_sql = get_identifiers_per_batch(mig_batch_id)
with create_engine(cfg.SQLALCHEMY_DATABASE_URI_COLIN_MIGR).connect() as conn:
row = conn.execute(text(mig_sql)).fetchone()

corp_list = corpnum_to_oracle_ids(row[0]) if row else None
colin_sql = get_updated_identifiers_for_batch(cutoff_timestamp, str(corp_list), chunk_size)
corp_list = corpnum_to_oracle_ids(row[0]) if row else None
colin_sql = get_updated_identifiers_for_batch(cutoff_timestamp, str(corp_list or ''), chunk_size, scope)

with colin_oracle_engine.connect() as conn:
result = conn.execute(text(colin_sql))
Expand Down Expand Up @@ -230,19 +239,20 @@

@flow(name='Extract-Subset-Flow', log_prints=True, persist_result=False)
def extract_pull_flow(
corp_file: str,
mode: str = 'load',
chunk_size: int = 900,
chunk_size: int = 999,
threads: int = 4,
pg_fastload: bool = False,
pg_disable_method: str = 'table_triggers',
out: str | None=None,
run_dbschemacli: bool = False,
dbschemacli_cmd: str = 'dbschemacli',
refresh_views: bool = True,
reset_extract_postgres: bool = True,
include_cp: bool = False,
target_connection: str = _DEFAULT_TARGET_CONNECTION,
delta_scope: str = 'batch'

Check warning on line 255 in data-tool/flows/refresh_extract_subset_flow.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Function "extract_pull_flow" has 14 parameters, which is greater than the 13 authorized.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ64QcfyvxKY-zkoYzkG&open=AZ64QcfyvxKY-zkoYzkG&pullRequest=4476
) -> None:
"""
Generate files
Expand All @@ -252,15 +262,15 @@
print('Running in refresh mode: skipping Postgres DB reset')
if reset_extract_postgres:
cleanup_extract_postgres_db()

cutoff = get_cuttoff_timestamp()

config = get_config()
colin_oracle_engine = colin_oracle_init(config)
# Get Identifiers
feed_path: Path | None = None
if mode == 'refresh':
updated_rows = get_updated_identifiers_colin(cutoff_timestamp=cutoff, mig_batch_id=config.MIG_BATCH_IDS, colin_oracle_engine=colin_oracle_engine, chunk_size=chunk_size)
updated_rows = get_updated_identifiers_colin(cutoff_timestamp=cutoff, mig_batch_id=config.MIG_BATCH_IDS, colin_oracle_engine=colin_oracle_engine, chunk_size=chunk_size, scope=delta_scope)
print(f'Colin updated identifiers : {len(updated_rows)} rows')
_GENERATED_DIR.mkdir(parents=True, exist_ok=True)
feed_path = _GENERATED_DIR / f'refresh_corp_feed_{os.getpid()}.tmp'
Expand Down Expand Up @@ -312,19 +322,23 @@
)
if run_result.returncode != 0:
raise RuntimeError(f'DbSchemaCLI exited with code {run_result.returncode}')

if refresh_views:

print('Running Unfreezing Corps.......')
run_unfreeze_identifiers()

if refresh_views and delta_scope == 'batch':
refresh_result = run_refresh_views('refresh', 'all')
if refresh_result.returncode !=0:
raise RuntimeError(f'Refresh-Views exited with code {refresh_result.returncode}')
if mode == 'refresh':
if mode == 'refresh' and delta_scope == 'batch':
prune_identifiers = get_fallen_identifiers(updated_corp_nums)
prune_fallen_identifiers(prune_identifiers)

if __name__ == '__main__':
p = argparse.ArgumentParser(description='Run Extract-Pull flow....')
p.add_argument('--corp_file', default='../data-tool/scripts/generated/delta_ctst.txt', help='Path to newline-delimited corp identifiers')
p.add_argument('--mode', default='refresh', choices=('refresh', 'load'))
p.add_argument('--delta-scope', default='batch', choices=('batch', 'full'))
p.add_argument('--chunk-size', type=int, default=900, help='Max items per IN list.')
p.add_argument('--threads', type=int, default=4, help='DBSchemaCLI transfer threads')
p.add_argument('--pg-fastload', action='store_true', help='Enable Postgres fast-load')
Expand Down