Airflow is the ideal choice for data pipelines, i.e. ETL orchestration and scheduling. It is widely adopted and popular for building future-proof data pipelines. It provides backfilling, versioning, and lineage through functional abstraction.
Functional programming is the future.
An operator defines a unit of work in a workflow; a DAG is a set of tasks and the dependencies between them. Operators usually work independently; in fact, they can run on two completely different machines. If you are a data engineer and have worked with Apache Spark or Apache Drill, you probably know what a DAG (directed acyclic graph) is! Airflow uses the same concept.
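To make the DAG idea concrete, here is a minimal sketch in plain Python (standard library only, not the Airflow API). The task names extract, transform, load, and report are hypothetical. The point is only that each task lists the tasks it depends on, and a valid run order follows from those dependencies.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical tasks: each key maps to the set of tasks it depends on.
dag = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}


def run_order(graph):
    """Return one valid execution order for the tasks in the graph."""
    return list(TopologicalSorter(graph).static_order())


order = run_order(dag)
print(order)
```

Because the graph is acyclic, such an order always exists. A cycle would make TopologicalSorter raise graphlib.CycleError.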
Create data pipelines that:
Are idempotent.
Are deterministic.
Have no side effects.
Use immutable sources and destinations.
Append rather than update.
Modularity and scalability are the primary goals of functional data pipelines.
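As an illustration of idempotency, here is a small sketch of a task that can be re-run for the same date without changing the result. The function name write_partition, the JSON format, and the one-file-per-date layout are assumptions made for this example, not something Airflow prescribes.

```python
import json
import os
import tempfile


def write_partition(base_dir, run_date, rows):
    """Write the rows for one run date, replacing any earlier output.

    The output path depends only on run_date, so re-running the task
    for the same date overwrites the partition instead of adding to it.
    """
    os.makedirs(base_dir, exist_ok=True)
    final_path = os.path.join(base_dir, f"{run_date}.json")
    # Write to a temp file, then rename, so readers never see a partial file.
    fd, tmp_path = tempfile.mkstemp(dir=base_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        json.dump(sorted(rows), handle)
    os.replace(tmp_path, final_path)
    return final_path


base = tempfile.mkdtemp()
first = write_partition(base, "2019-04-10", [3, 1, 2])
second = write_partition(base, "2019-04-10", [3, 1, 2])  # re-run, e.g. a backfill
```

Running the task twice leaves exactly one file with the same content, which is what makes backfills safe to repeat.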
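If you have programmed in Haskell, Scala, Erlang, or Kotlin, these properties will look familiar: they are the properties of functional programming.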
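If your data engineering platform includes ETL / Data Lake / Streaming Infrastructure, you probably already run a Hadoop / Spark cluster on a distribution such as Hortonworks, MapR, or Cloudera. This post shows how to use the same infrastructure that hosts your Apache Hadoop / Apache Spark Cluster to build an Airflow Cluster.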
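For orchestrating and scheduling ETL jobs, the usual choices are Oozie, Luigi, and Airflow. Oozie is XML-based, and it is 2019! :) Luigi has been largely displaced since Airflow came out of Airbnb.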
Why Airflow rather than Luigi?
Airflow has its own scheduler, whereas Luigi relies on cron.
Luigi .
Luigi .
Luigi - Cron.
Luigi .
Luigi , .
Before Airflow, I used Luigi to manage workflows built with Scikit-learn, Numpy, Pandas, Theano, and so on.
Now, let's set up a multi-node Airflow cluster with Celery and RabbitMQ on Ambari.
With Airflow installed on the Hadoop / Spark Cluster, Airflow can submit Spark, Hive, and Hadoop MapReduce jobs on that same cluster.
I used airflow-ambari-mpack (an Apache Airflow management pack for Apache Ambari), published by a FOSS contributor at https://github.com/miho120/ambari-airflow-mpack.
:
1 4, RabbitMQ .
- Install the Apache MPack for Airflow
a. git clone https://github.com/miho120/ambari-airflow-mpack.git
b. Stop the Ambari server
c. Install the Apache MPack for Airflow on the Ambari server
d. Start the Ambari server
- Add the Airflow Service in Ambari
Open Ambari at:
http://<HOST_NAME>:8080
In the Ambari UI, choose Actions -> Add Service.
HDP Ambari Dashboard
1 , Airflow Ambari.
Airflow Ambari
, -, . Airflow -, master , , Install Worker data-.
Ambari Master / Name Airflow
, - Airflow Airflow Name Hadoop / Spark.
, Airflow Worker Data .
, 3 (worker) data .
Airflow Ambari
Ambari UI: 3 Airflow
, , / , . + .
Airflow Ambari:
Select the Airflow Service, then open Config in Ambari.
Airflow Ambari
- Executor
executor = CeleryExecutor
Under Advanced airflow-core-site, set Executor to CeleryExecutor.
- SQL Alchemy Connection
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
SQL Alchemy Connection
Point the SQL Alchemy connection at postgresql, as shown above.
- Broker URL
broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow
Broker URL and Celery result backend for Airflow
dags_are_paused_at_creation = True
load_examples = False
Airflow-core-site.
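For readers who manage the file directly rather than through Ambari, the settings above can be sketched as airflow.cfg-style text. This is only an illustration: the host names are hypothetical, and placing the first four keys under [core] and the broker settings under [celery] is an assumption based on the Airflow 1.10 configuration layout.

```python
import configparser
import io


def render_airflow_cfg(db_host, rabbit_host):
    """Render the settings from this post as airflow.cfg-style text."""
    # interpolation=None so a '%' in a password is not treated specially.
    cfg = configparser.ConfigParser(interpolation=None)
    cfg["core"] = {
        "executor": "CeleryExecutor",
        "sql_alchemy_conn": f"postgresql+psycopg2://airflow:airflow@{db_host}/airflow",
        "dags_are_paused_at_creation": "True",
        "load_examples": "False",
    }
    cfg["celery"] = {
        "broker_url": f"pyamqp://guest:guest@{rabbit_host}:5672/",
        "celery_result_backend": f"db+postgresql://airflow:airflow@{db_host}/airflow",
    }
    buffer = io.StringIO()
    cfg.write(buffer)
    return buffer.getvalue()


# Hypothetical host names, for illustration only.
text = render_airflow_cfg("db.example.internal", "mq.example.internal")
print(text)
```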
Once the configuration is saved in Ambari, initialize the Airflow database from Ambari with Service Actions -> InitDB.
Airflow Initdb Ambari
airflow. Airflow.
- - Airflow:
- RabbitMQ :
- RabbitMQ :
- RabbitMQ :
- Celery Flower
Celery Flower is a web-based monitoring tool for Celery. Its default port is 5555.
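With 3 workers, all three should be listed in Celery Flower.
To start Celery Flower, the web UI for Celery, run: airflow flower
To keep Flower running in the background and write its output to a log file: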
nohup airflow flower >> /var/local/airflow/logs/flower.logs &
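A quick way to confirm that Flower came up is to check whether its port accepts TCP connections. The helper below is a generic sketch, not part of Airflow or Flower. The host name localhost is an assumption, and 5555 is the Flower default port mentioned above.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Assumption: Flower runs on this machine on its default port.
    print("Flower reachable:", is_port_open("localhost", 5555))
```

This only shows that something is listening on the port. Opening the Flower page in a browser is still the real check.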
Airflow Ambari HDP Hadoop / Spark Cluster.
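In my previous post on Apache Airflow, "Configuring Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines", I described the setup itself.
This post covers the problems I ran into while setting up the Multi-Node Airflow Cluster.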
Problem 1: After switching from LocalExecutor to CeleryExecutor, the scheduler reported errors.
Worker Scheduler Celery.
:
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019-04-10 21:03:51,962] {celery_executor.py:112} ERROR - Error syncing the celery executor, ignoring it:
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019-04-10 21:03:51,962] {celery_executor.py:113} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'
Airflow , , , . . Celery .
Solution:
Upgrade Celery; version 3.3.5 is too old for Airflow 1.10.
pip install --upgrade celery
This takes Celery from 3.3.5 to 4.3.
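Before upgrading, it can help to check whether the installed Celery is actually older than what you need. The sketch below compares version strings with plain Python. The function names and the default minimum of 4.0.0 are assumptions for illustration, not values taken from Airflow.

```python
import re


def parse_version(text):
    """Turn '4.3' into (4, 3, 0); anything after the leading digits is ignored."""
    parts = []
    for piece in text.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    # Pad to three components so '4.3' and '4.3.0' compare as equal.
    return tuple(parts + [0] * (3 - len(parts)))


def needs_upgrade(installed, minimum="4.0.0"):
    """Return True if the installed version is older than the minimum.

    Assumption: the default minimum is illustrative only.
    """
    return parse_version(installed) < parse_version(minimum)


print(needs_upgrade("3.3.5"))
print(needs_upgrade("4.3"))
```

On Python 3.8 or newer, the installed version string can be read with importlib.metadata.version("celery") and passed to needs_upgrade.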
Problem 2: With CeleryExecutor, DAG runs failed with the following error.
Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument 'object' (pos 1) not found
Apr 11 14:13:13 charlie-prod airflow_control.sh: [2019-04-11 14:13:13,847: ERROR/ForkPoolWorker-6285] Pool process <celery.concurrency.asynpool.Worker object at 0x7f3a88b7b250> error: TypeError("Required argument 'object' (pos 1) not found",)
Apr 11 14:13:13 charlie-prod airflow_control.sh: Traceback (most recent call last):
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 289, in __call__
Apr 11 14:13:13 charlie-prod airflow_control.sh: sys.exit(self.workloop(pid=pid))
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 347, in workloop
Apr 11 14:13:13 charlie-prod airflow_control.sh: req = wait_for_job()
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 447, in receive
Apr 11 14:13:13 charlie-prod airflow_control.sh: ready, req = _receive(1.0)
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 419, in _recv
Apr 11 14:13:13 charlie-prod airflow_control.sh: return True, loads(get_payload())
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/common.py", line 101, in pickle_loads
Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument 'object' (pos 1) not found
:
.
airflow : https://blog.csdn.net/u013492463/article/details/80881260
, , , , .
The original setting was:
broker_url = amqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
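The difference between amqp and pyamqp is as follows.
amqp:// is an alias that uses librabbitmq if it is available, and py-amqp if it is not.
Use pyamqp:// or librabbitmq:// if you want to specify exactly which transport to use.
The pyamqp:// transport uses the amqp library (http://github.com/celery/py-amqp).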
Updated setting:
broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
Changing amqp to pyamqp resolved the error.
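If the broker URL is generated by a script, the same change can be applied programmatically. The sketch below rewrites only the URL scheme and leaves credentials, host, and port untouched. The function name force_pyamqp and the host name are hypothetical.

```python
from urllib.parse import urlsplit, urlunsplit


def force_pyamqp(broker_url):
    """Rewrite an amqp:// broker URL to name the pyamqp:// transport explicitly."""
    parts = urlsplit(broker_url)
    if parts.scheme == "amqp":
        parts = parts._replace(scheme="pyamqp")
    return urlunsplit(parts)


# Hypothetical host name, for illustration only.
print(force_pyamqp("amqp://guest:guest@mq.example.internal:5672/"))
```

URLs that already use pyamqp:// or another transport are returned unchanged.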
:
pip install pyamqp
Problem 3: SQL Alchemy connection
The SQL Alchemy connection before:
sql_alchemy_conn = postgresql://airflow:airflow@{HOST_NAME}:5432/airflow
The SQL Alchemy connection after:
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
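The two connection strings differ in the part before "://", which has the form dialect+driver. The sketch below splits a URL into those pieces using only the standard library; it does not use SQLAlchemy itself. The function name describe_conn and the host name are hypothetical.

```python
from urllib.parse import urlsplit


def describe_conn(url):
    """Split a SQLAlchemy-style URL into dialect, driver, host, port, and database."""
    parts = urlsplit(url)
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,  # None when the URL names no driver
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


# Hypothetical host name, for illustration only.
before = describe_conn("postgresql://airflow:airflow@db.example.internal:5432/airflow")
after = describe_conn("postgresql+psycopg2://airflow:airflow@db.example.internal/airflow")
print(before)
print(after)
```

The second form names psycopg2 explicitly, so that package must be installed on every node that opens the connection.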
psycopg2
pip
wheel.
PostGreSQL: psycopg2
Psycopg is a PostgreSQL adapter for Python.
Problem 4: On HDP 2.6.2 with Ambari, Worker Installation failed.
- , , Celery worker , DAG .
:) .
by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Could not fetch URL https://pypi.org/simple/apache-airflow/: There was a problem confirming the ssl certificate: HTTPSConnectionPool(host='pypi.org', port=443): Max retries exceeded with url: /simple/apache-airflow/ (Caused by SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)) - skipping
:
, pip wheel , worker Ambari. wheelâs of pip.
, pypi wheel.
pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0
, , . , .
resource_management.core.exceptions.ExecutionFailed: Execution of 'export SLUGIFY_USES_TEXT_UNIDECODE=yes && pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed apache-airflow[celery]==1.10.0' returned 1.
Collecting apache-airflow[celery]==1.10.0
Retrying (Retry(total=4, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=3, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=2, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=1, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Could not find a version that satisfies the requirement apache-airflow[celery]==1.10.0 (from versions: )
No matching distribution found for apache-airflow[celery]==1.10.0
You are using pip version 8.1.2, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
pip, .
, Hack , , â , celery wheel pip, .
In the cluster, I temporarily commented out these lines by hand (and reverted the change once the worker was installed), then added a worker from Ambari, which worked like a charm :) This hack made my day.
After installing a worker on another node, you may need to restart the Airflow service from Ambari. You can find out more in my previous blog post, "Configuring Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines".