Dépannage du connecteur Kafka

Cette section explique comment résoudre les problèmes rencontrés lors de l’intégration de données à l’aide du connecteur Kafka.

Dans ce chapitre :

Notifications d’erreurs

Configuration des notifications d’erreur pour Snowpipe Lorsque Snowpipe rencontre des erreurs de fichiers pendant un chargement, la fonction envoie une notification à un service de messagerie dans le Cloud configuré, ce qui permet d’analyser vos fichiers de données. Pour plus d’informations, voir Notifications d’erreur Snowpipe.

Etapes de dépannage général

Effectuez les étapes suivantes pour résoudre les problèmes liés aux charges utilisant le connecteur Kafka.

Étape 1 : affichage de l’historique de COPY de la table

Interrogez l’historique des activités de chargement pour la table cible. Pour plus d’informations, voir Vue COPY_HISTORY. Si la sortie COPY_HISTORY n’inclut pas un ensemble de fichiers attendus, interrogez une période antérieure. Si les fichiers étaient des doublons d’anciens fichiers, l’historique de chargement aurait peut-être enregistré l’activité lors de la tentative de chargement des fichiers d’origine. La colonne STATUS indique si un ensemble particulier de fichiers a été chargé, partiellement chargé ou non. La colonne FIRST_ERROR_MESSAGE fournit une raison lorsqu’une tentative est partiellement chargée ou échouée.

Le connecteur Kafka déplace les fichiers qu’il n’a pas pu charger vers la zone de préparation associée à la table cible. La syntaxe pour référencer une zone de préparation de table est @[namespace.]%table_name.

Répertoriez tous les fichiers situés dans la zone de préparation de la table avec LIST.

Par exemple :

LIST @mydb.public.%mytable;
Copy

Les noms de fichiers sont dans l’un des formats suivants. Les conditions qui produisent chaque format sont décrites dans le tableau :

Type de fichier

Description

Octets bruts

Ces fichiers correspondent au modèle suivant :

<connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz

Pour ces fichiers, les enregistrements Kafka n’ont pas pu être convertis des octets bruts au format de fichier source (Avro, JSON ou Protobuf).

Une cause commune de ce problème est une panne de réseau qui a entraîné la destruction d’un caractère de l’enregistrement. Le connecteur Kafka ne pouvait plus analyser les octets bruts, ce qui a entraîné l’endommagement d’un enregistrement.

Format du fichier source (Avro, JSON ou Protobuf)

Ces fichiers correspondent au modèle suivant :

<connector_name>/<table_name>/<partition>/<start_offset>_<end_offset>_<timestamp>.<file_type>.gz

Pour ces fichiers, après que le connecteur Kafka ait reconverti les octets bruts au format de fichier source, Snowpipe a rencontré une erreur et n’a pas pu charger le fichier.

Les sections suivantes fournissent des instructions pour résoudre les problèmes liés à chacun des types de fichiers :

Octets bruts

Le nom de fichier <connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz comprend le décalage exact de l’enregistrement qui n’a pas été converti des octets bruts au format de fichier source. Pour résoudre les problèmes, renvoyer l’enregistrement au connecteur Kafka comme un nouvel enregistrement.

Format du fichier source (Avro, JSON ou Protobuf)

Si Snowpipe n’a pas pu charger les données à partir de fichiers de la zone de préparation interne créée pour le sujet Kafka, le connecteur Kafka déplace les fichiers vers la zone de préparation de la table cible dans le format du fichier source.

Notez que si un ensemble de fichiers pose plusieurs problèmes, la colonne FIRST_ERROR_MESSAGE dans la sortie COPY_HISTORY indique seulement la première erreur rencontrée. Pour visualiser toutes les erreurs dans les fichiers, il est nécessaire de récupérer les fichiers de la zone de préparation de la table, de les télécharger vers une zone de préparation nommée, puis d’exécuter une instruction COPY INTO <table> avec l’option de copie VALIDATION_MODE définie sur RETURN_ALL_ERRORS. L’option de copie VALIDATION_MODE commande une instruction COPY pour valider les données à charger et retourner les résultats en fonction de l’option de validation spécifiée. Aucune donnée n’est chargée lorsque cette option de copie est spécifiée. Dans l’instruction, faites référence à l’ensemble des fichiers que vous avez tenté de charger à l’aide du connecteur Kafka.

Lorsque tous les problèmes liés aux fichiers de données sont résolus, vous pouvez charger les données manuellement à l’aide d’une ou plusieurs instructions COPY.

L’exemple suivant fait référence aux fichiers de données situés dans la zone de préparation de la table pour la table mytable de la base de données et du schéma mydb.public.

Pour valider les fichiers de données dans la zone de préparation de la table et résoudre les erreurs :

  1. Répertoriez tous les fichiers situés dans la zone de préparation de la table avec LIST.

    Par exemple :

    LIST @mydb.public.%mytable;
    
    Copy

    Les exemples de cette section supposent que JSON est le format source des fichiers de données.

  2. Téléchargez les fichiers créés par le connecteur Kafka sur votre machine locale en utilisant GET.

    Par exemple, téléchargez les fichiers dans un répertoire nommé data sur votre machine locale :

    Linux ou macOS
    GET @mydb.public.%mytable file:///data/;
    
    Copy
    Microsoft Windows
    GET @mydb.public.%mytable file://C:\data\;
    
    Copy
  3. Créez une zone de préparation interne nommée en utilisant CREATE STAGE qui stocke les fichiers de données avec le même format que vos fichiers Kafka source.

    Par exemple, créer une zone de préparation interne nommée kafka_json qui stocke les fichiers JSON :

    CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
    
    Copy
  4. Téléchargez les fichiers que vous avez téléchargés à partir de la zone de préparation de la table en utilisant PUT.

    Par exemple, téléchargez les fichiers téléchargés dans le répertoire data de votre machine locale :

    Linux ou macOS
    PUT file:///data/ @mydb.public.kafka_json;
    
    Copy
    Microsoft Windows
    PUT file://C:\data\ @mydb.public.kafka_json;
    
    Copy
  5. Créer une table temporaire avec deux colonnes de variantes à des fins de test. La table n’est utilisée que pour valider le fichier de données mis en zone de préparation. Aucune donnée n’est chargée dans la table. La table est automatiquement détruite lorsque la session utilisateur en cours se termine :

    CREATE TEMPORARY TABLE t1 (col1 variant);
    
    Copy
  6. Récupérez toutes les erreurs rencontrées dans le fichier de données en exécutant une instruction COPY INTO *table* … VALIDATION_MODE = “RETURN_ALL_ERRORS”. L’instruction valide le fichier dans la zone de préparation indiquée. Aucune donnée n’est chargée dans la table :

    COPY INTO mydb.public.t1
      FROM @mydb.public.kafka_json
      FILE_FORMAT = (TYPE = JSON)
      VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
    Copy
  7. Corrigez toutes les erreurs signalées dans les fichiers de données sur votre machine locale.

  8. Téléchargez les fichiers corrigés vers la zone de préparation de la table ou vers la zone de préparation interne nommée en utilisant PUT.

    L’exemple suivant permet de télécharger les fichiers vers la zone de préparation de la table, en écrasant les fichiers existants :

    Linux ou macOS
    PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
    Windows
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
  9. Chargez les données dans la table cible à l’aide de COPY INTO table sans l’option VALIDATION_MODE.

    Vous pouvez éventuellement utiliser l’option de copie PURGE = TRUE pour supprimer les fichiers de données de la zone de préparation une fois les données chargées correctement, ou supprimer manuellement les fichiers de la zone de préparation de table à l’aide de REMOVE :

    COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT)
      FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable)
      FILE_FORMAT = (TYPE = 'JSON')
      PURGE = TRUE;
    
    Copy

Étape 2 : analyser le fichier journal du connecteur Kafka

Si la vue COPY_HISTORY ne contient aucun enregistrement du chargement de données, analysez le fichier journal du connecteur Kafka. Le connecteur écrit des événements dans le fichier journal. Notez que le connecteur Snowflake Kafka partage le même fichier journal avec tous les plug-ins du connecteur Kafka. Le nom et l’emplacement de ce fichier journal doivent figurer dans votre fichier de configuration du connecteur Kafka. Pour plus d’informations, consultez la documentation fournie avec votre logiciel Apache Kafka.

Recherchez dans le fichier journal du connecteur Kafka les messages d’erreur liés à Snowflake. Beaucoup de ces messages porteront la chaîne ERROR et contiendront le nom de fichier com.snowflake.kafka.connector... pour faciliter la recherche de ces messages.

Les erreurs possibles que vous pourriez rencontrer incluent :

Erreur de configuration

Causes possibles de l’erreur :

  • Le connecteur ne dispose pas des informations appropriées pour s’abonner au sujet.

  • Le connecteur ne dispose pas des informations appropriées pour écrire dans la table Snowflake (la paire de clés pour l’authentification peut être incorrecte, par ex.).

Notez que le connecteur Kafka valide ses paramètres. Le connecteur génère une erreur pour chaque paramètre de configuration incompatible. Le message d’erreur est écrit dans le fichier journal du cluster Kafka Connect. Si vous suspectez un problème de configuration, vérifiez les erreurs dans ce fichier journal.

Erreurs de lecture

Le connecteur n’a peut-être pas été en mesure de lire Kafka pour les raisons suivantes :

  • Kafka ou Kafka Connect peuvent ne pas être en cours d’exécution.

  • Le message n’a peut-être pas encore été envoyé.

  • Le message a peut-être été supprimé (expiré).

Erreur d’écriture (zone de préparation)

Causes possibles de l’erreur :

  • Privilèges insuffisants sur la zone de préparation.

  • La zone de préparation est hors cadre.

  • La zone de préparation a été détruite.

  • Un autre utilisateur ou processus a écrit des fichiers inattendus sur la zone de préparation.

Erreur d’écriture (table)

Causes possibles de l’erreur :

  • Privilèges insuffisants sur la table.

Étape 3 : vérifier Kafka Connect

Si aucune erreur n’est signalée dans le fichier journal de Kafka Connect, vérifiez Kafka Connect. Pour obtenir des instructions de dépannage, consultez la documentation fournie par votre fournisseur de logiciels Apache Kafka.

Résolution de problèmes spécifiques

Lignes en double avec la même partition de sujet et le même décalage

Lors du chargement de données à l’aide de la version 1.4 du connecteur Kafka (ou supérieur), les lignes en double dans la table cible avec la même partition de sujet et le même décalage peuvent indiquer que l’opération de chargement a dépassé le délai d’exécution par défaut de 300000 millisecondes (300 secondes). Pour vérifier la cause, consultez le fichier journal de Kafka Connect pour l’erreur suivante :

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Copy

Pour résoudre l’erreur, dans le fichier de configuration de Kafka (par exemple <kafka_dir>/config/connect-distributed.properties), modifiez l’une des propriétés suivantes :

consumer.max.poll.interval.ms

Augmentez le délai d’exécution à 900000 (900 secondes).

consumer.max.poll.records

Diminuez le nombre d’enregistrements chargés avec chaque opération à 50.

Signaler des problèmes

Lorsque vous contactez le support Snowflake pour obtenir de l’aide, veuillez disposer des fichiers suivants :

  • Fichier de configuration pour votre connecteur Kafka.

    Important

    Supprimez la clé privée avant de fournir le fichier à Snowflake.

  • Copiez le journal du connecteur Kafka. Assurez-vous que le fichier ne contient pas d’informations confidentielles ou sensibles.

  • Fichier journal JDBC.

    Pour générer le fichier journal, définissez la variable d’environnement JDBC_TRACE = true sur votre cluster Kafka Connect avant d’exécuter le connecteur Kafka.

    Pour plus d’informations sur le fichier journal JDBC, voir cet article dans la communauté Snowflake.

  • Connectez le fichier journal.

    Pour produire le fichier journal, éditez le fichier etc/kafka/connect-log4j.properties. Définissez la propriété log4j.appender.stdout.layout.ConversionPattern comme suit :

    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

    Les contextes des connecteurs sont disponibles dans la version 2.3 et supérieure de Kafka.

    Pour plus d’informations, voir l’information Améliorations de la journalisation sur le site Web de Confluent.