Deduplicate Enghouse ticket_results #5144
Warehouse report: Failed to add ci-report to a comment. Review the ci-report in the Summary.
Terraform plan in iac/cal-itp-data-infra-staging/airflow/us
Plan: 1 to add, 9 to change, 1 to destroy.
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
+ create
!~ update in-place
- destroy
Terraform will perform the following actions:
# google_storage_bucket_object.calitp-staging-composer["dags/airtable_issue_management.py"] will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-staging-composer" {
!~ crc32c = "F3GRiw==" -> (known after apply)
!~ detect_md5hash = "jfUfWHhCEBMLm+jeORHn7w==" -> "different hash"
!~ generation = 1777081402145446 -> (known after apply)
id = "calitp-staging-composer-dags/airtable_issue_management.py"
!~ md5hash = "jfUfWHhCEBMLm+jeORHn7w==" -> (known after apply)
name = "dags/airtable_issue_management.py"
# (17 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-staging-composer["plugins/operators/airtable_issues_email_operator.py"] will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-staging-composer" {
!~ crc32c = "teQqow==" -> (known after apply)
!~ detect_md5hash = "eopXX15B6gJXv314s81Xgg==" -> "different hash"
!~ generation = 1777081402147724 -> (known after apply)
id = "calitp-staging-composer-plugins/operators/airtable_issues_email_operator.py"
!~ md5hash = "eopXX15B6gJXv314s81Xgg==" -> (known after apply)
name = "plugins/operators/airtable_issues_email_operator.py"
# (17 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-staging-composer["plugins/operators/airtable_issues_update_operator.py"] will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-staging-composer" {
!~ crc32c = "IAu8XA==" -> (known after apply)
!~ detect_md5hash = "qBYQP0FDh4xB1EW6MMOqaw==" -> "different hash"
!~ generation = 1777081402143462 -> (known after apply)
id = "calitp-staging-composer-plugins/operators/airtable_issues_update_operator.py"
!~ md5hash = "qBYQP0FDh4xB1EW6MMOqaw==" -> (known after apply)
name = "plugins/operators/airtable_issues_update_operator.py"
# (17 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-staging-composer-catalog will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-staging-composer-catalog" {
!~ content = (sensitive value)
!~ crc32c = "PCZSkg==" -> (known after apply)
!~ detect_md5hash = "emi1LB4jwlju+91Lb2/ikw==" -> "different hash"
!~ generation = 1777081402704275 -> (known after apply)
id = "calitp-staging-composer-data/warehouse/target/catalog.json"
!~ md5hash = "emi1LB4jwlju+91Lb2/ikw==" -> (known after apply)
name = "data/warehouse/target/catalog.json"
# (16 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-staging-composer-dags["models/intermediate/payments/_int_payments.yml"] will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
!~ crc32c = "AkMXcg==" -> (known after apply)
!~ detect_md5hash = "v/JBoD58WEdM/XZscf2Ufw==" -> "different hash"
!~ generation = 1776453636833756 -> (known after apply)
id = "calitp-staging-composer-data/warehouse/models/intermediate/payments/_int_payments.yml"
!~ md5hash = "v/JBoD58WEdM/XZscf2Ufw==" -> (known after apply)
name = "data/warehouse/models/intermediate/payments/_int_payments.yml"
# (17 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-staging-composer-dags["models/intermediate/payments/int_payments__enghouse_ticket_results_deduped.sql"] will be created
+ resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
+ bucket = "calitp-staging-composer"
+ content = (sensitive value)
+ content_type = (known after apply)
+ crc32c = (known after apply)
+ detect_md5hash = "different hash"
+ generation = (known after apply)
+ id = (known after apply)
+ kms_key_name = (known after apply)
+ md5hash = (known after apply)
+ md5hexhash = (known after apply)
+ media_link = (known after apply)
+ name = "data/warehouse/models/intermediate/payments/int_payments__enghouse_ticket_results_deduped.sql"
+ output_name = (known after apply)
+ self_link = (known after apply)
+ source = "../../../../warehouse/models/intermediate/payments/int_payments__enghouse_ticket_results_deduped.sql"
+ storage_class = (known after apply)
}
# google_storage_bucket_object.calitp-staging-composer-dags["models/mart/payments/fct_payments_rides_enghouse.sql"] will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
!~ crc32c = "TnAu0g==" -> (known after apply)
!~ detect_md5hash = "xBUm/8jgAixqjWGlHPa0ow==" -> "different hash"
!~ generation = 1776796010157437 -> (known after apply)
id = "calitp-staging-composer-data/warehouse/models/mart/payments/fct_payments_rides_enghouse.sql"
!~ md5hash = "xBUm/8jgAixqjWGlHPa0ow==" -> (known after apply)
name = "data/warehouse/models/mart/payments/fct_payments_rides_enghouse.sql"
# (17 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-staging-composer-dags["models/mart/transit_database/fct_close_expired_issues.sql"] will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
!~ crc32c = "jurzxA==" -> (known after apply)
!~ detect_md5hash = "FNZIWVUFTxMYmmQO3b4Yzw==" -> "different hash"
!~ generation = 1777081402132841 -> (known after apply)
id = "calitp-staging-composer-data/warehouse/models/mart/transit_database/fct_close_expired_issues.sql"
!~ md5hash = "FNZIWVUFTxMYmmQO3b4Yzw==" -> (known after apply)
name = "data/warehouse/models/mart/transit_database/fct_close_expired_issues.sql"
# (17 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-staging-composer-dags["models/mart/transit_database/fct_close_rt_completeness_issues.sql"] will be destroyed
# (because key ["models/mart/transit_database/fct_close_rt_completeness_issues.sql"] is not in for_each map)
- resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
- bucket = "calitp-staging-composer" -> null
- content_type = "text/plain; charset=utf-8" -> null
- crc32c = "hlte0g==" -> null
- detect_md5hash = "qh2Ssdf2gAuvEPszrY1Ihg==" -> null
- event_based_hold = false -> null
- generation = 1777081402144466 -> null
- id = "calitp-staging-composer-data/warehouse/models/mart/transit_database/fct_close_rt_completeness_issues.sql" -> null
- md5hash = "qh2Ssdf2gAuvEPszrY1Ihg==" -> null
- md5hexhash = "aa1d92b1d7f6800baf10fb33ad8d4886" -> null
- media_link = "https://storage.googleapis.com/download/storage/v1/b/calitp-staging-composer/o/data%2Fwarehouse%2Fmodels%2Fmart%2Ftransit_database%2Ffct_close_rt_completeness_issues.sql?generation=1777081402144466&alt=media" -> null
- metadata = {} -> null
- name = "data/warehouse/models/mart/transit_database/fct_close_rt_completeness_issues.sql" -> null
- output_name = "data/warehouse/models/mart/transit_database/fct_close_rt_completeness_issues.sql" -> null
- self_link = "https://www.googleapis.com/storage/v1/b/calitp-staging-composer/o/data%2Fwarehouse%2Fmodels%2Fmart%2Ftransit_database%2Ffct_close_rt_completeness_issues.sql" -> null
- source = "../../../../warehouse/models/mart/transit_database/fct_close_rt_completeness_issues.sql" -> null
- storage_class = "STANDARD" -> null
- temporary_hold = false -> null
# (6 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-staging-composer-dags["models/staging/payments/enghouse/stg_enghouse__ticket_results.sql"] will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-staging-composer-dags" {
!~ crc32c = "pSICtA==" -> (known after apply)
!~ detect_md5hash = "DuHPTMzsaFO6KUjZCoxkvw==" -> "different hash"
!~ generation = 1769734706517996 -> (known after apply)
id = "calitp-staging-composer-data/warehouse/models/staging/payments/enghouse/stg_enghouse__ticket_results.sql"
!~ md5hash = "DuHPTMzsaFO6KUjZCoxkvw==" -> (known after apply)
name = "data/warehouse/models/staging/payments/enghouse/stg_enghouse__ticket_results.sql"
# (17 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-staging-composer-manifest will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-staging-composer-manifest" {
!~ content = (sensitive value)
!~ crc32c = "pXu3nA==" -> (known after apply)
!~ detect_md5hash = "u82/0ZJPyawniK5Kr112gQ==" -> "different hash"
!~ generation = 1777081404101058 -> (known after apply)
id = "calitp-staging-composer-data/warehouse/target/manifest.json"
!~ md5hash = "u82/0ZJPyawniK5Kr112gQ==" -> (known after apply)
name = "data/warehouse/target/manifest.json"
# (16 unchanged attributes hidden)
}
Plan: 1 to add, 9 to change, 1 to destroy.
📝 Plan generated in Deploy dbt #1735
Terraform plan in iac/cal-itp-data-infra/airflow/us
Plan: 1 to add, 3 to change, 0 to destroy.
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
+ create
!~ update in-place
Terraform will perform the following actions:
# google_storage_bucket_object.calitp-composer-dags["models/intermediate/payments/_int_payments.yml"] will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-composer-dags" {
!~ crc32c = "AkMXcg==" -> (known after apply)
!~ detect_md5hash = "v/JBoD58WEdM/XZscf2Ufw==" -> "different hash"
!~ generation = 1776457910933366 -> (known after apply)
id = "calitp-composer-data/warehouse/models/intermediate/payments/_int_payments.yml"
!~ md5hash = "v/JBoD58WEdM/XZscf2Ufw==" -> (known after apply)
name = "data/warehouse/models/intermediate/payments/_int_payments.yml"
# (17 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-composer-dags["models/intermediate/payments/int_payments__enghouse_ticket_results_deduped.sql"] will be created
+ resource "google_storage_bucket_object" "calitp-composer-dags" {
+ bucket = "calitp-composer"
+ content = (sensitive value)
+ content_type = (known after apply)
+ crc32c = (known after apply)
+ detect_md5hash = "different hash"
+ generation = (known after apply)
+ id = (known after apply)
+ kms_key_name = (known after apply)
+ md5hash = (known after apply)
+ md5hexhash = (known after apply)
+ media_link = (known after apply)
+ name = "data/warehouse/models/intermediate/payments/int_payments__enghouse_ticket_results_deduped.sql"
+ output_name = (known after apply)
+ self_link = (known after apply)
+ source = "../../../../warehouse/models/intermediate/payments/int_payments__enghouse_ticket_results_deduped.sql"
+ storage_class = (known after apply)
}
# google_storage_bucket_object.calitp-composer-dags["models/mart/payments/fct_payments_rides_enghouse.sql"] will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-composer-dags" {
!~ crc32c = "TnAu0g==" -> (known after apply)
!~ detect_md5hash = "xBUm/8jgAixqjWGlHPa0ow==" -> "different hash"
!~ generation = 1776796001653822 -> (known after apply)
id = "calitp-composer-data/warehouse/models/mart/payments/fct_payments_rides_enghouse.sql"
!~ md5hash = "xBUm/8jgAixqjWGlHPa0ow==" -> (known after apply)
name = "data/warehouse/models/mart/payments/fct_payments_rides_enghouse.sql"
# (17 unchanged attributes hidden)
}
# google_storage_bucket_object.calitp-composer-dags["models/staging/payments/enghouse/stg_enghouse__ticket_results.sql"] will be updated in-place
!~ resource "google_storage_bucket_object" "calitp-composer-dags" {
!~ crc32c = "pSICtA==" -> (known after apply)
!~ detect_md5hash = "DuHPTMzsaFO6KUjZCoxkvw==" -> "different hash"
!~ generation = 1769734710416800 -> (known after apply)
id = "calitp-composer-data/warehouse/models/staging/payments/enghouse/stg_enghouse__ticket_results.sql"
!~ md5hash = "DuHPTMzsaFO6KUjZCoxkvw==" -> (known after apply)
name = "data/warehouse/models/staging/payments/enghouse/stg_enghouse__ticket_results.sql"
# (17 unchanged attributes hidden)
}
Plan: 1 to add, 3 to change, 0 to destroy.
📝 Plan generated in Deploy dbt #1735
ffdaac2 to 36126e1
Warehouse report 📦
Checks/potential follow-ups
Checks indicate the following action items may be necessary.
New models 🌱
calitp_warehouse.intermediate.payments.int_payments__enghouse_ticket_results_deduped
DAG: [diagram and legend not captured]
36126e1 to 59ec096
lauriemerrell left a comment
Sorry..... I am being indecisive.........
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY _content_hash ORDER BY (SELECT NULL)) AS row_num
ROW_NUMBER() OVER (
I am a little conflicted about this logic, thinking out loud.
One definite note: In the data so far it doesn't look like there are any rows where start_dttm isn't populated but end_dttm is (in fact looks like end_dttm might always be null....?), so I don't think we need to include it in the logic here.
My general question though is whether to use this logic (which is ordering preference based on which field is populated) or to use logic based on ordering by the timestamps themselves, probably the most recent row?
And now that I'm saying this I'm wondering if we actually want to combine the rows rather than drop one....
Ok.... I think I have talked myself into: Let's make an intermediate model where we group by _content_hash and combine these rows and just take the earliest start_dttm and created_dttm for the rest of the content? If that makes sense. So don't add this logic here, instead let the dups stay in staging and add another step where we dedupe by combining the rows to keep as much information as possible?
Split into an int_ model and updated the model that referenced it
SELECT * FROM (
SELECT
*,
ROW_NUMBER() OVER (
Ok sorry I think my last comment was confusing but -- rather than doing it this way, can we actually group by content hash and take the min of these columns (check how that handles nulls) so that we populate as many as possible, and update the docs to note that these columns might be a combination of input rows?
Rather than just keeping one dttm value
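A rough sketch of the group-by idea, to check how MIN treats nulls. This runs the SQL through Python's built-in sqlite3 rather than the warehouse, so the dialect may differ; the table name `ticket_results`, the sample values, and the use of MIN(tap_id) as a stand-in for "the rest of the content" are all assumptions for illustration, not the actual model.

```python
import sqlite3

# Hypothetical stand-in for the staging model; only columns named in the
# review are modeled, and every value below is made up.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ticket_results ("
    "_content_hash TEXT, tap_id TEXT, start_dttm TEXT, created_dttm TEXT)"
)
conn.executemany(
    "INSERT INTO ticket_results VALUES (?, ?, ?, ?)",
    [
        # Two duplicates of the same content, each missing a different dttm.
        ("h1", "t1", None, "2025-01-01 08:00:00"),
        ("h1", "t1", "2025-01-01 07:55:00", None),
        # A row where both dttm columns are null.
        ("h2", "t2", None, None),
    ],
)

# Group by the content hash and take the earliest non-null value of each
# dttm column, so the output row combines information from its inputs.
DEDUPE_SQL = """
SELECT
    _content_hash,
    MIN(tap_id) AS tap_id,            -- identical within a hash (assumed)
    MIN(start_dttm) AS start_dttm,
    MIN(created_dttm) AS created_dttm
FROM ticket_results
GROUP BY _content_hash
ORDER BY _content_hash
"""
deduped = conn.execute(DEDUPE_SQL).fetchall()
for row in deduped:
    print(row)
```

In SQLite the aggregate MIN skips nulls and only returns null when every input in the group is null, so the h1 row ends up with both columns populated. Worth confirming the same holds in the warehouse before relying on it.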
Impacted Exposures
No exposures are impacted by the changes in this PR.
Changed models
Description
Describe your changes and why you're making them. Please include the context, motivation, and relevant dependencies.
Resolves #4964
Modifies the deduplication step to order by non-null datetime columns and keep the first row. Observations on the ticket indicated that duplicate records had a mix of non-null and null datetime values, while all other columns were equivalent.
This broadly fixes the duplicate data issue, though I did find two pairs of records that survive, as noted in the testing block.
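For illustration, a minimal sketch of that ordering approach, run through Python's built-in sqlite3 (window functions need SQLite 3.25 or newer). The exact ORDER BY clause used in the model is not reproduced here; the table name `ticket_results`, the choice of `start_dttm` and `created_dttm` as ordering keys, and the sample rows are assumptions.

```python
import sqlite3

# Hypothetical sample data; values are made up.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ticket_results ("
    "_content_hash TEXT, start_dttm TEXT, created_dttm TEXT)"
)
conn.executemany(
    "INSERT INTO ticket_results VALUES (?, ?, ?)",
    [
        ("h1", None, None),
        ("h1", "2025-01-01 07:55:00", "2025-01-01 08:00:00"),
        ("h2", None, "2025-01-02 09:00:00"),
    ],
)

# Rank rows within each content hash so that rows with populated datetime
# columns sort first ("x IS NULL" is 0 for populated, 1 for null), then
# keep only the top-ranked row per hash.
RANKED_SQL = """
SELECT _content_hash, start_dttm, created_dttm
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY _content_hash
            ORDER BY start_dttm IS NULL, created_dttm IS NULL
        ) AS row_num
    FROM ticket_results
)
WHERE row_num = 1
ORDER BY _content_hash
"""
kept = conn.execute(RANKED_SQL).fetchall()
for row in kept:
    print(row)
```

Note that this keeps exactly one input row per hash, so any datetime value that appears only on a dropped row is lost.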
Type of change
How has this been tested?
Include commands/logs/screenshots as relevant.
If making changes to dbt models, make sure they were created or updated on Staging. Please run the commands
uv run dbt run -s CHANGED_MODEL --target staging and uv run dbt test -s CHANGED_MODEL --target staging, then include the output in this section of the PR.
I had to run this test query against prod because I didn't have duplicate records available in staging.
Separately, I tested the deduplication itself and found two pairs of records that survive it. The records in each pair share a tap_id but differ widely in their other values.
`uv run dbt test -s stg_enghouse__ticket_results+`
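A check along these lines can surface surviving pairs like the ones described above. This is a sketch under assumptions, not the query that was actually run: it uses Python's built-in sqlite3, a hypothetical `deduped` table, a made-up `amount` column standing in for the "other values", and it assumes tap_id is expected to be unique after deduplication.

```python
import sqlite3

# Hypothetical deduplicated output; values are made up.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE deduped (tap_id TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO deduped VALUES (?, ?)",
    [("t1", 100), ("t2", 250), ("t2", 975), ("t3", 100)],
)

# Any tap_id that still appears more than once survived deduplication.
survivors = conn.execute(
    "SELECT tap_id, COUNT(*) AS n "
    "FROM deduped "
    "GROUP BY tap_id "
    "HAVING COUNT(*) > 1 "
    "ORDER BY tap_id"
).fetchall()
print(survivors)
```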
Post-merge follow-ups
Document any actions that must be taken post-merge to deploy or otherwise implement the changes in this PR (for example, running a full refresh of some incremental model in dbt). If these actions will take more than a few hours after the merge or if they will be completed by someone other than the PR author, please create a dedicated follow-up issue and link it here to track resolution.