New KubernetesExecutor 2.0 in Airflow 2.0

An introduction to the new features of the KubernetesExecutor 2.0. Spoiler alert: it is faster, more flexible, and easier to understand!







Together with the Airflow 2.0 release, we are pleased to present a completely redesigned KubernetesExecutor. The new architecture is faster, more flexible, and easier to understand than the KubernetesExecutor in Airflow 1.10. To start, we would like to walk you through the new features of the KubernetesExecutor 2.0!







What is the KubernetesExecutor?



In 2018, we introduced the KubernetesExecutor with autoscaling and flexibility in mind. Airflow did not yet have a clear approach to autoscaling Celery workers (although our recent work with KEDA has made great strides here), so we wanted to build a system that could scale with the user's needs. The result was a system that uses the Kubernetes API to launch one pod per Airflow task. A valuable side effect of this Kubernetes API-based design is that it lets users add unique add-ons and constraints to each individual task.







Using the Kubernetes API and the KubernetesExecutor, Airflow users can specify that certain tasks have access to certain secrets, or that a task may only run on a node located in the European Union (which can be useful for data governance). Users can also specify how many resources a task needs, which can vary greatly depending on what the task does (for example, running a TensorFlow script may require GPU access). With this API, the KubernetesExecutor gives data engineers much finer-grained control over how Airflow runs their tasks than Celery queues alone would.







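The KubernetesExecutor does come with trade-offs. Because it launches a new pod for every task, tasks start more slowly than on Celery, where workers are already running and waiting for work. For workloads where fast task startup matters, the CeleryExecutor may be the better choice. And for users who want both the CeleryExecutor and the KubernetesExecutor in the same Airflow deployment, Airflow 2.0 introduces a new executor that combines the two, the CeleryKubernetesExecutor!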







What's new in the KubernetesExecutor 2.0



The pod_template_file



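Starting with Airflow 1.10.12, users can describe the base worker pod in a pod_template_file. This is a standard Kubernetes pod spec, written in YAML, that the KubernetesExecutor uses as the starting point for every worker pod. As a result, Airflow users can use the full Kubernetes API to configure their workers.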
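To make this concrete, here is a minimal sketch of what a pod_template_file contains, written as a Python dict that mirrors the YAML structure. The image tag and resource values are placeholders, not recommendations. The detail that matters to the KubernetesExecutor is the container named base, which is the worker container Airflow runs the task in. The has_base_container check and render function are hypothetical helpers for this sketch, not part of Airflow.

```python
import json

# A minimal worker pod template as a Python dict. It mirrors the YAML you
# would save as a pod_template_file; image and resource values are placeholders.
POD_TEMPLATE = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "placeholder-name"},
    "spec": {
        "containers": [
            {
                # The KubernetesExecutor runs the task in the container named "base".
                "name": "base",
                "image": "apache/airflow:2.0.0",
                "resources": {"requests": {"cpu": "500m", "memory": "512Mi"}},
            }
        ],
        # A worker pod runs a single task and should not be restarted in place.
        "restartPolicy": "Never",
    },
}


def has_base_container(template: dict) -> bool:
    """Hypothetical sanity check: does the template define a container named 'base'?"""
    containers = template.get("spec", {}).get("containers", [])
    return any(container.get("name") == "base" for container in containers)


def render(template: dict) -> str:
    """Serialize the template; JSON output is also accepted by common YAML parsers."""
    return json.dumps(template, indent=2)


if __name__ == "__main__":
    assert has_base_container(POD_TEMPLATE)
    print(render(POD_TEMPLATE))
```

In practice you would write the same structure directly as YAML and point Airflow at that file.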







Multiple pod_template_files



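In Airflow 2.0, users can also point an individual task at its own pod_template_file through the task's executor_config. This lets teams maintain several templates for different kinds of work (for example, a lightweight default, a high-memory variant, or a GPU variant), much like routing tasks to different queues with the CeleryExecutor.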
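Here is a sketch of how a DAG author might route tasks to different templates. It relies on the executor_config accepting a pod_template_file key in Airflow 2.0; the template directory, file names, and worker "kinds" are hypothetical, and pick_executor_config is an illustrative helper rather than an Airflow API.

```python
import os

# Hypothetical directory holding one pod template per kind of worker,
# analogous to defining several queues for the CeleryExecutor.
TEMPLATE_DIR = "/opt/airflow/pod_templates"

WORKER_TEMPLATES = {
    "default": "basic.yaml",
    "high_memory": "high_memory.yaml",
    "gpu": "gpu.yaml",
}


def pick_executor_config(kind: str = "default") -> dict:
    """Return an executor_config that points a task at the template for `kind`."""
    try:
        filename = WORKER_TEMPLATES[kind]
    except KeyError:
        raise ValueError(f"unknown worker kind: {kind!r}") from None
    return {"pod_template_file": os.path.join(TEMPLATE_DIR, filename)}
```

In a DAG, the result would be passed as the task's executor_config, for example `@task(executor_config=pick_executor_config("gpu"))`. Keeping the mapping in one place makes the "queue-like" worker types easy to audit.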







Building pods from pod_template_files



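Moreover, in 2.0 the worker pod is defined with native Kubernetes objects, so each pod template can play a role similar to a queue in Celery. This idea is at the heart of the redesigned KubernetesExecutor.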







executor_config



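Airflow 2.0 also redesigns the executor_config to make it simpler and more powerful. Previously, it was a Python dictionary whose keys did not correspond to the Kubernetes API. Now the executor_config accepts a pod_override key whose value is a Kubernetes V1Pod object. Any field of that object can be overridden, so each task can change exactly the parts of its pod that it needs.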
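Before Airflow 2.0, the KubernetesExecutor's executor_config was a dictionary under a "KubernetesExecutor" key with Airflow-specific names such as image, request_memory, or limit_cpu. The hypothetical legacy_to_pod_override helper below maps a few of those keys onto the nested field names of a Kubernetes pod spec, to show how the old flat names correspond to the Kubernetes structure. It works on plain dicts only to show the shape; in a real Airflow 2.0 DAG the pod_override value must be a kubernetes V1Pod (with V1ResourceRequirements for resources), not a dict.

```python
def legacy_to_pod_override(executor_config: dict) -> dict:
    """Hypothetical helper: translate a subset of the 1.10-style
    {"KubernetesExecutor": {...}} keys into the nested shape of a pod spec."""
    legacy = executor_config.get("KubernetesExecutor", {})
    container = {"name": "base"}  # overrides target the worker container named "base"
    if "image" in legacy:
        container["image"] = legacy["image"]

    # Flat keys like "request_memory" become spec.containers[].resources.requests.memory
    resources = {}
    for prefix, section in (("request", "requests"), ("limit", "limits")):
        values = {
            resource: legacy[f"{prefix}_{resource}"]
            for resource in ("cpu", "memory")
            if f"{prefix}_{resource}" in legacy
        }
        if values:
            resources[section] = values
    if resources:
        container["resources"] = resources

    return {"pod_override": {"spec": {"containers": [container]}}}


legacy_config = {
    "KubernetesExecutor": {"image": "my-image:1.0", "request_memory": "256Mi", "limit_memory": "512Mi"}
}
print(legacy_to_pod_override(legacy_config))
```

The image name and memory values are placeholders; only the mapping from flat names to nested Kubernetes fields is the point of the example.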







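The dictionary-based executor_config used before Airflow 2.0 still works, but it is deprecated. We recommend migrating existing DAGs to pod_override.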







pod_mutation_hook



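Starting with Airflow 1.10.12, the pod_mutation_hook receives a Kubernetes V1Pod object instead of Airflow's own pod class. This means users can modify pods through the full Kubernetes API rather than through Airflow's pod abstraction. The hook runs on every pod launched by the KubernetesExecutor, as well as on every pod launched by the KubernetesPodOperator.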
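A pod_mutation_hook is defined in airflow_local_settings.py and changes the pod in place. The sketch below adds an owner label and pins pods to EU nodes with a node selector, echoing the earlier example of keeping tasks on nodes in the European Union. The label key and value and the region value are hypothetical. Because it only touches metadata.labels and spec.node_selector, which are attributes of V1Pod's metadata and spec objects, it can be exercised locally with a SimpleNamespace stand-in.

```python
from types import SimpleNamespace

# Hypothetical policy values for this sketch.
OWNER_LABEL = ("owner", "airflow")
EU_NODE_SELECTOR = ("topology.kubernetes.io/region", "europe-west1")


def pod_mutation_hook(pod) -> None:
    """Mutate every Airflow-launched pod in place before it is submitted.

    In Airflow 2.0, `pod` is a kubernetes.client.models.V1Pod; this sketch
    assumes its metadata and spec are already set.
    """
    # Add the owner label without discarding labels from the template or pod_override.
    labels = dict(pod.metadata.labels or {})
    labels.setdefault(*OWNER_LABEL)
    pod.metadata.labels = labels

    # Add the region constraint unless the pod already sets one.
    selector = dict(pod.spec.node_selector or {})
    selector.setdefault(*EU_NODE_SELECTOR)
    pod.spec.node_selector = selector


if __name__ == "__main__":
    # Stand-in with the same attribute layout as a V1Pod, for local testing only.
    fake_pod = SimpleNamespace(
        metadata=SimpleNamespace(labels={"purpose": "demo"}),
        spec=SimpleNamespace(node_selector=None),
    )
    pod_mutation_hook(fake_pod)
    print(fake_pod.metadata.labels, fake_pod.spec.node_selector)
```

Using setdefault means the hook acts as a cluster-wide default: a task that already chose a region keeps its choice.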









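Here is how these pieces fit together in the KubernetesExecutor. Airflow first reads the pod_template_file to create the base pod, then merges in the Kubernetes pod_override from the task's executor_config, and finally passes the result to the pod_mutation_hook, which can adjust the pod before it is launched. Each layer builds on the previous one, so cluster-wide policies always have the last word.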
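The layering order can be modelled in a few lines. This is a simplified sketch, not Airflow's implementation: pods are plain dicts, and merge overlays one dict on another and matches list entries (containers, env vars) by their name field, whereas Airflow's own reconciliation of V1Pod objects differs in detail. What it does reproduce is the order: template first, then pod_override, then pod_mutation_hook.

```python
import copy


def _is_named_list(items) -> bool:
    """True for lists like containers or env vars, whose entries carry a 'name'."""
    return isinstance(items, list) and all(
        isinstance(item, dict) and "name" in item for item in items
    )


def merge(base, override):
    """Simplified overlay: values in `override` win, nested dicts are merged
    recursively, and named list entries are matched by their 'name' field."""
    if isinstance(base, dict) and isinstance(override, dict):
        result = copy.deepcopy(base)
        for key, value in override.items():
            result[key] = merge(base[key], value) if key in base else copy.deepcopy(value)
        return result
    if _is_named_list(base) and _is_named_list(override):
        result = copy.deepcopy(base)
        position = {item["name"]: i for i, item in enumerate(result)}
        for item in override:
            if item["name"] in position:
                i = position[item["name"]]
                result[i] = merge(result[i], item)
            else:
                result.append(copy.deepcopy(item))
        return result
    return copy.deepcopy(override)


def build_worker_pod(template, pod_override=None, mutation_hook=None):
    """Apply the three layers in the order described above."""
    pod = copy.deepcopy(template)       # 1. base pod from the pod_template_file
    if pod_override:
        pod = merge(pod, pod_override)  # 2. per-task executor_config["pod_override"]
    if mutation_hook:
        mutation_hook(pod)              # 3. cluster-wide pod_mutation_hook, applied last
    return pod


template = {"spec": {"containers": [{"name": "base", "image": "apache/airflow:2.0.0"}]}}
override = {"spec": {"containers": [{"name": "base", "env": [{"name": "STATE", "value": "wa"}]}]}}


def add_owner_label(pod):
    pod.setdefault("metadata", {}).setdefault("labels", {})["owner"] = "airflow"


pod = build_worker_pod(template, override, add_owner_label)
```

In the resulting pod, the base container keeps the template's image, gains the STATE variable from the override, and carries the label added by the hook.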







Let's look at how this simplifies the internals of the KubernetesExecutor.













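In Airflow 1.10, building a worker pod meant combining settings from several places and translating them through Airflow's own pod abstraction before anything reached Kubernetes. Pod configuration was therefore spread out and hard to trace.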













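In 2.0, the process is much simpler: from the template onward, the pod is always a Kubernetes V1Pod, and every step works on that same object.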









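This makes Airflow much easier to operate for DevOps engineers, who can rely on the Kubernetes API they already know.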







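For DAG authors, the most visible change is the executor_config with pod_override. Previously, users who wanted Kubernetes-specific features in a DAG often had to use the KubernetesPodOperator. The KubernetesPodOperator runs an arbitrary Docker image, which is flexible but means packaging the task code into a separate image. With the executor_config, users can instead use the Kubernetes API through pod_override to attach secrets, volumes, environment variables, node selectors, or resource requests to an ordinary Airflow task.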







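For example, suppose we want to set an environment variable in the pod of a Python task without building a new image. With the executor_config and pod_override, this works with the PythonOperator as well as with the new TaskFlow API. Here is an example DAG: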







import os
from datetime import datetime

import requests
from airflow.decorators import dag, task
from kubernetes.client import models as k8s

# executor_config for the task: pod_override is merged onto the base worker pod.
# The container must be named "base" so the override targets the worker container.
new_config = {
    "pod_override": k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}),
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    # Inject the STATE environment variable read by the task below
                    env=[k8s.V1EnvVar(name="STATE", value="wa")],
                )
            ]
        ),
    )
}

default_args = {
    "start_date": datetime(2021, 1, 1),
}


@dag("k8s_executor_example", schedule_interval="@daily", default_args=default_args, catchup=False)
def taskflow():

    @task(executor_config=new_config)
    def get_testing_increase():
        """
        Gets the totalTestResultsIncrease field from the Covid API for the state
        given in the STATE environment variable and returns its value.
        """
        url = "https://covidtracking.com/api/v1/states/"
        res = requests.get(url + "{0}/current.json".format(os.environ["STATE"]), timeout=30)
        res.raise_for_status()
        return {"testing_increase": res.json()["totalTestResultsIncrease"]}

    get_testing_increase()


# Module-level DAG object so the scheduler can discover it
k8s_executor_dag = taskflow()





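Here, new_config uses the Kubernetes API to describe how the task's pod should differ from the template: it adds a label and sets the STATE environment variable on the base container. The DAG reads that variable and queries the Covid API for the given state. No custom image or KubernetesPodOperator is needed, because pod_override does all the work. This is the kind of direct access to Kubernetes that Airflow 2.0 now offers.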







Simpler KubernetesExecutor configuration



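We have also simplified how the KubernetesExecutor is configured. Worker pods are now described in one place, the pod_template_file, instead of through a long list of airflow.cfg settings.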







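Everything about the worker now lives in YAML, including how it gets its DAGs: you can bake the DAGs into the image, sync them from git, or mount the DAG folder from a Kubernetes Volume.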
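As an illustration of the volume option, here is a hypothetical helper that adds a PersistentVolumeClaim-backed DAG folder to a pod template represented as a dict. The claim name is a placeholder, and the default mount path /opt/airflow/dags assumes the default AIRFLOW_HOME of the official Airflow image. The field names (volumes, volumeMounts, persistentVolumeClaim, claimName, mountPath, readOnly) are standard Kubernetes pod spec fields.

```python
import copy


def with_dags_volume(template: dict, claim_name: str,
                     mount_path: str = "/opt/airflow/dags") -> dict:
    """Return a copy of `template` whose "base" container mounts DAGs from a PVC."""
    pod = copy.deepcopy(template)
    spec = pod.setdefault("spec", {})
    # Declare the volume at pod level, backed by an existing PersistentVolumeClaim.
    spec.setdefault("volumes", []).append(
        {"name": "dags", "persistentVolumeClaim": {"claimName": claim_name}}
    )
    # Mount it read-only into the worker container only; sidecars are left alone.
    for container in spec.get("containers", []):
        if container.get("name") == "base":
            container.setdefault("volumeMounts", []).append(
                {"name": "dags", "mountPath": mount_path, "readOnly": True}
            )
    return pod


template = {"spec": {"containers": [{"name": "base", "image": "apache/airflow:2.0.0"}]}}
pod = with_dags_volume(template, claim_name="airflow-dags")
```

Returning a copy rather than editing the template in place keeps the original template reusable for other variants.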







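As a result, airflow.cfg no longer has to carry the Kubernetes worker settings; they move into YAML. Anyone comfortable with Kubernetes YAML can now configure Airflow workers.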







The best part is that all three of these new features are already available in Airflow 1.10.13. You can start migrating right away and enjoy the benefits of this simpler, faster design. We look forward to your feedback, so please don't hesitate to contact us with any questions, feature requests, or documentation suggestions!







