Python SDK for Snowpipe Streaming Rowset API

This guide shows how to use the Rowset API by walking through the example script streaming_ingest_example.py, which demonstrates integration with the Snowflake API.

To download the Python SDK for the Snowpipe Streaming Rowset API, see this GitHub page.

Overview of the example

The streaming_ingest_example.py script automates the ingestion process and performs the following operations:

  1. Read configuration: Reads a JSON file containing Snowflake account details, user information, and the private key. For more information, see the profile.rowset.json.example.

  2. Create a streaming ingest client: Initializes a SnowflakeStreamingIngestClient that enables the creation of one or more streaming channels that can point to the same or different pipes.

  3. Create a streaming channel: Establishes a SnowflakeStreamingIngestChannel linked to a database, schema, and pipe.

  4. Insert data: Inserts 1,000 rows in 20 batches into the streaming channel using the insertRows API on the Channel object.

  5. Close the channel: Closes the channel to ensure that all data is committed to Snowflake.
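The insert and close steps above can be pictured with a small sketch. The StubChannel class below is a hypothetical stand-in, not the SDK's SnowflakeStreamingIngestChannel, and the method names insert_rows and close are assumptions made for illustration. The sketch only shows how 1,000 rows divide into 20 batches of 50, using rows shaped like the example table's columns (data, c1, c2).

```python
from typing import Iterator

TOTAL_ROWS = 1_000
NUM_BATCHES = 20
BATCH_SIZE = TOTAL_ROWS // NUM_BATCHES  # 50 rows per batch


class StubChannel:
    """Hypothetical stand-in for a streaming channel (not the SDK class)."""

    def __init__(self) -> None:
        self.batches_received = 0
        self.rows_received = 0
        self.closed = False

    def insert_rows(self, rows: list[dict]) -> None:
        # A real channel would send the rows to Snowflake; the stub only counts.
        self.batches_received += 1
        self.rows_received += len(rows)

    def close(self) -> None:
        # A real channel would wait for pending data to be committed.
        self.closed = True


def make_batches(total: int, batch_size: int) -> Iterator[list[dict]]:
    """Yield lists of rows with the columns data, c1, and c2."""
    for start in range(0, total, batch_size):
        yield [
            {"data": {"seq": i}, "c1": i, "c2": f"row-{i}"}
            for i in range(start, min(start + batch_size, total))
        ]


channel = StubChannel()
for batch in make_batches(TOTAL_ROWS, BATCH_SIZE):
    channel.insert_rows(batch)
channel.close()
```

To see the real calls, read streaming_ingest_example.py itself; the stub exists only so the batching logic can run without a Snowflake account.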

Step 1: Set up users and pipes in Snowsight

Step 1.1 Create a 2048-bit RSA key pair

Generate an RSA key pair for authentication. The Snowflake Ingest Service requires registering the public key with your account. For more information, see Key-pair authentication and key-pair rotation.

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Note

In the Python SDK preview, only unencrypted private keys are supported. Make sure to include -nocrypt when you create the private key.
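As a quick sanity check, the header line of a PKCS#8 PEM file tells you whether the key is encrypted. The following is a minimal sketch using only the standard library; the function names are illustrative and not part of the SDK, and the default file name rsa_key.p8 is taken from the openssl command above.

```python
from pathlib import Path

# PKCS#8 PEM headers: the encrypted form carries the word ENCRYPTED.
UNENCRYPTED_HEADER = "-----BEGIN PRIVATE KEY-----"
ENCRYPTED_HEADER = "-----BEGIN ENCRYPTED PRIVATE KEY-----"


def describe_pkcs8_pem(pem_text: str) -> str:
    """Classify a PEM string by its first non-empty line."""
    lines = [line.strip() for line in pem_text.splitlines() if line.strip()]
    first = lines[0] if lines else ""
    if first == UNENCRYPTED_HEADER:
        return "unencrypted"
    if first == ENCRYPTED_HEADER:
        return "encrypted"
    return "unknown"


def check_key_file(path: str = "rsa_key.p8") -> str:
    """Read a key file from disk and report how it is encoded."""
    return describe_pkcs8_pem(Path(path).read_text())
```

Calling check_key_file() in the directory that holds rsa_key.p8 should return "unencrypted" for a key generated with -nocrypt.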

Step 1.2 Set up a test user

Create a new user in your Snowflake account for testing ingestion. Register the $PUBLIC_KEY generated in Step 1.1 to this new user.

CREATE USER test_user;
GRANT ROLE ACCOUNTADMIN TO USER test_user;

CREATE AUTHENTICATION POLICY testing_auth_policy
AUTHENTICATION_METHODS = ('SAML', 'PASSWORD', 'OAUTH', 'KEYPAIR')
CLIENT_TYPES = ('SNOWFLAKE_UI', 'SNOWSQL', 'DRIVERS');

ALTER USER test_user SET DEFAULT_ROLE = ACCOUNTADMIN;
ALTER USER test_user SET RSA_PUBLIC_KEY='$your_public_key';
ALTER USER test_user SET AUTHENTICATION POLICY testing_auth_policy;
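Snowflake's key-pair authentication documentation shows the RSA_PUBLIC_KEY value without the PEM delimiter lines. The sketch below, with hypothetical helper names, extracts the base64 body from the rsa_key.pub content and builds the corresponding ALTER USER statement as a string. It does not connect to Snowflake.

```python
def public_key_body(pem_text: str) -> str:
    """Return the base64 body of a PEM public key as a single line."""
    body_lines = []
    for raw_line in pem_text.splitlines():
        line = raw_line.strip()
        # Skip blank lines and the BEGIN/END PUBLIC KEY delimiters.
        if not line or line.startswith("-----"):
            continue
        body_lines.append(line)
    return "".join(body_lines)


def alter_user_statement(user: str, pem_text: str) -> str:
    """Build the SQL statement that registers the public key for a user."""
    return f"ALTER USER {user} SET RSA_PUBLIC_KEY='{public_key_body(pem_text)}';"
```

For example, alter_user_statement("test_user", open("rsa_key.pub").read()) produces a statement you can paste into a worksheet.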

Step 1.3 Create a pipe to ingest

The Rowset API ingests data through a pipe, so you must create a pipe before you can use the API.

Below is an example SQL script to set up all required configurations in Snowflake:

CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
CREATE OR REPLACE TABLE MY_TABLE(data variant, c1 number, c2 string);

CREATE OR REPLACE PIPE MY_PIPE
AS
   COPY INTO MY_TABLE
   FROM TABLE (
         DATA_SOURCE (
         TYPE => 'STREAMING'
         )
   )
   MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
;

Step 2: Run streaming_ingest_example.py

Step 2.1 Install the Python SDK

Set up a Python virtual environment and then install the required Snowflake Python SDK.

python3 -m venv venv
source venv/bin/activate
pip install ./

Then install two missing packages:

pip install setuputils
pip install orjson

Step 2.2 Create the profile.json

Create a profile.json file to store your Snowflake configuration details. Replace the placeholders with your specific information. For example, use your Account Locator to replace <Account Locator>.

For private_key, make sure that the start and end lines are included: -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY-----. Ensure that all carriage return characters (\r or \r\n) are removed and replaced with newline characters (\n).

{
  "private_key":  "Your Private Key from Step 1.1",
  "schema": "MY_SCHEMA",
  "database": "MY_DATABASE",
  "user":"test_user",
  "account": "<Account Locator>",
  "role": "ACCOUNTADMIN",
  "url": "<Account Locator>.snowflakecomputing.com",
  "port": 443,
  "ROWSET_DEV_VM_TEST_MODE": "false",
  "scheme": "https"
}
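Pasting a multi-line key into JSON by hand is error-prone, so it can help to generate the file. The following sketch, with hypothetical helper names, normalizes the line endings of the private key and fills in the same fields as the example above. The database name and account locator are passed in as parameters, and json.dump takes care of escaping the newlines inside the key.

```python
import json
from pathlib import Path


def normalize_newlines(text: str) -> str:
    """Convert CRLF and bare CR line endings to LF."""
    return text.replace("\r\n", "\n").replace("\r", "\n")


def build_profile(private_key_pem: str, account_locator: str, database: str) -> dict:
    """Assemble a profile dictionary with the fields shown in the example."""
    return {
        "private_key": normalize_newlines(private_key_pem).strip(),
        "schema": "MY_SCHEMA",
        "database": database,
        "user": "test_user",
        "account": account_locator,
        "role": "ACCOUNTADMIN",
        "url": f"{account_locator}.snowflakecomputing.com",
        "port": 443,
        "ROWSET_DEV_VM_TEST_MODE": "false",
        "scheme": "https",
    }


def write_profile(profile: dict, path: str = "profile.json") -> None:
    """Write the profile as JSON; newlines in the key are escaped as \\n."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(profile, handle, indent=2)


def profile_from_key_file(key_path: str, account_locator: str, database: str) -> dict:
    """Read the private key from disk and build the profile from it."""
    return build_profile(Path(key_path).read_text(), account_locator, database)
```

A typical call would be write_profile(profile_from_key_file("rsa_key.p8", "<Account Locator>", "<your database>")), with the placeholders replaced by your own values.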

Step 2.3 Run the example

Run the example script, streaming_ingest_example.py, with the profile.json that you created in Step 2.2.

cd snowflake/ingest/streaming/example/
python streaming_ingest_example.py
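If the run fails at startup, a quick pre-flight check of profile.json can narrow down the cause. The sketch below is not part of the SDK. It assumes that the fields shown in Step 2.2 are the ones the example expects, and it only inspects the file's contents without contacting Snowflake.

```python
import json

# Assumption: these are the fields listed in the Step 2.2 example profile.
EXPECTED_KEYS = (
    "private_key", "schema", "database", "user",
    "account", "role", "url", "port", "scheme",
)


def find_profile_problems(profile: dict) -> list[str]:
    """Return human-readable problems found in a profile dictionary."""
    problems = [f"missing key: {key}" for key in EXPECTED_KEYS if key not in profile]

    private_key = str(profile.get("private_key", ""))
    if "\r" in private_key:
        problems.append("private_key contains carriage returns")
    stripped = private_key.strip()
    if not stripped.startswith("-----BEGIN PRIVATE KEY-----"):
        problems.append("private_key lacks the BEGIN PRIVATE KEY line")
    if not stripped.endswith("-----END PRIVATE KEY-----"):
        problems.append("private_key lacks the END PRIVATE KEY line")

    # Angle brackets suggest a placeholder that was never replaced.
    for field in ("account", "url"):
        if "<" in str(profile.get(field, "")):
            problems.append(f"{field} still contains a placeholder")
    return problems


def check_profile_file(path: str = "profile.json") -> list[str]:
    """Load a profile from disk and return any problems found."""
    with open(path, encoding="utf-8") as handle:
        return find_profile_problems(json.load(handle))
```

An empty list from check_profile_file() means none of these particular problems were found; it does not guarantee that authentication will succeed.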