Java SDK for Snowpipe Streaming Rowset API
This guide demonstrates how to use the Rowset API with the example SnowflakeStreamingIngestRowsetExample.java, showcasing Snowflake API integration.
To download the Java SDK for the Snowpipe Streaming Rowset API, see this GitHub page.
Overview of the example
The SnowflakeStreamingIngestRowsetExample performs the following operations:
Reads a JSON configuration file that contains the Snowflake account, user, and private key details. For more information, see profile.rowset.json.example.
Creates a SnowflakeStreamingIngestClient, which manages one or more streaming channels linked to different pipes.
Establishes a SnowflakeStreamingIngestChannel for a specific database, schema, and pipe.
Inserts 1,000 rows in batches of 10 using the insertRows API on the channel object.
Closes the channel, ensuring all data is committed to Snowflake.
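The batching pattern in the list above (1,000 rows sent 10 at a time) can be sketched in plain Java. This is a minimal illustration and not the SDK's code: the BatchingSketch class and its RowSink interface are hypothetical stand-ins for the channel object, and the column names c1 and c2 are borrowed from the table created in Step 1.3.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchingSketch {
    /** Hypothetical stand-in for the channel; NOT part of the Snowflake SDK. */
    public interface RowSink {
        void send(List<Map<String, Object>> batch);
    }

    /**
     * Builds totalRows rows and hands them to the sink in groups of batchSize.
     * Returns the number of batches sent.
     */
    public static int sendInBatches(int totalRows, int batchSize, RowSink sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batches = 0;
        List<Map<String, Object>> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < totalRows; i++) {
            Map<String, Object> row = new HashMap<>();
            row.put("c1", i);            // number column (assumed name)
            row.put("c2", "row-" + i);   // string column (assumed name)
            batch.add(row);
            if (batch.size() == batchSize) {
                sink.send(batch);
                batches++;
                batch = new ArrayList<>(batchSize);
            }
        }
        if (!batch.isEmpty()) {
            // Flush the final partial batch, if any.
            sink.send(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        int[] rowsSeen = {0};
        int batches = sendInBatches(1000, 10, b -> rowsSeen[0] += b.size());
        // prints "100 batches, 1000 rows"
        System.out.println(batches + " batches, " + rowsSeen[0] + " rows");
    }
}
```

In the real example, the sink's role is played by the channel, and closing the channel afterwards is what ensures the data is committed.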
Step 1: Configure users and pipes in Snowsight
Step 1.1 Create a 2048-bit RSA key pair
Generate an RSA key pair for authentication. The Snowflake Ingest Service requires registering the public key with your account. For more information, see Key-pair authentication and key-pair rotation.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
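Because the first command writes an unencrypted PKCS#8 file, the resulting rsa_key.p8 can be read with standard JDK classes. The following is a minimal sketch for checking the generated key, assuming the file uses the BEGIN PRIVATE KEY / END PRIVATE KEY delimiters; the PemKeySketch class and its method name are hypothetical, and this is not necessarily how the SDK loads the key.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.interfaces.RSAPrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;

public class PemKeySketch {
    /** Parses an unencrypted PKCS#8 PEM string into an RSA private key. */
    public static PrivateKey parsePkcs8Pem(String pem) throws Exception {
        // Drop the delimiter lines and all whitespace, leaving only Base64.
        String base64 = pem
                .replace("-----BEGIN PRIVATE KEY-----", "")
                .replace("-----END PRIVATE KEY-----", "")
                .replaceAll("\\s", "");
        byte[] der = Base64.getDecoder().decode(base64);
        return KeyFactory.getInstance("RSA")
                .generatePrivate(new PKCS8EncodedKeySpec(der));
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("usage: PemKeySketch <path-to-rsa_key.p8>");
            return;
        }
        String pem = new String(
                Files.readAllBytes(Paths.get(args[0])), StandardCharsets.US_ASCII);
        RSAPrivateKey key = (RSAPrivateKey) parsePkcs8Pem(pem);
        // For a key generated as in Step 1.1 this should report 2048.
        System.out.println("Modulus bits: " + key.getModulus().bitLength());
    }
}
```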
Step 1.2 Set up a test user
Create a new user in your Snowflake account for testing ingestion. Register the public key generated in Step 1.1 (the contents of rsa_key.pub) with this new user.
CREATE USER test_user;
GRANT ROLE ACCOUNTADMIN TO USER test_user;
CREATE AUTHENTICATION POLICY testing_auth_policy
AUTHENTICATION_METHODS = ('SAML', 'PASSWORD', 'OAUTH', 'KEYPAIR')
CLIENT_TYPES = ('SNOWFLAKE_UI', 'SNOWSQL', 'DRIVERS');
ALTER USER test_user SET DEFAULT_ROLE = ACCOUNTADMIN;
ALTER USER test_user SET RSA_PUBLIC_KEY='$your_public_key';
ALTER USER test_user SET AUTHENTICATION POLICY testing_auth_policy;
Step 1.3 Create a pipe to ingest
The Rowset API ingests data through a pipe, so you must create one before you can stream rows.
The following example SQL script creates the required database, schema, table, and pipe in Snowflake:
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
CREATE OR REPLACE TABLE MY_TABLE(data variant, c1 number, c2 string);
CREATE OR REPLACE PIPE MY_PIPE
AS
COPY INTO MY_TABLE
FROM TABLE (
DATA_SOURCE (
TYPE => 'STREAMING'
)
)
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
;
Step 2: Run the SnowflakeStreamingIngestRowsetExample
Step 2.1 Install the Java SDK with Maven
mvn install -DskipTests
Step 2.2 Create the profile.json file
Create a profile.json file to store your Snowflake configuration details. Replace the placeholders with your specific information. For example, use your Account Locator to replace <Account Locator>.
For private_key, make sure that the start and end lines, -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY-----, are included. Ensure all carriage return characters (\r or \r\n) are removed and replaced with newline characters (\n).
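The key clean-up described above can be sketched as a small helper. This is an illustration under stated assumptions rather than part of the SDK: the PrivateKeyNormalizer class and its method names are hypothetical, and toJsonValue assumes the key is pasted into profile.json as a single-line JSON string, where each line break must be written as an escaped \n because JSON strings cannot contain raw newlines.

```java
public class PrivateKeyNormalizer {
    private static final String BEGIN = "-----BEGIN PRIVATE KEY-----";
    private static final String END = "-----END PRIVATE KEY-----";

    /** Converts \r\n and bare \r to \n and checks for the delimiter lines. */
    public static String normalize(String pem) {
        // Replace Windows line endings first so "\r\n" does not become "\n\n".
        String normalized = pem.replace("\r\n", "\n").replace("\r", "\n").trim();
        if (!normalized.startsWith(BEGIN) || !normalized.endsWith(END)) {
            throw new IllegalArgumentException(
                    "private key must include the BEGIN and END PRIVATE KEY lines");
        }
        return normalized;
    }

    /** Escapes a normalized key so it fits inside a JSON string literal. */
    public static String toJsonValue(String normalizedPem) {
        return normalizedPem
                .replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n");
    }

    public static void main(String[] args) {
        // Placeholder body, not a real key.
        String raw = BEGIN + "\r\nBASE64BODY\r\n" + END + "\r\n";
        System.out.println(toJsonValue(normalize(raw)));
    }
}
```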
{
  "private_key": "Your Private Key from Step 1.1",
  "schema": "MY_SCHEMA",
  "database": "MY_DATABASE",
  "user": "test_user",
  "account": "<Account Locator>",
  "role": "ACCOUNTADMIN",
  "url": "<Account Locator>.snowflakecomputing.com",
  "port": 443,
  "ROWSET_DEV_VM_TEST_MODE": "false",
  "scheme": "https"
}
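To see how the scheme, url, and port fields relate, the sketch below combines them into a single endpoint with the JDK's java.net.URI. It assumes the three fields simply compose into scheme://host:port; the source does not state how the SDK uses them, and the ProfileEndpointSketch class and the account locator xy12345 are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class ProfileEndpointSketch {
    /** Combines the profile's scheme, url (host), and port into one URI. */
    public static URI endpoint(String scheme, String host, int port)
            throws URISyntaxException {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        // userInfo, path, query, and fragment are intentionally left unset.
        return new URI(scheme, null, host, port, null, null, null);
    }

    public static void main(String[] args) throws URISyntaxException {
        // "xy12345" is a made-up account locator used only for illustration.
        System.out.println(endpoint("https", "xy12345.snowflakecomputing.com", 443));
    }
}
```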
Step 2.3 Run the example
Run the SnowflakeStreamingIngestRowsetExample with profile.json.
mvn exec:java -Dexec.mainClass="net.snowflake.ingest.streaming.example.SnowflakeStreamingIngestRowsetExample" -Dorg.slf4j.simpleLogger.defaultLogLevel=debug