Using the Snowflake Connector for Kafka with Apache Iceberg™ tables¶
Beginning with version 3.0.0, the Snowflake Connector for Kafka can ingest data into a Snowflake-managed Apache Iceberg™ table.
Requirements and limitations¶
Before configuring the connector for Iceberg table ingestion, note the following requirements and limitations:
Iceberg table ingestion requires version 3.0.0 or later of the Kafka connector.
Iceberg table ingestion is supported by the Kafka connector with Snowpipe Streaming. It’s not supported by the Kafka connector with Snowpipe.
Iceberg table ingestion is not supported when snowflake.streaming.enable.single.buffer is set to false.
You must create an Iceberg table before running the connector. Refer to Configuration and setup in this topic for the detailed schema.
Schema evolution limitations¶
The schema evolution feature for Iceberg is fully supported for schematized data formats such as Avro or Protobuf.
For plain JSON without a schema, the following messages are handled as invalid and sent to the dead-letter queue (DLQ):
Messages with a new column whose corresponding value is null or []
Messages with a new field in a structured object whose corresponding value is null or []
You can change the Snowflake table schema manually with an ALTER TABLE statement so that such previously rejected messages can be ingested successfully.
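For example, if a rejected message introduced a new top-level field, a statement like the following adds a matching column manually. This is a minimal sketch; the table name my_iceberg_table and the column name extra_info are illustrative, and the column type must match the data you expect to ingest:
ALTER ICEBERG TABLE my_iceberg_table
  ADD COLUMN extra_info STRING;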
Configuration and setup¶
To configure the Kafka connector for Iceberg table ingestion, follow the regular setup steps for a Snowpipe Streaming-based connector with a few differences noted in this section.
Grant usage on an external volume¶
Compared to a regular Snowpipe Streaming setup, one additional grant is required for the role used by the Kafka connector.
For example, if the Iceberg table uses the external volume kafka_external_volume and the connector uses the role kafka_connector_role, run the following statements:
USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Create an Iceberg table for ingestion¶
Before you run the connector, you must create an Iceberg table. The initial schema depends on the connector's snowflake.enable.schematization setting.
When schematization is enabled, create a table with a column named record_metadata:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
The connector automatically creates the record_content column and alters the record_metadata column schema.
When schematization is not enabled, create a table with a column named record_content of a type that matches the actual Kafka message content. The connector automatically creates the record_metadata column.
When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.
For example, consider the following message:
{
  "id": 1,
  "name": "Steve",
  "body_temperature": 36.6,
  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
  "animals_possessed": {
    "dogs": true,
    "cats": false
  },
  "date_added": "2024-10-15"
}
To create an Iceberg table for the example message, use the following statement:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  record_content OBJECT(
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Note that field names inside nested structures, for example dogs or cats, are case-sensitive.
Configuration properties¶
snowflake.streaming.iceberg.enabled
Specifies whether the connector ingests data into an Iceberg table. The connector fails if this property does not match the actual table type.
- Values: true, false
- Default: false
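For reference, the following is a minimal sketch of a connector configuration that enables Iceberg ingestion with Snowpipe Streaming, shown as a Kafka Connect REST API request body. The connector name, topic, table mapping, and connection placeholders (account URL, user, private key, database, schema, role) are illustrative and must be replaced with your own values; all other Snowpipe Streaming configuration properties apply as usual:
{
  "name": "my_iceberg_connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "topics": "my_topic",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "snowflake.streaming.iceberg.enabled": "true",
    "snowflake.enable.schematization": "true",
    "snowflake.topic2table.map": "my_topic:my_iceberg_table",
    "snowflake.url.name": "<account_identifier>.snowflakecomputing.com:443",
    "snowflake.user.name": "<user>",
    "snowflake.private.key": "<private_key>",
    "snowflake.database.name": "<database>",
    "snowflake.schema.name": "<schema>",
    "snowflake.role.name": "kafka_connector_role"
  }
}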