Best practices for Snowpipe Streaming with high-performance architecture¶

This guide outlines key best practices to design and implement robust data ingestion pipelines by using Snowpipe Streaming with high-performance architecture. By following these best practices, you ensure that your pipelines are durable, reliable, and have efficient error handling.

Manage channels strategically¶

Apply the following channel-management strategies for performance and long-term stability:

  • Use long-lived channels: To minimize overhead, open a channel once, and then keep it active for the duration of the ingestion task. Avoid repeatedly opening and closing channels.

  • Use deterministic channel names: Apply a consistent, predictable naming convention — for example, source-env-region-client-id — to simplify troubleshooting and facilitate automated recovery processes.

  • Scale out with multiple channels: To increase throughput, open multiple channels. These channels can point to a single target pipe or to multiple pipes, depending on service limits and your throughput requirements.

  • Monitor channel status: Regularly use the getChannelStatus method to monitor the health of your ingestion channels.

    • Track the last_committed_offset_token to verify that data is being ingested successfully and that the pipeline is making progress.

    • Monitor the row_error_count to detect bad records or other ingestion issues early.

Validate the schema consistently¶

Ensure that incoming data conforms to the expected table schema to prevent ingestion failures and maintain data integrity:

  • Client-side validation: Implement schema validation on the client side to provide immediate feedback and reduce server-side errors. Although full row-by-row validation offers maximum safety, a method that performs better might involve selective validation; for example, at batch boundaries or by sampling rows.

  • Server-side validation: The high-performance architecture can offload schema validation to the server. Errors and their counts are reported through getChannelStatus if schema mismatches occur during ingestion into the target pipe and table.

Persist state for reliable recovery¶

To prevent data loss or duplication, your application must persist its state to handle restarts and failures gracefully:

  • Persist the offset token: After each successful API call, persist the last_committed_offset_token to durable storage.

  • Resume from the last point: On application restart, fetch the last committed token from Snowflake and resume ingestion from that precise point. This guarantees exactly once processing and ensures continuity.

Add client-side metadata columns¶

To enable robust error detection and recovery, you must carry ingestion metadata as part of the row payload. This requires planning your data shape and PIPE definition in advance.

Add the following columns to your row payload before ingestion:

  • CHANNEL_ID (For example, a compact INTEGER.)

  • STREAM_OFFSET (A BIGINT that is monotonically increasing per channel, such as a Kafka partition offset.)

Together, these columns uniquely identify records per channel and enable you to trace the data’s origin.

Optional: Add a PIPE_ID column if multiple pipes ingest into the same target table. If you do this, you can easily trace rows back to their ingestion pipeline. You can store descriptive pipe names in a separate lookup table, mapping them to compact integers to reduce storage costs.

Detect and recover from errors using metadata offsets¶

Combine channel monitoring with your metadata columns to detect and recover from issues:

  • Monitor status: Regularly check getChannelStatus. An increasing row_error_count is a strong indicator of a potential problem.

  • Detect missing records: If errors are detected, use a SQL query to identify missing or out-of-order records by checking for gaps in your STREAM_OFFSET sequence.

SELECT
  PIPE_ID,
  CHANNEL_ID,
  STREAM_OFFSET,
  LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) AS previous_offset,
  (LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
Copy

Optimize ingestion performance and cost with MATCH_BY_COLUMN_NAME¶

Configure your pipe to map the necessary columns from your source data instead of ingesting all data into a single VARIANT column. To do this, use MATCH_BY_COLUMN_NAME = CASE_SENSITIVE or apply transformations in your pipe definition. This best practice not only optimizes your ingestion costs but also enhances the overall performance of your streaming data pipeline.

This best practice has the following important advantages:

  • By using MATCH_BY_COLUMN_NAME = CASE_SENSITIVE, you’re only billed for the data values that are ingested into your target table. In contrast, ingesting data into a single VARIANT column bills you for all JSON bytes, including both the keys and the values. For data with verbose or numerous JSON keys, this can lead to a significant and unnecessary increase in your ingestion costs.

  • Snowflake’s processing engine is more computationally efficient. Instead of parsing the entire JSON object into a VARIANT, and then extracting the required columns, this method directly extracts the necessary values.