Skip to content

Commit bee29b0

Browse files
authored
Merge pull request #3 from TechArtists/multiple-project-source
Multiple projects source
2 parents a7e9b08 + 50ed272 commit bee29b0

9 files changed

Lines changed: 340 additions & 86 deletions

File tree

README.md

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,35 @@ Welcome to your new dbt project!
22

33
### Using the starter project
44

5-
Try running the following commands:
6-
- dbt run
7-
- dbt test
5+
6+
By default, this library reads from a single Google Cloud project, configured as follows:
7+
# "OVERBASE:SOURCES":
8+
# - {project_id: google_cloud_project_id,
9+
# analytics_dataset_id: schema_id,
10+
# events_table: events_table_prefix*,
11+
# crashlytics_dataset_id: crashlytics_dataset,
12+
# crashlytics_table: crashlytics_table_prefix*}
13+
14+
You can also add more project_ids, and list multiple analytics datasets for a single project (via analytics_dataset_ids).
15+
A few additional steps are required before multiple projects or datasets can be used as sources.
16+
17+
1) OVERBASE:SOURCES_READY must be set to false (default).
18+
2) Add the projects and datasets to the OVERBASE:SOURCES variable in your dbt project.
19+
3) Run the following command to generate sources for all projects:
20+
dbt run-operation -q generate_firebase_sources > models/firebase_sources.yml
21+
4) Change OVERBASE:SOURCES_READY to true.
22+
# "OVERBASE:SOURCES":
23+
# - {project_id: google_cloud_project_id,
24+
# analytics_dataset_id: schema_id,
25+
# events_table: events_table_prefix*,
26+
# crashlytics_dataset_id: crashlytics_dataset,
27+
# crashlytics_table: crashlytics_table_prefix*}
28+
# - {project_id: google_cloud_project_id2,
29+
# analytics_dataset_ids: [schema_id, schema_id2],
30+
# events_table: events_table_prefix*,
31+
# crashlytics_dataset_id: crashlytics_dataset,
32+
# crashlytics_table: crashlytics_table_prefix*}
33+
834

935

1036
### Resources:
@@ -17,5 +43,4 @@ Try running the following commands:
1743

1844
## TODO
1945

20-
- tests for counts from _raw to _events
2146
- investigate why the DAU counts in app_health (aka _events) don't match the ones from raw. There's a dimension in there that's not fully disjoint; maybe make an _events_disjunct table as well

dbt_project.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ seeds:
3333
vars:
3434
overbase_firebase:
3535
"OVERBASE:DONT_CARE": "MAKE_YAML_WORK" # optional
36+
"OVERBASE:SOURCES_READY" : false
3637
# "OVERBASE:SOURCES":
3738
# - {project_id: watermark-maker,
3839
# analytics_dataset_id: analytics_150733022,

macros/dateFilters.sql

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,58 @@
1-
{%- macro analyticsTestDateFilter(fieldName, extend = 0) -%}
2-
{{ ccDateFilterFor(fieldName, forceIncremental = True, extend = extend) }}
3-
{%- endmacro %}
4-
5-
{%- macro analyticsTestTableSuffixFilter(extend =0) -%}
6-
{%- set startEndTSTuple = overbase_firebase.analyticsStartEndTimestampsTuple(forceIncremental = True, extend=extend + 1) -%} {# extended by one day because TABLE_SUFFIX is not always UTC #}
7-
REPLACE(_TABLE_SUFFIX, 'intraday_', '') BETWEEN FORMAT_DATE('%Y%m%d', {{ startEndTSTuple[0] }}) AND FORMAT_DATE('%Y%m%d', {{ startEndTSTuple[1] }})
8-
{%- endmacro -%}
9-
10-
{%- macro analyticsDateFilterFor(dateField,extend = 0) -%}
11-
{%- set startEndTSTuple = overbase_firebase.analyticsStartEndTimestampsTuple(extend = extend) -%}
12-
{{ dateField }} BETWEEN DATE({{ startEndTSTuple[0] }}) AND DATE({{ startEndTSTuple[1] }})
1+
{%- macro analyticsDateFilterFor(dateField, extend = 0) -%}
2+
{%- if sqlmesh_incremental is defined -%}
3+
{{ dateField }} BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
4+
{%- else -%}
5+
{%- set startEndTSTuple = overbase_firebase.analyticsStartEndTimestampsTuple(extend = extend) -%}
6+
{{ dateField }} BETWEEN DATE({{ startEndTSTuple[0] }}) AND DATE({{ startEndTSTuple[1] }})
7+
{%- endif -%}
138
{%- endmacro -%}
149

1510
{%- macro crashlyticsDateFilterFor(dateField, extend = 0) -%}
16-
{%- set startEndTSTuple = overbase_firebase.crashlyticsStartEndTimestampsTuple(extend=extend) -%}
17-
{{ dateField }} BETWEEN DATE({{ startEndTSTuple[0] }}) AND DATE({{ startEndTSTuple[1] }})
11+
{%- if sqlmesh_incremental is defined -%}
12+
{{ dateField }} BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
13+
{%- else -%}
14+
{%- set startEndTSTuple = overbase_firebase.crashlyticsStartEndTimestampsTuple(extend=extend) -%}
15+
{{ dateField }} BETWEEN DATE({{ startEndTSTuple[0] }}) AND DATE({{ startEndTSTuple[1] }})
16+
{%- endif -%}
1817
{%- endmacro -%}
1918

2019
{%- macro analyticsTSFilterFor(tsField, extend = 0) -%}
21-
{%- set startEndTSTuple = overbase_firebase.analyticsStartEndTimestampsTuple(extend=extend) -%}
22-
{{ tsField }} BETWEEN {{ startEndTSTuple[0] }} AND {{ startEndTSTuple[1] }}
20+
{%- if sqlmesh_incremental is defined -%}
21+
{{ tsField }} BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
22+
{%- else -%}
23+
{%- set startEndTSTuple = overbase_firebase.analyticsStartEndTimestampsTuple(extend=extend) -%}
24+
{{ tsField }} BETWEEN {{ startEndTSTuple[0] }} AND {{ startEndTSTuple[1] }}
25+
{%- endif -%}
2326
{%- endmacro -%}
2427

2528
{%- macro crashlyticsTSFilterFor(tsField, extend = 0) -%}
26-
{%- set startEndTSTuple = overbase_firebase.crashlyticsStartEndTimestampsTuple(extend=extend) -%}
27-
{{ tsField }} BETWEEN {{ startEndTSTuple[0] }} AND {{ startEndTSTuple[1] }}
29+
{%- if sqlmesh_incremental is defined -%}
30+
{{ tsField }} BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
31+
{%- else -%}
32+
{%- set startEndTSTuple = overbase_firebase.crashlyticsStartEndTimestampsTuple(extend=extend) -%}
33+
{{ tsField }} BETWEEN {{ startEndTSTuple[0] }} AND {{ startEndTSTuple[1] }}
34+
{%- endif -%}
35+
{%- endmacro -%}
36+
37+
{%- macro analyticsTableSuffixFilter(extend = 0) -%}
    {#-
        Emits a BigQuery predicate that restricts `_TABLE_SUFFIX` of the sharded
        events tables to the date window being processed.

        - 'intraday_' is stripped from the suffix so intraday shards compare as
          plain YYYYMMDD strings.
        - The window is widened by one day because TABLE_SUFFIX is not always UTC
          (sqlmesh branch: start_ds minus one day; dbt branch: extend + 1).

        Args:
            extend: extra days forwarded to analyticsStartEndTimestampsTuple.
                    NOTE(review): only honoured in the dbt branch; the sqlmesh
                    branch always uses start_ds - 1 day .. end_ds. Confirm this
                    is intended.
    -#}
    {%- if sqlmesh_incremental is defined -%}
        {#- start_ds / end_ds are rendered as string literals, so convert them to
            DATE explicitly instead of relying on implicit coercion in `'...' - 1`. -#}
        REPLACE(_TABLE_SUFFIX, 'intraday_', '')
            BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(DATE('{{ start_ds }}'), INTERVAL 1 DAY))
            AND FORMAT_DATE('%Y%m%d', DATE('{{ end_ds }}'))
    {%- else -%}
        {%- set startEndTSTuple = overbase_firebase.analyticsStartEndTimestampsTuple(extend=extend + 1) -%}
        REPLACE(_TABLE_SUFFIX, 'intraday_', '')
            BETWEEN FORMAT_DATE('%Y%m%d', {{ startEndTSTuple[0] }})
            AND FORMAT_DATE('%Y%m%d', {{ startEndTSTuple[1] }})
    {%- endif -%}
{%- endmacro -%}
49+
50+
{%- macro analyticsTestDateFilter(fieldName, extend = 0) -%}
51+
{{ overbase_firebase.analyticsDateFilterFor(fieldName, extend=extend) }}
2852
{%- endmacro -%}
2953

30-
{%- macro analyticsTableSuffixFilter(extend =0) -%}
31-
{%- set startEndTSTuple = overbase_firebase.analyticsStartEndTimestampsTuple(extend=extend + 1) -%} {# extended by one day because TABLE_SUFFIX is not always UTC #}
32-
REPLACE(_TABLE_SUFFIX, 'intraday_', '') BETWEEN FORMAT_DATE('%Y%m%d', {{ startEndTSTuple[0] }}) AND FORMAT_DATE('%Y%m%d', {{ startEndTSTuple[1] }})
54+
{%- macro analyticsTestTableSuffixFilter(extend = 0) -%}
55+
{{ overbase_firebase.analyticsTableSuffixFilter(extend=extend) }}
3356
{%- endmacro -%}
3457

3558
{%- macro analyticsStartEndTimestampsTuple(forceIncremental = False, extend = 0) -%}

macros/overbase_mandatory_vars.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
{%- endmacro %}
99

1010
{% macro verify_all_overbase_mandatory_variables() -%}
11-
{{- overbase_firebase.compile_time_mandatory_var("OVERBASE:FIREBASE_PROJECT_ID", "overbase") -}}
12-
{{- overbase_firebase.compile_time_mandatory_var("OVERBASE:FIREBASE_ANALYTICS_DATASET_ID", "firebase_analytics_raw_test") -}}
11+
{{- overbase_firebase.compile_time_mandatory_var("OVERBASE:SOURCES", "overbase") -}}
12+
{{- overbase_firebase.compile_time_mandatory_var("OVERBASE:SOURCES_READY", "firebase_analytics_raw_test") -}}
1313
{{- overbase_firebase.compile_time_mandatory_var("OVERBASE:FIREBASE_ANALYTICS_FULL_REFRESH_START_DATE", "2018-01-01") -}}
1414
{{- overbase_firebase.compile_time_mandatory_var("OVERBASE:FIREBASE_CRASHLYTICS_FULL_REFRESH_START_DATE", "2018-01-01") -}}
1515

models/analytics/fb_analytics_events_raw.sql

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
) }}
1313

1414
-- https://support.google.com/firebase/answer/7029846
15-
SELECT TIMESTAMP_MICROS(event_timestamp) as event_ts
15+
SELECT
16+
project_id,
17+
dataset_id,
18+
TIMESTAMP_MICROS(event_timestamp) as event_ts
1619
, DATE(TIMESTAMP_MICROS(event_timestamp)) as event_date
1720
, TIMESTAMP_MICROS(user_first_touch_timestamp) as install_ts
1821
, {{ overbase_firebase.calculate_age_between_timestamps("TIMESTAMP_MICROS(event_timestamp)", "TIMESTAMP_MICROS(user_first_touch_timestamp)") }} as install_age
@@ -57,22 +60,64 @@ SELECT TIMESTAMP_MICROS(event_timestamp) as event_ts
5760
) AS users_ltv
5861
, STRUCT<firebase_app_id STRING, stream_id STRING, advertising_id STRING>(
5962
LOWER(app_info.firebase_app_id), LOWER(stream_id), LOWER({{ null_if_length_zero('device.advertising_id') }})
63+
) as other_ids
6064
, {{ overbase_firebase.generate_date_timezone_struct('TIMESTAMP_MICROS(event_timestamp)') }} as event_dates
6165
, {{ overbase_firebase.generate_date_timezone_struct('TIMESTAMP_MICROS(user_first_touch_timestamp)') }} as install_dates
6266
, COUNT(1) OVER (PARTITION BY user_pseudo_id, event_bundle_sequence_id, event_name, event_timestamp, event_previous_timestamp) as duplicates_cnt
63-
-- FROM {{ source("firebase_analytics", "events") }} as events
6467
FROM
6568
(
66-
{% set projects = var('OVERBASE:SOURCES', []) %}
69+
{%- set projects = var('OVERBASE:SOURCES', []) -%}
70+
{%- set ready = var('OVERBASE:SOURCES_READY', false) -%}
6771

68-
{% for p in projects %}
69-
{% if not loop.first %}UNION ALL{% endif %}
70-
select
71-
'{{ p.project_id }}' as project_id,
72-
*
73-
from {{ source('firebase_analytics__' ~ p.project_id, 'events') }}
74-
WHERE {{ overbase_firebase.analyticsTableSuffixFilter() }}
75-
{% endfor %}
72+
{%- set first = (projects[0] if projects and (projects[0] is mapping) else {}) -%}
73+
{%- set pid0 = first.get('project_id', 'fallback_project') -%}
74+
{%- set ads_raw0 = first.get('analytics_dataset_ids') if first.get('analytics_dataset_ids') is not none else first.get('analytics_dataset_id') -%}
75+
{%- if ads_raw0 is string -%}
76+
{%- set ds0 = ads_raw0 -%}
77+
{%- elif ads_raw0 is iterable and (ads_raw0 | length) > 0 -%}
78+
{%- set ds0 = ads_raw0[0] -%}
79+
{%- else -%}
80+
{%- set ds0 = 'fallback_dataset' -%}
81+
{%- endif -%}
82+
83+
{%- if not ready -%}
84+
-- FALLBACK: use the single parse-safe source until generated sources are ready
85+
SELECT
86+
'{{ pid0 }}' as project_id,
87+
'{{ ds0 }}' as dataset_id,
88+
*
89+
FROM {{ source('firebase_analytics__fallback', 'events') }}
90+
WHERE {{ overbase_firebase.analyticsTableSuffixFilter() }}
91+
AND {{ overbase_firebase.analyticsDateFilterFor('DATE(TIMESTAMP_MICROS(event_timestamp))') }}
92+
{%- else -%}
93+
{%- set ns = namespace(first=true) -%}
94+
{%- for p in projects -%}
95+
{%- set pid = p.get('project_id') -%}
96+
{%- if not pid %}{% continue %}{% endif -%}
97+
{%- set ads_raw = p.get('analytics_dataset_ids') if p.get('analytics_dataset_ids') is not none else p.get('analytics_dataset_id') -%}
98+
{%- if ads_raw is string -%}
99+
{%- set ads_list = [ads_raw] -%}
100+
{%- elif ads_raw is iterable -%}
101+
{%- set ads_list = ads_raw -%}
102+
{%- else -%}
103+
{%- set ads_list = [] -%}
104+
{%- endif -%}
105+
106+
{%- for ds in ads_list -%}
107+
{%- if pid and ds -%}
108+
{% if not ns.first %}UNION ALL{% endif %}
109+
{% set ns.first = false %}
110+
SELECT
111+
'{{ pid }}' as project_id,
112+
'{{ ds }}' as dataset_id,
113+
*
114+
FROM {{ source('firebase_analytics__' ~ pid ~ '__' ~ ds, 'events') }}
115+
WHERE {{ overbase_firebase.analyticsTableSuffixFilter() }}
116+
AND {{ overbase_firebase.analyticsDateFilterFor('DATE(TIMESTAMP_MICROS(event_timestamp))') }}
117+
{%- endif -%}
118+
{%- endfor -%}
119+
{%- endfor -%}
120+
{%- endif -%}
76121
) as events
77122
LEFT JOIN {{ref('ob_iso_country')}} as country_codes
78123
ON LOWER(events.geo.country) = LOWER(country_codes.firebase_name)

0 commit comments

Comments
 (0)