Hey! Connected Artemiy - Analytics Engineer from Wheely.
Today I would like to talk about converting financial indicators into different currencies. The question is quite relevant, since a large number of companies have multinational zones of presence, build global-scale analytics, prepare reports in accordance with international standards.
Let me show you how this issue is solved using modern approaches using the example of the Wheely case:
: Open Exchange Rates, Airflow, Redshift Spectrum, dbt.
legacy- - . . , AED ( ). - BTC, ETH, - .
:
, API
,
( )
, . , :
API
()
legacy-
pivot- . , , .
pandas . (T ELT) , dbt.
, , – https://openexchangerates.org/
Developer :
10.000 ( )
,
API:
API endpoint /latest.json
- :
Airflow
Airflow. Apache Airflow – - , data engineering .
(DAG):
API
(, S3)
Slack
DAG:
(base currency),
DAG shell-:
TS=`date +"%Y-%m-%d-%H-%M-%S-%Z"`
curl -H "Authorization: Token $OXR_TOKEN" \
"https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS" \
| aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json
S3:
25 , :
, (, , ). .
, Developer API endpoint /time-series.json, upgrade .
/historical/*.json API :
#!/bin/bash
d=2011-01-01
while [ "$d" != 2021-02-19 ]; do
echo $d
curl -H "Authorization: Token $TOKEN" "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" > ./export/$d.json
d=$(date -j -v +1d -f "%Y-%m-%d" $d +%Y-%m-%d)
done
, , :
legacy- X ( -) .
, . - .
Data Lake. , :
legacy pivot-
PARQUET AWS S3
S3 PARQUET
CREATE EXTERNAL TABLE spectrum.currencies_cbrf
STORED AS PARQUET
LOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' AS
WITH base AS (
SELECT 'EUR' AS base_currency
UNION ALL
SELECT 'GBP'
UNION ALL
SELECT 'RUB'
UNION ALL
SELECT 'USD'
)
SELECT
"day" AS business_dt
,b.base_currency
,CASE b.base_currency
WHEN 'EUR' THEN 1
WHEN 'GBP' THEN gbp_to_eur
WHEN 'RUB' THEN rub_to_eur
WHEN 'USD' THEN usd_to_eur
ELSE NULL
END AS eur
,CASE b.base_currency
WHEN 'EUR' THEN eur_to_gbp
WHEN 'GBP' THEN 1
WHEN 'RUB' THEN rub_to_gbp
WHEN 'USD' THEN usd_to_gbp
ELSE NULL
END AS gbp
,CASE b.base_currency
WHEN 'EUR' THEN eur_to_rub
WHEN 'GBP' THEN gbp_to_rub
WHEN 'RUB' THEN 1
WHEN 'USD' THEN usd_to_rub
ELSE NULL
END AS rub
,CASE b.base_currency
WHEN 'EUR' THEN eur_to_usd
WHEN 'GBP' THEN gbp_to_usd
WHEN 'RUB' THEN rub_to_usd
WHEN 'USD' THEN 1
ELSE NULL
END AS usd
FROM ext.currencies c
CROSS JOIN base b
;
, S3 , - , . .
DWH S3 External Table
– Amazon Redshift , .
– EXTERNAL TABLE, SQL- , S3. JSON, AVRO, ORC, PARQUET . Redshift Spectrum SQL- Amazon Athena, Presto.
CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (
"timestamp" bigint
, base varchar(3)
, rates struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
)
ROW format serde 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<BUCKET>/dwh/currencies/'
;
rates struct.
dbt. dbt-external-tables EXTERNAL TABLES :
- name: external
schema: spectrum
tags: ["spectrum"]
loader: S3
description: "External data stored in S3 accessed vith Redshift Spectrum"
tables:
- name: currencies_oxr
description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
freshness:
error_after: {count: 15, period: hour}
loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
external:
location: "s3://<BUCKET>/dwh/currencies/"
row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
columns:
- name: timestamp
data_type: bigint
- name: base
data_type: varchar(3)
- name: rates
data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
– source freshness test . . , .
– 15 – Slack.
() ( API) currencies:
{{
config(
materialized='table',
dist='all',
sort=["business_dt", "base_currency"]
)
}}
with cbrf as (
select
business_dt
, null as business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from {{ source('external', 'currencies_cbrf') }}
where business_dt <= '2021-02-18'
),
oxr_all as (
select
(timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt
, (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts
, o.base as base_currency
, o.rates.aed::decimal(10,4) as aed
, o.rates.eur::decimal(10,4) as eur
, o.rates.gbp::decimal(10,4) as gbp
, o.rates.rub::decimal(10,4) as rub
, o.rates.usd::decimal(10,4) as usd
, row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn
from {{ source('external', 'currencies_oxr') }} as o
where business_dt > '2021-02-18'
),
oxr as (
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from {{ ref('stg_currencies_oxr_all') }}
where rn = 1
),
united as (
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from cbrf
union all
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from oxr
)
select
business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from united
Redshift .
, , . API, - JSON S3, . :
select
-- price_details
, r.currency
, {{ convert_currency('price', 'currency') }}
, {{ convert_currency('discount', 'currency') }}
, {{ convert_currency('insurance', 'currency') }}
, {{ convert_currency('tips', 'currency') }}
, {{ convert_currency('parking', 'currency') }}
, {{ convert_currency('toll_road', 'currency') }}
from {{ ref('requests') }} r
left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt
and r.currency = currencies.base_currency
, :
-- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%}
( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
, ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
, ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
, ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
, ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd
{%- endmacro %}
-
– . . , .
Data Engineer OTUS, .
, . – :
Data Architecture
Data Lake
Data Warehouse
NoSQL / NewSQL
MLOps
.