Using Snowflake Connector for Kafka With Snowpipe Streaming

Optionally, you can replace Snowpipe with Snowpipe Streaming in your data loading chain from Kafka. When a configured buffer flush threshold (time, memory, or number of messages) is reached, the connector calls the Snowpipe Streaming API (“API”) to write rows of data directly to Snowflake tables. Snowpipe, by contrast, loads data from temporary staged files. This architecture results in lower load latency and correspondingly lower cost for loading similar volumes of data.
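
The flush rule described above can be modeled as a small per-partition buffer that flushes as soon as any one of the three thresholds is reached. This is a minimal Python sketch, not the connector's implementation: the class PartitionBuffer and its field names are hypothetical, and the default thresholds are the documented defaults of buffer.flush.time, buffer.count.records, and buffer.size.bytes (see Buffer and Polling Properties).

```python
import time
from dataclasses import dataclass, field
from typing import List


@dataclass
class PartitionBuffer:
    """Toy per-partition buffer (hypothetical; not the connector's implementation)."""
    flush_time_s: float = 10.0    # buffer.flush.time default
    max_records: int = 10_000     # buffer.count.records default
    max_bytes: int = 20_000_000   # buffer.size.bytes default
    records: List[bytes] = field(default_factory=list)
    size_bytes: int = 0
    last_flush: float = field(default_factory=time.monotonic)

    def add(self, record: bytes) -> None:
        self.records.append(record)
        self.size_bytes += len(record)

    def should_flush(self, now: float) -> bool:
        # Whichever threshold is reached first triggers the flush.
        return (
            now - self.last_flush >= self.flush_time_s
            or len(self.records) >= self.max_records
            or self.size_bytes >= self.max_bytes
        )

    def flush(self, now: float) -> List[bytes]:
        # In the pipeline described above, this is where the connector would
        # call the Snowpipe Streaming API; the sketch returns the batch and resets.
        batch, self.records, self.size_bytes = self.records, [], 0
        self.last_flush = now
        return batch


if __name__ == "__main__":
    buf = PartitionBuffer(max_records=3, last_flush=0.0)
    for payload in (b"a", b"bb", b"ccc"):
        buf.add(payload)
    print(buf.should_flush(now=1.0))  # True: the 3-record threshold is hit after 1 s
```

In this model, lowering the flush time only shortens latency on partitions where the time threshold is the one that fires; on busy partitions the record-count or byte threshold is reached first.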

Version 1.9.1 (or higher) of the Kafka connector is required for use with Snowpipe Streaming. The Kafka connector with Snowpipe Streaming includes the Snowflake Ingest SDK and supports streaming rows from Apache Kafka topics directly into target tables.

Figure: Snowpipe Streaming with Kafka connector

Note

The Kafka connector with Snowpipe Streaming currently doesn’t support schema detection or schema evolution. It uses the same table schema as the Kafka connector with Snowpipe.

Minimum Required Version

Kafka connector version 1.9.1 (or later) supports Snowpipe Streaming.

Kafka Configuration Properties

Save your connection settings in the Kafka connector properties file. For more information, see Configuring the Kafka Connector.

Required Properties

Set the following properties to load data with Snowpipe Streaming.

snowflake.ingestion.method

Required only if using the Kafka connector as the streaming ingest client. Specifies whether to use Snowpipe Streaming or standard Snowpipe to load your Kafka topic data. The supported values are as follows:

  • SNOWPIPE_STREAMING

  • SNOWPIPE (default)

No additional settings are required to choose the backend service to queue and load topic data. Configure additional properties in your Kafka connector properties file as usual.

snowflake.role.name

Access control role to use when inserting the rows into the table.
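
As an illustration, the following Python sketch builds the JSON body that the Kafka Connect REST API accepts at POST /connectors, with snowflake.ingestion.method and snowflake.role.name set for Snowpipe Streaming. The connection keys (snowflake.url.name, snowflake.user.name, and so on) come from the general Kafka connector configuration rather than this section; the connector name, topics, role, and every <...> value are hypothetical placeholders. The same key/value pairs can go in a properties file instead.

```python
import json
from typing import Dict, List


def streaming_connector_config(name: str, topics: List[str], role: str) -> Dict[str, object]:
    """Kafka Connect REST payload for the Snowflake sink with Snowpipe Streaming.

    Every <...> value is a placeholder; supply your own account details.
    """
    return {
        "name": name,
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            "topics": ",".join(topics),
            "snowflake.url.name": "<account_identifier>.snowflakecomputing.com:443",
            "snowflake.user.name": "<user>",
            "snowflake.private.key": "<private_key>",
            "snowflake.database.name": "<database>",
            "snowflake.schema.name": "<schema>",
            # Properties described in this section:
            "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
            "snowflake.role.name": role,
        },
    }


payload = streaming_connector_config("snowflake_streaming_sink", ["orders", "clicks"], "KAFKA_CONNECTOR_ROLE")
print(json.dumps(payload, indent=2))
```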

Buffer and Polling Properties

buffer.flush.time

Number of seconds between buffer flushes. Each flush results in insert operations for the buffered records, and the Kafka connector calls the Snowpipe Streaming API once per flush.

The minimum supported value for buffer.flush.time is 1 second. For higher average data flow rates, we suggest decreasing the default value to improve latency. If cost is a greater concern than latency, you can increase the buffer flush time. Be careful to flush the Kafka memory buffer before it becomes full to avoid out-of-memory exceptions.

Values

Minimum 1; no upper limit.

Default

10

buffer.count.records

Number of records buffered in memory per Kafka partition before they are ingested into Snowflake.

Values

Minimum 1; no upper limit.

Default

10000

buffer.size.bytes

Cumulative size in bytes of records buffered in memory per Kafka partition before they are ingested into Snowflake as data files.

The records are compressed when they are written to data files. As a result, the size of the records in the buffer may be larger than the size of the data files created from the records.

Values

Minimum 1; no upper limit.

Default

20000000 (20 MB)
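
To see the latency versus cost trade-off described under buffer.flush.time, the following Python sketch estimates how often one partition flushes at a steady input rate, and therefore how many API calls it makes per hour (one call per flush). It assumes a constant record rate and average record size, which real topics rarely have, and ignores compression; BufferSettings, flush_interval_s, and api_calls_per_hour are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BufferSettings:
    """Per-partition buffer thresholds, defaulting to the documented defaults."""
    flush_time_s: float = 10            # buffer.flush.time
    count_records: int = 10_000         # buffer.count.records
    size_bytes: int = 20_000_000        # buffer.size.bytes

    def __post_init__(self) -> None:
        # Each property accepts values from 1 upward, with no upper limit.
        for name in ("flush_time_s", "count_records", "size_bytes"):
            if getattr(self, name) < 1:
                raise ValueError(f"{name} must be >= 1")


def flush_interval_s(s: BufferSettings, records_per_s: float, avg_record_bytes: float) -> float:
    """Seconds between flushes for one partition; the first threshold reached wins."""
    candidates = [float(s.flush_time_s)]
    if records_per_s > 0:
        candidates.append(s.count_records / records_per_s)
        if avg_record_bytes > 0:
            candidates.append(s.size_bytes / (records_per_s * avg_record_bytes))
    return min(candidates)


def api_calls_per_hour(s: BufferSettings, records_per_s: float, avg_record_bytes: float) -> float:
    """One Snowpipe Streaming API call per flush, per partition."""
    return 3600 / flush_interval_s(s, records_per_s, avg_record_bytes)


defaults = BufferSettings()
print(api_calls_per_hour(defaults, records_per_s=500, avg_record_bytes=1_000))    # 360.0 (time threshold)
print(api_calls_per_hour(defaults, records_per_s=5_000, avg_record_bytes=1_000))  # 1800.0 (count threshold)
```

At 500 records per second the 10-second timer fires first, so lowering buffer.flush.time directly lowers latency and raises the call count; at 5,000 records per second the record-count threshold already flushes every 2 seconds, so the timer setting has no effect.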

In addition to the Kafka connector properties, note the Kafka consumer max.poll.records property, which controls the maximum number of records returned by Kafka to Kafka Connect in a single poll. The default value of 500 can be increased, but be mindful of memory constraints. For more information about this property, see the documentation for your Kafka package.
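
In Kafka Connect, consumer settings for sink tasks are set in the worker configuration with the consumer. prefix, or per connector with the consumer.override. prefix when the worker's connector.client.config.override.policy allows it. The following Python sketch shows that override together with a rough memory estimate. The estimate is an assumption, not a documented formula: it supposes every partition assigned to a task holds a full buffer.size.bytes buffer plus one poll's worth of records, and the 1,000-byte average record size is a hypothetical default.

```python
from typing import Dict


def rough_buffer_memory_bytes(
    partitions_per_task: int,
    buffer_size_bytes: int = 20_000_000,
    max_poll_records: int = 500,
    avg_record_bytes: int = 1_000,
) -> int:
    """Rough upper bound (assumption): every assigned partition holds a full
    buffer, plus one poll's worth of records in flight."""
    return partitions_per_task * buffer_size_bytes + max_poll_records * avg_record_bytes


def with_poll_override(config: Dict[str, str], max_poll_records: int) -> Dict[str, str]:
    """Return a copy of a connector config that raises max.poll.records for this
    connector only (the worker's override policy must allow it)."""
    updated = dict(config)
    updated["consumer.override.max.poll.records"] = str(max_poll_records)
    return updated


print(rough_buffer_memory_bytes(partitions_per_task=8))  # 160500000
```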

Error Handling and DLQ Properties

errors.tolerance

Specifies how to handle errors encountered by the Kafka connector.

This property supports the following values:

NONE

Stop loading data when the first error is encountered.

ALL

Ignore all errors and continue to load data.

Default

NONE

errors.log.enable

Specifies whether or not to write error messages to the Kafka Connect log file.

This property supports the following values:

TRUE

Write error messages.

FALSE

Do not write error messages.

Default

FALSE

errors.deadletterqueue.topic.name

Specifies the name of the Kafka DLQ (dead-letter queue) topic that receives messages that could not be ingested into Snowflake tables. For more information, see Dead-letter Queues (in this topic).

Values

Custom text string

Default

None.
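
In Kafka Connect, records are routed to the dead-letter queue topic only when errors.tolerance is all; with none, the task stops at the first error and the DLQ topic is never used. Kafka Connect documents these values in lowercase (none, all, true, false), so the following Python sketch uses that spelling and compares tolerance values case-insensitively. Both helpers are hypothetical, and the topic name snowflake_dlq is a placeholder.

```python
from typing import Dict, List, Optional


def error_handling_config(dlq_topic: Optional[str] = None, log_errors: bool = True) -> Dict[str, str]:
    """Connector properties that keep loading past bad records (hypothetical helper)."""
    cfg = {
        "errors.tolerance": "all",
        "errors.log.enable": "true" if log_errors else "false",
    }
    if dlq_topic:
        cfg["errors.deadletterqueue.topic.name"] = dlq_topic
    return cfg


def check_error_config(cfg: Dict[str, str]) -> List[str]:
    """Flag combinations where the DLQ setting would have no effect."""
    problems = []
    tolerance = cfg.get("errors.tolerance", "none").lower()
    if tolerance not in ("none", "all"):
        problems.append(f"unsupported errors.tolerance value: {tolerance!r}")
    if cfg.get("errors.deadletterqueue.topic.name") and tolerance != "all":
        problems.append("DLQ topic is set, but records reach it only when errors.tolerance is 'all'")
    return problems


print(check_error_config(error_handling_config("snowflake_dlq")))  # []
```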

Exactly-Once Semantics

Exactly-once semantics ensures that Kafka messages are delivered without duplication or data loss. The Kafka connector with Snowpipe Streaming provides this delivery guarantee by default.
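
This section does not describe how the guarantee is implemented. To make "without duplication" concrete, here is a toy Python model of one common technique, assumed purely for illustration: the sink remembers the highest offset it has committed for each partition and skips any redelivered record at or below that offset (Kafka can redeliver records after a task restart or rebalance). OffsetDedupSink is hypothetical; a real system would also need the rows and the committed offsets to be persisted atomically, and this model covers only the no-duplicates half of the guarantee.

```python
from typing import Dict, List


class OffsetDedupSink:
    """Toy model (hypothetical): remember the last committed offset per
    partition and ignore any record at or below it."""

    def __init__(self) -> None:
        self.committed: Dict[int, int] = {}  # partition -> last committed offset
        self.rows: List[str] = []

    def write(self, partition: int, offset: int, value: str) -> bool:
        if offset <= self.committed.get(partition, -1):
            return False  # replayed record: already ingested, skip it
        self.rows.append(value)
        self.committed[partition] = offset
        return True


sink = OffsetDedupSink()
for partition, offset, value in [(0, 0, "a"), (0, 1, "b"), (0, 0, "a"), (1, 0, "c")]:
    sink.write(partition, offset, value)
print(sink.rows)  # ['a', 'b', 'c']
```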

Converters

The Kafka connector with Snowpipe Streaming does not support the following key.converter or value.converter values:

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

The custom Snowflake converters handle errors that prevent data from loading by moving files to the table stage. This workflow conflicts with the dead-letter queues used with Snowpipe Streaming (see Dead-letter Queues).
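
A pre-deployment check for this restriction can be sketched in Python. The helper unsupported_converters is hypothetical; it inspects only key.converter and value.converter, compares them with the three classes listed above, and treats a missing snowflake.ingestion.method as the default SNOWPIPE.

```python
from typing import Dict

UNSUPPORTED_STREAMING_CONVERTERS = frozenset({
    "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
    "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry",
})


def unsupported_converters(cfg: Dict[str, str]) -> Dict[str, str]:
    """Return {property: class} for converters not supported with Snowpipe Streaming."""
    if cfg.get("snowflake.ingestion.method", "SNOWPIPE").upper() != "SNOWPIPE_STREAMING":
        return {}  # the restriction applies only to Snowpipe Streaming
    return {
        key: cfg[key]
        for key in ("key.converter", "value.converter")
        if cfg.get(key) in UNSUPPORTED_STREAMING_CONVERTERS
    }


cfg = {
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
}
print(unsupported_converters(cfg))
# {'value.converter': 'com.snowflake.kafka.connector.records.SnowflakeJsonConverter'}
```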

Dead-letter Queues

The Kafka connector with Snowpipe Streaming supports dead-letter queues (DLQ) for broken records or records that cannot be processed successfully due to a failure.

For more information on monitoring, see the Apache Kafka documentation.
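
The interaction between errors.tolerance and the DLQ can be sketched in Python. route_records is hypothetical: it treats a ValueError from the parse function (json.loads raises one on malformed input) as a broken record, and it collects DLQ candidates in a list instead of producing them to a Kafka topic.

```python
import json
from typing import Callable, Iterable, List, Tuple


def route_records(
    records: Iterable[bytes],
    parse: Callable[[bytes], dict],
    tolerance: str = "none",
) -> Tuple[List[dict], List[bytes]]:
    """Split records into parsed rows and DLQ candidates (hypothetical helper).

    With tolerance "none" the first parse error propagates and loading stops;
    with "all" the raw record is set aside for the DLQ and loading continues.
    """
    rows: List[dict] = []
    dlq: List[bytes] = []
    for raw in records:
        try:
            rows.append(parse(raw))
        except ValueError:
            if tolerance.lower() != "all":
                raise
            dlq.append(raw)
    return rows, dlq


rows, dlq = route_records([b'{"id": 1}', b"not json", b'{"id": 2}'], json.loads, tolerance="all")
print(rows, dlq)  # [{'id': 1}, {'id': 2}] [b'not json']
```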

Billing & Usage

For Snowpipe Streaming billing information, see Billing & Viewing the Data Load History for Your Account.

Note that the “Cost Optimization” information in the same topic does not apply to the Kafka connector due to constraints in the Kafka Connect architecture.