Java SDK for Snowpipe Streaming Rowset API¶

This guide demonstrates how to use the Rowset API with the example SnowflakeStreamingIngest-RowsetExample.java, showcasing Snowflake API integration.

To download the Java SDK for the Snowpipe Streaming Rowset API, see this GitHub page。

Overview of the example¶

The SnowflakeStreamingIngest-RowsetExample performs the following operations:

  1. Reads a JSON configuration file that contains details regarding Snowflake account, user, and private key. For more information, see profile.rowset.json.example.

  2. Creates a SnowflakeStreamingIngestClient, which manages one or more streaming channels linked to different pipes.

  3. Establishes a SnowflakeStreamingIngestChannel for a specific database, schema, and pipe.

  4. Inserts 1,000 rows in batches of 10 using the insertRows API on the channel object.

  5. Closes the channel, ensuring all data is committed to Snowflake.

Step 1: Configure users and pipes in Snowsight¶

Step 1.1 Create a 2048-bit RSA key pair

Generate an RSA key pair for authentication. The Snowflake Ingest Service requires registering the public key with your account. For more information, see Key-pair authentication and key-pair rotation.

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Copy

Step 1.2 Set up a test user

Create a new user in your Snowflake account for testing ingestion. Register the $PUBLIC_KEY generated in Step 1.1 to this new user.

CREATE USER test_user;
GRANT ROLE ACCOUNTADMIN TO USER test_user;

CREATE AUTHENTICATION POLICY testing_auth_policy
AUTHENTICATION_METHODS = ('SAML', 'PASSWORD', 'OAUTH', 'KEYPAIR')
CLIENT_TYPES = ('SNOWFLAKE_UI', 'SNOWSQL', 'DRIVERS');

ALTER USER test_user SET DEFAULT_ROLE = ACCOUNTADMIN;
ALTER USER test_user SET RSA_PUBLIC_KEY='$your_public_ke';
ALTER USER test_user SET AUTHENTICATION POLICY testing_auth_policy;
Copy

Step 1.3 Create a pipe to ingest

The Rowset API ingests data through a PIPE. To use the Rowset API, you need to create a pipe.

Below is an example SQL script to set up all required configurations in Snowflake:

CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
CREATE OR REPLACE TABLE MY_TABLE(data variant, c1 number, c2 string);

CREATE OR REPLACE PIPE MY_PIPE
AS
   COPY INTO MY_TABLE
   FROM TABLE (
         DATA_SOURCE (
         TYPE => 'STREAMING'
         )
   )
   MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
;
Copy

Step 2: Run the SnowflakeStreamingIngestRowsetExample¶

Step 2.1 Install the Java SDK with Maven

mvn install -DskipTests
Copy

Step 2.2 Create the profile.json file

Create a profile.json file to store your Snowflake configuration details. Replace the placeholders with your specific information. For example, use your Account Locator to replace <Account Locator>.

For private_key, make sure that the start/end lines are included- -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY-----. Ensure all carriage return characters (r or rn) are removed and replaced with newline characters (n).

{
  "private_key":  "Your Private Key from Step 1.1",
  "schema": "MY_SCHEMA",
  "database": "MY_DATABASE_TEST",
  "user":"test_user",
  "account": "<Account Locator>",
  "role": "ACCOUNTADMIN",
  "url": "<Account Locator>.snowflakecomputing.com",
  "port": 443,
  "ROWSET_DEV_VM_TEST_MODE": "false",
  "scheme": "https"
}
Copy

Step 2.3 Run the example

Run the SnowflakeStreamingIngestRowsetExample with profile.json.

mvn exec:java -Dexec.mainClass="net.snowflake.ingest.streaming.example.SnowflakeStreamingIngestRowsetExample" -Dorg.slf4j.simpleLogger.defaultLogLevel=debug
Copy