Apache Kafka with DLQ and metadata

Note

The connector is subject to the Connector Terms.

This topic describes the Apache Kafka with DLQ and metadata connector. This is the full-featured connector that provides feature parity with the legacy Snowflake connector for Kafka and includes advanced capabilities for production use cases.

Key features

The Apache Kafka with DLQ and metadata connector provides comprehensive functionality:

  • Dead Letter Queue (DLQ) support for failed message handling

  • RECORD_METADATA column with Kafka message metadata

  • Configurable schematization - enable or disable schema detection

  • Iceberg table support with schema evolution

  • Multiple message formats - JSON and AVRO support

  • Schema registry integration for AVRO messages

  • Topic-to-table mapping with advanced patterns

  • SASL authentication support

Specific parameters

In addition to the common parameters described in Set up the Openflow Connector for Kafka, this connector includes additional parameter contexts for advanced features.

Message format and schema parameters

| Parameter | Description | Required |
| --- | --- | --- |
| Message Format | The format of messages in Kafka. One of: JSON / AVRO. Default: JSON | Yes |
| AVRO Schema | The Avro schema to use when the AVRO Schema Access Strategy is schema-text-property and the message format is AVRO. Note: use this only when all messages consumed from the configured Kafka topic(s) share the same schema. | No |
| AVRO Schema Access Strategy | The method of accessing the AVRO schema of a message. Required for AVRO. One of: embedded-avro-schema / schema-reference-reader / schema-text-property. Default: embedded-avro-schema | No |

Schema registry parameters

| Parameter | Description | Required |
| --- | --- | --- |
| Schema Registry Authentication Type | The method of authenticating to the schema registry, if one is used; otherwise, use NONE. One of: NONE / BASIC. Default: NONE | Yes |
| Schema Registry URL | The URL of the schema registry. Required for the AVRO message format. | No |
| Schema Registry Username | The username for the schema registry. Required for the AVRO message format. | No |
| Schema Registry Password | The password for the schema registry. Required for the AVRO message format. | No |

DLQ and advanced features parameters

| Parameter | Description | Required |
| --- | --- | --- |
| Kafka DLQ Topic | The DLQ topic to which messages with parsing errors are sent | Yes |
| Schematization Enabled | Determines whether data is inserted into individual columns or a single RECORD_CONTENT field. One of: true / false. Default: true | Yes |
| Iceberg Enabled | Specifies whether the processor ingests data into an Iceberg table. The processor fails if this property doesn’t match the actual table type. Default: false | Yes |

Schematization behavior

The connector’s behavior changes based on the Schematization Enabled parameter:

Schematization enabled

When schematization is enabled, the connector:

  • Creates individual columns for each field in the message

  • Includes a RECORD_METADATA column with Kafka metadata

  • Automatically evolves the table schema when new fields are detected

  • Flattens nested JSON/AVRO structures into separate columns

Example table structure:

| Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY |
| --- | --- | --- | --- | --- | --- |
| 1 | {"timestamp":1669074170090, "headers": {"current.iter… | ABC123 | ZTEST | BUY | 3572 |
| 2 | {"timestamp":1669074170400, "headers": {"current.iter… | XYZ789 | ZABX | SELL | 3024 |

Schematization disabled

When schematization is disabled, the connector:

  • Creates only two columns: RECORD_CONTENT and RECORD_METADATA

  • Stores the entire message content as an OBJECT in RECORD_CONTENT

  • Does not perform automatic schema evolution

  • Provides maximum flexibility for downstream processing

Example table structure:

| Row | RECORD_METADATA | RECORD_CONTENT |
| --- | --- | --- |
| 1 | {"timestamp":1669074170090, "headers": {"current.iter… | {"account": "ABC123", "symbol": "ZTEST", "side":… |
| 2 | {"timestamp":1669074170400, "headers": {"current.iter… | {"account": "XYZ789", "symbol": "ZABX", "side":… |
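
Downstream queries can then extract individual fields from the RECORD_CONTENT OBJECT with Snowflake's semi-structured path syntax. The following is a minimal sketch, assuming a standard (non-Iceberg) target table named my_kafka_table and the message fields shown above:

-- Hypothetical example: extract fields from the RECORD_CONTENT column.
-- my_kafka_table is a placeholder table name.
SELECT
    record_content:account::STRING  AS account,
    record_content:symbol::STRING   AS symbol,
    record_content:side::STRING     AS side,
    record_content:quantity::NUMBER AS quantity
FROM my_kafka_table;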

Use the Schematization Enabled parameter in the connector configuration to enable or disable schema detection.

Schema detection and evolution

The connector supports schema detection and evolution. The structure of tables in Snowflake can be defined and evolved automatically to support the structure of new data loaded by the connector.

Without schema detection and evolution, the Snowflake table loaded by the connector only consists of two OBJECT columns: RECORD_CONTENT and RECORD_METADATA.

With schema detection and evolution enabled, Snowflake can detect the schema of the streaming data and load data into tables that automatically match any user-defined schema. Snowflake also allows adding new columns or dropping the NOT NULL constraint from columns missing in new data files.

Schema detection with the connector is supported with or without a provided schema registry. If a schema registry is used (AVRO), columns are created with the data types defined in the registry. If there is no schema registry (JSON), data types are inferred from the data provided.

JSON ARRAY is not supported for further schematization.

Enabling schema evolution

If the connector creates the target table, schema evolution is enabled by default.

If you want to enable or disable schema evolution on the existing table, use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter. You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Table schema evolution.
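
For example, to turn schema evolution on or off for an existing target table (my_table is a placeholder name):

-- Enable schema evolution on an existing table (requires the OWNERSHIP privilege).
ALTER TABLE my_table SET ENABLE_SCHEMA_EVOLUTION = TRUE;

-- Disable schema evolution again.
ALTER TABLE my_table SET ENABLE_SCHEMA_EVOLUTION = FALSE;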

However, if schema evolution is disabled for an existing table, the connector attempts to send rows with mismatched schemas to the configured dead-letter queue (DLQ).

RECORD_METADATA structure

The RECORD_METADATA column contains important Kafka message metadata:

| Field | Description |
| --- | --- |
| offset | The message offset within the Kafka partition |
| topic | The Kafka topic name |
| partition | The Kafka partition number |
| key | The message key (if present) |
| timestamp | The message timestamp |
| SnowflakeConnectorPushTime | Timestamp when the connector fetched the message from Kafka |
| headers | Map of message headers (if present) |
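
For example, individual metadata fields can be read with Snowflake's semi-structured path syntax. A minimal sketch, assuming a standard target table named my_kafka_table:

-- Hypothetical example: read Kafka metadata fields from RECORD_METADATA.
-- my_kafka_table is a placeholder table name.
SELECT
    record_metadata:topic::STRING     AS kafka_topic,
    record_metadata:partition::NUMBER AS kafka_partition,
    record_metadata:offset::NUMBER    AS kafka_offset,
    record_metadata:key::STRING       AS message_key,
    record_metadata:headers           AS kafka_headers
FROM my_kafka_table;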

Dead Letter Queue (DLQ)

The DLQ functionality handles messages that cannot be processed successfully:

DLQ behavior

  • Parse failures - Messages with invalid JSON/AVRO format are sent to the DLQ

  • Schema mismatches - Messages that don’t match the expected schema when schema evolution is disabled

  • Processing errors - Other processing failures during ingestion

Iceberg table support

The Openflow Connector for Kafka can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.

Requirements and limitations

Before you configure the Openflow Kafka connector for Iceberg table ingestion, note the following requirements and limitations:

  • You must create an Iceberg table before running the connector.

  • Make sure that the user has privileges to insert data into the created tables.

Configuration and setup

To configure the Openflow Connector for Kafka for Iceberg table ingestion, follow the steps in Set up the Openflow Connector for Kafka with a few differences noted in the following sections.

Enable ingestion into Iceberg table

To enable ingestion into an Iceberg table, you must set the Iceberg Enabled parameter to true.

Create an Iceberg table for ingestion

Before you run the connector, you must create an Iceberg table. The initial table schema depends on the connector’s Schematization Enabled property setting.

If you enable schematization, you must create a table with a column named record_metadata:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';

The connector automatically creates the columns for message fields and alters the record_metadata column schema.

If you don’t enable schematization, you must create a table with a column named record_content of a type that matches the actual Kafka message content. The connector automatically creates the record_metadata column.

When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.

For example, consider the following message:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}

Iceberg table creation examples

With schematization enabled:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';

With schematization disabled:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    RECORD_CONTENT OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';

Note

RECORD_METADATA must always be created. Field names inside nested structures such as dogs or cats are case sensitive.
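
As an illustration, the nested fields can be read back with path syntax. This is a sketch only, assuming the my_iceberg_table definition above and the lowercase field names exactly as declared:

-- Hypothetical query against the Iceberg table created above.
-- Field names inside structured OBJECTs (dogs, cats) are case-sensitive.
SELECT
    name,
    animals_possessed:dogs AS has_dogs,
    record_metadata:topic  AS source_topic
FROM my_iceberg_table;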

Use cases

This connector is ideal for:

  • Production environments requiring DLQ

  • Data lineage and auditing where Kafka metadata is important

  • Complex message processing with schema evolution requirements

  • Iceberg table integration

If you need simpler ingestion without metadata or DLQ features, consider the Apache Kafka for JSON/AVRO data format connectors instead.