Creating an interactive table using Snowpipe Streaming for ingestion

This topic explains how to use interactive tables for streaming ingestion instead of batch ingestion. For general information about interactive tables and interactive warehouses, see Interactive tables and interactive warehouses in Snowflake.

If your data pipeline uses Snowpipe Streaming to ingest data from a source such as Apache Kafka, you can bring that data into an interactive table.

To use Snowpipe Streaming as the source for an interactive table, follow these steps:

  1. Choose a name for the interactive table. Snowflake creates a pipe behind the scenes with the same name as the interactive table.

  2. Create the interactive table with a CREATE INTERACTIVE TABLE command. In the simplest form of CREATE INTERACTIVE TABLE, you specify the column definitions and the CLUSTER BY clause.

    For example, the following command creates an interactive table named interactive_streaming and an associated Snowpipe Streaming pipe also named interactive_streaming.

    CREATE INTERACTIVE TABLE interactive_streaming (s VARCHAR, x INT)
      CLUSTER BY (x);
    

    You can specify your own mapping from the field names of your streaming source (such as Kafka) to SQL column names. To do so, write the CREATE INTERACTIVE TABLE as a CREATE TABLE AS SELECT (CTAS) statement, and specify FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')) instead of a source table. For an example of that syntax, see Using the Apache Kafka connector.
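
To confirm that Snowflake created the pipe, you can look for a pipe with the same name as the table. The following is a quick optional check, assuming the interactive_streaming example above:

SHOW PIPES LIKE 'interactive_streaming';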

Required privileges for Snowpipe Streaming

If you run CREATE OR REPLACE INTERACTIVE TABLE, the grants on the previous table don’t apply to the new table. In that case, apply a new set of grants after running the CREATE OR REPLACE command.

For more information about privileges used in Snowpipe Streaming, see Required access privileges.

The following example shows the commands that you might run to set up a user and role to work with an interactive table named interactive_table that uses streaming for ingestion:

-- Create a dedicated role and user for streaming ingestion.
CREATE ROLE streaming_role;
CREATE USER streaming_user;
GRANT ROLE streaming_role TO USER streaming_user;

-- Grant access to the schema that contains the interactive table.
GRANT USAGE ON SCHEMA my_schema TO ROLE streaming_role;

-- Grant privileges on the interactive table and its implicit pipe.
GRANT ALL ON TABLE interactive_table TO ROLE streaming_role;
GRANT ALL ON PIPE interactive_table TO ROLE streaming_role;
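
To verify the grants, you can optionally run the following command:

SHOW GRANTS TO ROLE streaming_role;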

Using the Apache Kafka connector

For this preview feature, you can use the Kafka connector to stream data into your interactive tables.

During the preview period, you use a JAR file provided by Snowflake to set up the Kafka connector to stream data to the interactive table. In the Kafka configuration settings, you specify the names of the Snowflake database, schema, and table that correspond to the interactive table you created in Snowflake.

For Snowpipe Streaming High-Performance Architecture, you must use key-pair authentication instead of basic authentication. For more information about generating the key pair, see Key-pair authentication and key-pair rotation.

You run an ALTER USER command to associate the public key with the user that creates the interactive table. Then you include the private key in the configuration settings of the Kafka connection, for example in the Amazon MSK service on Amazon Web Services (AWS).
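
For example, a command of the following form associates the public key with the streaming_user from the earlier example. The key value is a placeholder; substitute the public key that you generated:

ALTER USER streaming_user SET RSA_PUBLIC_KEY='<my_public_key>';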

Create the interactive table in Snowflake using SQL first, before you set up the Kafka connector.

The following code shows the general form of the Kafka configuration settings that you use. Substitute your own values for the placeholders, and adjust any numeric values based on your own resource constraints and performance requirements:

{
  "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
  "snowflake.warehouse.name": "<my_warehouse>",
  "tasks.max": "20",
  "snowflake.url.name": "<my_url>.snowflakecomputing.com:443",
  "snowflake.streaming.enable.altering.target.pipes.tables": "false",
  "snowflake.streaming.v2.enabled": "true",
  "jmx": "true",
  "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n<my_private_key>\n-----END PRIVATE KEY-----\n",
  "host": "<my_url>.snowflakecomputing.com:443",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "errors.log.enable": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "buffer.size.bytes": "5000000",
  "snowflake.enable.schematization": "false",
  "snowflake.topic2table.map": "kafka_connector_demo_topic: kafka_connector_demo",
  "topics": "kafka_connector_demo_topic",
  "buffer.flush.time": "1",
  "snowflake.database.name": "<my_database>",
  "snowflake.schema.name": "<my_schema>",
  "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
  "buffer.count.records": "100000",
  "snowflake.role.name": "<my_role_name>",
  "snowflake.user.name": "<my_user_name>",
  "value.converter.schemas.enable": "false",
  "errors.tolerance": "all"
}

The following settings are the most important; they must have the values shown for the connector to work properly with interactive tables:

"snowflake.streaming.enable.altering.target.pipes.tables": "false",
"snowflake.streaming.v2.enabled": "true",
"snowflake.enable.schematization": "false",
"snowflake.topic2table.map": "kafka_connector_demo_topic: kafka_connector_demo",
"topics": "kafka_connector_demo_topic",

The following statement maps the names of fields from the Kafka topic to column names in the interactive table. The CLUSTER BY clause uses an expression that clusters together rows whose timestamp values fall on the same day.

CREATE OR REPLACE INTERACTIVE TABLE kafka_connector_demo (
  timestamp TIMESTAMP_NTZ(6),
  country_code VARCHAR(16777216),
  url VARCHAR(16777216),
  topicname VARCHAR(16777216),
  streamingeventtime TIMESTAMP_NTZ(6)
) CLUSTER BY (TRUNC(timestamp, 'day'))
AS (
  SELECT $1:RECORD_CONTENT.timestamp
  , $1:RECORD_CONTENT.country_code
  , $1:RECORD_CONTENT.url
  , $1:RECORD_METADATA.topic
  , SYSDATE() as streamingeventtime
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

Use the RECORD_CONTENT and RECORD_METADATA keywords to map both the data fields and the metadata in the Kafka topic to columns in the interactive table. You can combine the columns derived from streaming data with other columns that use other SQL features, such as the SYSDATE function. In this case, the SYSDATE value is evaluated at the time the interactive table ingests each row of streaming data.

For more information about the transformations that you can include in the AS clause during data ingestion, see Transform data during a load. The techniques shown for COPY INTO … FROM clauses also apply to CREATE INTERACTIVE TABLE … AS clauses.
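
For example, the following variant of the earlier statement adds explicit casts and a simple transformation to the SELECT list. This is a sketch; the UPPER transformation is illustrative, not required:

CREATE OR REPLACE INTERACTIVE TABLE kafka_connector_demo (
  timestamp TIMESTAMP_NTZ(6),
  country_code VARCHAR,
  url VARCHAR
) CLUSTER BY (TRUNC(timestamp, 'day'))
AS (
  SELECT $1:RECORD_CONTENT.timestamp::TIMESTAMP_NTZ(6)
  , UPPER($1:RECORD_CONTENT.country_code::VARCHAR)
  , $1:RECORD_CONTENT.url::VARCHAR
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);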

Monitoring data ingestion

The following Snowpipe Streaming view contains information that helps you verify that data is being ingested into the interactive table and see how much data is transferred:

Schema evolution for interactive tables

Interactive tables support schema evolution, which allows Snowflake to automatically add new columns when the incoming data contains fields that don’t yet exist in the table. This is particularly useful when streaming data from external systems like Kafka, where the source schema may change over time.

You enable schema evolution by setting ENABLE_SCHEMA_EVOLUTION = TRUE on the interactive table. You can set this property at table creation or on an existing table using ALTER TABLE.

In addition to automatic schema evolution, interactive tables support these manual ALTER TABLE operations:

  • Adding new columns.

  • Modifying NULL constraints on columns (SET NOT NULL or DROP NOT NULL).

Enabling schema evolution at table creation

Include the ENABLE_SCHEMA_EVOLUTION = TRUE property in the CREATE INTERACTIVE TABLE statement:

CREATE OR REPLACE INTERACTIVE TABLE kafka_connector_demo (
  timestamp TIMESTAMP_NTZ(6),
  country_code VARCHAR(16777216),
  url VARCHAR(16777216)
)
  CLUSTER BY (TRUNC(timestamp, 'day'))
  ENABLE_SCHEMA_EVOLUTION = TRUE;

Enabling schema evolution on an existing table

If the table was created without schema evolution, enable it using ALTER TABLE:

ALTER TABLE kafka_connector_demo
  SET ENABLE_SCHEMA_EVOLUTION = TRUE;
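
To confirm the setting, you can check the enable_schema_evolution column in the output of SHOW TABLES:

SHOW TABLES LIKE 'kafka_connector_demo';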

Schema evolution with the Kafka connector

When you use the Kafka connector with schema evolution, two settings work together:

  • On the Snowflake table: ENABLE_SCHEMA_EVOLUTION = TRUE allows Snowflake to automatically add new columns.

  • On the Kafka connector: snowflake.enable.schematization: true enables the connector to detect and propagate schema changes from the data source.

Both settings must be enabled for end-to-end schema evolution to work.

Note

The Kafka connector configuration shown in the Using the Apache Kafka connector section has snowflake.enable.schematization set to false. Change this setting to true if you want to use schema evolution.

Testing schema evolution with Kafka

The following steps show how to verify that schema evolution works end-to-end with a Kafka data source.

  1. Create an interactive table with schema evolution enabled:

    CREATE OR REPLACE INTERACTIVE TABLE kafka_connector_demo (
      timestamp TIMESTAMP_NTZ(6),
      country_code VARCHAR(16777216),
      url VARCHAR(16777216)
    )
      CLUSTER BY (TRUNC(timestamp, 'day'))
      ENABLE_SCHEMA_EVOLUTION = TRUE;
    
  2. Send a record with the base fields to the Kafka topic:

    echo '{"timestamp": "2026-02-27T07:00:00.000000", "country_code": "US", "url": "https://example.com/home"}' | \
      docker exec -i demo-kafka kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic interactive_demo_topic
    
  3. Send a record that includes a new field (referrer) that doesn’t exist in the table:

    echo '{"timestamp": "2026-02-27T07:05:00.000000", "country_code": "FR", "url": "https://example.com/about", "referrer": "google.com"}' | \
      docker exec -i demo-kafka kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic interactive_demo_topic
    
  4. Verify that Snowflake automatically added the new column:

    DESCRIBE TABLE kafka_connector_demo;
    
    SELECT * FROM kafka_connector_demo LIMIT 10;
    

    The referrer column should appear in the table schema, with a NULL value for the first record and google.com for the second record.
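
    For example, a query like the following groups rows by the new column, so you can see the NULL value from the first record and google.com from the second (an optional check):

    SELECT referrer, COUNT(*) AS row_count
      FROM kafka_connector_demo
      GROUP BY referrer;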

Manual schema changes with ALTER TABLE

You can also add columns and modify NULL constraints manually, regardless of the ENABLE_SCHEMA_EVOLUTION setting:

ALTER TABLE kafka_connector_demo
  ADD COLUMN referrer VARCHAR;

ALTER TABLE kafka_connector_demo
  ALTER COLUMN country_code SET NOT NULL;

Verify the table structure after any schema changes:

DESCRIBE TABLE kafka_connector_demo;

Usage notes for schema evolution

  • The role used to load data must have the EVOLVE SCHEMA or OWNERSHIP privilege on the table.

  • Newly added columns contain NULL values for existing rows.

  • Setting NOT NULL on a column that contains NULL values results in an error. (For one way to handle this case, see the sketch after this list.)

  • For more information about schema evolution, see Enable automatic table schema evolution.
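
For example, before you set NOT NULL on the referrer column from the earlier examples, you can backfill existing NULL values. The following is a minimal sketch that assumes the table accepts standard UPDATE statements; the 'unknown' default is an arbitrary placeholder:

-- Backfill NULL values so the constraint can be applied.
UPDATE kafka_connector_demo
  SET referrer = 'unknown'
  WHERE referrer IS NULL;

-- Now the constraint succeeds because no NULL values remain.
ALTER TABLE kafka_connector_demo
  ALTER COLUMN referrer SET NOT NULL;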