Hello everyone.
, . . , . Hadoop, HDFS. -, .
( ) , , β... postgres β . . , , Clickhouse, Snowflake .., Impala Hive. . β . , Impala.
:
. update, delete, etc ( Kudu, ).
β¦ , .
python ORM Impala? ODBC?
1000 1 000 000 .
.
? , , , real-timeβ.
, - , . , .
30 90 1 . — Hue ("") — - .
Impala python- . 2 — impyla cloudera odbc. - , issue . . , , pyodbc. .. , pyodbc aioodbc — pyodbc.
def _convert_timestamp(value):
unpacked = struct.unpack('6Hxxxx', value)
if unpacked == (1970, 1, 1, 0, 0, 0):
return None
return datetime(*unpacked)
class Impala:
    """Async Impala client built on an aioodbc connection pool.

    Connection settings are read from the module-level ``cfg`` object. Call
    :meth:`connect` once before issuing queries and :meth:`disconnect` on
    shutdown.
    """

    def __init__(self):
        # ODBC connection string. Values are wrapped in braces so characters
        # such as ';' inside a value (e.g. a password) do not end the field.
        self.dsn = (
            f'DRIVER={{{cfg.impala_driver}}};'
            f'HOST={{{cfg.impala_host}}};'
            f'PORT={{{cfg.impala_port}}};'
            f'SCHEMA={{{cfg.impala_default_schema}}};'
            f'UID={{{cfg.impala_uid}}};'
            f'PWD={{{cfg.impala_pwd}}};'
            'AUTHMECH=3;'
            'USESASL=1;'
            'SSL=0;'
        )
        # Created by connect(); None means "not connected".
        self.pool: Optional[Pool] = None

    async def connect(self):
        """Create the connection pool and verify it with a trivial query."""
        self.pool = await create_pool(
            minsize=cfg.impala_min_poolsize,
            maxsize=cfg.impala_max_poolsize,
            dsn=self.dsn,
            after_created=self.after_created,
            autocommit=True,
            pool_recycle=55,  # seconds before a pooled connection is recycled
        )
        # Fail fast if Impala is unreachable or the credentials are wrong.
        async with self.pool.acquire() as connection:
            async with connection.cursor() as cursor:
                cursor: Cursor
                result = await cursor.execute('select 1')
                await result.fetchone()

    async def disconnect(self):
        """Close the pool and wait for its connections; no-op if not connected."""
        if self.pool is None:
            return
        pool, self.pool = self.pool, None
        pool.close()
        await pool.wait_closed()

    @staticmethod
    async def after_created(connection):
        """Configure every new raw pyodbc connection created by the pool."""
        # Driver return zero date values incorrectly
        connection.add_output_converter(SQL_TYPE_TIMESTAMP, _convert_timestamp)
        # Character data is decoded/encoded as UTF-8; column-name metadata
        # (SQL_WMETADATA) is decoded as UTF-16.
        connection.setdecoding(SQL_CHAR, encoding='utf-8')
        connection.setdecoding(SQL_WCHAR, encoding='utf-8')
        connection.setdecoding(SQL_WMETADATA, encoding='utf-16')
        connection.setencoding(encoding='utf-8')

    async def _run_with_retries(
        self,
        query,
        *params,
        retries=0,
        max_retries=cfg.impala_max_retries,
    ):
        """Execute *query* with *params*, retrying on driver ``Error``.

        Args:
            query: SQL text to execute.
            *params: positional query parameters passed to the cursor.
            retries: number of attempts already spent (kept for callers that
                pass it explicitly).
            max_retries: maximum number of extra attempts after the first one.
                The default is read from ``cfg`` once, when the class is defined.

        Returns:
            A list of rows when the statement produced a result set,
            otherwise ``None`` (INSERT/UPSERT/DDL statements).

        Raises:
            RuntimeError: if :meth:`connect` has not been called.
            Error: the last driver error once retries are exhausted.

        NOTE: every ``Error`` is retried, including syntax errors, and the
        statement is re-sent verbatim — a retried INSERT that actually
        succeeded server-side can insert its rows twice.
        """
        if self.pool is None:
            raise RuntimeError('Impala pool is not initialised; call connect() first')
        attempt = retries
        while True:
            try:
                async with self.pool.acquire() as connection:
                    async with connection.cursor() as cursor:
                        await cursor.execute(query, *params)
                        # Statements without a result set have no description;
                        # fetching from them would raise.
                        if cursor.description is None:
                            return None
                        return await cursor.fetchall()
            except Error:
                if attempt >= max_retries:
                    raise
                attempt += 1

    async def execute(self, query, *params):
        """Run *query* with retries; see :meth:`_run_with_retries` for the result."""
        return await self._run_with_retries(query, *params)

    @staticmethod
    def to_python(data: Union[List[Row], Row]) -> Union[List[dict], dict]:
        """Convert a pyodbc ``Row`` (or a list of them) into dict(s) keyed by column name."""
        if isinstance(data, list):
            return [Impala.to_python(row) for row in data]
        return {
            column[0]: value
            for column, value in zip(data.cursor_description, data)
        }
Impala , retry- β . - .
β . , , .. , raw-sql , . β . , ( - ?)
SELECT- Impala :
full scan, .. , . . , β . β .
Impala, . .. , , Parquet β . Parquet : https://www.bigdataschool.ru/blog/apache-parquet-avro-spark-big-data.html
Impala , , , , (COMPUTE STATS), .
(, GDPR) . Impala , . KUDU. , primary key , . KUDU primary key ( ), DELETE UPSERT, β . , KUDU (CHAR, DATE, etc.), , , . , . : KUDU Impala. Impala , .
, . (Google, Yandex .). : , , .. , ( ) , , . β .
, click_event β , (, , ..).
, , . : . , . β .
: IP-, .
-- Rule "several registrations from one IP": store every registration event on
-- the processed day whose (campaign_id, campaign_sub_id, ip_hash) group holds
-- more than one registration. Impala supports UPSERT only for Kudu tables.
-- NOTE(review): only the process_from day is scanned while analysed_end_day
-- records process_until — confirm the rule is meant to run one day at a time.
UPSERT INTO {self.model.get_tablename()} (
id,
rule_run_id,
player_id,
event_id,
ip_hash,
regs_cnt,
analysed_start_day,
analysed_end_day,
created_at
)
WITH same_ip_regs AS (
SELECT
CE.campaign_id,
CE.campaign_sub_id,
CE.player_id,
CE.event_id,
CE.ip_hash ip_address,
-- Number of registrations sharing this campaign/sub-campaign/IP; the window
-- keeps every event row instead of collapsing the group.
count(CE.event_id) OVER (PARTITION BY campaign_id, campaign_sub_id, CE.ip_hash) cnt
FROM click_event CE
WHERE
TO_DATE(CE.event_dt) = '{self.config.process_from}' AND
CE.event_name='registration' AND
CE.ip_hash != 'unknown'
)
SELECT UUID() id, '{rule_run_id}', player_id, event_id, ip_address, cnt, '{self.config.process_from}', '{self.config.process_until}', now() created_at
FROM same_ip_regs
-- Only IPs seen more than once are suspicious.
WHERE cnt>1
-:
def generate_rules_select_sql(self):
    """Return a ``UNION ALL`` of one SELECT per rule listed in ``all_rules``.

    Each branch reads the rule's result table (``rule["tablename"]``), keeps
    rows whose ``created_at`` falls within the last
    ``self.config.process_lookback`` days, and labels every distinct
    (player_id, rule_run_id) pair with the rule's slug.

    Returns an empty string when ``all_rules`` is empty.
    """
    def render(rule):
        # One SELECT branch for a single rule table.
        return (
            '\n'
            f"SELECT player_id, '{rule['slug']}' rule_slug, rule_run_id\n"
            f"FROM {rule['tablename']}\n"
            f"WHERE created_at >= date_add(now(), -{self.config.process_lookback})\n"
            'GROUP BY player_id, rule_slug, rule_run_id\n'
        )

    return ' UNION ALL '.join(render(rule) for rule in all_rules)
async def run_etl(self):
    """Append users flagged by any rule into ``fraud_user``.

    The per-rule SELECTs from ``generate_rules_select_sql`` are combined in the
    ``all_users`` CTE. The ``LEFT ANTI JOIN ... USING(rule_run_id)`` drops every
    row whose rule run already has rows in ``fraud_user``, so re-running the
    ETL for runs that were already loaded inserts nothing for them.
    """
    sql_lines = [
        'INSERT INTO fraud_user',
        '(',
        'id,',
        'player_id,',
        'rule_slug,',
        'rule_run_id,',
        'created_at,',
        'load_day',
        ')',
        'WITH all_users AS (',
        self.generate_rules_select_sql(),
        ')',
        'SELECT UUID() id, au.player_id, au.rule_slug, au.rule_run_id,',
        'now() created_at, to_date(now()) load_day',
        'FROM all_users au',
        'LEFT ANTI JOIN fraud_user fu',
        'USING(rule_run_id)',
    ]
    await impala.execute('\n' + '\n'.join(sql_lines) + '\n')
, , :
-- Copy the raw events of every flagged user (fraud_user) into fraud_event,
-- tagged with the rule and rule run that flagged the user.
INSERT INTO fraud_event
(
id,
event_id,
player_id,
rule_slug,
rule_run_id,
created_at,
load_day
)
SELECT UUID() id,
fn.event_id,
fn.player_id,
fu.rule_slug,
fu.rule_run_id,
now() created_at,
to_date(now()) load_day
FROM fraud_user fu
INNER JOIN raw_event fn
ON fn.player_id = fu.player_id
-- Skip (event, rule run) pairs already stored, so re-runs do not duplicate rows.
LEFT ANTI JOIN fraud_event fe
ON fe.event_id = fn.event_id AND fe.rule_run_id = fu.rule_run_id
-- Limit both the scanned events and the flagged users to their lookback
-- windows (date_add with a negative day offset).
WHERE fn.dt>= date_add(to_date(now()), -{self.config.process_lookback_for_events})
AND fu.load_day>=date_add(to_date(now()), -{self.config.process_lookback_for_users})
: , , . : , - , , , . - β . Impala , . :
WITH
-- Fraud events produced by the rules included in the report.
-- (Impala SQL comments are "--" or "/* */"; "//" is a syntax error.)
filtered_fraud_event AS (
    SELECT * FROM fraud_event
    WHERE fraud_event.rule_slug IN (<list_of_required_rules>)
),
-- One row per event_id: an event flagged by several rules is counted once.
-- Which rule's row survives is arbitrary; only event_id is used below.
fraud_events_wo_dups AS (
    SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_id) AS rn FROM filtered_fraud_event
    ) AS t
    WHERE t.rn=1
),
-- One row per (event_id, rule_slug): the same event stored several times for
-- the same rule (presumably from different rule runs) is counted once per rule.
fraud_events_wo_dups_by_rules AS (
    SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id, rule_slug ORDER BY event_id) AS rn FROM filtered_fraud_event
    ) AS t
    WHERE t.rn=1
),
-- Registration clicks of the selected campaigns inside the report window,
-- enriched with the campaign's full name.
-- NOTE(review): if event_dt is a timestamp and report_to a plain date, events
-- later than midnight on the last day are excluded — confirm.
augmented_click_event AS (
    SELECT campaign.full_name, click_event.country, click_event.event_id, click_event.event_name
    FROM click_event
    LEFT JOIN campaign ON campaign.id=click_event.campaign_id
    WHERE
        click_event.event_dt <= '{self.config.report_to}'
        AND click_event.event_dt >= '{self.config.report_from}'
        AND click_event.event_name IN ('registration')
        AND click_event.campaign_id IN (<list_of_required_campaigns>)
)
-- raw_data stacks three pre-aggregated layers; the outer query pivots them into
-- columns per (country, full_name):
--   layer = 1: all events (totals);
--   layer = 2: fraud events per rule (an event counts once for each rule that flagged it);
--   layer = 3: fraud events across all rules (an event counts once in total).
SELECT
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 1 THEN raw_data.event_count ELSE 0 END)) AS 'event_count',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 1 and raw_data.event_name='registration' THEN raw_data.event_count ELSE 0 END)) AS 'event_count.registration',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 3 THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_event_count',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 3 and raw_data.event_name='registration' THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_event_count.registration',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 2 and raw_data.rule_slug='<rule_name>' THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_rule_count.<rule_name>',
    ...<the_same_block_for_each_fraud_rule>...
    raw_data.country, raw_data.full_name
FROM (
    -- layer 1: every event in the window.
    SELECT
        augmented_click_event.country,
        augmented_click_event.event_name,
        augmented_click_event.full_name,
        null AS rule_slug,
        COUNT(augmented_click_event.event_id) AS event_count,
        0 AS fraud_event_count,
        1 AS layer
    FROM augmented_click_event
    GROUP BY augmented_click_event.country, augmented_click_event.event_name, augmented_click_event.full_name
    UNION ALL
    -- layer 3: events flagged by at least one rule, each counted once.
    -- The INNER JOIN keeps only click events that have a matching fraud event.
    SELECT
        augmented_click_event.country,
        augmented_click_event.event_name,
        augmented_click_event.full_name,
        null AS rule_slug,
        0 AS event_count,
        count(fraud_events_wo_dups.event_id) AS fraud_event_count,
        3 AS layer
    FROM augmented_click_event
    INNER JOIN fraud_events_wo_dups
        ON fraud_events_wo_dups.event_id=augmented_click_event.event_id
    GROUP BY augmented_click_event.country, augmented_click_event.event_name, augmented_click_event.full_name
    UNION ALL
    -- layer 2: flagged events counted per rule.
    SELECT
        augmented_click_event.country,
        augmented_click_event.event_name,
        augmented_click_event.full_name,
        fraud_events_wo_dups_by_rules.rule_slug AS rule_slug,
        0 AS event_count,
        count(fraud_events_wo_dups_by_rules.event_id) AS fraud_event_count,
        2 AS layer
    FROM augmented_click_event
    INNER JOIN fraud_events_wo_dups_by_rules
        ON fraud_events_wo_dups_by_rules.event_id=augmented_click_event.event_id
    WHERE
        fraud_events_wo_dups_by_rules.rule_slug IN (<list_of_required_rules>)
    GROUP BY
        augmented_click_event.country,
        augmented_click_event.event_name,
        augmented_click_event.full_name,
        fraud_events_wo_dups_by_rules.rule_slug
) AS raw_data
GROUP BY raw_data.country, raw_data.full_name
ORDER BY raw_data.country, raw_data.full_name
, :