Set up the Openflow Connector for Kafka¶
Note
This connector is subject to the Snowflake Connector Terms.
Prerequisites¶
Ensure that you have reviewed Snowflake Openflow Connector for Kafka.
Ensure that you have Set up Openflow - BYOC or Set up Openflow - Snowflake Deployments.
If using Openflow - Snowflake Deployments, ensure that you’ve reviewed configuring required domains and have granted access to the required domains for the Kafka connector. The connector must be able to connect to all Kafka brokers in the cluster.
Set up Snowflake account¶
As a Snowflake account administrator, perform the following tasks:
Create a new Snowflake service user with the type SERVICE.
Create a new role, or use an existing role, and grant it the required database privileges.
The connector requires a user to create the destination table. Make sure the user has the required privileges for managing Snowflake objects:
| Object | Privilege | Notes |
|---|---|---|
| Database | USAGE | |
| Schema | USAGE | |
| Table | OWNERSHIP | Required for the connector to ingest data into a table. |
Snowflake recommends creating a separate user and role for each Kafka Cluster for better access control.
You can use the following script to create and configure a custom role (requires SECURITYADMIN or equivalent):
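A sketch of such a script, assuming placeholder names (`openflow_kafka_user`, `openflow_kafka_connector_role`, `kafka_db`, `kafka_schema`) that you should substitute with your own:

```sql
USE ROLE SECURITYADMIN;

-- Role used exclusively by the connector (placeholder name)
CREATE ROLE IF NOT EXISTS openflow_kafka_connector_role;

-- Service user for the connector
CREATE USER IF NOT EXISTS openflow_kafka_user
  TYPE = SERVICE
  DEFAULT_ROLE = openflow_kafka_connector_role;
GRANT ROLE openflow_kafka_connector_role TO USER openflow_kafka_user;

-- Database privileges, granted directly to the connector role
GRANT USAGE ON DATABASE kafka_db TO ROLE openflow_kafka_connector_role;
GRANT USAGE ON SCHEMA kafka_db.kafka_schema TO ROLE openflow_kafka_connector_role;
```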
Note
Privileges must be granted directly to the connector role and cannot be inherited.
Configure the destination table
Snowflake highly recommends using server-side schema evolution for schema changes and an error table for DML error logging. The following example shows how to create a table and add proper OWNERSHIP permissions.
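A minimal sketch, assuming placeholder table and role names (`kafka_db.kafka_schema.kafka_events`, `openflow_kafka_connector_role`):

```sql
-- Destination table with server-side schema evolution enabled;
-- the connector adds data columns as they appear in the stream.
CREATE TABLE kafka_db.kafka_schema.kafka_events (
    KAFKAMETADATA VARIANT
)
ENABLE_SCHEMA_EVOLUTION = TRUE;

-- Transfer ownership so the connector role can evolve the schema
GRANT OWNERSHIP ON TABLE kafka_db.kafka_schema.kafka_events
  TO ROLE openflow_kafka_connector_role COPY CURRENT GRANTS;
```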
The connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector. It automatically maps the record content’s first-level keys to table columns matching by name (case-insensitive).
With schema evolution enabled, Snowflake can automatically expand the destination table by adding new columns that are detected in the incoming stream and dropping NOT NULL constraints to accommodate new data patterns. For more information, see Table schema evolution.
If ENABLE_SCHEMA_EVOLUTION isn’t enabled, you must create the schema manually by extending the table definition. The connector tries to match the record content’s first-level keys to the table columns by name. If keys from the JSON don’t match the table columns, the connector ignores the keys.
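Both paths can be sketched as follows, with a placeholder table name and hypothetical columns:

```sql
-- Enable server-side schema evolution on an existing table:
ALTER TABLE kafka_db.kafka_schema.kafka_events
  SET ENABLE_SCHEMA_EVOLUTION = TRUE;

-- Or, with evolution disabled, extend the definition manually so that
-- column names match the record's first-level JSON keys (case-insensitive):
ALTER TABLE kafka_db.kafka_schema.kafka_events
  ADD COLUMN ACCOUNT VARCHAR, QUANTITY NUMBER;
```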
(Optional) Configure a secrets manager
Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow, such as AWS Secrets Manager, Azure Key Vault, or HashiCorp Vault, and store the public and private keys in the secret store.
Determine how you’ll authenticate to the secrets manager after it’s configured. On AWS, Snowflake recommends using the EC2 instance role associated with Openflow so no other secrets need to be persisted.
Configure a Parameter Provider associated with this Secrets Manager in Openflow from the hamburger menu in the upper right. Navigate to Controller Settings > Parameter Provider and fetch your parameter values.
Reference all credentials with the associated parameter paths so no sensitive values need to be persisted within Openflow.
Grant access to users
For any other Snowflake users who require access to the raw data ingested by the connector (for example, for custom processing in Snowflake), grant those users the role created earlier.
Set up the connector¶
As a data engineer, perform the following tasks to install and configure the connector:
Install the connector¶
To install the connector, do the following:
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
On the Openflow connectors page, find the connector and select Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and select Add.
Note
Before you install the connector, ensure that you have created a database, schema, and a table in Snowflake for the connector to store ingested data.
Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.
Authenticate to the runtime with your Snowflake account credentials.
The Openflow canvas appears with the connector process group added to it.
Configure the connector¶
If needed, customize the connector configuration before configuring the built-in parameters.
Populate the process group parameters
Right-click the imported process group and select Parameters.
Fill out the required parameter values.
Parameters¶
The following table describes the parameters for the Openflow Connector for Kafka:
| Parameter | Description | Required |
|---|---|---|
| Kafka Auto Offset Reset | The offset reset policy applied when no previous consumer offset is found for the consumer group. Possible values: `earliest` (reset to the earliest offset), `latest` (reset to the latest offset), `none` (throw an exception to the consumer if no previous offset is found for the consumer group). Default: `latest` | Yes |
| Kafka Bootstrap Servers | A comma-separated list of Kafka bootstrap servers; each entry must include a port. | Yes |
| Kafka Consumer Group ID | The ID of the consumer group used by the connector. Can be arbitrary but must be unique. | Yes |
| Kafka SASL Password | The password used when the SASL SCRAM-SHA-512 mechanism is configured. | No |
| Kafka SASL Username | The username used when the SASL SCRAM-SHA-512 mechanism is configured. | No |
| Kafka Topic Format | One of: `names` / `pattern`. Specifies whether the "Kafka Topics" value is a comma-separated list of names or a single regular expression. | Yes |
| Kafka Topics | A comma-separated list of Kafka topics or a regular expression. | Yes |
| Snowflake Destination Database | The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive; for unquoted identifiers, provide the name in uppercase. | Yes |
| Snowflake Destination Schema | The schema where data is persisted. It must already exist in Snowflake. The name is case-sensitive; for unquoted identifiers, provide the name in uppercase. | Yes |
| Snowflake Destination Table | The table where data is persisted. It must already exist in Snowflake. The name is case-sensitive; for unquoted identifiers, provide the name in uppercase. | Yes |
Start the connector¶
Right-click on the canvas and select Enable all Controller Services.
Right-click on the canvas and select Start. The connector starts data ingestion.
Understanding KAFKAMETADATA column¶
The connector populates the KAFKAMETADATA structure with metadata about the Kafka record. The structure contains the following information:
| Field | Data Type | Description |
|---|---|---|
| topic | String | The name of the Kafka topic that the record came from. |
| partition | Number | The number of the partition within the topic. (This is the Kafka partition, not the Snowflake micro-partition.) |
| offset | Number | The offset in that partition. |
| timestamp | Number | The timestamp when the record was added to Kafka. |
| key | String | If the message is a Kafka KeyedMessage, this is the key for that message. |
| headers | Object | A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers. |
Measuring ingestion latency¶
For change tracking, incremental processing, and time-travel queries based on row modification time, the ROW_TIMESTAMP feature can be used.
Enable it by running the following command on your destination table:
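A sketch of the enabling statement; the table name is a placeholder and the parameter name is an assumption, so verify it against the METADATA$ROW_LAST_COMMIT_TIME documentation before running:

```sql
-- Parameter name assumed for illustration; confirm in the
-- METADATA$ROW_LAST_COMMIT_TIME documentation.
ALTER TABLE kafka_db.kafka_schema.kafka_events
  SET ENABLE_ROW_TIMESTAMPS = TRUE;
```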
After row timestamps are enabled, tables expose the METADATA$ROW_LAST_COMMIT_TIME column, which returns the timestamp when each row was last modified.
For more information, see METADATA$ROW_LAST_COMMIT_TIME.
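With row timestamps enabled, one way to approximate end-to-end ingestion latency is to compare the Kafka record timestamp captured in KAFKAMETADATA with the row's last commit time. A sketch, assuming a placeholder table name and that the Kafka timestamp is milliseconds since the epoch:

```sql
-- Average latency between Kafka produce time and Snowflake commit time
SELECT
    AVG(DATEDIFF(
        'millisecond',
        TO_TIMESTAMP(KAFKAMETADATA:timestamp::NUMBER, 3),  -- scale 3 = milliseconds
        METADATA$ROW_LAST_COMMIT_TIME)) AS avg_latency_ms
FROM kafka_db.kafka_schema.kafka_events;
```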
Note
Row timestamp isn’t available for interactive tables. For more information, see Snowflake interactive tables and interactive warehouses.
Using the connector with Apache Iceberg™ tables¶
The connector can ingest data into Snowflake-managed Apache Iceberg™ tables, provided the following requirements are met:
You must have been granted the USAGE privilege on the external volume associated with your Apache Iceberg™ table.
You must create an Apache Iceberg™ table before running the connector.
Grant usage on an external volume¶
For example, if your Iceberg table uses the kafka_external_volume external volume and the connector uses the role openflow_kafka_connector_role, run the following statement:
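Using those example names, the grant looks like this:

```sql
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume
  TO ROLE openflow_kafka_connector_role;
```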
Create an Apache Iceberg™ table for ingestion¶
The connector doesn’t create Iceberg tables automatically and doesn’t support schema evolution. Before you run the connector, you must create an Iceberg table manually.
When you create an Iceberg table, you can use Iceberg data types (including VARIANT) or compatible Snowflake types.
For example, consider the following message:
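A hypothetical message for illustration (field names and values are placeholders):

```json
{
  "account": "ABC123",
  "symbol": "ZTEST",
  "side": "BUY",
  "quantity": 3572
}
```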
To create an Iceberg table for the example message, use one of the following statements:
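A sketch assuming a message with string fields `account`, `symbol`, and `side` and a numeric `quantity`; the table name, external volume, and base location are placeholders:

```sql
CREATE ICEBERG TABLE kafka_db.kafka_schema.kafka_iceberg_events (
    -- Structured type mirroring the KAFKAMETADATA fields
    KAFKAMETADATA OBJECT(
        topic STRING,
        partition INT,
        "OFFSET" BIGINT,
        timestamp BIGINT,
        "KEY" STRING,
        headers MAP(STRING, STRING)
    ),
    account  STRING,
    symbol   STRING,
    side     STRING,
    quantity INT
)
EXTERNAL_VOLUME = 'kafka_external_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'kafka_iceberg_events';
```

Alternatively, per the note above, columns such as KAFKAMETADATA can be declared as VARIANT instead of a structured type.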
Using the connector with Interactive Tables¶
Interactive tables are a special type of Snowflake table optimized for low-latency, high-concurrency queries. For more information, see Snowflake interactive tables and interactive warehouses.
Create an interactive table:
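A sketch with hypothetical names and columns; the `INTERACTIVE` keyword is an assumption here, so confirm the exact syntax in the interactive tables documentation:

```sql
-- INTERACTIVE keyword assumed for illustration; verify against the
-- interactive tables documentation before running.
CREATE INTERACTIVE TABLE kafka_db.kafka_schema.kafka_events_interactive (
    account  VARCHAR,
    symbol   VARCHAR,
    side     VARCHAR,
    quantity NUMBER
);
```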
Important considerations:
Interactive tables have specific limitations and query restrictions. Review Snowflake interactive tables and interactive warehouses before using them with the connector.
For interactive tables, any required transformations must be handled in the table definition.
Interactive warehouses are required to query interactive tables efficiently.
Using the connector with a customer-defined schema for the destination table¶
The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, if you have a Kafka topic with the content of the message structured like the following JSON:
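A hypothetical message body for illustration (field names and values are placeholders):

```json
{
  "account": "ABC123",
  "symbol": "ZTEST",
  "side": "BUY",
  "quantity": 3572
}
```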
By default, you don’t have to specify all fields from the JSON, because ENABLE_SCHEMA_EVOLUTION = TRUE adds columns automatically. However, if you prefer a static schema, you can create it by running:
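A sketch, assuming a message with fields `account`, `symbol`, `side`, and `quantity`; the table name is a placeholder, and KAFKAMETADATA holds the record metadata:

```sql
-- Static schema: column names match the record's first-level JSON keys
CREATE TABLE kafka_db.kafka_schema.kafka_events (
    KAFKAMETADATA VARIANT,
    ACCOUNT  VARCHAR,
    SYMBOL   VARCHAR,
    SIDE     VARCHAR,
    QUANTITY NUMBER
);
```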
Using the connector with a customer-defined PIPE¶
If you choose to create your own pipe, you can define the data transformation logic in the pipe’s COPY INTO statement. You can rename columns as required and cast the data types as needed. For example:
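A sketch of such a pipe; the pipe, table, and column names are placeholders, and the `DATA_SOURCE(TYPE => 'STREAMING')` source is assumed from the Snowpipe Streaming pipe syntax, so verify it against the CREATE PIPE documentation:

```sql
CREATE PIPE kafka_db.kafka_schema.kafka_custom_pipe AS
COPY INTO kafka_db.kafka_schema.kafka_events
    (account_id, traded_symbol, trade_quantity)
FROM (
    -- Rename columns and cast types as part of the COPY INTO transformation
    SELECT $1:account::VARCHAR,
           $1:symbol::VARCHAR,
           $1:quantity::NUMBER
    FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
```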
When you define your own pipe, your destination table columns don’t need to match the JSON keys. You can rename the columns to your desired names and cast the data types if required.
To adjust the connector to work with a custom pipe, perform the following tasks:
Right-click on the PublishSnowpipeStreaming processor used in your Kafka ingestion flow in the Openflow canvas.
Select Configure from the context menu.
Navigate to the Properties tab.
In the Destination type field, pick Pipe.
In the Pipe field, type the name of your PIPE.
Select Apply to save the configuration.
Customizing error handling¶
Error handling is split between Openflow-side failures and server-side failures within the Snowpipe Streaming service.
Openflow errors (client-side failures): Errors such as unparseable payloads or custom transformation failures occur before records reach Snowflake. By default, these records are discarded. You can process these errors in Openflow by routing FlowFiles from the parse failure relationship of the ConsumeKafka processor.
Snowpipe Streaming errors (server-side failures): Errors for records that successfully reach Snowflake but are incompatible with the destination table’s schema (for example, type mismatches) are captured by the Snowflake infrastructure. When error logging is enabled on the destination table (`error_logging = true`), these failed rows are automatically ingested into the destination error table.