Set up the Openflow Connector for Kafka¶
Note
The connector is subject to the Connector Terms.
This topic describes the steps to set up the Openflow Connector for Kafka.
Prerequisites¶
Ensure that you have reviewed Openflow Connector for Kafka.
Ensure that you have set up Openflow.
Set up Snowflake account¶
As a Snowflake account administrator, perform the following tasks:
Create a new role or use an existing role and grant it the required database privileges.
Create a new Snowflake service user with the type SERVICE.
Grant the Snowflake service user the role you created in the previous steps.
Configure key-pair authentication for the Snowflake SERVICE user from step 2 (see the example SQL after these steps).
Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow, for example AWS, Azure, or HashiCorp, and store the public and private keys in the secret store.
Note
If, for any reason, you do not want to use a secrets manager, then you are responsible for safeguarding the public key and private key files used for key-pair authentication according to your organization's security policies.
Once the secrets manager is configured, determine how you will authenticate to it. On AWS, it is recommended that you use the EC2 instance role associated with Openflow, because then no other secrets have to be persisted.
In Openflow, configure a Parameter Provider associated with this secrets manager from the hamburger menu in the upper right. Navigate to Controller Settings » Parameter Provider and then fetch your parameter values.
At this point, all credentials can be referenced with the associated parameter paths, and no sensitive values need to be persisted within Openflow.
If any other Snowflake users require access to the raw ingested documents and tables ingested by the connector (for example, for custom processing in Snowflake), then grant those users the role created in step 1.
Designate a warehouse for the connector to use. Start with the smallest warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Large table counts typically scale better with multi-cluster warehouses than with larger warehouse sizes.
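The following statements are a minimal sketch of steps 1 through 4 and the warehouse designation above. Every object name (openflow_kafka_role, openflow_service_user, openflow_kafka_wh) and the public key value are placeholders, not names required by the connector:

-- Step 1: create (or reuse) a role for the connector; placeholder name.
CREATE ROLE IF NOT EXISTS openflow_kafka_role;

-- Step 2: create a service user for the connector.
CREATE USER IF NOT EXISTS openflow_service_user
  TYPE = SERVICE
  DEFAULT_ROLE = openflow_kafka_role;

-- Step 3: grant the role to the service user.
GRANT ROLE openflow_kafka_role TO USER openflow_service_user;

-- Step 4: register the public key for key-pair authentication
-- (the value shown is a truncated placeholder for your generated RSA public key).
ALTER USER openflow_service_user SET RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...';

-- Designate a warehouse for the connector; start with the smallest size.
CREATE WAREHOUSE IF NOT EXISTS openflow_kafka_wh
  WAREHOUSE_SIZE = XSMALL
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE;
GRANT USAGE ON WAREHOUSE openflow_kafka_wh TO ROLE openflow_kafka_role;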
Configure the connector¶
As a data engineer, perform the following tasks to configure a connector:
Create a database and schema in Snowflake for the connector to store ingested data (see the example SQL after these steps).
Download the connector definition file.
Import the connector definition into Openflow:
Enter an Openflow canvas.
Add a process group to the canvas.
On the Create Process Group pop-up, select the connector definition file to import.
Connect the Upload failure output of the group to your NiFi monitoring component, for example, LogAttribute.
Populate the process group parameters
Right-click the imported process group and select Parameters.
Fill out the required parameter values as described in Flow parameters.
Go back to the canvas.
Right-click the canvas and click Enable all Controller Services.
Right-click the canvas and click Start. The connector starts data ingestion.
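As a sketch of step 1 in this list, the following statements create the destination database and schema and grant the connector's role access to them. All names (kafka_db, kafka_schema, openflow_kafka_role) are placeholders, and the exact grants your environment needs may differ:

-- Placeholder database and schema for ingested data.
CREATE DATABASE IF NOT EXISTS kafka_db;
CREATE SCHEMA IF NOT EXISTS kafka_db.kafka_schema;

-- Assumed grants so the connector role can create and write destination tables.
GRANT USAGE ON DATABASE kafka_db TO ROLE openflow_kafka_role;
GRANT USAGE, CREATE TABLE ON SCHEMA kafka_db.kafka_schema TO ROLE openflow_kafka_role;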
Flow parameters¶
Kafka-related parameters
| Parameter Name | Description | Required |
| --- | --- | --- |
| Kafka Bootstrap Servers | A comma-separated list of Kafka brokers to fetch data from. Each entry must include the port, for example kafka-broker:9092. The same instance is used for the DLQ topic. | Yes |
| Kafka Topics | A comma-separated list of Kafka topics to consume from. See also the Kafka Topic Format parameter. | Yes |
| Kafka Group Id | The ID of the consumer group used by the connector. Can be arbitrary but must be unique. | Yes |
| Kafka Auto Offset Reset | Automatic offset configuration applied when no previous consumer offset is found. Corresponds to the Kafka auto.offset.reset property. | Yes |
| Kafka Security Protocol | Security protocol used to communicate with brokers. Corresponds to the Kafka client security.protocol property. One of: PLAINTEXT / SASL_PLAINTEXT / SASL_SSL / SSL | Yes |
| Kafka SASL Mechanism | SASL mechanism used for authentication. Corresponds to the Kafka client sasl.mechanism property. One of: PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512 / AWS_MSK_IAM | Yes |
| Kafka SASL Username | The username used to authenticate to Kafka. | Yes |
| Kafka SASL Password | The password used to authenticate to Kafka. | Yes |
| Kafka DLQ Topic | The name of the dead-letter queue (DLQ) topic to which records that fail processing are sent. | No |
| Kafka Topic Format | One of: "names" / "pattern". Specifies whether the Kafka Topics value is a comma-separated list of names or a single regular expression. | |
| Message Format | The format of messages in Kafka. One of: JSON / AVRO. Default: JSON | Yes |
| Schema Registry Authentication Type | The method of authenticating to the schema registry, if one is used. Otherwise, use NONE. One of: NONE / BASIC. Default: NONE | Yes |
| Schema Registry URL | The URL of the schema registry. Required for the AVRO message format. | No |
| Schema Registry Username | The username for the schema registry. Required for the AVRO message format. | No |
| Schema Registry Password | The password for the schema registry. Required for the AVRO message format. | No |
| Kafka Connection Service | Specifies the component responsible for establishing the connection to Kafka. Provide a controller service ID based on the security protocol you use: SASL_PLAINTEXT / SASL_SSL - Kafka Connection ID; SASL_SSL with IAM - Kafka Connection IAM ID; SSL - Kafka Connection SSL ID. Default: the SASL_PLAINTEXT service ID. For more information, see Getting service ID. | Yes |
| mTLS Key Password | The password for the mTLS key stored in the keystore. Required for mTLS authentication. | No |
| mTLS Keystore Filename | The full path to a keystore storing the client key and certificate for mTLS. Required for mTLS authentication. | No |
| mTLS Keystore Password | The password for the mTLS keystore. Required for mTLS authentication. | No |
| mTLS Keystore Type | The type of the mTLS keystore. One of: PKCS12 / JKS / BCFKS. Required for mTLS authentication. | No |
| mTLS Truststore Filename | The full path to a truststore storing broker certificates. For mTLS only. | No |
| mTLS Truststore Password | The password for the mTLS truststore. For mTLS only. | No |
| mTLS Truststore Type | The type of the mTLS truststore. One of: PKCS12 / JKS / BCFKS. For mTLS only. | No |
Snowflake-related parameters
| Parameter Name | Description | Required |
| --- | --- | --- |
| Snowflake Account | Snowflake account identifier with organization name and account name, formatted as [organization-name]-[account-name] | Yes |
| Snowflake User | The name of the user used by the connector | Yes |
| Snowflake Role | The name of the role used by the connector | Yes |
| Snowflake DB | The database for storing data | Yes |
| Snowflake Schema | The schema for storing data | Yes |
| Snowflake Warehouse | The Snowflake warehouse used for connections. If empty, the default warehouse for the account is used. | No |
| Snowflake Private Key | The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Either Snowflake Private Key File or Snowflake Private Key must be defined. | No |
| Snowflake Private Key File | The file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and with standard PEM headers and footers. The header line starts with -----BEGIN PRIVATE KEY. | No |
| Private Key Password | The password associated with the Snowflake Private Key File | No |
Other parameters
| Parameter Name | Description | Required |
| --- | --- | --- |
| Topic To Table Map | This optional parameter lets you specify which topics map to which tables. Each topic and its table name must be separated by a colon (see the example values below this table). The table name must be a valid Snowflake unquoted identifier. The regular expressions cannot be ambiguous: any matched topic must match only a single target table. If the parameter is empty or no match is found, the topic name is used as the table name. Note: The mapping cannot contain spaces after commas. | No |
| Iceberg Enabled | Specifies whether the processor ingests data into an Iceberg table. The processor fails if this property doesn't match the actual table type. Default: false | Yes |
| Schematization Enabled | Specifies whether to enable schema detection. When enabled, record content is stored in separate columns. Otherwise, it is stored in a single RECORD_CONTENT column of type OBJECT. One of: true / false. Default: true | Yes |
Topic To Table Map example values:
topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range
topic[0-4]:low_range,topic[5-9]:high_range
Getting service ID¶
To find the service ID for the Kafka Connection Service, do the following:
Right-click the Kafka process group and select Controller Services.
Click the three dots next to the service you want and select View Configuration or Edit.
Click Copy next to the ID field.
Authentication¶
The connector supports the following standard Kafka authentication mechanisms to secure communication with brokers:
SASL
mTLS
The connector also supports the following SASL mechanisms:
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
AWS_MSK_IAM
Configuring authentication for the connector¶
This section provides examples of how to configure authentication for various mechanisms:
SASL¶
To configure SASL authentication, you will need to provide the following configuration properties:
Kafka Connection Service: ID of the Kafka Connection controller service. For more information, refer to Getting service ID
Kafka Security Protocol: SASL_PLAINTEXT or SASL_SSL
Kafka SASL Mechanism: PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512
Kafka SASL Username: The Kafka username
Kafka SASL Password: The Kafka password
SASL with AWS MSK IAM¶
To configure SASL authentication with IAM, use the following configuration properties:
Kafka Connection Service: ID of the Kafka Connection IAM controller service. For more information, refer to Getting service ID
Kafka Security Protocol: SASL_PLAINTEXT or SASL_SSL
Kafka SASL Mechanism: AWS_MSK_IAM
You also need to provide IAM credentials in Openflow BYOC (bring your own cloud) deployments running in your cloud.
mTLS¶
To configure mTLS authentication, you will need to generate and configure the necessary certificates for both the connector and the Kafka broker.
After obtaining the necessary certificates, generate a keystore to securely store the connector's private key and certificate, and a truststore for verifying the Kafka broker certificate. The truststore must contain either the Kafka broker certificate or another certificate in the certification chain that can be used to verify the broker's certificate.
The supported keystore/truststore formats are PKCS12, JKS, and BCFKS.
Once the certificates are prepared, provide the following configuration properties:
Kafka Connection Service: ID of the Kafka Connection SSL controller service. For more information, refer to Getting service ID
Kafka Security Protocol: SSL
mTLS Keystore Filename: A full path to the keystore file, or Asset reference
mTLS Keystore Password: The keystore password
mTLS Keystore Type: The keystore type, such as PKCS12, JKS, or BCFKS
mTLS Key Password: The key password for the private key, if used
mTLS Truststore Filename: A full path to the truststore file, or Asset reference
mTLS Truststore Password: The truststore password
mTLS Truststore Type: The truststore type, such as PKCS12, JKS, or BCFKS
Schema detection and evolution¶
The connector supports schema detection and evolution. The structure of tables in Snowflake can be defined and evolved automatically to support the structure of new data loaded by the connector.
Without schema detection and evolution, the Snowflake table loaded by the connector only consists of two OBJECT columns: RECORD_CONTENT and RECORD_METADATA.
With schema detection and evolution enabled, Snowflake can detect the schema of the streaming data and load data into tables that automatically match any user-defined schema. Snowflake also allows adding new columns or dropping the NOT NULL constraint from columns missing in new data files.
Schema detection with the connector is supported with or without a provided schema registry. If a schema registry is used (Avro), columns are created with the data types defined in the schema registry. If there is no schema registry (JSON), the data types are inferred from the data provided.
JSON ARRAY is not supported for further schematization.
Examples¶
The following examples demonstrate the tables that are created before and after schema detection and evolution are enabled for the Kafka connector.
Before schema detection and evolution are enabled, the table consists of only two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates.
| Row | RECORD_METADATA | RECORD_CONTENT |
| --- | --- | --- |
| 1 | {"CreateTime":1669074170090, "headers": {"current.iter… | "account": "ABC123", "symbol": "ZTEST", "side":… |
| 2 | {"CreateTime":1669074170400, "headers": {"current.iter… | "account": "XYZ789", "symbol": "ZABZX", "side":… |
| 3 | {"CreateTime":1669074170659, "headers": {"current.iter… | "account": "XYZ789", "symbol": "ZTEST", "side":… |
| 4 | {"CreateTime":1669074170904, "headers": {"current.iter… | "account": "ABC123", "symbol": "ZABZX", "side":… |
| 5 | {"CreateTime":1669074171063, "headers": {"current.iter… | "account": "ABC123", "symbol": "ZTEST", "side":… |
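With this two-column layout, individual fields can be read out of RECORD_CONTENT using Snowflake's semi-structured path syntax. The following query is a minimal sketch; my_topic_table is a placeholder for whichever table the connector created for your topic:

SELECT
    record_content:account::STRING AS account,  -- extract a field and cast it
    record_content:symbol::STRING  AS symbol
FROM my_topic_table
LIMIT 5;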
After schema detection and evolution are enabled, the table contains columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe Streaming data loaded by the connector.
| Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY |
| --- | --- | --- | --- | --- | --- |
| 1 | {"CreateTime":1669074170090, "headers": {"current.iter… | ABC123 | ZTEST | BUY | 3572 |
| 2 | {"CreateTime":1669074170400, "headers": {"current.iter… | XYZ789 | ZABZX | SELL | 3024 |
| 3 | {"CreateTime":1669074170659, "headers": {"current.iter… | XYZ789 | ZTEST | SELL | 799 |
| 4 | {"CreateTime":1669074170904, "headers": {"current.iter… | ABC123 | ZABZX | BUY | 2033 |
| 5 | {"CreateTime":1669074171063, "headers": {"current.iter… | ABC123 | ZTEST | BUY | 1558 |
Enabling schema detection¶
Use the Schematization Enabled property in the connector configuration properties to enable or disable schema detection.
Enabling schema evolution¶
If the connector creates the target table, schema evolution is enabled by default.
If you want to enable or disable schema evolution on an existing table, use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter. You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Table schema evolution.
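For example, the following statement is a minimal sketch of enabling schema evolution on an existing destination table; my_table is a placeholder name:

-- Run with a role that has the OWNERSHIP privilege on the table.
ALTER TABLE my_table SET ENABLE_SCHEMA_EVOLUTION = TRUE;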
However, if schema evolution is disabled for an existing table, the connector tries to send rows with mismatched schemas to the configured dead-letter queue (DLQ).
Using the Openflow Connector for Kafka with Apache Iceberg™ tables¶
Openflow Connector for Kafka can ingest data into a Snowflake-managed Apache Iceberg™ table.
Requirements and limitations¶
Before you configure the Openflow Kafka connector for Iceberg table ingestion, note the following requirements and limitations:
You must create an Iceberg table before running the connector.
Make sure that the connector's user has the privileges required to insert data into the created tables, as sketched in the grant example after this list.
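The following grant is a minimal sketch, assuming the connector uses the role from the account setup steps (placeholder name openflow_kafka_role) and that my_iceberg_table is the pre-created Iceberg table from the examples below:

-- Assumed role and table names; adjust to your environment.
GRANT INSERT ON TABLE my_iceberg_table TO ROLE openflow_kafka_role;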
Configuration and setup¶
To configure the Openflow Connector for Kafka for Iceberg table ingestion, follow the steps in Configure the connector with a few differences noted in the following sections.
Enable ingestion into Iceberg table¶
To enable ingestion into an Iceberg table, you must set the Iceberg Enabled parameter to true.
Create an Iceberg table for ingestion¶
Before you run the connector, you must create an Iceberg table.
The initial table schema depends on your connector's Schematization Enabled property setting.
If you enable schematization, you must create a table with a column named record_metadata:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
The connector automatically creates the columns for message fields and alters the record_metadata column schema.
If you don't enable schematization, you must create a table with a column named record_content of a type that matches the actual Kafka message content.
The connector automatically creates the record_metadata column.
When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.
For example, consider the following message:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
To create an Iceberg table for the example message and schematization enabled, use the following statement:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  RECORD_METADATA OBJECT(
    offset INTEGER,
    topic STRING,
    partition INTEGER,
    key STRING,
    timestamp TIMESTAMP,
    SnowflakeConnectorPushTime BIGINT,
    headers MAP(VARCHAR, VARCHAR)
  ),
  id INT,
  body_temperature FLOAT,
  name STRING,
  approved_coffee_types ARRAY(STRING),
  animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
  date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
To create an Iceberg table for the example message and schematization disabled, run the following:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  RECORD_METADATA OBJECT(
    offset INTEGER,
    topic STRING,
    partition INTEGER,
    key STRING,
    timestamp TIMESTAMP,
    SnowflakeConnectorPushTime BIGINT,
    headers MAP(VARCHAR, VARCHAR)
  ),
  RECORD_CONTENT OBJECT(
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Note
RECORD_METADATA must always be created. Field names inside nested structures, such as dogs or cats, are case-sensitive.