How to schedule and run BigQuery using Cloud Composer?
Cloud Composer
Cloud Composer is a fully managed workflow orchestration service in GCP. It is used to author, schedule, and monitor workflows, and it is built on the popular and ever-growing open source Apache Airflow project.
Apache Airflow
Apache Airflow is open source software that allows developers to build data pipelines by writing Python scripts. These scripts, called directed acyclic graphs (DAGs), tell the Airflow engine which tasks to execute, the order in which to execute them, and how often each should run.
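To make that concrete, below is a minimal sketch of a DAG (illustrative only, not part of this tutorial’s workflow) that defines two tasks, the order between them, and a daily schedule. The DAG and task names here are hypothetical.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# A minimal, illustrative DAG: two tasks, an execution order and a daily schedule.
with DAG(
    dag_id="minimal_example",          # hypothetical DAG name, for illustration only
    start_date=datetime(2022, 8, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo 'extract'")
    load = BashOperator(task_id="load", bash_command="echo 'load'")

    extract >> load  # run "extract" before "load"
```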
Steps to schedule and run BigQuery using Cloud Composer
Consider that we have a few queries that need to run daily in BigQuery, and we also want to define the order of SQL execution.
For that, we can create an Airflow DAG and deploy it into the Cloud Composer environment. Let’s see the steps to create the workflow and execute these tasks.
Task 1
```sql
TRUNCATE TABLE rc_test_project.rc_fin_test_tables.compliance_base;

INSERT INTO rc_test_project.rc_fin_test_tables.compliance_base
SELECT
  Customer_Id,
  Application_Id,
  Country,
  Status
FROM rc_test_project.rc_fin_test_tables.customer_onboard_status;
```
Task 2
```sql
TRUNCATE TABLE rc_test_project.rc_fin_test_tables.compliance_analytics;

INSERT INTO rc_test_project.rc_fin_test_tables.compliance_analytics
SELECT
  Customer_Id,
  Application_Id,
  Country,
  Status
FROM rc_test_project.rc_fin_test_tables.compliance_base;
```
Step 1: Create the Cloud Composer environment in GCP
We are going to use the Cloud console to create the Cloud Composer environment. As shown below, we need to select Composer to land on the Create environment page.
On this page, we need to choose the Create environment option. In the drop-down, we can pick the Airflow version and the autoscaling option. For this example, we are selecting the Composer 2 option, which creates Airflow 2 with autoscaling.
Next, we need to give a name for the environment and select the service account and environment resources based on our requirements.
Finally, click the Create button to start creating the environment. It takes approximately 25 minutes to create an environment.
Once the Cloud Composer environment is created, we can launch the Airflow web UI by selecting the Airflow web server option as shown below.
The screen below is the Airflow home page created in the Cloud Composer environment.
Step 2: Access the GCS bucket to add or update the DAG
Cloud Composer uses a Cloud Storage bucket to store the DAGs of our environment. Let’s check the GCS bucket.
As shown below, the GCS bucket is created with our location and environment name: us-central1-rc-test-workflo-d87a7a6a-bucket.
We can also see the Airflow configuration files in this bucket. The DAG files are kept in the dags folder.
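If we prefer to inspect the bucket programmatically rather than through the console, here is a small sketch with the google-cloud-storage Python client, assuming the library is installed and the default credentials can read the bucket shown above.

```python
from google.cloud import storage

# List the objects under the dags/ folder of the Composer environment bucket.
client = storage.Client()
blobs = client.list_blobs("us-central1-rc-test-workflo-d87a7a6a-bucket", prefix="dags/")
for blob in blobs:
    print(blob.name)
```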
Step 3: Create the DAG in Airflow
As mentioned earlier, we are going to create two tasks in the workflow. Task 1 will insert the customer onboard status records into the compliance base table. Task 2 will insert the compliance base records into the compliance analytics table.
- task 1 : customer_onboard_status -> compliance_base
- task 2 : compliance_base -> compliance_analytics
Let’s write the DAG in Python.
Import the required libraries
```python
from datetime import datetime, timedelta  # standard library, not airflow.utils.dates

from airflow import DAG
from airflow.operators.email_operator import EmailOperator  # optional, for email notification tasks
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago
```
Set the default arguments
```python
default_args = {
    "owner": "RevisitClass",
    "start_date": datetime(2022, 8, 1),
    "email": "revisitclass@gmail.com",
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}
```
Initialize the DAG for all the task operators
```python
with DAG(
    "daily_refresh_rc_bigquery",
    default_args=default_args,
    description="Run the sample BigQuery sql",
    schedule_interval="@daily",
    start_date=days_ago(2),
    catchup=False,
    # This is the path where the .sql files are placed and referred to via the Jinja template
    template_searchpath="/home/airflow/gcs/dags/scripts",
) as dag:
```
Run the SQL from the template_searchpath
```python
    run_base = BigQueryInsertJobOperator(
        task_id="compliance_base",
        configuration={
            "query": {
                "query": "{% include 'insert_compliance_base.sql' %}",
                "useLegacySql": False,
            }
        },
        location="us-central1",
    )

    run_analytics = BigQueryInsertJobOperator(
        task_id="compliance_analytics",
        configuration={
            "query": {
                "query": "{% include 'insert_compliance_analytics.sql' %}",
                "useLegacySql": False,
            }
        },
        location="us-central1",
    )
```
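The SQL is pulled in with the Jinja {% include %} tag, which keeps the queries in separate .sql files under the template path. If we prefer, the query can also be passed inline as a plain string; a sketch of that variant is below (the task name compliance_base_inline is hypothetical and only for illustration).

```python
    # Variant: pass the SQL inline instead of including a .sql file.
    run_base_inline = BigQueryInsertJobOperator(
        task_id="compliance_base_inline",  # hypothetical task_id, for illustration only
        configuration={
            "query": {
                "query": "SELECT Customer_Id, Application_Id, Country, Status "
                         "FROM rc_test_project.rc_fin_test_tables.customer_onboard_status",
                "useLegacySql": False,
            }
        },
        location="us-central1",
    )
```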
Set the dependency between the tasks
```python
    run_base >> run_analytics
```
Let’s combine all the steps and put them in a file with the .py extension. We need to place this file in the dags folder under the GCS bucket.
As shown below, we have added our DAG file daily_refresh_rc_bigquery.py to the dags folder.
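The DAG file can be uploaded from the Cloud console or with gsutil. As a sketch, the upload can also be done with the google-cloud-storage Python client, assuming the library is installed, the credentials can write to the bucket shown earlier, and the DAG file is in the current directory.

```python
from google.cloud import storage

# Upload the DAG file into the dags/ folder of the Composer environment bucket.
client = storage.Client()
bucket = client.bucket("us-central1-rc-test-workflo-d87a7a6a-bucket")
bucket.blob("dags/daily_refresh_rc_bigquery.py").upload_from_filename("daily_refresh_rc_bigquery.py")
```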
The Airflow web UI takes around 60 to 90 seconds to reflect the DAG changes. Let’s verify our DAG in the UI.
Step 4: Place the SQL files in the template path
In our DAG file, we have specified the template path as /home/airflow/gcs/dags/scripts. This path is equivalent to gs://bucket-name/dags/scripts.
We need to place our SQL files in this path so that the Airflow tasks can fetch the SQL from it during execution, as shown in the layout below.
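For this example, the bucket layout looks like this (using the bucket name created earlier):

```
gs://us-central1-rc-test-workflo-d87a7a6a-bucket/
└── dags/
    ├── daily_refresh_rc_bigquery.py
    └── scripts/
        ├── insert_compliance_base.sql
        └── insert_compliance_analytics.sql
```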
Step 5: Verify the tasks in the Airflow DAG
Now we can open our DAG daily_refresh_rc_bigquery in Airflow and verify the tasks. If we click the DAG name in the Airflow web UI, it takes us to the task details as shown below.
As per our Python DAG file, the tasks compliance_base and compliance_analytics are created under the Airflow DAG daily_refresh_rc_bigquery, and the workflow is scheduled to run daily.
If we want to check our DAG code, we can click the Code option. Airflow uses a set of colour notations for the task statuses, which are shown on this page.
Currently the workflow is displayed in the tree view. We can change this to the graph view by clicking the Graph option. Let’s do that.
The graph view shows the task details in a clearer format: it is a Directed Acyclic Graph (DAG), and the tasks run one after another.
First, compliance_base is executed. Once it completes successfully, the next task compliance_analytics is executed.
Step 6: Run the Airflow DAG to execute the BigQuery SQL
Now we can click the Run button in Airflow to execute the DAG. As shown below, we clicked the Trigger DAG option under the Run button.
The tasks executed successfully, which means the queries ran in BigQuery.
If we want to check the task/job log, we can just click the task. It opens a pop-up where we can select the log.
The records are inserted successfully into the BigQuery tables compliance_base and compliance_analytics, as shown below.
Task 1:
Task 2:
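If we want to double-check the results outside the Airflow UI, here is a small sketch with the BigQuery Python client, assuming the library is installed and the default credentials can query the project, which counts the rows in both target tables.

```python
from google.cloud import bigquery

# Count the rows loaded into the two target tables.
client = bigquery.Client(project="rc_test_project")
for table in ("compliance_base", "compliance_analytics"):
    sql = f"SELECT COUNT(*) AS row_count FROM rc_test_project.rc_fin_test_tables.{table}"
    for row in client.query(sql).result():
        print(table, row.row_count)
```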
Finally, we learnt how to schedule and run BigQuery queries using Cloud Composer in GCP.
Complete DAG code for your reference
```python
# Step 1: Import the required libraries.
from datetime import datetime, timedelta  # standard library, not airflow.utils.dates

from airflow import DAG
from airflow.operators.email_operator import EmailOperator  # optional, for email notification tasks
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

# Step 2: Set the default arguments.
default_args = {
    "owner": "RevisitClass",
    "start_date": datetime(2022, 8, 1),
    "email": "revisitclass@gmail.com",
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

# Step 3: Initialize the DAG for all the task operators.
with DAG(
    "daily_refresh_rc_bigquery",
    default_args=default_args,
    description="Run the sample BigQuery sql",
    schedule_interval="@daily",
    start_date=days_ago(2),
    catchup=False,
    # This is the path where the .sql files are placed and referred to via the Jinja template
    template_searchpath="/home/airflow/gcs/dags/scripts",
) as dag:

    # Step 4: Run the SQL from the template_searchpath.
    run_base = BigQueryInsertJobOperator(
        task_id="compliance_base",
        configuration={
            "query": {
                "query": "{% include 'insert_compliance_base.sql' %}",
                "useLegacySql": False,
            }
        },
        location="us-central1",
    )

    run_analytics = BigQueryInsertJobOperator(
        task_id="compliance_analytics",
        configuration={
            "query": {
                "query": "{% include 'insert_compliance_analytics.sql' %}",
                "useLegacySql": False,
            }
        },
        location="us-central1",
    )

    # Step 5: Set the dependency between the tasks.
    run_base >> run_analytics
```