In terms of open-source workflow management tools, Apache Airflow has established itself as the de facto standard. At the time of writing it has 1,827 contributors and 41 releases listed on its GitHub repo. But as is usual with software, newer implementations have arisen, and one of those getting a lot of attention is Prefect. I decided to take a look and see what’s under the hood.
The Prefect ethos
Prefect (the company) was founded in 2017. Its CEO, Jeremiah Lowin, was previously an Airflow core contributor.
Prefect has an ethos centered around positive vs negative engineering. Engineers spend a significant amount of time handling the unexpected, and we like tooling that supports this with features like modularity, transparent implementations, built-in retries and, most importantly, good logging. The creators of Prefect understand this and have endeavoured to build a tool to support it.
Prefect’s design goal is to be minimally invasive when things go right and maximally helpful when they go wrong. Nice.
The Hitchhiker’s Guide
There’s also a geeky aspect to it that I enjoy: the Hitchhiker’s Guide to the Galaxy theme. (If you haven’t read it, do; it contains one of the few famous Dents.) Ford Prefect is a character in these books, and though I couldn’t find any explicit reference to the book in their documentation, the fact that the search bar has a snarky Marvin and that Towel is one of the orchestrator services left me in no doubt as to the origin of the name.
What about Airflow?
So how does it compare to Airflow? I will not be covering that in this article, as they have a long, well-thought-out article in the documentation entitled “Why Not Airflow?”. Any challenger to Airflow is going to have to address this question, and the Prefect folk are quite explicit about it. Perhaps in a future blog post I can do a feature-by-feature comparison.
Here I’ll run through some of the core concepts of Prefect. They have excellent documentation with many code samples. My intention here is to show the simplicity of using Prefect and to look a little deeper at its implementation.
from prefect import task

@task
def add(x, y):
    return x + y
A Task is a discrete action in a workflow. It has an optional input and an optional result. It just requires decorating a function 🎄. Simple.
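import random
from prefect import task, Flow

@task
def random_number():
    return random.randint(0, 100)

@task
def add(x, y):
    return x + y

with Flow('My Functional Flow') as flow:
    x = random_number()
    y = random_number()
    total = add(x, y)  # renamed from sum to avoid shadowing the builtin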
from prefect import task, Flow, Parameter

@task
def print_plus_one(x):
    print(x + 1)

with Flow('Parameterized Flow') as flow:
    x = Parameter('x', default=2)
    print_plus_one(x=x)

flow.run(parameters=dict(x=1)) # prints 2
flow.run(parameters=dict(x=100)) # prints 101
flow.run() # prints 3
Parameters are special tasks that receive user input and allow defaults. This is one of the differentiators from Airflow; more here.
A Schedule object can be attached to a flow and allows for complex scheduling configurations.
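from prefect import task, Flow
from datetime import timedelta
from prefect.schedules import IntervalSchedule

@task
def say_hello():  # placeholder task; the flow body is missing from the original listing
    print("Hello, world!")

schedule = IntervalSchedule(interval=timedelta(minutes=2))

with Flow("Hello", schedule) as flow:
    say_hello()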
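To get a feel for what an interval schedule does conceptually, here is a rough pure-Python sketch that generates upcoming run times. It does not use Prefect at all, and it is not how Prefect implements schedules; `interval_events` and `next_runs` are hypothetical names I made up for illustration.

```python
from datetime import datetime, timedelta
from itertools import islice


def interval_events(start, interval):
    """Yield start, start + interval, start + 2 * interval, ... forever."""
    current = start
    while True:
        yield current
        current += interval


def next_runs(start, interval, after, n):
    """Return the first n scheduled times strictly later than `after`."""
    upcoming = (t for t in interval_events(start, interval) if t > after)
    return list(islice(upcoming, n))


# A two-minute interval anchored at midnight, queried at 00:03.
start = datetime(2021, 1, 1, 0, 0)
runs = next_runs(start, timedelta(minutes=2), after=datetime(2021, 1, 1, 0, 3), n=3)
```

Here `runs` holds the next three ticks after 00:03, that is 00:04, 00:06 and 00:08.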
State is the currency of Prefect. State objects represent information about running tasks or flows. At any moment, you can learn anything you need to know about a task or flow by examining its current state or the history of its states.
There are three main state types: Pending, Running and Finished. There are subclasses of these states, as can be seen above. There is also MetaState, which enhances existing states.
The documentation gives really good coverage of states. What was most interesting for me is the ability to define state change handlers, which allow for really fine-grained monitoring and alerts.
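To make the idea of states and state change handlers concrete, here is a minimal pure-Python sketch. It deliberately does not import Prefect and is not Prefect's implementation: the class names mirror the state types mentioned above, while `ToyTask`, `set_state` and `alert_on_failure` are hypothetical names for illustration. The handler shape `(obj, old_state, new_state)` returning a state is modelled on how the Prefect documentation describes handlers.

```python
class State:
    """Base class: a state carries an optional human-readable message."""

    def __init__(self, message=None):
        self.message = message


class Pending(State):
    pass


class Running(State):
    pass


class Finished(State):
    pass


class Success(Finished):
    pass


class Failed(Finished):
    pass


class ToyTask:
    """Hypothetical stand-in for a task that tracks its current state."""

    def __init__(self, name, state_handlers=None):
        self.name = name
        self.state = Pending()
        self.state_handlers = state_handlers or []

    def set_state(self, new_state):
        # Every handler sees the transition and may replace the new state.
        for handler in self.state_handlers:
            new_state = handler(self, self.state, new_state) or new_state
        self.state = new_state
        return new_state


alerts = []


def alert_on_failure(task, old_state, new_state):
    """Record an alert whenever a task moves into a Failed state."""
    if isinstance(new_state, Failed):
        old_name = type(old_state).__name__
        alerts.append(f"{task.name}: {old_name} -> Failed ({new_state.message})")
    return new_state


task = ToyTask("load", state_handlers=[alert_on_failure])
task.set_state(Running())
task.set_state(Failed("connection refused"))
```

After the two transitions, `alerts` contains a single entry for the failure. In a real setup the handler would post to a chat channel or pager rather than append to a list.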
Runners and Executors
This is going a bit beyond the “basics”, but it helped me understand the underlying processing of tasks and flows.
There are two classes, FlowRunner and TaskRunner:
- FlowRunner takes a flow and attempts to run all its tasks, then collects the resulting task states and, if all are complete, returns the final state. If a task is unfinished it will loop through the tasks again and operate on them based on their states (so if a task is finished it will not be run again).
- FlowRunner may also receive parameter values if they have been specified.
- TaskRunner executes a single task. It determines from the initial state and any upstream states whether the task should be run. Mapping is also handled here, which generates dynamic task runners. Post-processing is performed to determine the need for a retry (or caching).
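The loop described above can be sketched in a few lines of plain Python. This is only my toy reading of the behaviour, not Prefect's code: `run_task`, `run_flow`, the string states and the `max_passes` limit are all assumptions made for illustration.

```python
def run_task(fn, upstream_results):
    """Toy 'task runner': run one task and return (state, result)."""
    try:
        return "Success", fn(*upstream_results)
    except Exception as exc:
        # A failure is marked for retry on the next pass.
        return "Retrying", exc


def run_flow(tasks, max_passes=5):
    """Toy 'flow runner'. `tasks` maps name -> (function, upstream names)."""
    states = {name: "Pending" for name in tasks}
    results = {}
    for _ in range(max_passes):
        for name, (fn, upstream) in tasks.items():
            if states[name] == "Success":
                continue  # finished tasks are not run again
            if any(states[u] != "Success" for u in upstream):
                continue  # wait until every upstream task has succeeded
            inputs = [results[u] for u in upstream]
            states[name], results[name] = run_task(fn, inputs)
        if all(state == "Success" for state in states.values()):
            break
    return states, results


calls = {"extract": 0}


def extract():
    """Fails on the first call to simulate a flaky source."""
    calls["extract"] += 1
    if calls["extract"] == 1:
        raise RuntimeError("flaky source")
    return [1, 2, 3]


def transform(rows):
    return [row * 10 for row in rows]


flow = {"extract": (extract, []), "transform": (transform, ["extract"])}
states, results = run_flow(flow)
```

On the first pass `extract` fails and `transform` is skipped; on the second pass `extract` succeeds and `transform` runs once with its result.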
Executors run the tasks and support submit and wait functions. Various types are supported, from a synchronous local process to a completely separate Dask engine for asynchronous processing.
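A small sketch of what a submit-and-wait executor interface might look like, with one synchronous and one thread-based variant. `SyncExecutor`, `ThreadExecutor` and `run_all` are hypothetical names and this is not Prefect's executor code; it only assumes that an executor exposes a function to submit work and a function to wait for results.

```python
from concurrent.futures import ThreadPoolExecutor


class SyncExecutor:
    """Runs each function immediately in the calling process."""

    def submit(self, fn, *args, **kwargs):
        return fn(*args, **kwargs)

    def wait(self, futures):
        # Nothing to wait for: the "futures" are already results.
        return futures


class ThreadExecutor:
    """Runs functions on a thread pool and resolves them in wait()."""

    def __init__(self, max_workers=4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn, *args, **kwargs):
        return self._pool.submit(fn, *args, **kwargs)

    def wait(self, futures):
        return [future.result() for future in futures]

    def shutdown(self):
        self._pool.shutdown()


def square(x):
    return x * x


def run_all(executor, values):
    """The calling code is identical whichever executor is plugged in."""
    futures = [executor.submit(square, value) for value in values]
    return executor.wait(futures)


sync_results = run_all(SyncExecutor(), [1, 2, 3])

thread_executor = ThreadExecutor()
thread_results = run_all(thread_executor, [1, 2, 3])
thread_executor.shutdown()
```

The point of the shared interface is that the runner does not need to know where the work happens; swapping the executor changes the execution model without touching the flow.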
Give it a try!
All of this can be set up and run with a single Python package. No extra setup or dependencies.
If you want to give it a try:
- clone the Prefect repo from here and in the terminal open
- install required packages including the Prefect package with
pip install -r requirements.txt (preferably within a Python virtual environment)
To get a feeling for the functionality I do recommend following the full tutorial.
But wait, there’s more!
So perhaps you’re happy with a single flow running happily in a container. But you might want a few more things like:
- managing and visualising multiple flows
- a fancy UI for running and monitoring
- a GraphQL API for integration with other services
And you can have it with Prefect Cloud or Prefect Server.
Prefect Cloud is a managed backend that offers the goodies above but also:
- permission and authorization management
- agent monitoring
There is a free tier with 3 users and 10,000 runs per month. It also supports a hybrid configuration in which your data and code are kept private, as the execution can be performed on agents within your own infrastructure.
In many projects there’s no possibility to connect to services outside of an internal network. For this there’s Prefect Server, an open-source backend containing:
- a scheduler
- a web server
- a metadata database
You can run this locally with ease. If you have Docker installed you can follow the tutorial below to get a feel for the functionality supported.
Testing out Prefect Server
If you haven’t done so, install Prefect with
pip install prefect
and clone the Prefect repo from here.
Run the following to change the backend to Prefect Server (the default is Prefect Cloud):
prefect backend server
In a terminal, start the Prefect server. This will spin up a number of containers.
prefect server start
In another terminal start an agent.
prefect agent local start
In another terminal, create a project with
prefect create project "conditional"
Ensure that the project has been created by going to localhost:8080 in a browser and selecting the “conditional” project from the dropdown.
Open the folder examples/tutorial and register the conditionals flow with
prefect register --project conditional --path conditional.py --name "Example: Conditional Tasks"
You should now be able to see the flow in the project under “FLOWS”: select it.
A couple of things to try out:
- Run the flow by selecting the flow and then “QUICK RUN” and watch the live updates.
- Visualise the tasks of the flow by selecting “TASKS”.
- Visualise the DAG by selecting “SCHEMATIC”.
- See the details of a specific run by selecting “RUNS” and selecting a run.
Their deployment architecture looks as follows:
This allows for the use of Prefect Cloud for managing and visualising one’s flows. This would still require the provisioning of the execution environment and additional storage and monitoring. They provide guidance on agent provisioning which should cover most cloud setups.
If deploying to a closed environment one would then need to deploy Prefect Server. This could be deployed on a single node similarly to how it can be run locally (which uses docker-compose under the hood). A more elaborate deployment would be to a Kubernetes cluster and they provide a Helm chart to support this. It is also possible to configure use of a separate Postgres instance instead of the one running in a container, more here.
I have unfortunately not been able to test out the deployment and will hopefully cover it in a future post.
Any decent workflow management tool should have a plethora of options for connecting to external services and Prefect is no exception. The Task Library has an overview and more details can be found by selecting “prefect.tasks” in the API page.
What I liked
- The minimally invasive goal: Prefect doesn’t force one to conform to a specific API or code style
- Simplicity: I could quickly grasp the base concepts
- The introduction tutorial: clearly explained and I could get up and running quickly
- The documentation in general: it’s extensive and well written
What I liked less
- The community is still small: while searching online for details about the Shell Task I couldn’t find many posts. This will obviously change as adoption increases
- I did struggle getting Prefect Server up and running as the default is set to Cloud and I missed the command that does the switch
- The ETL example passes data between functions. This is good for demonstrating the data passing possibilities but I can’t foresee doing something similar for large datasets. This can, of course, be done using something like the Databricks Task or perhaps using a combination of Fivetran and DBT
Overall I enjoyed working with Prefect and look forward to using it in future projects.