Using Snowflake Connector for Kafka With Snowpipe Streaming¶
Optionally replace Snowpipe with Snowpipe Streaming in your data loading chain from Kafka. When the specified flush buffer threshold (time, memory, or number of messages) is reached, the connector calls the Snowpipe Streaming API (“API”) to write rows of data to Snowflake tables, unlike Snowpipe, which writes data from temporary staged files. This architecture results in lower load latencies, with correspondingly lower costs for loading similar volumes of data.
Version 1.9.1 (or higher) of the Kafka connector is required for use with Snowpipe Streaming. The Kafka connector with Snowpipe Streaming includes the Snowflake Ingest SDK and supports streaming rows from Apache Kafka topics directly into target tables.
Note
The Kafka connector with Snowpipe Streaming currently doesn’t support schema detection or schema evolution. It uses the same Schema of Tables as the one used with Snowpipe.
Minimum Required Version¶
Kafka connector version 1.9.1 (or later) supports Snowpipe Streaming.
Kafka Configuration Properties¶
Save your connection settings in the Kafka connector properties file. For more information, see Configuring the Kafka Connector.
Required Properties¶
Add or edit the following properties in the Kafka connector properties file.
snowflake.ingestion.method
Required only if using the Kafka connector as the streaming ingest client. Specifies whether to use Snowpipe Streaming or standard Snowpipe to load your Kafka topic data. The supported values are as follows:
SNOWPIPE_STREAMING
SNOWPIPE (default)
No additional settings are required to choose the backend service to queue and load topic data. Configure additional properties in your Kafka connector properties file as usual.
snowflake.role.name
Access control role to use when inserting the rows into the table.
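For example, a minimal sketch of a connector properties file that selects Snowpipe Streaming might look like the following. The connection values (connector name, topic, account URL, user, key, database, schema, and role) are placeholders for illustration; the full set of connection properties is described in Configuring the Kafka Connector.

    name=my_snowpipe_streaming_connector
    connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
    topics=my_kafka_topic
    snowflake.url.name=<account_identifier>.snowflakecomputing.com:443
    snowflake.user.name=<user_name>
    snowflake.private.key=<private_key>
    snowflake.database.name=<database_name>
    snowflake.schema.name=<schema_name>
    snowflake.role.name=<role_name>
    snowflake.ingestion.method=SNOWPIPE_STREAMING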
Buffer and Polling Properties¶
buffer.flush.time
Number of seconds between buffer flushes, with each flush resulting in insert operations for the buffered records. The Kafka connector calls the Snowpipe Streaming API (“API”) once after each flush.
The minimum value supported for the buffer.flush.time property is 1 (in seconds). For higher average data flow rates, we suggest that you decrease the default value for improved latency. If cost is a greater concern than latency, you could increase the buffer flush time. Be careful to flush the Kafka memory buffer before it becomes full to avoid out-of-memory exceptions.
- Values: Minimum 1; no upper limit
- Default: 10
buffer.count.records
Number of records buffered in memory per Kafka partition before ingesting to Snowflake.
- Values: Minimum 1; no upper limit
- Default: 10000
buffer.size.bytes
Cumulative size in bytes of records buffered in memory per Kafka partition before they are ingested into Snowflake as data files.
The records are compressed when they are written to data files. As a result, the size of the records in the buffer may be larger than the size of the data files created from the records.
- Values: Minimum 1; no upper limit
- Default: 20000000 (20 MB)
In addition to the Kafka connector properties, note the Kafka consumer max.poll.records property, which controls the maximum number of records returned by Kafka to Kafka Connect in a single poll. The default value of 500 can be increased, but be mindful of memory constraints. For more information about this property, see the documentation for your Kafka package.
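As an illustration only, the following sketch lowers buffer.flush.time for lower latency while leaving the other buffer thresholds at their defaults. The consumer.override. prefix for max.poll.records is an assumption about a typical Kafka Connect worker setup (connector-level consumer overrides must be allowed by your worker configuration); adapt it to how your deployment passes consumer properties.

    buffer.flush.time=5
    buffer.count.records=10000
    buffer.size.bytes=20000000
    # Kafka consumer property, not a connector property; shown here only as an example override.
    consumer.override.max.poll.records=500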
Error Handling and DLQ Properties¶
errors.tolerance
Specifies how to handle errors encountered by the Kafka connector. This property supports the following values:
NONE: Stop loading data when the first error is encountered.
ALL: Ignore all errors and continue to load data.
- Default: NONE
errors.log.enable
Specifies whether to write error messages to the Kafka Connect log file. This property supports the following values:
TRUE: Write error messages.
FALSE: Do not write error messages.
- Default: FALSE
errors.deadletterqueue.topic.name
Specifies the name of the DLQ (dead-letter queue) topic in Kafka used to deliver messages that could not be ingested into Snowflake tables. For more information, see Dead-letter Queues (in this topic).
- Values: Custom text string
- Default: None
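For instance, a sketch of an error-handling configuration that tolerates bad records, logs them, and routes them to a hypothetical DLQ topic named kafka_dlq_topic (a placeholder name) might look like this:

    errors.tolerance=ALL
    errors.log.enable=TRUE
    errors.deadletterqueue.topic.name=kafka_dlq_topic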
Exactly-Once Semantics¶
Exactly-once semantics ensures the delivery of Kafka messages without duplication or data loss. This delivery guarantee is set by default for the Kafka connector with Snowpipe Streaming.
Converters¶
The Kafka connector with Snowpipe Streaming does not support the following key.converter or value.converter values:
com.snowflake.kafka.connector.records.SnowflakeJsonConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
The custom Snowflake converters handle errors that prevent data from loading by moving files to the table stage. This workflow conflicts with the Snowpipe Streaming Dead-letter Queues.
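As a sketch, you could instead use the standard community converters that ship with Kafka Connect; the specific choice below (string keys and schemaless JSON values) is an assumption that depends on the format of your topic data:

    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false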
Dead-letter Queues¶
The Kafka connector with Snowpipe Streaming supports dead-letter queues (DLQ) for broken records or records that cannot be processed successfully due to a failure.
For more information on monitoring, see the Apache Kafka documentation.
Billing & Usage¶
For Snowpipe Streaming billing information, see Billing & Viewing the Data Load History for Your Account.
Note that the “Cost Optimization” information in the same topic does not apply to the Kafka connector due to constraints in the Kafka Connect architecture.