Welcome to another article on DevOps and DataOps topics. In this one, I will show you how we automated Prefect flow deployments to Azure Kubernetes Service (AKS) with Azure DevOps.
To follow along, you should be familiar with:
- Azure Cloud
- Azure DevOps
- Azure Kubernetes Service (AKS)
- Data engineering
What is Prefect and why should I use it?
Prefect is a workflow management system (WfMS). A WfMS is, simply put, an orchestrator of tasks.
A Task represents a discrete action in a Prefect workflow.
Prefect comes in two flavours: a Cloud version and an open-source server. In this article, we are going to look at how to automatically register flows with a Prefect server.
The above is a typical ETL pipeline, in which we extract, transform and load the data. These so-called tasks (steps) are written in Python, and we bundle them into a flow.
A flow is a container for tasks. It represents an entire workflow or application by describing the dependencies between tasks.
Here is an example of a flow that is written in Python:
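As a stand-in illustration, the sketch below models the same idea with the standard library only; it does not use Prefect's API. A flow is represented as a mapping from each task to its upstream dependencies, and the tasks are executed in dependency (topological) order with `graphlib` (Python 3.9+). The `extract`, `transform` and `load` functions and their data are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable

# A flow modelled as: task name -> (function, names of upstream tasks).
FlowSpec = dict[str, tuple[Callable[..., Any], list[str]]]


# Hypothetical ETL tasks: plain functions, not Prefect Task objects.
def extract() -> list[int]:
    return [1, 2, 3]


def transform(data: list[int]) -> list[int]:
    return [x * 10 for x in data]


def load(data: list[int]) -> int:
    # "Loading" here just reports how many records would be written.
    return len(data)


FLOW: FlowSpec = {
    "extract": (extract, []),
    "transform": (transform, ["extract"]),
    "load": (load, ["transform"]),
}


def run_flow(flow: FlowSpec) -> dict[str, Any]:
    """Run every task once, after all of its upstream tasks, and collect the results."""
    graph = {name: set(deps) for name, (_, deps) in flow.items()}
    results: dict[str, Any] = {}
    for name in TopologicalSorter(graph).static_order():
        fn, deps = flow[name]
        # Upstream results are passed positionally, in the order the dependencies are listed.
        results[name] = fn(*(results[d] for d in deps))
    return results


if __name__ == "__main__":
    print(run_flow(FLOW))
```

The dependency mapping is exactly what makes this a DAG: the sorter refuses to order a graph that contains a cycle.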
If you are coming from the Airflow world, this will look familiar. In fact, a flow is simply Prefect's term for a DAG (directed acyclic graph).
We chose Prefect because it provides a clean UI and makes writing complex flows easy. It also has thorough documentation and a large community, and it supports templates and parameters, which make our flows dynamic.
How does Prefect work?
Prefect's architecture implements a hybrid model. The components that make up the Prefect server can be hosted on any cloud provider, while the Agents that actually run the flows can stay on our own premises, next to the data, or (even better) on our data engineers' laptops.
This improves the security of our flows, because the server backend never stores our data; it works only with metadata, while the Agents run the flows.
Before an Agent can pick up and execute a flow, the flow needs to be registered.
Registration tells the Apollo endpoint about the flow: the endpoint receives the flow's metadata, storage, labels, run configuration and executor. We will get to these in a minute.
Here is a brief overview of the microservices that make up Prefect Server:
- UI: the user interface that provides a visual dashboard for mutating and querying metadata.
- Apollo: the main endpoint for interacting with the server. Both the Agents and the UI need access to it in order to register, run and visualise flows.
- GraphQL: the server’s business logic that exposes GraphQL mutations (changes).
- Hasura: the GraphQL API that layers on top of Postgres for querying metadata from the database.
- PostgreSQL: the database persistence layer where metadata is stored.
- Towel: runs utilities that are responsible for server maintenance:
  - Scheduler: schedules and creates new flow runs.
  - Zombie Killer: marks task runs as failed if they fail to heartbeat.
  - Lazarus: reschedules flow runs that remain in an unusual state for a period of time.
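To make the Zombie Killer's job concrete, here is a minimal, stdlib-only sketch of a heartbeat check. It is not Prefect's implementation: the `TaskRun` record, the state names and the timeout value are assumptions chosen for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class TaskRun:
    # Hypothetical record of a task run as a server might track it.
    run_id: str
    state: str                # e.g. "Running", "Success", "Failed"
    last_heartbeat: datetime  # timezone-aware time of the last heartbeat


def kill_zombies(runs: list[TaskRun], now: datetime, timeout: timedelta) -> list[str]:
    """Mark Running task runs as Failed if their last heartbeat is older than `timeout`.

    Returns the ids of the runs that were marked as failed.
    """
    killed = []
    for run in runs:
        if run.state == "Running" and now - run.last_heartbeat > timeout:
            run.state = "Failed"
            killed.append(run.run_id)
    return killed


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    runs = [
        TaskRun("a", "Running", now - timedelta(seconds=30)),  # recent heartbeat
        TaskRun("b", "Running", now - timedelta(minutes=10)),  # stale heartbeat
        TaskRun("c", "Success", now - timedelta(hours=1)),     # already finished
    ]
    print(kill_zombies(runs, now, timeout=timedelta(minutes=2)))
```

Only runs that claim to be running are considered; a finished run with an old heartbeat is left alone.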
When deploying flows using Prefect, some additional configuration is required:
- Storage: describes where the flow is stored and where the agent loads it from during execution.
- Run Configuration: describes where and how a flow run should be executed.
- Executor: describes where and how tasks in a flow run should be executed.
You can find more information about storage, run configurations and executors in the official Prefect documentation.
Installing Prefect Server is fairly easy, and it works well for local development, where you use local storage and a local agent that communicates with a Prefect server somewhere in the cloud. You only need to write your flows and register them; you can then access them from the UI and run them. However, registering flows by hand like this is not desirable, because it does not follow DataOps principles.
Our goal is to run the agent, the UI and the Prefect server in a Kubernetes cluster and to register flows from a CI/CD pipeline. This improves traceability and security and, combined with some basic tests, gives us confidence that our flows meet certain criteria before we deploy them.
We want three distinct environments: dev, stage and production. The pipeline has to be able to deploy dynamically to each environment, based on the branch name or any other criterion.
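One way to pick the target environment from the branch name is sketched below. The mapping (main to production, a branch named `stage` to stage, everything else to dev) is a hypothetical example, not the rule used in the repository. Azure Pipelines exposes the triggering branch through the predefined variable `Build.SourceBranch` (for example `refs/heads/main`), which scripts can read as the `BUILD_SOURCEBRANCH` environment variable.

```python
import os

# Hypothetical branch-to-environment mapping; adjust it to your own branching model.
BRANCH_TO_ENV = {
    "main": "production",
    "stage": "stage",
}
DEFAULT_ENV = "dev"


def environment_for(source_branch: str) -> str:
    """Map a ref such as 'refs/heads/main' (or a bare branch name) to an environment."""
    branch = source_branch.removeprefix("refs/heads/")
    return BRANCH_TO_ENV.get(branch, DEFAULT_ENV)


if __name__ == "__main__":
    # In an Azure Pipelines script step, Build.SourceBranch is available as BUILD_SOURCEBRANCH.
    print(environment_for(os.environ.get("BUILD_SOURCEBRANCH", "refs/heads/main")))
```

Falling back to dev means a new feature branch never deploys to stage or production by accident.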
Let's have a closer look at our setup
Our repository is available here: https://github.com/infinitelambda/prefect-pipeline.
You will notice that we have split the code into separate files to make it more dynamic and easier to maintain.
For this solution, we chose a Prefect agent of type Kubernetes.
This agent runs inside Azure Kubernetes Service alongside the Prefect server, which lets us use a Run Configuration of type Kubernetes in our utils/config.py:
As you can see above, during registration the flow is stored in Azure Blob Storage, from where the Agent can download and execute it.
We also provide a custom image for our flow, which the pipeline automatically builds and pushes to Azure Container Registry (ACR). This way, the flow always runs with the latest tasks/ and utils/ code without having to be registered again.
Finally, we provide a custom job_template.yaml that sets imagePullPolicy to Always, so that the image is pulled every time the Prefect Agent runs a flow as a Kubernetes Job. We do this because, for an image tag other than :latest, Kubernetes defaults imagePullPolicy to IfNotPresent, so a node that already has an image with that tag would not download the newer one.
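The settings that utils/config.py feeds into registration can be pictured as one small record per environment. The sketch below is a stdlib-only model, not the repository's code and not Prefect's API; the registry name `myregistry.azurecr.io`, the image repository `prefect-flows`, the blob container naming scheme and the label values are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FlowDeploymentConfig:
    # Storage: Azure Blob container where the serialized flow is stored at registration.
    storage_container: str
    # Run configuration: custom image in ACR plus the job template the agent should use.
    image: str
    job_template_path: str
    # Labels an agent must match before it picks up the flow.
    labels: tuple[str, ...] = ()


def build_config(env: str, image_tag: str,
                 registry: str = "myregistry.azurecr.io") -> FlowDeploymentConfig:
    """Build hypothetical per-environment settings for registering a flow."""
    if env not in {"dev", "stage", "production"}:
        raise ValueError(f"unknown environment: {env!r}")
    return FlowDeploymentConfig(
        storage_container=f"prefect-flows-{env}",
        image=f"{registry}/prefect-flows:{image_tag}",
        job_template_path="job_template.yaml",
        labels=(env,),
    )


if __name__ == "__main__":
    print(build_config("dev", image_tag="latest"))
```

Keeping all environment-specific values in one place is what lets the same flow file be registered to dev, stage or production unchanged.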
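The effect of the custom job template can be shown on a plain dictionary shaped like a Kubernetes Job manifest. This is a hedged sketch: the manifest is a trimmed, hypothetical stand-in for job_template.yaml (already parsed, so no YAML library is needed), and the helper simply forces `imagePullPolicy: Always` on every container in the Job's pod template.

```python
import copy
from typing import Any

# Trimmed, hypothetical stand-in for job_template.yaml, already parsed into a dict.
JOB_TEMPLATE: dict[str, Any] = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "spec": {
                "restartPolicy": "Never",
                "containers": [{"name": "flow"}],
            }
        }
    },
}


def with_always_pull(job: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of a Job manifest with imagePullPolicy set to Always on every container."""
    patched = copy.deepcopy(job)
    for container in patched["spec"]["template"]["spec"]["containers"]:
        # With Always, the kubelet resolves the tag against the registry on every
        # container start, so a re-pushed tag is picked up.
        container["imagePullPolicy"] = "Always"
    return patched


if __name__ == "__main__":
    patched = with_always_pull(JOB_TEMPLATE)
    print(patched["spec"]["template"]["spec"]["containers"])
```

Working on a deep copy keeps the shared template untouched, which matters if several flows start from the same base.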
To make it easy to understand, let’s use Pokemon as an example:
The code is fairly self-explanatory: we fetch a list of Pokemon from the web, transform it and load it somewhere. Inside the flow, we call each of these classes in turn, and together they form an end-to-end ETL pipeline.
Let’s have a look at the flow.py:
In this flow, we import the pokemontasks module we saw earlier and call each of its classes: we extract the list of Pokemon, transform it and load the result as output.
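To show the same extract, transform and load shape without Prefect, here is a stdlib-only sketch. It is not the code in tasks/pokemontasks.py or flow.py; the class names, the `run` method convention and the PokeAPI endpoint `https://pokeapi.co/api/v2/pokemon` are assumptions. The fetch function is injectable, so the pipeline can be exercised offline.

```python
import json
import urllib.request
from typing import Any, Callable

# Assumed public endpoint; the article does not say which API the real tasks call.
POKEAPI_URL = "https://pokeapi.co/api/v2/pokemon?limit={limit}"
Fetch = Callable[[str], dict[str, Any]]


def fetch_json(url: str) -> dict[str, Any]:
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


class ExtractPokemon:
    """Fetch the first `limit` Pokemon (151 matches the number mentioned later on)."""

    def __init__(self, limit: int = 151, fetch: Fetch = fetch_json) -> None:
        self.limit = limit
        self.fetch = fetch

    def run(self) -> list[dict[str, Any]]:
        return self.fetch(POKEAPI_URL.format(limit=self.limit))["results"]


class TransformPokemon:
    """Keep only the names, capitalised."""

    def run(self, pokemon: list[dict[str, Any]]) -> list[str]:
        return [p["name"].capitalize() for p in pokemon]


class LoadPokemon:
    """'Load' by returning one output line per Pokemon; a real task would write to a sink."""

    def run(self, names: list[str]) -> str:
        return "\n".join(names)


def pokemon_flow(fetch: Fetch = fetch_json, limit: int = 151) -> str:
    # Chain the three steps: extract -> transform -> load.
    raw = ExtractPokemon(limit=limit, fetch=fetch).run()
    names = TransformPokemon().run(raw)
    return LoadPokemon().run(names)


def canned_fetch(url: str) -> dict[str, Any]:
    # Offline stand-in shaped like the PokeAPI list payload.
    return {"results": [{"name": "bulbasaur"}, {"name": "ivysaur"}]}


if __name__ == "__main__":
    print(pokemon_flow(fetch=canned_fetch, limit=2))
```

Passing `fetch_json` (the default) instead of `canned_fetch` would call the assumed endpoint over the network.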
Infinite Lambda’s way to register Flows with CI/CD
What follows is a blueprint that you can adapt to your needs by adding more complexity, security, features and so on.
Here is what this approach achieves:
- We keep the data pipelines as code the same way we treat our infrastructure.
- We follow GitOps principles where Git is the single source of truth.
- We can dynamically import tasks and libraries that live outside the flow file (by default, the flow expects to find them locally on the host during both registration and execution).
- Flows are registered only when the flow files actually change. Re-registering them on every run would be inefficient and would bump the flow versions unnecessarily.
- Both tasks and flows scale: we can run as many of them as we need in separate processes, because the Kubernetes run configuration spawns a separate Kubernetes Job for every flow run.
We now have everything we need to deploy flows. For simplicity, this architecture has only one environment, dev, and we deploy flows on every push, whether it is to a feature branch or to main. In practice, it is good to map environments to branches, for instance the production environment to the main branch.
The CI/CD pipeline above imports the YAML pipeline templates we wrote earlier, so that code can be reused across pipelines. In this example we have only one pipeline, but if you have more, the template approach pays off.
As you can see in templates/register-flows.yml, the script tries to register every flow in the flow/ directory: if a flow is not yet present in the project, it registers it. It also checks whether existing flows have changed, thanks to the --skip-if-flow-metadata-unchanged flag.
This is a newly added parameter of the register command that re-registers a flow only if its structure has meaningfully changed.
In other words, when we register a flow, its metadata is saved to the Prefect server, and Prefect recognises it through an idempotency key that it uses internally. On every run, the script compares against this metadata to check whether the flow has changed since the last registration. If it has, the script re-registers the flow and bumps its version.
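The idea behind skipping unchanged flows can be sketched as a content hash compared against the one stored at the previous registration. This is an illustration only: Prefect uses its own idempotency key internally, whereas the sketch hashes a hypothetical metadata dictionary with SHA-256 and keeps the "server" state in memory.

```python
import hashlib
import json
from typing import Any


def metadata_hash(metadata: dict[str, Any]) -> str:
    """Stable hash of flow metadata: keys are sorted so dict ordering does not matter."""
    payload = json.dumps(metadata, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class FlowRegistry:
    """Hypothetical in-memory stand-in for what the server remembers per flow."""

    def __init__(self) -> None:
        self._flows: dict[str, tuple[str, int]] = {}  # name -> (hash, version)

    def register(self, name: str, metadata: dict[str, Any]) -> tuple[int, bool]:
        """Register the flow if it is new or changed; return (version, registered?)."""
        new_hash = metadata_hash(metadata)
        previous = self._flows.get(name)
        if previous is not None and previous[0] == new_hash:
            return previous[1], False  # unchanged: skip and keep the current version
        version = 1 if previous is None else previous[1] + 1
        self._flows[name] = (new_hash, version)
        return version, True


if __name__ == "__main__":
    registry = FlowRegistry()
    meta = {"tasks": ["extract", "transform", "load"], "labels": ["dev"]}
    print(registry.register("pokemon", meta))                          # new flow
    print(registry.register("pokemon", dict(reversed(meta.items()))))  # same content
    print(registry.register("pokemon", {**meta, "labels": ["stage"]})) # changed metadata
```

Hashing a canonical JSON form is what makes the check insensitive to incidental differences such as key order.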
The following will happen in this pipeline:
- The pipeline is triggered on push.
- It iterates over the files in the flows directory and tries to register each flow.
- It builds a Docker image with the latest tasks and utils and pushes it to ACR, from where the Kubernetes Jobs pull it.
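The "iterate over the flows directory" step can be sketched with `pathlib` and `importlib`. This is a hypothetical helper, not the repository's script: it assumes each flow file defines a module-level variable named `flow`, and it only loads and collects those objects, leaving the actual registration call out.

```python
import importlib.util
from pathlib import Path
from types import ModuleType
from typing import Any


def load_module(path: Path) -> ModuleType:
    """Import a Python file by path without it having to be on sys.path."""
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def discover_flows(flows_dir: Path, attr: str = "flow") -> dict[str, Any]:
    """Return {file stem: flow object} for every flow file that defines `attr`."""
    found: dict[str, Any] = {}
    for path in sorted(flows_dir.glob("*.py")):
        if path.name.startswith("_"):  # skip __init__.py and private helpers
            continue
        module = load_module(path)
        if hasattr(module, attr):
            found[path.stem] = getattr(module, attr)
    return found


if __name__ == "__main__":
    # A missing directory simply yields no flows.
    for name, flow in discover_flows(Path("flows")).items():
        print(f"would register {name}: {flow!r}")
```

Files that do not define the attribute are ignored, so shared helper modules can live next to the flows.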
In a nutshell, when a flow run is scheduled, the Prefect Agent starts a Kubernetes Job that pulls the image from ACR, loads the flow from the Storage Account and runs it.
If you look at the Azure Pipelines UI, you will see the following:
If everything has run successfully, upon opening the Prefect UI, you should see the following:
What if you wanted to change the number of Pokemon we fetch in tasks/pokemontasks.py from 151 to 10?
Simply make the change, commit and push. Because a file inside tasks/ has changed, the pipeline is triggered: the Docker image is rebuilt with the new tasks/pokemontasks.py and pushed to the ACR repository. The next time the flow runs, the Job pulls the latest image from ACR and executes the flow inside it, so the flow picks up the updated task.
The main benefit of this approach is that the flow is not re-registered, because nothing in the flow file has changed. This lets us develop tasks independently of flows.
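The split between re-registering flows and only rebuilding the image can be expressed as a decision on the changed file paths. This is a hypothetical helper, not part of the pipeline described above (which relies on the pipeline trigger and --skip-if-flow-metadata-unchanged); it assumes a layout with flows/, tasks/ and utils/ directories and a list of changed paths such as the output of `git diff --name-only`.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass(frozen=True)
class PipelinePlan:
    register_flows: tuple[str, ...]  # flow files whose metadata may have changed
    build_image: bool                # rebuild and push the tasks/utils image to ACR


# Hypothetical layout: flows/ holds flow files; tasks/ and utils/ are baked into the image.
IMAGE_DIRS = {"tasks", "utils"}


def plan(changed_files: list[str]) -> PipelinePlan:
    """Decide which pipeline steps a set of changed repository paths requires."""
    flows: list[str] = []
    build = False
    for name in changed_files:
        parts = PurePosixPath(name).parts
        if not parts:
            continue
        top = parts[0]
        if top == "flows" and name.endswith(".py"):
            flows.append(name)
        elif top in IMAGE_DIRS:
            build = True
    return PipelinePlan(register_flows=tuple(sorted(flows)), build_image=build)


if __name__ == "__main__":
    # Changing the Pokemon count touches only tasks/pokemontasks.py.
    print(plan(["tasks/pokemontasks.py"]))
```

For the 151 to 10 change, the plan asks only for a new image and registers nothing, which mirrors the behaviour described above.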
In this article, we saw how to automate the registration of flows with Azure Pipelines, which lets us follow DataOps principles.
If you want to find out more, reach out at email@example.com or check out our other blog posts.