How to configure Google Cloud BigQuery connection in Apache Airflow?
Creating new Airflow connections
Airflow connections enable us to access external systems such as databases, cloud services and so on. We create a connection id along with the hostname, port, credentials, etc., and refer to that connection id in the workflow. Connections can be defined in the following ways:
- in environment variables
- in an external Secrets Backend
- in the Airflow metadata database (using the CLI or web UI)
Airflow connections are the recommended way to store secrets and credentials used in workflows. In this tutorial, we will create a Google BigQuery connection in the Airflow web UI and then execute a BigQuery SQL statement using that connection.
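As a quick illustration of the environment-variable option above, Airflow picks up any variable named AIRFLOW_CONN_<CONN_ID> whose value is a connection URI. The sketch below builds such a URI with Airflow's Connection model; the connection id and database details are made-up placeholders, not part of this tutorial's setup.

# A minimal sketch of the environment-variable option. The connection id,
# host, user and password below are made-up placeholders.
from airflow.models.connection import Connection

pg_conn = Connection(
    conn_id="my_postgres_conn",
    conn_type="postgres",
    host="db.example.com",
    login="airflow_user",
    password="secret",
    schema="analytics",
    port=5432,
)

# Export the printed URI as AIRFLOW_CONN_MY_POSTGRES_CONN to define the connection.
print(pg_conn.get_uri())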
Prerequisites to configure the BigQuery connection in Airflow
- Service account
- Service account key
- Role – The service account should have the BigQuery Job User role, which grants the permission to run BigQuery jobs. A quick way to verify the key and role is sketched after this list.
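Before configuring the key in Airflow, it is worth confirming that the key and role actually work. The sketch below uses the google-cloud-bigquery client to run a trivial query with the service account key; the key path is a made-up placeholder. If the BigQuery Job User role is missing, this call fails with a permission error.

# Sanity check of the service account key and role outside Airflow.
from google.cloud import bigquery
from google.oauth2 import service_account

# Placeholder path to the downloaded service account key file.
credentials = service_account.Credentials.from_service_account_file(
    "/path/to/service-account-key.json"
)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# Runs a trivial BigQuery job; fails if the role lacks job-run permission.
print(list(client.query("SELECT 1").result()))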
Configure BigQuery connection in Airflow
Step 1 : Select Connections from Admin
The Airflow web UI has an option to add connections. First, go to the Admin -> Connections page.
Step 2 : Add a new connection
Next, click the plus (+) sign to add a new connection.
Step 3 : Specify connection parameters
In the Add Connection page, specify the connection parameters as below; an equivalent code-based definition is sketched after the list.
- Connection id – It can be anything. We have specified the connection id as rc_gcp_bq_conn.
- Connection Type – To connect to BigQuery, we need to select Google Cloud from the drop-down menu.
- Keyfile Path – As we mentioned earlier, we should have the service account key file for this connection.
- Airflow running in a Docker container – We map a specific local directory to the Apache Airflow Docker volumes, and the service account key file should be placed in that directory. The path inside the Airflow Docker volume is what we enter as the Keyfile Path here.
- Airflow running in GCP Cloud Composer – The key file should be placed in the corresponding GCS bucket. The directory mapping between Cloud Storage and Airflow is defined in this page. The mapped Airflow directory is what we enter as the Keyfile Path here.
- Project Id – The GCP project id of the service account should be mentioned in this field.
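For reference, the same connection can also be expressed in code, for example to export it as an environment variable or to load it through a secrets backend. This is only a sketch: the key path and project id are placeholders, and the extra field names can differ slightly between versions of the Google provider (older releases use the extra__google_cloud_platform__ prefix).

import json

from airflow.models.connection import Connection

bq_conn = Connection(
    conn_id="rc_gcp_bq_conn",
    conn_type="google_cloud_platform",
    extra=json.dumps({
        "key_path": "/opt/airflow/keys/bq-sa-key.json",  # placeholder keyfile path
        "project": "my-gcp-project",                     # placeholder GCP project id
    }),
)

# The printed URI can be used as the value of AIRFLOW_CONN_RC_GCP_BQ_CONN.
print(bq_conn.get_uri())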
Step 4 : Save new connection
Finally, click the Save button to add this connection to Airflow.
Now we can view the Airflow connection entries as below. From here, we can easily edit or delete the connection. The connection id rc_gcp_bq_conn can be used in an Airflow DAG to run BigQuery queries.
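As a quick check, the saved connection can also be looked up programmatically with Airflow's BaseHook. This is just a verification sketch and is not required for the DAG below.

from airflow.hooks.base import BaseHook

# Fetch the saved connection from the metadata database (or a secrets backend).
conn = BaseHook.get_connection("rc_gcp_bq_conn")
print(conn.conn_type, conn.extra_dejson)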
Example: BigQueryInsertJobOperator with connection id
In the DAG code, we need to set gcp_conn_id to our connection id rc_gcp_bq_conn as below, so that the task uses the corresponding BigQuery connection to run the queries.
# Set a gcp_conn_id to use a connection that you have created.
run_sql = BigQueryInsertJobOperator(
    task_id="run_sql1",
    configuration={"query": {"query": src_sql, "useLegacySql": False}},
    gcp_conn_id="rc_gcp_bq_conn",
    location="us-central1"
)
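The configuration dictionary passed to BigQueryInsertJobOperator follows the BigQuery Jobs API, so other job types can reuse the same connection id. The sketch below shows a hypothetical load job; the bucket and project id are made-up placeholders, and it assumes the same imports as the complete DAG further down.

# A sketch of a load job reusing the same connection id.
load_csv = BigQueryInsertJobOperator(
    task_id="load_cust_info_csv",
    configuration={
        "load": {
            "sourceUris": ["gs://my-bucket/cust_info/*.csv"],  # placeholder bucket
            "destinationTable": {
                "projectId": "my-gcp-project",                 # placeholder project id
                "datasetId": "rc_fin_test_tables",
                "tableId": "cust_info",
            },
            "sourceFormat": "CSV",
            "skipLeadingRows": 1,
            "writeDisposition": "WRITE_APPEND",
        }
    },
    gcp_conn_id="rc_gcp_bq_conn",
    location="us-central1",
)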
Complete DAG code to run BigQuery
#step 1: Import required libraries.
from datetime import timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator

#step 2: Set default argument variable.
default_args = {
    'owner': 'rc_user',
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

#step 3: Set BigQuery sql.
src_sql = """
insert into rc_fin_test_tables.cust_info values(642,'kevin');
"""

#step 4: Initiate the DAG variable for all the task operators.
with DAG("test_bq_conn_dag",
         default_args=default_args,
         description="Run the sample BigQuery sql",
         schedule_interval=None,
         start_date=days_ago(2),
         catchup=False
         ) as dag:

    #step 5: Define the task action using the operator.
    # Set a gcp_conn_id to use a connection that we created.
    run_sql = BigQueryInsertJobOperator(
        task_id="run_sql1",
        configuration={"query": {"query": src_sql, "useLegacySql": False}},
        gcp_conn_id="rc_gcp_bq_conn",
        location="us-central1"
    )

    # Dummy task
    dummy_task = DummyOperator(task_id='DummyOperator')

    #step 6: Set dependency for the tasks.
    run_sql >> dummy_task
We have executed this DAG in Airflow. The task run_sql1 ran the given BigQuery SQL using connection id rc_gcp_bq_conn.
In the task log, we can see our connection id as below.
Similarly, we can configure connections to other external systems in Apache Airflow.