DEV Community

Raj Das

Data Ingestion in Snowflake with Google Cloud Storage - Part II

In this article, we will walk through how to auto-ingest data into Snowflake from Google Cloud Storage (GCS) using Snowpipe. Along the way, we will cover the required concepts and tasks step by step.

You can refer to the official docs here.

Assumptions

  • You have an active Google Cloud account with permission to create IAM roles, GCS buckets and Pub/Sub topics.
  • You have an active Snowflake account with permission to create databases, schemas, stages and integration objects.

Data Source: Open Data

Snowpipe is an event-based data ingestion service that loads data from files as soon as they arrive in a designated stage location. In this example, we will load data into Snowflake from Google Cloud Storage (GCS), and Google PubSub will notify Snowpipe as soon as a file is placed in GCS.
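As a rough mental model of this event-driven flow, the toy Python sketch below queues a notification whenever a file "arrives" and loads each file only once when the queue is drained. It is not how Snowpipe is implemented: the class name, file name and rows are hypothetical, and the set of loaded files only loosely mirrors the load metadata Snowpipe keeps so that it does not reload the same file.

```python
from collections import deque


class ToyPipe:
    """Toy model of event-driven loading; NOT Snowpipe's real implementation."""

    def __init__(self):
        self.notifications = deque()  # stands in for the Pub/Sub subscription
        self.loaded_files = set()     # stands in for the pipe's load metadata
        self.table = []               # stands in for the target table

    def on_file_arrived(self, path, rows):
        """Queue a notification when a file lands in the stage (the 'event')."""
        self.notifications.append((path, rows))

    def process_notifications(self):
        """Drain queued notifications, loading each new file exactly once."""
        loaded_now = []
        while self.notifications:
            path, rows = self.notifications.popleft()
            if path in self.loaded_files:
                continue  # already loaded: skip the duplicate notification
            self.table.extend(rows)
            self.loaded_files.add(path)
            loaded_now.append(path)
        return loaded_now


# Hypothetical file name and rows, for illustration only.
pipe = ToyPipe()
rows = [("Brewery A",), ("Brewery B",)]
pipe.on_file_arrived("manufacturers_1.csv", rows)
pipe.on_file_arrived("manufacturers_1.csv", rows)  # duplicate notification
print(pipe.process_notifications())  # ['manufacturers_1.csv']
print(len(pipe.table))               # 2
```

The point of the model is that nothing is loaded on a schedule: work happens only when a notification is queued.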

As a prerequisite, refer to the Part-I article and create all the objects mentioned there before continuing.

Create a Topic (GCP)
We need to create a Topic in Google PubSub. A message will be published to this topic whenever a new file is uploaded into the GCS bucket/folder.

Here we create a topic named snowpipe_gcs_topic. The gcloud CLI command below creates the topic if it does not already exist. It also verifies that the GCS bucket has permission to publish events to this topic and grants the required permission if necessary. The -e OBJECT_FINALIZE flag limits notifications to the OBJECT_FINALIZE event, which is sent when a new object (or a new version of an existing object) is successfully created in the bucket, and -f json makes each message carry the object's metadata as JSON.

gcloud storage buckets notifications create -f json gs://snowflake_gcs-stage-bucket -t snowpipe_gcs_topic -e OBJECT_FINALIZE

gcloud pubsub topics describe snowpipe_gcs_topic

(Screenshot: PubSub Topic created)
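Snowflake consumes these messages for us, so no code is required at this step. Purely to show what an OBJECT_FINALIZE message carries, the Python sketch below extracts the bucket, object name and size from one. It assumes the message attributes eventType, bucketId and objectId that Cloud Storage sets on its notifications, and a JSON payload as requested with -f json; the function name and the sample values are hypothetical.

```python
import json


def new_object_from_notification(attributes, data):
    """Return (bucket, object name, size in bytes) for OBJECT_FINALIZE, else None.

    attributes: dict of Pub/Sub message attributes set by Cloud Storage.
    data: message payload bytes; with -f json this is the object's JSON metadata.
    """
    if attributes.get("eventType") != "OBJECT_FINALIZE":
        return None  # e.g. OBJECT_DELETE or OBJECT_METADATA_UPDATE
    metadata = json.loads(data.decode("utf-8"))
    # The JSON object resource reports size as a string, so convert it.
    return attributes["bucketId"], attributes["objectId"], int(metadata.get("size", 0))


# Hypothetical sample message, for illustration only.
attrs = {
    "eventType": "OBJECT_FINALIZE",
    "bucketId": "snowflake_gcs-stage-bucket",
    "objectId": "manufacturers.csv",
}
body = json.dumps({"name": "manufacturers.csv", "size": "2048"}).encode("utf-8")
print(new_object_from_notification(attrs, body))
# ('snowflake_gcs-stage-bucket', 'manufacturers.csv', 2048)
```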

Create a Subscription (GCP)
Next, we need to create a subscription to the topic so that messages published to the topic are delivered to a subscriber (Snowflake, in our case). Here we create a subscription named snowpipe_gcs_sub on the topic snowpipe_gcs_topic.

gcloud pubsub subscriptions create snowpipe_gcs_sub --topic=snowpipe_gcs_topic

gcloud pubsub subscriptions describe snowpipe_gcs_sub

(Screenshot: Subscription Details)

Make sure that the subscription state is ACTIVE.
(Screenshot: Subscription Active)

Service Account Permission (GCP)
Snowflake creates a service account when a Storage Integration Object is created. This service account needs the Pub/Sub Subscriber role on the subscription and the Monitoring Viewer role on the project.

Please refer to Part-I for more details on how to create a Storage Integration Object and grant it the appropriate permissions.
(Screenshot: Storage Integration Object)

To grant the Pub/Sub Subscriber role on the subscription:
  • Select the subscription name in the PubSub UI and click View Permissions. (Screenshot: Select Subscription)
  • Click Add Principal. (Screenshot: Add Principal)
  • Enter the service account name and assign it the PubSub Subscriber role. (Screenshot: PubSub Permission)

To grant the Monitoring Viewer role on the project:
  • Navigate to the Dashboard page in the Cloud Console and select your project from the dropdown list. (Screenshot: Dashboard)
  • Click the ADD PEOPLE TO THIS PROJECT button. (Screenshot: Add People to Project)
  • Enter the service account name. From the Select a role dropdown, select Monitoring Viewer. (Screenshot: Monitoring Viewer)
  • Click the Save button. The service account is now granted the Monitoring Viewer role.
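If you prefer the command line, the two console grants above should correspond to the gcloud commands assembled by the Python sketch below (for example, to pass to subprocess.run). The helper name, project ID and service account email are placeholders; substitute the service account from your Storage Integration Object (Part-I). The subscription binding applies to your currently configured gcloud project.

```python
import shlex


def iam_grant_commands(project_id, subscription, service_account):
    """Build gcloud argument lists roughly equivalent to the console grants above."""
    member = f"--member=serviceAccount:{service_account}"
    return [
        # Pub/Sub Subscriber, scoped to the subscription only.
        ["gcloud", "pubsub", "subscriptions", "add-iam-policy-binding",
         subscription, member, "--role=roles/pubsub.subscriber"],
        # Monitoring Viewer, granted on the whole project.
        ["gcloud", "projects", "add-iam-policy-binding",
         project_id, member, "--role=roles/monitoring.viewer"],
    ]


# Placeholder values: substitute your own project and Snowflake service account.
for cmd in iam_grant_commands("my-gcp-project", "snowpipe_gcs_sub",
                              "sa@example.iam.gserviceaccount.com"):
    print(shlex.join(cmd))  # printed only; pass cmd to subprocess.run to execute
```

Keeping each command as a list avoids shell quoting issues when it is handed to subprocess.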

Create a Notification Integration (Snowflake)
The notification integration references the Pub/Sub subscription in Google Cloud. Snowflake associates the notification integration with a GCS service account created for your Snowflake account; this service account was created when we created the Storage Integration Object.

Replace GCP-PROJECT-ID in the command below with your own project ID.

CREATE NOTIFICATION INTEGRATION snowflake_gcp_pubsub_int
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/GCP-PROJECT-ID/subscriptions/snowpipe_gcs_sub';

DESC NOTIFICATION INTEGRATION snowflake_gcp_pubsub_int;

(Screenshot: Notification Integration)
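The GCP_PUBSUB_SUBSCRIPTION_NAME value is the subscription's full resource name, projects/<project>/subscriptions/<subscription>. A minimal Python sketch that builds it and rejects an obviously invalid project ID (including the literal GCP-PROJECT-ID placeholder, which contains uppercase letters) is shown below. The helper name is hypothetical, and the check only covers GCP's documented project ID format.

```python
import re

# Documented GCP project ID format: 6-30 characters of lowercase letters,
# digits and hyphens, starting with a letter and not ending with a hyphen.
PROJECT_ID_RE = re.compile(r"[a-z][a-z0-9-]{4,28}[a-z0-9]")


def pubsub_subscription_path(project_id, subscription):
    """Return the full resource name used for GCP_PUBSUB_SUBSCRIPTION_NAME."""
    if not PROJECT_ID_RE.fullmatch(project_id):
        raise ValueError(f"not a valid GCP project ID: {project_id!r}")
    return f"projects/{project_id}/subscriptions/{subscription}"


print(pubsub_subscription_path("my-gcp-project", "snowpipe_gcs_sub"))
# projects/my-gcp-project/subscriptions/snowpipe_gcs_sub
```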

Create a Snowpipe (Snowflake)
Let's create a Snowpipe that will auto-ingest data into the Snowflake table as files arrive in the GCS location.

CREATE SCHEMA manuf_db.pipes;

CREATE PIPE manuf_db.pipes.snow_gcs_pipe
AUTO_INGEST = true
INTEGRATION = snowflake_gcp_pubsub_int
AS
COPY INTO manuf_db.public.manuf_tbl
FROM @manuf_db.stages.snowflake_gcp_stage
PATTERN = '.*manufacturers.*'
FILE_FORMAT = manuf_db.file_formats.file_format_csv;

DESC PIPE manuf_db.pipes.snow_gcs_pipe;

(Screenshot: Pipe Definition)
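The PATTERN clause is a regular expression applied to the staged file paths. The sketch below tries the same expression against a few hypothetical paths using Python's re module, assuming (as the leading and trailing .* suggest) that the expression has to match the whole path. Python's regex dialect is not identical to Snowflake's, but for a simple pattern like this one they should agree.

```python
import re

PIPE_PATTERN = ".*manufacturers.*"  # same expression as the pipe's PATTERN clause


def matches_pipe_pattern(path, pattern=PIPE_PATTERN):
    """True if the pattern matches the entire path (assumed full-match semantics)."""
    return re.fullmatch(pattern, path) is not None


# Hypothetical staged file paths, for illustration only.
for path in ["manufacturers.csv",
             "2020/beer_manufacturers_2020-11-01.csv",
             "products.csv"]:
    print(path, matches_pipe_pattern(path))
# manufacturers.csv True
# 2020/beer_manufacturers_2020-11-01.csv True
# products.csv False
```

Files that do not match are simply ignored by the pipe, so a quick check like this can explain why an uploaded file never shows up in the table.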

SHOW PIPES IN manuf_db.pipes;

Check the status of the Snowpipe. It should be in the RUNNING state to work correctly. If it is not, make sure the service account has been granted the correct permissions in GCP (as described above).

SELECT SYSTEM$PIPE_STATUS('manuf_db.pipes.snow_gcs_pipe');
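SYSTEM$PIPE_STATUS returns a JSON string. To check it from a script, a minimal Python sketch that fails unless executionState is RUNNING might look like this. The field names executionState and pendingFileCount are assumed from Snowflake's documentation; the function name and sample string are hypothetical, and the real output contains more fields.

```python
import json


def check_pipe_status(status_json):
    """Parse SYSTEM$PIPE_STATUS output; raise unless the pipe is RUNNING.

    Returns the number of files still waiting to be loaded.
    """
    status = json.loads(status_json)
    state = status.get("executionState")
    if state != "RUNNING":
        raise RuntimeError(f"pipe is not running (executionState={state!r})")
    return status.get("pendingFileCount", 0)


# Hypothetical, trimmed-down sample output, for illustration only.
sample = '{"executionState": "RUNNING", "pendingFileCount": 0}'
print(check_pipe_status(sample))  # 0
```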

Upload Files to GCS (GCP)
Upload a file to the stage bucket/folder. As soon as the upload completes, a message is published to the topic snowpipe_gcs_topic.
(Screenshot: GCS File Upload)
Click on the PubSub subscription, then open the Metrics tab to view the published messages.
(Screenshot: PubSub Message)

The file should now be loaded into manuf_tbl in Snowflake. In this walkthrough, ingestion took around a minute or two.

You can upload multiple files to the GCS bucket; each file whose path matches the pipe's PATTERN will be loaded by the Snowpipe.

(Screenshot: Subscription Metrics)
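Because ingestion is asynchronous (about a minute or two in this walkthrough), a script that uploads a file and then verifies the table should poll rather than check once. Below is a minimal polling sketch; count_rows is a hypothetical callable standing in for whatever runs SELECT count(*) against manuf_tbl, and the 180-second default timeout is an assumption based on the timing above, not a Snowflake limit.

```python
import itertools
import time


def wait_for_rows(count_rows, expected, timeout_s=180.0, interval_s=10.0):
    """Poll count_rows() until it returns >= expected or timeout_s elapses.

    Returns the last observed count, so the caller can tell whether it timed out.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        current = count_rows()
        if current >= expected or time.monotonic() >= deadline:
            return current
        time.sleep(interval_s)


# Demo with a fake counter that reports 2, 4, 6, ... rows on successive polls.
fake_counts = itertools.count(2, 2)
print(wait_for_rows(lambda: next(fake_counts), expected=6,
                    timeout_s=5, interval_s=0.01))  # 6
```

Returning the last count instead of raising keeps the timeout policy with the caller.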

Verify Ingested Data (Snowflake)
Run the SQL below to verify that the data has been loaded into the table.

SELECT * FROM manuf_db.public.manuf_tbl;

(Screenshot: Table Data)

SELECT count(*) FROM manuf_db.public.manuf_tbl;

(Screenshot: Count)

The copy history of the loaded files can also be viewed in Snowflake.
(Screenshot: Copy History)

Below is the output of the pipe status. It can be very handy for debugging: it shows the last file that was ingested along with its timestamp.

SELECT SYSTEM$PIPE_STATUS('manuf_db.pipes.snow_gcs_pipe');

(Screenshot: SnowPipe Status)
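To turn the last-ingested file and timestamp from the pipe status into something a monitoring script can alert on, a sketch like the one below returns the last file and how many seconds ago it was ingested. It assumes the fields are named lastIngestedFilePath and lastIngestedTimestamp and that the timestamp is an ISO-8601 UTC string ending in Z (verify both against your own output); the function name and sample values are hypothetical.

```python
import json
from datetime import datetime, timezone


def last_ingestion(status_json, now=None):
    """Return (last ingested file, seconds since it was ingested), or None."""
    status = json.loads(status_json)
    path = status.get("lastIngestedFilePath")
    ts = status.get("lastIngestedTimestamp")
    if path is None or ts is None:
        return None  # nothing ingested yet
    # Assumes an ISO-8601 UTC timestamp ending in "Z"; swap it for "+00:00"
    # because datetime.fromisoformat only accepts "Z" from Python 3.11 on.
    ingested_at = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if now is None:
        now = datetime.now(timezone.utc)
    return path, (now - ingested_at).total_seconds()


# Hypothetical sample output, for illustration only.
sample = ('{"lastIngestedFilePath": "manufacturers.csv",'
          ' "lastIngestedTimestamp": "2023-01-01T10:00:00.000Z"}')
fixed_now = datetime(2023, 1, 1, 10, 2, 0, tzinfo=timezone.utc)
print(last_ingestion(sample, now=fixed_now))  # ('manufacturers.csv', 120.0)
```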

All objects created in the Snowflake account (Part-I and Part-II)
(Screenshot: Snowflake Objects)

Fun Fact
Based on the data used, as of 2020-11-01 there are 543 beer manufacturers in ON producing 1403 different kinds of beer.

Kindly let me know if this article was helpful. Your feedback is highly appreciated.
