At Infinite Lambda we have already helped many of our clients set up Apache Airflow. The tool itself is great, one of the best orchestrators on the data engineering market. Deploying it on your cloud and tailoring it to your needs, however… the required engineering hours are still not negligible.
In terms of the execution environment, Kubernetes is hardly a question any more. You can google many example repositories on how to deploy Airflow on Kubernetes or EKS (the managed Kubernetes service on AWS), but these deployments are usually fairly limited.
Wondering whether there is a setup where you can run all your resource- and time-demanding workloads on Airflow? Keep reading to find out.
Why is this deployment different?
Often when we think of Kubernetes, we think of autoscaling, flexibility, an elastic environment. Basically, it does not matter what I want to execute; I just execute it without worrying about resource management.
This is very much true for bigger deployments where there is a mix of hundreds of different node types under K8s, which can fulfil various types of workloads (a small container with e.g. 400 MB of memory, or a big container with e.g. 6 GB of memory). Thanks to the autoscaling policies and containerisation technology, the bigger your server park is, the more closely the operating cost follows the actual usage, because the unused resource becomes negligible compared to the total cluster resource.
This is, however, not 100% true for SMEs and startups that do not have such a high resource demand. For them, choosing an inappropriate instance type or overestimating different scenarios can end up in a 2-3x bigger operating cost. In order to avoid significant overspending, you have to be well aware of what is going on in the cluster and how to manage your workloads.
If you want to be fully flexible and elastic, but at the same time keep the operating cost as low as possible, we might have a solution for you…
…which we have now open-sourced: https://github.com/infinitelambda/apache_airflow_on_eks
How does it work?
Let's use a very simplistic model to demonstrate the different CPU and memory requirements. Imagine the Airflow deployment runs on a small instance, reserving a certain slice of its CPU and memory, with some capacity still left over.
Assume we have 4 tasks that need to be executed: task#1 and task#2 can still be executed on the same instance without triggering any autoscaling policy. Beyond that, if your workload still needs to run task#3 and task#4, that can only happen on a separate instance after the scale-up has finished, as you can see in the figure below.
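To make the model concrete, here is a minimal Python sketch of the same scheduling arithmetic. The node capacity, the Airflow reservation and the per-task requests are made-up numbers purely for illustration, not values from our deployment:
[php]# Toy model of the example above; all numbers are hypothetical.
NODE_CPU, NODE_MEM = 2.0, 4.0      # a "small" instance: 2 vCPU, 4 GiB

# One node is already running, with Airflow reserving a slice of it.
nodes = [[0.5, 1.0]]               # [used_cpu, used_mem] per node

tasks = {f"task#{i}": (0.6, 1.2) for i in range(1, 5)}

for name, (cpu, mem) in tasks.items():
    # Place the task on the first node with enough headroom...
    for i, (used_cpu, used_mem) in enumerate(nodes):
        if used_cpu + cpu <= NODE_CPU and used_mem + mem <= NODE_MEM:
            nodes[i] = [used_cpu + cpu, used_mem + mem]
            print(f"{name} -> node {i}")
            break
    else:
        # ...otherwise the autoscaler has to add an instance first.
        nodes.append([cpu, mem])
        print(f"{name} -> node {len(nodes) - 1} (after scale-up)")[/php]
With these numbers, task#1 and task#2 land on the existing node, while task#3 and task#4 only run once a second instance has been added.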
What is going to happen if we need to run a job demanding more CPU and memory than our current instance can accommodate?
It is actually never going to be executed, unless we specify another node group in Kubernetes with a bigger instance type. So now we have 2 node groups, a small one and a big one, and the autoscaling policy applies to both of them. This cluster can now handle any kind of task, because both the “small” and the “big” resource-demanding jobs can be satisfied by the cluster. The “small” ones could be executed on any instance type, while the “big” ones can only run on the bigger one.
What are its limitations?
One point is worth mentioning here. By default, each node group has to have a minimum of 1 instance running all the time. Whether you run something on it or it sits empty waiting for your requests, you still have to pay for it. And now the question is the use case: what if you have 5 small jobs running twice a day and 1 big job running once a week? In that case, you are going to have 2 instances always up and running. The small one’s usage is acceptable (depending on execution time), but the big instance’s usage will be extremely low (if it is a half-hour job, that is roughly 0.3% utilisation). Why should we pay for something which is not being used?
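For reference, the utilisation figure above is just this ratio (a quick sketch of the arithmetic):
[php]# One half-hour job per week on an always-on instance:
hours_per_week = 7 * 24       # 168 hours
busy_hours = 0.5              # a single half-hour run
print(f"utilisation: {busy_hours / hours_per_week:.2%}")  # ~0.30%[/php]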
Using Fargate as an executor
A solution to this issue is using Fargate as an execution engine on EKS, which AWS launched in December 2019. What this means for the use case above is that by using Fargate as the execution platform for the ad-hoc or very rare jobs, you only pay for the resources you actually use, since Fargate pricing is based on vCPU per hour and GB of memory per hour.
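As a back-of-the-envelope illustration, assume the weekly big job needs 4 vCPU and 8 GB for half an hour. The prices below are placeholders, not actual quotes; always check the current AWS price list for your region:
[php]# Hypothetical prices for illustration only; look up current
# Fargate and EC2 on-demand pricing for your region.
FARGATE_PER_VCPU_HOUR = 0.040   # USD, assumed
FARGATE_PER_GB_HOUR = 0.0045    # USD, assumed
EC2_BIG_NODE_HOUR = 0.19        # USD for an always-on big instance, assumed

# The weekly half-hour job from above: 4 vCPU and 8 GB for 0.5 hours.
fargate_weekly = 0.5 * (4 * FARGATE_PER_VCPU_HOUR + 8 * FARGATE_PER_GB_HOUR)
ec2_weekly = 7 * 24 * EC2_BIG_NODE_HOUR

print(f"Fargate: ~${fargate_weekly:.2f}/week vs always-on EC2: ~${ec2_weekly:.2f}/week")[/php]
Under these assumed prices, the rarely used big node group costs orders of magnitude more than running the same job on Fargate.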
Can we achieve everything with the same deployment?
Yes! We have open-sourced a solution where you can define in the Airflow KubernetesPodOperator which node group or execution platform you want to run your workload on. You have full control over resource management, so there is no more difficulty allocating enough resources to a job, and no surprising AWS bills either.
Example:
I’m going to demonstrate this solution with a use case where a mix of the different platforms is used. In this case, Airflow is deployed as a data platform ingestion solution. A job (JobAPI) runs every hour and triggers an external job by calling an API. Since no data is flowing through this container, its resource requirements are very low. (To give a more accurate estimation, once we are done with the development of the Docker image, we get the CPU and memory requirements by executing the docker stats command locally.) Another job, JobDatabaseTable, runs 3 times a day for 10 database tables (30 executions in total); it selects millions of records from the source database table and ingests them into our destination. Since data is now flowing through, we assign bigger requirements to this job. Finally, the last job, JobClassification, runs either ad hoc or once a week and uses Python data science libraries to classify a dataset.
Side-note: when using Airflow, it is good practice to use the KubernetesPodOperator exclusively, executing a bash entry point. The launcher.sh script is normally a wrapper around the Python code doing the ingestion.
First of all, we have to specify resource requirements.
[php]JobAPI -> resources = {'request_cpu': '0.50',
                       'request_memory': '0.7Gi'}
JobDatabaseTable -> resources = {'request_cpu': '1.00',
                                 'request_memory': '4Gi'}
JobClassification -> resources = {'request_cpu': '4.0',
                                  'request_memory': '8Gi'}[/php]
After that, we have to specify which platform they should run on. In the KubernetesPodOperator, the namespace argument allows us to do that. We could set up 'fargate' for the Fargate profile, create 'small' for the node group of small instances (e.g. t2.small), and similarly create 'big' for e.g. a t2.xlarge node group.
In our current solution, we use 'fargate' for Fargate and 'default' for the rest of the EC2 node groups, so the assignment is automatic when using the default namespace. We can still pin the workload to a specific instance type using an extra argument of
node_selectors={'beta.kubernetes.io/instance-type': 't2.small'},
so that we can be sure that the workload is actually running on the correct instance.
[php]task = KubernetesPodOperator(
    namespace='default',
    # By default, Kubernetes is free to place this pod on any
    # node in the default namespace, but we want to run it on
    # the small one, so we set a label here that is only true
    # for the small instance type.
    # The node labels can be checked by
    # running 'kubectl get node --show-labels'.
    node_selectors={'beta.kubernetes.io/instance-type': 't2.small'},
    # You need to specify the ECR repository URL of the DAG's image here
    image="",
    image_pull_policy="Always",
    is_delete_operator_pod=True,
    name=DAG_NAME,
    in_cluster=True,
    task_id=DAG_NAME,
    cmds=["/bin/bash", "-c"],
    arguments=["source /usr/local/airflow/venv/bin/activate && /usr/local/airflow/ci/launch_memory_user_light.sh"],
    startup_timeout_seconds=600,
    resources={'request_cpu': '0.50',
               'request_memory': '0.7Gi'},
    get_logs=True,
    default_args=default_args)[/php]
Operator definition for the EC2 node groups
[php]task = KubernetesPodOperator(
    # Running a task on Fargate is done by switching to the
    # 'fargate' namespace, because this is the namespace the
    # Fargate profile operates in.
    namespace='fargate',
    # You need to specify the ECR repository URL of the DAG's image here
    image="",
    image_pull_policy="Always",
    is_delete_operator_pod=True,
    name=DAG_NAME,
    in_cluster=True,
    task_id=DAG_NAME,
    cmds=["/bin/bash", "-c"],
    arguments=["source /usr/local/airflow/venv/bin/activate && /usr/local/airflow/ci/launch_memory_user_fargate.sh"],
    # Fargate needs an increased startup time
    startup_timeout_seconds=600,
    resources={'request_cpu': '4.0',
               'request_memory': '8.0Gi'},
    get_logs=True,
    default_args=default_args)[/php]
Operator definition for the Fargate profile
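A note on why the namespace switch is enough: on EKS, a Fargate profile decides which pods run on Fargate by matching their namespace (and optionally their labels). As long as the cluster has a Fargate profile selecting the 'fargate' namespace, changing the operator's namespace moves the workload off the EC2 node groups without any other change.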
Summary
At Infinite Lambda we have helped many of our clients with Airflow deployments where not just flexibility but also the operating cost played an important role. This solution provides a Swiss Army knife deployment that optimises for both: it is prepared for all the possible use cases, with everything in one place.
As a bonus, our repository also contains a Grafana and Prometheus stack on top of Airflow, meaning that monitoring the resources and the various instances is now straightforward.
Check it out: https://github.com/infinitelambda/apache_airflow_on_eks
Feel free to send us feedback or raise a PR: we are open to discussing contributions and collaboration!