
Cassandra ETL with Airflow and Spark

In this walkthrough, we will cover how we can use Airflow to trigger Spark ETL jobs that move data into and within Cassandra. This demo will be relatively simple; however, it can be expanded upon with the addition of other technologies like Kafka, by scheduling the Spark jobs to run as a recurring process, or in general by creating more complex Cassandra ETL pipelines. We will focus on showing you how to connect Airflow, Spark, and Cassandra, and in our case today, specifically DataStax Astra. The reason we are using DataStax Astra is that we want everyone to be able to do this demo without having to worry about OS incompatibilities and the like. For that reason, we will also be using Gitpod.

For this walkthrough, we will use 2 Spark jobs. The first Spark job will load 100k rows from a CSV and then write them into a Cassandra table. The second Spark job will read the data from that Cassandra table, apply some transformations, and then write the transformed data into a different Cassandra table. We use PySpark to reduce the number of steps needed to get this working; if we used Scala, we would have to build JARs, which would take more time. If you are interested in seeing how to use the Airflow Spark Submit Operator and run Scala Spark jobs, check out this walkthrough!
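To make the data flow concrete, here is a rough sketch of what the two PySpark jobs could look like when written against the spark-cassandra-connector. This is not the code shipped in this repo: the CSV path, column names, and connection setup below are illustrative assumptions, and the actual job files are the source of truth.

```python
# Illustrative sketch only -- the actual job files in this repo may differ.
# Assumes the spark-cassandra-connector is on the classpath and that the
# spark.cassandra.connection.* settings (pointing at your Astra database)
# are supplied via spark-submit or the SparkSession config.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("cassandra-etl-sketch").getOrCreate()

# Job 1: load the CSV and append it to a Cassandra table.
employees = spark.read.csv("previous_employees.csv", header=True, inferSchema=True)
(employees.write
    .format("org.apache.spark.sql.cassandra")
    .options(table="previous_employees_by_job_title", keyspace="airflowdemo")
    .mode("append")
    .save())

# Job 2: read that table back, derive a new column, and write the result
# to a second table (the first_day/last_day column names are hypothetical).
previous = (spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="previous_employees_by_job_title", keyspace="airflowdemo")
    .load())
days_worked = previous.withColumn(
    "days_worked", F.datediff(F.col("last_day"), F.col("first_day")))
(days_worked.write
    .format("org.apache.spark.sql.cassandra")
    .options(table="days_worked_by_previous_employees_by_job_title",
             keyspace="airflowdemo")
    .mode("append")
    .save())
```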

If you have not already opened this in Gitpod, CTRL + Click the button below to get started!

Open in Gitpod

Note: Gitpod will start with two terminals side-by-side. Always use the first one (labeled "all-commands"), except when specified otherwise.

1. Set up DataStax Astra

Astra DB is the simplest way to run Cassandra with zero operations at all - just push the button and get your cluster. No credit card required: 40M read/write operations and about 80GB of storage per month for free - sufficient to run small production workloads. If you use up your credits, the database will simply pause at no charge, and you will be given the option to upgrade to a higher tier.

1a. Create your Astra account & Database

Following the Database creation guide, create a database. Right-click the following button and choose "Open in a new tab".

Use the following values:

  • Database Name: workshops
  • Keyspace Name: airflowdemo
  • Regions: select GOOGLE CLOUD, then an area close to you, then a region with no LOCK 🔒 icon; the LOCKed regions are not accessible to the Free Tier.

ℹ️ Note: If you already have a database workshops, simply add a keyspace airflowdemo using the Add Keyspace button on the bottom right hand corner of the DB Dashboard page. You may have to "Resume" the database first in case it is in "hibernated" state.

While the database is being created, you will also get a Security token (needed to authenticate with your database and start using it): please IGNORE THIS ONE, as we will soon be creating a new, more powerful token for today.

The status will change from Pending to Active when the database is ready; this usually takes only 2-3 minutes.

1b. Create a security token

Note: this step is very important, as the token generated automatically for you with the database lacks some permissions we'll use in the workshop.

Create a token for your app, using the "Database Administrator" role. Keep it handy for later use (best to download it in CSV format, as the values will not be visible afterward). This will provide authentication later when interacting with the database. Keep in mind that all three strings will be needed today (Client ID, Client Secret, Token).

⚠️ Important

The instructor will show the token creation on screen,
but will then destroy it immediately for security reasons.

1c. Install Astra CLI

Astra CLI is a command-line utility to interact with Astra in several ways. As you'll be using it a few times in the following steps, first run the installation:

curl -Ls "https://dtsx.io/get-astra-cli" | bash

Run the setup, providing the "Token" (the AstraCS:... string) when prompted:

source ~/.bashrc      # required on any terminal created before installation
astra setup

As a test, you can run

astra db list

or even the DB creation command (which will be a no-op if you created the DB already)

astra db create workshops -k airflowdemo --if-not-exist --wait

You are good to go - to find out more about Astra CLI, have a look at Awesome-Astra.

1d. Download the Secure Connect Bundle

Use the CLI to download the Secure Connect Bundle to access the DB:

astra db download-scb workshops -f secure-connect-workshops.zip
Show me how to do this without Astra CLI

On your Astra DB Dashboard:

  • locate your workshops database and click on it;
  • find and open the "Connect" tab;
  • find the "Drivers" options and make sure not to choose "Cloud Native";
  • select any language (irrelevant) and click the "Download Bundle" button below it;
  • drag-and-drop the zip file from your computer to Gitpod, renaming it to secure-connect-workshops.zip if needed and checking it is in the repo's root folder.

1e. Run a short CQL setup script

Use the CLI to launch a small script to create a couple of tables in your database:

astra db cqlsh workshops -f setup.cql
Show me how to do this without Astra CLI

On your Astra DB Dashboard:

  • locate your workshops database and click on it;
  • find and open the "CQL Console" tab;
  • wait a couple of seconds and a CQL console should open in the browser;
  • open setup.cql in the Gitpod editor and paste its contents into the CQL Web console.

2. Set up Airflow

We will be using the quick start script that Airflow provides here. You will be asked to configure a password for the "admin" user, which will be needed later to access Airflow's Web interface. Do not forget what you are entering!

Note: Run this command on the second Gitpod terminal (labeled "run-airflow"), as this will not return control to your prompt. You can switch the active terminal, if needed, through the switcher on the lower-right panel of Gitpod.

# Run on the "run-airflow" console!
bash setup.sh

3. Start Spark in standalone mode

3.1 - Start Master

(Get back to the "all-commands" console.) Start the Spark Master with:

# Run on the "all-commands" console!
./spark-3.0.1-bin-hadoop2.7/sbin/start-master.sh

3.2 - Start worker

Open port 8081 in the browser (you can do so by running gp preview --external `gp url 8081` and checking your popup blocker), copy the master URL, and paste it into the designated spot in the command below.

(Screenshot: locating the Spark Master URL)

./spark-3.0.1-bin-hadoop2.7/sbin/start-slave.sh <master-URL>

Refresh the Spark Master UI (the page on port 8081 you just opened). Check that a "Worker" has now appeared in the list.

(Screenshot: a worker has appeared in the Spark Master UI)

4. Move spark_dag.py to ~/airflow/dags

4.1 - Create ~/airflow/dags

mkdir ~/airflow/dags

4.2 - Move spark_dag.py

mv spark_dag.py ~/airflow/dags
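For reference, here is a minimal sketch of the shape a DAG like spark_dag.py can take when it submits PySpark jobs through the SparkSubmitOperator (which uses the spark_default connection edited in step 7). The task ids, application paths, and connector package below are assumptions for illustration; the spark_dag.py in this repo is the authoritative version.

```python
# Minimal illustrative DAG -- spark_dag.py in this repo is authoritative.
# Task ids, file paths, and the connector package version are assumptions.
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="example_cassandra_etl",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # triggered manually in this walkthrough
    catchup=False,
) as dag:
    load_to_cassandra = SparkSubmitOperator(
        task_id="load_csv_to_cassandra",
        application="/path/to/first_spark_job.py",   # hypothetical path
        conn_id="spark_default",                     # points at your Spark master
        packages="com.datastax.spark:spark-cassandra-connector_2.12:3.0.1",
    )
    transform_in_cassandra = SparkSubmitOperator(
        task_id="transform_cassandra_table",
        application="/path/to/second_spark_job.py",  # hypothetical path
        conn_id="spark_default",
        packages="com.datastax.spark:spark-cassandra-connector_2.12:3.0.1",
    )
    load_to_cassandra >> transform_in_cassandra
```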

5. Update the TODOs in properties.conf with your specific parameters

gp open properties.conf

Paste your Spark master URL, and fill in the username and password fields with the "Client ID" and "Client Secret" from your Database Administrator token.

6. Open port 8080 to see the Airflow UI and check that example_cassandra_etl exists.

To open the address, you can run gp preview --external `gp url 8080` (and then check your popup blocker). The credentials are admin and the password you chose when you ran the setup.sh script.

If it does not exist yet, give it a few seconds to refresh.

7. Update Spark Connection, unpause the example_cassandra_etl, and drill down by clicking on example_cassandra_etl as shown below.

7.1 - Under the Admin section of the menu, select Connections, then spark_default and update the host from the default (yarn) to the Spark master URL found earlier as shown below. Save once done.

(Screenshots: opening the connection settings in Airflow, then saving the Spark Master URL)

7.2 - Select the DAG menu item and return to the dashboard. Unpause the example_cassandra_etl, and then click on the example_cassandra_etl link.

(Screenshot: unpausing the ETL job in Airflow)

8. Trigger the DAG from the tree view and click on the graph view afterwards as shown below.

(Screenshot: triggering the DAG in Airflow)
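As an optional aside, Airflow also exposes a stable REST API for triggering runs from code. The sketch below assumes the basic-auth API backend is enabled in airflow.cfg ([api] auth_backends including airflow.api.auth.backend.basic_auth), which is not the case in every default configuration; the credentials are the admin user and the password chosen in step 2.

```python
# Optional aside: trigger example_cassandra_etl via Airflow's stable REST API.
# Assumes the webserver is reachable on localhost:8080 and that the
# basic-auth API backend is enabled in airflow.cfg.
import requests

response = requests.post(
    "http://localhost:8080/api/v1/dags/example_cassandra_etl/dagRuns",
    auth=("admin", "your-admin-password"),  # password chosen during setup.sh
    json={"conf": {}},
)
response.raise_for_status()
print(response.json())
```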

9. Confirm data in Astra DB

9.1 - Check previous_employees_by_job_title

Run a SELECT statement on the database table to check it has been populated.

You can run it directly in your console with:

astra db cqlsh workshops -e "select * from airflowdemo.previous_employees_by_job_title where job_title='Dentist' limit 20;"
Show me how to do this without Astra CLI

On your Astra DB Dashboard:

  • locate your workshops database and click on it;
  • find and open the "CQL Console" tab;
  • wait a couple of seconds and a CQL console should open in the browser;
  • paste the following and press Enter: select * from airflowdemo.previous_employees_by_job_title where job_title='Dentist' limit 20;

9.2 - Check days_worked_by_previous_employees_by_job_title

Similarly to the above, run:

astra db cqlsh workshops -e "select * from airflowdemo.days_worked_by_previous_employees_by_job_title where job_title='Dentist' limit 20;"
Show me how to do this without Astra CLI

On your Astra DB Dashboard:

  • locate your workshops database and click on it;
  • find and open the "CQL Console" tab;
  • wait a couple of seconds and a CQL console should open in the browser;
  • paste the following and press Enter: select * from airflowdemo.days_worked_by_previous_employees_by_job_title where job_title='Dentist' limit 20;
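If you would rather verify the data from code instead of cqlsh, the DataStax Python driver (cassandra-driver) can run the same queries using the Secure Connect Bundle downloaded in step 1d together with the Client ID and Client Secret from your token. A rough sketch, with placeholder credentials:

```python
# Verification sketch using the DataStax Python driver (pip install cassandra-driver).
# The bundle path matches step 1d; replace the placeholders with the Client ID
# and Client Secret from your Database Administrator token.
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

cloud_config = {"secure_connect_bundle": "secure-connect-workshops.zip"}
auth_provider = PlainTextAuthProvider("your-client-id", "your-client-secret")
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect("airflowdemo")

for row in session.execute(
    "SELECT * FROM previous_employees_by_job_title "
    "WHERE job_title='Dentist' LIMIT 20"
):
    print(row)

cluster.shutdown()
```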

Conclusion(s)

And that will wrap up our walkthrough. Again, this is an introduction to setting up a basic Cassandra ETL process run by Airflow and Spark. As mentioned above, these first steps can be expanded into more complex, scheduled / repeated Cassandra ETL processes run by Airflow and Spark.