Working with the Snowflake High Performance connector for Kafka¶
This topic describes how the connector works with tables and pipes, and how to configure the connector with these elements.
How the connector works with tables and pipes¶
The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, if you have a Kafka topic with the content of the message structured like the following JSON:
{
"order_id": 12345,
"customer_name": "John",
"order_total": 100.00,
"isPaid": true
}
By default, you don’t have to create a table or pipe before ingestion begins.
The connector creates a table with columns matching the JSON keys and relies on the default pipe named {tableName}-STREAMING,
which automatically maps the record content’s first-level keys to table columns by name (case-insensitive).
You can also create your own table with columns matching the JSON keys.
The connector tries to match the record content’s first-level keys to the table columns by name.
If keys from the JSON do not match the table columns, the connector ignores the keys.
CREATE TABLE ORDERS (
record_metadata VARIANT,
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
ispaid BOOLEAN
);
If you choose to create your own pipe, you can define the data transformation logic in the pipe’s COPY INTO statement. You can rename columns as required and cast the data types as needed. For example:
CREATE TABLE ORDERS (
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::STRING,
$1:customer_name,
$1:order_total::STRING,
$1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
or
CREATE TABLE ORDERS (
topic VARCHAR,
partition VARCHAR,
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_METADATA.topic::STRING AS topic,
$1:RECORD_METADATA.partition::STRING AS partition,
$1['order_id']::STRING AS order_id,
$1['customer_name']::STRING as customer_name,
CONCAT($1['order_total']::STRING, ' USD') AS order_total,
$1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
When you define your own pipe, the destination table columns do not need to match the JSON keys. You can rename the columns to your desired names and cast the data types if required.
Topic names, table names, and pipe names¶
Depending on the configuration settings, the connector uses different names for the destination table. The destination table name is always determined by the topic name, either derived directly from it or looked up through an explicit mapping.
How the connector maps topic names to the destination table¶
The Kafka connector provides two modes for mapping Kafka topic names to Snowflake table names:
Static mapping: The connector derives destination table names using only the Kafka topic name.
Explicit topic-to-table mapping mode: You specify custom mappings between topics and tables using the
snowflake.topic2table.map configuration parameter.
Static mapping¶
If you do not configure the snowflake.topic2table.map parameter, the connector always derives the table names from the topic name.
Table name generation:
The connector derives the destination table name from the topic name using the following rules:
If the topic name is a valid Snowflake identifier, the connector uses the topic name (converted to uppercase) as the destination table name.
If the topic name contains invalid characters, the connector:
Replaces invalid characters with underscores
Appends an underscore followed by a hash code to ensure uniqueness
For example, the topic my-topic.data becomes MY_TOPIC_DATA_<hash>
Pipe name determination:
The connector determines which pipe to use based on the following logic:
The connector checks if a pipe exists with the same name as the destination table name.
If a user-created pipe with that name exists, the connector uses that pipe (user-defined pipe mode).
If not, the connector uses the default pipe named
{tableName}-STREAMING
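If you are unsure which pipe will be used for a given table, you can check for a user-created pipe yourself. The following is a quick sketch; the table name MY_TOPIC_DATA is an assumption for illustration:
-- Check whether a user-defined pipe with the same name as the destination table exists.
-- If nothing matches, the connector falls back to the default "<tableName>-STREAMING" pipe.
SHOW PIPES LIKE 'MY_TOPIC_DATA%';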
Note
Snowflake recommends choosing topic names that follow the rules for Snowflake identifier names to ensure predictable table names.
Understanding RECORD_METADATA¶
The connector populates the RECORD_METADATA structure with metadata about the Kafka record. This metadata is sent through the Snowpipe Streaming data source to Snowflake, where it becomes available in pipe transformations using the $1:RECORD_METADATA accessor. The RECORD_METADATA structure is available in both user-defined pipe and default pipe modes. Its content can be saved to a column of type VARIANT, or individual fields can be extracted and saved to separate columns.
Example pipe with transformations and metadata:
CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total,
$1:RECORD_METADATA.topic AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
$1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
In this example:
The pipe extracts specific fields from the Kafka message (order_id, customer_name, order_total)
It also captures metadata fields (topic, offset, and ingestion timestamp)
The values can be cast and/or transformed as needed
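For reference, the pipe above targets a destination table shaped roughly like the following. This is only a sketch; the column names and types are assumptions that match the pipe’s select list positionally:
CREATE TABLE ORDERS_TABLE (
    order_id NUMBER,
    customer_name VARCHAR,
    order_total NUMBER,
    source_topic VARCHAR,
    kafka_offset NUMBER,
    ingestion_time NUMBER
);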
How metadata fields are populated¶
The connector automatically populates metadata fields based on the Kafka record properties and connector configuration. You can control which metadata fields are included using these configuration parameters:
snowflake.metadata.topic (default: true) - Includes the topic name
snowflake.metadata.offset.and.partition (default: true) - Includes offset and partition
snowflake.metadata.createtime (default: true) - Includes the Kafka record timestamp
snowflake.metadata.all (default: true) - Includes all available metadata
When snowflake.metadata.all=true (the default), all metadata fields are populated. Setting individual metadata flags to false excludes those specific fields from the RECORD_METADATA structure.
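For example, a minimal connector configuration sketch that keeps the topic, offset, and partition metadata but drops the Kafka record timestamp (the values shown are illustrative):
snowflake.metadata.all=true
snowflake.metadata.topic=true
snowflake.metadata.offset.and.partition=true
snowflake.metadata.createtime=false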
Note
The SnowflakeConnectorPushTime field is always available and represents the time when the connector pushed the record into the ingestion buffer. This is useful for calculating end-to-end ingestion latency.
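As a rough illustration, assuming a destination table such as PERSON_DATA that stores the full RECORD_METADATA in a VARIANT column, you could estimate the delay between record creation in Kafka and the connector push with a query like the following. This is a sketch, not an official latency metric:
-- Average gap (in milliseconds) between the Kafka record timestamp and the connector push time.
-- Assumes a RECORD_METADATA VARIANT column; field names are case-sensitive.
SELECT
    AVG(record_metadata:SnowflakeConnectorPushTime::NUMBER
        - record_metadata:CreateTime::NUMBER) AS avg_connector_delay_ms
FROM PERSON_DATA;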
The RECORD_METADATA structure contains the following information by default:
| Field | Data Type | Description |
|---|---|---|
| topic | String | The name of the Kafka topic that the record came from. |
| partition | String | The number of the partition within the topic. (Note that this is the Kafka partition, not the Snowflake micro-partition.) |
| offset | number | The offset in that partition. |
| CreateTime / LogAppendTime | number | The timestamp associated with the message in the Kafka topic. The value is milliseconds since midnight January 1, 1970, UTC. For more information, see the Kafka ProducerRecord documentation. |
| SnowflakeConnectorPushTime | number | A timestamp when a record was pushed into an Ingest SDK buffer. The value is the number of milliseconds since midnight January 1, 1970, UTC. For more information, see Estimating ingestion latency. |
| key | String | If the message is a Kafka KeyedMessage, this is the key for that message. The key is copied into RECORD_METADATA only when the relevant metadata flags are enabled. |
| headers | Object | A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers. |
The amount of metadata recorded in the RECORD_METADATA column is configurable using optional Kafka configuration properties.
The field names and values are case-sensitive.
How Kafka records are converted before ingestion¶
Before each row is handed over to Snowpipe Streaming, the connector converts the Kafka Connect record value into a Map<String, Object> whose keys must match your target column names (or can be transformed inside a user-defined pipe). Primitive strings, byte arrays, or numbers must be wrapped (for example by using the HoistField SMT) so that the connector receives a structured object. The converter applies the following rules:
Null values are treated as tombstones. They are skipped when behavior.on.null.values=IGNORE or ingested as empty JSON objects otherwise.
Numeric and boolean fields are passed through as-is. Decimal values whose precision is greater than 38 are serialized as strings to stay within Snowflake’s NUMBER limits.
byte[] and ByteBuffer payloads are Base64-encoded strings, so store them in VARIANT or VARCHAR columns.
Arrays remain arrays, and nested objects remain nested maps. Declare VARIANT columns when you rely on the default pipe to land nested data as-is.
Maps with non-string keys are emitted as arrays of [key, value] pairs because Snowflake column names must be text.
Record headers and keys are copied into RECORD_METADATA whenever the relevant metadata flags are enabled.
If you need the entire message body preserved as a single column, wrap it into a new top-level field using SMTs. See Legacy RECORD_CONTENT column for the transformation pattern.
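For instance, if a field arrives as a Base64-encoded string and you want to store it as real binary data, a user-defined pipe can decode it. The following is a sketch only; the raw_payload field and the BINARY_EVENTS table (with BINARY and VARCHAR columns) are assumptions:
CREATE PIPE BINARY_EVENTS AS
COPY INTO BINARY_EVENTS
FROM (
    SELECT
        -- raw_payload arrives as a Base64 string; decode it back into binary
        BASE64_DECODE_BINARY($1:raw_payload::STRING) AS raw_payload,
        $1:RECORD_METADATA.topic::STRING AS source_topic
    FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);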
User-defined pipe mode vs default pipe mode¶
The connector supports two modes for managing data ingestion:
User-defined pipe mode¶
In this mode, you have full control over data transformation and column mapping.
When to use this mode:
You need custom column names that differ from JSON field names
You need to apply data transformations (type casting, masking, filtering)
You want full control over how data is mapped to columns
Default pipe mode¶
In this mode, the connector uses a default pipe named {tableName}-STREAMING and maps Kafka record fields to table columns by name (case-insensitive).
When to use this mode:
Your Kafka record key names match your desired column names
You don’t need custom data transformations
You want a simple configuration
Mapping Kafka record keys to table columns with default pipe mode
When using default pipe mode, the connector uses the default pipe named {tableName}-STREAMING and maps the record content’s first-level keys directly to table columns using case-insensitive matching.
Using default pipe mode - example¶
Example 1:¶
Consider the following Kafka record content payload:
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
"@&$#* includes special characters": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
You create a table with columns matching the JSON keys (case-insensitive, including special characters):
CREATE TABLE PERSON_DATA (
record_metadata VARIANT,
city VARCHAR,
age NUMBER,
married BOOLEAN,
"has cat" BOOLEAN,
"!@&$#* includes special characters" BOOLEAN,
skills VARIANT,
family VARIANT
);
Matching behavior:
"city"(kafka) →cityorCITYorCity(column) - case insensitive"has cat"(kafka) →"has cat"(column) - must be quoted due to space"!@&$#* includes special characters"(kafka) →"!@&$#* includes special characters"(column) - special characters preservedNested objects like
skillsandfamilymap to VARIANT columns automatically
Using user-defined pipe mode - examples¶
This example shows how to configure and use user-defined pipes with custom data transformations.
Example 1:¶
Create a table with your desired schema:
CREATE TABLE ORDERS (
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
order_date TIMESTAMP_NTZ,
source_topic VARCHAR
);
Create a pipe that transforms the incoming Kafka records to match your table schema:
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total::NUMBER,
$1:order_date::TIMESTAMP_NTZ,
$1:RECORD_METADATA.topic
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Note that the pipe name (ORDERS) matches the table name (ORDERS). The pipe definition extracts fields from the JSON payload using $1:field_name syntax and maps them to the table columns.
Note
You can access nested JSON fields and fields with special characters using bracket notation, such as $1['field name'] or $1['has cat'].
Configure topic to table mapping:
snowflake.topic2table.map=kafka-orders-topic:ORDERS
This configuration maps the Kafka topic kafka-orders-topic to the pre-existing table and pipe named ORDERS.
Example 2:¶
When you need to access keys in the content that do not have conventional names, use the following syntax:
Simple fields: $1:field_name
Fields with spaces or special characters: $1['field name'] or $1['has cat']
Fields with Unicode characters: $1[' @&$#* has Łułósżź']
Nested fields: $1:parent.child or $1:parent['child field']
Consider this JSON payload from Kafka:
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
" @&$#* has Łułósżź": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
You create a destination table with your chosen column names:
CREATE TABLE PERSON_DATA (
city VARCHAR,
age NUMBER,
married BOOLEAN,
has_cat BOOLEAN,
weird_field_name BOOLEAN,
skills VARIANT,
family VARIANT
);
Then create a pipe with the same name that defines the mapping:
CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
SELECT
$1:city,
$1:age,
$1:married,
$1['has cat'] AS has_cat,
$1[' @&$#* has Łułósżź'] AS weird_field_name,
$1:skills,
$1:family
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Key points:
You control column names (e.g., renaming "has cat" to has_cat)
You can cast data types as needed (e.g., $1:age::NUMBER)
You can include or exclude fields as desired
You can add metadata fields (e.g., $1:RECORD_METADATA.topic)
VARIANT columns automatically handle nested JSON structures
Example 3: With interactive tables¶
Interactive tables are a special type of Snowflake table optimized for low-latency, high-concurrency queries. You can find out more about interactive tables in the interactive tables documentation.
Create an interactive table:
CREATE INTERACTIVE TABLE REALTIME_METRICS (
    metric_name VARCHAR,
    metric_value NUMBER,
    source_topic VARCHAR,
    timestamp TIMESTAMP_NTZ
) AS (
    SELECT
        $1:M_NAME::VARCHAR,
        $1:M_VALUE::NUMBER,
        $1:RECORD_METADATA.topic::VARCHAR,
        $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
    FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Configure topic to table mapping:
snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
Important considerations:
Interactive tables have specific limitations and query restrictions. Review the interactive tables documentation before using them with the connector.
For interactive tables, any required transformations must be handled in the table definition.
Interactive warehouses are required to query interactive tables efficiently.
Explicit topic-to-table mapping¶
When you configure the snowflake.topic2table.map parameter, the connector operates in explicit mapping mode. This mode allows you to:
Map multiple Kafka topics to a single Snowflake table
Use custom table names that differ from topic names
Apply regex patterns to match multiple topics
Configuration format:
The snowflake.topic2table.map parameter accepts a comma-separated list of topic-to-table mappings in the format:
topic1:table1,topic2:table2,topic3:table3
Example configurations:
Direct topic mapping
snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
Regex pattern matching
snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
This configuration maps all topics ending with _cat (such as orange_cat, calico_cat) to the CAT_TABLE table, and all topics ending with _dog to the DOG_TABLE table.
Many topics to one table
snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
This configuration maps both topic1 and topic2 to shared_table, while topic3 maps to other_table.
Important
Regex patterns in the mapping cannot overlap. Each topic must match at most one pattern.
Table names in the mapping must be valid Snowflake identifiers with at least 2 characters, starting with a letter or underscore.
You can map multiple topics to a single table.
Legacy RECORD_CONTENT column¶
In prior versions of the connector (3.x and earlier), when the schematization feature was disabled, the connector created a destination table with two columns: RECORD_CONTENT and RECORD_METADATA. The RECORD_CONTENT column contained the entire Kafka message content in a column of type VARIANT. The RECORD_METADATA column continues to be supported but the RECORD_CONTENT column is no longer created by the connector. The same functionality can be achieved using SMT transformations (see examples later in this section). The RECORD_CONTENT key is also no longer available in PIPE transformations. For example, this PIPE definition will not work by default:
Note
This pipe definition will not work without additional SMT transformations.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
If you need entire Kafka message content saved to a single column, or you need a handle to the entire Kafka message content in a PIPE transformation, you can use the following SMT transformation that wraps the entire Kafka message content into your desired custom field:
transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
This transformation will wrap the entire Kafka message content into a custom field named your_top_level_field_name. You can then access the entire Kafka message content using the $1:your_top_level_field_name accessor in your PIPE transformation.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
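A matching destination table for this pipe could be as simple as a single VARIANT column; this is a sketch, and the column name is the hypothetical field used above:
CREATE TABLE ORDERS (
    your_top_level_field_name VARIANT
);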
Alternatively, if you want to save both the entire metadata and content to a single table using the default pipe, do not create a custom pipe; instead, create only a table with two columns: RECORD_METADATA and your_top_level_field_name.
CREATE TABLE ORDERS (
record_metadata VARIANT,
your_top_level_field_name VARIANT
);
To read more about the HoistField$Value transformation, see the Kafka documentation.
Warning
Saving the entire Kafka message content and metadata to a table can negatively impact your ingestion cost, pipeline speed, and latency. If you need the best possible performance, consider saving only the data you need, if it is accessible from the top level of the Kafka record content, or use SMT transformations to extract data from deeply nested fields into top-level fields.
Handling streaming channel errors and dead-letter queues¶
The connector inspects the Snowpipe Streaming channel status before committing offsets in Kafka. If the connector detects that the rowsErrorCount property on the channel has increased since the connector was started, it raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues don’t go unnoticed. To allow ingestion to continue while triaging bad rows, set:
errors.tolerance=all
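To route failing records to a dead-letter queue instead of dropping them silently, you can combine this with the standard Kafka Connect error-handling properties. The following is a sketch; the DLQ topic name and replication factor are examples:
errors.tolerance=all
errors.deadletterqueue.topic.name=orders-dlq
errors.deadletterqueue.topic.replication.factor=1
errors.log.enable=true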
Schema evolution¶
For tables with ENABLE_SCHEMA_EVOLUTION=TRUE, the connector automatically evolves the table schema based on the incoming Kafka records. All connector-created tables default to ENABLE_SCHEMA_EVOLUTION=TRUE.
Schema evolution is limited to the following operations:
Adding new columns. The connector will add new columns to the table if the incoming Kafka records contain new fields that are not present in the table.
Dropping the NOT NULL constraint from columns that are missing data in the inserted records
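If you created the destination table yourself and want the connector to evolve it, make sure schema evolution is enabled on that table. For example, assuming the ORDERS table from the earlier examples:
-- Enable automatic schema evolution on a user-created table
ALTER TABLE ORDERS SET ENABLE_SCHEMA_EVOLUTION = TRUE;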
Using the connector with Apache Iceberg™ tables¶
The connector can ingest data into Snowflake-managed Apache Iceberg™ tables, but the following requirements must be met:
You must have been granted the USAGE privilege on the external volume associated with your Apache Iceberg™ table.
You must create an Apache Iceberg™ table before running the connector.
Grant usage on an external volume¶
Grant the USAGE privilege on the external volume associated with your Apache Iceberg™ table to the role used by the Kafka connector.
For example, if your Iceberg table uses the kafka_external_volume external volume
and the connector uses the role kafka_connector_role, run the following statements:
USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Create an Apache Iceberg™ table for ingestion¶
The connector does not create Iceberg tables automatically and does not support schema evolution. Before you run the connector, you must create an Iceberg table manually.
When you create an Iceberg table, you can use Iceberg data types (including VARIANT) or compatible Snowflake types.
For example, consider the following message:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"options":
{
"can_walk": true,
"can_talk": false
},
"date_added": "2024-10-15"
}
To create an Iceberg table for the example message, use one of the following statements:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id NUMBER(38,0),
    name VARCHAR,
    body_temperature NUMBER(4,2),
    approved_coffee_types ARRAY(VARCHAR),
    animals_possessed VARIANT,
    options OBJECT(can_walk BOOLEAN, can_talk BOOLEAN),
    date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id INT,
    name STRING,
    body_temperature FLOAT,
    approved_coffee_types ARRAY(STRING),
    animals_possessed VARIANT,
    date_added DATE,
    options OBJECT(can_walk BOOLEAN, can_talk BOOLEAN)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;
Note
Field names inside nested structures such as dogs or cats are case sensitive.
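Because the connector does not create Iceberg tables, point the topic at the pre-created table explicitly using the snowflake.topic2table.map parameter. The following is a sketch; the topic name is an example:
snowflake.topic2table.map=coffee-profiles:MY_ICEBERG_TABLE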