Set up the Openflow Connector for Kafka

Note

The connector is subject to the Connector Terms.

This topic describes the steps to set up the Openflow Connector for Kafka.

Prerequisites

  1. Ensure that you have reviewed Openflow Connector for Kafka.

  2. Ensure that you have set up Openflow.

Set up Snowflake account

As a Snowflake account administrator, perform the following tasks:

  1. Create a new role or use an existing role and grant it the required database privileges (see the SQL sketch after this list).

  2. Create a new Snowflake service user with the type SERVICE.

  3. Grant the Snowflake service user the role you created in the previous steps.

  4. Configure key-pair authentication for the Snowflake SERVICE user from step 2.

  5. Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow, for example, AWS, Azure, or HashiCorp, and store the public and private keys in the secret store.

    Note

    If, for any reason, you do not wish to use a secrets manager, you are responsible for safeguarding the public and private key files used for key-pair authentication according to the security policies of your organization.

    1. Once the secrets manager is configured, determine how you will authenticate to it. On AWS, it is recommended that you use the EC2 instance role associated with Openflow, because this way no other secrets have to be persisted.

    2. In Openflow, configure a Parameter Provider associated with this secrets manager. From the hamburger menu in the upper right, navigate to Controller Settings » Parameter Provider and then fetch your parameter values.

    3. At this point all credentials can be referenced with the associated parameter paths and no sensitive values need to be persisted within Openflow.

  6. If any other Snowflake users require access to the raw documents and tables ingested by the connector (for example, for custom processing in Snowflake), grant those users the role created in step 1.

  7. Designate a warehouse for the connector to use. Start with the smallest warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses than with larger warehouse sizes.
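The following is a minimal SQL sketch of steps 1 through 4 and step 7. All object names (openflow_kafka_role, openflow_kafka_user, openflow_kafka_wh) are placeholders and the public key value is elided; adapt the statements to your environment and security policies. The database privileges mentioned in step 1 are granted after you create the destination database and schema in Configure the connector.

-- Step 1: create (or reuse) a role for the connector.
CREATE ROLE IF NOT EXISTS openflow_kafka_role;

-- Step 2: create a service user for the connector.
CREATE USER IF NOT EXISTS openflow_kafka_user
  TYPE = SERVICE
  DEFAULT_ROLE = openflow_kafka_role;

-- Step 3: grant the role to the service user.
GRANT ROLE openflow_kafka_role TO USER openflow_kafka_user;

-- Step 4: register the public key for key-pair authentication.
ALTER USER openflow_kafka_user SET RSA_PUBLIC_KEY = '<public-key-value>';

-- Step 7: designate a warehouse for the connector; start with the smallest size.
CREATE WAREHOUSE IF NOT EXISTS openflow_kafka_wh
  WAREHOUSE_SIZE = XSMALL
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE;
GRANT USAGE ON WAREHOUSE openflow_kafka_wh TO ROLE openflow_kafka_role;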

Configure the connector

As a data engineer, perform the following tasks to configure a connector:

  1. Create a database and schema in Snowflake for the connector to store ingested data (see the SQL sketch after this list).

  2. Download the connector definition file.

  3. Import the connector definition into Openflow.

    1. Enter an Openflow canvas.

    2. Add a process group to the canvas.

    3. On the Create Process Group pop-up, select the connector definition file to import.

    4. Connect the Upload failure output of the group to your NiFi monitoring component, for example, LogAttribute.

  4. Populate the process group parameters.

    1. Right-click on the imported process group and select Parameters.

    2. Fill out the required parameter values as described in Flow parameters.

  5. Go back to the canvas.

  6. Right-click on the canvas and click Enable all Controller Services.

  7. Right-click on the canvas and click Start. The connector starts data ingestion.
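The following sketch illustrates step 1 of this list, using the hypothetical names openflow_db and openflow_schema and the placeholder role from the account setup; the exact privileges you grant should follow your organization's policies.

-- Destination database and schema for ingested data (placeholder names).
CREATE DATABASE IF NOT EXISTS openflow_db;
CREATE SCHEMA IF NOT EXISTS openflow_db.openflow_schema;

-- Allow the connector role to use the destination and create tables in it.
GRANT USAGE ON DATABASE openflow_db TO ROLE openflow_kafka_role;
GRANT USAGE, CREATE TABLE ON SCHEMA openflow_db.openflow_schema TO ROLE openflow_kafka_role;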

Flow parameters

Kafka-related parameters

Kafka Bootstrap Servers

Required. A comma-separated list of Kafka brokers to fetch data from, including the port, for example kafka-broker:9092. The same instance is used for the DLQ topic.

Kafka Topics

Required. A comma-separated list of Kafka topics.

Kafka Group Id

Required. The ID of the consumer group used by the connector. It can be arbitrary but must be unique.

Kafka Auto Offset Reset

Required. The automatic offset configuration applied when no previous consumer offset is found. Corresponds to the Kafka auto.offset.reset property. One of: earliest / latest. Default: latest

Kafka Security Protocol

Required. The security protocol used to communicate with brokers. Corresponds to the Kafka client security.protocol property. One of: PLAINTEXT / SASL_PLAINTEXT / SASL_SSL / SSL

Kafka SASL Mechanism

Required. The SASL mechanism used for authentication. Corresponds to the Kafka client sasl.mechanism property. One of: PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512 / AWS_MSK_IAM

Kafka SASL Username

Required. The username to authenticate to Kafka.

Kafka SASL Password

Required. The password to authenticate to Kafka.

Kafka DLQ Topic

Optional. The name of the Kafka topic used as a dead-letter queue (DLQ) for records that the connector cannot process, for example rows with mismatched schemas.

Kafka Topic Format

One of: names / pattern. Specifies whether the "Kafka Topics" value is a comma-separated list of names or a single regular expression.

Message Format

Required. The format of messages in Kafka. One of: JSON / AVRO. Default: JSON

Schema Registry Authentication Type

Required. The method of authenticating to the schema registry, if one is used; otherwise, use NONE. One of: NONE / BASIC. Default: NONE

Schema Registry URL

Optional. The URL of the schema registry. Required when the message format is AVRO.

Schema Registry Username

Optional. The username for the schema registry. Required when the message format is AVRO.

Schema Registry Password

Optional. The password for the schema registry. Required when the message format is AVRO.

Kafka Connection Service

Required. Specifies the component responsible for establishing the connection to Kafka. Provide a controller service ID based on the security protocol you use:

  • SASL_PLAINTEXT / SASL_SSL: Kafka Connection ID

  • SASL_SSL with IAM: Kafka Connection IAM ID

  • SSL: Kafka Connection SSL ID

Default: SASL_PLAINTEXT service ID

For more information, see Getting service ID.

mTLS Key Password

Optional. The password for the mTLS key stored in the keystore. Required for mTLS authentication.

mTLS Keystore Filename

Optional. The full path to a keystore storing the client key and certificate for mTLS. Required for mTLS authentication.

mTLS Keystore Password

Optional. The password for the mTLS keystore. Required for mTLS authentication.

mTLS Keystore Type

Optional. The type of the mTLS keystore. One of: PKCS12 / JKS / BCFKS. Required for mTLS authentication.

mTLS Truststore Filename

Optional. The full path to a truststore storing broker certificates. For mTLS only.

mTLS Truststore Password

Optional. The password for the mTLS truststore. For mTLS only.

mTLS Truststore Type

Optional. The type of the mTLS truststore. One of: PKCS12 / JKS / BCFKS. For mTLS only.

Snowflake-related parameters

Snowflake Account

Required. The Snowflake account identifier with organization name and account name, formatted as [organization-name]-[account-name].

Snowflake User

Required. The name of the user used by the connector.

Snowflake Role

Required. The name of the role used by the connector.

Snowflake DB

Required. The database for storing data.

Snowflake Schema

Required. The schema for storing data.

Snowflake Warehouse

Optional. The Snowflake warehouse used for connections. If empty, the default warehouse for the account is used.

Snowflake Private Key

Optional. The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined.

Snowflake Private Key File

Optional. The file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line starts with -----BEGIN PRIVATE KEY.

Private Key Password

Optional. The password associated with the Snowflake Private Key File.

Other parameters

Topic To Table Map

Optional. Specifies which topics map to which tables. Each topic and its table name should be separated by a colon (see the example values below). The table name must be a valid Snowflake unquoted identifier. The regular expressions cannot be ambiguous: any matched topic must match only a single target table. If the parameter is empty or no match is found, the topic name is used as the table name. Note: The mapping cannot contain spaces after commas.

Iceberg Enabled

Required. Specifies whether the processor ingests data into an Iceberg table. The processor fails if this property doesn't match the actual table type. Default: false

Schematization Enabled

Required. Specifies whether to enable schema detection. When enabled, the record content is stored in separate columns. Otherwise, it is stored as a RECORD_CONTENT column of type OBJECT. One of: true / false. Default: true

Topic To Table Map example values:

  • topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range

  • topic[0-4]:low_range,topic[5-9]:high_range

Getting service ID

To find the service ID for the Kafka Connection Service, do the following:

  1. Right-click on the Kafka process group and select Controller Services.

  2. Click the three dots next to the service you want and select View Configuration or Edit.

  3. Click Copy next to the ID field.

Authentication

The connector supports the following standard Kafka authentication mechanisms:

  • SASL

  • mTLS

The connector also supports the following SASL mechanisms:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

  • AWS_MSK_IAM

Configuring authentication for the connector

This section provides examples of how to configure authentication for various mechanisms:

SASL

To configure SASL authentication, you will need to provide the following configuration properties:

  • Kafka Connection Service: ID of the Kafka Connection controller service. For more information, refer to Getting service ID

  • Kafka Security Protocol: SASL_PLAINTEXT or SASL_SSL

  • Kafka SASL Mechanism: PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512

  • Kafka SASL Username: The Kafka username

  • Kafka SASL Password: The Kafka password

SASL with AWS MSK IAM

To configure SASL authentication with IAM, use the following configuration properties:

  • Kafka Connection Service: ID of the Kafka Connection IAM controller service. For more information, refer to Getting service ID

  • Kafka Security Protocol: SASL_PLAINTEXT or SASL_SSL

  • Kafka SASL Mechanism: AWS_MSK_IAM

You also need to provide IAM credentials in Openflow with BYOC (bring your own cloud) configurations deployed in your cloud.

mTLS

To configure mTLS authentication, you will need to generate and configure the necessary certificates for both the connector and the Kafka broker.

After obtaining the necessary certificates, generate a keystore for secure storage of the connector's private key and certificate, and a truststore for verifying the Kafka broker's certificate. The truststore must contain either the Kafka broker certificate or another certificate in the certification chain that can be used to verify the broker's certificate.

The supported keystore/truststore formats are PKCS12, JKS, and BCFKS.

Once the certificates are prepared, provide the following configuration properties:

  • Kafka Connection Service: ID of the Kafka Connection SSL controller service. For more information, refer to Getting service ID

  • Kafka Security Protocol: SSL

  • mTLS Keystore Filename: A full path to the keystore file, or Asset reference

  • mTLS Keystore Password: The keystore password

  • mTLS Keystore Type: The keystore type, such as PKCS12, JKS, or BCFKS

  • mTLS Key Password: The key password for the private key, if used

  • mTLS Truststore Filename: A full path to the truststore file, or Asset reference

  • mTLS Truststore Password: The truststore password

  • mTLS Truststore Type: The truststore type, such as PKCS12, JKS, or BCFKS

Schema detection and evolution

The connector supports schema detection and evolution. The structure of tables in Snowflake can be defined and evolved automatically to support the structure of new data loaded by the connector.

Without schema detection and evolution, the Snowflake table loaded by the connector only consists of two OBJECT columns: RECORD_CONTENT and RECORD_METADATA.

With schema detection and evolution enabled, Snowflake can detect the schema of the streaming data and load data into tables that automatically match any user-defined schema. Snowflake also allows adding new columns or dropping the NOT NULL constraint from columns missing in new data files.
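Conceptually, when a new field such as quantity appears in the stream, or an existing column such as side is missing from new rows, automatic evolution amounts to changes like the following. The connector applies them for you; the table name here is only a placeholder, and the statements are shown for illustration.

-- Illustration only: the kind of changes automatic schema evolution performs.
ALTER TABLE my_table ADD COLUMN quantity NUMBER;
ALTER TABLE my_table ALTER COLUMN side DROP NOT NULL;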

Schema detection with the connector is supported with or without a provided schema registry. If a schema registry is used (AVRO), the columns are created with the data types defined in the schema registry. If there is no schema registry (JSON), the data types are inferred from the data provided.

JSON ARRAY is not supported for further schematization.

Examples

The following examples demonstrate the tables that are created before and after schema detection and evolution are enabled for the Kafka connector.

Before schema detection and evolution is enabled, the table only consists of two OBJECT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates.

| Row | RECORD_METADATA | RECORD_CONTENT |
|-----|-----------------|----------------|
| 1 | {"CreateTime":1669074170090, "headers": {"current.iter… | "account": "ABC123", "symbol": "ZTEST", "side":… |
| 2 | {"CreateTime":1669074170400, "headers": {"current.iter… | "account": "XYZ789", "symbol": "ZABZX", "side":… |
| 3 | {"CreateTime":1669074170659, "headers": {"current.iter… | "account": "XYZ789", "symbol": "ZTEST", "side":… |
| 4 | {"CreateTime":1669074170904, "headers": {"current.iter… | "account": "ABC123", "symbol": "ZABZX", "side":… |
| 5 | {"CreateTime":1669074171063, "headers": {"current.iter… | "account": "ABC123", "symbol": "ZTEST", "side":… |

After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the connector.

| Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY |
|-----|-----------------|---------|--------|------|----------|
| 1 | {"CreateTime":1669074170090, "headers": {"current.iter… | ABC123 | ZTEST | BUY | 3572 |
| 2 | {"CreateTime":1669074170400, "headers": {"current.iter… | XYZ789 | ZABZX | SELL | 3024 |
| 3 | {"CreateTime":1669074170659, "headers": {"current.iter… | XYZ789 | ZTEST | SELL | 799 |
| 4 | {"CreateTime":1669074170904, "headers": {"current.iter… | ABC123 | ZABZX | BUY | 2033 |
| 5 | {"CreateTime":1669074171063, "headers": {"current.iter… | ABC123 | ZTEST | BUY | 1558 |

Enabling schema detection

Use the Schematization Enabled property in the connector configuration properties to enable or disable schema detection.

Enabling schema evolution

If the connector creates the target table, schema evolution is enabled by default.

If you want to enable or disable schema evolution on the existing table, use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter. You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Table schema evolution.
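For example, to enable schema evolution on an existing table (my_table is a placeholder name), run:

-- Enable (or disable, with FALSE) schema evolution on an existing table.
ALTER TABLE my_table SET ENABLE_SCHEMA_EVOLUTION = TRUE;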

However, if schema evolution is disabled for an existing table, the connector tries to send rows with mismatched schemas to the configured dead-letter queue (DLQ).

Using the Openflow Connector for Kafka with Apache Iceberg™ tables

Openflow Connector for Kafka can ingest data into a Snowflake-managed Apache Iceberg™ table.

Requirements and limitations

Before you configure the Openflow Kafka connector for Iceberg table ingestion, note the following requirements and limitations:

  • You must create an Iceberg table before running the connector.

  • Make sure that the connector user has privileges to insert data into the created tables (see the example grant after this list).
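For example, assuming the Iceberg table my_iceberg_table from the statements below and the placeholder role and database names used earlier in this topic, the grants could look like the following; the exact privilege set and object type keyword may differ in your environment.

-- Allow the connector role to reach and write to the pre-created Iceberg table.
GRANT USAGE ON DATABASE openflow_db TO ROLE openflow_kafka_role;
GRANT USAGE ON SCHEMA openflow_db.openflow_schema TO ROLE openflow_kafka_role;
GRANT INSERT, SELECT ON TABLE openflow_db.openflow_schema.my_iceberg_table TO ROLE openflow_kafka_role;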

Configuration and setup

To configure the Openflow Connector for Kafka for Iceberg table ingestion, follow the steps in Configure the connector with a few differences noted in the following sections.

Enable ingestion into Iceberg table

To enable ingestion into an Iceberg table, you must set the Iceberg Enabled parameter to true.

Create an Iceberg table for ingestion

Before you run the connector, you must create an Iceberg table. The initial table schema depends on your connector Schematization Enabled property settings.

If you enable schematization, you must create a table with a column named record_metadata:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';

The connector automatically creates the columns for message fields and alters the record_metadata column schema.

If you don’t enable schematization, you must create a table with a column named record_content of a type that matches the actual Kafka message content. The connector automatically creates the record_metadata column.

When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.

For example, consider the following message:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}

To create an Iceberg table for the example message with schematization enabled, use the following statement:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';

To create an Iceberg table for the example message with schematization disabled, run the following:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    RECORD_CONTENT OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';

Note

RECORD_METADATA must always be created. Field names inside nested structures such as dogs or cats are case sensitive.