Setting up a multinode Airflow cluster with HDP Ambari and Celery for data pipelines

Airflow is a strong choice for data pipelines, i.e. ETL orchestration and scheduling. It is widely used and well placed for future data pipelines. It provides backfilling, versioning, and lineage through functional abstraction.

Functional programming is the future.

An operator defines a single unit of work in a workflow; a DAG is a set of tasks and the dependencies between them. Operators usually run independently; in fact, they can run on two completely different machines. If you are a data engineer and have worked with Apache Spark or Apache Drill, you probably know what a DAG is! Airflow has the same concept.
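To make the DAG idea concrete, here is a minimal sketch that uses only the Python standard library rather than Airflow itself. The task names are hypothetical, and the code only illustrates how dependencies constrain the run order; it is not how Airflow schedules tasks internally.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names. Each key maps to the set of tasks it waits for.
DEPENDENCIES = {
    "extract_orders": set(),
    "extract_users": set(),
    "join": {"extract_orders", "extract_users"},
    "load": {"join"},
}


def execution_order(dependencies):
    """Return one valid order in which every task runs after its upstream tasks."""
    return list(TopologicalSorter(dependencies).static_order())


def is_acyclic(dependencies):
    """A workflow is only a DAG if its dependency graph has no cycle."""
    try:
        execution_order(dependencies)
    except CycleError:
        return False
    return True


print(execution_order(DEPENDENCIES))
```

The two extract tasks have no dependency on each other, which is why they could run on different machines at the same time.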

Build data pipelines that:

  1. Are idempotent.

  2. Are deterministic.

  3. Have no side effects.

  4. Use consistent sources and destinations.

  5. Do not update or append data in place.

Modularity and scalability are the primary goals of functional data pipelines.
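The properties listed above can be sketched in a few lines of plain Python. This is an illustration under assumptions, not code from the cluster: the function names, the `ds=<date>` directory layout, and the file name are all hypothetical. The transform is a pure function, and the writer replaces the whole partition for a run date instead of appending to it, so re-running the same date leaves the same result.

```python
import json
import os
import tempfile


def transform(rows):
    """Pure function: the same input always gives the same output, no side effects."""
    return [{"user": row["user"].lower(), "amount": row["amount"]} for row in rows]


def write_partition(base_dir, run_date, rows):
    """Overwrite the whole partition for run_date rather than appending to it."""
    partition_dir = os.path.join(base_dir, f"ds={run_date}")  # hypothetical layout
    os.makedirs(partition_dir, exist_ok=True)
    target = os.path.join(partition_dir, "part-0.json")
    # Write to a temporary file first, then swap it in so readers never see
    # a half-written partition.
    fd, tmp_path = tempfile.mkstemp(dir=partition_dir)
    with os.fdopen(fd, "w") as handle:
        json.dump(rows, handle, sort_keys=True)
    os.replace(tmp_path, target)
    return target


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as base:
        data = transform([{"user": "Alice", "amount": 10}])
        # Running the same date twice is safe: the second run replaces the first.
        write_partition(base, "2019-04-10", data)
        path = write_partition(base, "2019-04-10", data)
        with open(path) as handle:
            print(handle.read())
```

Because a rerun replaces the partition, backfilling a past date does not duplicate rows.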

  1. Install the Apache MPack for Airflow

a. git clone the Airflow MPack repository
b. Stop the Ambari server
c. Install the Apache MPack for Airflow on the Ambari server
d. Start the Ambari server

  2. Add the Airflow service in Ambari

  3. Set the executor

In Ambari, under Advanced airflow-core-site, set the executor to CeleryExecutor:

executor = CeleryExecutor

  4. Set the SQLAlchemy connection

sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow

  5. Set the broker URL and the Celery result backend

broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow

Also pause new DAGs at creation and skip loading the example DAGs:

dags_are_paused_at_creation = True
load_examples = False
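The settings above are easy to get subtly wrong, so a small check can help before restarting the service. The following is a rough sketch using only the standard library. The hostnames are placeholders, the single `[airflow]` section is just a parsing convenience (it is not claimed to match the layout Ambari writes), and `check_settings` is a hypothetical helper.

```python
from configparser import ConfigParser
from urllib.parse import urlsplit

# Values copied from the settings in this post. The hostnames are placeholders
# (assumptions), and the [airflow] section exists only so configparser can read it.
SAMPLE = """
[airflow]
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@db.example.internal/airflow
broker_url = pyamqp://guest:guest@mq.example.internal:5672/
celery_result_backend = db+postgresql://airflow:airflow@db.example.internal/airflow
"""

# URL schemes this post ends up using for each setting.
EXPECTED_SCHEMES = {
    "sql_alchemy_conn": "postgresql+psycopg2",
    "broker_url": "pyamqp",
    "celery_result_backend": "db+postgresql",
}


def check_settings(text):
    """Return a list of human-readable problems; an empty list means all checks passed."""
    parser = ConfigParser(interpolation=None)
    parser.read_string(text)
    settings = parser["airflow"]
    problems = []
    if settings.get("executor") != "CeleryExecutor":
        problems.append("executor is not CeleryExecutor")
    for key, scheme in EXPECTED_SCHEMES.items():
        value = settings.get(key)
        if value is None:
            problems.append(f"{key} is missing")
        elif urlsplit(value).scheme != scheme:
            problems.append(f"{key} should use the {scheme} scheme")
    return problems


print(check_settings(SAMPLE))
```

The check only looks at the executor name and the URL schemes; it does not try to connect to PostgreSQL or RabbitMQ.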


Start Flower, the Celery monitoring UI, in the background:

nohup airflow flower >> /var/local/airflow/logs/flower.logs &

AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
Apr 10 21:03:52 charlie-prod [2019-04-10 21:03:51,962] {} ERROR - Error syncing the celery executor, ignoring it:
Apr 10 21:03:52 charlie-prod [2019-04-10 21:03:51,962] {} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'

The fix was to upgrade Celery from 3.3.5 to 4.3:

pip install --upgrade celery

Apr 11 14:13:13 charlie-prod return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod TypeError: Required argument 'object' (pos 1) not found
Apr 11 14:13:13 charlie-prod [2019-04-11 14:13:13,847: ERROR/ForkPoolWorker-6285] Pool process <celery.concurrency.asynpool.Worker object at 0x7f3a88b7b250> error: TypeError("Required argument 'object' (pos 1) not found",)
Apr 11 14:13:13 charlie-prod Traceback (most recent call last):
Apr 11 14:13:13 charlie-prod File "/usr/lib64/python2.7/site-packages/billiard/", line 289, in __call__
Apr 11 14:13:13 charlie-prod sys.exit(self.workloop(pid=pid))
Apr 11 14:13:13 charlie-prod File "/usr/lib64/python2.7/site-packages/billiard/", line 347, in workloop
Apr 11 14:13:13 charlie-prod req = wait_for_job()
Apr 11 14:13:13 charlie-prod File "/usr/lib64/python2.7/site-packages/billiard/", line 447, in receive
Apr 11 14:13:13 charlie-prod ready, req = _receive(1.0)
Apr 11 14:13:13 charlie-prod File "/usr/lib64/python2.7/site-packages/billiard/", line 419, in _recv
Apr 11 14:13:13 charlie-prod return True, loads(get_payload())
Apr 11 14:13:13 charlie-prod File "/usr/lib64/python2.7/site-packages/billiard/", line 101, in pickle_loads
Apr 11 14:13:13 charlie-prod return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod TypeError: Required argument 'object' (pos 1) not found



Changing the broker URL scheme from amqp to pyamqp resolved this.

Before:

broker_url = amqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/

After:

broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/

pip install pyamqp
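When these errors are buried in long worker logs, a small triage script can point to the remedy quickly. This is a minimal sketch: the signature strings come from the two errors quoted in this post, the pairing of each error with its remedy follows the order in which the post presents them, and `triage` is a hypothetical helper, not part of Airflow or Celery.

```python
# Signature substrings taken from the errors quoted in this post, paired with
# the remedy the post applies after each one (an interpretation of the post's order).
KNOWN_ISSUES = [
    ("DisabledBackend", "upgrade Celery: pip install --upgrade celery"),
    ("pickle_loads", "switch broker_url from amqp:// to pyamqp://"),
]


def triage(log_lines):
    """Return the remedies whose signature appears in the log, in first-seen order."""
    remedies = []
    for line in log_lines:
        for signature, remedy in KNOWN_ISSUES:
            if signature in line and remedy not in remedies:
                remedies.append(remedy)
    return remedies


sample_log = [
    "ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'",
    "line 101, in pickle_loads",
    "ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'",
]
for remedy in triage(sample_log):
    print(remedy)
```

Matching on short substrings keeps the script independent of the quote characters and timestamps in the log.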

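The SQLAlchemy connection string changed as well, to name the psycopg2 driver explicitly.

Before:

sql_alchemy_conn = postgresql://airflow:airflow@{HOST_NAME}:5432/airflow

After:

sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow

psycopg2 is a PostgreSQL adapter for Python.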

by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
 Could not fetch URL There was a problem confirming the ssl certificate: HTTPSConnectionPool(host='', port=443): Max retries exceeded with url: /simple/apache-airflow/ (Caused by SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)) - skipping


resource_management.core.exceptions.ExecutionFailed: Execution of 'export SLUGIFY_USES_TEXT_UNIDECODE=yes && pip install --trusted-host --trusted-host --trusted-host --upgrade --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0
 Retrying (Retry(total=4, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=3, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=2, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=1, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
 Could not find a version that satisfies the requirement apache-airflow[celery]==1.10.0 (from versions: )
No matching distribution found for apache-airflow[celery]==1.10.0
You are using pip version 8.1.2, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

On the cluster, I temporarily commented out these lines by hand (and reverted the change once the worker was installed), then added a worker from Ambari. It worked like a charm :) and this hack made my day.

After installing the worker on another node, you may need to restart the Airflow service from Ambari. You can find out more in my previous blog post: Configuring Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines

