diff --git a/warehouse/models/mart/tides/_mart_tides.yml b/warehouse/models/mart/tides/_mart_tides.yml
index dcdd8b01fa..02c0f7a2fa 100644
--- a/warehouse/models/mart/tides/_mart_tides.yml
+++ b/warehouse/models/mart/tides/_mart_tides.yml
@@ -112,3 +112,119 @@ models:
         description: '{{ doc("column_base64_url") }}'
       - name: gtfs_dataset_key
         description: '{{ doc("gtfs_schedule_gtfs_dataset_key") }}'
+
+  - name: fct_tides_trips_performed
+    description: |
+      Trips performed reshaped to the TIDES (Transit Integrated Data Exchange
+      Specification) `trips_performed` schema (https://tides-transit.org/main/).
+      Sourced from `fct_observed_trips`, joined to `fct_scheduled_trips` for
+      route metadata and to `fct_tides_vehicle_locations` for canonical
+      vehicle_id per trip. Filtered upstream to `appeared_in_vp = TRUE` so
+      every row has a derivable vehicle_id.
+    columns:
+      - name: service_date
+        description: TIDES PK (with trip_id_performed). From `fct_observed_trips.service_date`.
+        data_tests:
+          - not_null:
+              config:
+                where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
+      - name: trip_id_performed
+        description: TIDES PK (with service_date). From `fct_observed_trips.trip_id`.
+        data_tests:
+          - not_null:
+              config:
+                where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
+      - name: vehicle_id
+        description: |
+          Most-frequent vehicle_id observed in `fct_tides_vehicle_locations`
+          for the same (gtfs_dataset_key, service_date, trip_id_performed). TIDES required.
+        data_tests:
+          - not_null:
+              config:
+                where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
+          - relationships:
+              arguments:
+                to: ref('fct_tides_vehicle_locations')
+                field: vehicle_id
+              config:
+                where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
+      - name: trip_id_scheduled
+        description: |
+          Coarse MVP: always equals trip_id_performed (no check that the trip
+          exists in `fct_scheduled_trips`). A stricter join is a follow-up.
+      - name: route_id
+        description: From `fct_scheduled_trips` via trip_instance_key. NULL when no schedule.
+      - name: route_type
+        description: GTFS route_type. From `fct_scheduled_trips`.
+      - name: ntd_mode
+        description: NULL in MVP.
+      - name: route_type_agency
+        description: NULL in MVP.
+      - name: shape_id
+        description: From `fct_scheduled_trips`.
+      - name: pattern_id
+        description: NULL in MVP.
+      - name: direction_id
+        description: From `fct_scheduled_trips`.
+      - name: operator_id
+        description: NULL; not derivable from GTFS-RT.
+      - name: block_id
+        description: From `fct_scheduled_trips`.
+      - name: trip_start_stop_id
+        description: NULL in MVP; needs stop_times join on min stop_sequence.
+      - name: trip_end_stop_id
+        description: NULL in MVP; needs stop_times join on max stop_sequence.
+      - name: schedule_trip_start
+        description: |
+          DATETIME in agency tz from `fct_scheduled_trips.trip_first_departure_ts`.
+      - name: schedule_trip_end
+        description: |
+          DATETIME in agency tz from `fct_scheduled_trips.trip_last_arrival_ts`.
+      - name: actual_trip_start
+        description: DATETIME in agency tz from `fct_observed_trips.vp_min_ts`.
+      - name: actual_trip_end
+        description: DATETIME in agency tz from `fct_observed_trips.vp_max_ts`.
+      - name: trip_type
+        description: |
+          Constant 'In service' in MVP because the model filters to VP-observed
+          trips. Future expansion could distinguish deadhead / layover / pull-in
+          / pull-out given supplemental signals.
+        data_tests:
+          - accepted_values:
+              arguments:
+                values: ['In service', 'Deadhead', 'Layover', 'Pullout', 'Pullin', 'Extra Pullout', 'Extra Pullin', 'Deadhead To Layover', 'Deadhead From Layover', 'Other not in service']
+              config:
+                where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
+      - name: schedule_relationship
+        description: |
+          TIDES enum (Scheduled / Added / Unscheduled / Canceled / Duplicated)
+          mapped from `fct_observed_trips.tu_starting_schedule_relationship`.
+        data_tests:
+          - accepted_values:
+              arguments:
+                values: ['Scheduled', 'Added', 'Unscheduled', 'Canceled', 'Duplicated']
+              config:
+                where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
+      - name: base64_url
+        description: '{{ doc("column_base64_url") }}'
+      - name: gtfs_dataset_key
+        description: '{{ doc("gtfs_schedule_gtfs_dataset_key") }}'
+
+exposures:
+  - name: california_tides
+    type: application
+    maturity: low
+    url: https://tides-transit.org/main/
+    description: California TIDES (Transit Integrated Data Exchange Specification) data published to a public GCS bucket.
+
+    depends_on:
+      - ref("fct_tides_vehicle_locations")
+      - ref("fct_tides_trips_performed")
+
+    owner:
+      email: hello@calitp.org
+
+    meta:
+      methodology: |
+        Cal-ITP reshapes GTFS-RT vehicle position and trip update messages into the
+        open TIDES specification for analytics use. See https://tides-transit.org for the spec.
diff --git a/warehouse/models/mart/tides/fct_tides_trips_performed.sql b/warehouse/models/mart/tides/fct_tides_trips_performed.sql
new file mode 100644
index 0000000000..4f84ab147f
--- /dev/null
+++ b/warehouse/models/mart/tides/fct_tides_trips_performed.sql
@@ -0,0 +1,130 @@
+{{ config(materialized='view') }}
+
+WITH observed AS (
+    SELECT *
+    FROM {{ ref('fct_observed_trips') }}
+    -- Drop TU-only trips (no VP) so every row has a derivable vehicle_id.
+    WHERE appeared_in_vp = TRUE
+),
+
+-- Schedule attributes per trip instance; LEFT JOINed below, so these are
+-- NULL for observed trips with no matching trip_instance_key.
+scheduled AS (
+    SELECT
+        trip_instance_key,
+        route_id,
+        route_type,
+        direction_id,
+        shape_id,
+        block_id,
+        feed_timezone,
+        trip_first_departure_ts,
+        trip_last_arrival_ts
+    FROM {{ ref('fct_scheduled_trips') }}
+),
+
+-- Top vehicle_id (approximate, via APPROX_TOP_COUNT) per trip. Keyed by feed
+-- as well as (service_date, trip_id_performed): trip IDs are only unique
+-- within a feed, so aggregating across feeds could assign a trip a vehicle
+-- observed on another feed's identically named trip.
+-- NOTE(review): assumes fct_tides_vehicle_locations.gtfs_dataset_key is the
+-- vehicle positions feed key, comparable to vp_gtfs_dataset_key; confirm.
+vehicle_per_trip AS (
+    SELECT
+        gtfs_dataset_key,
+        service_date,
+        trip_id_performed,
+        APPROX_TOP_COUNT(vehicle_id, 1)[OFFSET(0)].value AS vehicle_id
+    FROM {{ ref('fct_tides_vehicle_locations') }}
+    WHERE vehicle_id IS NOT NULL
+    GROUP BY gtfs_dataset_key, service_date, trip_id_performed
+),
+
+-- Same feed-key filter as fct_tides_vehicle_locations.
+public_subfeed_keys AS (
+    SELECT DISTINCT vehicle_positions_gtfs_dataset_key AS gtfs_dataset_key
+    FROM {{ ref('dim_provider_gtfs_data') }}
+    WHERE _is_current = TRUE
+        AND public_customer_facing_or_regional_subfeed_fixed_route = TRUE
+        AND vehicle_positions_gtfs_dataset_key IS NOT NULL
+),
+
+-- Same publication-key narrowing as fct_tides_vehicle_locations.
+publication_keys AS (
+    SELECT gtfs_dataset_key
+    FROM {{ ref('tides_publication_keys') }}
+),
+
+tides_trips_performed AS (
+    SELECT
+        o.service_date,
+        o.trip_id AS trip_id_performed,
+        v.vehicle_id,
+
+        -- trip_id_scheduled coarse: always the observed trip_id, with no
+        -- check that the trip exists in fct_scheduled_trips. A stricter
+        -- test would require fct_scheduled_trips presence.
+        o.trip_id AS trip_id_scheduled,
+
+        s.route_id,
+        s.route_type,
+        CAST(NULL AS STRING) AS ntd_mode,
+        CAST(NULL AS STRING) AS route_type_agency,
+        s.shape_id,
+        CAST(NULL AS STRING) AS pattern_id,
+        s.direction_id,
+        CAST(NULL AS STRING) AS operator_id,
+        s.block_id,
+        CAST(NULL AS STRING) AS trip_start_stop_id,
+        CAST(NULL AS STRING) AS trip_end_stop_id,
+
+        -- Local DATETIMEs in the feed's timezone; falls back to
+        -- America/Los_Angeles when feed_timezone is NULL (including
+        -- observed trips with no schedule match).
+        DATETIME(s.trip_first_departure_ts, COALESCE(s.feed_timezone, 'America/Los_Angeles')) AS schedule_trip_start,
+        DATETIME(s.trip_last_arrival_ts, COALESCE(s.feed_timezone, 'America/Los_Angeles')) AS schedule_trip_end,
+        DATETIME(o.vp_min_ts, COALESCE(s.feed_timezone, 'America/Los_Angeles')) AS actual_trip_start,
+        DATETIME(o.vp_max_ts, COALESCE(s.feed_timezone, 'America/Los_Angeles')) AS actual_trip_end,
+
+        -- Constant 'In service' since the model filters to VP-observed trips.
+        'In service' AS trip_type,
+
+        -- No ELSE: NULL or unmapped source values yield NULL.
+        CASE
+            WHEN o.tu_starting_schedule_relationship = 'SCHEDULED' THEN 'Scheduled'
+            WHEN o.tu_starting_schedule_relationship = 'ADDED' THEN 'Added'
+            WHEN o.tu_starting_schedule_relationship = 'CANCELED' THEN 'Canceled'
+            WHEN o.tu_starting_schedule_relationship = 'UNSCHEDULED' THEN 'Unscheduled'
+            WHEN o.tu_starting_schedule_relationship = 'DUPLICATED' THEN 'Duplicated'
+        END AS schedule_relationship,
+
+        -- Internal columns retained for partitioning and downstream joins;
+        -- dropped at export.
+        o.vp_base64_url AS base64_url,
+        o.vp_gtfs_dataset_key AS gtfs_dataset_key
+    FROM observed o
+    INNER JOIN public_subfeed_keys
+        ON o.vp_gtfs_dataset_key = public_subfeed_keys.gtfs_dataset_key
+    INNER JOIN publication_keys
+        ON o.vp_gtfs_dataset_key = publication_keys.gtfs_dataset_key
+    LEFT JOIN scheduled s
+        ON s.trip_instance_key = o.trip_instance_key
+    LEFT JOIN vehicle_per_trip v
+        ON v.gtfs_dataset_key = o.vp_gtfs_dataset_key
+        AND v.service_date = o.service_date
+        AND v.trip_id_performed = o.trip_id
+),
+
+-- TIDES IDs are only unique within feed; partition the dedup by feed identity
+-- so trips that share trip_id_performed across different feeds both survive.
+deduped AS (
+    SELECT *
+    FROM tides_trips_performed
+    QUALIFY ROW_NUMBER() OVER (
+        PARTITION BY service_date, trip_id_performed, gtfs_dataset_key
+        ORDER BY actual_trip_end DESC NULLS LAST,
+            actual_trip_start ASC NULLS LAST
+    ) = 1
+)
+
+SELECT * FROM deduped