Snowpipe Streaming

Calling the Snowpipe Streaming API (“API”) enables low-latency loading of streaming data rows using the Snowflake Ingest SDK and your own managed application code. Unlike bulk data loads or Snowpipe, which write data from staged files, the streaming ingest API writes rows of data directly to Snowflake tables. This architecture results in lower load latencies and correspondingly lower costs for loading similar volumes of data, which makes it a powerful tool for handling real-time data streams.

This topic describes the concepts and instructions for custom client applications that call the API. For related Snowflake Connector for Kafka (“Kafka connector”) instructions, see Using Snowflake Connector for Kafka With Snowpipe Streaming.

Snowpipe Streaming API vs Snowpipe

The API is intended to complement Snowpipe, not replace it. Use the Snowpipe Streaming API in streaming scenarios where data is streamed via rows (e.g. Apache Kafka topics) rather than written to files. The API fits into an ingest workflow that includes an existing custom Java application that produces or receives records. The API removes the need to create files to load data into Snowflake tables, and enables the automatic, continuous loading of data streams into Snowflake as the data becomes available.


The following table describes the differences between Snowpipe Streaming and Snowpipe:

Form of data to load

  • Snowpipe Streaming: Rows.
  • Snowpipe: Files. If your existing data pipeline generates files in blob storage, we recommend using Snowpipe rather than the API.

Third-party software requirements

  • Snowpipe Streaming: Custom Java application code wrapper for the Snowflake Ingest SDK.
  • Snowpipe: None.

Data ordering

  • Snowpipe Streaming: Ordered insertions within each channel.
  • Snowpipe: Not supported. Snowpipe can load data from files in an order different from the file creation timestamps in cloud storage.

Load history

  • Snowpipe Streaming: Recorded in the SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY view (Account Usage).
  • Snowpipe: Recorded in the LOAD_HISTORY view (Account Usage) and the COPY_HISTORY function (Information Schema).

Pipe object

  • Snowpipe Streaming: Does not require a pipe object. The API writes records directly to target tables.
  • Snowpipe: Requires a pipe object that queues and loads staged file data into target tables.

Software Requirements

Java SDK

The Snowpipe Streaming service is currently implemented as a set of APIs for the Snowflake Ingest SDK. The SDK is available for download from the Maven Central Repository: https://mvnrepository.com/artifact/net.snowflake/snowflake-ingest-sdk. The SDK supports Java version 8 (or higher).

Important

The SDK makes REST API calls to Snowflake. You might need to adjust your network firewall rules to allow connectivity.

Custom Client Application

The API requires a custom Java application interface capable of pumping rows of data and handling encountered errors. You are responsible for ensuring the application runs continuously and can recover from failure. For a given set of rows, the API supports the equivalent of ON_ERROR = CONTINUE | ABORT. ABORT, the default setting, aborts the entire batch after the first error is found; CONTINUE continues loading the data when errors are found.

The application should capture errors using the response from the insertRow (single row) or insertRows (set of rows) methods.
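As an illustration, the following is a minimal sketch of that error handling with the Snowflake Ingest SDK. It assumes a channel has already been opened (see Channels below); the column names are hypothetical, and the response accessors shown (hasErrors, getInsertErrors, getException) should be checked against the Snowflake Ingest SDK API reference.

```java
import java.util.HashMap;
import java.util.Map;

import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

public class ErrorHandlingSketch {

    // Inserts a single row and inspects the validation response for per-row errors.
    // With ON_ERROR = CONTINUE, rejected rows are reported here while valid rows
    // continue to load; with ABORT (the default), the first error fails the batch.
    static void insertWithErrorCheck(SnowflakeStreamingIngestChannel channel) {
        Map<String, Object> row = new HashMap<>();
        row.put("EVENT_ID", 1);                // hypothetical column names
        row.put("PAYLOAD", "{\"k\": \"v\"}");

        InsertValidationResponse response = channel.insertRow(row, "1" /* offset token */);
        if (response.hasErrors()) {
            // Each InsertError describes one rejected row; log it, or route it to
            // a dead-letter store for later inspection.
            response.getInsertErrors().forEach(
                error -> System.err.println("Row rejected: " + error.getException()));
        }
    }
}
```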

Channels

The API ingests rows via one or more channels. A channel represents a logical, named streaming connection to Snowflake for loading data into a table. A single channel maps to exactly one table in Snowflake; however, multiple channels can point to the same table. The client SDK can open multiple channels to multiple tables, but it cannot open channels across accounts. The ordering of rows and their corresponding offset tokens is preserved within a channel, but not across channels that point to the same table.

Figure: Snowpipe Streaming client, channel, and table mapping
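The following is a minimal sketch of opening a channel with the Snowflake Ingest SDK; the database, schema, table, and channel names are placeholders, and the builder methods shown should be verified against the Snowflake Ingest SDK API reference.

```java
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;

public class ChannelSketch {

    // Opens one named channel against a single target table. The database, schema,
    // table, and channel names are hypothetical placeholders.
    static SnowflakeStreamingIngestChannel openChannel(SnowflakeStreamingIngestClient client) {
        OpenChannelRequest request = OpenChannelRequest.builder("MY_CHANNEL_1")
            .setDBName("MY_DB")
            .setSchemaName("MY_SCHEMA")
            .setTableName("MY_TABLE")
            .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
            .build();
        // Additional channels (with distinct names) may target the same table,
        // but each channel maps to exactly one table.
        return client.openChannel(request);
    }
}
```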

Offset Tokens

An offset token is a string that a client can include in insertRow (single row) or insertRows (set of rows) method requests to track ingestion progress on a per-channel basis. Clients can periodically make getLatestCommittedOffsetToken method requests to retrieve the latest committed offset token for a particular channel and use it to reason about ingestion progress. Note that Snowflake does not use this token to perform de-duplication; however, clients can use it in their own custom code to perform de-duplication.

When a client re-opens a channel, the latest persisted offset token is returned. The client can reset its position in the data source using the token to avoid sending the same data twice. Note that when a channel re-open event occurs, any data buffered in Snowflake is discarded to avoid committing it.
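As a sketch of this pattern, the following assumes an already-opened channel and uses a plain numeric offset; the token format is entirely up to the client, and the method signatures should be confirmed against the Snowflake Ingest SDK reference.

```java
import java.util.List;
import java.util.Map;

import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

public class OffsetTokenSketch {

    // Sends a batch of rows, tagging the batch with the offset token of the last
    // row it contains, then polls the latest token Snowflake has committed.
    static void sendBatch(SnowflakeStreamingIngestChannel channel,
                          List<Map<String, Object>> rows,
                          long lastSourceOffset) {
        // The token is an opaque string to Snowflake; its format is chosen by the client.
        String offsetToken = String.valueOf(lastSourceOffset);

        InsertValidationResponse response = channel.insertRows(rows, offsetToken);
        if (response.hasErrors()) {
            System.err.println("Rows rejected: " + response.getInsertErrors().size());
        }

        // Periodically check how far ingestion has durably progressed on this channel.
        String committed = channel.getLatestCommittedOffsetToken();
        System.out.println("Committed up to offset token: " + committed);
    }
}
```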

For example, the Kafka connector could read an offset token from the topic such as <partition>:<offset>, or simply <offset> if the partition is encoded in the channel name. Consider the following scenario:

  1. The Kafka connector comes online and opens a channel corresponding to Partition 1 in Kafka topic T with the channel name of T:P1.

  2. The connector begins reading records from the Kafka partition. The connector calls the API, making an insertRows method request with the offset associated with the record as the offset token. For example, the offset token could be 10, referring to the 10th record in the Kafka partition.

  3. The connector periodically makes getLatestCommittedOffsetToken method requests to determine the ingest progress.

If the Kafka connector crashes, then the following flow could be completed to resume reading records from the correct offset for the Kafka partition (see the sketch after this list):

  1. The Kafka connector comes back online and re-opens the channel using the same name as earlier.

  2. The connector calls the API, making a getLatestCommittedOffsetToken method request to get the latest committed offset for the partition. For example, assume the latest persisted offset token is 20.

  3. The connector uses the Kafka read APIs to reset a cursor corresponding to the offset plus 1: 21 in this example.

  4. The connector resumes reading records. No duplicate data is retrieved after the read cursor is repositioned successfully.
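A rough sketch of this recovery flow is shown below, assuming the SDK API described above. The object names mirror the scenario, the Kafka cursor reset appears only as a comment, and the handling of a missing (null) offset token is an assumption.

```java
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;

public class ResumeSketch {

    // Re-opens the channel under its original name ("T:P1" from the scenario above)
    // and derives the next source offset to read.
    static long reopenAndGetNextOffset(SnowflakeStreamingIngestClient client) {
        SnowflakeStreamingIngestChannel channel = client.openChannel(
            OpenChannelRequest.builder("T:P1")
                .setDBName("MY_DB")            // hypothetical object names
                .setSchemaName("MY_SCHEMA")
                .setTableName("MY_TABLE")
                .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
                .build());

        // e.g. "20" if rows up to offset 20 were committed before the crash.
        String committed = channel.getLatestCommittedOffsetToken();

        // Assumption: the token is null when nothing has been persisted yet.
        long nextOffset = (committed == null) ? 0L : Long.parseLong(committed) + 1;

        // A real connector would now reposition its source cursor, for example
        // kafkaConsumer.seek(topicPartition, nextOffset), and resume reading.
        return nextOffset;
    }
}
```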

As another example, suppose you have an application that reads logs from a directory and exports those logs to Snowflake using the Snowpipe Streaming Client SDK. You could build a log export application that does the following:

  1. Lists files in the log directory. Assume the logging framework generates log files that can be ordered lexicographically and that new log files are positioned at the end of this ordering.

  2. Reads a log file line by line and calls the API, making insertRows method requests with an offset token corresponding to the log file name and the line count or byte position. For instance, an offset token could be messages_1.log:20, where messages_1.log is the name of the log file and 20 is the line number.

If the application crashes or needs to be restarted, it would then call the API, making a getLatestCommittedOffsetToken method request to retrieve an offset token corresponding to the last exported log file and line. Continuing with the example, this could be messages_1.log:20. The application would then open messages_1.log and seek to line 21 to prevent the same log line from being ingested twice.
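A small sketch of building and parsing offset tokens in this hypothetical <log file name>:<line number> format could look like the following:

```java
public class LogOffsetTokens {

    // Builds offset tokens of the form "<log file name>:<line number>", e.g.
    // "messages_1.log:20", as described in the example above.
    static String build(String fileName, long lineNumber) {
        return fileName + ":" + lineNumber;
    }

    // Returns the log file named in a committed token.
    static String fileName(String token) {
        return token.substring(0, token.lastIndexOf(':'));
    }

    // Returns the next line to export: one past the last committed line.
    static long nextLine(String token) {
        return Long.parseLong(token.substring(token.lastIndexOf(':') + 1)) + 1;
    }
}
```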

Migration to Optimized Files

The API writes the rows from channels into blobs in cloud storage, which are then committed to the target table. Initially, the streamed data written to a target table is stored in a temporary intermediate file format. At this stage, the table is considered a “mixed table,” because partitioned data is stored in a mixture of native and intermediary files. An automated, background process migrates data from the active intermediate files to native files optimized for query and DML operations as needed.

Insert-only Operations

The API is currently limited to inserting rows. To modify, delete, or combine data, write the “raw” records to one or more staging tables. Merge, join, or transform the data using continuous data pipelines to insert modified data into destination reporting tables.

Classes and Interfaces

For documentation on the classes and interfaces see Snowflake Ingest SDK API.

Supported Java Data Types

The following table lists, for each Snowflake column type, the Java data types that are allowed for ingestion:

CHAR, VARCHAR

  • String
  • primitive data types (int, boolean, char, …)
  • BigInteger, BigDecimal

BINARY

  • byte[]
  • String (hex-encoded)

NUMBER

  • numeric types (BigInteger, BigDecimal, byte, int, double, …)
  • String

FLOAT

  • numeric types (BigInteger, BigDecimal, byte, int, double, …)
  • String

BOOLEAN

  • boolean
  • numeric types (BigInteger, BigDecimal, byte, int, double, …)
  • String

  See boolean conversion details.

TIME

  • java.time.LocalTime
  • java.time.OffsetTime
  • String
    • Integer-stored time
    • HH24:MI:SS.FFTZH:TZM (e.g. 20:57:01.123456789+07:00)
    • HH24:MI:SS.FF (e.g. 20:57:01.123456789)
    • HH24:MI:SS (e.g. 20:57:01)
    • HH24:MI (e.g. 20:57)

DATE

  • java.time.LocalDate
  • java.time.LocalDateTime
  • java.time.OffsetDateTime
  • java.time.ZonedDateTime
  • java.time.Instant
  • String
    • Integer-stored date
    • YYYY-MM-DD (e.g. 2013-04-28)
    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (e.g. 2013-04-28T20:57:01.123456789+07:00)
    • YYYY-MM-DDTHH24:MI:SS.FF (e.g. 2013-04-28T20:57:01.123456)
    • YYYY-MM-DDTHH24:MI:SS (e.g. 2013-04-28T20:57:01)
    • YYYY-MM-DDTHH24:MI (e.g. 2013-04-28T20:57)
    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (e.g. 2013-04-28T20:57:01-07:00)
    • YYYY-MM-DDTHH24:MITZH:TZM (e.g. 2013-04-28T20:57-07:00)

TIMESTAMP_NTZ, TIMESTAMP_LTZ, TIMESTAMP_TZ

  • java.time.LocalDate
  • java.time.LocalDateTime
  • java.time.OffsetDateTime
  • java.time.ZonedDateTime
  • java.time.Instant
  • String
    • Integer-stored timestamp
    • YYYY-MM-DD (e.g. 2013-04-28)
    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (e.g. 2013-04-28T20:57:01.123456789+07:00)
    • YYYY-MM-DDTHH24:MI:SS.FF (e.g. 2013-04-28T20:57:01.123456)
    • YYYY-MM-DDTHH24:MI:SS (e.g. 2013-04-28T20:57:01)
    • YYYY-MM-DDTHH24:MI (e.g. 2013-04-28T20:57)
    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (e.g. 2013-04-28T20:57:01-07:00)
    • YYYY-MM-DDTHH24:MITZH:TZM (e.g. 2013-04-28T20:57-07:00)

VARIANT, ARRAY

  • String (must be valid JSON)
  • primitive data types and their arrays
  • BigInteger, BigDecimal
  • java.time.LocalTime
  • java.time.OffsetTime
  • java.time.LocalDate
  • java.time.LocalDateTime
  • java.time.OffsetDateTime
  • java.time.ZonedDateTime
  • java.util.Map<String, T> where T is a valid VARIANT type
  • T[] where T is a valid VARIANT type
  • List<T> where T is a valid VARIANT type

OBJECT

  • String (must be a valid JSON object)
  • Map<String, T> where T is a valid VARIANT type

GEOGRAPHY

  • Not supported

GEOMETRY

  • Not supported
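As an illustration of these mappings, the following sketch builds a single row as a map from hypothetical column names to Java values drawn from the table above. Each key must match a column in the target table, and TIME, DATE, and TIMESTAMP values use java.time types as recommended under Performance Recommendations.

```java
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class RowTypesSketch {

    // Builds one row as a map from column names (hypothetical) to Java values
    // drawn from the type mappings above.
    static Map<String, Object> buildRow() {
        Map<String, Object> row = new HashMap<>();
        row.put("C_VARCHAR", "hello");                    // CHAR / VARCHAR
        row.put("C_NUMBER", new BigDecimal("12345.67"));  // NUMBER
        row.put("C_BOOLEAN", true);                       // BOOLEAN
        row.put("C_BINARY", new byte[] {0x01, 0x02});     // BINARY
        row.put("C_DATE", LocalDate.now());               // DATE
        row.put("C_TIMESTAMP_TZ", OffsetDateTime.now());  // TIMESTAMP_TZ (java.time preferred)
        row.put("C_VARIANT", Arrays.asList(1, 2, 3));     // VARIANT / ARRAY
        return row;
    }
}
```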

Limitations

  • Tables with any one of the following column settings are not supported:

    • AUTOINCREMENT or IDENTITY

    • Default column value that is not NULL.

  • The GEOGRAPHY and GEOMETRY data types are not supported.

Snowpipe Streaming Properties

Configure the API connection settings in a profile.json file. The properties are described in this section. Specify the path to the properties file as the input to PROFILE_PATH.

Required Properties

url

URL for accessing your Snowflake account. This URL must include your account identifier. Note that the protocol (https://) and port number are optional.

Note that url is not required if you are already using the Snowflake Ingest SDK and have set the host, scheme, and port properties in the profile.json file.

user

User login name for the Snowflake account.

private_key

Private key to authenticate the user. Include only the key, not the header or footer. If the key is split across multiple lines, remove the line breaks.

Currently, only unencrypted keys are supported.

role

Access control role to use for the session after connecting to Snowflake.
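The client settings described above can also be supplied programmatically as Java Properties when building the client with the Snowflake Ingest SDK. The following is a hedged sketch of that approach: the account URL, user, role, key path, and client name are placeholders, and the private key is stripped of its PEM header, footer, and line breaks as required for private_key.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

public class ClientPropertiesSketch {

    // Builds the connection settings described above and creates a client.
    // The account URL, user, role, key path, and client name are placeholders.
    static SnowflakeStreamingIngestClient buildClient() throws Exception {
        // Read the PKCS#8 private key and strip the header, footer, and line breaks.
        String pem = new String(
            Files.readAllBytes(Paths.get("/path/to/rsa_key.p8")), StandardCharsets.UTF_8);
        String privateKey = pem
            .replace("-----BEGIN PRIVATE KEY-----", "")
            .replace("-----END PRIVATE KEY-----", "")
            .replaceAll("\\s", "");

        Properties props = new Properties();
        props.put("url", "https://<account_identifier>.snowflakecomputing.com:443");
        props.put("user", "STREAMING_USER");
        props.put("private_key", privateKey);
        props.put("role", "INGEST_ROLE");

        return SnowflakeStreamingIngestClientFactory
            .builder("MY_CLIENT")
            .setProperties(props)
            .build();
    }
}
```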

Using Key Pair Authentication & Key Rotation

API calls rely on key pair authentication with JSON Web Token (JWT). JWTs are signed using a public/private key pair with RSA encryption. This authentication method requires a 2048-bit (minimum) RSA key pair. Generate the public-private key pair using OpenSSL. The public key is assigned to the Snowflake user defined in the properties file.

Complete the key pair authentication instructions described in Key Pair Rotation. Copy and paste the entire private key into the private_key field in the profile.json file. Save the file.

See Java Example for an example of creating a fingerprint and generating a JWT token.

Next, evaluate the recommendation for Externalizing Secrets (in this topic).

Externalizing Secrets

Snowflake strongly recommends externalizing secrets such as the private key and storing them in an encrypted form or in a key management service such as AWS Key Management Service (KMS), Microsoft Azure Key Vault, or HashiCorp Vault.

For more information, see the Confluent description of this service.

Example

For a simple example that shows how the client SDK could be used to build a Snowpipe Streaming application, see this Java file (GitHub).

Cost Optimization

As a best practice, we recommend calling the API with fewer clients that write more data per second. Aggregate data from multiple sources such as IoT devices or sensors using a Java or Scala application, then call the API to load data using the Snowflake Ingest SDK at higher flow rates. The API efficiently aggregates data across multiple target tables in an account.

Performance Recommendations

For optimal performance in high-throughput deployments, we recommend passing values for TIME, DATE, and all TIMESTAMP columns as one of the supported types from the java.time package.

Billing & Viewing the Data Load History for Your Account

With Snowpipe Streaming’s serverless compute model, users can stream any data volume without managing a virtual warehouse. Instead, Snowflake provides and manages the compute resources, automatically growing or shrinking capacity based on the current Snowpipe Streaming load. Accounts are charged based on their Snowpipe Streaming file migration compute costs and per-second client ingestion time. Note that file migration may sometimes be pre-empted by clustering or other DML operations; migration may not always occur, and compute costs may therefore be reduced. For more information, see the “Serverless Feature Credit Table” in the Snowflake service consumption table.

Account administrators (users with the ACCOUNTADMIN role) or users with a role granted the MONITOR USAGE global privilege can use Snowsight, the Classic Console, or SQL to view the credits billed to your Snowflake account within a specified date range.

To query the history of data migrated into Snowflake tables and the amount of time spent loading data into Snowflake tables using Snowpipe Streaming:

Snowsight

Select Admin » Usage

Classic Console

Click on the Account tab » Billing & Usage.

Snowpipe Streaming utilization is shown as a special Snowflake-provided warehouse named SNOWPIPE STREAMING.

SQL

Query either of the following: