Apache Kafka avec DLQ et métadonnées

Note

Le connecteur est soumis aux conditions d’utilisation du connecteur.

Cette rubrique décrit le connecteur Apache Kafka avec DLQ et métadonnées. Il s’agit du connecteur complet qui offre une parité de fonctions avec l’ancien connecteur Snowflake pour Kafka et inclut des capacités avancées pour les cas d’utilisation en production.

Fonctionnalités clés

Le connecteur Apache Kafka avec DLQ et métadonnées offre des fonctionnalités complètes :

  • Prise en charge de la file d’attente de lettres mortes (DLQ) pour la gestion des messages en échec

  • Colonne RECORD_METADATA avec les métadonnées des messages Kafka

  • Configuration de la schématisation - activer ou désactiver la détection des schémas

  • Prise en charge des tables Iceberg avec évolution du schéma

  • Prise en charge de plusieurs formats de messages - JSON et AVRO

  • Intégration du registre des schémas pour les messages AVRO

  • Mappage sujet-table avec des modèles avancés

  • Prise en charge de l’authentificationSASL

Paramètres spécifiques

Outre les paramètres communs décrits à l’adresse Paramétrage du connecteur Openflow pour Kafka, ce connecteur comprend des contextes de paramètres supplémentaires pour les fonctions avancées.

Format du message et paramètres du schéma

Paramètre

Description

Obligatoire

Format du message

Le format des messages dans Kafka. Une des options : JSON / AVRO. Par défaut : JSON

Oui

Schéma AVRO

Schéma Avro dans le cas où schema-text-property est utilisé dans la stratégie d’accès au schéma AVRO avec le format de message AVRO. Remarque : cette option ne doit être utilisée que dans le cas où tous les messages consommés à partir du ou des sujets Kafka configurés partagent le même schéma.

Non

Stratégie d’accès au schéma AVRO

La méthode d’accès au schéma AVRO d’un message. Obligatoire pour AVRO. L’un des éléments suivants : embedded-avro-schema / schema-reference-reader / schema-text-property. Valeur par défaut : embedded-avro-schema

Non

Paramètres du registre des schémas

Paramètre

Description

Obligatoire

Type d’authentification du registre du schéma

La méthode d’authentification au registre des schémas, si elle est utilisée. Sinon, utilisez NONE. Une des options : NONE / BASIC. Par défaut : NONE

Oui

URL du registre des schémas

L’URL du registre des schémas. Exigence pour le format du message AVRO.

Non

Nom d’utilisateur du registre des schémas

Le nom d’utilisateur pour le registre des schémas. Exigence pour le format du message AVRO.

Non

Mot de passe du registre des schémas

Le mot de passe pour le registre des schémas. Exigence pour le format du message AVRO.

Non

DLQ et les paramètres des fonctions avancées

Paramètre

Description

Obligatoire

Rubrique Kafka DLQ

Rubrique DLQ pour envoyer les messages contenant des erreurs d’analyse à

Oui

La schématisation est activée

Détermine si les données sont insérées dans des colonnes individuelles ou dans un champ unique RECORD_CONTENT. Un des éléments suivants : true / false. Par défaut : true

Oui

Iceberg activé

Indique si le processeur intègre des données dans une table Iceberg. Le processeur échoue si cette propriété ne correspond pas au type de table réel. Par défaut : false

Oui

Comportement de schématisation

Le comportement du connecteur change en fonction du paramètre Activation de la schématisation :

La schématisation est activée

Lorsque la schématisation est activée, le connecteur :

  • Crée des colonnes individuelles pour chaque champ du message

  • Inclut une colonne RECORD_METADATA avec des métadonnées Kafka

  • Fait évoluer automatiquement le schéma de la table lorsque de nouveaux champs sont détectés

  • Aplatissement des structures imbriquées JSON/AVRO en colonnes distinctes

Exemple de structure de table :

Ligne

RECORD_METADATA

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

{« timestamp »:1669074170090, « headers »: {« current.iter…

ABC123

ZTEST

BUY

3572

2

{« timestamp »:1669074170400, « headers »: {« current.iter…

XYZ789

ZABX

SELL

3024

Schématisation désactivée

Lorsque la schématisation est désactivée, le connecteur :

  • Ne crée que deux colonnes : RECORD_CONTENT et RECORD_METADATA

  • Enregistre le contenu complet du message sous la forme d’un OBJECT dans RECORD_CONTENT

  • N’effectue pas d’évolution automatique du schéma

  • Offre une flexibilité maximale pour le traitement en aval

Exemple de structure de table :

Ligne

RECORD_METADATA

RECORD_CONTENT

1

{« timestamp »:1669074170090, « headers »: {« current.iter…

{« account »: « ABC123 », « symbol »: « ZTEST », « side »:…

2

{« timestamp »:1669074170400, « headers »: {« current.iter…

{« account »: « XYZ789 », « symbol »: « ZABX », « side »:…

Utilisez la propriété Activation de la schématisation dans les propriétés de configuration du connecteur pour activer ou désactiver la détection des schémas.

Détection et évolution des schémas

Le connecteur prend en charge la détection et l’évolution des schémas. La structure des tables dans Snowflake peut être définie et évoluer automatiquement pour supporter la structure des nouvelles données chargées par le connecteur.

Sans détection et évolution du schéma, la table Snowflake chargée par le connecteur ne comporte que deux colonnes OBJECT: RECORD_CONTENT et RECORD_METADATA.

Lorsque la détection et l’évolution des schémas sont activées, Snowflake peut détecter le schéma des données en continu et charger les données dans des tables qui correspondent automatiquement à n’importe quel schéma défini par l’utilisateur. Snowflake permet également d’ajouter de nouvelles colonnes ou de supprimer la NOT NULL contrainte des colonnes manquantes dans les nouveaux fichiers de données.

La détection des schémas avec le connecteur est possible avec ou sans registre de schémas fourni. Si vous utilisez le registre des schémas (Avro), la colonne sera créée avec les types de données définis dans le registre des schémas fourni. En l’absence de registre de schéma (JSON), le type de données sera déduit sur la base des données fournies.

Le ARRAY JSON n’est pas pris en charge pour une schématisation plus poussée.

Permettre l’évolution des schémas

Si le connecteur crée la table cible, l’évolution du schéma est activée par défaut.

Si vous souhaitez activer ou désactiver l’évolution du schéma sur la table existante, utilisez la commande ALTER TABLE afin d’ensemble le paramètre ENABLE_SCHEMA_EVOLUTION. Vous devez également utiliser un rôle qui a le privilège OWNERSHIP sur la table. Pour plus d’informations, voir Évolution du schéma de table.

Toutefois, si l’évolution des schémas est désactivée pour une table existante, le connecteur essaiera d’envoyer les lignes dont les schémas ne correspondent pas aux files d’attente de lettres mortes configurées (DLQ).

Structure RECORD_METADATA

La colonne RECORD_METADATA contient des métadonnées importantes sur les messages Kafka :

Champ

Description

offset

Le décalage du message à l’intérieur de la partition Kafka

topic

Le nom du sujet Kafka

partition

Le numéro de la partition Kafka

key

La clé du message (le cas échéant)

timestamp

L’horodatage du message

SnowflakeConnectorPushTime

Horodatage du moment où le connecteur a récupéré le message dans Kafka

headers

Mappage des en-têtes du message (le cas échéant)

File d’attente lettres mortes (DLQ)

La fonctionnalité DLQ permet de traiter les messages qui ne peuvent être traités avec succès :

Comportement DLQ

  • Échecs d’analyse - Les messages dont le format JSON/AVRO n’est pas valide sont envoyés au DLQ

  • Non-concordance des schémas - Messages ne correspondant pas au schéma attendu lorsque l’évolution des schémas est désactivée

  • Erreurs de traitement - Autres défaillances de traitement pendant l’ingestion

Support de table Iceberg

Le connecteur Openflow for Kafka peut ingérer des données dans une table Apache Iceberg™ gérée par Snowflake lorsque Iceberg activé est réglé sur true.

Exigences et limitations

Avant de configurer le connecteur Openflow Kafka pour l’ingestion de tables Iceberg, notez les exigences et les limites suivantes :

  • Vous devez créer une table Iceberg avant d’exécuter le connecteur.

  • Assurez-vous que l’utilisateur a accès à l’insertion de données dans les tables créées.

Configuration et définition

Pour configurer le connecteur Openflow pour Kafka pour l’ingestion de tables Iceberg, suivez les étapes du site Paramétrage du connecteur Openflow pour Kafka avec quelques différences notées dans les sections suivantes.

Activer l’ingestion dans la table Iceberg

Pour permettre l’ingestion dans une table Iceberg, vous devez donner au paramètre Iceberg Enabled la valeur true.

Créer une table Iceberg pour l’ingestion

Avant d’exécuter le connecteur, vous devez créer une table Iceberg. Le schéma de table initial dépend des paramètres de la propriété de votre connecteur Schématisation activée.

Si vous activez la schématisation, vous devez créer une table avec une colonne nommée record_metadata:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Le connecteur crée automatiquement les colonnes pour les champs de message et modifie le schéma des colonnes de record_metadata.

Si vous n’activez pas la schématisation, vous devez créer une table avec une colonne nommée record_content d’un type correspondant au contenu réel du message Kafka. Le connecteur crée automatiquement la colonne record_metadata.

Lorsque vous créez une table Iceberg, vous pouvez utiliser des types de données Iceberg ou des types Snowflake compatibles. Le type semi-structuré VARIANT n’est pas pris en charge. Utilisez plutôt un structuré OBJECT ou MAP.

Prenons l’exemple du message suivant :

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Exemples de création de tables Icebergs

Avec schématisation activée :

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Avec schématisation désactivée :

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    RECORD_CONTENT OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Note

RECORD_METADATA doit toujours être créé. Les noms de champ à l’intérieur de structures imbriquées telles que dogs ou cats sont sensibles à la casse.

Cas d’utilisation

Ce connecteur est idéal pour :

  • Environnements de production exigeant DLQ

  • Lignage des données et audit où les métadonnées Kafka sont importantes

  • Traitement de messages complexes avec des exigences d’évolution du schéma

  • Intégration de la table Iceberg

Si vous avez besoin d’une ingestion plus simple, sans métadonnées ni fonctions DLQ, optez plutôt pour les connecteurs Apache Kafka pour le format de données JSON/AVRO.