Data Warehouse Management Multitool - Wheely + dbt case

For over two years now, Wheely has been actively using the data build tool (dbt) to manage its Data Warehouse. Over this time we have accumulated considerable experience and are still on a thorny path of trial and error toward perfection in Analytics Engineering.





, , dbt, , .





Wheely. dbt, , . .





-





( ) . - . , business stakeholders, . , . , , Keep it simple (KISS) Don’t repeat yourself (DRY).





. DWH , , ( ).





Data Warehouse Layer Scheme

– sources. , ELT-. 1:1 , - . flatten (JSON) . 





staging : , , case. , .





Intermediate , . - , 5-10 . 





data marts , Data Scientists / Business Users / BI tools. , , :





  • dimensions: , , , ,





  • facts: , , , ,





  • looker: , BI-





120  :





Running with dbt=0.19.0
Found 273 models, 493 tests, 6 snapshots, 4 analyses, 532 macros, 7 operations, 8 seed files, 81 sources, 0 exposures
      
      



:





  • 273 models
  • 493 tests: not null, unique, foreign key, accepted values
  • 6 snapshots: SCD (slowly changing dimensions)
  • 532 macros
  • 7 operations: vacuum + analyze
  • 81 sources





, -. , Marketing / Supply / Growth / B2B. , late arriving data /.





For example, to build all Marketing models together with their upstream dependencies:





dbt run -m +tag:marketing
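In dbt node selection the `+` prefix means "and all ancestors", so `+tag:marketing` builds the models tagged `marketing` plus every upstream model they depend on, in dependency order. A minimal Python sketch of that selection over a hypothetical DAG (model names and tags are invented for illustration; dbt's real selector supports far more syntax):

```python
from __future__ import annotations

from collections import deque

# Hypothetical mini-project: model -> models it ref()s (its parents).
# Names and tags are illustrative, not Wheely's real DAG.
PARENTS = {
    "stg_requests": [],
    "stg_campaigns": [],
    "requests_joined": ["stg_requests"],
    "f_marketing_attribution": ["requests_joined", "stg_campaigns"],
    "f_supply_hours": ["stg_requests"],
}
TAGS = {
    "f_marketing_attribution": {"marketing"},
    "f_supply_hours": {"supply"},
}


def select_with_ancestors(tag: str) -> set[str]:
    """Emulate `+tag:<tag>`: tagged models plus everything upstream of them."""
    selected = {model for model, tags in TAGS.items() if tag in tags}
    queue = deque(selected)
    while queue:
        model = queue.popleft()
        for parent in PARENTS.get(model, []):
            if parent not in selected:
                selected.add(parent)
                queue.append(parent)
    return selected


def build_order(models: set[str]) -> list[str]:
    """Topologically order the selection: parents always run before children."""
    order: list[str] = []
    done: set[str] = set()

    def visit(model: str) -> None:
        if model in done:
            return
        for parent in PARENTS.get(model, []):
            if parent in models:
                visit(parent)
        done.add(model)
        order.append(model)

    for model in sorted(models):
        visit(model)
    return order


print(build_order(select_with_ancestors("marketing")))
# ['stg_requests', 'requests_joined', 'stg_campaigns', 'f_marketing_attribution']
```

Note that `f_supply_hours` shares a parent with the Marketing models but is not selected: `+` walks upstream only.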
      
      



. . :





dbt
.
|____staging
| |____webhook
| |____receipt_prod
| |____core
| |____wheely_prod
| |____flights_prod
| |____online_hours_prod
| |____external
| |____financial_service
|____marts
| |____looker
| |____dim
| |____snapshots
| |____facts
|____flatten
| |____webhook
| |____receipt_prod
| |____wheely_prod
| |____communication_prod
|____audit
|____sources
|____aux
| |____dq
| | |____marts
| | |____external
|____intermediate
      
      



- . , . Wheely Amazon Redshift.





, . . – journeys ().





Journeys showcase dependency chain (journeys)

(join performance), , sources. - sort merge join:





– sort merge join
{{
   config(
       materialized='table',
       unique_key='request_id',
       dist="request_id",
       sort="request_id"
   )
}}
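When both sides of a join use the join column as distribution key and sort key, Redshift can pick a merge join: matching rows sit on the same slice and both inputs are read already ordered. The merge step itself is the classic sort-merge join, sketched below in plain Python over hypothetical rows sorted by `request_id`. This illustrates the algorithm, not Redshift internals, and for brevity it assumes the key is unique on the left side.

```python
from __future__ import annotations

from typing import Iterator


def merge_join(left: list[dict], right: list[dict], key: str) -> Iterator[dict]:
    """Inner sort-merge join; both inputs must already be sorted by `key`.

    Simplification: assumes `key` is unique on the left side (one row per request).
    """
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1  # advance whichever side holds the smaller key
        elif lk > rk:
            j += 1
        else:
            # emit every right-side row that shares this key
            while j < len(right) and right[j][key] == lk:
                yield {**left[i], **right[j]}
                j += 1
            i += 1


# Hypothetical rows, both sorted by request_id
requests = [{"request_id": 1, "city": "London"},
            {"request_id": 2, "city": "Dubai"},
            {"request_id": 4, "city": "Paris"}]
payments = [{"request_id": 2, "amount": 10},
            {"request_id": 2, "amount": 5},
            {"request_id": 3, "amount": 7},
            {"request_id": 4, "amount": 1}]
joined = list(merge_join(requests, payments, "request_id"))
```

Each input is scanned once and no hash table is built, which is why co-sorted, co-distributed tables join cheaply.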
      
      



: city, country, completed timestamp, service group. Interleaved key I/O BI-.





– interleaved sortkey
{{
   config(
       materialized='table',
       unique_key='request_id',
       dist="request_id",
       sort_type='interleaved',
       sort=["completed_ts_loc"
               , "city"
               , "country"
               , "service_group"
               , "is_airport"
               , "is_wheely_journey"]
   )
}}
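An interleaved sort key gives equal weight to every listed column, so a filter on any one of them (say `city` or `service_group`) can skip blocks, whereas a compound key mainly helps filters on its leading column. A common way to picture this is a Z-order (Morton) curve: interleave the bits of each column's value so rows close in any dimension stay close together. The sketch below applies that idea to two hypothetical integer-coded columns; it is a conceptual model only, not how Redshift physically lays out blocks.

```python
from typing import Sequence


def z_order(values: Sequence[int], bits: int = 8) -> int:
    """Interleave the bits of several non-negative ints (a Morton / Z-order key).

    Bit `b` of dimension `d` lands at position b * len(values) + d, so no single
    column dominates the ordering the way a compound key's leading column does.
    """
    key = 0
    for bit in range(bits):
        for dim, value in enumerate(values):
            if (value >> bit) & 1:
                key |= 1 << (bit * len(values) + dim)
    return key


# Hypothetical dictionary codes: (city_code, service_group_code)
rows = [(city, group) for city in range(4) for group in range(4)]
rows.sort(key=z_order)
```

Sorting by the tuple itself would cluster rows by city only; with the interleaved key, neighbouring values of either column end up close together.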
      
      



views ( ), . , staging, , :





       staging:
           +materialized: view
           +schema: staging
           +tags: ["staging"]
      
      



– . – ephemeral, .. , . . , .





. , , , . (delta) – , . where:





{{
   config(
       materialized='incremental',
       sort='metadata_timestamp',
       dist='fine_id',
       unique_key='id'
   )
}}

with fines as (

   select 

         fine_id
       , city_id
       , amount
       , details
       , metadata_timestamp
       , created_ts_utc
       , updated_ts_utc
       , created_dt_utc 

   from {{ ref('stg_fines') }}
   where true
   -- filter fines arrived since last processed time
   {% if is_incremental() -%}
       and metadata_timestamp > (select max(metadata_timestamp) from {{ this }})
   {%- endif %} 

), 

...
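On an incremental run the `where` clause only lets through rows whose `metadata_timestamp` is newer than the maximum already stored in the target, and `unique_key` makes dbt replace existing rows with the same key instead of duplicating them (on Redshift, dbt does this as a delete followed by an insert). A minimal emulation of two such runs with Python's built-in `sqlite3`; the tables are simplified stand-ins for `stg_fines` and the model, keyed on `fine_id` for brevity, and this is not the SQL dbt actually generates:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table stg_fines (fine_id integer, amount real, metadata_timestamp integer);
    create table f_fines   (fine_id integer, amount real, metadata_timestamp integer);
""")


def incremental_run(conn: sqlite3.Connection) -> None:
    """One incremental run: select the delta, then delete + insert by unique key."""
    conn.executescript("""
        -- only rows newer than what the target already holds
        -- (coalesce covers the empty first run; dbt would do a full build instead)
        create temp table delta as
        select fine_id, amount, metadata_timestamp
        from stg_fines
        where metadata_timestamp > (select coalesce(max(metadata_timestamp), -1) from f_fines);

        -- replace rows that share the unique key, then append the rest
        delete from f_fines where fine_id in (select fine_id from delta);
        insert into f_fines select fine_id, amount, metadata_timestamp from delta;
        drop table delta;
    """)


conn.executemany("insert into stg_fines values (?, ?, ?)", [(1, 100.0, 10), (2, 50.0, 11)])
incremental_run(conn)
# later: fine 2 is corrected and fine 3 arrives
conn.executemany("insert into stg_fines values (?, ?, ?)", [(2, 60.0, 12), (3, 20.0, 13)])
incremental_run(conn)
result = conn.execute("select fine_id, amount from f_fines order by fine_id").fetchall()
```

The second run touches only two source rows, yet fine 2 ends up with its corrected amount rather than appearing twice.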
      
      



, MPP , Data Engineer Data Warehouse Analyst ( !).





SQL + Jinja = Flexibility

SQL , Jinja .





, dbt compile & run. . CREATE : clustered by / distributed by / sorted by. :





Model code:
{{
   config(
       materialized='table',
       dist="fine_id",
       sort="created_ts_utc"
   )
}} 

with details as (

  select
       {{
       dbt_utils.star(from=ref('fine_details_flatten'),
           except=["fine_amount", "metadata_timestamp", "generated_number"]
       )
       }}
   from {{ ref('fine_details_flatten') }}
   where fine_amount > 0

)

select * from details
      
      



Compiled code:
with details as (  

   select
  
       "id",
       "fine_id",
       "city_id",
       "amount",
       "description",
       "created_ts_utc",
       "updated_ts_utc",
       "created_dt_utc"

   from "wheely"."dbt_test_akozyr"."fine_details_flatten"
   where fine_amount > 0

)
select * from details
      
      



Run code:
create  table
   "wheely"."dbt_test_akozyr"."f_chauffeurs_fines"
   diststyle key distkey (fine_id)   
   compound sortkey(created_ts_utc)
 as (    

with details as (  

   select

       "id",
       "fine_id",
       "city_id",
       "amount",
       "description",
       "created_ts_utc",
       "updated_ts_utc",
       "created_dt_utc"

   from "wheely"."dbt_test_akozyr"."fine_details_flatten"
   where fine_amount > 0

)

select * from details

 );
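`dbt_utils.star` resolves the column list of the referenced relation at compile time and prints every column except those in `except`, each double-quoted; that is where the explicit list in the compiled SQL comes from. The same idea as a short Python sketch, using `sqlite3` to introspect a stand-in table with fewer columns than the real one (the real macro reads the warehouse catalog through the dbt adapter; this only mirrors the shape of its output):

```python
from __future__ import annotations

import sqlite3


def star(conn: sqlite3.Connection, table: str, except_: list[str]) -> str:
    """Quoted, comma-separated column list of `table`, minus the `except_` columns.

    (`except_` has a trailing underscore because `except` is a Python keyword.)
    """
    columns = [row[1] for row in conn.execute(f'pragma table_info("{table}")')]
    excluded = {name.lower() for name in except_}
    return ",\n".join(f'"{c}"' for c in columns if c.lower() not in excluded)


# Hypothetical stand-in for fine_details_flatten
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table fine_details_flatten (id integer, fine_id integer, amount real, "
    "fine_amount real, metadata_timestamp integer, generated_number integer)"
)
column_list = star(conn, "fine_details_flatten",
                   ["fine_amount", "metadata_timestamp", "generated_number"])
compiled = f"select\n{column_list}\nfrom fine_details_flatten\nwhere fine_amount > 0"
conn.execute(compiled)  # the generated statement is valid SQL
```

Because the list is resolved at compile time, a new upstream column appears in the model automatically on the next run, while the excluded technical columns stay out.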
      
      



, , dbt. boilerplate code . .





-, , ? , , {{ ref('fine_details_flatten') }}



– . . .





Jinja Wheely dev / test / prod. . . , 3- . :





:
{% macro generate_schema_name_for_env(custom_schema_name, node) -%}
 
   {%- set default_schema = target.schema -%}

   {%- if target.name == 'prod' and custom_schema_name is not none -%} 

       {{ custom_schema_name | trim }} 

   {%- else -%}

       {{ default_schema }} 

   {%- endif -%} 

{%- endmacro %}
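The macro's decision table is small: only the `prod` target honours a model's custom schema, while every other target (dev, test, a developer's personal target) builds everything in the target's default schema, so development runs never write into production schemas. A Python transcription for checking that logic; the `Target` tuple is a stand-in for dbt's `target` context object, and the schema names are hypothetical:

```python
from typing import NamedTuple, Optional


class Target(NamedTuple):
    """Stand-in for the two fields of dbt's `target` object the macro reads."""
    name: str    # e.g. "prod", "test", "dev"
    schema: str  # default schema of that target


def generate_schema_name_for_env(custom_schema_name: Optional[str], target: Target) -> str:
    # Only prod honours a model's custom schema (e.g. +schema: staging) ...
    if target.name == "prod" and custom_schema_name is not None:
        return custom_schema_name.strip()
    # ... every other target builds everything in its own default schema.
    return target.schema


prod = Target(name="prod", schema="analytics")      # hypothetical prod default
dev = Target(name="dev", schema="dbt_test_akozyr")  # personal dev schema
```

In a dbt project this behaviour is switched on by overriding `generate_schema_name` in the project's macros so that it simply calls `generate_schema_name_for_env(custom_schema_name, node)`.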
      
      



. , : , , , (-, ).





–

, , copy-paste . Wheely Do not repeat yourself - . .





:
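-- currency conversion macro
-- expects per-row exchange rates in columns aed, eur, gbp, rub, usd (e.g. from a joined rates table)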
{% macro convert_currency(convert_column, currency_code_column) -%}
 
     ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
   , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
   , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
   , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
   , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd 

{%- endmacro %}
      
      



:
 select     

       ...

       -- price_details
       , r.currency
       , {{ convert_currency('price', 'currency') }}
       , {{ convert_currency('transfer_min_price', 'currency') }}
       , {{ convert_currency('discount', 'currency') }}
       , {{ convert_currency('insurance', 'currency') }}
       , {{ convert_currency('tips', 'currency') }}
       , {{ convert_currency('parking', 'currency') }}
       , {{ convert_currency('toll_road', 'currency') }}
       , {{ convert_currency('pickup_charge', 'currency') }}
       , {{ convert_currency('cancel_fee', 'currency') }}
       , {{ convert_currency('net_bookings', 'currency') }}
       , {{ convert_currency('gross_revenue', 'currency') }}
       , {{ convert_currency('service_charge', 'currency') }}     

       ... 

   from {{ ref('requests_joined') }} r   
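Each call expands into five columns named `<column>_<currency>`, each multiplying the amount by a rate column named after the currency. A Python sketch that renders the same fragment for the twelve amount columns in the query above; note that, as written, the macro never uses `currency_code_column`, so the right per-row rates must already be joined in:

```python
CURRENCIES = ("aed", "eur", "gbp", "rub", "usd")


def convert_currency(convert_column: str) -> str:
    """Render the select-list fragment the convert_currency macro produces.

    Assumes per-row exchange rates are available as columns named after
    the currencies (aed, eur, gbp, rub, usd), exactly as the macro does.
    """
    return "\n, ".join(
        f"( {convert_column} * {cur} )::decimal(18,4) as {convert_column}_{cur}"
        for cur in CURRENCIES
    )


# The twelve amount columns passed to the macro in the query above
AMOUNT_COLUMNS = ["price", "transfer_min_price", "discount", "insurance", "tips",
                  "parking", "toll_road", "pickup_charge", "cancel_fee",
                  "net_bookings", "gross_revenue", "service_charge"]
select_list = "\n, ".join(convert_currency(col) for col in AMOUNT_COLUMNS)
generated = [f"{col}_{cur}" for col in AMOUNT_COLUMNS for cur in CURRENCIES]
```

Twelve one-line macro calls thus stand in for 60 hand-written expressions, and adding a currency means editing one place.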
      
      



, , Jinja. SQL-. - :





-- compare two columns
{% macro dq_compare_columns(src_column, trg_column, is_numeric=false) -%}
  
   {%- if is_numeric == true -%}
       {%- set src_column = 'round(' + src_column + ', 2)' -%}       
       {%- set trg_column = 'round(' + trg_column + ', 2)' -%}
   {%- endif -%} 

       CASE
           WHEN {{ src_column }} = {{ trg_column }} THEN 'match'
           WHEN {{ src_column }} IS NULL AND {{ trg_column }} IS NULL THEN 'both null'
           WHEN {{ src_column }} IS NULL THEN 'missing in source'
           WHEN {{ trg_column }} IS NULL THEN 'missing in target'
           WHEN {{ src_column }} <> {{ trg_column }} THEN 'mismatch'
           ELSE 'unknown'
       END

{%- endmacro %}
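The order of the `WHEN` branches matters: comparing two NULLs with `=` yields NULL rather than true, so the first branch is skipped and `'both null'` is reached. A Python rendering of the macro, executed with `sqlite3` against a hypothetical reconciliation table, shows each outcome and the effect of `is_numeric` rounding:

```python
import sqlite3


def dq_compare_columns(src_column: str, trg_column: str, is_numeric: bool = False) -> str:
    """Python rendering of the dq_compare_columns macro."""
    if is_numeric:
        src_column = f"round({src_column}, 2)"
        trg_column = f"round({trg_column}, 2)"
    return f"""CASE
        WHEN {src_column} = {trg_column} THEN 'match'
        WHEN {src_column} IS NULL AND {trg_column} IS NULL THEN 'both null'
        WHEN {src_column} IS NULL THEN 'missing in source'
        WHEN {trg_column} IS NULL THEN 'missing in target'
        WHEN {src_column} <> {trg_column} THEN 'mismatch'
        ELSE 'unknown'
    END"""


# Hypothetical reconciliation of amounts between a source and a target table
conn = sqlite3.connect(":memory:")
conn.execute("create table recon (id int, src_amount real, trg_amount real)")
conn.executemany(
    "insert into recon values (?, ?, ?)",
    [(1, 10.001, 10.004), (2, None, None), (3, None, 5.0), (4, 7.0, None), (5, 1.0, 2.0)],
)
rounded = dict(conn.execute(
    f"select id, {dq_compare_columns('src_amount', 'trg_amount', is_numeric=True)} from recon"
).fetchall())
raw = dict(conn.execute(
    f"select id, {dq_compare_columns('src_amount', 'trg_amount')} from recon"
).fetchall())
```

Row 1 only matches thanks to the rounding to two decimals; without `is_numeric` it is reported as a mismatch.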
      
      



UDF-:





UDF
-- create UDFs that decode bitwise access flags into a readable status
{% macro create_udf() -%}

{% set sql %}

       CREATE OR REPLACE FUNCTION {{ target.schema }}.f_bitwise_to_delimited(bitwise_column BIGINT, bits_in_column INT)
           RETURNS VARCHAR(512)
           STABLE
       AS $$
       # Convert column to binary, strip "0b" prefix, pad out with zeroes
       if bitwise_column is not None:
           b = bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column+1]
           return b
       else:
           return None
       $$ LANGUAGE plpythonu
       ;  

       CREATE OR REPLACE FUNCTION {{ target.schema }}.f_decode_access_flags(access_flags INT, deleted_at TIMESTAMP)
           RETURNS VARCHAR(128)
       STABLE
       AS $$
           SELECT nvl(
                         DECODE($2, null, null, 'deleted')
                       , DECODE(LEN({{ target.schema }}.f_bitwise_to_delimited($1, 7))::INT, 7, null, 'unknown')
                       , DECODE({{ target.schema }}.f_bitwise_to_delimited($1, 7)::INT, 0, 'active', null)
                       , DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 1, 1), 1, 'end_of_life', null)
                       , DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 7, 1), 1, 'pending', null)
                       , DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 6, 1), 1, 'rejected', null)
                       , DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 5, 1), 1, 'blocked', null)
                       , DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 4, 1), 1, 'expired_docs', null)
                       , DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 3, 1), 1, 'partner_blocked', null)
                       , DECODE(SUBSTRING({{ target.schema }}.f_bitwise_to_delimited($1, 7), 2, 1), 1, 'new_partner', null)
                   )
       $$ LANGUAGE SQL
       ;         

 {% endset %}

 {% set table = run_query(sql) %}

{%- endmacro %}
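Together the two functions turn an integer bit mask into a single status: the Python UDF renders the mask as a zero-padded binary string, and the SQL UDF inspects its characters in a fixed priority order, with the first non-null result winning. The same decoding transcribed to plain Python, so the flag priorities can be unit-tested outside the warehouse (positions and labels are taken from the SQL; the test values are arbitrary):

```python
from datetime import datetime
from typing import Optional

# (position among the 7 binary digits, 1 = leftmost) -> status,
# in the order the SQL nvl(...) checks them; the first match wins.
FLAG_PRIORITY = [
    (1, "end_of_life"),
    (7, "pending"),
    (6, "rejected"),
    (5, "blocked"),
    (4, "expired_docs"),
    (3, "partner_blocked"),
    (2, "new_partner"),
]


def bitwise_to_delimited(value: Optional[int], bits: int) -> Optional[str]:
    """Mirror of f_bitwise_to_delimited: zero-padded binary string."""
    if value is None:
        return None
    return bin(value)[2:].zfill(bits)[: bits + 1]


def decode_access_flags(flags: Optional[int], deleted_at: Optional[datetime]) -> Optional[str]:
    """Mirror of f_decode_access_flags for non-negative flag values."""
    if deleted_at is not None:
        return "deleted"
    b = bitwise_to_delimited(flags, 7)
    if b is None or len(b) != 7:
        return "unknown"  # null, or a value wider than 7 bits
    if int(b) == 0:
        return "active"
    for pos, status in FLAG_PRIORITY:
        if b[pos - 1] == "1":
            return status
    return None
```

For example, `decode_access_flags(65, None)` has both the leftmost and rightmost bits set and returns `'end_of_life'`, because that check comes first.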
      
      



, nested structures ( ) (external tables) S3 parquet. .





–

package - , , , . dbt hub , , , .





2 hooks dbt , , . - ( ):





models:
   +pre-hook: "{{ logging.log_model_start_event() }}"
   +post-hook: "{{ logging.log_model_end_event() }}"
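With a start event written by the pre-hook and an end event by the post-hook, deployment time per model is simply the difference between the two timestamps. A `sqlite3` sketch of that calculation; the audit table layout and the event names are simplified assumptions about what the logging package stores, not its exact schema, and the timings are made up:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified, assumed audit-log layout: one row per hook invocation
conn.execute(
    "create table dbt_audit_log (event_name text, event_model text, "
    "event_timestamp text, invocation_id text)"
)
conn.executemany(
    "insert into dbt_audit_log values (?, ?, ?, ?)",
    [
        ("model deployment started", "journeys", "2021-03-01 06:00:00", "run-1"),
        ("model deployment completed", "journeys", "2021-03-01 06:04:30", "run-1"),
        ("model deployment started", "f_chauffeurs_fines", "2021-03-01 06:04:31", "run-1"),
        ("model deployment completed", "f_chauffeurs_fines", "2021-03-01 06:04:41", "run-1"),
    ],
)

# Pair each start event with the end event of the same model and run
DURATIONS_SQL = """
select s.event_model,
       (julianday(e.event_timestamp) - julianday(s.event_timestamp)) * 86400 as seconds
from dbt_audit_log s
join dbt_audit_log e
  on e.invocation_id = s.invocation_id
 and e.event_model = s.event_model
 and e.event_name = 'model deployment completed'
where s.event_name = 'model deployment started'
order by seconds desc
"""
durations = [(model, round(sec)) for model, sec in conn.execute(DURATIONS_SQL)]
```

Sorting by duration surfaces the slowest models first, which is where dist/sort key tuning usually pays off.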
      
      



Monitoring the deployment of dbt models on a Redshift cluster

, :





{{ dbt_date.get_date_dimension('2012-01-01', '2025-12-31') }}
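The macro expands into SQL that yields one row per day in the given range with a set of calendar attributes. The idea in plain Python; the attribute set below is an illustrative subset rather than the package's exact columns, and this sketch treats both bounds as inclusive:

```python
from datetime import date, timedelta


def date_dimension(start: str, end: str) -> list:
    """One row per day in [start, end], with a few common calendar attributes."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    rows = []
    day = first
    while day <= last:
        iso_year, iso_week, iso_weekday = day.isocalendar()
        rows.append({
            "date_day": day,
            "day_of_week": iso_weekday,  # 1 = Monday ... 7 = Sunday
            "iso_week_of_year": iso_week,
            "month_of_year": day.month,
            "quarter_of_year": (day.month - 1) // 3 + 1,
            "year_number": day.year,
            "is_weekend": iso_weekday >= 6,
        })
        day += timedelta(days=1)
    return rows


calendar = date_dimension("2012-01-01", "2025-12-31")
```

Fourteen years with four leap days give 5114 rows, small enough to materialize once as a table and join everywhere.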
      
      



Macro-generated calendar dimension

dbt_external_tables Lakehouse, , S3. , , API Open Exchange Rates JSON:





External data stored in S3 accessed with Redshift Spectrum
sources:
  - name: external
    schema: spectrum
    tags: ["spectrum"]
    description: "External data stored in S3 accessed with Redshift Spectrum"
    tables:
      - name: currencies_oxr
        description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
        freshness:
          error_after: {count: 15, period: hour}
        loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
        external:
          location: "s3://data-analytics.wheely.com/dwh/currencies/"
          row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
        columns:
          - name: timestamp
            data_type: bigint
          - name: base
            data_type: varchar(3)
          - name: rates
            data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
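The `loaded_at_field` expression turns the Unix epoch stored in `timestamp` into a proper timestamp, and the source freshness check (`dbt source snapshot-freshness` in dbt 0.19) compares the newest value with the current time: with `error_after: {count: 15, period: hour}` the source is reported as an error once the latest rates are more than 15 hours old. A Python sketch of that check; the function names are hypothetical:

```python
from datetime import datetime, timedelta, timezone


def epoch_to_ts(epoch_seconds: int) -> datetime:
    """Same conversion as timestamp 'epoch' + "timestamp" * interval '1 second'."""
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=epoch_seconds)


def freshness_status(max_loaded_at: datetime, now: datetime,
                     error_after: timedelta = timedelta(hours=15)) -> str:
    """'error' once the newest record is older than error_after, else 'pass'."""
    return "error" if now - max_loaded_at > error_after else "pass"


latest = epoch_to_ts(1614556800)  # newest "timestamp" value in the rates files
```

A stale exchange-rate feed thus fails loudly before it can silently skew every converted amount.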
      
      



, , VACUUM + ANALYZE, Redshift PostgreSQL. , , . , dba .





dbt run-operation redshift_maintenance --args '{include_schemas: ["staging", "flatten", "intermediate", "analytics", "meta", "snapshots", "ad_hoc"]}'
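Conceptually the operation walks the tables in the listed schemas and issues `VACUUM` and `ANALYZE` for each of them. A Python sketch of just the statement generation over a hypothetical table list; the real macro reads the tables from the cluster catalog, and its exact filtering options are not reproduced here:

```python
from __future__ import annotations


def maintenance_statements(tables: list[tuple[str, str]],
                           include_schemas: list[str]) -> list[str]:
    """VACUUM then ANALYZE every (schema, table) whose schema is included."""
    allowed = set(include_schemas)
    statements = []
    for schema, table in sorted(tables):
        if schema in allowed:
            relation = f'"{schema}"."{table}"'
            statements.append(f"vacuum {relation};")
            statements.append(f"analyze {relation};")
    return statements


# Hypothetical catalog contents
tables = [("staging", "stg_fines"), ("analytics", "f_chauffeurs_fines"), ("public", "tmp")]
stmts = maintenance_statements(tables, ["staging", "analytics"])
```

Restricting the run to known schemas keeps maintenance away from ad hoc tables that nobody owns.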
      
      



VACUUM + ANALYZE

Running in production: dbt Cloud at Wheely

dbt Cloud , dbt. , , , IDE ( !) .





: , , :





-, . , cron-, webhook. , - (kicked off from Airflow):
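Besides a cron-style schedule, a dbt Cloud job can be started from outside, for example by an Airflow task, through the dbt Cloud API. A sketch that builds (but deliberately does not send) such a request with Python's standard library; it assumes the v2 "trigger job run" endpoint, and the account id, job id and token are placeholders:

```python
import json
import urllib.request

DBT_CLOUD_API = "https://cloud.getdbt.com/api/v2"


def build_trigger_request(account_id: int, job_id: int, token: str,
                          cause: str) -> urllib.request.Request:
    """Prepare a POST asking dbt Cloud to start a run of the given job."""
    url = f"{DBT_CLOUD_API}/accounts/{account_id}/jobs/{job_id}/run/"
    body = json.dumps({"cause": cause}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Authorization": f"Token {token}", "Content-Type": "application/json"},
    )


# Placeholder ids and token; urllib.request.urlopen(req) would actually send it.
req = build_trigger_request(1, 42, "<api-token>", "Kicked off from Airflow")
```

The `cause` string shows up in the run history, which makes it easy to tell scheduled runs from externally triggered ones.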





, . Slack Production-. .





dbt , dbt Cloud , . : Airflow, Prefect, Dagster, cron. GitHub Actions. .





Wheely , , . onboarding.





. :





  • Head of Data Insights - https://wheely.com/ru/careers/4425384003





  • Product Analyst, Backoffice - https://wheely.com/ru/careers/4308521003





  • Product Analyst, Business - https://wheely.com/ru/careers/4425290003





  • Product Analyst, Chauffeur growth - https://wheely.com/ru/careers/4185132003





  • Product Analyst, Marketplace - https://wheely.com/ru/careers/4425328003





  • Product Analyst, Passenger growth - https://wheely.com/ru/careers/4194291003





.





, . - Technology Enthusiast – https://t.me/enthusiastech





, , , dbt !







