Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a data engineer in the analytics department of the Lucky group of companies.



I'm going to tell you about a great tool for developing ETL processes: Apache Airflow. Airflow is so versatile that it deserves a closer look even if you do not work with data flows at all and simply need to launch processes periodically and monitor their execution.



And yes, I will not only tell but also show: there is plenty of code, screenshots, and recommendations ahead.





What you usually see when you google the word Airflow / Wikimedia Commons








Introduction



Apache Airflow is just like Django:



  • written in Python,
  • there is an excellent admin panel,
  • expandable without limit,


only it is better and made for completely different purposes, namely (as stated before the cut):



  • ( Celery/Kubernetes )
  • workflow Python-
  • API , ( ).


Apache Airflow :



  • ( SQL Server PostgreSQL, API , 1) DWH ODS ( Vertica Clickhouse).
  • cron, ODS, .


32 50 GB . Airflow :



  • 200 ( workflows, ),
  • 70 ,
  • ( ) .


, , , ΓΌber-, :



SQL Server’, 50 β€” , , ( , --), Orders ( ). , (-, -, ETL-) , , Vertica.

!



, ( )



( )



, SQL- , ETL- aka :



  • Informatica Power Center β€” , , , . 1% . ? , -, - . -, , ---. , Airbus A380/, .



    , 30







  • SQL Server Integration Server β€” . : SQL Server , ETL- - . : , … , . dtsx ( XML ) , ? , ? , , . , , :







. SSIS-...



… . Apache Airflow.



, ETL- β€” Python-, . , Python- - 13” .





, , Airflow, , Celery , .



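To get going, the docker-compose.yml that follows brings up:



  • Airflow itself: the Scheduler and the Webserver. Flower also runs there to monitor Celery tasks (everything uses the apache/airflow:1.10.10-python3.7 image);
  • PostgreSQL, where Airflow keeps its metadata and Celery keeps task results;
  • Redis, which serves as the Celery broker;
  • a Celery worker, which executes the tasks;
  • the ./dags folder, mounted into the containers, which holds the DAG files.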


Complete working examples are available in the repository https://github.com/dm-logv/airflow-tutorial.


docker-compose.yml
version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker


:



  • puckel/docker-airflow – . , .
  • Airflow airflow.cfg, ( ), .
  • , production-ready: heartbeats , . , .
  • , :

    • , .
    • β€” .


:



$ docker-compose up --scale worker=3


, , -:







«», :



  • Scheduler: the main process in Airflow. It keeps track of the schedule, picks up DAG updates, and launches tasks.



    , , (, , ) - run_duration β€” . .



  • DAG: a directed acyclic graph, a container that groups tasks and their dependencies; the counterpart of a Package in SSIS or a Workflow in Informatica.



    , .



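  • DAG Run: a DAG instantiated for a specific execution_date. DAG Runs of the same DAG may run in parallel (as long as the tasks are idempotent, of course).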



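  • Operator: a piece of code responsible for one specific action. There are three kinds of operators:



    • action, such as PythonOperator, which can run any (valid) Python code;
    • transfer, which moves data from one place to another, such as MsSqlToHiveTransfer;
    • sensor, which holds back further execution of the DAG until some event occurs. For example, an HttpSensor can poll an endpoint and, once the expected response arrives, let a transfer such as GoogleCloudStorageToS3Operator start.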


  • Task: an operator that has been declared and attached to a DAG.



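  • Task instance: a task that the scheduler has decided to send to an executor for a particular DAG Run. It runs in place with LocalExecutor or on a remote node with CeleryExecutor, and it receives its own context (that is, a set of variables with the run parameters).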







, , .



, :



from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)


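Let's go through it:



  • first, we import the required libraries and a few helpers;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of connections from Airflow Connections and the databases we will read from;
  • dag is the declaration of our DAG. It must end up in globals(), otherwise Airflow will not find it. The DAG also needs:

    • a name, orders, which then shows up in the web interface,
    • a start date, here midnight on July 8, 2020,
    • a schedule: every 6 hours (instead of timedelta() you can pass a cron expression such as 0 */6 * * *, or a preset such as @daily);
  • workflow() will do the main work later. For now it only prints the context to the log.
  • Finally, the tasks are created:

    • we iterate over the sources;
    • for each one we create a PythonOperator that calls workflow(). Give every task a name that is unique within the DAG and attach it to the DAG. The provide_context flag passes extra arguments to the function, which we collect with **context.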
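
To make the schedule above concrete, here is a minimal sketch in plain Python (no Airflow import) of how a start_date of 2020-07-08 00:00 and a six-hour schedule_interval translate into runs. It assumes the usual Airflow rule that a run stamped with a given execution_date is triggered only after its interval has ended; first_runs is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta


def first_runs(start_date, interval, count):
    """Return (execution_date, earliest_trigger_time) pairs for the first runs."""
    runs = []
    for i in range(count):
        execution_date = start_date + i * interval
        # A run covers [execution_date, execution_date + interval) and is
        # triggered only once that interval has fully elapsed.
        runs.append((execution_date, execution_date + interval))
    return runs


for execution_date, trigger_time in first_runs(
        datetime(2020, 7, 8, 0), timedelta(hours=6), 3):
    print(execution_date.isoformat(),
          '-> triggered no earlier than', trigger_time.isoformat())
```

So the first run of orders carries execution_date 2020-07-08T00:00:00 but does not start before 06:00 that day.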


. :



  • -,
  • , ( Airflow, Celery ).


, .





?



docker-compose.yml requirements.txt .



:





β€” task instances, .



, :





, , β€” . β€” .



, ./dags, β€” git Gitlab, Gitlab CI master.


Flower



-, , - β€” Flower.



-:





, :





:





β€” :







, , .





β€” . Airflow , .



task instances.



, :





, Clear . , , - , .





, β€” Airflow. , : Browse/Task Instances





:





( , ):





,



DAG, update_reports.py:



from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent(""" ,  """),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""\
         , ,  {{ dag.dag_id }} 
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
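
The two trigger rules used in this DAG can be pictured with a simplified sketch in plain Python. It only looks at the final states of the upstream tasks and ignores details such as the upstream_failed and skipped states; the function names are illustrative and are not Airflow APIs.

```python
def all_success(upstream_states):
    """TriggerRule.ALL_SUCCESS, simplified: every upstream task succeeded."""
    return all(state == 'success' for state in upstream_states)


def one_failed(upstream_states):
    """TriggerRule.ONE_FAILED, simplified: at least one upstream task failed."""
    return any(state == 'failed' for state in upstream_states)


# Final states of the five report_update tasks on two hypothetical days
good_day = ['success'] * 5
bad_day = ['success', 'success', 'failed', 'success', 'success']

for name, states in [('good day', good_day), ('bad day', bad_day)]:
    print(name,
          '| email_success runs:', all_success(states),
          '| telegram_fail runs:', one_failed(states))
```

With these rules, exactly one of the two notification tasks fires for any given run.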


- ? : , ; , ; , ( , ).



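Line by line:



  • from commons.operators import TelegramBotSendMessage: nothing stops us from writing our own operators, so we made a small wrapper for sending messages to Telegram (more on this operator later);
  • default_args={}: the DAG can pass the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}': the to field is not hard-coded; Jinja fills it in from a variable with the list of emails, which is set in Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. Here the email is sent only if all upstream tasks succeeded;
  • tg_bot_conn_id='tg_main': conn_id arguments take the IDs of connections created in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED: the Telegram message is sent only if some task failed;
  • task_concurrency=1: forbids running several task instances of the same task at once. Otherwise several VerticaOperator instances could work on the same table simultaneously;
  • report_update >> [email, tg]: every VerticaOperator task leads into both the email and the Telegram message.



    Because the two notification operators have different trigger rules, only one of them will fire. In Tree View it looks like this: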





β€” .



β€” Jinja-, . , :



SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE


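{{ ds }} expands to the execution_date from the context in YYYY-MM-DD format, for example 2020-07-14. The value is tied to a specific task instance (a square in Tree View), so rerunning that instance renders the same date.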
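
As a rough illustration of what happens to the query, the sketch below fills in {{ ds }} by hand. It is a stand-in that uses a plain string replacement instead of Jinja, and render_ds is a hypothetical helper; Airflow does the real rendering itself for every templated field.

```python
from datetime import datetime


def render_ds(template, execution_date):
    """Substitute the {{ ds }} placeholder with the date as YYYY-MM-DD."""
    ds = execution_date.strftime('%Y-%m-%d')
    return template.replace('{{ ds }}', ds)


condition = "payment_dtm::DATE = '{{ ds }}'::DATE"
print(render_ds(condition, datetime(2020, 7, 14, 6, 0)))
```

The time part of execution_date is dropped, so every run on the same day renders the same value.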



Rendered -. :





:





: Macros Reference



, , , .



, ( ). Admin/Variables :





, :



TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')


, JSON. JSON-:



bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}


A nested key is then addressed by its path: {{ var.json.bot_config.bot.token }}.
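
The dotted path can be read as a chain of key lookups in the parsed JSON. Here is a minimal sketch with the standard json module; lookup is a hypothetical helper rather than part of Airflow, and the token is written as a quoted string so that the document is valid JSON.

```python
import json
from functools import reduce

bot_config = json.loads('''
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
''')


def lookup(obj, dotted_path):
    """Follow a dotted path such as 'bot.token' through nested dicts."""
    return reduce(lambda node, key: node[key], dotted_path.split('.'), obj)


token = lookup(bot_config, 'bot.token')
print(token)
```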



. : Admin/Connections , / . :





( , ), ( tg_main) β€” , Airflow ( - β€” ), .



: BaseHook.get_connection(), , ( Round Robin, Airflow).



Variables Connections, , , : , β€” Airflow. C , , , UI. β€” - , () .

β€” . Airflow β€” . , JiraHook Jira ( -), SambaHook smb-.





, , TelegramBotSendMessage



commons/operators.py :



from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)


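As with everything else in Airflow, this is straightforward:



  • we inherit from BaseOperator, which implements quite a few Airflow-specific things (worth a look when you have time);
  • we declare template_fields, the fields in which Jinja will look for macros to render;
  • we define the arguments of __init__(), with defaults where needed;
  • we do not forget to initialize the parent class;
  • we open the corresponding hook, TelegramBotHook, and get a client object from it;
  • we override BaseOperator.execute(), which Airflow calls when it is time to run the operator. It performs the main action and logs what it does. (Logging goes straight to stdout and stderr: Airflow intercepts the output and stores it in the task log.)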


Next, commons/hooks.py. The first part of the file is the hook itself:



from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client


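The important points here:



  • we inherit from BaseHook and handle the arguments; in most cases there is only one: conn_id;
  • we override the standard get_conn() method. In it we fetch the connection by name and take only its extra section (a JSON field), which holds the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"};
  • we create an instance of our TelegramBot and pass it that token.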


That is all. The client can be obtained from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().



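The second part of the file is a tiny wrapper around the Telegram REST API, so that we do not have to pull in python-telegram-bot for the sake of a single sendMessage method.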



class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
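
One detail to keep in mind with parse_mode set to MarkdownV2: the Telegram Bot API expects a set of punctuation characters to be escaped with a backslash when they are meant literally, and a message such as "{{ dag.dag_id }} failed :(" contains some of them. Here is a minimal sketch of an escaping helper. escape_markdown_v2 is a hypothetical function that is not part of the wrapper above, and it assumes the whole message is plain text with no intended formatting.

```python
import re

# Characters that MarkdownV2 treats as markup when left unescaped
_MARKDOWN_V2_SPECIALS = '_*[]()~`>#+-=|{}.!'
_SPECIALS_RE = re.compile('([' + re.escape(_MARKDOWN_V2_SPECIALS) + '])')


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    return _SPECIALS_RE.sub(r'\\\1', text)


print(escape_markdown_v2('update_reports failed :('))
```

The escaped string can then be passed as the message argument of send_message().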


β€” : TelegramBotSendMessage, TelegramBotHook, TelegramBot β€” , , Open Source.

, . , ...





- ! ? !



- ?



, - ? SQL Server Vertica , , !



, - . .



:



  1. ,
  2. SQL Server
  3. Vertica


, , docker-compose.yml:



docker-compose.db.yml
version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py


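Here we bring up:



  • Vertica as the host dwh, with its default settings,
  • three SQL Server instances,
  • an init container that fills the SQL Server databases with data by running mssql_init.py.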


, , :



$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3


, , Data Profiling/Ad Hoc Query:





,



ETL- , : , , , :



# connection: an open DB-API connection to the database with the sessions table
with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py
from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None when the block finished without an exception
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

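    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # DDL such as CREATE TABLE produces no result set,
            # so fetch a value only when there is one
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]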

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
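
The core of the class is the context manager protocol: the session is opened on entry and is always closed on exit, with the outcome recorded. Here is a minimal in-memory sketch of the same pattern without a database. InMemorySession and the log list are hypothetical stand-ins for Session and the sessions table.

```python
class InMemorySession:
    """Records the outcome of a load in a plain list instead of a table."""

    def __init__(self, log, task_name):
        self.log = log
        self.task_name = task_name
        self.successful = None
        self.loaded_rows = None
        self.comment = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None when the block finished without an exception
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
        self.log.append({'task_name': self.task_name,
                         'successful': self.successful,
                         'loaded_rows': self.loaded_rows,
                         'comment': self.comment})
        return False  # do not swallow the exception


log = []

with InMemorySession(log, 'orders__ok') as session:
    session.loaded_rows = 15
    session.successful = True

try:
    with InMemorySession(log, 'orders__broken') as session:
        raise ValueError('source is unavailable')
except ValueError:
    pass

for record in log:
    print(record)
```

Both runs leave a record: the first is marked successful, the second carries the error text in its comment.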


. :



source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)


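  1. With a hook, we get a pymssql connection from Airflow.
  2. We put the date filter into the query; the date reaches the function through the templating engine.
  3. We hand the query to pandas, which returns a DataFrame that we will need later.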


{dt} %s , , pandas pymssql params: List, tuple.

, pymssql , pyodbc.
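
For comparison, this is what passing the date as a query parameter looks like when the driver receives a tuple directly. The sketch uses the standard sqlite3 module as a stand-in for SQL Server, so the placeholder is ? rather than the %s that pymssql uses; the table and the orders_for_day helper are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (id INTEGER, start_time TEXT)')
conn.executemany('INSERT INTO orders VALUES (?, ?)',
                 [(1, '2020-07-14 10:00:00'),
                  (2, '2020-07-15 09:30:00')])


def orders_for_day(connection, dt):
    """Select the orders of one day, passing dt as a bound parameter."""
    query = 'SELECT id, start_time FROM orders WHERE date(start_time) = ?'
    # The parameters go in as a tuple, separately from the SQL text
    return connection.execute(query, (dt,)).fetchall()


print(orders_for_day(conn, '2020-07-14'))
# A malicious value is compared as data and never executed as SQL
print(orders_for_day(conn, "2020-07-14' OR '1'='1"))
```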

, Airflow :





, . . . --, ?! :



if df.empty:
    raise AirflowSkipException('No rows to load')


AirflowSkipException Airflow, , , . , pink.



:



df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])


:



  • , ,
  • ( ),
  • β€” ( ) .


: Vertica. , , β€” CSV!



# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)


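  1. We create a StringIO buffer.
  2. pandas writes our DataFrame into it as CSV lines.
  3. We open a connection to Vertica with a hook.
  4. With copy() we send the data straight to Vertica!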
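
To see what actually travels to Vertica, here is a small sketch that builds a similar buffer with the standard csv module instead of pandas. The rows, column names, and target table are invented for the example. The column list is joined with ', ' because COPY expects plain comma-separated names.

```python
import csv
from io import StringIO

columns = ['id', 'type', 'data']
rows = [(1, 3, 'plain'), (2, None, 'with|pipe')]

buffer = StringIO()
writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                    lineterminator='\n')
for row in rows:
    # Missing values are written as the marker that COPY is told to read as NULL
    writer.writerow(['NUL' if value is None else value for value in row])
buffer.seek(0)

target_table = 'dwh.orders'  # hypothetical table name
copy_stmt = (f"COPY {target_table}({', '.join(columns)}) "
             "FROM STDIN DELIMITER '|' ENCLOSED '\"' NULL 'NUL'")

print(buffer.getvalue())
print(copy_stmt)
```

Only the field that contains the delimiter gets quoted, which is what QUOTE_MINIMAL together with ENCLOSED '"' is for.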


, , , :



session.loaded_rows = cursor.rowcount
session.successful = True


.



. :


create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)


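With VerticaOperator() we create the schema and the table (if they do not exist yet, of course). The main thing is to set up the dependencies correctly: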


for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load




β€” , β€” , β€” ,

, ?

, «»



, : ETL-: SSIS Airflow… … , , , !



- , Apache Airflow β€” β€” .



: , β€” Airflow : , , ( , ).



, -



,



  • start_date. , . start_date . , start_date , schedule_interval β€” , DAG .



    start_date = datetime(2020, 7, 7, 0, 1, 2)


    .



    : Task is missing the start_date parameter, , .



  • . , ( Airflow ), -, , . . , PostgreSQL 20 5 , .



  • LocalExecutor. , , . LocalExecutor’ , , , CeleryExecutor. , , Celery , Β«, , !Β»



  • :



    • Connections ,
    • SLA Misses , ,
    • XCom ( !) .


  • . ? . Gmail >90k Airflow, - 100 .





: Apache Airflow Pitfails




, , Airflow :



  • REST API β€” Experimental, . , / , DAG Run .



  • CLI β€” , WebUI, . :



    • backfill .

      , , : Β« , , 1 13 ! ---!Β». :

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • : initdb, resetdb, upgradedb, checkdb.
    • run, , . , LocalExecutor, Celery-.
    • test, .
    • connections .


  • Python API β€” , , . /home/airflow/dags, ipython ? , , :



    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)


  • Airflow. , , API.



    , , . β€” , .



    , SQL!
    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0








Airflow .





, :





