Apache Kafka avec DLQ et métadonnées¶
Note
Le connecteur est soumis aux conditions d’utilisation du connecteur.
Cette rubrique décrit le connecteur Apache Kafka avec DLQ et métadonnées. Il s’agit du connecteur complet qui offre une parité de fonctions avec l’ancien connecteur Snowflake pour Kafka et inclut des capacités avancées pour les cas d’utilisation en production.
Fonctionnalités clés¶
Le connecteur Apache Kafka avec DLQ et métadonnées offre des fonctionnalités complètes :
Prise en charge de la file d’attente de lettres mortes (DLQ) pour la gestion des messages en échec
Colonne RECORD_METADATA avec les métadonnées des messages Kafka
Configuration de la schématisation - activer ou désactiver la détection des schémas
Prise en charge des tables Iceberg avec évolution du schéma
Prise en charge de plusieurs formats de messages - JSON et AVRO
Intégration du registre des schémas pour les messages AVRO
Mappage sujet-table avec des modèles avancés
Prise en charge de l’authentificationSASL
Paramètres spécifiques¶
Outre les paramètres communs décrits à l’adresse Paramétrage du connecteur Openflow pour Kafka, ce connecteur comprend des contextes de paramètres supplémentaires pour les fonctions avancées.
Format du message et paramètres du schéma¶
Paramètre |
Description |
Obligatoire |
---|---|---|
Format du message |
Le format des messages dans Kafka. Une des options : JSON / AVRO. Par défaut : JSON |
Oui |
Schéma AVRO |
Schéma Avro dans le cas où schema-text-property est utilisé dans la stratégie d’accès au schéma AVRO avec le format de message AVRO. Remarque : cette option ne doit être utilisée que dans le cas où tous les messages consommés à partir du ou des sujets Kafka configurés partagent le même schéma. |
Non |
Stratégie d’accès au schéma AVRO |
La méthode d’accès au schéma AVRO d’un message. Obligatoire pour AVRO. L’un des éléments suivants : embedded-avro-schema / schema-reference-reader / schema-text-property. Valeur par défaut : embedded-avro-schema |
Non |
Paramètres du registre des schémas¶
Paramètre |
Description |
Obligatoire |
---|---|---|
Type d’authentification du registre du schéma |
La méthode d’authentification au registre des schémas, si elle est utilisée. Sinon, utilisez NONE. Une des options : NONE / BASIC. Par défaut : NONE |
Oui |
URL du registre des schémas |
L’URL du registre des schémas. Exigence pour le format du message AVRO. |
Non |
Nom d’utilisateur du registre des schémas |
Le nom d’utilisateur pour le registre des schémas. Exigence pour le format du message AVRO. |
Non |
Mot de passe du registre des schémas |
Le mot de passe pour le registre des schémas. Exigence pour le format du message AVRO. |
Non |
DLQ et les paramètres des fonctions avancées¶
Paramètre |
Description |
Obligatoire |
---|---|---|
Rubrique Kafka DLQ |
Rubrique DLQ pour envoyer les messages contenant des erreurs d’analyse à |
Oui |
La schématisation est activée |
Détermine si les données sont insérées dans des colonnes individuelles ou dans un champ unique RECORD_CONTENT. Un des éléments suivants : true / false. Par défaut : true |
Oui |
Iceberg activé |
Indique si le processeur intègre des données dans une table Iceberg. Le processeur échoue si cette propriété ne correspond pas au type de table réel. Par défaut : false |
Oui |
Comportement de schématisation¶
Le comportement du connecteur change en fonction du paramètre Activation de la schématisation :
La schématisation est activée¶
Lorsque la schématisation est activée, le connecteur :
Crée des colonnes individuelles pour chaque champ du message
Inclut une colonne RECORD_METADATA avec des métadonnées Kafka
Fait évoluer automatiquement le schéma de la table lorsque de nouveaux champs sont détectés
Aplatissement des structures imbriquées JSON/AVRO en colonnes distinctes
Exemple de structure de table :
Ligne |
RECORD_METADATA |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
---|---|---|---|---|---|
1 |
{« timestamp »:1669074170090, « headers »: {« current.iter… |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
{« timestamp »:1669074170400, « headers »: {« current.iter… |
XYZ789 |
ZABX |
SELL |
3024 |
Schématisation désactivée¶
Lorsque la schématisation est désactivée, le connecteur :
Ne crée que deux colonnes : RECORD_CONTENT et RECORD_METADATA
Enregistre le contenu complet du message sous la forme d’un OBJECT dans RECORD_CONTENT
N’effectue pas d’évolution automatique du schéma
Offre une flexibilité maximale pour le traitement en aval
Exemple de structure de table :
Ligne |
RECORD_METADATA |
RECORD_CONTENT |
---|---|---|
1 |
{« timestamp »:1669074170090, « headers »: {« current.iter… |
{« account »: « ABC123 », « symbol »: « ZTEST », « side »:… |
2 |
{« timestamp »:1669074170400, « headers »: {« current.iter… |
{« account »: « XYZ789 », « symbol »: « ZABX », « side »:… |
Utilisez la propriété Activation de la schématisation
dans les propriétés de configuration du connecteur pour activer ou désactiver la détection des schémas.
Détection et évolution des schémas¶
Le connecteur prend en charge la détection et l’évolution des schémas. La structure des tables dans Snowflake peut être définie et évoluer automatiquement pour supporter la structure des nouvelles données chargées par le connecteur.
Sans détection et évolution du schéma, la table Snowflake chargée par le connecteur ne comporte que deux colonnes OBJECT
: RECORD_CONTENT
et RECORD_METADATA
.
Lorsque la détection et l’évolution des schémas sont activées, Snowflake peut détecter le schéma des données en continu et charger les données dans des tables qui correspondent automatiquement à n’importe quel schéma défini par l’utilisateur. Snowflake permet également d’ajouter de nouvelles colonnes ou de supprimer la NOT NULL
contrainte des colonnes manquantes dans les nouveaux fichiers de données.
La détection des schémas avec le connecteur est possible avec ou sans registre de schémas fourni. Si vous utilisez le registre des schémas (Avro), la colonne sera créée avec les types de données définis dans le registre des schémas fourni. En l’absence de registre de schéma (JSON), le type de données sera déduit sur la base des données fournies.
Le ARRAY JSON n’est pas pris en charge pour une schématisation plus poussée.
Permettre l’évolution des schémas¶
Si le connecteur crée la table cible, l’évolution du schéma est activée par défaut.
Si vous souhaitez activer ou désactiver l’évolution du schéma sur la table existante, utilisez la commande ALTER TABLE afin d’ensemble le paramètre ENABLE_SCHEMA_EVOLUTION
. Vous devez également utiliser un rôle qui a le privilège OWNERSHIP
sur la table. Pour plus d’informations, voir Évolution du schéma de table.
Toutefois, si l’évolution des schémas est désactivée pour une table existante, le connecteur essaiera d’envoyer les lignes dont les schémas ne correspondent pas aux files d’attente de lettres mortes configurées (DLQ).
Structure RECORD_METADATA¶
La colonne RECORD_METADATA contient des métadonnées importantes sur les messages Kafka :
Champ |
Description |
---|---|
offset |
Le décalage du message à l’intérieur de la partition Kafka |
topic |
Le nom du sujet Kafka |
partition |
Le numéro de la partition Kafka |
key |
La clé du message (le cas échéant) |
timestamp |
L’horodatage du message |
SnowflakeConnectorPushTime |
Horodatage du moment où le connecteur a récupéré le message dans Kafka |
headers |
Mappage des en-têtes du message (le cas échéant) |
File d’attente lettres mortes (DLQ)¶
La fonctionnalité DLQ permet de traiter les messages qui ne peuvent être traités avec succès :
Comportement DLQ¶
Échecs d’analyse - Les messages dont le format JSON/AVRO n’est pas valide sont envoyés au DLQ
Non-concordance des schémas - Messages ne correspondant pas au schéma attendu lorsque l’évolution des schémas est désactivée
Erreurs de traitement - Autres défaillances de traitement pendant l’ingestion
Support de table Iceberg¶
Le connecteur Openflow for Kafka peut ingérer des données dans une table Apache Iceberg™ gérée par Snowflake lorsque Iceberg activé est réglé sur true.
Exigences et limitations¶
Avant de configurer le connecteur Openflow Kafka pour l’ingestion de tables Iceberg, notez les exigences et les limites suivantes :
Vous devez créer une table Iceberg avant d’exécuter le connecteur.
Assurez-vous que l’utilisateur a accès à l’insertion de données dans les tables créées.
Configuration et définition¶
Pour configurer le connecteur Openflow pour Kafka pour l’ingestion de tables Iceberg, suivez les étapes du site Paramétrage du connecteur Openflow pour Kafka avec quelques différences notées dans les sections suivantes.
Activer l’ingestion dans la table Iceberg¶
Pour permettre l’ingestion dans une table Iceberg, vous devez donner au paramètre Iceberg Enabled
la valeur true
.
Créer une table Iceberg pour l’ingestion¶
Avant d’exécuter le connecteur, vous devez créer une table Iceberg. Le schéma de table initial dépend des paramètres de la propriété de votre connecteur Schématisation activée
.
Si vous activez la schématisation, vous devez créer une table avec une colonne nommée record_metadata
:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Le connecteur crée automatiquement les colonnes pour les champs de message et modifie le schéma des colonnes de record_metadata
.
Si vous n’activez pas la schématisation, vous devez créer une table avec une colonne nommée record_content
d’un type correspondant au contenu réel du message Kafka. Le connecteur crée automatiquement la colonne record_metadata
.
Lorsque vous créez une table Iceberg, vous pouvez utiliser des types de données Iceberg ou des types Snowflake compatibles. Le type semi-structuré VARIANT n’est pas pris en charge. Utilisez plutôt un structuré OBJECT ou MAP.
Prenons l’exemple du message suivant :
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Exemples de création de tables Icebergs¶
Avec schématisation activée :
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Avec schématisation désactivée :
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
RECORD_CONTENT OBJECT(
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Note
RECORD_METADATA doit toujours être créé. Les noms de champ à l’intérieur de structures imbriquées telles que dogs
ou cats
sont sensibles à la casse.
Cas d’utilisation¶
Ce connecteur est idéal pour :
Environnements de production exigeant DLQ
Lignage des données et audit où les métadonnées Kafka sont importantes
Traitement de messages complexes avec des exigences d’évolution du schéma
Intégration de la table Iceberg
Si vous avez besoin d’une ingestion plus simple, sans métadonnées ni fonctions DLQ, optez plutôt pour les connecteurs Apache Kafka pour le format de données JSON/AVRO.