Using Snowflake Connector for Kafka With Snowpipe Streaming¶
Optionally, replace Snowpipe with Snowpipe Streaming in your Kafka data loading chain. When the specified flush buffer threshold (time, memory, or number of messages) is reached, the connector calls the Snowpipe Streaming API (“API”) to write rows of data to Snowflake tables, unlike Snowpipe, which writes data from temporary staged files. This architecture results in lower load latencies and correspondingly lower costs for loading similar volumes of data.
Version 1.9.1 (or higher) of the Kafka connector is required for use with Snowpipe Streaming. The Kafka connector with Snowpipe Streaming includes the Snowflake Ingest SDK and supports streaming rows from Apache Kafka topics directly into target tables.
Note
The Kafka connector with Snowpipe Streaming currently doesn’t support schema detection or schema evolution. It uses the same table schema as the one used with Snowpipe.
Minimum Required Version¶
Kafka connector version 1.9.1 (or later) supports Snowpipe Streaming.
Kafka Configuration Properties¶
Save your connection settings in the Kafka connector properties file. For more information, see Configuring the Kafka Connector.
Required Properties¶
snowflake.ingestion.method
Required only if using the Kafka connector as the streaming ingest client. Specifies whether to use Snowpipe Streaming or standard Snowpipe to load your Kafka topic data. The supported values are as follows:
SNOWPIPE_STREAMING
SNOWPIPE (default)
No settings other than this property are required to choose the backend service that queues and loads topic data. Configure the remaining properties in your Kafka connector properties file as usual.
snowflake.role.name
Access control role to use when inserting the rows into the table.
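As a minimal sketch, the two properties above can be expressed as java.util.Properties entries, which makes them easy to check programmatically. It assumes the remaining connection settings (described in Configuring the Kafka Connector) are configured separately, and the role name KAFKA_CONNECTOR_ROLE is a hypothetical placeholder.

```java
import java.util.Properties;

// Minimal sketch: the Snowpipe Streaming-specific connector properties.
// Connection settings (account URL, user, key, and so on) are omitted.
class StreamingConnectorConfig {

    static Properties build(String roleName) {
        Properties props = new Properties();
        // Switch the backend from the default SNOWPIPE to Snowpipe Streaming.
        props.setProperty("snowflake.ingestion.method", "SNOWPIPE_STREAMING");
        // Access control role used when inserting rows into the target table.
        props.setProperty("snowflake.role.name", roleName);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("KAFKA_CONNECTOR_ROLE"); // hypothetical role name
        // Print the entries in key=value form, as they would appear in the properties file.
        props.stringPropertyNames().stream().sorted()
             .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

In a real deployment, the printed key=value lines belong in the Kafka connector properties file alongside the other connection settings.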
Buffer and Polling Properties¶
buffer.flush.time
Number of seconds between buffer flushes, with each flush resulting in insert operations for the buffered records. The Kafka connector calls the Snowpipe Streaming API once after each flush.
The minimum value supported for the buffer.flush.time property is 1 (in seconds). For higher average data flow rates, we suggest that you decrease the default value for improved latency. If cost is a greater concern than latency, you could increase the buffer flush time. Be careful to flush the Kafka memory buffer before it becomes full to avoid out-of-memory exceptions.
- Values
Minimum 1; no upper limit.
- Default
10
buffer.count.records
Number of records buffered in memory per Kafka partition before they are ingested into Snowflake.
- Values
Minimum 1; no upper limit.
- Default
10000
buffer.size.bytes
Cumulative size in bytes of records buffered in memory per Kafka partition before they are ingested into Snowflake as data files.
The records are compressed when they are written to data files. As a result, the size of the records in the buffer may be larger than the size of the data files created from the records.
- Values
Minimum 1; no upper limit.
- Default
20000000 (20 MB)
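The three buffer properties act as independent flush triggers (time, memory, or number of messages, as noted in the introduction). The hypothetical PartitionBuffer class below is a simplified sketch of that rule, not the connector's actual implementation: it measures buffered size as the sum of raw record lengths, and it assumes an empty buffer is never flushed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a per-partition buffer that flushes when ANY of the
// three thresholds is reached: elapsed time, record count, or buffered bytes.
class PartitionBuffer {
    private final long flushTimeMillis; // buffer.flush.time, converted to milliseconds
    private final long maxRecords;      // buffer.count.records
    private final long maxBytes;        // buffer.size.bytes

    private final List<byte[]> records = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMillis;

    PartitionBuffer(long flushTimeSeconds, long maxRecords, long maxBytes, long nowMillis) {
        this.flushTimeMillis = flushTimeSeconds * 1000L;
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
        this.lastFlushMillis = nowMillis;
    }

    void add(byte[] record) {
        records.add(record);
        bufferedBytes += record.length; // uncompressed size, so it may exceed the data file size
    }

    boolean shouldFlush(long nowMillis) {
        if (records.isEmpty()) {
            return false; // assumption: nothing to insert, so no flush
        }
        return nowMillis - lastFlushMillis >= flushTimeMillis
                || records.size() >= maxRecords
                || bufferedBytes >= maxBytes;
    }

    // Drains the buffer; in the connector, one flush leads to one Snowpipe Streaming API call.
    List<byte[]> flush(long nowMillis) {
        List<byte[]> drained = new ArrayList<>(records);
        records.clear();
        bufferedBytes = 0;
        lastFlushMillis = nowMillis;
        return drained;
    }

    public static void main(String[] args) {
        // Defaults from the property descriptions: 10 s, 10000 records, 20000000 bytes.
        PartitionBuffer buffer = new PartitionBuffer(10, 10_000, 20_000_000, 0);
        buffer.add(new byte[100]);
        System.out.println(buffer.shouldFlush(5_000));  // false: no threshold reached yet
        System.out.println(buffer.shouldFlush(10_000)); // true: 10 seconds have elapsed
    }
}
```

Lowering any single threshold makes flushes, and therefore API calls, more frequent, which is the latency-versus-cost trade-off described under buffer.flush.time.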
In addition to the Kafka connector properties, note the Kafka consumer max.poll.records property, which controls the maximum number of records returned by Kafka to Kafka Connect in a single poll. The default value of 500 can be increased, but be mindful of memory constraints. For more information about this property, see the documentation for your Kafka package.
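Kafka Connect offers two standard ways to change max.poll.records for a sink connector: a worker-level setting with the consumer. prefix, or a per-connector setting with the consumer.override. prefix, which only takes effect if the worker's connector.client.config.override.policy allows it. The sketch below shows both as java.util.Properties; PollRecordsConfig is a hypothetical name, and the value 1000 is an arbitrary example rather than a recommendation.

```java
import java.util.Properties;

// Sketch of the two Kafka Connect mechanisms for raising max.poll.records.
class PollRecordsConfig {

    // Worker-level: properties prefixed with "consumer." apply to the consumers
    // of every sink connector running on this Connect worker.
    static Properties workerLevel(int maxPollRecords) {
        Properties worker = new Properties();
        worker.setProperty("consumer.max.poll.records", Integer.toString(maxPollRecords));
        return worker;
    }

    // Connector-level: "consumer.override." affects only this connector and is
    // honored only if the worker's connector.client.config.override.policy permits it.
    static Properties connectorLevel(int maxPollRecords) {
        Properties connector = new Properties();
        connector.setProperty("consumer.override.max.poll.records", Integer.toString(maxPollRecords));
        return connector;
    }

    public static void main(String[] args) {
        int maxPollRecords = 1000; // example value; the Kafka consumer default is 500
        workerLevel(maxPollRecords).list(System.out);
        connectorLevel(maxPollRecords).list(System.out);
    }
}
```

Because each poll's records end up in the per-partition buffers, a larger max.poll.records value raises peak memory use on the worker.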
Error Handling and DLQ Properties¶
errors.tolerance
Specifies how to handle errors encountered by the Kafka connector. This property supports the following values:
NONE
Stop loading data when the first error is encountered.
ALL
Ignore all errors and continue to load data.
- Default
NONE
errors.log.enable
Specifies whether or not to write error messages to the Kafka Connect log file.
This property supports the following values:
TRUE
Write error messages.
FALSE
Do not write error messages.
- Default
FALSE
errors.deadletterqueue.topic.name
Specifies the name of the dead-letter queue (DLQ) topic in Kafka to which messages that could not be ingested into Snowflake tables are delivered. For more information, see Dead-letter Queues (in this topic).
- Values
Custom text string
- Default
None.
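The following hypothetical ErrorPolicy class sketches how the three error properties could combine for a single record that fails to load: NONE stops loading, while ALL skips the record, optionally logs it, and optionally routes it to the DLQ topic. It is an illustration of the documented behavior under these assumptions, not the connector's or Kafka Connect's actual error-handling code, and the topic name my_dlq is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

// Hypothetical sketch: how errors.tolerance, errors.log.enable, and
// errors.deadletterqueue.topic.name could combine for one failed record.
class ErrorPolicy {
    enum Tolerance { NONE, ALL }

    final Tolerance tolerance;
    final boolean logEnabled;
    final String dlqTopic; // null when no DLQ topic is configured

    ErrorPolicy(Properties props) {
        // Defaults mirror the property descriptions: NONE, FALSE, no DLQ topic.
        this.tolerance = Tolerance.valueOf(
                props.getProperty("errors.tolerance", "none").toUpperCase(Locale.ROOT));
        this.logEnabled = Boolean.parseBoolean(props.getProperty("errors.log.enable", "false"));
        this.dlqTopic = props.getProperty("errors.deadletterqueue.topic.name");
    }

    // Returns the actions taken for a failed record, or throws to stop loading.
    List<String> onFailedRecord(String recordKey, Exception cause) {
        if (tolerance == Tolerance.NONE) {
            // NONE: stop loading data at the first error.
            throw new IllegalStateException("Stopping at first error, record " + recordKey, cause);
        }
        // ALL: skip the record and keep loading.
        List<String> actions = new ArrayList<>();
        if (logEnabled) {
            actions.add("log:" + recordKey);
        }
        if (dlqTopic != null && !dlqTopic.isEmpty()) {
            actions.add("dlq:" + dlqTopic);
        }
        actions.add("continue");
        return actions;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("errors.tolerance", "all");
        props.setProperty("errors.log.enable", "true");
        props.setProperty("errors.deadletterqueue.topic.name", "my_dlq"); // hypothetical topic
        System.out.println(new ErrorPolicy(props).onFailedRecord("key-1", new RuntimeException("bad row")));
    }
}
```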
Exactly-Once Semantics¶
Exactly-once semantics ensures the delivery of Kafka messages without duplication or data loss. This delivery guarantee is set by default for the Kafka connector with Snowpipe Streaming.
The Kafka connector maps each partition to exactly one channel and uses two distinct offsets:
Consumer offset: This tracks the most recent offset consumed by the consumer and is managed by Kafka.
Offset token: This tracks the most recent committed offset in Snowflake and is managed by Snowflake.
The Kafka connector achieves exactly-once delivery by implementing the following best practices:
Opening/reopening a channel:
When opening or reopening a channel for a given partition, the Kafka connector uses the latest committed offset token retrieved from Snowflake through the getLatestCommittedOffsetToken API as the source of truth and resets the consumer offset in Kafka accordingly. If the consumer offset is no longer within the data retention period, an exception is thrown, and you can decide on the appropriate action to take.
The only scenario in which the Kafka connector does not reset the consumer offset in Kafka, and instead uses it as the source of truth, is when the offset token from Snowflake is NULL. In this case, the connector accepts the offset sent by Kafka, and the offset token is subsequently updated.
Processing records:
As an additional safeguard against non-continuous offsets that could arise from potential bugs in Kafka, Snowflake maintains an in-memory variable that tracks the latest processed offset. Snowflake accepts a row only if its offset equals the latest processed offset plus one, which helps keep the ingestion process continuous and accurate.
Handling exceptions, failures, and crash recovery:
As part of the recovery process, Snowflake follows the channel open/reopen logic described earlier: it reopens the channel and resets the consumer offset to the latest committed offset token. This signals Kafka to send data starting from the offset that is one greater than the latest committed offset token, so ingestion resumes from the point of failure with minimal to no data loss.
Implementing a retry mechanism:
To account for potential transient issues, Snowflake incorporates a retry mechanism in the API calls. Snowflake retries these API calls multiple times to increase the chances of success and mitigate the risk of intermittent failures affecting the ingestion process.
Advancing the consumer offset:
At regular intervals, Snowflake advances the consumer offset using the latest committed offset token to ensure that the ingestion process is continuously aligned with the latest state of data in Snowflake.
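The practices above can be modeled for a single partition as follows. This is a simplified, hypothetical sketch: PartitionChannel and withRetries are illustrative names, the Snowflake Ingest SDK is not used, and the committed offset token (which the connector obtains through getLatestCommittedOffsetToken) is passed in as a nullable Long. Retry backoff is omitted for brevity.

```java
import java.util.function.Supplier;

// Hypothetical, simplified bookkeeping for one partition mapped to one channel.
class PartitionChannel {
    private long lastProcessedOffset = -1; // in-memory continuity tracker
    private Long committedOffsetToken;      // latest token committed in Snowflake, or null

    // Opening/reopening (also used for recovery): returns the offset the Kafka
    // consumer should be reset to.
    long open(Long latestCommittedOffsetToken, long kafkaConsumerOffset) {
        committedOffsetToken = latestCommittedOffsetToken;
        long resumeFrom = (latestCommittedOffsetToken == null)
                ? kafkaConsumerOffset              // no token yet: accept Kafka's offset
                : latestCommittedOffsetToken + 1;  // Snowflake token is the source of truth
        lastProcessedOffset = resumeFrom - 1;
        return resumeFrom;
    }

    // Processing records: accept a row only if it directly follows the last processed offset.
    boolean accept(long offset) {
        if (offset != lastProcessedOffset + 1) {
            return false; // gap or duplicate: reject
        }
        lastProcessedOffset = offset;
        return true;
    }

    // Called when Snowflake reports a newly committed offset token.
    void onCommitted(long offsetToken) {
        committedOffsetToken = offsetToken;
    }

    // Advancing the consumer offset: the next offset to consume after the
    // committed token, or null if nothing has been committed yet.
    Long consumerOffsetToCommit() {
        return committedOffsetToken == null ? null : committedOffsetToken + 1;
    }

    // Retrying an API call on transient failures (no backoff in this sketch).
    static <T> T withRetries(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        PartitionChannel channel = new PartitionChannel();
        long start = channel.open(41L, 10L); // Snowflake has committed up to offset 41
        System.out.println(start);               // 42
        System.out.println(channel.accept(42));  // true
        System.out.println(channel.accept(44));  // false: offset 43 is missing
    }
}
```

Recovery after a failure reuses open(): reopening with the latest committed token returns that token plus one, so Kafka resends data from the point of failure rather than from the possibly stale consumer offset.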
Converters¶
The Kafka connector with Snowpipe Streaming does not support the following key.converter or value.converter values:
com.snowflake.kafka.connector.records.SnowflakeJsonConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
The custom Snowflake converters handle errors that prevent data from loading by moving files to the table stage. This workflow conflicts with the dead-letter queue (DLQ) support in Snowpipe Streaming.
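Because these converters are not supported with Snowpipe Streaming, it can help to check a configuration before deploying it. The hypothetical ConverterCheck helper below sketches such a pre-flight check; it is not part of the Kafka connector, and treating the snowflake.ingestion.method value case-insensitively is an assumption. The example uses org.apache.kafka.connect.json.JsonConverter, Apache Kafka's standard JSON converter.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical pre-flight check, not part of the Kafka connector: reject the
// Snowflake custom converters when Snowpipe Streaming is selected.
class ConverterCheck {
    static final Set<String> UNSUPPORTED_WITH_STREAMING = Set.of(
            "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
            "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
            "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry");

    static void validate(Map<String, String> config) {
        // SNOWPIPE is the default ingestion method; case-insensitive matching is an assumption.
        String method = config.getOrDefault("snowflake.ingestion.method", "SNOWPIPE");
        if (!"SNOWPIPE_STREAMING".equalsIgnoreCase(method.trim())) {
            return; // the restriction applies only to Snowpipe Streaming
        }
        for (String key : new String[] {"key.converter", "value.converter"}) {
            String converter = config.get(key);
            if (converter != null && UNSUPPORTED_WITH_STREAMING.contains(converter.trim())) {
                throw new IllegalArgumentException(
                        key + "=" + converter + " is not supported with Snowpipe Streaming");
            }
        }
    }

    public static void main(String[] args) {
        // Apache Kafka's standard JSON converter passes the check.
        validate(Map.of(
                "snowflake.ingestion.method", "SNOWPIPE_STREAMING",
                "value.converter", "org.apache.kafka.connect.json.JsonConverter"));
        System.out.println("configuration accepted");
    }
}
```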
Dead-letter Queues¶
The Kafka connector with Snowpipe Streaming supports dead-letter queues (DLQ) for broken records or records that cannot be processed successfully due to a failure.
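As a sketch, the connector properties that send failed records to a DLQ could look like the following, assuming a hypothetical topic name. DLQs are typically used together with errors.tolerance=all, because with NONE the connector stops at the first error. The last two entries are standard Kafka Connect settings rather than Snowflake-specific ones: the DLQ topic's replication factor (which defaults to 3, too high for a single-broker test cluster) and headers that record error context on each DLQ message.

```java
import java.util.Properties;

// Sketch of connector properties that route failed records to a DLQ topic.
class DlqConfig {

    static Properties build(String dlqTopic, int replicationFactor) {
        Properties props = new Properties();
        props.setProperty("snowflake.ingestion.method", "SNOWPIPE_STREAMING");
        // Tolerate errors so that failed records are skipped and routed to the DLQ.
        props.setProperty("errors.tolerance", "all");
        props.setProperty("errors.log.enable", "true");
        props.setProperty("errors.deadletterqueue.topic.name", dlqTopic);
        // Standard Kafka Connect settings (not Snowflake-specific).
        props.setProperty("errors.deadletterqueue.topic.replication.factor",
                Integer.toString(replicationFactor));
        props.setProperty("errors.deadletterqueue.context.headers.enable", "true");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical topic name; replication factor 1 suits a single-broker test cluster.
        build("snowflake_streaming_dlq", 1).list(System.out);
    }
}
```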
For more information on monitoring, see the Apache Kafka documentation.
Billing & Usage¶
For Snowpipe Streaming billing information, see Snowpipe Streaming Costs.