Impala for the Python developer: fraud detection in marketing-platform traffic analysis as a worked example

Hello everyone.





, . . , . Hadoop, HDFS. -, .





( ) , , “... postgres ” . . , , Clickhouse, Snowflake ..,     Impala Hive. . – . , Impala.





:





  • . update, delete, etc ( Kudu, ).





  • … , .





  • python ORM Impala? ODBC?





  • 1000 1 000 000 .





  • .





  • ? , , , real-time’.





  • , - , . , .





30   90 1 .  – Hue ("") – - .





Impala python-    . 2 – impyla cloudera odbc. - , issue . . , , pyodbc. .. , pyodbc aioodbc – pyodbc.





def _convert_timestamp(value):
    unpacked = struct.unpack('6Hxxxx', value)
    if unpacked == (1970, 1, 1, 0, 0, 0):
        return None
    return datetime(*unpacked)
 
 
class Impala:
    """Thin async wrapper around an aioodbc connection pool for Impala.

    Builds the ODBC connection string from ``cfg``, owns the pool lifecycle
    (``connect`` / ``disconnect``) and runs statements with retries.
    """

    def __init__(self):
        # Every value is wrapped in braces so that special characters in it
        # (e.g. ';' in a password) do not break the connection string.
        self.dsn = (
            f'DRIVER={{{cfg.impala_driver}}};'
            f'HOST={{{cfg.impala_host}}};'
            f'PORT={{{cfg.impala_port}}};'
            f'SCHEMA={{{cfg.impala_default_schema}}};'
            f'UID={{{cfg.impala_uid}}};'
            f'PWD={{{cfg.impala_pwd}}};'
            f'AUTHMECH=3;'
            f'USESASL=1;'
            f'SSL=0;'
        )

        # Created lazily by connect(); None until then.
        self.pool: Optional[Pool] = None

    async def connect(self):
        """Create the connection pool and verify it with a trivial query."""
        self.pool = await create_pool(
            minsize=cfg.impala_min_poolsize,
            maxsize=cfg.impala_max_poolsize,
            dsn=self.dsn,
            after_created=self.after_created,
            autocommit=True,
            # NOTE(review): presumably chosen to stay below a server-side
            # idle timeout -- confirm against the Impala/driver settings.
            pool_recycle=55,
        )

        # Smoke test: fail fast here rather than on the first real query.
        async with self.pool.acquire() as connection:
            async with connection.cursor() as cursor:
                cursor: Cursor

                result = await cursor.execute('select 1')
                await result.fetchone()

    async def disconnect(self):
        """Close the pool and wait for its connections; no-op if never connected."""
        if self.pool is not None:
            self.pool.close()
            await self.pool.wait_closed()

    @staticmethod
    async def after_created(connection):
        """Configure a freshly opened connection (passed as ``after_created``)."""
        # The driver returns zero date values incorrectly, so timestamps are
        # decoded by hand.
        connection.add_output_converter(SQL_TYPE_TIMESTAMP, _convert_timestamp)

        connection.setdecoding(SQL_CHAR, encoding='utf-8')
        connection.setdecoding(SQL_WCHAR, encoding='utf-8')
        connection.setdecoding(SQL_WMETADATA, encoding='utf-16')
        connection.setencoding(encoding='utf-8')

    async def _run_with_retries(
        self,
        query,
        *params,
        retries=0,
        max_retries=cfg.impala_max_retries,
    ):
        """Run ``query`` on a pooled connection, retrying on driver errors.

        Args:
            query: SQL text to execute.
            *params: Positional parameters forwarded to ``cursor.execute``.
            retries: Number of retries already consumed.
            max_retries: Maximum number of retries before the error is
                re-raised.

        Returns:
            The list of fetched rows when the statement produces a result
            set, otherwise ``None`` (e.g. INSERT/UPSERT).

        Raises:
            Error: the last driver error once ``max_retries`` is exhausted.
        """
        while True:
            try:
                # A new connection is acquired on every attempt, so a broken
                # connection is not reused by the retry.
                async with self.pool.acquire() as connection:
                    async with connection.cursor() as cursor:
                        await cursor.execute(query, *params)
                        # description is None for statements without a result
                        # set; fetching from them would itself raise.
                        if cursor.description is None:
                            return None
                        return await cursor.fetchall()
            except Error:
                if retries >= max_retries:
                    raise
                retries += 1

    async def execute(self, query, *params):
        """Execute ``query`` with ``params``; see ``_run_with_retries``."""
        return await self._run_with_retries(query, *params)

    @staticmethod
    def to_python(data: Union[List[Row], Row]) -> Union[List[dict], dict]:
        """Convert a row, or a list of rows, into dict(s) keyed by column name."""
        if isinstance(data, list):
            return [Impala.to_python(row) for row in data]

        # cursor_description entries are tuples whose first item is the
        # column name.
        return {info[0]: data[i] for i, info in enumerate(data.cursor_description)}
      
      



Impala , retry- – . - .





– . , , .. , raw-sql , . – . ,  ( - ?)





SELECT- Impala :





  • full scan, .. , . . , – . – .





  • Impala, . .. , , Parquet – . Parquet : https://www.bigdataschool.ru/blog/apache-parquet-avro-spark-big-data.html





  • Impala , , , ,   (COMPUTE STATS), .





(, GDPR) . Impala , .  KUDU. , primary key , . KUDU primary key ( ), DELETE UPSERT, – . , KUDU (CHAR, DATE, etc.), , , . , . : KUDU Impala. Impala   , .





, . (Google, Yandex .). : , , .. , ( ) , , . – .





, click_event –     , (, , ..).





, ,  . : . , . – .





: IP-, .





-- Rule "several registrations from one IP": writes one row per registration
-- event whose ip_hash was used by more than one registration within the same
-- campaign / campaign_sub_id.
-- The {...} fragments are template placeholders filled in by the calling code.
-- The SELECT list at the bottom is matched to the column list by position.
UPSERT INTO {self.model.get_tablename()} (
    id,
    rule_run_id,
    player_id,
    event_id,
    ip_hash,
    regs_cnt,
    analysed_start_day,
    analysed_end_day,
    created_at
)
-- same_ip_regs: registration events of the processed day with a known
-- ip_hash, each annotated with the number of such events sharing its
-- (campaign_id, campaign_sub_id, ip_hash).
WITH same_ip_regs AS (
    SELECT
        CE.campaign_id,
        CE.campaign_sub_id,
        CE.player_id,
        CE.event_id,
        CE.ip_hash ip_address,
        count(CE.event_id) OVER (PARTITION BY campaign_id, campaign_sub_id, CE.ip_hash) cnt
    FROM click_event CE
    WHERE
        -- NOTE(review): only the process_from day is scanned, while
        -- process_until is stored as analysed_end_day -- confirm the two
        -- are always the same day.
        TO_DATE(CE.event_dt) = '{self.config.process_from}' AND
        CE.event_name='registration' AND
        CE.ip_hash != 'unknown'
)
SELECT UUID() id, '{rule_run_id}', player_id, event_id, ip_address, cnt, '{self.config.process_from}', '{self.config.process_until}', now() created_at
FROM same_ip_regs
-- Keep only IP hashes that occur more than once.
WHERE cnt>1
      
      



-:





def generate_rules_select_sql(self):
    """Build one SELECT per rule and glue them together with UNION ALL.

    Each SELECT reads the rule's own result table and yields the distinct
    (player_id, rule_slug, rule_run_id) combinations created within the
    configured look-back window.

    Returns:
        The combined SQL text; an empty string when ``all_rules`` is empty.
    """
    per_rule_selects = [
        f'''
            SELECT player_id, '{rule["slug"]}' rule_slug, rule_run_id
            FROM {rule["tablename"]}
            WHERE created_at >= date_add(now(), -{self.config.process_lookback})
            GROUP BY player_id, rule_slug, rule_run_id
        '''
        for rule in all_rules
    ]
    return ' UNION ALL '.join(per_rule_selects)
 
async def run_etl(self):
    """Insert into ``fraud_user`` the players flagged by any rule.

    The per-rule SELECTs are unioned in the ``all_users`` CTE; the anti join
    skips rows whose ``rule_run_id`` is already present in ``fraud_user``.
    """
    flagged_players_sql = self.generate_rules_select_sql()

    statement = f'''
        INSERT INTO fraud_user
        (
            id,
            player_id,
            rule_slug,
            rule_run_id,
            created_at,
            load_day
        )
        WITH all_users AS (
            {flagged_players_sql}
        )
        SELECT UUID() id, au.player_id, au.rule_slug, au.rule_run_id,
            now() created_at, to_date(now()) load_day
        FROM all_users au
        LEFT ANTI JOIN fraud_user fu
            USING(rule_run_id)
    '''

    await impala.execute(statement)
      
      



, , :





-- Marks as fraudulent every raw event that belongs to a player already listed
-- in fraud_user, one row per (event, rule run).
-- The {...} fragments are template placeholders filled in by the calling code.
-- The SELECT list is matched to the column list by position.
INSERT INTO fraud_event
(
    id,
    event_id,
    player_id,
    rule_slug,
    rule_run_id,
    created_at,
    load_day
)
SELECT UUID() id,
    fn.event_id,
    fn.player_id,
    fu.rule_slug,
    fu.rule_run_id,
    now() created_at,
    to_date(now()) load_day
FROM fraud_user fu
INNER JOIN raw_event fn
    ON fn.player_id = fu.player_id
-- Skip (event_id, rule_run_id) pairs that are already in fraud_event.
LEFT ANTI JOIN fraud_event fe
    ON fe.event_id = fn.event_id AND fe.rule_run_id = fu.rule_run_id
-- Limit both sides to their configured look-back windows (in days).
WHERE fn.dt>= date_add(to_date(now()), -{self.config.process_lookback_for_events})
    AND fu.load_day>=date_add(to_date(now()), -{self.config.process_lookback_for_users})
      
      



: , , . : , - , , , . - – . Impala , .  :





-- Fraud report: per (country, campaign full name) counts of all events, of
-- fraudulent events, and of fraudulent events per rule.
-- <...> and {...} fragments are template placeholders filled in by the caller.
WITH
-- Fraud events restricted to the rules requested for the report.
filtered_fraud_event AS (
    SELECT * FROM fraud_event
    WHERE fraud_event.rule_slug IN (<list_of_required_rules>)
),
-- One row per event_id: an event flagged by several rules is kept once.
fraud_events_wo_dups AS (
    SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_id) AS rn FROM filtered_fraud_event
    ) AS t
    WHERE t.rn=1
),
-- One row per (event_id, rule_slug): duplicates are removed within each rule.
fraud_events_wo_dups_by_rules AS (
    SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id, rule_slug ORDER BY event_id) AS rn FROM filtered_fraud_event
    ) AS t
    WHERE t.rn=1
),
-- Click events of the report period and requested campaigns, with the
-- campaign's full name attached.
augmented_click_event AS (
    SELECT campaign.full_name, click_event.country, click_event.event_id, click_event.event_name
    FROM click_event
    LEFT JOIN campaign ON campaign.id=click_event.campaign_id
    WHERE 
        click_event.event_dt <= '{self.config.report_to}' 
        AND click_event.event_dt >= '{self.config.report_from}' 
        AND click_event.event_name IN ('registration') 
        AND click_event.campaign_id IN (<list_of_required_campaigns>)
)

-- The inner query stacks three "layers" with UNION ALL:
--   layer=1 - all events;
--   layer=2 - fraudulent events per rule;
--   layer=3 - fraudulent events counted once regardless of the rule.
-- The outer query pivots the layers into columns with conditional sums.
SELECT
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 1 THEN raw_data.event_count ELSE 0 END)) AS 'event_count',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 1 and raw_data.event_name='registration' THEN raw_data.event_count ELSE 0 END)) AS 'event_count.registration',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 3 THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_event_count',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 3 and raw_data.event_name='registration' THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_event_count.registration',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 2 and raw_data.rule_slug='<rule_name>' THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_rule_count.<rule_name>',
    ...<the_same_block_for_each_fraud_rule>...

    raw_data.country, raw_data.full_name
FROM (
    -- layer 1: every event
    SELECT 
        augmented_click_event.country, 
        augmented_click_event.event_name,
        augmented_click_event.full_name,
        null AS rule_slug,
        COUNT(augmented_click_event.event_id) AS event_count,
        0 AS fraud_event_count,
        1 AS layer
    FROM augmented_click_event
    GROUP BY augmented_click_event.country, augmented_click_event.event_name, augmented_click_event.full_name

    UNION ALL

    -- layer 3: events flagged by at least one rule, counted once
    SELECT 
        augmented_click_event.country, 
        augmented_click_event.event_name, 
        augmented_click_event.full_name,
        null AS rule_slug,
        0 AS event_count,
        count(fraud_events_wo_dups.event_id) AS fraud_event_count,
        3 AS layer
    FROM augmented_click_event
    INNER JOIN fraud_events_wo_dups
    ON fraud_events_wo_dups.event_id=augmented_click_event.event_id
    GROUP BY augmented_click_event.country, augmented_click_event.event_name, augmented_click_event.full_name

    UNION ALL

    -- layer 2: flagged events broken down by rule
    SELECT 
        augmented_click_event.country, 
        augmented_click_event.event_name, 
        augmented_click_event.full_name,
        fraud_events_wo_dups_by_rules.rule_slug AS rule_slug,
        0 AS event_count,
        count(fraud_events_wo_dups_by_rules.event_id) AS fraud_event_count,
        2 AS layer
    FROM augmented_click_event
    INNER JOIN fraud_events_wo_dups_by_rules
    ON fraud_events_wo_dups_by_rules.event_id=augmented_click_event.event_id
    WHERE 
        fraud_events_wo_dups_by_rules.rule_slug IN (<list_of_required_rules>)
    GROUP BY 
        augmented_click_event.country, 
        augmented_click_event.event_name, 
        augmented_click_event.full_name, 
        fraud_events_wo_dups_by_rules.rule_slug
) AS raw_data

GROUP BY raw_data.country, raw_data.full_name
ORDER BY raw_data.country, raw_data.full_name
      
      



  , :








All Articles