Meet the new features of KubernetesExecutor 2.0. Spoiler alert: it is faster, more flexible, and easier to understand.
Along with Airflow 2.0, we are pleased to present a completely redesigned KubernetesExecutor. The new architecture is faster, more flexible, and easier to understand than the KubernetesExecutor in Airflow 1.10. To start, we would like to walk you through the new features of KubernetesExecutor 2.0!
What is KubernetesExecutor?
In 2018, we introduced the KubernetesExecutor, built around the ideas of autoscaling and flexibility. At the time, Airflow had no clear approach to autoscaling Celery workers (although our recent work with KEDA has made great strides in this area), so we set out to build a system that could scale with the needs of the user. The result was a system that uses the Kubernetes API to launch one pod per Airflow task. A valuable side effect of building on the Kubernetes API is that users can add their own customizations and constraints to each individual task.
Through the Kubernetes API, the KubernetesExecutor lets Airflow users specify that certain tasks have access to certain secrets, or that a task may only run on nodes located in the European Union (which can be useful for data governance). Users can also specify how many resources a task gets, which can vary greatly depending on what the task does (for example, a TensorFlow script may need access to GPUs). With this API, the KubernetesExecutor gives data engineers much finer control over how Airflow runs its tasks than Celery queues alone would.
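To make these constraints concrete, the sketch below expresses all three (a secret, a region restriction, and a GPU request) as fields of a standard Kubernetes pod spec. It is written as a plain Python dict with the field names you would use in YAML, so it runs without the kubernetes client library. The helper name, the secret name `covid-api-credentials`, its key `token`, the variable `API_TOKEN`, and the region `eu-west-1` are hypothetical; `topology.kubernetes.io/region` and `nvidia.com/gpu` are standard Kubernetes names, but which labels and resources your nodes actually expose depends on your cluster.

```python
import json


def constrained_pod_spec(region: str, gpus: int, secret_name: str) -> dict:
    """Hypothetical helper: build the per-task part of a pod spec.

    Field names follow the Kubernetes Pod API (the same names used in YAML).
    """
    return {
        "spec": {
            # Only schedule on nodes labelled as being in the given region.
            "nodeSelector": {"topology.kubernetes.io/region": region},
            "containers": [
                {
                    # Airflow uses "base" as the name of the worker's main container.
                    "name": "base",
                    "resources": {
                        # GPUs are requested through limits; this resource name
                        # assumes the NVIDIA device plugin is installed.
                        "limits": {"nvidia.com/gpu": gpus},
                    },
                    "env": [
                        {
                            # Expose one key of a Kubernetes Secret as an env var.
                            "name": "API_TOKEN",
                            "valueFrom": {
                                "secretKeyRef": {"name": secret_name, "key": "token"}
                            },
                        }
                    ],
                }
            ],
        }
    }


spec = constrained_pod_spec("eu-west-1", 1, "covid-api-credentials")
print(json.dumps(spec, indent=2))
```

In a real DAG the same fields would be written as `kubernetes.client.models` objects (for example `V1PodSpec(node_selector=...)`) inside a `pod_override`.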
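While the KubernetesExecutor is extremely flexible, launching a pod for every task adds some latency compared to Celery (how much depends on your use case). For users who care most about latency, the CeleryExecutor remains the better option. For users who want the benefits of both the CeleryExecutor and the KubernetesExecutor in one Airflow deployment, Airflow 2.0 introduces the CeleryKubernetesExecutor, but that is a story for another post!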
What's new in the KubernetesExecutor
The pod_template_file
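Starting with Airflow 1.10.12, you can configure KubernetesExecutor workers with a pod_template_file: a YAML file that describes, in native Kubernetes terms, the base pod for every worker the KubernetesExecutor launches. Instead of learning a separate set of Airflow-specific options that mirror only part of Kubernetes, Airflow users can now use the full Kubernetes API to describe their workers.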
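A pod_template_file is an ordinary Kubernetes Pod manifest. The sketch below writes a minimal one. It uses `json.dumps` because JSON documents are, in practice, also valid YAML, which keeps the example free of third-party dependencies. The image tag, namespace, placeholder pod name, and output location are assumptions; naming the main container `base` follows Airflow's convention for the worker container.

```python
import json
import tempfile
from pathlib import Path

# Minimal base pod for KubernetesExecutor workers (assumed values).
pod_template = {
    "apiVersion": "v1",
    "kind": "Pod",
    # Airflow sets the real pod name for every task; this is a placeholder.
    "metadata": {"name": "placeholder-name", "namespace": "airflow"},
    "spec": {
        "containers": [
            {
                # The worker's main container is named "base".
                "name": "base",
                "image": "apache/airflow:2.0.0",
                "imagePullPolicy": "IfNotPresent",
            }
        ],
        # Each task pod runs once; retries are handled by Airflow, not Kubernetes.
        "restartPolicy": "Never",
    },
}


def write_pod_template(path: Path) -> Path:
    """Write the template; JSON is used here as a YAML-compatible subset."""
    path.write_text(json.dumps(pod_template, indent=2))
    return path


out = write_pod_template(Path(tempfile.gettempdir()) / "pod_template.yaml")
print(out.read_text())
```

The resulting file is what the `pod_template_file` option in the `[kubernetes]` section of airflow.cfg would point to.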
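Multiple pod_template_files
In Airflow 2.0 you are no longer limited to one pod_template_file for the whole Airflow deployment. Each task can point to its own pod_template_file through its executor_config, so you can maintain several kinds of worker pods side by side, much like the separate queues you would set up for the CeleryExecutor.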
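The per-task choice is made through executor_config, which in Airflow 2.0 accepts a `pod_template_file` key holding a path. The sketch below keeps a small registry of worker "profiles", similar to how you might name Celery queues; the profile names, the template directory, and the helper function are hypothetical.

```python
import os

# Hypothetical directory holding one YAML template per kind of worker.
TEMPLATE_DIR = "/opt/airflow/pod_templates"

# Hypothetical profiles, playing the role that named queues play with Celery.
PROFILES = {
    "default": "default.yaml",
    "high_memory": "high_memory.yaml",
    "gpu": "gpu.yaml",
}


def executor_config_for(profile: str) -> dict:
    """Return an executor_config that points a task at one pod template."""
    try:
        filename = PROFILES[profile]
    except KeyError:
        raise ValueError(f"unknown worker profile: {profile!r}") from None
    return {"pod_template_file": os.path.join(TEMPLATE_DIR, filename)}


print(executor_config_for("gpu"))
```

In a DAG, a task would then receive something like `executor_config=executor_config_for("high_memory")`.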
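Isolated pods with pod_template_files
Since 2.0, this means that different tasks can run in completely different Kubernetes pods, each with its own image, resources, and permissions. Every template plays the role of a dedicated Celery queue, but without a standing pool of workers to run and scale. That flexibility is the core strength of the KubernetesExecutor.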
executor_config
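Airflow 2.0 also reworks executor_config. In 1.10 it was a dictionary with a fixed set of Airflow-specific keys that covered only part of what Kubernetes can do. Now you pass Python objects from the official Kubernetes client library, which expose the entire Kubernetes API: put a V1Pod under the pod_override key of executor_config. Airflow merges this pod on top of the base pod, so you only need to specify the fields you want to change.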
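To illustrate what "merged on top of the base pod" means, the sketch below merges a partial pod (the override) into a base pod using plain dicts: nested dicts are merged, containers are matched by name, and any other value from the override replaces the base value. This is a deliberately simplified stand-in, not Airflow's own reconciliation code, which works on V1Pod objects and has its own rules for specific fields. All values are made up.

```python
import copy


def merge_pod(base: dict, override: dict) -> dict:
    """Simplified sketch: merge a partial pod spec into a base pod spec.

    - dicts are merged key by key (recursively)
    - "containers" lists are merged by container name
    - any other value in the override replaces the base value
    """
    result = copy.deepcopy(base)
    for key, value in override.items():
        if key == "containers" and isinstance(value, list):
            result[key] = _merge_containers(result.get(key, []), value)
        elif isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merge_pod(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result


def _merge_containers(base: list, override: list) -> list:
    """Merge containers that share a name; append the rest."""
    merged = [copy.deepcopy(c) for c in base]
    index = {c.get("name"): i for i, c in enumerate(merged)}
    for container in override:
        name = container.get("name")
        if name in index:
            merged[index[name]] = merge_pod(merged[index[name]], container)
        else:
            merged.append(copy.deepcopy(container))
    return merged


base = {
    "metadata": {"labels": {"app": "airflow-worker"}},
    "spec": {"containers": [{"name": "base", "image": "apache/airflow:2.0.0"}]},
}
override = {
    "metadata": {"labels": {"purpose": "pod-override-example"}},
    "spec": {"containers": [{"name": "base", "env": [{"name": "STATE", "value": "wa"}]}]},
}
pod = merge_pod(base, override)
print(pod)
```

The override only mentions the label and the environment variable; the image comes from the base pod unchanged.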
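If you still use the old dictionary format of executor_config, it continues to work in Airflow 2.0 but is deprecated. We therefore recommend migrating your tasks to pod_override.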
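For comparison, the 1.10-style executor_config nests Airflow-specific keys under `"KubernetesExecutor"`. The hypothetical helper below rewrites a small subset of those keys (`image`, `request_memory`, `request_cpu`, `limit_memory`, `limit_cpu`) as the equivalent Kubernetes container fields. It returns a plain dict so that it runs without the kubernetes package; Airflow itself expects the value under `pod_override` to be a `kubernetes.client.models.V1Pod`, so treat the output only as a guide for writing that object.

```python
# Legacy (1.10-style) executor_config, shown for comparison (made-up values).
legacy_config = {
    "KubernetesExecutor": {
        "image": "my-registry/airflow-worker:latest",
        "request_memory": "512Mi",
        "limit_memory": "1Gi",
        "request_cpu": "500m",
    }
}

# Legacy key -> (resources section, resource name) in the Kubernetes API.
_RESOURCE_KEYS = {
    "request_memory": ("requests", "memory"),
    "request_cpu": ("requests", "cpu"),
    "limit_memory": ("limits", "memory"),
    "limit_cpu": ("limits", "cpu"),
}


def legacy_to_pod_dict(config: dict) -> dict:
    """Hypothetical migration aid: map a few legacy keys onto a pod spec dict."""
    legacy = config.get("KubernetesExecutor", {})
    unsupported = set(legacy) - set(_RESOURCE_KEYS) - {"image"}
    if unsupported:
        raise ValueError(f"not handled by this sketch: {sorted(unsupported)}")

    container = {"name": "base"}
    if "image" in legacy:
        container["image"] = legacy["image"]
    resources = {}
    for key, (section, resource) in _RESOURCE_KEYS.items():
        if key in legacy:
            resources.setdefault(section, {})[resource] = legacy[key]
    if resources:
        container["resources"] = resources
    return {"spec": {"containers": [container]}}


print(legacy_to_pod_dict(legacy_config))
```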
pod_mutation_hook
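Starting with 1.10.12, pod_mutation_hook receives a Kubernetes V1Pod instead of Airflow's own pod object, so you can modify pods with the Kubernetes API rather than an Airflow-specific class. The hook applies to every pod that Airflow launches: both the worker pods of the KubernetesExecutor and the pods started by the KubernetesPodOperator.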
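The hook is defined in airflow_local_settings.py and changes the pod it receives in place. The sketch below adds a label to every pod and sets a default node selector when none was chosen; the label key, its value, and the node pool name are hypothetical. It only touches attributes that a `kubernetes.client.models.V1Pod` has (`metadata.labels`, `spec.node_selector`), so for a quick check outside Airflow it is exercised with a `SimpleNamespace` stand-in rather than a real V1Pod.

```python
from types import SimpleNamespace


def pod_mutation_hook(pod) -> None:
    """Mutate every pod Airflow launches (executor workers and KubernetesPodOperator pods).

    Modifies the pod in place.
    """
    # labels and node_selector may be None on a freshly built pod.
    pod.metadata.labels = {**(pod.metadata.labels or {}), "managed-by": "airflow"}
    # Only choose a node pool if nothing was selected earlier (hypothetical pool name).
    if not pod.spec.node_selector:
        pod.spec.node_selector = {"pool": "airflow-workers"}


# Stand-in with the same attribute names as a V1Pod, for testing outside Airflow.
fake_pod = SimpleNamespace(
    metadata=SimpleNamespace(labels={"purpose": "pod-override-example"}),
    spec=SimpleNamespace(node_selector=None),
)
pod_mutation_hook(fake_pod)
print(fake_pod.metadata.labels, fake_pod.spec.node_selector)
```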
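Here is how these settings fit together in the KubernetesExecutor. The pod_template_file provides the base pod; the Kubernetes pod_override from a task's executor_config is merged on top of it; and finally pod_mutation_hook makes its changes to the resulting pod. In other words, the mutation hook always has the last word, which makes it the natural place for rules that must apply to every pod.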
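The order matters because each layer can overwrite the one before it. The sketch below models the three layers for a single container with plain dicts and a shallow update; it only illustrates precedence and is not how Airflow implements the merge internally. The hook function and all values are made up.

```python
from typing import Callable, Dict

Container = Dict[str, str]


def build_worker_container(
    template: Container,
    pod_override: Container,
    mutation_hook: Callable[[Container], None],
) -> Container:
    """Apply the three configuration layers in order."""
    container = dict(template)        # 1. base pod from pod_template_file
    container.update(pod_override)    # 2. per-task pod_override wins over the template
    mutation_hook(container)          # 3. pod_mutation_hook runs last, in place
    return container


def enforce_pull_policy(container: Container) -> None:
    # Hypothetical cluster-wide rule enforced by the mutation hook.
    container["imagePullPolicy"] = "Always"


result = build_worker_container(
    template={"image": "apache/airflow:2.0.0", "imagePullPolicy": "IfNotPresent", "memory": "1Gi"},
    pod_override={"memory": "4Gi", "imagePullPolicy": "Never"},
    mutation_hook=enforce_pull_policy,
)
print(result)
```

The image comes from the template, the memory from the override, and the pull policy from the hook, even though the override also tried to set it.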
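Together, these changes also make the KubernetesExecutor itself simpler. Every layer of configuration now works with the same V1Pod object from the Kubernetes API, so there is no separate Airflow-specific pod representation to translate back and forth. Because worker pods are described in standard Kubernetes terms, Airflow users and DevOps engineers can read and maintain the same pod definitions.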
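Finally, let's look at an example DAG that uses executor_config with pod_override. Until now, users who wanted Kubernetes features in their DAGs often reached for the KubernetesPodOperator. The KubernetesPodOperator runs a Docker image in its own pod, which means building and maintaining an image even for a small change. With executor_config, you can instead use the Kubernetes API through pod_override to adjust the pod that an ordinary task runs in, while keeping your code in the DAG.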
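For example, if a Python task only needs an extra environment variable, a secret, or more memory, you can change its pod without building a new image. executor_config with pod_override works with the PythonOperator and with the TaskFlow API. Here is an example DAG: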
from airflow.decorators import dag, task
from datetime import datetime
import os
import json
import requests
from kubernetes.client import models as k8s

# pod_override: only the fields set here are changed on the worker pod.
# "base" is the name Airflow uses for the worker's main container.
new_config = {
    "pod_override": k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}),
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    env=[k8s.V1EnvVar(name="STATE", value="wa")],
                )
            ]
        ),
    )
}

default_args = {
    'start_date': datetime(2021, 1, 1)
}


@dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False)
def taskflow():

    @task(executor_config=new_config)
    def get_testing_increase():
        """
        Gets totalTestResultsIncrease field from Covid API for given state and returns value
        """
        url = 'https://covidtracking.com/api/v1/states/'
        # STATE is set in the pod by the pod_override above.
        res = requests.get(url + '{0}/current.json'.format(os.environ['STATE']))
        return {'testing_increase': json.loads(res.text)['totalTestResultsIncrease']}

    get_testing_increase()


dag = taskflow()
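In new_config, the changes to the pod are described with the Kubernetes API: a label and a STATE environment variable on the base container. The DAG's single task calls the COVID Tracking Project API and returns the daily increase in test results (totalTestResultsIncrease) for the state named in STATE, which is set through pod_override. That is all it takes for an Airflow task to use Kubernetes features.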
Migrating to the new KubernetesExecutor
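If you already use the KubernetesExecutor, migrating is straightforward. The main change is that worker pods are now configured through a pod_template_file rather than through options in airflow.cfg.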
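Start by writing your worker configuration as YAML. When you do, decide how your DAGs will reach the worker pods: you can bake the DAGs into the Docker image, sync them from git, or mount them from a Kubernetes Volume.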
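As an example of the third option, the sketch below adds a PersistentVolumeClaim holding the DAGs to a base pod and mounts it read-only into the `base` container. It uses a Python dict with Kubernetes field names; the claim name `airflow-dags`, the mount path, and the helper function are assumptions, and the mount path has to match where your Airflow configuration expects DAGs (the `dags_folder` setting).

```python
import copy

# Hypothetical base pod; only the parts needed for the example are shown.
BASE_POD = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "placeholder-name"},
    "spec": {"containers": [{"name": "base", "image": "apache/airflow:2.0.0"}]},
}


def with_dags_volume(pod: dict, claim_name: str, mount_path: str) -> dict:
    """Return a copy of the pod that mounts a DAG volume read-only into "base"."""
    pod = copy.deepcopy(pod)
    spec = pod["spec"]
    spec.setdefault("volumes", []).append(
        {"name": "dags", "persistentVolumeClaim": {"claimName": claim_name}}
    )
    for container in spec["containers"]:
        if container["name"] == "base":
            container.setdefault("volumeMounts", []).append(
                {"name": "dags", "mountPath": mount_path, "readOnly": True}
            )
    return pod


pod = with_dags_volume(BASE_POD, "airflow-dags", "/opt/airflow/dags")
print(pod["spec"]["volumes"], pod["spec"]["containers"][0]["volumeMounts"])
```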
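To help with the conversion, Airflow 1.10.13 can generate a YAML pod template from the settings in your existing airflow.cfg. You can use this YAML as a starting point and adjust it as needed.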
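To show what such a conversion involves, here is a toy version that reads three options from the `[kubernetes]` section of a 1.10-style airflow.cfg (`namespace`, `worker_container_repository`, `worker_container_tag`) with `configparser` and builds a base pod from them. It is not the generator that ships with Airflow and ignores every other option; the sample values and the function name are made up for illustration.

```python
import configparser
import json

# Example 1.10-style settings (values are made up).
SAMPLE_CFG = """
[kubernetes]
namespace = airflow
worker_container_repository = apache/airflow
worker_container_tag = 1.10.13
"""


def pod_template_from_cfg(cfg_text: str) -> dict:
    """Toy conversion of a few [kubernetes] options into a base pod."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    k8s = parser["kubernetes"]
    image = f'{k8s["worker_container_repository"]}:{k8s["worker_container_tag"]}'
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": "placeholder-name",
            "namespace": k8s.get("namespace", fallback="default"),
        },
        "spec": {
            "containers": [{"name": "base", "image": image}],
            "restartPolicy": "Never",
        },
    }


print(json.dumps(pod_template_from_cfg(SAMPLE_CFG), indent=2))
```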
The best part about these three new features is that they are all available in Airflow 1.10.13, so you can start migrating right away and enjoy the benefits of this simpler, faster design. We look forward to your feedback, and please do not hesitate to contact us with questions, feature requests, or documentation requests!