Dépannage du connecteur Kafka

Cette section explique comment résoudre les problèmes rencontrés lors de l’intégration de données à l’aide du connecteur Kafka.

Dans ce chapitre :

Dépannage des problèmes

Cette section décrit une approche méthodique pour résoudre les problèmes liés aux chargements utilisant le connecteur Kafka.

Étape 1. Affichage de l’historique COPY de la table

Interrogez l’historique des activités de chargement pour la table cible. Pour plus d’informations, voir Vue COPY_HISTORY. Si la sortie COPY_HISTORY n’inclut pas un ensemble de fichiers attendus, interrogez une période antérieure. Si les fichiers étaient des doublons d’anciens fichiers, l’historique de chargement aurait peut-être enregistré l’activité lors de la tentative de chargement des fichiers d’origine.

La colonne STATUS indique si un ensemble particulier de fichiers a été chargé, partiellement chargé ou non. La colonne FIRST_ERROR_MESSAGE fournit une raison lorsqu’une tentative est partiellement chargée ou échouée.

Validation des fichiers de données et résolution des erreurs

Si Snowpipe n’a pas pu charger les données à partir de fichiers de la zone de préparation interne créée pour le sujet Kafka, le connecteur Kafka déplace les fichiers vers la zone de préparation spéciale associée à la table cible. La syntaxe pour référencer une zone de préparation de table est @[espace_noms.]%nom_table.

Notez que si un ensemble de fichiers pose plusieurs problèmes, la colonne FIRST_ERROR_MESSAGE dans la sortie COPY_HISTORY indique seulement la première erreur rencontrée. Pour afficher toutes les erreurs dans les fichiers, exécutez une instruction COPY INTO <table> avec l’option de copie VALIDATION_MODE définie sur RETURN_ALL_ERRORS. L’option de copie VALIDATION_MODE commande une instruction COPY pour valider les données à charger et retourner les résultats en fonction de l’option de validation spécifiée. Aucune donnée n’est chargée lorsque cette option de copie est spécifiée. Dans l’instruction, faites référence à l’ensemble des fichiers que vous avez tenté de charger à l’aide du connecteur Kafka.

Lorsque tous les problèmes liés aux fichiers de données sont résolus, vous pouvez charger les données manuellement à l’aide d’une ou plusieurs instructions COPY.

L’exemple suivant fait référence aux fichiers de données situés dans la zone de préparation de la table pour la table mytable de la base de données et du schéma mydb.public.

Pour valider les fichiers de données dans la zone de préparation de la table et résoudre les erreurs :

  1. Répertoriez tous les fichiers situés dans la zone de préparation de la table (avec LIST) :

    LIST @mydb.public.%mytable;
    
    +-----------------+------+----------------------------------+-------------------------------+
    | name            | size | md5                              | last_modified                 |
    |-----------------+------+----------------------------------+-------------------------------|
    | myfile.csv.gz   |  512 | a123cdef1234567890abcdefghijk123 | Tue, 22 Oct 2019 14:20:31 GMT |
    +-----------------+------+----------------------------------+-------------------------------+
    
  2. Récupérez toutes les erreurs rencontrées dans le fichier (à l’aide de COPY INTO table … VALIDATION_MODE = “RETURN_ALL_ERRORS”) :

    COPY INTO mydb.public.mytable FROM @mydb.public.%mytable VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
  3. Téléchargez les fichiers de données de la zone de préparation de la table dans un répertoire local (à l’aide de GET) :

    Linux/Mac
    GET @mydb.public.%mytable file:///tmp/;
    
    Windows
    GET @mydb.public.%mytable file://C:\temp\;
    
  4. Corrigez toutes les erreurs dans les fichiers de données.

  5. Placez les fichiers dans la zone de préparation de la table ou dans la zone de préparation interne nommée pour le sujet Kafka (avec PUT). Dans cet exemple, nous mettons en zone de préparation les fichiers dans la table, en remplaçant les fichiers existants :

    Linux/Mac
    PUT file:///tmp/myfile.csv @mydb.public.%mytable;
    
    Windows
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable;
    
  6. Chargez les données dans la table cible (à l’aide de COPY INTO table sans l’option VALIDATION_MODE). Vous pouvez éventuellement utiliser l’option de copie PURGE = TRUE pour supprimer les fichiers de données de la zone de préparation une fois les données chargées correctement ou supprimer manuellement les fichiers de la zone de préparation de table (à l’aide de REMOVE) :

    COPY INTO mydb.public.mytable FROM @mydb.public.%mytable PURGE = TRUE;
    

Étape 2 : analyser le fichier journal du connecteur Kafka

Si la vue COPY_HISTORY ne contient aucun enregistrement du chargement de données, analysez le fichier journal du connecteur Kafka. Le connecteur écrit des événements dans le fichier journal. Notez que le connecteur Snowflake Kafka partage le même fichier journal avec tous les plug-ins du connecteur Kafka. Le nom et l’emplacement de ce fichier journal doivent figurer dans votre fichier de configuration du connecteur Kafka. Pour plus d’informations, consultez la documentation fournie avec votre logiciel Apache Kafka.

Recherchez dans le fichier journal du connecteur Kafka les messages d’erreur liés à Snowflake. Beaucoup de ces messages porteront la chaîne ERROR et contiendront le nom de fichier com.snowflake.kafka.connector... pour faciliter la recherche de ces messages.

Les erreurs possibles que vous pourriez rencontrer incluent :

Erreur de configuration

Causes possibles de l’erreur :

  • Le connecteur ne dispose pas des informations appropriées pour s’abonner au sujet.

  • Le connecteur ne dispose pas des informations appropriées pour écrire dans la table Snowflake (la paire de clés pour l’authentification peut être incorrecte, par ex.).

Notez que le connecteur Kafka valide ses paramètres. Le connecteur génère une erreur pour chaque paramètre de configuration incompatible. Le message d’erreur est écrit dans le fichier journal du cluster Kafka Connect. Si vous suspectez un problème de configuration, vérifiez les erreurs dans ce fichier journal.

Erreurs de lecture

Le connecteur n’a peut-être pas été en mesure de lire Kafka pour les raisons suivantes :

  • Kafka ou Kafka Connect peuvent ne pas être en cours d’exécution.

  • Le message n’a peut-être pas encore été envoyé.

  • Le message a peut-être été supprimé (expiré).

Erreur d’écriture (zone de préparation)

Causes possibles de l’erreur :

  • Privilèges insuffisants sur la zone de préparation.

  • La zone de préparation est hors cadre.

  • La zone de préparation a été détruite.

  • Un autre utilisateur ou processus a écrit des fichiers inattendus sur la zone de préparation.

Erreur d’écriture (table)

Causes possibles de l’erreur :

  • Privilèges insuffisants sur la table.

Étape 3 : vérifier Kafka Connect

Si aucune erreur n’est signalée dans le fichier journal de Kafka Connect, vérifiez Kafka Connect. Pour obtenir des instructions de dépannage, consultez la documentation fournie par votre fournisseur de logiciels Apache Kafka.

Résolution de problèmes spécifiques

Lignes en double avec la même partition de sujet et le même décalage

Lors du chargement de données à l’aide de la version 1.4 du connecteur Kafka (ou supérieur), les lignes en double dans la table cible avec la même partition de sujet et le même décalage peuvent indiquer que l’opération de chargement a dépassé le délai d’exécution par défaut de 300000 millisecondes (300 secondes). Pour vérifier la cause, consultez le fichier journal de Kafka Connect pour l’erreur suivante :

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)

Pour résoudre l’erreur, dans le fichier de configuration de Kafka (par exemple <kafka_dir>/config/connect-distributed.properties), modifiez l’une des propriétés suivantes :

consumer.max.poll.interval.ms

Augmentez le délai d’exécution à 900000 (900 secondes).

consumer.max.poll.records

Diminuez le nombre d’enregistrements chargés avec chaque opération à 50.

Signaler des problèmes

Lorsque vous contactez le support Snowflake pour obtenir de l’aide, veuillez disposer des fichiers suivants :

  • Fichier de configuration pour votre connecteur Kafka.

    Important

    Supprimez la clé privée avant de fournir le fichier à Snowflake.

  • Copiez le journal du connecteur Kafka. Assurez-vous que le fichier ne contient pas d’informations confidentielles ou sensibles.