Advanced Google Cloud Composer Pipeline — Cloud Function Trigger
Google Cloud Platform offers many different services that, when combined, can provide a variety of value-creating tools for organizations. One such capability is task coordination with Google Cloud Composer, a powerful orchestration tool that lets you combine several Google Cloud Platform services into sequential, time-triggered, or event-triggered tasks.
Today, we will use Cloud Composer to execute a Dataflow pipeline triggered by an event, namely the arrival of a file in a Cloud Storage bucket. The inspiration for this article comes from a similar training tutorial found here and from the Google Cloud tutorial on this subject found at this link.
1. Resources for Completion
a. The CSV file for this tutorial (usa_names.csv) can be found by following this link.
b. The DAG file for our Cloud Composer Environment can be found here.
c. The Dataflow pipeline for this tutorial can be found here.
2. Goal of the Exercise
We want to create a Cloud Function that triggers a Dataflow pipeline whenever a new CSV file is uploaded to our Cloud Storage bucket. The pipeline transforms the input and loads it into a BigQuery table, where it is available across the organization. This is useful in situations requiring up-to-date information that can be easily queried in the BigQuery data warehouse.
Step 1: Create Cloud Composer Environment
From the navigation menu, go to the Composer tab. There, select “Create” and input the following parameters, leaving the others at their default settings:
a. Name: mlcomposer
b. Location: us-central1
It will take about 20–25 minutes for the environment to spin up. You can move on to the other steps in the meantime.
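If you prefer the command line, a roughly equivalent environment can be created from Cloud Shell with the gcloud CLI (the default image version is assumed here; add flags such as --image-version if you need a specific Composer/Airflow release):
gcloud composer environments create mlcomposer \
    --location us-central1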
Step 2: Set the Environment Variables
In your Cloud Shell terminal, define these environment variables for your project:
export PROJECT='your-project-id'   # REPLACE WITH YOUR PROJECT ID
export REGION='us-central1'        # REPLACE WITH YOUR REGION e.g. us-central1
Step 3: Create your Cloud Storage Buckets
Navigate to the Cloud Storage tab in the navigation menu and create two storage buckets named “project-id_input” and “project-id_output” by selecting “Create Bucket”. Replace “project-id” with your project’s ID. Set the location type to Region, choose us-central1, and leave all other settings at their default values.
As an added step, once the project-id_output bucket is created, select it and create a folder titled ‘tmp’ in it.
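If you would rather script this step, the same buckets can be created from Cloud Shell along these lines, using the PROJECT variable exported in Step 2 (Cloud Storage has a flat namespace, so the tmp/ “folder” is simply a prefix that also comes into existence as soon as the first object is written under it):
gsutil mb -l us-central1 gs://${PROJECT}_input
gsutil mb -l us-central1 gs://${PROJECT}_output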
Step 4: Create your BigQuery Dataset and table
Select BigQuery from the navigation menu and proceed to create a dataset called ml_pipeline in your current project. Make sure to indicate us-central1 as your data location. Leave other settings at their defaults.
Next, create a table under your ml_pipeline dataset. Give it the name ingest_table and define the following schema by selecting the plus (+) icon and adding these field names and types one after the other:
state: STRING, gender: STRING, year: STRING, name: STRING,
number: STRING, created_date: STRING, filename: STRING, load_dt: DATE
Press the “Create table” button when you are done, leaving the other settings at their defaults.
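The equivalent bq commands, run from Cloud Shell, would look roughly like this (the inline schema mirrors the fields listed above):
bq --location=us-central1 mk --dataset ${PROJECT}:ml_pipeline
bq mk --table ${PROJECT}:ml_pipeline.ingest_table \
    state:STRING,gender:STRING,year:STRING,name:STRING,number:STRING,created_date:STRING,filename:STRING,load_dt:DATE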
Step 5: Upload files to your Cloud Storage Buckets
We will now upload our DAG file and Dataflow pipeline to Cloud Storage. Navigate back to the Composer tab in the navigation menu. At this point, your Cloud Composer environment should be ready for use, as indicated by the green checkmark next to its name. Select the DAGs folder link on the far right; it will take you to the Cloud Storage folder created specifically for your Composer environment.
Here, upload the simple_load_dag.py file you downloaded from the link in the resources section above. Next, create a folder titled ‘dataflow’ in the same bucket by selecting the ‘Create Folder’ button, and upload the process_delimited.py file from the resources section into it. You can inspect the uploaded files through the links provided to understand how the Dataflow pipeline and DAG file execute their tasks. Leave the files as they are; they reference variables you will create in the next step.
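If you prefer Cloud Shell, the same uploads can be done with gsutil; this sketch assumes the downloaded files are in your current directory and that the ‘dataflow’ folder sits under the environment’s DAGs folder, as in the console steps above:
# Look up the environment's DAGs folder (a gs:// path)
DAGS_FOLDER=$(gcloud composer environments describe mlcomposer \
    --location us-central1 --format="value(config.dagGcsPrefix)")
gsutil cp simple_load_dag.py ${DAGS_FOLDER}/
gsutil cp process_delimited.py ${DAGS_FOLDER}/dataflow/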
Step 6: Set the Variable values declared in your DAG file
Your DAG file references several values which need to be specified in your Airflow environment. To do this, navigate to your Cloud Composer environment and select the Airflow webserver link. You will be asked to log in with your project email account and will then be taken to the Airflow web UI.
Navigate to the Admin > Variables section, click the plus (+) icon, and add these key-value pairs one after the other in the window that appears, saving after each one:
('gcp_project', PROJECT),
('gcp_input_location', PROJECT + '_input'),
('gcp_temp_location', PROJECT + '_output/tmp'),
('gcp_completion_bucket', PROJECT + '_output'),
('input_field_names', 'state,gender,year,name,number,created_date'),
('bq_output_table', 'ml_pipeline.ingest_table')
Remember to replace “PROJECT” with your project id and to omit the quotation marks when supplying the values. For example, the key ‘gcp_temp_location’ would have ‘xxxxx-xxxx-xxx_output/tmp’ as its value, where the xxxx’s represent your project id.
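Alternatively, the same variables can be set from Cloud Shell; with Airflow 2, commands along these lines should work (PROJECT is the variable exported in Step 2):
gcloud composer environments run mlcomposer --location us-central1 \
    variables set -- gcp_project ${PROJECT}
gcloud composer environments run mlcomposer --location us-central1 \
    variables set -- gcp_input_location ${PROJECT}_input
gcloud composer environments run mlcomposer --location us-central1 \
    variables set -- gcp_temp_location ${PROJECT}_output/tmp
gcloud composer environments run mlcomposer --location us-central1 \
    variables set -- gcp_completion_bucket ${PROJECT}_output
gcloud composer environments run mlcomposer --location us-central1 \
    variables set -- input_field_names "state,gender,year,name,number,created_date"
gcloud composer environments run mlcomposer --location us-central1 \
    variables set -- bq_output_table ml_pipeline.ingest_table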
After you complete these steps, you should have a working Cloud Composer DAG when you navigate back to the home screen. You can manually trigger your DAG by selecting the play button to be sure your code works so far.
Step 7: Get your Client_ID Credentials
Save the Python code from the “Get the client_id of the IAM proxy” section of the Google Cloud tutorial referenced above and name it “get_client_id.py”. Remember to replace the variables project_id = 'YOUR_PROJECT_ID', location = 'us-central1', and composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME' with your project id, your region (i.e. us-central1, already specified), and your Composer environment name (i.e. mlcomposer). Run this script in your Google Cloud Shell console to get your client_id and save it for later steps.
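If you would rather not run the Python script, a shortcut that may work from Cloud Shell is to request the IAP-protected webserver URL without credentials; the redirect URL it returns contains the client_id as a query parameter:
AIRFLOW_URL=$(gcloud composer environments describe mlcomposer \
    --location us-central1 --format="value(config.airflowUri)")
# Print the OAuth redirect URL and pull out the client_id parameter
curl -s -o /dev/null -w '%{redirect_url}\n' "${AIRFLOW_URL}" | grep -o 'client_id=[^&]*'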
Step 8: Create your Cloud Function
Now that we have defined our Cloud Composer environment, we can create a Cloud Function that will be triggered whenever a file lands in our input storage bucket. Navigate to Cloud Functions and select “Create Function”. Here, you will define the following values:
a. Name of function: gcs-dag-trigger-function
b. Region: us-central1
c. Trigger type: Cloud Storage
d. Event type: On (finalizing/creating) file in the selected bucket
e. Bucket: Select your project-id_input bucket you created.
Hit “Save” and select “Next”. On this screen, you will define the actual function needed to trigger your Cloud Composer environment.
a. In the Runtime dropdown, select Python 3.9 from the list.
b. For the entry point, enter trigger_dag. This specifies the Python function to execute when the Cloud Function is invoked.
c. In the main.py file, copy and paste the Python Cloud Function code found in the “Add the Cloud Function Code” section of Google’s Cloud documentation. In this file, make the following changes:
- Because you are using Airflow 2 (launched with your Cloud Composer environment), set USE_EXPERIMENTAL_API to False, since you will be using the stable REST API.
- Replace client_id with the value you obtained earlier.
- Set webserver_id to the part of your Airflow web UI URL before “.appspot.com”, e.g. ‘12345eee7654321p-tp’.
- Set dag_name to ‘GcsToBigQueryTriggered’.
d. In the requirements.txt file, define the following dependencies required for the Cloud Function to work:
requests_toolbelt==0.9.1
google-auth==2.6.2
Select ‘Deploy’ when you have completed inputting these settings.
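For reference, a roughly equivalent deployment from Cloud Shell (assuming main.py and requirements.txt sit in the current directory) would be:
gcloud functions deploy gcs-dag-trigger-function \
    --region us-central1 \
    --runtime python39 \
    --entry-point trigger_dag \
    --trigger-resource ${PROJECT}_input \
    --trigger-event google.storage.object.finalize \
    --source .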
Step 9: Manually Add your Service Account as an Airflow User
Because the Airflow database limits the length of the email field to 64 characters, your default service account may not work with your Cloud Function. To resolve this, you can preregister an Airflow user for the service account. First, navigate to the IAM & Admin tab in the navigation menu, find the App Engine default service account, and select the pencil icon on the far right. Add the roles “Cloud Composer API Service Agent” and “Service Account Token Creator” to the account and hit “Save”. Next, run the following command in your Cloud Shell environment, replacing ‘SA_NAME@PROJECT_ID.iam.gserviceaccount.com’ with the full name of your App Engine service account. A numeric user id will be printed, which you will use in the next step.
gcloud iam service-accounts describe \
SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
--format="value(oauth2ClientId)"
Lastly, you will create your Airflow user with the Op role. Run the following command in Cloud Shell as well, replacing ‘ENVIRONMENT_NAME’ with mlcomposer, ‘LOCATION’ with ‘us-central1’, and ‘NUMERIC_USER_ID’ with the number you obtained in the last step, and specify a ‘UNIQUE_ID’ of your choosing. Once you run the command, an account email will appear which you can save for future reference.
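Adapted from the Cloud Composer documentation on triggering DAGs with Cloud Functions, the command looks roughly like this:
gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    users create -- \
    -u accounts.google.com:NUMERIC_USER_ID \
    -e UNIQUE_ID \
    -f UNIQUE_ID \
    -l - -r Op --use-random-password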
Step 10: Test your Cloud Function
Once you have completed all these steps, it’s finally time to test your Cloud Function. In your input storage bucket, upload the usa_names.csv file available through the link in the resources section above. This file name is specified in your DAG file and can be changed to whatever name you desire, depending on which file you would like your DAG to listen for. Once uploaded, return to your Airflow web UI and click the ‘GcsToBigQueryTriggered’ DAG name. If you followed the steps correctly, you should see a DAG tree run that looks similar to this:
If you navigate to the Dataflow tab in the navigation menu, you should also see a dataflow pipeline commence and complete after a few minutes of running.
After a few minutes, navigate to the BigQuery table and query the ‘ingest_table’ you created to see if it has been populated with data.
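From Cloud Shell, a quick check along these lines should confirm the rows arrived (adjust the selected columns as you like):
bq query --use_legacy_sql=false \
    "SELECT state, name, number, load_dt FROM \`${PROJECT}.ml_pipeline.ingest_table\` LIMIT 10"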
You can also visit the Cloud Composer Audit Logs on your Airflow Website (Browse > Audit Logs) to see more details of your cloud composer DAG which was successfully triggered.
Congratulations! You can now create a Cloud Function that triggers a Dataflow pipeline and outputs to a BigQuery table, all orchestrated with Cloud Composer.
NB: Some of these steps are not covered in full in the official documentation provided by Google. They are alluded to, but I discovered that following the steps in the Google Cloud tutorial verbatim will cause your Cloud Function to fail each time. These steps should help you perform this tutorial seamlessly and build your own pipelines.
Also make sure you tear down and delete all the environments and functions you created so that they do not continue to consume your resources.