Raju Nekadi
Scheduling a BigQuery SQL script, using Apache Airflow, with an example

In my day-to-day work, one of the most common use cases for Apache Airflow is running hundreds of scheduled BigQuery SQL scripts. Developers who start with Airflow often ask the following question:

How do I use Airflow to orchestrate SQL?

This post aims to answer that question. It assumes you have a basic understanding of Apache Airflow, Python, and BigQuery SQL.
 
Let’s assume we want to run a BigQuery SQL script every day at midnight.

DAG: Directed Acyclic Graph. In Airflow, a DAG denotes a data pipeline that runs on a scheduled interval. A DAG can be made up of one or more individual tasks.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Placeholders -- replace with your own GCP project and dataset names
MYPROJECT_ID = 'my-project'
MYDATASET_NAME = 'my_dataset'

default_arg = {'owner': 'raju', 'start_date': datetime(2024, 4, 29)}

dag = DAG('simple-bigquery-dag',
          default_args=default_arg,
          schedule_interval='0 0 * * *')

bqsql_task = BigQueryInsertJobOperator(
    dag=dag,
    gcp_conn_id='bqsql_default',
    task_id='bqsql_task',
    configuration={
        "query": {
            "query": "{% include 'sql/bqsql_sample_query.sql' %}",
            "useLegacySql": False,
        }
    },
    params={'project_id': MYPROJECT_ID,
            'dataset_name': MYDATASET_NAME},
)
 

 
 
In the above script:

0 0 * * * is a cron schedule expression, meaning the DAG runs every day at midnight (the 0th hour of each day). Note that Airflow runs on UTC time by default.

gcp_conn_id is the connection id for your BigQuery connection. You can set it up under Admin -> Connections in the Airflow UI, where you provide the credentials (typically your GCP project id and a service account key) that Airflow uses to access BigQuery.
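As an aside, the midnight cron schedule above can also be written with one of Airflow's preset aliases instead of a raw cron string:

# '@daily' is Airflow's preset alias for '0 0 * * *' (midnight UTC)
dag = DAG('simple-bigquery-dag',
          default_args=default_arg,
          schedule_interval='@daily')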

The SQL script for this operation is stored in a separate file, sql/bqsql_sample_query.sql, which is read in at run time through the {% include %} directive shown above.
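By default, Airflow resolves the included path relative to the folder containing the DAG file. If you keep your SQL somewhere else, a minimal sketch (the path below is a hypothetical example) is to point the DAG's Jinja environment at that folder via template_searchpath:

# template_searchpath adds extra directories in which Airflow's Jinja
# environment looks for templated files such as the included SQL script.
# '/opt/airflow/include' is a hypothetical location; use your own path.
dag = DAG('simple-bigquery-dag',
          default_args=default_arg,
          schedule_interval='0 0 * * *',
          template_searchpath=['/opt/airflow/include'])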

 

CREATE TABLE IF NOT EXISTS {{params.project_id}}.{{params.dataset_name}}.Inventory
(product STRING, quantity INT64);

INSERT {{params.project_id}}.{{params.dataset_name}}.Inventory (product, quantity)
VALUES('top load washer', 10),
      ('front load washer', 20),
      ('dryer', 30),
      ('refrigerator', 10),
      ('microwave', 20),
      ('dishwasher', 30),
      ('oven', 5);

 

There are two key concepts in the templated SQL script shown above:

Airflow macros: they provide access to metadata about each DAG run, such as the execution date, which is commonly used to select the previous day's data to aggregate. ref: https://airflow.apache.org/docs/stable/macros.html
 
Templated parameters: if we want our SQL script to take parameters that are filled in at run time from the DAG, we can pass them to the task through params. In our example we passed the project_id (MYPROJECT_ID) and the dataset name (MYDATASET_NAME).
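To illustrate both ideas together, here is a hedged sketch of a second task that mixes a macro ({{ ds }}, the run's logical date) with templated params. The daily_inventory_report task and the sale_date column are hypothetical and not part of the original example:

# Hypothetical follow-up task: aggregate the run date's rows.
# {{ ds }} is an Airflow macro that renders as the run date (YYYY-MM-DD);
# {{ params.* }} values come from the 'params' dict below.
daily_report = BigQueryInsertJobOperator(
    dag=dag,
    gcp_conn_id='bqsql_default',
    task_id='daily_inventory_report',
    configuration={
        "query": {
            "query": (
                "SELECT product, SUM(quantity) AS total_quantity "
                "FROM {{ params.project_id }}.{{ params.dataset_name }}.Inventory "
                "WHERE sale_date = '{{ ds }}' "  # sale_date is a hypothetical column
                "GROUP BY product"
            ),
            "useLegacySql": False,
        }
    },
    params={'project_id': MYPROJECT_ID,
            'dataset_name': MYDATASET_NAME},
)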
 
Conclusion
 
In this post we saw:

  • How to schedule BigQuery SQL scripts using Apache Airflow
  • How Airflow connects to BigQuery using a connection id
  • How to pass in parameters at run time using templated params and macros

Hope this gives you an understanding of how to schedule BigQuery SQL scripts in Airflow and how to use templating.

Some Final Words
If this blog was helpful and you wish to show a little support, you could:

  1. 👍 300 times for this story
  2. Follow me on LinkedIn: https://www.linkedin.com/in/raju-n-203b2115/

These actions really really really help me out, and are much appreciated!

Top comments (1)

Geazi Anc

Great article! What is the Bigquery connection? How do I find it? Thanks!