Skip to content

Commit a7e9b08

Browse files
authored
Merge pull request #2 from TechArtists/multiple-project-source
Multiple project source
2 parents bfc9334 + 41c3155 commit a7e9b08

3 files changed

Lines changed: 99 additions & 12 deletions

File tree

macros/generate_sources.sql

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
{% macro generate_firebase_sources(
    var_name='OVERBASE:SOURCES',
    source_prefix='firebase',
    indent=2,
    include_comments=true
) %}
  {#
    Builds the text of a dbt sources YAML document from a project var, prints
    it, and returns it.

    For every entry in the var one source named
    "<source_prefix>_analytics__<project_id>" is emitted. When the entry also
    sets crashlytics_dataset_id, a second source named
    "<source_prefix>_crashlytics__<project_id>" is emitted. Each source has a
    single table called "events".

    Arguments:
      var_name:         name of the dbt var containing the projects list
      source_prefix:    prefix for source names ("firebase")
      indent:           number of spaces used per indentation level (2)
      include_comments: whether to include comment lines in YAML (true/false)

    Keys read from each entry of the var:
      project_id             required; used as the source database
      analytics_dataset_id   required; used as the analytics schema
      events_table           optional; analytics identifier, default "events_*"
      crashlytics_dataset_id optional; enables the crashlytics source
      crashlytics_table      optional; crashlytics identifier

    Raises a compiler error when the var is not a non-empty list of dicts, or
    when an entry lacks project_id / analytics_dataset_id.

    NOTE(review): presumably invoked through `dbt run-operation`, with the
    printed text pasted into a sources .yml file - confirm against the README.
  #}

  {% set projects = var(var_name, []) %}

  {# Strings and mappings are iterable too, so they are rejected explicitly;
     otherwise the failure would only surface later as an opaque p.get error. #}
  {% if projects is not iterable
        or projects is string
        or projects is mapping
        or (projects | length == 0) %}
    {{ exceptions.raise_compiler_error(
      "Var '" ~ var_name ~ "' must be a non-empty list of dicts."
    ) }}
  {% endif %}

  {# Indentation levels, all derived from `indent` so the output stays valid
     YAML for any value:
       sp           column of the "- name:" source item
       key_sp       keys of that item (aligned with the text after "- ")
       table_sp     column of the "- name:" table item
       table_key_sp keys of the table item #}
  {% set sp = ' ' * indent %}
  {% set key_sp = sp ~ '  ' %}
  {% set table_sp = key_sp ~ sp %}
  {% set table_key_sp = table_sp ~ '  ' %}
  {% set nl = '\n' %}
  {% set out = [] %}

  {% do out.append('version: 2') %}
  {% do out.append('') %}
  {% do out.append('sources:') %}

  {% for p in projects %}
    {% if p is not mapping %}
      {{ exceptions.raise_compiler_error(
        "Var '" ~ var_name ~ "' must be a non-empty list of dicts."
      ) }}
    {% endif %}

    {% set pid = (p.get('project_id') or '') | trim %}
    {% set ads = (p.get('analytics_dataset_id') or '') | trim %}
    {# Fall back to the default after trimming as well, so a blank or
       whitespace-only value does not produce an empty identifier. #}
    {% set events_table = ((p.get('events_table') or '') | trim) or 'events_*' %}
    {% set cds = (p.get('crashlytics_dataset_id') or '') | trim %}
    {% set crash_table = (p.get('crashlytics_table') or '') | trim %}

    {% if pid == '' or ads == '' %}
      {{ exceptions.raise_compiler_error(
        "Missing required keys in " ~ var_name
        ~ " (entry " ~ loop.index ~ "): project_id and analytics_dataset_id are required."
      ) }}
    {% endif %}

    {# One spec per source to emit; crashlytics only when its dataset is set. #}
    {% set specs = [
      {'label': 'Analytics', 'kind': 'analytics', 'schema': ads, 'identifier': events_table}
    ] %}
    {% if cds != '' %}
      {% do specs.append(
        {'label': 'Crashlytics', 'kind': 'crashlytics', 'schema': cds, 'identifier': crash_table}
      ) %}
    {% endif %}

    {% for spec in specs %}
      {% if include_comments %}
        {% do out.append(sp ~ "# " ~ spec['label'] ~ " for " ~ pid) %}
      {% endif %}
      {% do out.append(sp ~ "- name: " ~ source_prefix ~ "_" ~ spec['kind'] ~ "__" ~ pid) %}
      {% do out.append(key_sp ~ "database: " ~ pid) %}
      {% do out.append(key_sp ~ "schema: " ~ spec['schema']) %}
      {% do out.append(key_sp ~ "tables:") %}
      {% do out.append(table_sp ~ "- name: events") %}
      {# Skip the key instead of writing a blank "identifier:" line; per the
         dbt source properties, a table without identifier uses its name. #}
      {% if spec['identifier'] != '' %}
        {% do out.append(table_key_sp ~ "identifier: " ~ spec['identifier']) %}
      {% endif %}
      {% do out.append('') %}
    {% endfor %}
  {% endfor %}

  {% set result = out | join(nl) | trim %}
  {# Print the YAML for the caller and also hand it back as the macro result. #}
  {{ print(result) }}
  {{ return(result) }}
{% endmacro %}

models/analytics/fb_analytics_events_raw.sql

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,26 @@ SELECT TIMESTAMP_MICROS(event_timestamp) as event_ts
6060
, {{ overbase_firebase.generate_date_timezone_struct('TIMESTAMP_MICROS(event_timestamp)') }} as event_dates
6161
, {{ overbase_firebase.generate_date_timezone_struct('TIMESTAMP_MICROS(user_first_touch_timestamp)') }} as install_dates
6262
, COUNT(1) OVER (PARTITION BY user_pseudo_id, event_bundle_sequence_id, event_name, event_timestamp, event_previous_timestamp) as duplicates_cnt
63-
FROM {{ source("firebase_analytics", "events") }} as events
63+
-- FROM {{ source("firebase_analytics", "events") }} as events
64+
FROM
65+
(
66+
{% set projects = var('OVERBASE:SOURCES', []) %}
67+
68+
{% for p in projects %}
69+
{% if not loop.first %}UNION ALL{% endif %}
70+
select
71+
'{{ p.project_id }}' as project_id,
72+
*
73+
from {{ source('firebase_analytics__' ~ p.project_id, 'events') }}
74+
WHERE {{ overbase_firebase.analyticsTableSuffixFilter() }}
75+
{% endfor %}
76+
) as events
6477
LEFT JOIN {{ref('ob_iso_country')}} as country_codes
6578
ON LOWER(events.geo.country) = LOWER(country_codes.firebase_name)
6679
LEFT JOIN {{ref("ob_iso_language")}} as language_codes
6780
ON LOWER(SPLIT(events.device.language,'-')[SAFE_OFFSET(0)]) = language_codes.alpha_2
6881
LEFT JOIN {{ref('ob_iso_country')}} as language_region_codes -- some language have 3 parts (e.g. zh-hans-us), so just get the last one
6982
ON LOWER(ARRAY_REVERSE(SPLIT(events.device.language,'-'))[SAFE_OFFSET(0)]) = language_region_codes.alpha_2
7083

71-
WHERE True
72-
AND {{ overbase_firebase.analyticsTableSuffixFilter() }} -- already extended by 1 day compared to event_timestamp filter
7384

85+
QUALIFY ROW_NUMBER() OVER (PARTITION BY user_pseudo_id, event_bundle_sequence_id, event_name, event_timestamp, event_previous_timestamp) = 1

tests/events_raw_incremental.sql

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,26 @@
33
) }}
44

55
WITH stg AS (
6-
SELECT event_date, SUM(duplicates_cnt) AS cnt FROM {{ ref('fb_analytics_events_raw') }}
6+
SELECT event_date,project_id, SUM(duplicates_cnt) AS cnt FROM {{ ref('fb_analytics_events_raw') }}
77
WHERE {{ overbase_firebase.analyticsTestDateFilter('event_date',extend=2) }}
88
and event_date <= current_date -5
9-
GROUP BY 1
9+
GROUP BY 1,2
1010
)
1111
, src AS (
1212

13-
SELECT DATE(TIMESTAMP_MICROS(event_timestamp)) as event_date,COUNT(*) AS cnt
14-
FROM {{ source("firebase_analytics", "events") }}
15-
WHERE {{ overbase_firebase.analyticsTestTableSuffixFilter(extend = 3) }}
16-
AND {{ overbase_firebase.analyticsTestDateFilter('DATE(TIMESTAMP_MICROS(event_timestamp))',extend=2) }}
17-
AND DATE(TIMESTAMP_MICROS(event_timestamp)) <= current_date -5 --buffer because firebase keeps refreshing the recent partitions
18-
GROUP BY 1
13+
{% set projects = var('OVERBASE:SOURCES', []) %}
14+
15+
{% for p in projects %}
16+
{% if not loop.first %}UNION ALL{% endif %}
17+
SELECT DATE(TIMESTAMP_MICROS(event_timestamp)) as event_date,COUNT(*) AS cnt,
18+
'{{ p.project_id }}' as project_id,
19+
from {{ source('firebase_analytics__' ~ p.project_id, 'events') }}
20+
WHERE {{ overbase_firebase.analyticsTestTableSuffixFilter(extend = 3) }}
21+
AND {{ overbase_firebase.analyticsTestDateFilter('DATE(TIMESTAMP_MICROS(event_timestamp))',extend=2) }}
22+
AND DATE(TIMESTAMP_MICROS(event_timestamp)) <= current_date -5 --buffer because firebase keeps refreshing the recent partitions
23+
GROUP BY 1,2
24+
{% endfor %}
1925
)
2026
select * from
21-
stg left join src on stg.event_date = src.event_date
27+
stg left join src on stg.event_date = src.event_date and src.project_id=stg.project_id
2228
where stg.cnt <> src.cnt

0 commit comments

Comments
 (0)