Kubernetes operators in Airflow

Kubernetes is an open-source system for automating the deployment, scaling, and management of containerized applications. It is becoming increasingly popular for managing data pipelines, particularly those built with Apache Airflow.

(Image source: https://cloudwithease.com/what-is-kubernetes/)

One of the main benefits of using Kubernetes in data pipelines is the ability to easily scale and manage the resources required for processing large volumes of data. With Kubernetes, you can define the resources required for each task in your pipeline and the system will automatically scale up or down as needed to ensure that your pipeline is running efficiently.
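As a concrete illustration, resource requests and limits are what Kubernetes uses to schedule and constrain each task's container. The examples later in this article pass them as a plain dict, which is what the older contrib operator expects; newer versions of the apache-airflow-providers-cncf-kubernetes package express the same idea with Kubernetes client objects. A minimal sketch, with illustrative values:

from kubernetes.client import models as k8s

# Requests tell the Kubernetes scheduler how much CPU/memory to reserve for the
# task's Pod; limits cap what the container may actually consume.
task_resources = k8s.V1ResourceRequirements(
    requests={'cpu': '100m', 'memory': '256Mi'},
    limits={'cpu': '500m', 'memory': '512Mi'},
)

# In recent provider versions, an object like this is passed to the
# KubernetesPodOperator via the container_resources argument.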

In addition to resource management, Kubernetes also provides features such as self-healing, rollbacks, and canary deployments, which can help ensure that your pipeline is robust and reliable.
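Kubernetes-level self-healing (restarting failed containers, rescheduling Pods away from unhealthy nodes) pairs naturally with Airflow's own retry mechanism. A small sketch, assuming the default_args pattern used throughout this article:

from datetime import timedelta

# If a task Pod dies, Airflow can relaunch the task as a brand-new Pod; the
# number of attempts and the wait between them are ordinary Airflow settings.
default_args = {
    'owner': 'me',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}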

To use Kubernetes with Airflow, you will need access to a Kubernetes cluster and the Kubernetes dependencies installed in your Airflow environment (the apache-airflow-providers-cncf-kubernetes package on Airflow 2.x, or the kubernetes extra on older releases). From there you have two options, which can also be combined: switch Airflow itself to the KubernetesExecutor, by setting executor = KubernetesExecutor in the [core] section of airflow.cfg (or exporting AIRFLOW__CORE__EXECUTOR=KubernetesExecutor), so that every task runs in its own Pod; or keep your existing executor and launch selected tasks as Pods with the KubernetesPodOperator, specifying the resources each task needs.

Here is an example of a simple Airflow DAG that uses the KubernetesPodOperator to run a Python script as a Kubernetes Pod:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

default_args = {
    'owner': 'me',
    'start_date': datetime(2022, 1, 1)
}

dag = DAG(
    'kubernetes_pipeline',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

def print_hello():
    print("Hello World!")

# Run an ordinary Python callable on the Airflow worker
hello_task = PythonOperator(
    task_id='print_hello',
    python_callable=print_hello,
    dag=dag
)

# Run /app/hello.py inside its own Kubernetes Pod
task = KubernetesPodOperator(
    task_id='kubernetes_task',
    name='kubernetes_task',
    namespace='default',
    image='python:3.7',
    cmds=['python'],
    arguments=['/app/hello.py'],
    resources={'request_cpu': '100m', 'request_memory': '256Mi'},
    is_delete_operator_pod=True,
    in_cluster=True,
    get_logs=True,
    dag=dag
)

# Set the task dependencies
task >> hello_task

In this example, the KubernetesPodOperator runs a Python script in its own Kubernetes Pod and specifies the resources required for the task. The in_cluster parameter tells the operator to use the in-cluster Kubernetes configuration (appropriate when Airflow itself runs inside the cluster), get_logs streams the Pod's logs back into the Airflow task log, and is_delete_operator_pod cleans up the Pod once the task finishes. The downstream PythonOperator then runs an ordinary Python callable on the Airflow worker after the Pod completes.

Using Kubernetes with Airflow can greatly improve the scalability and reliability of your data pipeline. It is a powerful tool that can help you manage the resources required for processing large volumes of data and ensure that your pipeline is running smoothly.

In addition to the KubernetesExecutor, Airflow also provides the KubernetesPodOperator, which allows you to define and run individual tasks as Kubernetes Pods. This can be useful for tasks that require specific resources or need to be run in a specific environment.

To use the KubernetesPodOperator, you will need to specify the image to be used for the Pod, the commands to be run, and any arguments or environment variables that are required. You can also specify resource requirements and other advanced options such as affinity rules and tolerations.
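For instance, affinity rules and tolerations can pin a resource-hungry task to a dedicated node pool. Here is a hedged sketch, using the dict format accepted by the contrib-era KubernetesPodOperator (newer provider versions also accept kubernetes.client model objects); the node-pool label and taint values are illustrative, and dag refers to a DAG defined as in the previous example:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

train_model = KubernetesPodOperator(
    task_id='train_model',
    name='train_model',
    namespace='default',
    image='python:3.7',
    cmds=['python', '/app/train.py'],
    # Require nodes from a specific (illustrative) node pool
    affinity={
        'nodeAffinity': {
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        'values': ['high-memory-pool']
                    }]
                }]
            }
        }
    },
    # Allow scheduling onto nodes tainted dedicated=airflow:NoSchedule
    tolerations=[{
        'key': 'dedicated',
        'operator': 'Equal',
        'value': 'airflow',
        'effect': 'NoSchedule'
    }],
    dag=dag
)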

Here is an example of how you can use the KubernetesPodOperator to run a task that processes data from a file stored in a Google Cloud Storage bucket:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount

default_args = {
    'owner': 'me',
    'start_date': datetime(2022, 1, 1)
}

dag = DAG(
    'kubernetes_pipeline',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

# Mount the 'service-account' Kubernetes Secret as a read-only file in the Pod
service_account_volume = Volume(
    name='service-account',
    configs={'secret': {'secretName': 'service-account'}}
)

service_account_mount = VolumeMount(
    name='service-account',
    mount_path='/app/service-account.json',
    sub_path='service-account.json',
    read_only=True
)

# Define the KubernetesPodOperator
process_data = KubernetesPodOperator(
    task_id='process_data',
    name='process_data',
    namespace='default',
    image='gcr.io/my-project/process-data:latest',
    cmds=['python', '/app/process_data.py'],
    arguments=['--input-file', 'gs://my-bucket/input.csv', '--output-file', 'gs://my-bucket/output.csv'],
    resources={'request_cpu': '100m', 'request_memory': '256Mi'},
    env_vars={'GOOGLE_APPLICATION_CREDENTIALS': '/app/service-account.json'},
    volumes=[service_account_volume],
    volume_mounts=[service_account_mount],
    is_delete_operator_pod=True,
    in_cluster=True,
    get_logs=True,
    dag=dag
)

In this example, the KubernetesPodOperator runs a Python script that processes data from a file stored in a Google Cloud Storage bucket. The arguments parameter passes the input and output paths to the script, and the env_vars parameter points GOOGLE_APPLICATION_CREDENTIALS at the mounted key file so the script can authenticate to Google Cloud Storage. The volumes and volume_mounts parameters mount a Kubernetes Secret named service-account, which is assumed to already exist in the namespace (it could be created with kubectl create secret generic service-account --from-file=service-account.json), as a read-only file inside the Pod.

Using the KubernetesPodOperator in your data pipeline can give you greater control over the resources and environment in which your tasks are run, and can help to ensure that your tasks have the resources they need to run efficiently.

In addition to using the KubernetesPodOperator to run individual tasks, you can also use Kubernetes to scale your data pipeline horizontally by running multiple instances of your pipeline in parallel. This can be especially useful for tasks that are resource-intensive or have long running times.

To scale horizontally, you can use Kubernetes' HorizontalPodAutoscaler (HPA). An HPA is a native Kubernetes resource, defined outside of Airflow, that automatically adjusts the number of replicas of a long-running component, such as an Airflow worker Deployment in a CeleryExecutor setup or a service that your tasks call, based on observed resource usage. The short-lived Pods launched by the KubernetesExecutor or the KubernetesPodOperator are not themselves managed by an HPA; their parallelism is governed by Airflow settings such as parallelism and max_active_tasks.

Here is a minimal sketch of an HPA manifest; the airflow-worker Deployment name is an assumption and should be replaced with the component you actually want to scale:

apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: airflow-worker-hpa
  namespace: default
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: airflow-worker        # hypothetical Deployment to scale
  minReplicas: 1
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70

You can apply the manifest with kubectl apply -f hpa.yaml, or create an equivalent autoscaler in one step with kubectl autoscale deployment airflow-worker --cpu-percent=70 --min=1 --max=10.

In this example, maxReplicas caps the number of replicas the HorizontalPodAutoscaler may create, and targetCPUUtilizationPercentage is the average CPU utilization at which it scales the Deployment up or down. The HPA relies on a metrics source such as the Kubernetes metrics server being available in the cluster.

Using a HorizontalPodAutoscaler alongside your data pipeline can help ensure that your tasks have the resources they need to run efficiently, even when faced with sudden spikes in demand or resource-intensive workloads.

In summary, Kubernetes can be a powerful tool for managing data pipelines built with Apache Airflow. It provides features such as resource management, self-healing, and canary deployments, and it can scale your pipeline horizontally so that your tasks have the resources they need to run efficiently. By combining the KubernetesExecutor, the KubernetesPodOperator, and the HorizontalPodAutoscaler, you can take advantage of the power and flexibility of Kubernetes to build reliable and scalable data processing solutions.
