Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions warehouse/models/mart/tides/_mart_tides.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,122 @@ models:
description: '{{ doc("column_base64_url") }}'
- name: gtfs_dataset_key
description: '{{ doc("gtfs_schedule_gtfs_dataset_key") }}'

- name: fct_tides_trips_performed
description: |
Trips performed reshaped to the TIDES (Transit Integrated Data Exchange
Specification) `trips_performed` schema (https://tides-transit.org/main/).
Sourced from `fct_observed_trips`, joined to `fct_scheduled_trips` for
route metadata and to `fct_tides_vehicle_locations` for canonical
vehicle_id per trip. Filtered in this model to `appeared_in_vp = TRUE`
(TU-only trips are dropped) so every row has a derivable vehicle_id.
columns:
- name: service_date
description: TIDES PK (with trip_id_performed). From `fct_observed_trips.service_date`.
data_tests:
- not_null:
arguments:
config:
where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
- name: trip_id_performed
description: TIDES PK (with service_date). From `fct_observed_trips.trip_id`.
data_tests:
- not_null:
arguments:
config:
where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
- name: vehicle_id
description: |
Most-frequent vehicle_id observed in `fct_tides_vehicle_locations`
for the same (service_date, trip_id_performed). TIDES required.
data_tests:
- not_null:
arguments:
config:
where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
- relationships:
arguments:
to: ref('fct_tides_vehicle_locations')
field: vehicle_id
config:
where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
- name: trip_id_scheduled
description: |
Coarse MVP: always set equal to trip_id_performed, without checking that
the trip exists in `fct_scheduled_trips`. A stricter join to
`fct_scheduled_trips` is a follow-up.
- name: route_id
description: From `fct_scheduled_trips` via trip_instance_key. NULL when no schedule.
- name: route_type
description: GTFS route_type. From `fct_scheduled_trips`.
- name: ntd_mode
description: NULL in MVP.
- name: route_type_agency
description: NULL in MVP.
- name: shape_id
description: From `fct_scheduled_trips`.
- name: pattern_id
description: NULL in MVP.
- name: direction_id
description: From `fct_scheduled_trips`.
- name: operator_id
description: NULL; not derivable from GTFS-RT.
- name: block_id
description: From `fct_scheduled_trips`.
- name: trip_start_stop_id
description: NULL in MVP; needs stop_times join on min stop_sequence.
- name: trip_end_stop_id
description: NULL in MVP; needs stop_times join on max stop_sequence.
- name: schedule_trip_start
description: |
DATETIME in agency tz from `fct_scheduled_trips.trip_first_departure_ts`.
- name: schedule_trip_end
description: |
DATETIME in agency tz from `fct_scheduled_trips.trip_last_arrival_ts`.
- name: actual_trip_start
description: DATETIME from `fct_observed_trips.vp_min_ts`, in the agency tz of the matched scheduled trip (falls back to America/Los_Angeles when there is no match).
- name: actual_trip_end
description: DATETIME from `fct_observed_trips.vp_max_ts`, in the agency tz of the matched scheduled trip (falls back to America/Los_Angeles when there is no match).
- name: trip_type
description: |
Constant 'In service' in MVP because the model filters to VP-observed
trips. Future expansion could distinguish deadhead / layover / pull-in
/ pull-out given supplemental signals.
data_tests:
- accepted_values:
arguments:
values: ['In service', 'Deadhead', 'Layover', 'Pullout', 'Pullin', 'Extra Pullout', 'Extra Pullin', 'Deadhead To Layover', 'Deadhead From Layover', 'Other not in service']
config:
where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
- name: schedule_relationship
description: |
TIDES enum (Scheduled / Added / Unscheduled / Canceled / Duplicated)
mapped from `fct_observed_trips.tu_starting_schedule_relationship`.
NULL when the source value is NULL or is not one of these five values.
data_tests:
- accepted_values:
arguments:
values: ['Scheduled', 'Added', 'Unscheduled', 'Canceled', 'Duplicated']
config:
where: 'service_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'
- name: base64_url
description: '{{ doc("column_base64_url") }}'
- name: gtfs_dataset_key
description: '{{ doc("gtfs_schedule_gtfs_dataset_key") }}'

exposures:
- name: california_tides
type: application
maturity: low
url: https://tides-transit.org/main/
description: California TIDES (Transit Integrated Data Exchange Specification) data published to a public GCS bucket.

depends_on:
- ref("fct_tides_vehicle_locations")
- ref("fct_tides_trips_performed")

owner:
email: [email protected]

meta:
methodology: |
Cal-ITP reshapes GTFS-RT vehicle position and trip update messages into the
open TIDES specification for analytics use. See https://tides-transit.org for the spec.
116 changes: 116 additions & 0 deletions warehouse/models/mart/tides/fct_tides_trips_performed.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
{{ config(materialized='view') }}

-- fct_tides_trips_performed
--
-- Reshapes observed trips into the TIDES `trips_performed` layout.
--   * One output row per (service_date, trip_id_performed, gtfs_dataset_key).
--   * Only trips that appeared in vehicle positions are kept.
--   * Only feeds present in both the public-subfeed list and the TIDES
--     publication-key list are kept.
--   * Schedule attributes are attached via trip_instance_key and are NULL when
--     there is no matching scheduled trip.
--   * vehicle_id is the most frequently observed vehicle_id for the trip in
--     fct_tides_vehicle_locations, computed per feed.

WITH observed AS (
    -- Only the columns consumed below; narrowing here keeps the view's
    -- dependency on fct_observed_trips explicit.
    SELECT
        service_date,
        trip_id,
        trip_instance_key,
        vp_min_ts,
        vp_max_ts,
        tu_starting_schedule_relationship,
        vp_base64_url,
        vp_gtfs_dataset_key
    FROM {{ ref('fct_observed_trips') }}
    -- Drop TU-only trips (no VP) so every row has a derivable vehicle_id.
    WHERE appeared_in_vp = TRUE
),

scheduled AS (
    SELECT
        trip_instance_key,
        route_id,
        route_type,
        direction_id,
        shape_id,
        block_id,
        feed_timezone,
        trip_first_departure_ts,
        trip_last_arrival_ts
    FROM {{ ref('fct_scheduled_trips') }}
),

-- Most frequent vehicle_id per trip *within a feed*. trip_id_performed is only
-- unique within a feed, so the feed key must be part of the grouping;
-- otherwise two feeds that reuse a trip_id on the same service_date would pool
-- their vehicle locations and both receive the same vehicle_id.
-- NOTE(review): assumes fct_tides_vehicle_locations.gtfs_dataset_key is the
-- vehicle-positions dataset key (same domain as vp_gtfs_dataset_key) — confirm.
-- APPROX_TOP_COUNT is approximate; ties between vehicle_ids are not resolved
-- deterministically.
vehicle_per_trip AS (
    SELECT
        service_date,
        trip_id_performed,
        gtfs_dataset_key,
        APPROX_TOP_COUNT(vehicle_id, 1)[OFFSET(0)].value AS vehicle_id
    FROM {{ ref('fct_tides_vehicle_locations') }}
    WHERE vehicle_id IS NOT NULL
    GROUP BY
        service_date,
        trip_id_performed,
        gtfs_dataset_key
),

-- Same feed-key filter as fct_tides_vehicle_locations.
public_subfeed_keys AS (
    SELECT DISTINCT vehicle_positions_gtfs_dataset_key AS gtfs_dataset_key
    FROM {{ ref('dim_provider_gtfs_data') }}
    WHERE _is_current = TRUE
        AND public_customer_facing_or_regional_subfeed_fixed_route = TRUE
        AND vehicle_positions_gtfs_dataset_key IS NOT NULL
),

-- Same publication-key narrowing as fct_tides_vehicle_locations.
publication_keys AS (
    SELECT gtfs_dataset_key
    FROM {{ ref('tides_publication_keys') }}
),

tides_trips_performed AS (
    SELECT
        o.service_date,
        o.trip_id AS trip_id_performed,
        v.vehicle_id,

        -- trip_id_scheduled coarse: trip appeared in VP or TU implies a
        -- scheduled trip. A stricter test would require fct_scheduled_trips
        -- presence.
        o.trip_id AS trip_id_scheduled,

        s.route_id,
        s.route_type,
        CAST(NULL AS STRING) AS ntd_mode,
        CAST(NULL AS STRING) AS route_type_agency,
        s.shape_id,
        CAST(NULL AS STRING) AS pattern_id,
        s.direction_id,
        CAST(NULL AS STRING) AS operator_id,
        s.block_id,
        CAST(NULL AS STRING) AS trip_start_stop_id,
        CAST(NULL AS STRING) AS trip_end_stop_id,

        -- Timestamps are rendered as local DATETIMEs in the scheduled trip's
        -- feed timezone; when there is no scheduled match (feed_timezone is
        -- NULL) they fall back to America/Los_Angeles.
        DATETIME(s.trip_first_departure_ts, COALESCE(s.feed_timezone, 'America/Los_Angeles')) AS schedule_trip_start,
        DATETIME(s.trip_last_arrival_ts, COALESCE(s.feed_timezone, 'America/Los_Angeles')) AS schedule_trip_end,
        DATETIME(o.vp_min_ts, COALESCE(s.feed_timezone, 'America/Los_Angeles')) AS actual_trip_start,
        DATETIME(o.vp_max_ts, COALESCE(s.feed_timezone, 'America/Los_Angeles')) AS actual_trip_end,

        -- Constant 'In service' since the model filters to VP-observed trips.
        'In service' AS trip_type,

        -- Map the upper-case source value to the TIDES enum spelling. There is
        -- deliberately no ELSE branch: a NULL or unrecognized source value
        -- yields NULL.
        CASE
            WHEN o.tu_starting_schedule_relationship = 'SCHEDULED' THEN 'Scheduled'
            WHEN o.tu_starting_schedule_relationship = 'ADDED' THEN 'Added'
            WHEN o.tu_starting_schedule_relationship = 'CANCELED' THEN 'Canceled'
            WHEN o.tu_starting_schedule_relationship = 'UNSCHEDULED' THEN 'Unscheduled'
            WHEN o.tu_starting_schedule_relationship = 'DUPLICATED' THEN 'Duplicated'
        END AS schedule_relationship,

        -- Internal columns retained for partitioning and downstream joins;
        -- dropped at export.
        o.vp_base64_url AS base64_url,
        o.vp_gtfs_dataset_key AS gtfs_dataset_key
    FROM observed AS o
    INNER JOIN public_subfeed_keys
        ON o.vp_gtfs_dataset_key = public_subfeed_keys.gtfs_dataset_key
    INNER JOIN publication_keys
        ON o.vp_gtfs_dataset_key = publication_keys.gtfs_dataset_key
    LEFT JOIN scheduled AS s
        ON s.trip_instance_key = o.trip_instance_key
    -- Match on the feed key as well, so a trip only picks up a vehicle_id
    -- observed in its own feed.
    LEFT JOIN vehicle_per_trip AS v
        ON v.service_date = o.service_date
        AND v.trip_id_performed = o.trip_id
        AND v.gtfs_dataset_key = o.vp_gtfs_dataset_key
),

-- TIDES IDs are only unique within feed; partition the dedup by feed identity
-- so trips that share trip_id_performed across different feeds both survive.
-- Within a partition the row with the latest actual_trip_end (then the
-- earliest actual_trip_start) is kept.
deduped AS (
    SELECT *
    FROM tides_trips_performed
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY service_date, trip_id_performed, gtfs_dataset_key
        ORDER BY
            actual_trip_end DESC NULLS LAST,
            actual_trip_start ASC NULLS LAST
    ) = 1
)

SELECT * FROM deduped
Loading