Set up the Openflow Connector for Google Drive¶
Note
The connector is subject to the Connector Terms.
This topic describes the steps to set up the Openflow Connector for Google Drive.
Prerequisites¶
Ensure that you have reviewed Openflow Connector for Google Drive.
Ensure that you have set up Openflow.
Get the credentials¶
Setting up the connector requires specific permissions and account settings for Snowflake Openflow processors to read data from Google. This access is provided in part through setting up a service account and a key for Openflow to authenticate as that service account. For more information, see:
As a Google Drive administrator, perform the following steps:
Prerequisites¶
You have a Google user with Super Admin permissions
You have a Google Cloud Project with the following roles:
Organization Policy Administrator
Organization Administrator
Enable service account key creation¶
By default, Google disables service account key creation. For Openflow to use the service account JSON key, enforcement of this policy must be turned off.
Log in to the Google Cloud Console with a super admin account that has the Organization Policy Administrator role.
Ensure that you have selected the organization-level resource, not an individual project within your organization.
Click Organization Policies.
Select the Disable service account key creation policy.
Click Manage Policy and turn off enforcement.
Click Set Policy.
Create service account and key¶
Open the Google Cloud Console and authenticate using a user that has been granted access to create service accounts.
Ensure you are in a project of your organization.
In the left navigation, under the IAM & Admin, select the Service Accounts tab.
Click Create Service Account.
Enter the service account name and click Create and Continue.
Click Done. In the table that lists the service accounts, find the OAuth 2 Client ID column. Copy the Client ID; you need it to set up domain-wide delegation in the next section.
In the same table, click the menu for the newly created service account and select Manage keys.
Select Add key and then Create new key.
Leave the default selection of JSON and click Create.
The key is downloaded into your browser's Downloads directory as a .json file.
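Before you supply the downloaded key to the connector, it can help to confirm that the file is a complete service account key. The following is a minimal sketch and not part of the connector. It assumes the standard field names of a Google service account JSON key (`type`, `client_email`, `client_id`, `private_key`, `token_uri`); the function name `load_service_account_key` is hypothetical.

```python
import json

# Fields assumed to be present in a standard Google service account JSON key.
REQUIRED_FIELDS = ("type", "client_email", "client_id", "private_key", "token_uri")


def load_service_account_key(path):
    """Load a service account key file and verify that the expected fields exist."""
    with open(path, encoding="utf-8") as handle:
        key = json.load(handle)
    missing = [field for field in REQUIRED_FIELDS if not key.get(field)]
    if missing:
        raise ValueError(f"Key file is missing fields: {', '.join(missing)}")
    if key["type"] != "service_account":
        raise ValueError(f"Unexpected key type: {key['type']!r}")
    return key
```

The `client_id` value in the returned dictionary should match the OAuth 2 Client ID that you copied from the service accounts table.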
Grant service account domain-wide delegation for listed scopes¶
Log in to your Google Admin account.
Select Admin from Google Apps selector.
In the left navigation, expand Security, select Access and data control, and then click API controls.
On the API controls screen, select Manage Domain Wide Delegation.
Click Add new.
Enter the OAuth 2 Client ID taken from the Create Service Account and Key section and the following scopes:
Click Authorize.
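To illustrate what domain-wide delegation permits, the sketch below builds the claim set that a service account signs when it requests an access token on behalf of a domain user in Google's OAuth 2.0 service account flow. It is an illustration only, because the connector performs this exchange itself. The function name, the example addresses, and the scope `https://www.googleapis.com/auth/drive.readonly` are placeholders; they are not the scopes that the connector requires.

```python
import time


def build_delegated_claims(client_email, delegated_user, scopes, token_uri,
                           now=None, lifetime_seconds=3600):
    """Build the JWT claim set for a delegated (impersonated) token request."""
    issued_at = int(time.time()) if now is None else int(now)
    return {
        "iss": client_email,        # the service account requesting the token
        "sub": delegated_user,      # the domain user the service account acts as
        "scope": " ".join(scopes),  # space-delimited list of requested scopes
        "aud": token_uri,           # token endpoint that receives the assertion
        "iat": issued_at,
        "exp": issued_at + lifetime_seconds,  # Google allows at most one hour
    }


# Placeholder values for illustration only.
claims = build_delegated_claims(
    client_email="openflow-sa@example-project.iam.gserviceaccount.com",
    delegated_user="drive-admin@example.com",
    scopes=["https://www.googleapis.com/auth/drive.readonly"],
    token_uri="https://oauth2.googleapis.com/token",
)
print(claims["sub"])
```

The token request succeeds only if the client ID and every requested scope were authorized in the Admin console. The `sub` claim presumably plays the role of the Google Delegation User flow parameter.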
Set up Snowflake account¶
As a Snowflake account administrator, perform the following tasks:
Create a new role or use an existing role and grant the Database privileges.
Create a new Snowflake service user with the type as SERVICE.
Grant the Snowflake service user the role you created in the previous steps.
Configure key-pair authentication for the Snowflake SERVICE user from step 2.
Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow, for example, AWS, Azure, or HashiCorp, and store the public and private keys in the secret store.
Note
If, for any reason, you do not want to use a secrets manager, then you are responsible for safeguarding the public key and private key files used for key-pair authentication according to the security policies of your organization.
Once the secrets manager is configured, determine how you will authenticate to it. On AWS, it’s recommended that you use the EC2 instance role associated with Openflow, because this way no other secrets have to be persisted.
In Openflow, configure a Parameter Provider associated with this Secrets Manager, from the hamburger menu in the upper right. Navigate to Controller Settings » Parameter Provider and then fetch your parameter values.
At this point all credentials can be referenced with the associated parameter paths and no sensitive values need to be persisted within Openflow.
If any other Snowflake users require access to the raw ingested documents and tables ingested by the connector (for example, for custom processing in Snowflake), then grant those users the role created in step 1.
Designate a warehouse for the connector to use. Start with the smallest warehouse size, then experiment with size depending on the number of tables being replicated, and the amount of data transferred. Large table numbers typically scale better with multi-cluster warehouses, rather than larger warehouse sizes.
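The account setup tasks above can be sketched as a short list of SQL statements. The Python helper below only renders the statements so that an administrator can review them before running them in a worksheet. All object names are hypothetical, the warehouse grant is an assumption, and the database privileges are omitted because this topic does not list them.

```python
def render_setup_sql(role, user, warehouse, public_key):
    """Return SQL statements for the role, service user, and key-pair setup."""
    return [
        f"CREATE ROLE IF NOT EXISTS {role};",
        f"CREATE USER IF NOT EXISTS {user} TYPE = SERVICE DEFAULT_ROLE = {role};",
        f"GRANT ROLE {role} TO USER {user};",
        # Public half of the key pair; the private key stays in the secrets manager.
        f"ALTER USER {user} SET RSA_PUBLIC_KEY = '{public_key}';",
        # Assumption: the connector role needs USAGE on the designated warehouse.
        f"GRANT USAGE ON WAREHOUSE {warehouse} TO ROLE {role};",
    ]


# Hypothetical names; replace them with the objects in your account.
for statement in render_setup_sql(
    role="OPENFLOW_GDRIVE_ROLE",
    user="OPENFLOW_GDRIVE_USER",
    warehouse="OPENFLOW_GDRIVE_WH",
    public_key="<public_key_without_pem_header_and_footer>",
):
    print(statement)
```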
Use case 1: Use the connector definition to ingest files only¶
Use the connector definition to:
Perform custom processing on ingested files
Ingest Google Drive files and permissions and keep them up to date
Configure the connector¶
As a data engineer, perform the following tasks to configure the connector:
Create a database and schema in Snowflake for the connector to store ingested data.
Download the connector definition file for setting up and running the connector. This flow fetches data from Google, uploads it to a stage, and processes additional metadata about permissions and file information, which is uploaded to Snowflake tables.
Import the connector definition into Openflow:
Open the Snowflake Openflow canvas.
Add a process group. To do this, drag and drop the Process Group icon from the tool palette at the top of the page onto the canvas. Once you release your pointer, a Create Process Group dialog appears.
On the Create Process Group dialog, select the connector definition file to import.
Populate the process group parameters
Right-click on the imported process group and select Parameters.
Enter the required parameter values as described in Flow parameters: Ingest files only.
Flow parameters: Ingest files only¶
Parameter | Description |
---|---|
Google Delegation User | The user that is used by the service account |
GCP Service Account JSON | The service account JSON downloaded from Google Cloud Console to allow access to Google APIs in the connector |
Google Drive ID | The Google Shared Drive to watch for content and updates |
File Extensions To Ingest | A comma-separated list that specifies file extensions to ingest. The connector tries to convert the files to PDF format first, if possible. Nonetheless, the extension check is performed on the original file extension. If some of the specified file extensions are not supported by Cortex Parse Document, then the connector ignores those files, logs a warning message in an event log, and continues processing other files. |
Global Refresh Frequency | Refresh frequency in minutes ranging between |
Openflow Instance Database | A database is created in the user’s Snowflake account, if necessary. Files, metadata, and ACLs are ingested into tables in the specified schema. |
Openflow Instance Schema | A schema is created in the target database in the user’s Snowflake account, if necessary. The stage and tables are created to ingest files, metadata, and ACLs. |
Snowflake Account | Name of the account that the connector will be running for. |
Snowflake Username | Name of the user that the connector will act on behalf of. |
Snowflake Private Key | The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined. |
Snowflake Private Key File | The file that contains the RSA Private Key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line starts with |
Snowflake Private Key Password | The password associated with the Snowflake Private Key File. |
Snowflake Warehouse | The warehouse used for the connection and SQL that requires a warehouse to be specified. |
Snowflake File Hash Table Name | Internal table used to store file content hashes to prevent updates to content when it has not changed. |
Snowflake Role | The Snowflake role used to create, retrieve, update, and delete the tables and data used for this connector. |
Right-click on the canvas and select Enable all Controller Services.
Right-click on the imported process group and select Start. The connector starts the data ingestion.
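The parameter table states that either Snowflake Private Key or Snowflake Private Key File must be defined and that the key must be PKCS8-formatted with PEM headers. A minimal pre-flight check for those two rules might look like the sketch below. The function name is hypothetical, and the two header strings are the standard PKCS#8 PEM labels for unencrypted and encrypted keys, not values taken from this topic.

```python
# Standard PEM header lines for PKCS#8 private keys (unencrypted and encrypted).
PKCS8_HEADERS = (
    "-----BEGIN PRIVATE KEY-----",
    "-----BEGIN ENCRYPTED PRIVATE KEY-----",
)


def check_key_parameters(params):
    """Return a list of problems found in the key-related flow parameters."""
    problems = []
    key = params.get("Snowflake Private Key", "").strip()
    key_file = params.get("Snowflake Private Key File", "").strip()
    if not key and not key_file:
        problems.append("Define Snowflake Private Key or Snowflake Private Key File.")
    if key and not key.startswith(PKCS8_HEADERS):
        problems.append("Snowflake Private Key does not start with a PKCS8 PEM header.")
    return problems
```

An empty list means that both checks passed. The sketch does not open the key file or verify the key material itself.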
Use case 2: Use the connector definition to ingest files and perform processing with Cortex¶
Use the predefined flow definition to:
Create AI assistants for public documents within your organization’s Google Drive.
Enable your AI assistants to adhere to access controls specified in your organization’s Google Drive.
Configure the connector¶
As a data engineer, perform the following tasks to configure the connector:
Create a database and schema in Snowflake for the connector to store ingested data.
Download the connector definition files:
Download this connector definition file for setting up and running the connector. This flow fetches data from Google, runs parsing and chunking processes, and updates the Cortex Search service.
Download this connector definition file for managing the connector. This can be used to toggle Cortex Search service indexing and to clean up the connector state.
Import the connector definition into Openflow:
Open the Snowflake Openflow canvas.
Add a process group. To do this, drag and drop the Process Group icon from the tool palette at the top of the page onto the canvas. Once you release your pointer, a Create Process Group dialog appears.
On the Create Process Group dialog, select the connector definition file to import.
Populate the process group parameters
Right-click on the imported process group and select Parameters.
Enter the required parameter values as described in Flow parameters: Ingest files and perform processing with Cortex.
Flow parameters: Ingest files and perform processing with Cortex¶
Parameter | Description |
---|---|
Google Delegation User | The user that is used by the service account |
GCP Service Account JSON | The service account JSON downloaded from Google Cloud Console to allow access to Google APIs in the connector |
Google Drive ID | The Google Shared Drive to watch for content and updates |
File Extensions To Ingest | A comma-separated list that specifies file extensions to ingest. The connector tries to convert the files to PDF format first, if possible. Nonetheless, the extension check is performed on the original file extension. If some of the specified file extensions are not supported by Cortex Parse Document, then the connector ignores those files, logs a warning message in an event log, and continues processing other files. |
Global Refresh Frequency | Refresh frequency in minutes ranging between |
Openflow Instance Database | A database is created in the user’s Snowflake account, if necessary. Files, metadata, and ACLs are ingested into tables in the specified schema. |
Openflow Instance Schema | A schema is created in the target database in the user’s Snowflake account, if necessary. The stage and tables are created to ingest files, metadata, and ACLs. |
Snowflake Account | Name of the account that the connector will be running for. |
Snowflake Username | Name of the user that the connector will act on behalf of. |
Snowflake Private Key | The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined. |
Snowflake Private Key File | The file that contains the RSA Private Key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line starts with |
Snowflake Private Key Password | The password associated with the Snowflake Private Key File. |
Snowflake Warehouse | The warehouse used for the connection and SQL that requires a warehouse to be specified. |
Snowflake File Hash Table Name | Internal table used to store file content hashes to prevent updates to content when it has not changed. |
Snowflake Role | The Snowflake role used to create, retrieve, update, and delete the tables and data used for this connector. |
Snowflake Cortex Search Service user role | An identifier of a role that is assigned usage permissions on the Cortex Search service. |
Right-click on the canvas and select Enable all Controller Services.
Right-click on the imported process group and select Start. The connector starts the data ingestion.
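The File Extensions To Ingest parameter is a comma-separated list that is checked against the original file extension, before any conversion to PDF. The sketch below shows one way such a check could work. The function names are hypothetical, and the case-insensitive matching and tolerance for a leading dot are assumptions, not documented connector behavior.

```python
from pathlib import PurePosixPath


def parse_extensions(value):
    """Split a comma-separated extension list into a normalized set."""
    return {
        item.strip().lower().lstrip(".")
        for item in value.split(",")
        if item.strip()
    }


def should_ingest(file_name, extensions):
    """Check the original file extension against the configured list."""
    suffix = PurePosixPath(file_name).suffix.lower().lstrip(".")
    return suffix in extensions


# Hypothetical parameter value and file names.
allowed = parse_extensions("pdf, docx, txt")
print(should_ingest("Policies/handbook.PDF", allowed))
print(should_ingest("Budget/plan.xlsx", allowed))
```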
Use case 3: Customize the connector definition¶
Customize the connector definition to:
Process the ingested files with Document AI.
Perform custom processing on ingested files.
Configure the connector¶
As a data engineer, perform the following tasks to configure the connector:
Create a database and schema in Snowflake for the connector to store ingested data.
Download the connector definition files:
Download this connector definition file for setting up and running the connector. This flow fetches data from Google, runs parsing and chunking processes, and updates the Cortex Search service.
Download this connector definition file for managing the connector. This can be used to toggle Cortex Search service indexing and to clean up the connector state.
Import the connector definition into Openflow.
Open the Snowflake Openflow canvas.
Add a process group to the canvas.
On the Create Process Group dialog, select the connector definition file to import.
Customize the connector definition.
Remove the following Process groups: Check If Duplicate Content, Snowflake Stage and Parse PDF, Update Snowflake Cortex.
Attach any custom processing to the output of the Process Google Drive Metadata process group. Each flow file represents a single Google Drive file change. Flow file attributes are listed in the FetchGoogleDriveMetadata documentation.
Populate the process group parameters. Follow the same process as for use case 1. Note that after you modify the connector definition, some parameters might no longer be required.
Run the flow.
Start the process group. The flow will create all required objects inside of Snowflake.
Right-click on the imported process group and select Start.
Query the Cortex Search service¶
You can use the Cortex Search service to build chat and search applications to chat with or query your documents in Google Drive.
After you install and configure the connector and it begins ingesting content from Google Drive, you can query the Cortex Search service. For more information about using Cortex Search, see Query a Cortex Search service.
Filter responses
To restrict responses from the Cortex Search service to documents that a specific user has access to in Google Drive, you can specify a filter containing the user ID or email address of the user when you query Cortex Search. For example, filter.@contains.user_ids or filter.@contains.user_emails.
The name of the Cortex Search service created by the connector is search_service in the schema Cortex.
Run the following SQL code in a SQL worksheet to query the Cortex Search service with files ingested from your Google Drive.
Replace the following:
application_instance_name: Name of your database and connector application instance.
user_emailID: Email ID of the user who you want to filter the responses for.
your_question: The question that you want to get responses for.
number_of_results: Maximum number of results to return in the response. The maximum value is 1000 and the default value is 10.
SELECT PARSE_JSON(
  SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
    '<application_instance_name>.cortex.search_service',
    '{
      "query": "<your_question>",
      "columns": ["chunk", "web_url"],
      "filter": {"@contains": {"user_emails": "<user_emailID>"} },
      "limit": <number_of_results>
    }'
  )
)['results'] AS results;
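The second argument to SEARCH_PREVIEW is a JSON string, which is easy to break when it is edited by hand. The following sketch builds that string with `json.dumps` so that quotes in the question are escaped correctly. The function name is hypothetical; the field names and the upper limit of 1000 come from the query and the replacement list above.

```python
import json


def build_search_request(question, user_email, limit=10, columns=("chunk", "web_url")):
    """Serialize the query object passed as the second argument to SEARCH_PREVIEW."""
    if limit > 1000:
        raise ValueError("limit must not exceed 1000")
    return json.dumps({
        "query": question,
        "columns": list(columns),
        # Restrict results to documents that this user can access in Google Drive.
        "filter": {"@contains": {"user_emails": user_email}},
        "limit": limit,
    })


# Hypothetical question and email address.
print(build_search_request("What is my vacation carry over policy?", "user@example.com", limit=5))
```

When you pass the resulting string to SQL, bind it as a parameter instead of concatenating it into the statement, because `json.dumps` does not escape single quotes for SQL string literals.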
Here’s a complete list of values that you can enter for columns:
Column name | Type | Description |
---|---|---|
 | String | A full path to the file from the Google Drive documents root. Example: |
 | String | A URL that displays an original Google Drive file in a browser. |
 | String | Date and time when the item was most recently modified. |
 | String | A piece of text from the document that matched the Cortex Search query. |
 | Array | An array of Google user IDs that have access to the document. It also includes user IDs from all the Google groups that are assigned to the document. To find a specific user ID, see Get a user. |
 | Array | An array of Google user email IDs that have access to the document. It also includes user email IDs from all the Google groups that are assigned to the document. |
Example: Query an AI assistant for human resources (HR) information
You can use Cortex Search to query an AI assistant for employees to chat with the latest versions of HR information, such as onboarding, code of conduct, team processes, and organization policies. Using response filters, you can also allow HR team members to query employee contracts while adhering to access controls configured in Google Drive.
Run the following in a SQL worksheet to query the Cortex Search service with files ingested from Google Drive. Select the database as your application instance name and schema as Cortex.
Replace the following:
application_instance_name: Name of your database and connector application instance.
user_emailID: Email ID of the user who you want to filter the responses for.
SELECT PARSE_JSON(
  SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
    '<application_instance_name>.cortex.search_service',
    '{
      "query": "What is my vacation carry over policy?",
      "columns": ["chunk", "web_url"],
      "filter": {"@contains": {"user_emails": "<user_emailID>"} },
      "limit": 1
    }'
  )
)['results'] AS results;
Run the following code in a Python worksheet to query the Cortex Search service with files ingested from Google Drive. Ensure that you add the snowflake.core package to your database.
Replace the following:
application_instance_name: Name of your database and connector application instance.
user_emailID: Email ID of the user who you want to filter the responses for.
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.core import Root

def main(session: snowpark.Session):
    root = Root(session)
    # Fetch the Cortex Search service created by the connector.
    my_service = (root
        .databases["<application_instance_name>"]
        .schemas["cortex"]
        .cortex_search_services["search_service"]
    )
    # Query the service, restricting results to documents the user can access.
    resp = my_service.search(
        query="What is my vacation carry over policy?",
        columns=["chunk", "web_url"],
        filter={"@contains": {"user_emails": "<user_emailID>"}},
        limit=1
    )
    return resp.to_json()
Execute the following code in a command-line interface to query the Cortex Search service with files ingested from your Google Drive. You need to authenticate with key-pair authentication or OAuth to access the Snowflake REST APIs. For more information, see REST API and Authenticating Snowflake REST APIs with Snowflake.
Replace the following:
application_instance_name: Name of your database and connector application instance.
account_url: Your Snowflake account URL. For instructions on finding your account URL, see Finding the organization and account name for an account.
curl --location "https://<account_url>/api/v2/databases/<application_instance_name>/schemas/cortex/cortex-search-services/search_service:query" \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/json' \
  --header "Authorization: Bearer <CORTEX_SEARCH_JWT>" \
  --data '{
    "query": "What is my vacation carry over policy?",
    "columns": ["chunk", "web_url"],
    "limit": 1
  }'
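The same REST request can be prepared from Python with only the standard library. The sketch below builds the request without sending it, so that the URL, headers, and body can be inspected first. The function name is hypothetical. The sketch assumes the `:query` suffix of the Cortex Search REST endpoint and adds the user email filter from the SQL examples, which the curl command above omits.

```python
import json
import urllib.request


def build_query_request(account_url, database, jwt, question, user_email, limit=1):
    """Build (without sending) a POST request for the Cortex Search service."""
    url = (
        f"https://{account_url}/api/v2/databases/{database}"
        "/schemas/cortex/cortex-search-services/search_service:query"
    )
    body = {
        "query": question,
        "columns": ["chunk", "web_url"],
        "filter": {"@contains": {"user_emails": user_email}},
        "limit": limit,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Bearer {jwt}",
        },
        method="POST",
    )
```

To send the request, pass the returned object to `urllib.request.urlopen` and parse the response body as JSON.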