For over two years now, dbt (data build tool) has been actively used at Wheely to manage its Data Warehouse. In that time we have accumulated considerable experience, moving along a thorny path of trial and error toward excellence in Analytics Engineering.
, , dbt, , .
Wheely. dbt, , . .
-
The warehouse is designed with business stakeholders in mind and follows two principles: Keep it simple (KISS) and Don't repeat yourself (DRY).
. DWH , , ( ).
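The first layer is sources: raw data loaded by ELT tools, mirroring the source systems 1:1. Nested structures (JSON) are then unpacked in the flatten layer.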
The staging layer applies light cleanup, such as renaming columns, casting types, and simple CASE expressions.
The intermediate layer combines staging models; a single intermediate model may join 5-10 of them.
Data marts are the layer consumed by Data Scientists / Business Users / BI tools. They are grouped as follows:
dimensions: reference entities and their attributes
facts: events and transactions
looker: models prepared specifically for the BI tool
120 :
Running with dbt=0.19.0
Found 273 models, 493 tests, 6 snapshots, 4 analyses, 532 macros, 7 operations, 8 seed files, 81 sources, 0 exposures
Breaking this down:
273 models
493 tests, such as not null, unique, foreign key, accepted values
6 snapshots implementing SCD (slowly changing dimensions)
532 macros
7 operations for vacuum + analyze
81 sources
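Models are tagged by business domain, e.g. Marketing / Supply / Growth / B2B, so that a whole domain can be rebuilt on demand, for instance after late arriving data: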
dbt run -m +tag:marketing
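The leading `+` in `+tag:marketing` asks dbt to select every model carrying the tag together with all of its upstream dependencies. The sketch below is a minimal Python version of that graph traversal. It assumes a toy DAG with hypothetical model names and is not dbt's internal implementation:

```python
from collections import deque

# Hypothetical DAG: model -> models it depends on (its parents)
PARENTS = {
    "stg_campaigns": [],
    "stg_requests": [],
    "int_attribution": ["stg_campaigns", "stg_requests"],
    "f_marketing_spend": ["int_attribution"],
    "f_supply_hours": ["stg_requests"],
}
# Hypothetical tags assigned to models
TAGS = {"f_marketing_spend": {"marketing"}, "f_supply_hours": {"supply"}}


def select_with_ancestors(tag: str) -> set:
    """Emulate the '+tag:<tag>' selector: tagged models plus all upstream models."""
    selected = {model for model, tags in TAGS.items() if tag in tags}
    queue = deque(selected)
    while queue:  # breadth-first walk towards the sources
        model = queue.popleft()
        for parent in PARENTS.get(model, []):
            if parent not in selected:
                selected.add(parent)
                queue.append(parent)
    return selected


print(sorted(select_with_ancestors("marketing")))
# ['f_marketing_spend', 'int_attribution', 'stg_campaigns', 'stg_requests']
```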
dbt project structure:
.
|____staging
| |____webhook
| |____receipt_prod
| |____core
| |____wheely_prod
| |____flights_prod
| |____online_hours_prod
| |____external
| |____financial_service
|____marts
| |____looker
| |____dim
| |____snapshots
| |____facts
|____flatten
| |____webhook
| |____receipt_prod
| |____wheely_prod
| |____communication_prod
|____audit
|____sources
|____aux
| |____dq
| | |____marts
| | |____external
|____intermediate
At Wheely the Data Warehouse runs on Amazon Redshift, so model performance depends on choosing distribution and sort keys well.
Consider one of the central models, journeys, as an example.
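To improve join performance when this model is built from its sources, the tables are distributed and sorted by the join key. This lets Redshift use a sort merge join: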
Config example: sort merge join
{{
config(
materialized='table',
unique_key='request_id',
dist="request_id",
sort="request_id"
)
}}
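Co-locating and sorting both inputs on `request_id` helps because, once both sides are ordered by the join key, they can be joined in a single linear pass without building a hash table. Below is a simplified Python sketch of a sort merge join. It illustrates the technique, not Redshift's executor, and assumes keys on the left side are unique, as `request_id` is in journeys:

```python
def sort_merge_join(left, right):
    """Inner-join two lists of (key, value) tuples, both sorted by key.

    Assumes keys in `left` are unique; `right` may hold several rows per key.
    """
    result = []
    i = j = 0
    while i < len(left) and j < len(right):
        lkey, lval = left[i]
        rkey, rval = right[j]
        if lkey < rkey:
            i += 1  # left key has no match: advance left
        elif lkey > rkey:
            j += 1  # right key has no match: advance right
        else:
            result.append((lkey, lval, rval))
            j += 1  # keep the left row, the next right row may share the key
    return result


requests = [(1, "Moscow"), (2, "London"), (4, "Dubai")]
payments = [(1, 100), (2, 55), (2, 5), (3, 70), (4, 20)]
print(sort_merge_join(requests, payments))
# [(1, 'Moscow', 100), (2, 'London', 55), (2, 'London', 5), (4, 'Dubai', 20)]
```

Each input is scanned once, so the cost is linear in the input sizes. That is why pre-sorting on the join key pays off for large, frequently joined tables.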
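The BI tool filters this model by several columns: city, country, completed timestamp, service group. An interleaved sort key gives equal weight to each of them, which reduces I/O for BI queries regardless of which column they filter on.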
Config example: interleaved sortkey
{{
config(
materialized='table',
unique_key='request_id',
dist="request_id",
sort_type='interleaved',
sort=["completed_ts_loc"
, "city"
, "country"
, "service_group"
, "is_airport"
, "is_wheely_journey"]
)
}}
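Redshift describes an interleaved sort key as giving equal weight to every column in the key. A common way to picture this is Z-order (Morton) interleaving of column bits. The sketch below shows it for two columns with small integer codes. Treat it as an analogy only: Redshift's internal encoding is not described in this article, and the column codes are hypothetical.

```python
def z_order(x: int, y: int, bits: int = 4) -> int:
    """Interleave the bits of two non-negative ints (Morton / Z-order code).

    Bit k of x goes to position 2k and bit k of y to position 2k + 1,
    so neither column dominates the resulting sort order.
    """
    code = 0
    for k in range(bits):
        code |= ((x >> k) & 1) << (2 * k)
        code |= ((y >> k) & 1) << (2 * k + 1)
    return code


# Hypothetical encoded values: x = city code, y = service group code
rows = [(x, y) for x in range(4) for y in range(4)]
rows.sort(key=lambda r: z_order(*r))
print(rows[:4])
# [(0, 0), (1, 0), (0, 1), (1, 1)]
```

With a compound key sorted by `x` first, rows sharing one value of `y` are scattered across the whole table. With the interleaved order, both a filter on `x` and a filter on `y` touch a limited set of blocks.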
By default dbt materializes models as views. The staging layer, for example, is configured as views once, at the folder level:
staging:
  +materialized: view
  +schema: staging
  +tags: ["staging"]
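Another option is the ephemeral materialization: the model is not created in the database at all. Instead, its SQL is inlined as a CTE into the models that reference it.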
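"Inlined as a CTE" means roughly what the minimal Python sketch below does: it prepends an ephemeral model's SQL to a dependent query as a common table expression. The model names are hypothetical. The `__dbt__cte__` prefix is an illustrative assumption, since exact naming depends on the dbt version. For brevity, the sketch also assumes the dependent query has no `with` clause of its own.

```python
def inline_ephemeral(model_sql: str, ephemerals: dict) -> str:
    """Prepend ephemeral models (name -> SQL) to `model_sql` as CTEs.

    Assumes `model_sql` refers to each ephemeral model by its CTE name
    and does not start with its own WITH clause.
    """
    ctes = ",\n".join(
        f"__dbt__cte__{name} as (\n{sql}\n)" for name, sql in ephemerals.items()
    )
    return f"with {ctes}\n{model_sql}"


sql = inline_ephemeral(
    "select * from __dbt__cte__active_cities",
    {"active_cities": "select city_id from cities where is_active"},
)
print(sql)
```

Nothing is created in the warehouse for the ephemeral model itself. The trade-off is that its SQL is recomputed inside every model that uses it.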
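For large tables a full rebuild is expensive, so incremental models process only the new data (delta): rows that arrived since the last run. The delta is selected with an extra condition in the where clause: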
{{
config(
materialized='incremental',
sort='metadata_timestamp',
dist='fine_id',
unique_key='id'
)
}}
with fines as (
select
fine_id
, city_id
, amount
, details
, metadata_timestamp
, created_ts_utc
, updated_ts_utc
, created_dt_utc
from {{ ref('stg_fines') }}
where true
-- filter fines arrived since last processed time
{% if is_incremental() -%}
and metadata_timestamp > (select max(metadata_timestamp) from {{ this }})
{%- endif %}
),
...
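The incremental logic above boils down to two steps. First, pick source rows newer than the maximum `metadata_timestamp` already in the target. Second, upsert them by `unique_key`. Below is a minimal in-memory Python sketch of those semantics, assuming rows are dicts and the target is keyed by `id`. It illustrates the idea only; on Redshift dbt performs the upsert with SQL statements.

```python
def incremental_merge(target: dict, source: list,
                      unique_key: str = "id",
                      watermark_col: str = "metadata_timestamp") -> dict:
    """Return a new target after applying the delta from `source`.

    `target` maps unique_key -> row (dict); `source` is a list of rows.
    Simplification: an empty target is treated like a first (full) run.
    """
    result = dict(target)
    if result:
        # is_incremental(): keep only rows newer than the current watermark
        watermark = max(row[watermark_col] for row in result.values())
        delta = [row for row in source if row[watermark_col] > watermark]
    else:
        delta = list(source)  # first run: full load
    for row in delta:
        result[row[unique_key]] = row  # upsert: replace or insert by unique_key
    return result


target = {1: {"id": 1, "amount": 100, "metadata_timestamp": 10}}
source = [
    {"id": 1, "amount": 100, "metadata_timestamp": 10},  # already loaded, skipped
    {"id": 1, "amount": 120, "metadata_timestamp": 12},  # updated fine
    {"id": 2, "amount": 50, "metadata_timestamp": 11},   # new fine
]
result = incremental_merge(target, source)
print(result[1]["amount"], sorted(result))  # 120 [1, 2]
```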
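Choosing distribution, sort keys, and materializations for an MPP database is part of the job of both the Data Engineer and the Data Warehouse Analyst.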
SQL + Jinja = Flexibility
SQL expresses the transformation logic, and Jinja adds flexibility on top of it.
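Let's look at what happens on dbt compile & run. dbt renders the Jinja into plain SQL, then wraps it into a CREATE statement with the warehouse-specific options: clustered by / distributed by / sorted by. For example: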
Model code:
{{
config(
materialized='table',
dist="fine_id",
sort="created_ts_utc"
)
}}
with details as (
select
{{
dbt_utils.star(from=ref('fine_details_flatten'),
except=["fine_amount", "metadata_timestamp", "generated_number"]
)
}}
from {{ ref('fine_details_flatten') }}
where fine_amount > 0
)
select * from details
Compiled code:
with details as (
select
"id",
"fine_id",
"city_id",
"amount",
"description",
"created_ts_utc",
"updated_ts_utc",
"created_dt_utc"
from "wheely"."dbt_test_akozyr"."fine_details_flatten"
where fine_amount > 0
)
select * from details
Run code:
create table
"wheely"."dbt_test_akozyr"."f_chauffeurs_fines"
diststyle key distkey (fine_id)
compound sortkey(created_ts_utc)
as (
with details as (
select
"id",
"fine_id",
"city_id",
"amount",
"description",
"created_ts_utc",
"updated_ts_utc",
"created_dt_utc"
from "wheely"."dbt_test_akozyr"."fine_details_flatten"
where fine_amount > 0
)
select * from details
);
First, all of this DDL is generated by dbt, so no boilerplate code has to be written by hand.
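The wrapping step can be sketched roughly as follows. This Python sketch assumes a key distribution and a compound sort key only, the two options shown above. The real dbt-redshift materialization handles many more cases, such as `even`/`all` distribution, interleaved keys, and temporary relations, so treat `redshift_ctas` as a hypothetical helper.

```python
from typing import List, Optional


def redshift_ctas(relation: str, compiled_sql: str,
                  dist: Optional[str] = None,
                  sort: Optional[List[str]] = None) -> str:
    """Wrap compiled SQL into a Redshift CREATE TABLE AS statement."""
    lines = [f"create table {relation}"]
    if dist:
        lines.append(f"diststyle key distkey ({dist})")
    if sort:
        lines.append(f"compound sortkey({', '.join(sort)})")
    lines.append(f"as (\n{compiled_sql}\n);")
    return "\n".join(lines)


ddl = redshift_ctas('"wheely"."dbt_test_akozyr"."f_chauffeurs_fines"',
                    "select * from details",
                    dist="fine_id", sort=["created_ts_utc"])
print(ddl)
```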
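Second, did you notice how the table name was resolved? {{ ref('fine_details_flatten') }} was compiled into the fully qualified name "wheely"."dbt_test_akozyr"."fine_details_flatten", which depends on the environment the code runs in.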
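Jinja lets Wheely keep separate dev / test / prod environments: the same code runs in all three, and schema names are resolved per environment. This is done with the following macro: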
{% macro generate_schema_name_for_env(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if target.name == 'prod' and custom_schema_name is not none -%}
{{ custom_schema_name | trim }}
{%- else -%}
{{ default_schema }}
{%- endif -%}
{%- endmacro %}
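The macro's branching is easy to see when ported to plain Python. In this port, `target_name` and `target_schema` stand in for `target.name` and `target.schema`. The schema names in the examples are hypothetical, except `dbt_test_akozyr`, which appears in the compiled code above.

```python
from typing import Optional


def generate_schema_name_for_env(custom_schema_name: Optional[str],
                                 target_name: str, target_schema: str) -> str:
    """Python mirror of the Jinja macro above."""
    if target_name == "prod" and custom_schema_name is not None:
        # prod: use the custom schema from the model/folder config as-is
        return custom_schema_name.strip()
    # dev / test: everything lands in the developer's own target schema
    return target_schema


print(generate_schema_name_for_env("staging", "prod", "analytics"))      # staging
print(generate_schema_name_for_env("staging", "dev", "dbt_test_akozyr"))  # dbt_test_akozyr
```

The effect is that each developer builds the whole project in a personal schema, while production objects land in their dedicated schemas.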
. , : , , , (-, ).
Macros
Repeated logic should not be copy-pasted between models. Wheely follows Do not repeat yourself: shared code is moved into macros.
For example, a currency conversion macro:
-- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%}
( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
, ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
, ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
, ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
, ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd
{%- endmacro %}
Using it in a model:
select
...
-- price_details
, r.currency
, {{ convert_currency('price', 'currency') }}
, {{ convert_currency('transfer_min_price', 'currency') }}
, {{ convert_currency('discount', 'currency') }}
, {{ convert_currency('insurance', 'currency') }}
, {{ convert_currency('tips', 'currency') }}
, {{ convert_currency('parking', 'currency') }}
, {{ convert_currency('toll_road', 'currency') }}
, {{ convert_currency('pickup_charge', 'currency') }}
, {{ convert_currency('cancel_fee', 'currency') }}
, {{ convert_currency('net_bookings', 'currency') }}
, {{ convert_currency('gross_revenue', 'currency') }}
, {{ convert_currency('service_charge', 'currency') }}
...
from {{ ref('requests_joined') }} r
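To see how much SQL those twelve calls produce, here is a Python sketch that mimics the macro's expansion. The rate columns `aed` … `usd` are presumably available from a joined rates table in the model's scope. Note that `currency_code_column` is not used in the macro body shown above, so the sketch leaves it out.

```python
CURRENCIES = ["aed", "eur", "gbp", "rub", "usd"]


def convert_currency(convert_column: str) -> str:
    """Mimic the convert_currency macro: one converted column per currency."""
    return "\n, ".join(
        f"( {convert_column} * {cur} )::decimal(18,4) as {convert_column}_{cur}"
        for cur in CURRENCIES
    )


print(convert_currency("tips"))
```

Each call expands into five columns, so the twelve calls in the model generate 12 × 5 = 60 converted columns from twelve lines of source.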
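Macros are not limited to expanding columns: Jinja control flow can also generate SQL logic. For example, a data quality macro that compares two columns: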
-- compare two columns
{% macro dq_compare_columns(src_column, trg_column, is_numeric=false) -%}
{%- if is_numeric == true -%}
{%- set src_column = 'round(' + src_column + ', 2)' -%}
{%- set trg_column = 'round(' + trg_column + ', 2)' -%}
{%- endif -%}
CASE
WHEN {{ src_column }} = {{ trg_column }} THEN 'match'
WHEN {{ src_column }} IS NULL AND {{ trg_column }} IS NULL THEN 'both null'
WHEN {{ src_column }} IS NULL THEN 'missing in source'
WHEN {{ trg_column }} IS NULL THEN 'missing in target'
WHEN {{ src_column }} <> {{ trg_column }} THEN 'mismatch'
ELSE 'unknown'
END
{%- endmacro %}
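The classification the generated CASE produces can be checked on Python values, with `None` standing in for SQL NULL. The sketch below mirrors the macro under that assumption. One caveat: Python's `round` on floats may differ from SQL `round` at exact halves, so the sketch avoids such values.

```python
from typing import Optional


def dq_compare_values(src: Optional[float], trg: Optional[float],
                      is_numeric: bool = False) -> str:
    """Python mirror of dq_compare_columns; None plays the role of SQL NULL."""
    if is_numeric:
        # round(x, 2) on both sides, as the macro does for numeric columns
        src = None if src is None else round(src, 2)
        trg = None if trg is None else round(trg, 2)
    # In SQL `NULL = x` is NULL, not true, so 'match' needs both values present
    if src is not None and trg is not None and src == trg:
        return "match"
    if src is None and trg is None:
        return "both null"
    if src is None:
        return "missing in source"
    if trg is None:
        return "missing in target"
    return "mismatch"


print(dq_compare_values(10.001, 10.004, is_numeric=True))  # match
print(dq_compare_values(10.001, 10.004))                   # mismatch
```

For ordinary non-null values either `=` or `<>` holds, so the macro's `ELSE 'unknown'` branch acts only as a safety net. The Python version omits it for that reason.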
Macros can also create UDFs:
UDF
-- create UDFs that decode bitwise access flags
{% macro create_udf() -%}
{% set sql %}
CREATE OR REPLACE FUNCTION {{ target.schema }}.f_bitwise_to_delimited(bitwise_column BIGINT, bits_in_column INT)
RETURNS VARCHAR(512)
STABLE
AS $$
# Convert column to binary, strip "0b" prefix, pad out with zeroes
if bitwise_column is not None:
    b = bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column+1]
    return b
else:
    return None
$$ LANGUAGE plpythonu
;
CREATE OR REPLACE FUNCTION {{ target.schema }}.f_decode_access_flags(access_flags INT, deleted_at TIMESTAMP)
RETURNS VARCHAR(128)
STABLE
AS $$
SELECT nvl(
DECODE($2, null, null, 'deleted')
, DECODE(LEN({{ target.schema }}.f_bitwise_to_delimited($1, 7))::INT, 7, null, 'unknown')
, DECODE({{ target.schema }}.f_bitwise_to_delimited($1, 7)::INT, 0, 'active', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 1, 1), 1, 'end_of_life', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 7, 1), 1, 'pending', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 6, 1), 1, 'rejected', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 5, 1), 1, 'blocked', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 4, 1), 1, 'expired_docs', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 3, 1), 1, 'partner_blocked', null)
, DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 2, 1), 1, 'new_partner', null)
)
$$ LANGUAGE SQL
;
{% endset %}
{% set table = run_query(sql) %}
{%- endmacro %}
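The decoding rules are easier to unit-test outside Redshift. Below is a Python port of the two UDFs; the function names are ours. Bit positions follow the `SUBSTRING` calls above, with position 1 being the leftmost of the 7 bits. The checks run in the same order as the `nvl(...)` arguments, so the first matching rule wins.

```python
from typing import Optional


def bitwise_to_delimited(bitwise_column: Optional[int], bits_in_column: int) -> Optional[str]:
    """Port of f_bitwise_to_delimited: binary string padded with zeroes."""
    if bitwise_column is None:
        return None
    return bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column + 1]


def decode_access_flags(access_flags: Optional[int], deleted_at: Optional[str]) -> Optional[str]:
    """Port of f_decode_access_flags (NVL = first non-null rule wins)."""
    if deleted_at is not None:
        return "deleted"
    bits = bitwise_to_delimited(access_flags, 7)
    if bits is None or len(bits) != 7:
        return "unknown"  # NULL flags or values that do not fit in 7 bits
    if int(bits) == 0:
        return "active"
    # (1-based position, label) in the same order as the SQL DECODE calls
    checks = [(1, "end_of_life"), (7, "pending"), (6, "rejected"), (5, "blocked"),
              (4, "expired_docs"), (3, "partner_blocked"), (2, "new_partner")]
    for pos, label in checks:
        if bits[pos - 1] == "1":
            return label
    return None


print(decode_access_flags(2, None))   # rejected
print(decode_access_flags(64, None))  # end_of_life
```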
Other macros unpack nested structures and create external tables over parquet files in S3.
Packages
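A package is a standalone dbt project whose models and macros can be reused in your own project. dbt hub lists packages published by the community.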
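dbt provides two hooks, pre-hook and post-hook, which run SQL before and after a model is built. For example, the logging package records the start and end of every model run: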
models:
  +pre-hook: "{{ logging.log_model_start_event() }}"
  +post-hook: "{{ logging.log_model_end_event() }}"
Another example is a date dimension generated by the dbt_date package:
{{ dbt_date.get_date_dimension('2012-01-01', '2025-12-31') }}
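The idea behind `get_date_dimension` is one row per calendar day with derived attributes. A minimal Python sketch of that idea follows. The attribute names are hypothetical and far fewer than the package produces. Including the end date is a choice made in this sketch, not a statement about the package's behavior.

```python
from datetime import date, timedelta


def date_dimension(start: str, end: str) -> list:
    """One row per day from start to end (inclusive in this sketch)."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    rows = []
    day = first
    while day <= last:
        rows.append({
            "date_day": day,
            "day_of_week": day.isoweekday(),  # 1 = Monday ... 7 = Sunday
            "month_of_year": day.month,
            "year_number": day.year,
            "is_weekend": day.isoweekday() >= 6,
        })
        day += timedelta(days=1)
    return rows


dim = date_dimension("2012-01-01", "2025-12-31")
print(len(dim))  # 5114 = 14 years * 365 days + 4 leap days
```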
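The dbt_external_tables package is a step toward a Lakehouse: it lets Redshift query data stored in S3. For example, currency exchange rates fetched from the Open Exchange Rates API are stored in S3 as JSON: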
External data stored in S3 accessed with Redshift Spectrum
- name: external
  schema: spectrum
  tags: ["spectrum"]
  description: "External data stored in S3 accessed with Redshift Spectrum"
  tables:
    - name: currencies_oxr
      description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
      freshness:
        error_after: {count: 15, period: hour}
      loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
      external:
        location: "s3://data-analytics.wheely.com/dwh/currencies/"
        row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
      columns:
        - name: timestamp
          data_type: bigint
        - name: base
          data_type: varchar(3)
        - name: rates
          data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
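The `loaded_at_field` expression turns the epoch seconds stored in `timestamp` into a timestamp. With `error_after: {count: 15, period: hour}`, the freshness check flags the source when its newest record is too old. The same arithmetic in Python is shown below. The "now" and epoch values are hypothetical, and treating "older than 15 hours" as a strict comparison is an assumption of this sketch.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def loaded_at(epoch_seconds: int) -> datetime:
    # timestamp 'epoch' + "timestamp" * interval '1 second'
    return EPOCH + timedelta(seconds=epoch_seconds)


def freshness_status(max_epoch: int, now: datetime,
                     error_after: timedelta = timedelta(hours=15)) -> str:
    """'error' if the newest record is older than error_after, else 'pass'."""
    age = now - loaded_at(max_epoch)
    return "error" if age > error_after else "pass"


now = datetime(2021, 3, 1, 12, 0, tzinfo=timezone.utc)
print(loaded_at(1614556800))                          # 2021-03-01 00:00:00+00:00
print(freshness_status(1614556800, now))              # pass  (12 hours old)
print(freshness_status(1614556800 - 4 * 3600, now))   # error (16 hours old)
```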
Redshift, like PostgreSQL, needs regular VACUUM + ANALYZE. Instead of a separate DBA routine, this maintenance is run as a dbt operation:
dbt run-operation redshift_maintenance --args '{include_schemas: ["staging", "flatten", "intermediate", "analytics", "meta", "snapshots", "ad_hoc"]}'
Running in production: dbt Cloud at Wheely
dbt Cloud is a hosted environment for running dbt projects, which also includes an IDE.
Jobs can be started in several ways:
First, on a schedule defined with cron. Second, via a webhook, for example when a run is kicked off from Airflow.
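An external trigger such as an Airflow task usually comes down to one HTTP call. The Python sketch below builds that request using the standard library. It follows the dbt Cloud v2 API endpoint `POST /api/v2/accounts/{account_id}/jobs/{job_id}/run/` with a `cause` field, as publicly documented. The account ID, job ID, and token are placeholders, and the current API reference should be checked before relying on it.

```python
import json
import urllib.request


def build_job_run_request(account_id: int, job_id: int, token: str,
                          cause: str) -> urllib.request.Request:
    """Build (but do not send) a dbt Cloud 'trigger job run' request."""
    url = f"https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
    body = json.dumps({"cause": cause}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Authorization": f"Token {token}",
                 "Content-Type": "application/json"},
    )


# Placeholder IDs and token, for illustration only
req = build_job_run_request(12345, 678, "<API_TOKEN>", "Kicked off from Airflow")
# urllib.request.urlopen(req) would actually trigger the run
```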
Notifications about Production job runs are sent to Slack.
dbt itself is open source, so dbt Cloud is not required: runs can be orchestrated with Airflow, Prefect, Dagster, or plain cron, and even with GitHub Actions.
Wheely , , . onboarding.
Open positions:
Head of Data Insights - https://wheely.com/ru/careers/4425384003
Product Analyst, Backoffice - https://wheely.com/ru/careers/4308521003
Product Analyst, Business - https://wheely.com/ru/careers/4425290003
Product Analyst, Chauffeur growth - https://wheely.com/ru/careers/4185132003
Product Analyst, Marketplace - https://wheely.com/ru/careers/4425328003
Product Analyst, Passenger growth - https://wheely.com/ru/careers/4194291003
.
More in my Telegram channel Technology Enthusiast: https://t.me/enthusiastech
, , , dbt !