Running R scripts in Airflow using Airflow BashOperators
In this article, I briefly explain how you can easily create a pipeline that executes a set of R scripts using Apache Airflow.
Airflow in a nutshell
In a nutshell, Airflow is an open-source platform for creating, scheduling and monitoring workflows (sequences of tasks). Although workflows are defined in Python, Airflow can orchestrate tasks written in any language.
It is possible to access Airflow via a web interface, command line or code.
Core Components
The three principal Airflow core components are:
- Metadata Database: Airflow uses a SQL database to store metadata about the data pipelines being run. Basically, any database that is compatible with SQLAlchemy can be used, for example, Postgres, MySQL, SQLite, Oracle, etc.
- Web Server: a web interface that you can use to manage workflows (DAGs), the Airflow environment, and perform administrative actions.
- Scheduler: the Airflow scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
In addition to this, other important components are:
- Executor: Defines how the tasks are going to be executed.
- Worker: Defines where the tasks are going to be executed.
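To make this a bit more concrete, here is a minimal sketch (assuming Airflow is already installed and AIRFLOW_HOME is set, as described later in this article) that prints which executor and which metadata database a local installation is configured to use. Note that in newer Airflow versions the connection string lives under the [database] section instead of [core].
# A minimal sketch: print the configured executor and metadata database.
# Assumes Airflow is installed and AIRFLOW_HOME is set (see the setup below).
from airflow.configuration import conf

print(conf.get("core", "executor"))          # e.g. SequentialExecutor by default
print(conf.get("core", "sql_alchemy_conn"))  # e.g. a sqlite:///... URL pointing at airflow.db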
DAGs
In Airflow, the workflows are implemented as DAGs (Directed Acyclic Graphs). The official Airflow documentation defines a DAG in the following way:
A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.
Tasks and Operators
As stated in the Airflow documentation, a task defines a unit of work within a DAG; it is represented as a node in the DAG graph, and it is written in Python.
The tasks are defined by operators. An operator describes a single task in a workflow and determines what actually gets done by the task.
Airflow provides built-in operators for many common tasks, for example:
- BashOperator: executes a bash command
- PythonOperator: calls an arbitrary Python function
- EmailOperator: sends an email
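As a quick illustration (not part of the pipeline we will build below), the first two of these operators could be used as follows; the dag_id, task_ids and commands are made up for the example:
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def say_hello():
    print("Hello from a PythonOperator task")

# Illustrative names only; this DAG is not part of the tutorial's pipeline.
with DAG(dag_id='operator_examples',
         start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:
    hello_bash = BashOperator(task_id='hello_bash',
                              bash_command="echo 'Hello from bash'")
    hello_python = PythonOperator(task_id='hello_python',
                                  python_callable=say_hello)

    hello_bash >> hello_python  # hello_python runs after hello_bash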
For more information about operators check this section.
Running R scripts in Airflow
Unfortunately, Apache Airflow has no support for R, i.e. there is no R operator right now (you could try to write your own and contribute to the community!).
As stated here, there are different ways to run R scripts in Airflow, and perhaps the best one is to containerize your R script and run it using the DockerOperator, which is provided by the apache-airflow-providers-docker package. I will write about this option in a future article, but for now, I will describe how to run R scripts using BashOperators.
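For reference, a containerized task might look roughly like the sketch below. This assumes the apache-airflow-providers-docker package is installed and that a Docker image called my-r-image (a hypothetical name) exists with R and the script baked in; it is only a rough outline, not something we will use in this tutorial.
# Rough sketch only (not used in this tutorial): requires the
# apache-airflow-providers-docker package and a Docker image -- here the
# hypothetical 'my-r-image' -- that contains R and the script to run.
from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

with DAG(dag_id='r_in_docker_sketch',
         start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:
    run_r_in_docker = DockerOperator(
        task_id='run_a_task',
        image='my-r-image:latest',
        command='Rscript /scripts/A_task.R',
    )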
Getting started
First of all, create a virtual environment (with Python and R) and activate it. In this case, I created it using anaconda:
cd path/to/Airflow-R-tutorial
conda create -n my_airflow_env r-essentials r-base
conda activate my_airflow_env
Now, install apache-airflow:
sudo pip3 install apache-airflow
(Note: you may need to execute sudo pip3 install SQLAlchemy==1.3.18 to downgrade to a SQLAlchemy version compatible with Airflow.)
It is necessary to create the AIRFLOW_HOME directory and set the environment variable AIRFLOW_HOME. To be more explicit, I called this directory airflow_home:
cd path/to/Airflow-R-tutorial
mkdir airflow_home
export AIRFLOW_HOME=`pwd`/airflow_home
Now check if everything is ok:
airflow version
This will create some files in the directory that you designated as AIRFLOW_HOME.
Run the following to initialize the metadata database:
airflow db init
You will see a new airflow.db file in your airflow_home directory.
Finally, create a user:
airflow users create \
--username admin \
--firstname FIRST_NAME \
--lastname LAST_NAME \
--role Admin \
--email admin@example.org
Creating your first DAG
Now we will create our first DAG containing four tasks (A, B, C and D). In this case, each task will be an R script.
Folders and files
Create the dags and scripts directories inside the airflow_home directory:
cd path/to/Airflow-R-tutorial/airflow_home
mkdir dags
mkdir scripts
Now create the following files inside them:
cd path/to/Airflow-R-tutorial/airflow_home/dags
touch my_first_dag.py
cd path/to/Airflow-R-tutorial/airflow_home/scripts
touch A_task.R
touch B_task.R
touch C_task.R
touch D_task.Rmd
touch run_r.sh
The tasks
For this tutorial, I have chosen four simple tasks that may not be useful but will serve as an example. In this subsection, I will create each task as an R script.
Task A (A_task.R file): executes a GET request to this Random User Generator API (a free and open-source API for generating random user data) and obtains a list of 200 random users from Canada. Then exports the results as a users.csv file.
library(httr)
library(jsonlite)

res = GET(
  url = "https://randomuser.me/api/",
  query = list(
    results = 200,
    nat = "ca",
    inc = "gender,name,dob"
  )
)

data = fromJSON(content(res, "text"))

write.csv(data$results, "users.csv", row.names = FALSE)
Task B (B_task.R file): opens the users.csv file and builds a bar plot (counts by gender). Exports the result as a counts_by_gender.png file.
library(ggplot2)
# Load data
data <- read.csv("./airflow_home/scripts/users.csv", header=TRUE)
# Barplot
p <- ggplot(data, aes(x = as.factor(gender), fill = gender)) +
geom_bar(stat = "count", position = "stack") +
labs(x = "Gender",
y = "Count")
png("./airflow_home/scripts/counts_by_gender.png")
print(p)
dev.off()
Task C (C_task.R file): opens the users.csv file and builds a bar plot (counts by age). Exports the result as a counts_by_age.png file.
library(ggplot2)
# Load data
data <- read.csv("./airflow_home/scripts/users.csv", header=TRUE)
# Barplot
p <- ggplot(data, aes(x = as.factor(dob.age))) +
geom_bar(stat = "count", position = "stack", fill = "#FF6666") +
labs(x = "Age",
y = "Count")
png("./airflow_home/scripts/counts_by_age.png", width = 800, height = 400)
print(p)
dev.off()
Task D (D_task.Rmd file): opens the files generated by tasks B and C and builds a simple HTML report.
---
title: "Simple Report"
output: html_document
---

```{r setup, include=FALSE}
knitr::opts_chunk$set(echo = TRUE)
```

### Counts by gender

![](counts_by_gender.png)

### Counts by Age

![](counts_by_age.png)
The task dependencies are shown in the following picture: tasks B and C will only be triggered after task A completes successfully. Task D will then be triggered when tasks B and C both complete successfully.
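The same structure can be expressed directly in code with Airflow's bitshift operators. Here is a small self-contained sketch using placeholder DummyOperator tasks (renamed EmptyOperator in newer Airflow versions); the real pipeline below uses BashOperators instead:
# Dependency structure only: placeholder tasks standing in for the real R tasks.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator  # EmptyOperator in newer versions

with DAG(dag_id='dependency_sketch',
         start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:
    A = DummyOperator(task_id='A')
    B = DummyOperator(task_id='B')
    C = DummyOperator(task_id='C')
    D = DummyOperator(task_id='D')

    A >> [B, C]   # B and C run after A succeeds
    [B, C] >> D   # D runs after both B and C succeed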
Bash scripts and commands
The contents of the run_r.sh file, which will be used to run tasks A, B and C, are the following:
#!/usr/bin/env Rscript

# Take the path of an R script as the first argument,
# move into its directory and source (execute) it.
args = commandArgs(trailingOnly = TRUE)
setwd(dirname(args[1]))
source(args[1])
(Take into account that you may need to change the interpreter in the #!/usr/bin/env Rscript shebang line.)
Now you can execute the scripts containing tasks A, B and C with the following commands:
cd path/to/Airflow-R-tutorial/airflow_home/scripts
chmod u+x run_r.sh
./run_r.sh path/to/scripts/folder/A_task.R # To run task A
./run_r.sh path/to/scripts/folder/B_task.R # To run task B
./run_r.sh path/to/scripts/folder/C_task.R # To run task C
To run task D use:
Rscript -e "rmarkdown::render('D_task.Rmd')"
The workflow
Now that we have all the necessary pieces, let's start coding our DAG.
The contents of the my_first_dag.py file are the following:
import airflow
from airflow.models import DAG
from airflow.operators.bash import BashOperator
import os

# Get current directory
cwd = os.getcwd()
cwd = cwd + '/airflow_home/scripts/'

# Define the default arguments
args = {
    'owner': 'your_name',
    'start_date': airflow.utils.dates.days_ago(2),
}

# Instantiate the DAG passing the args as default_args
dag = DAG(
    dag_id='my_dag_id',
    default_args=args,
    schedule_interval=None
)

# Define the 4 tasks:
A = BashOperator(
    task_id='A_get_users',
    bash_command=f'{cwd}run_r.sh {cwd}A_task.R ',
    dag=dag,
)

B = BashOperator(
    task_id='B_counts_by_gender',
    bash_command=f'{cwd}run_r.sh {cwd}B_task.R ',
    dag=dag,
)

C = BashOperator(
    task_id='C_counts_by_age',
    bash_command=f'{cwd}run_r.sh {cwd}C_task.R ',
    dag=dag,
)

command_line = 'Rscript -e "rmarkdown::render(' + "'" + f'{cwd}D_task.Rmd' + "')" + '"'

D = BashOperator(
    task_id='D_html_report',
    bash_command=f'{command_line} ',
    dag=dag,
)

# Define the task dependencies
A >> B
A >> C
[B, C] >> D
Basically, I defined the default_args, then instantiated the DAG. After that, I generated the tasks by instantiating BashOperator objects and, finally, I set up the task dependency rules.
For more information about how to create DAGs I suggest that you visit this link.
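Before starting the scheduler, it can also be handy to check that the DAG file parses without errors. A small sketch (run from the project root with the environment activated and AIRFLOW_HOME exported as above) might look like this:
# Quick sanity check: load the dags folder and report any import errors.
# Run from the project root with AIRFLOW_HOME set as described above.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder='airflow_home/dags', include_examples=False)

print(dag_bag.import_errors)  # should be an empty dict
print([task.task_id for task in dag_bag.dags['my_dag_id'].tasks])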
Run your DAG
It's time to access the Airflow UI:
airflow webserver
Note: You can start this as a daemon by adding -D to your command.
Go ahead and open localhost:8080 to access the Airflow UI.
You will see a bunch of entries here that are Airflow examples. You can always turn them off by setting load_examples to False in the airflow.cfg file.
Now, open a second terminal to schedule the created DAG by starting the Airflow Scheduler:
conda activate my_airflow_env
cd path/to/Airflow-R-tutorial
export AIRFLOW_HOME=`pwd`/airflow_home
airflow scheduler
The scheduler will send tasks for execution and you'll be able to find your created DAG:
If you click on it, you can see the Graph View or the Code, among other options.
You can trigger the DAG by clicking on the 'play' button in the upper right corner. If everything works fine, you should see your pipeline executing, and afterwards all the files generated during the run will appear in the defined directory:
If you get an error in some task, it will appear in red (as failed). You can always click on it and check the logs to see what the problem is.
This is the report created by the final task D:
Summary
In this article, I walked through the steps of getting started with Apache Airflow, creating your first DAG, and executing a workflow composed of tasks defined in R scripts using Airflow BashOperators.
The complete code is in this GitHub repository.
I hope you find this article useful. If you have any queries you can find me at laura@fathomdata.dev or my LinkedIn.
Happy learning!
Thanks to Andrew Collier for a careful reading of the article and for making useful comments.
Laura Calcagni
Software Data Engineer