Running R scripts in Airflow using Airflow BashOperators

In this article, I briefly explain how you can easily create a pipeline that executes a set of R scripts using Apache Airflow.

Airflow in a nutshell

In a nutshell, Airflow is an open-source platform for creating, scheduling and monitoring workflows (sequences of tasks). Although the workflows themselves are written in Python, the tasks they orchestrate can run code written in any language.
It is possible to access Airflow via a web interface, the command line or code.

The three principal Airflow core components are:

  • Metadata Database: Airflow uses a SQL database to store metadata about the data pipelines being run. Basically, any database that is compatible with SQLAlchemy can be used, for example, Postgres, MySQL, SQLite, Oracle, etc.
  • Web Server: a web interface that you can use to manage workflows (DAGs), the Airflow environment, and perform administrative actions.
  • Scheduler: the Airflow scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.

In addition to this, other important components are:

  • Executor: Defines how the tasks are going to be executed.
  • Worker: Defines where the tasks are going to be executed.

In Airflow, the workflows are implemented as DAGs (Directed Acyclic Graphs). The official Airflow documentation defines a DAG in the following way:

A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.

As stated in the Airflow documentation, a task defines a unit of work within a DAG; it is represented as a node in the DAG graph, and it is written in Python.

The tasks are defined by operators. An operator describes a single task in a workflow and determines what actually gets done by the task.

Airflow provides built-in operators for many common tasks, for example, the BashOperator (executes a bash command), the PythonOperator (calls an arbitrary Python function) and the EmailOperator (sends an email).

For more information about operators, check this section.

Unfortunately, Apache Airflow has no native support for R, i.e. there is no R operator right now (you could try to write your own and contribute to the community!).
As stated here, there are different ways to run R scripts in Airflow, and perhaps the best one is to containerize your R script and run it using the DockerOperator, which is included in the standard distribution. I will write about that option in a future article, but for now I will describe how to run R scripts using BashOperators.

Getting started

First of all, create a virtual environment (with Python and R) and activate it. In this case, I created it using anaconda:
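
Something along the following lines should work (the environment name airflow-r and the pinned versions are just placeholders):

    # Create an isolated environment with both Python and R, then activate it
    conda create --name airflow-r -c conda-forge python=3.8 r-base
    conda activate airflow-r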

Now, install apache-airflow:
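
With the environment active, this comes down to a single pip command:

    pip install apache-airflow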

(Note: It is possible that you'll need to execute sudo pip3 install SQLAlchemy==1.3.18 to override the newer SQLAlchemy version.)

It is necessary to create the AIRFLOW_HOME directory and set the environment variable AIRFLOW_HOME. To be more explicit, I called this directory airflow_home:
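
For example, from the directory where you want to keep it (the exact location is up to you):

    mkdir airflow_home
    export AIRFLOW_HOME=$(pwd)/airflow_home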

Now check if everything is ok:
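
One quick check is to ask Airflow for its version:

    airflow version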

This will create some files in the directory that you designated as AIRFLOW_HOME.

Run the following to start the database:
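
With Airflow 2.x, the metadata database is initialized with:

    airflow db init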

You will see a new airflow.db file in your AIRFLOW_HOME directory.

Finally, create a user:
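
Something along these lines (all the values are placeholders; if you omit --password you will be prompted for one):

    airflow users create \
        --username admin \
        --firstname FirstName \
        --lastname LastName \
        --role Admin \
        --email admin@example.com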

Creating your first DAG

Now we will create our first DAG containing four tasks (A, B, C and D). In this case, each task will be an R script.

Create the dags and scripts directories inside the airflow_home directory:
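
Both directories can be created in one go:

    mkdir $AIRFLOW_HOME/dags $AIRFLOW_HOME/scripts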

Now create the following files inside them:
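
The layout I will assume from here on is the following (the DAG definition goes in dags, the R scripts and the wrapper in scripts):

    airflow_home/
    ├── dags/
    │   └── my_first_dag.py
    └── scripts/
        ├── A_task.R
        ├── B_task.R
        ├── C_task.R
        ├── D_task.Rmd
        └── run_r.sh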

For this tutorial, I have chosen four simple tasks that are not particularly useful on their own but serve well as an example. In this subsection, I will create each task as an R script.

Task A (A_task.R file): executes a GET request to this Random User Generator API (a free and open-source API for generating random user data) and obtains a list of 200 random users from Canada, then exports the results as a users.csv file.
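
A sketch of A_task.R using the httr and jsonlite packages (the query parameters match the description above; the output path is simply the working directory):

    # A_task.R: request 200 random Canadian users and export them as users.csv
    library(httr)
    library(jsonlite)

    response <- GET("https://randomuser.me/api/",
                    query = list(results = 200, nat = "ca"))

    # Flatten the nested JSON response into a regular data frame
    users <- fromJSON(content(response, as = "text", encoding = "UTF-8"),
                      flatten = TRUE)$results

    write.csv(users, "users.csv", row.names = FALSE)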

Task B (B_task.R file): opens the users.csv file and builds a bar plot (counts by gender). Exports the result as a counts_by_gender.png file.
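
A sketch of B_task.R with ggplot2 (gender is a column of the flattened API response):

    # B_task.R: read users.csv and plot the counts by gender
    library(ggplot2)

    users <- read.csv("users.csv")

    p <- ggplot(users, aes(x = gender)) +
      geom_bar() +
      ggtitle("Counts by gender")

    ggsave("counts_by_gender.png", plot = p)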

Task C (C_task.R file): opens the users.csv file and builds a bar plot (counts by age). Exports the result as a counts_by_age.png file.
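
C_task.R follows the same pattern; here I assume the age ends up in a column called dob.age after flattening, so check the column names in your own users.csv:

    # C_task.R: read users.csv and plot the counts by age
    library(ggplot2)

    users <- read.csv("users.csv")

    # dob.age is the flattened age column (assumption; adjust if needed)
    p <- ggplot(users, aes(x = dob.age)) +
      geom_bar() +
      ggtitle("Counts by age")

    ggsave("counts_by_age.png", plot = p)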

Task D (D_task.Rmd file): opens the files generated by tasks B and C and builds a simple HTML report.
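
A minimal sketch of D_task.Rmd that simply embeds the two images produced by tasks B and C (it assumes the PNG files sit next to the .Rmd file):

    ---
    title: "Random users report"
    output: html_document
    ---

    ## Counts by gender

    ![](counts_by_gender.png)

    ## Counts by age

    ![](counts_by_age.png)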

The task dependencies are shown in the following picture: tasks B and C will only be triggered after task A completes successfully. Task D will then be triggered when tasks B and C have both completed successfully.

The run_r.sh file will be used to run tasks A, B and C:
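
One way to write this wrapper, consistent with the note below, is a small Rscript-interpreted file that sources whatever script it receives as its first argument:

    #!/usr/bin/env Rscript
    # run_r.sh: source the R script passed as the first command-line argument
    args <- commandArgs(trailingOnly = TRUE)
    source(args[1])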

(Take into account that you may need to change the interpreter on the #!/usr/bin/env Rscript line.)

Now you can execute the scripts containing tasks A, B and C with the following commands:
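
From the scripts directory, after making the wrapper executable (chmod +x run_r.sh):

    ./run_r.sh A_task.R
    ./run_r.sh B_task.R
    ./run_r.sh C_task.R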

To run task D use:
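
Since task D is an R Markdown file, it is rendered instead (this assumes the rmarkdown package and pandoc are installed):

    Rscript -e "rmarkdown::render('D_task.Rmd')"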

Now that we have all the necessary pieces, let's start coding our DAG.

The contents of the file my_first_dag.py are the following:
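
The version below is a sketch for Airflow 2.x; the scripts path, start date, schedule and default_args values are placeholders that you should adapt to your own setup:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # Path to the scripts directory: adjust it to your own AIRFLOW_HOME location
    SCRIPTS_DIR = "~/airflow_home/scripts"

    default_args = {
        "owner": "airflow",
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    }

    with DAG(
        dag_id="my_first_dag",
        description="Run four R tasks with BashOperators",
        default_args=default_args,
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,  # trigger the DAG manually from the UI
        catchup=False,
    ) as dag:

        # Tasks A, B and C go through the run_r.sh wrapper
        task_A = BashOperator(
            task_id="task_A",
            bash_command=f"cd {SCRIPTS_DIR} && ./run_r.sh A_task.R",
        )
        task_B = BashOperator(
            task_id="task_B",
            bash_command=f"cd {SCRIPTS_DIR} && ./run_r.sh B_task.R",
        )
        task_C = BashOperator(
            task_id="task_C",
            bash_command=f"cd {SCRIPTS_DIR} && ./run_r.sh C_task.R",
        )

        # Task D renders the R Markdown report
        task_D = BashOperator(
            task_id="task_D",
            bash_command=f"cd {SCRIPTS_DIR} && Rscript -e \"rmarkdown::render('D_task.Rmd')\"",
        )

        # B and C run after A; D runs after both B and C
        task_A >> [task_B, task_C] >> task_D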

Basically, I defined the default_args and then instantiated the DAG. After that, I generated the tasks by instantiating BashOperator objects and, finally, I set up the task dependency rules.
For more information about how to create DAGs, I suggest that you visit this link.

Run your DAG

It's time to access the Airflow UI:
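
Start the webserver (8080 is the default port):

    airflow webserver --port 8080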

Note: You can start this as a daemon by adding -D to your command.

Go ahead and open localhost:8080 to access the Airflow UI.

You will see a bunch of entries here that are Airflow examples. You can always turn them off by setting load_examples to False in the airflow.cfg file.
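
The setting lives in the [core] section of the airflow.cfg file generated in AIRFLOW_HOME:

    [core]
    load_examples = False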

Now, open a second terminal to schedule the created DAG by starting the Airflow Scheduler:
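
In the second terminal (with the same environment activated and AIRFLOW_HOME exported), run:

    airflow scheduler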

The scheduler will send tasks for execution and you'll be able to find your created DAG:

If you click on it, you can see the Graph View or the Code, among other options.

You can trigger the DAG by clicking on the 'play' button in the upper right corner. If everything works fine, you should see how your pipeline is executing and after that, all files generated during the execution will appear in the defined directory:

If you get an error in some task, it will appear in red (as failed). You can always click on it and check the logs to see what the problem is.

This is the report created by the final task D:

Summary

In this article, I walked through the steps of getting started with Apache Airflow, creating your first DAG, and executing a workflow composed of tasks defined in R scripts using Airflow BashOperators.
The complete code is in this GitHub repository.

I hope you find this article useful. If you have any queries you can find me at laura@fathomdata.dev or my LinkedIn.

Happy learning!

Thanks to Andrew Collier for a careful reading of the article and for making useful comments.

Laura Calcagni
Physicist & Data Scientist at Fathom Data
