Snowpipe Streaming

Snowpipe Streaming is Snowflake’s service for continuous, low-latency loading of streaming data directly into Snowflake. It enables near real-time data ingestion and analysis, crucial for timely insights and immediate operational responses. High volumes of data from diverse streaming sources are made available for query and analysis within seconds.

Value of Snowpipe Streaming

  • Real-time data availability: Ingests data as it arrives, unlike traditional batch loading methods, supporting use cases like live dashboards, real-time analytics, and fraud detection.

  • Efficient streaming workloads: Utilizes Snowflake Ingest SDKs to write rows directly into tables, bypassing intermediate cloud storage. This direct approach reduces latency and simplifies ingestion architecture.

  • Simplified data pipelines: Offers a streamlined approach for continuous data pipelines from sources such as application events, IoT sensors, Change Data Capture (CDC) streams, and message queues (e.g., Apache Kafka).

  • Serverless and scalable: As a serverless offering, it automatically scales compute resources based on ingestion load, eliminating manual warehouse management for ingestion tasks.

  • Cost-effective for streaming: Billing is optimized for streaming ingestion, potentially offering more cost-effective solutions for high-volume, low-latency data feeds compared to frequent, small batch COPY operations.

With Snowpipe Streaming, you can build real-time data applications on the Snowflake Data Cloud and make decisions based on the freshest data available.

Snowpipe Streaming Implementations

Snowpipe Streaming offers two distinct implementations to cater to diverse data-ingestion needs and performance expectations:

  • Snowpipe Streaming with high-performance architecture (Preview)

    Snowflake has engineered this next-generation implementation to significantly enhance throughput, optimize streaming performance, and provide a predictable cost model, setting the stage for advanced data-streaming capabilities.

    Key Characteristics:

    • SDK: Utilizes the new snowpipe-streaming SDK.

    • Pricing: Features transparent, throughput-based pricing (credits per uncompressed GB).

    • Data flow management: Utilizes the PIPE object for managing data flow and enabling lightweight transformations at ingest time. Channels are opened against this PIPE object.

    • Ingestion: Offers a REST API for direct, lightweight data ingestion through the PIPE.

    • Schema validation: Performed on the server side during ingestion against the schema defined in the PIPE.

    • Performance: Engineered for significantly higher throughput and improved query efficiency on ingested data.

    We encourage you to explore this advanced architecture, especially for new streaming projects.

  • Snowpipe Streaming with classic architecture

    This is the original, generally available implementation, providing a reliable solution for established data pipelines.

    Key Characteristics:

    • SDK: Utilizes the snowflake-ingest-java SDK (all versions up to 4.x).

    • Data Flow Management: Does not use the PIPE object concept for streaming ingestion. Channels are configured and opened directly against target tables.

    • Pricing: Based on a combination of serverless compute resources utilized for ingestion and the number of active client connections.

Choosing Your Implementation

Consider your immediate needs and long-term data strategy when choosing an implementation:

  • New streaming projects: We recommend evaluating Snowpipe Streaming with high-performance architecture (Preview) for its forward-looking design, better performance, scalability, and cost predictability.

  • Performance requirements: The high-performance architecture is built to maximize throughput and optimize real-time performance.

  • Pricing preference: The high-performance architecture offers clear, throughput-based pricing, while the classic architecture bills based on serverless compute usage and client connections.

  • Existing setups: Existing applications using the classic architecture can continue to operate. For future expansions or redesigns, consider migrating to or incorporating the high-performance architecture.

  • Feature set and management: The PIPE object in the high-performance architecture introduces enhanced management and transformation capabilities not present in the classic architecture.

Snowpipe Streaming versus Snowpipe

Snowpipe Streaming is intended to complement Snowpipe, not replace it. Use the Snowpipe Streaming API in streaming scenarios where data is streamed as rows (for example, from Apache Kafka topics) rather than written to files. The API fits into an ingest workflow that includes an existing custom Java application that produces or receives records. With the API, you don't need to create files to load data into Snowflake tables; the API enables the automatic, continuous loading of data streams into Snowflake as the data becomes available.


The following comparison describes the differences between Snowpipe Streaming and Snowpipe:

  • Form of data to load

    • Snowpipe Streaming: Rows

    • Snowpipe: Files. If your existing data pipeline generates files in blob storage, we recommend using Snowpipe instead of the API.

  • Third-party software requirements

    • Snowpipe Streaming: Custom Java application code wrapper for the Snowflake Ingest SDK

    • Snowpipe: None

  • Data ordering

    • Snowpipe Streaming: Ordered insertions within each channel

    • Snowpipe: Not supported. Snowpipe can load data from files in an order different from the file creation timestamps in cloud storage.

  • Load history

    • Snowpipe Streaming: Recorded in the SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY view (Account Usage)

    • Snowpipe: Recorded in the COPY_HISTORY view (Account Usage) and the COPY_HISTORY function (Information Schema)

  • Pipe object

    • Snowpipe Streaming: The classic architecture does not require a pipe object; the API writes records directly to target tables. The high-performance architecture requires a pipe object.

    • Snowpipe: Requires a pipe object that queues and loads staged file data into target tables.

Software requirements

Java SDK

Specific Java SDKs facilitate interaction with the Snowpipe Streaming service. You can download the SDKs from the Maven Central Repository. The SDK that you need depends on the Snowpipe Streaming architecture that you use:

  • For Snowpipe Streaming with high-performance architecture: the snowpipe-streaming SDK.

  • For Snowpipe Streaming Classic: the snowflake-ingest-java SDK (all versions up to 4.x).

Important

The SDK makes REST API calls to Snowflake. You might need to adjust your network firewall rules to allow connectivity.

Custom client application

The API requires a custom Java application interface that can accept rows of data and handle errors that occur. You must ensure that the application runs continuously and can recover from failure. For a given batch of rows, the API supports the equivalent of ON_ERROR = CONTINUE | SKIP_BATCH | ABORT.

  • CONTINUE: Continue to load the acceptable rows of data and return all errors.

  • SKIP_BATCH: Skip loading and return all errors if any error is encountered in the entire batch of rows.

  • ABORT (default setting): Abort the entire batch of rows and throw an exception when the first error is encountered.

For Snowpipe Streaming Classic, the application performs schema validation by using the response from the insertRow (single row) or insertRows (set of rows) methods. For error handling in the high-performance architecture, see Error handling.
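
The following is a minimal sketch of such a wrapper using the classic snowflake-ingest-java SDK. The connection properties, database, schema, table, and column names are placeholders; the ON_ERROR = CONTINUE behavior is selected through OpenChannelRequest.OnErrorOption.CONTINUE, and per-row errors are surfaced in the InsertValidationResponse.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

public class StreamingIngestSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder connection properties; in practice, load these from secure configuration.
    Properties props = new Properties();
    props.put("url", "https://<account_identifier>.snowflakecomputing.com:443");
    props.put("user", "<user>");
    props.put("private_key", "<private_key>");
    props.put("role", "<role>");

    try (SnowflakeStreamingIngestClient client =
        SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {

      // Open a channel against the target table. CONTINUE loads the acceptable rows
      // and reports the rest in the validation response (ON_ERROR = CONTINUE equivalent).
      OpenChannelRequest request =
          OpenChannelRequest.builder("MY_CHANNEL")
              .setDBName("MY_DB")
              .setSchemaName("MY_SCHEMA")
              .setTableName("MY_TABLE")
              .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
              .build();
      SnowflakeStreamingIngestChannel channel = client.openChannel(request);

      // Insert one row. The map keys are column names; the offset token tracks progress.
      Map<String, Object> row = new HashMap<>();
      row.put("EVENT_ID", 1);
      row.put("PAYLOAD", "{\"source\": \"example\"}");
      InsertValidationResponse response = channel.insertRow(row, "1");
      if (response.hasErrors()) {
        // With CONTINUE, rejected rows are reported here instead of aborting the batch.
        throw response.getInsertErrors().get(0).getException();
      }

      channel.close().get(); // Flush and close when the application shuts down.
    }
  }
}
```

In a production wrapper, the insert loop would run continuously, and rejected rows would typically be logged or routed to a dead-letter store rather than thrown.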

Channels

The API ingests rows through one or more channels. A channel represents a logical, named streaming connection to Snowflake for loading data into a table. A single channel maps to exactly one table in Snowflake; however, multiple channels can point to the same table. The Client SDK can open multiple channels to multiple tables; however, the SDK can't open channels across accounts. The ordering of rows and their corresponding offset tokens is preserved within a channel but not across channels that point to the same table.

Channels are meant to be long lived when a client is actively inserting data and should be reused because offset token information is retained. Data inside the channel is automatically flushed every 1 second by default, and the channel doesn't need to be closed. For more information, see Latency.
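
For example, a client that consumes several source partitions might open one named channel per partition against the same target table, as in this hedged sketch (client construction omitted; the database, schema, and table names are placeholders):

```java
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;

public class ChannelPerPartition {
  // Opens one named channel per source partition, all pointing at the same table.
  // Row order and offset tokens are preserved within each channel, not across them.
  static SnowflakeStreamingIngestChannel openPartitionChannel(
      SnowflakeStreamingIngestClient client, String topic, int partition) {
    OpenChannelRequest request =
        OpenChannelRequest.builder(topic + ":P" + partition) // for example, "T:P1"
            .setDBName("MY_DB")
            .setSchemaName("MY_SCHEMA")
            .setTableName("MY_TABLE")
            .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
            .build();
    return client.openChannel(request);
  }
}
```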

[Figure: Snowpipe Streaming client, channel, and table mapping]

You can permanently drop channels by using the DropChannelRequest API when you no longer need the channel and the associated offset metadata. There are two ways to drop a channel:

  • Dropping a channel at closing. Data inside the channel is automatically flushed before the channel is dropped.

  • Dropping a channel blindly. We don’t recommend this because dropping a channel blindly discards any pending data and might invalidate any already opened channels.

You can run the SHOW CHANNELS command to list the channels for which you have access privileges. For more information, see SHOW CHANNELS.

Note

Inactive channels, along with their offset tokens, are deleted automatically after 30 days.

Offset tokens

An offset token is a string that a client can include in its row-submission method requests (for example, for single or multiple rows) to track ingestion progress on a per-channel basis. The specific methods are insertRow and insertRows for the classic architecture, and appendRow and appendRows for the high-performance architecture. The token is initialized to NULL on channel creation and is updated when the rows with a provided offset token are committed to Snowflake through an asynchronous process. Clients can periodically make getLatestCommittedOffsetToken method requests to get the latest committed offset token for a particular channel and use it to reason about ingestion progress. Note that Snowflake does not use this token to perform de-duplication; however, clients can use it to perform de-duplication with their own custom code.

When a client re-opens a channel, the latest persisted offset token is returned. The client can reset its position in the data source by using the token to avoid sending the same data twice. Note that when a channel re-open event occurs, any data buffered in Snowflake is discarded to avoid committing it.

You can use the latest committed offset token to perform the following common use cases:

  • Tracking the ingestion progress

  • Checking whether a specific offset has been committed by comparing it with the latest committed offset token

  • Advancing the source offset and purging the data that has already been committed

  • Enabling de-duplication and ensuring exactly-once delivery of data

For example, the Kafka connector could read an offset token from a topic such as <partition>:<offset>, or simply <offset>, if the partition is encoded in the channel name. Consider the following scenario:

  1. The Kafka connector comes online and opens a channel corresponding to Partition 1 in Kafka topic T with the channel name T:P1.

  2. The connector begins reading records from the Kafka partition.

  3. The connector calls the API, making an insertRows method request, with the offset associated with the record as the offset token.

    For example, the offset token could be 10, referring to the tenth record in the Kafka partition.

  4. The connector periodically makes getLatestCommittedOffsetToken method requests to determine the ingest progress.

If the Kafka connector crashes, the following procedure could be completed to resume reading records from the correct offset for the Kafka partition:

  1. The Kafka connector comes back online and re-opens the channel, using the same name as earlier.

  2. The connector calls the API, making a getLatestCommittedOffsetToken method request to get the latest committed offset for the partition.

    For example, assume the latest persisted offset token is 20.

  3. The connector uses the Kafka read APIs to reset a cursor corresponding to the offset plus 1 (21 in this example).

  4. The connector resumes reading records. No duplicate data is retrieved after the read cursor is repositioned successfully.
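
A hedged sketch of steps 2 and 3 of this recovery logic, assuming the offset token holds the plain Kafka offset as in the example above:

```java
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

public class KafkaOffsetRecovery {
  // After re-opening the channel T:P1, resume the Kafka consumer just past the
  // last offset that Snowflake has durably committed for that channel.
  static long resumeOffset(SnowflakeStreamingIngestChannel channel, long defaultStartOffset) {
    String committed = channel.getLatestCommittedOffsetToken();
    if (committed == null) {
      // New or expired channel: fall back to the application's own starting offset.
      return defaultStartOffset;
    }
    // The token was written as the Kafka offset, for example "20"; resume at 21.
    return Long.parseLong(committed) + 1;
  }
}
```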

In another example, an application reads logs from a directory and uses the Snowpipe Streaming Client SDK to export those logs to Snowflake. You could build a log export application that does the following:

  1. List files in the log directory.

    Assume that the logging framework generates log files that can be ordered lexicographically and that new log files are positioned at the end of this ordering.

  2. Read a log file line by line and call the API, making insertRows method requests with an offset token corresponding to the log file name and the line count or byte position.

    For example, an offset token could be messages_1.log:20, where messages_1.log is the name of the log file, and 20 is the line number.

If the application crashes or needs to be restarted, it would then call the API, making a getLatestCommittedOffsetToken method request to retrieve an offset token that corresponds to the last exported log file and line. Continuing with the example, this could be messages_1.log:20. The application would then open messages_1.log and seek line 21 to prevent the same log line from being ingested twice.
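
A minimal sketch of how such an exporter might parse the committed token on restart; the <file name>:<line number> token format and the helper shown here are assumptions of this example:

```java
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

public class LogExportResume {
  // Parses an offset token of the assumed form "<file name>:<line number>" and
  // returns the file plus the next line to read so that no line is ingested twice.
  static String[] nextPosition(SnowflakeStreamingIngestChannel channel) {
    String token = channel.getLatestCommittedOffsetToken(); // for example, "messages_1.log:20"
    if (token == null) {
      return new String[] {null, "0"}; // nothing committed yet; start from the beginning
    }
    int sep = token.lastIndexOf(':');
    String file = token.substring(0, sep);
    long nextLine = Long.parseLong(token.substring(sep + 1)) + 1; // seek line 21 in the example
    return new String[] {file, Long.toString(nextLine)};
  }
}
```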

Note

The offset token information can be lost. The offset token is linked to a channel object, and a channel is automatically cleared if no new ingestion is performed using the channel for a period of 30 days. To prevent the loss of the offset token, consider maintaining a separate offset and resetting the channel’s offset token if required.

Exactly-once delivery best practices

Achieving exactly-once delivery can be challenging, and adherence to the following principles in your custom code is critical:

  • To ensure appropriate recovery from exceptions, failures, or crashes, you must always reopen the channel and restart ingestion using the latest committed offset token.

  • Although your application may maintain its own offset, it’s crucial to use the latest committed offset token provided by Snowflake as the source of truth and reset your own offset accordingly.

  • The only instance in which your own offset should be treated as the source of truth is when the offset token from Snowflake is set or reset to NULL. A NULL offset token usually means one of the following:

    • This is a new channel, so no offset token has been set.

    • The target table was dropped and recreated, so the channel is considered new.

    • There was no ingestion activity through the channel for 30 days, so the channel was automatically cleaned up, and the offset token information was lost.

  • If necessary, you can periodically purge the source data that has already been committed based on the latest committed offset token, and advance your own offset.

  • If the table schema is modified while Snowpipe Streaming channels are active, the channel must be reopened. The Snowflake Kafka connector handles this scenario automatically, but if you use the Snowflake Ingest SDK directly, you must reopen the channel yourself.
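
A minimal sketch of the reopen-and-reset pattern described above, with placeholder database, schema, and table names; the locally tracked offset is trusted only when Snowflake returns a NULL token:

```java
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;

public class ReopenAndReset {
  // On recovery, reopen the channel under the same name and treat Snowflake's
  // committed offset token as the source of truth; fall back to the locally
  // tracked offset only when the token is NULL (new, recreated, or expired channel).
  static String recover(SnowflakeStreamingIngestClient client, String channelName, String localOffset) {
    OpenChannelRequest request =
        OpenChannelRequest.builder(channelName)
            .setDBName("MY_DB")
            .setSchemaName("MY_SCHEMA")
            .setTableName("MY_TABLE")
            .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
            .build();
    SnowflakeStreamingIngestChannel channel = client.openChannel(request);

    String committed = channel.getLatestCommittedOffsetToken();
    return (committed != null) ? committed : localOffset;
  }
}
```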

For more information about how the Kafka connector with Snowpipe Streaming achieves exactly-once delivery, see Exactly-once semantics.

Loading data into Apache Iceberg™ tables

With Snowflake Ingest SDK versions 3.0.0 and later, Snowpipe Streaming can ingest data into Snowflake-managed Apache Iceberg tables. The Snowpipe Streaming Ingest Java SDK supports loading into both standard Snowflake tables (non-Iceberg) and Iceberg tables.

For more information, see Using Snowpipe Streaming Classic with Apache Iceberg™ tables.

Latency

Snowpipe Streaming automatically flushes data within channels every one second. You do not need to close the channel for data to be flushed.

With Snowflake Ingest SDK versions 2.0.4 and later, you can configure the latency by using the option MAX_CLIENT_LAG.

  • For standard Snowflake tables (non-Iceberg), the default MAX_CLIENT_LAG is 1 second.

  • For Iceberg tables (supported by Snowflake Ingest SDK versions 3.0.0 and later), the default MAX_CLIENT_LAG is 30 seconds.

The maximum latency can be set up to 10 minutes. For more information, see MAX_CLIENT_LAG and recommended latency configurations for Snowpipe Streaming.
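
As a hedged sketch, a client could be configured with a larger MAX_CLIENT_LAG as shown below; the setParameterOverrides hook used here is an assumption that may vary by SDK version, so consult the SDK documentation for the exact configuration mechanism in your version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

public class ClientWithLatencyOverride {
  // Builds a client whose channels flush roughly every 5 seconds instead of the
  // 1-second default, trading a little latency for larger flushed chunks.
  static SnowflakeStreamingIngestClient build(Properties connectionProps) {
    Map<String, Object> parameterOverrides = new HashMap<>();
    parameterOverrides.put("MAX_CLIENT_LAG", "5 seconds"); // accepts values from 1 second up to 10 minutes
    return SnowflakeStreamingIngestClientFactory.builder("LATENCY_TUNED_CLIENT")
        .setProperties(connectionProps)
        .setParameterOverrides(parameterOverrides) // assumed override hook; varies by SDK version
        .build();
  }
}
```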

Note that the Kafka connector for Snowpipe Streaming has its own buffer. After the Kafka buffer flush time is reached, data will be sent with one second latency to Snowflake through Snowpipe Streaming. For more information, see buffer.flush.time.

Migration to optimized files in the classic architecture

The API writes the rows from channels into blobs in cloud storage, which are then committed to the target table. Initially, the streamed data written to a target table is stored in a temporary intermediate file format. At this stage, the table is considered a “mixed table” because partitioned data is stored in a mixture of native and intermediary files. An automated background process migrates data from the active intermediate files to native files that are optimized for query and DML operations as needed.

Replication in the classic architecture

Snowpipe Streaming supports replication and failover of Snowflake tables populated by Snowpipe Streaming, and of their associated channel offsets, from a source account to a target account in different regions and across cloud platforms.

Replication is not supported for the high-performance architecture.

For more information, see Replication and Snowpipe Streaming.

Insert-only operations

The API is currently limited to inserting rows. To modify, delete, or combine data, write the “raw” records to one or more staging tables. Merge, join, or transform the data by using continuous data pipelines to insert modified data into destination reporting tables.

Classes and interfaces

For documentation on the classes and interfaces for the classic architecture, see Snowflake Ingest SDK API.

For the differences between the classic and high-performance architectures, see API differences.

Supported Java data types

The following list summarizes which Java data types are supported for ingestion into each Snowflake column type:

  • CHAR, VARCHAR

    • String
    • primitive data types (int, boolean, char, …)
    • BigInteger, BigDecimal

  • BINARY

    • byte[]
    • String (hex-encoded)

  • NUMBER

    • numeric types (BigInteger, BigDecimal, byte, int, double, …)
    • String

  • FLOAT

    • numeric types (BigInteger, BigDecimal, byte, int, double, …)
    • String

  • BOOLEAN

    • boolean
    • numeric types (BigInteger, BigDecimal, byte, int, double, …)
    • String

    See boolean conversion details.

  • TIME

    • java.time.LocalTime
    • java.time.OffsetTime
    • String, in one of the following formats:

      • Integer-stored time
      • HH24:MI:SS.FFTZH:TZM (for example, 20:57:01.123456789+07:00)
      • HH24:MI:SS.FF (for example, 20:57:01.123456789)
      • HH24:MI:SS (for example, 20:57:01)
      • HH24:MI (for example, 20:57)

  • DATE

    • java.time.LocalDate
    • java.time.LocalDateTime
    • java.time.OffsetDateTime
    • java.time.ZonedDateTime
    • java.time.Instant
    • String, in one of the following formats:

      • Integer-stored date
      • YYYY-MM-DD (for example, 2013-04-28)
      • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (for example, 2013-04-28T20:57:01.123456789+07:00)
      • YYYY-MM-DDTHH24:MI:SS.FF (for example, 2013-04-28T20:57:01.123456)
      • YYYY-MM-DDTHH24:MI:SS (for example, 2013-04-28T20:57:01)
      • YYYY-MM-DDTHH24:MI (for example, 2013-04-28T20:57)
      • YYYY-MM-DDTHH24:MI:SSTZH:TZM (for example, 2013-04-28T20:57:01-07:00)
      • YYYY-MM-DDTHH24:MITZH:TZM (for example, 2013-04-28T20:57-07:00)

  • TIMESTAMP_NTZ, TIMESTAMP_LTZ, TIMESTAMP_TZ

    • java.time.LocalDate
    • java.time.LocalDateTime
    • java.time.OffsetDateTime
    • java.time.ZonedDateTime
    • java.time.Instant
    • String, in one of the following formats:

      • Integer-stored timestamp
      • YYYY-MM-DD (for example, 2013-04-28)
      • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (for example, 2013-04-28T20:57:01.123456789+07:00)
      • YYYY-MM-DDTHH24:MI:SS.FF (for example, 2013-04-28T20:57:01.123456)
      • YYYY-MM-DDTHH24:MI:SS (for example, 2013-04-28T20:57:01)
      • YYYY-MM-DDTHH24:MI (for example, 2013-04-28T20:57)
      • YYYY-MM-DDTHH24:MI:SSTZH:TZM (for example, 2013-04-28T20:57:01-07:00)
      • YYYY-MM-DDTHH24:MITZH:TZM (for example, 2013-04-28T20:57-07:00)

  • VARIANT, ARRAY

    • String (must be valid JSON)
    • primitive data types and their arrays
    • BigInteger, BigDecimal
    • java.time.LocalTime
    • java.time.OffsetTime
    • java.time.LocalDate
    • java.time.LocalDateTime
    • java.time.OffsetDateTime
    • java.time.ZonedDateTime
    • java.util.Map<String, T> where T is a valid VARIANT type
    • T[] where T is a valid VARIANT type
    • List<T> where T is a valid VARIANT type

  • OBJECT

    • String (must be a valid JSON object)
    • Map<String, T> where T is a valid VARIANT type

  • GEOGRAPHY

    • Not supported in the classic architecture; supported in the high-performance architecture.

  • GEOMETRY

    • Not supported in the classic architecture; supported in the high-performance architecture.
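
As an illustration of these mappings, a row for a hypothetical table with NUMBER, VARCHAR, BOOLEAN, TIMESTAMP_NTZ, ARRAY, and VARIANT columns could be built as follows (column names and values are placeholders):

```java
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowTypeMapping {
  // Builds a row map whose Java values match the allowed types listed above
  // for each hypothetical Snowflake column type.
  static Map<String, Object> buildRow() {
    Map<String, Object> row = new HashMap<>();
    row.put("AMOUNT", new BigDecimal("19.99"));   // NUMBER: BigDecimal
    row.put("DESCRIPTION", "streamed row");        // VARCHAR: String
    row.put("IS_ACTIVE", true);                     // BOOLEAN: boolean
    row.put("CREATED_AT", LocalDateTime.now());     // TIMESTAMP_NTZ: java.time.LocalDateTime
    row.put("TAGS", List.of("iot", "sensor"));     // ARRAY: List<T> of a valid VARIANT type
    row.put("PAYLOAD", "{\"temp\": 21.5}");       // VARIANT: String containing valid JSON
    return row;
  }
}
```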

Required access privileges

Calling the Snowpipe Streaming API requires a role with the following privileges:

  • Table: OWNERSHIP, or a minimum of INSERT and EVOLVE SCHEMA (EVOLVE SCHEMA is required only when using schema evolution for the Kafka connector with Snowpipe Streaming)

  • Database: USAGE

  • Schema: USAGE

  • Pipe: OWNERSHIP (required only for the high-performance architecture)

Limitations

For Snowpipe Streaming Classic, consider the following:

  • Snowpipe Streaming only supports using 256-bit AES keys for data encryption.

  • If Automatic Clustering is also enabled on the same table that Snowpipe Streaming is inserting into, compute costs for file migration might be reduced. For more information, see Snowpipe Streaming Classic best practices.

  • The following objects or types are not supported:

    • The GEOGRAPHY and GEOMETRY data types

    • Columns with collations

    • TEMPORARY tables

  • The total number of channels per table can't exceed 10,000. We recommend reusing channels where possible. Contact Snowflake Support if you need to open more than 10,000 channels per table.

The high-performance architecture has other considerations and limitations compared to the classic architecture. For more information, see the high-performance architecture limitations.