Currency Rates and Analytics - Using Exchange Rates in the Data Warehouse

Hi! This is Artemiy, Analytics Engineer at Wheely.





Today I would like to talk about converting financial metrics into different currencies. The topic is highly relevant: many companies operate in several countries, build analytics on a global scale, and prepare reports in accordance with international standards.





Let me show how this problem can be solved with modern tooling, using the Wheely case as an example:





















Tech stack: Open Exchange Rates, Airflow, Redshift Spectrum, dbt.






legacy- - . . , AED ( ). - BTC, ETH, - .





:





  • , API





  • ,





  • ( )





Matrix of new requirements for working with exchange rates

, . , :





  • API









  • ()





  • legacy-





pivot- . , , . 





pandas . (T ELT) , dbt.





, , – https://openexchangerates.org/





Developer :





  • 10.000 ( )









  • ,





API:





API endpoint /latest.json





- :





Airflow

Airflow. Apache Airflow – - , data engineering . 





(DAG):





  • API





  • (, S3)





  • Slack





DAG:





  • (base currency),













DAG shell-:





# Fetch one day of Open Exchange Rates quotes and store the raw JSON response in S3.
# Reads from the environment: OXR_TOKEN, BUSINESS_DT, BASE_CURRENCY, SYMBOLS,
# BUCKET, BUCKET_PATH.
set -euo pipefail

# The timestamp becomes part of the object key, so repeated runs for the same
# business date and base currency produce distinct objects.
TS=$(date +"%Y-%m-%d-%H-%M-%S-%Z")

# Download to a temporary file first: with --fail curl exits non-zero on an HTTP
# error, so an API error payload is never uploaded to S3 as if it were rates data.
RESPONSE_FILE=$(mktemp)
trap 'rm -f "$RESPONSE_FILE"' EXIT

curl --fail --silent --show-error \
 -H "Authorization: Token $OXR_TOKEN" \
 --output "$RESPONSE_FILE" \
 "https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS"

aws s3 cp "$RESPONSE_FILE" "s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json"
      
      



S3:





25 , :





, (, , ). .





, Developer API endpoint /time-series.json, upgrade .





/historical/*.json API :





#!/bin/bash
# One-off backfill: download one /historical/<date>.json response per calendar day
# (base AED; symbols AED, GBP, EUR, RUB, USD) into ./export/<date>.json.
# Requires TOKEN (Open Exchange Rates app id) in the environment.
set -euo pipefail

: "${TOKEN:?TOKEN must be set to an Open Exchange Rates app id}"

# Print the calendar day following $1 (format YYYY-MM-DD).
# GNU date understands --version and -d; BSD/macOS date needs -j -v instead.
next_day() {
 if date --version >/dev/null 2>&1; then
  date -d "$1 + 1 day" +%Y-%m-%d
 else
  date -j -v +1d -f "%Y-%m-%d" "$1" +%Y-%m-%d
 fi
}

# The output directory may not exist on a fresh checkout.
mkdir -p ./export

d=2011-01-01
# The end date is exclusive: the last file written is 2021-02-18.json.
while [ "$d" != 2021-02-19 ]; do
 echo "$d"
 # --fail makes curl exit non-zero on an HTTP error, which stops the loop (set -e)
 # instead of saving an error payload as a data file.
 curl --fail --silent --show-error \
  -H "Authorization: Token $TOKEN" \
  --output "./export/$d.json" \
  "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD"
 d=$(next_day "$d")
done
      
      



, , :





legacy- X ( -) .





, . - .





Data Lake. , :





  • legacy pivot-





  • PARQUET AWS S3





S3 PARQUET
-- Reshape ext.currencies (one column per currency pair) into one row per
-- (day, base currency) with one rate column per target currency, and write the
-- result to S3 as Parquet through an external table.
CREATE EXTERNAL TABLE spectrum.currencies_cbrf
STORED AS PARQUET
LOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' AS
WITH base AS (
    -- Base currencies every source row is fanned out to.
    SELECT 'EUR' AS base_currency
    UNION ALL SELECT 'GBP'
    UNION ALL SELECT 'RUB'
    UNION ALL SELECT 'USD'
)
SELECT
    c."day" AS business_dt,
    b.base_currency,
    -- Each rate column takes the <base>_to_<target> value, or 1 when the base
    -- currency is the target itself.
    CASE
        WHEN b.base_currency = 'EUR' THEN 1
        WHEN b.base_currency = 'GBP' THEN c.gbp_to_eur
        WHEN b.base_currency = 'RUB' THEN c.rub_to_eur
        WHEN b.base_currency = 'USD' THEN c.usd_to_eur
    END AS eur,
    CASE
        WHEN b.base_currency = 'EUR' THEN c.eur_to_gbp
        WHEN b.base_currency = 'GBP' THEN 1
        WHEN b.base_currency = 'RUB' THEN c.rub_to_gbp
        WHEN b.base_currency = 'USD' THEN c.usd_to_gbp
    END AS gbp,
    CASE
        WHEN b.base_currency = 'EUR' THEN c.eur_to_rub
        WHEN b.base_currency = 'GBP' THEN c.gbp_to_rub
        WHEN b.base_currency = 'RUB' THEN 1
        WHEN b.base_currency = 'USD' THEN c.usd_to_rub
    END AS rub,
    CASE
        WHEN b.base_currency = 'EUR' THEN c.eur_to_usd
        WHEN b.base_currency = 'GBP' THEN c.gbp_to_usd
        WHEN b.base_currency = 'RUB' THEN c.rub_to_usd
        WHEN b.base_currency = 'USD' THEN 1
    END AS usd
FROM ext.currencies AS c
CROSS JOIN base AS b
;
      
      



, S3 , - , . .





DWH S3 External Table

– Amazon Redshift , .





– EXTERNAL TABLE, SQL- , S3. JSON, AVRO, ORC, PARQUET . Redshift Spectrum SQL- Amazon Athena, Presto.





-- External table over the JSON files stored under the dwh/currencies/ prefix;
-- each record is parsed by the OpenX JSON SerDe.
CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (
    -- Numeric timestamp of the quote; consumers convert it with
    -- timestamp 'epoch' + "timestamp" * interval '1 second'.
    "timestamp" BIGINT,
    -- Three-letter code of the base currency.
    base        VARCHAR(3),
    -- Nested object with one rate per target currency.
    rates       STRUCT<aed:FLOAT8, eur:FLOAT8, gbp:FLOAT8, rub:FLOAT8, usd:FLOAT8>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<BUCKET>/dwh/currencies/'
;

      
      



rates struct.





dbt. dbt-external-tables EXTERNAL TABLES :





   # Source group for data stored in S3 and exposed through the spectrum schema.
   - name: external
     schema: spectrum
     tags:
       - spectrum
     loader: S3
     description: "External data stored in S3 accessed with Redshift Spectrum"
     tables:
       - name: currencies_oxr
         description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
         # Source freshness reports an error once the newest row is older than 15 hours.
         freshness:
           error_after:
             count: 15
             period: hour
         # "timestamp" is a bigint; this expression turns it into a timestamp
         # (epoch + N seconds) so it can be compared with the current time.
         loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
         # Storage properties of the external table (S3 location and row SerDe).
         external:
           location: "s3://<BUCKET>/dwh/currencies/"
           row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
         columns:
           - name: timestamp
             data_type: bigint
           - name: base
             data_type: varchar(3)
           # Nested struct with one float8 rate per target currency.
           - name: rates
             data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
      
      



source freshness test . . , .





– 15 – Slack.





() ( API) currencies:





{{
   config(
       materialized='table',
       dist='all',
       sort=["business_dt", "base_currency"]
   )
}}

{#- Last business date taken from the currencies_cbrf source; every later date is taken from currencies_oxr. -#}
{%- set legacy_cutover_dt = '2021-02-18' %}

-- Daily exchange rates: one row per (business_dt, base_currency) with one rate
-- column per target currency, combined from two sources split at the cutover date.

with cbrf as (

 -- Rates from the currencies_cbrf source, up to and including the cutover date.
 -- NOTE(review): confirm that currencies_cbrf exposes an aed column.
 select

     business_dt
   -- This source has no intraday timestamp. The NULL is typed explicitly so the
   -- column is compatible with oxr.business_ts in the UNION ALL below.
   , null::timestamp as business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd

 from {{ source('external', 'currencies_cbrf') }}
 where business_dt <= '{{ legacy_cutover_dt }}'
 ),

oxr_all as (

   -- Every OXR snapshot after the cutover date, ranked newest-first within each
   -- (base_currency, business_dt). business_dt, business_ts and base_currency in
   -- the window and the filter refer to the aliases defined in this select list.
   -- NOTE(review): decimal(10,4) keeps only 4 fractional digits; check that this
   -- is precise enough for small rates.
   select

     (timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt
   , (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts
   , o.base as base_currency
   , o.rates.aed::decimal(10,4) as aed
   , o.rates.eur::decimal(10,4) as eur
   , o.rates.gbp::decimal(10,4) as gbp
   , o.rates.rub::decimal(10,4) as rub
   , o.rates.usd::decimal(10,4) as usd
   , row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn

   from {{ source('external', 'currencies_oxr') }} as o
   where business_dt > '{{ legacy_cutover_dt }}'

),

oxr as (

 -- Keep only the latest snapshot per base currency and day. Reads the oxr_all
 -- CTE defined above, so this model does not depend on a separate model.
 select

     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd

 from oxr_all
 where rn = 1
 ),

united as (

 -- The two date ranges do not overlap (<= cutover vs > cutover), so UNION ALL
 -- cannot introduce duplicate days.
 select

     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
  from cbrf

 union all

 select

     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
  from oxr

)

select

   business_dt
 , business_ts
 , base_currency
 , aed
 , eur
 , gbp
 , rub
 , usd

from united
      
      



Redshift   .





, , . API, - JSON S3,   . :





   -- Excerpt of a model's select list: monetary columns of a request are converted
   -- into the target currencies using the joined rates row.
   select
 
       -- price_details
       , r.currency
       -- Each macro call below expands into one converted column per target currency,
       -- named <column>_<currency> and cast to decimal(18,4).
       , {{ convert_currency('price', 'currency') }}
       , {{ convert_currency('discount', 'currency') }}
       , {{ convert_currency('insurance', 'currency') }}
       , {{ convert_currency('tips', 'currency') }}
       , {{ convert_currency('parking', 'currency') }}
       , {{ convert_currency('toll_road', 'currency') }}
 
   -- Match every request to the rates row of its completion date whose base currency
   -- is the request's own currency. The left join keeps requests without a matching
   -- rates row; their converted amounts are NULL.
   -- NOTE(review): assumes completed_dt_utc is a date comparable to business_dt - confirm.
   from {{ ref('requests') }} r
       left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt
           and r.currency = currencies.base_currency

      
      



, :





{#
  Currency conversion macro.

  Emits a comma-separated list of select expressions, one per target currency:
      ( <convert_column> * <currency> )::decimal(18,4) as <convert_column>_<currency>
  The first expression has no leading comma, so the call can follow a comma in a
  select list.

  Arguments:
    convert_column        name of the amount column to convert.
    currency_code_column  accepted for call-site compatibility; not used in the
                          generated SQL.
    target_currencies     rate columns to multiply by. They are referenced
                          unqualified, so the calling query must have them in
                          scope. Defaults to aed, eur, gbp, rub, usd.
#}
{% macro convert_currency(convert_column, currency_code_column, target_currencies=['aed', 'eur', 'gbp', 'rub', 'usd']) -%}

    {%- for target_currency in target_currencies %}
    {% if not loop.first %}, {% endif -%}
    ( {{ convert_column }} * {{ target_currency }} )::decimal(18,4) as {{ convert_column }}_{{ target_currency }}
    {%- endfor %}

{%- endmacro %}

      
      



-

– . . , .





Data Engineer OTUS, .





, . – :





  • Data Architecture





  • Data Lake





  • Data Warehouse





  • NoSQL / NewSQL





  • MLOps





.





- Technology Enthusiast.





.








All Articles