Creating an interactive table using streaming ingestion

This topic explains how to stream data into interactive tables. For general information about interactive tables and interactive warehouses, see Snowflake interactive tables and interactive warehouses. For mutable, deduplicated data (inserts, updates, and deletes from a source system), see Using change data capture with interactive tables.

Choosing an ingestion path

You can stream data into an interactive table by using any of the following paths, all of which sit on the Snowpipe Streaming high-performance architecture:

  • Snowpipe Streaming SDK — for custom producers that use the Python or Java client directly. Best when you own the producer application and want the lowest-latency, lowest-overhead path into Snowflake.

  • Snowflake Connector for Kafka — for teams that already run Apache Kafka or a Kafka-compatible service (Amazon MSK, Confluent Cloud). The connector calls the same Snowpipe Streaming v2 APIs under the hood.

  • Snowflake Openflow — for managed, no-code pipelines. Use the PutSnowpipeStreaming2 processor to write records to Snowflake.

Diagram showing three paths into an interactive table: Snowpipe Streaming SDK, the Snowflake Connector for Kafka, and Openflow PutSnowpipeStreaming2. All three use the Snowpipe Streaming high-performance architecture.

Three ways to stream data into an interactive table.

Note

Interactive tables also support change data capture (CDC) on top of streaming ingestion. If your source emits inserts, updates, and deletes that you want to deduplicate on a primary key, see Using change data capture with interactive tables.

How streaming ingestion works: the default pipe

Every Snowpipe Streaming write goes through a PIPE object, which handles schema validation, in-flight transformations, and pre-clustering. For most interactive-table workloads, Snowflake creates and manages a default pipe for you — no CREATE PIPE needed.

CREATE INTERACTIVE TABLE does not create a pipe. Snowflake creates the default pipe on demand the first time a client opens a channel or calls get_pipe_info against the table. After that, the pipe appears in SHOW PIPES output and can be inspected with DESCRIBE PIPE.
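
For example, after streaming starts into a table named ORDERS in my_db.my_schema, you can inspect the default pipe. Quote the pipe name, because it contains a hyphen:

SHOW PIPES IN SCHEMA my_db.my_schema;
DESCRIBE PIPE my_db.my_schema."ORDERS-STREAMING";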

The default pipe has the following characteristics:

  • Name: <TABLE_NAME>-STREAMING. A table named ORDERS gets a default pipe named ORDERS-STREAMING.

  • Snowflake-managed: You can’t run CREATE, ALTER, or DROP against the default pipe. Its OWNER attribute is NULL.

  • Default ingestion semantics: It uses MATCH_BY_COLUMN_NAME to map incoming fields to columns. It doesn’t support in-flight transformations or pre-clustering.

Define your own named pipe when you need any of the following (a sketch of such a pipe follows this list):

  • In-flight transformations, such as projecting fields from $1:RECORD_CONTENT or calling SQL functions during ingest.

  • Pre-clustering at ingest time (CLUSTER_AT_INGEST_TIME = TRUE).

  • An explicit pipe you can grant on, monitor, and replicate independently from the table.
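
For illustration, a custom pipe that projects fields and pre-clusters at ingest time might look like the following sketch. It targets the sensor_readings table used later in this topic; the pipe name, the projected fields, and the exact clause placement are assumptions, so check the pipe reference before using it:

CREATE OR REPLACE PIPE my_db.my_schema.sensor_readings_pipe
  AS COPY INTO my_db.my_schema.sensor_readings (device_id, reading_ts, temperature)
  FROM (
    SELECT $1:device_id, $1:reading_ts, $1:temperature
    FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING'))
  )
  -- Pre-cluster rows at ingest time; confirm clause placement in the reference.
  CLUSTER_AT_INGEST_TIME = TRUE;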

For the complete reference, see The PIPE object.

Required privileges for streaming ingestion

Before you run any of the ingestion options below, set up a role with the privileges needed to stream into the target table. Snowpipe Streaming also requires key-pair authentication; see Key-pair authentication and key-pair rotation for how to generate a key pair and associate it with the user.

The following example sets up a streaming_role for an interactive table named my_table whose default pipe is MY_TABLE-STREAMING:

CREATE ROLE streaming_role;
CREATE USER streaming_user;
GRANT ROLE streaming_role TO USER streaming_user;
GRANT USAGE ON DATABASE my_db TO ROLE streaming_role;
GRANT USAGE ON SCHEMA my_db.my_schema TO ROLE streaming_role;
GRANT INSERT, SELECT ON TABLE my_db.my_schema.my_table TO ROLE streaming_role;
GRANT OPERATE ON PIPE my_db.my_schema."MY_TABLE-STREAMING" TO ROLE streaming_role;

Note

If you run CREATE OR REPLACE INTERACTIVE TABLE, grants on the old table don’t carry over to the new one; you must reapply them. For the full list of Snowpipe Streaming privileges, see Required access privileges.

Option A: Using the Snowpipe Streaming SDK

The Snowpipe Streaming high-performance SDK lets your producer open a channel against the table's pipe (the default pipe or a custom pipe) and append rows directly. This is the lowest-overhead path and is a good fit for custom producers.

Start by setting up a Python virtual environment and installing the SDK. Do not install packages system-wide with --break-system-packages.

python3 -m venv .venv
source .venv/bin/activate
pip install snowflake-ingest snowflake-connector-python

Create the interactive table in Snowflake:

CREATE OR REPLACE INTERACTIVE TABLE sensor_readings (
  device_id   INT,
  reading_ts  TIMESTAMP_NTZ,
  temperature NUMBER(6, 2)
)
CLUSTER BY (DATE_TRUNC('HOUR', reading_ts), device_id);

Create a profile.json file with your account, user, and private-key details. For the format and an example, see Step 2: Configure an authentication profile.
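
As a rough sketch, a profile file looks something like the following. The field names and layout here are assumptions for illustration only; the linked step is the authoritative reference for the format:

{
  "account": "<account_identifier>",
  "user": "<user_name>",
  "url": "https://<account_identifier>.snowflakecomputing.com",
  "private_key": "-----BEGIN PRIVATE KEY-----\n<private_key_contents>\n-----END PRIVATE KEY-----"
}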

Then open a channel against the default pipe (SENSOR_READINGS-STREAMING) and append rows. The following example shows what a minimal producer looks like:

import uuid
from datetime import datetime, timezone
from snowflake.ingest.streaming import StreamingIngestClient

client = StreamingIngestClient(
    client_name=f"sensor_demo_{uuid.uuid4().hex[:8]}",
    db_name="MY_DB",
    schema_name="MY_SCHEMA",
    pipe_name="SENSOR_READINGS-STREAMING",
    profile_json="profile.json",
)
channel, _ = client.open_channel("sensor_channel")

now = datetime.now(timezone.utc)
offset = 0
for device_id in range(1, 4):
    row = {
        "DEVICE_ID": device_id,
        "READING_TS": now.strftime("%Y-%m-%d %H:%M:%S.%f"),
        "TEMPERATURE": 68.5 + device_id,
    }
    offset += 1
    channel.append_row(row, str(offset))

channel.close()
client.close()
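
After the producer finishes, you can confirm that the rows landed with a quick query (the fully qualified name below assumes the MY_DB.MY_SCHEMA values used in the client configuration):

SELECT device_id, reading_ts, temperature
  FROM my_db.my_schema.sensor_readings
  ORDER BY reading_ts DESC
  LIMIT 10;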

For the complete SDK reference, see Snowpipe Streaming key concepts and the getting-started tutorial.

Option B: Using the Snowflake Connector for Kafka

Use the Snowflake Connector for Kafka when you already run Kafka (or a Kafka-compatible service such as Amazon MSK or Confluent Cloud). The connector writes records through the Snowpipe Streaming v2 APIs, so the default pipe and custom-pipe rules described above apply.

Download the connector

During the preview period, use the release candidate of the connector that supports interactive tables. Download the JAR from Maven Central.

You must use key-pair authentication instead of basic authentication. Generate a key pair as described in Key-pair authentication and key-pair rotation, and run ALTER USER to associate the public key with the user that creates the interactive table. Then include the private key in the configuration settings of the Kafka Connect worker.
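
For example, assuming the streaming_user from Required privileges for streaming ingestion, the association looks like this (paste the public-key value without the PEM header and footer lines):

ALTER USER streaming_user SET RSA_PUBLIC_KEY = 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A...';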

Create the interactive table

Create the interactive table in Snowflake before you register the connector. No pipe is needed up front — Snowflake creates the default pipe (KAFKA_CONNECTOR_DEMO-STREAMING) on demand when the connector opens its first channel:

CREATE OR REPLACE INTERACTIVE TABLE kafka_connector_demo (
  timestamp    TIMESTAMP_NTZ(6),
  country_code VARCHAR,
  url          VARCHAR
)
ENABLE_SCHEMA_EVOLUTION = TRUE
CLUSTER BY (TRUNC(timestamp, 'hour'));

Configure the Kafka Connect sink

The following connector configuration targets the table created above. Replace the placeholders marked with angle brackets (<...>) with your own values:

{
  "name": "snowflake-interactive-sink",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
    "tasks.max": "1",
    "topics": "interactive_demo_topic",
    "snowflake.url.name": "<account>.snowflakecomputing.com:443",
    "snowflake.user.name": "<user>",
    "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n<your_private_key>\n-----END PRIVATE KEY-----",
    "snowflake.role.name": "STREAMING_ROLE",
    "snowflake.database.name": "MY_DB",
    "snowflake.schema.name": "MY_SCHEMA",
    "snowflake.warehouse.name": "<warehouse>",
    "snowflake.topic2table.map": "interactive_demo_topic:kafka_connector_demo",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "snowflake.streaming.v2.enabled": "true",
    "snowflake.streaming.enable.altering.target.pipes.tables": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "snowflake.enable.schematization": "true",
    "errors.tolerance": "all",
    "errors.log.enable": "true"
  }
}

The following settings are the most critical for interactive tables:

"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.streaming.v2.enabled": "true",
"snowflake.streaming.enable.altering.target.pipes.tables": "false",
"snowflake.topic2table.map": "interactive_demo_topic:kafka_connector_demo",

Register the connector

Save the configuration above to a file named snowflake-sink.json. Create a virtual environment and install the requests library:

python3 -m venv .venv
source .venv/bin/activate
pip install requests

Then save the following helper script as register_connector.py. It reads the connector name and configuration from the JSON file and submits the configuration to the Kafka Connect REST API:

# register_connector.py
import json
import sys
from pathlib import Path

import requests

# Kafka Connect REST endpoint (first argument) and path to the connector
# configuration file (second argument).
connect_url = sys.argv[1].rstrip("/")
payload = json.loads(Path(sys.argv[2]).read_text(encoding="utf-8"))
name = payload["name"]

# PUT /connectors/<name>/config creates the connector if it doesn't exist,
# or updates its configuration if it does.
response = requests.put(
    f"{connect_url}/connectors/{name}/config",
    headers={"Content-Type": "application/json"},
    data=json.dumps(payload["config"]),
    timeout=30,
)
response.raise_for_status()
print(response.text)

Run the helper:

python register_connector.py http://localhost:8083 snowflake-sink.json

After the connector is running, check its status with GET /connectors/snowflake-interactive-sink/status, then produce a few Kafka records to the interactive_demo_topic topic and query the interactive table to confirm that rows arrive.
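
For example, assuming the Kafka Connect worker from the registration step is listening on localhost:8083:

curl -s http://localhost:8083/connectors/snowflake-interactive-sink/status

Once records are flowing, a quick count against the target table confirms that they are arriving:

SELECT COUNT(*) FROM my_db.my_schema.kafka_connector_demo;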

Option C: Using Snowflake Openflow

Snowflake Openflow provides a managed, no-code pipeline for streaming data into Snowflake. Use the PutSnowpipeStreaming2 processor to write newline-delimited JSON records through the Snowpipe Streaming v2 API.

Openflow uses the same default-pipe mechanism described in How streaming ingestion works: the default pipe to ingest data into interactive tables. In the PutSnowpipeStreaming2 processor, point the Pipe Name property at either the table’s default pipe (<TABLE_NAME>-STREAMING) or a custom pipe that you created with a DATA_SOURCE(TYPE => 'STREAMING') COPY expression.

For the full list of Openflow processors, see All processors (alphabetical).

Monitoring data ingestion

The following Snowpipe Streaming view contains useful information that helps you verify that data is being ingested into the interactive table and see how much data is transferred:

Measuring end-to-end latency with row timestamps

Note

Support for ROW_TIMESTAMP on interactive tables is planned for an upcoming preview update.

Once the feature is enabled, you’ll be able to set ROW_TIMESTAMP = TRUE on the interactive table and use the row-level commit timestamp to measure end-to-end latency independently of any client-provided event time column. Latency then becomes a straightforward SQL expression:

-- Available once ROW_TIMESTAMP support for interactive tables is enabled.
ALTER TABLE kafka_connector_demo SET ROW_TIMESTAMP = TRUE;

SELECT AVG(DATEDIFF('millisecond', timestamp, ROW_TIMESTAMP(timestamp))) AS avg_latency_ms
  FROM kafka_connector_demo
  WHERE timestamp > DATEADD(MINUTE, -5, CURRENT_TIMESTAMP());

For general guidance on row timestamps, including how they behave across transactions and dynamic tables, see Use row timestamps to measure latency in your pipelines.

Managing data retention with storage lifecycle policies

Use a storage lifecycle policy to cap the amount of historical streaming data you keep. A common pattern for interactive tables is to retain a rolling window of the last two weeks.

The following policy expires rows whose ingest-time column is older than 14 days:

CREATE OR REPLACE STORAGE LIFECYCLE POLICY expire_policy_14d
  AS (c TIMESTAMP)
  RETURNS BOOLEAN ->
    c < DATEADD(DAY, -14, CURRENT_TIMESTAMP());

ALTER TABLE kafka_connector_demo
  ADD STORAGE LIFECYCLE POLICY expire_policy_14d ON (timestamp);

For the full set of options (archive tiers, tagging, monitoring), see Create and manage storage lifecycle policies.

Schema evolution for interactive tables

Interactive tables support schema evolution, which allows Snowflake to automatically add new columns when the incoming data contains fields that don’t yet exist in the table. This is particularly useful when streaming data from external systems like Kafka, where the source schema may change over time.

You enable schema evolution by setting ENABLE_SCHEMA_EVOLUTION = TRUE on the interactive table. You can set this property at table creation or on an existing table using ALTER TABLE.

In addition to automatic schema evolution, interactive tables support these manual ALTER TABLE operations:

  • Adding new columns.

  • Modifying NULL constraints on columns (SET NOT NULL or DROP NOT NULL).

Enabling schema evolution at table creation

Include ENABLE_SCHEMA_EVOLUTION = TRUE in the CREATE INTERACTIVE TABLE statement (the Kafka connector example already does this):

CREATE OR REPLACE INTERACTIVE TABLE kafka_connector_demo (
  timestamp    TIMESTAMP_NTZ(6),
  country_code VARCHAR,
  url          VARCHAR
)
ENABLE_SCHEMA_EVOLUTION = TRUE
CLUSTER BY (TRUNC(timestamp, 'hour'));

Enabling schema evolution on an existing table

If the table was created without schema evolution, enable it using ALTER TABLE:

ALTER TABLE kafka_connector_demo
  SET ENABLE_SCHEMA_EVOLUTION = TRUE;

Schema evolution with the Kafka connector

When you use the Kafka connector with schema evolution, two settings work together:

  • On the Snowflake table: ENABLE_SCHEMA_EVOLUTION = TRUE allows Snowflake to automatically add new columns.

  • On the Kafka connector: snowflake.enable.schematization: true enables the connector to detect and propagate schema changes from the data source.

Both settings must be enabled for end-to-end schema evolution to work.

Note

The example in Option B: Using the Snowflake Connector for Kafka already sets ENABLE_SCHEMA_EVOLUTION = TRUE on the table and "snowflake.enable.schematization": "true" on the connector, so schema evolution is enabled end-to-end in that example.

Testing schema evolution with Kafka

The following steps show how to verify that schema evolution works end-to-end with a Kafka data source.

  1. Start from the kafka_connector_demo table created in Option B: Using the Snowflake Connector for Kafka. It already has ENABLE_SCHEMA_EVOLUTION = TRUE.

  2. Send a record with the base fields to the Kafka topic:

    echo '{"timestamp": "2026-02-27T07:00:00.000000", "country_code": "US", "url": "https://example.com/home"}' | \
      docker exec -i demo-kafka kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic interactive_demo_topic
    
  3. Send a record that includes a new field (referrer) that doesn’t exist in the table:

    echo '{"timestamp": "2026-02-27T07:05:00.000000", "country_code": "FR", "url": "https://example.com/about", "referrer": "google.com"}' | \
      docker exec -i demo-kafka kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic interactive_demo_topic
    
  4. Verify that Snowflake automatically added the new column:

    DESCRIBE TABLE kafka_connector_demo;
    
    SELECT * FROM kafka_connector_demo LIMIT 10;
    

    The referrer column should appear in the table schema, with a NULL value for the first record and google.com for the second record.

Manual schema changes with ALTER TABLE

You can also add columns and modify NULL constraints manually, regardless of the ENABLE_SCHEMA_EVOLUTION setting:

ALTER TABLE kafka_connector_demo
  ADD COLUMN referrer VARCHAR;

ALTER TABLE kafka_connector_demo
  ALTER COLUMN country_code SET NOT NULL;

Verify the table structure after any schema changes:

DESCRIBE TABLE kafka_connector_demo;

Usage notes for schema evolution

  • The role used to load data must have the EVOLVE SCHEMA or OWNERSHIP privilege on the table (see the example grant after this list).

  • Newly added columns contain NULL values for existing rows.

  • Setting NOT NULL on a column that contains NULL values results in an error.

  • For more information about schema evolution, see Enable automatic table schema evolution.
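
For example, to let the streaming_role from Required privileges for streaming ingestion evolve the schema of the demo table, grant the table-level EVOLVE SCHEMA privilege:

GRANT EVOLVE SCHEMA ON TABLE my_db.my_schema.kafka_connector_demo TO ROLE streaming_role;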