Dépannage du connecteur Kafka

Cette section explique comment résoudre les problèmes rencontrés lors de l’intégration de données à l’aide du connecteur Kafka.

Dans ce chapitre :

Notifications d’erreur

Configuration des notifications d’erreur pour Snowpipe Lorsque Snowpipe rencontre des erreurs de fichiers pendant un chargement, la fonction envoie une notification à un service de messagerie dans le Cloud configuré, ce qui permet d’analyser vos fichiers de données. Pour plus d’informations, voir Notifications d’erreur Snowpipe.

Étapes de dépannage général

Effectuez les étapes suivantes pour résoudre les problèmes liés aux charges utilisant le connecteur Kafka.

Étape 1 : Affichage de l’historique COPY de la table

Interrogez l’historique des activités de chargement pour la table cible. Pour plus d’informations, voir Vue COPY_HISTORY. Si la sortie COPY_HISTORY n’inclut pas un ensemble de fichiers attendus, interrogez une période antérieure. Si les fichiers étaient des doublons d’anciens fichiers, l’historique de chargement aurait peut-être enregistré l’activité lors de la tentative de chargement des fichiers d’origine. La colonne STATUS indique si un ensemble particulier de fichiers a été chargé, partiellement chargé ou non. La colonne FIRST_ERROR_MESSAGE fournit une raison lorsqu’une tentative est partiellement chargée ou échouée.

Le connecteur Kafka déplace les fichiers qu’il n’a pas pu charger vers la zone de préparation associée à la table cible. La syntaxe pour référencer une zone de préparation de table est @[namespace.]%table_name.

Répertoriez tous les fichiers situés dans la zone de préparation de la table avec LIST.

Par exemple :

LIST @mydb.public.%mytable;
Copy

Les noms de fichiers sont dans l’un des formats suivants. Les conditions qui produisent chaque format sont décrites dans le tableau :

Type de fichier

Description

Octets bruts

Ces fichiers correspondent au modèle suivant :

<connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz

Pour ces fichiers, les enregistrements Kafka n’ont pas pu être convertis des octets bruts au format de fichier source (Avro, JSON ou Protobuf).

Une cause commune de ce problème est une panne de réseau qui a entraîné la destruction d’un caractère de l’enregistrement. Le connecteur Kafka ne pouvait plus analyser les octets bruts, ce qui a entraîné l’endommagement d’un enregistrement.

Format du fichier source (Avro, JSON ou Protobuf)

Ces fichiers correspondent au modèle suivant :

<connector_name>/<table_name>/<partition>/<start_offset>_<end_offset>_<timestamp>.<file_type>.gz

Pour ces fichiers, après que le connecteur Kafka ait reconverti les octets bruts au format de fichier source, Snowpipe a rencontré une erreur et n’a pas pu charger le fichier.

Les sections suivantes fournissent des instructions pour résoudre les problèmes liés à chacun des types de fichiers :

Octets bruts

Le nom de fichier <connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz comprend le décalage exact de l’enregistrement qui n’a pas été converti des octets bruts au format de fichier source. Pour résoudre les problèmes, renvoyer l’enregistrement au connecteur Kafka comme un nouvel enregistrement.

Format du fichier source (Avro, JSON ou Protobuf)

Si Snowpipe n’a pas pu charger les données à partir de fichiers de la zone de préparation interne créée pour le sujet Kafka, le connecteur Kafka déplace les fichiers vers la zone de préparation de la table cible dans le format du fichier source.

Notez que si un ensemble de fichiers pose plusieurs problèmes, la colonne FIRST_ERROR_MESSAGE dans la sortie COPY_HISTORY indique seulement la première erreur rencontrée. Pour visualiser toutes les erreurs dans les fichiers, il est nécessaire de récupérer les fichiers de la zone de préparation de la table, de les télécharger vers une zone de préparation nommée, puis d’exécuter une instruction COPY INTO <table> avec l’option de copie VALIDATION_MODE définie sur RETURN_ALL_ERRORS. L’option de copie VALIDATION_MODE commande une instruction COPY pour valider les données à charger et retourner les résultats en fonction de l’option de validation spécifiée. Aucune donnée n’est chargée lorsque cette option de copie est spécifiée. Dans l’instruction, faites référence à l’ensemble des fichiers que vous avez tenté de charger à l’aide du connecteur Kafka.

Lorsque tous les problèmes liés aux fichiers de données sont résolus, vous pouvez charger les données manuellement à l’aide d’une ou plusieurs instructions COPY.

L’exemple suivant fait référence aux fichiers de données situés dans la zone de préparation de la table pour la table mytable de la base de données et du schéma mydb.public.

Pour valider les fichiers de données dans la zone de préparation de la table et résoudre les erreurs :

  1. Répertoriez tous les fichiers situés dans la zone de préparation de la table avec LIST.

    Par exemple :

    LIST @mydb.public.%mytable;
    
    Copy

    Les exemples de cette section supposent que JSON est le format source des fichiers de données.

  2. Téléchargez les fichiers créés par le connecteur Kafka sur votre machine locale en utilisant GET.

    Par exemple, téléchargez les fichiers dans un répertoire nommé data sur votre machine locale :

    Linux ou macOS:
    GET @mydb.public.%mytable file:///data/;
    
    Copy
    Microsoft Windows:
    GET @mydb.public.%mytable file://C:\data\;
    
    Copy
  3. Créez une zone de préparation interne nommée en utilisant CREATE STAGE qui stocke les fichiers de données avec le même format que vos fichiers Kafka source.

    Par exemple, créer une zone de préparation interne nommée kafka_json qui stocke les fichiers JSON :

    CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
    
    Copy
  4. Téléchargez les fichiers que vous avez téléchargés à partir de la zone de préparation de la table en utilisant PUT.

    Par exemple, téléchargez les fichiers téléchargés dans le répertoire data de votre machine locale :

    Linux ou macOS:
    PUT file:///data/ @mydb.public.kafka_json;
    
    Copy
    Microsoft Windows:
    PUT file://C:\data\ @mydb.public.kafka_json;
    
    Copy
  5. Créer une table temporaire avec deux colonnes de variantes à des fins de test. La table n’est utilisée que pour valider le fichier de données mis en zone de préparation. Aucune donnée n’est chargée dans la table. La table est automatiquement détruite lorsque la session utilisateur en cours se termine :

    CREATE TEMPORARY TABLE t1 (col1 variant);
    
    Copy
  6. Récupérez toutes les erreurs rencontrées dans le fichier de données en exécutant une instruction COPY INTO *table* … VALIDATION_MODE = “RETURN_ALL_ERRORS”. L’instruction valide le fichier dans la zone de préparation indiquée. Aucune donnée n’est chargée dans la table :

    COPY INTO mydb.public.t1
      FROM @mydb.public.kafka_json
      FILE_FORMAT = (TYPE = JSON)
      VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
    Copy
  7. Corrigez toutes les erreurs signalées dans les fichiers de données sur votre machine locale.

  8. Téléchargez les fichiers corrigés vers la zone de préparation de la table ou vers la zone de préparation interne nommée en utilisant PUT.

    L’exemple suivant permet de télécharger les fichiers vers la zone de préparation de la table, en écrasant les fichiers existants :

    Linux ou macOS:
    PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
    Windows:
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
  9. Chargez les données dans la table cible à l’aide de COPY INTO table sans l’option VALIDATION_MODE.

    Vous pouvez éventuellement utiliser l’option de copie PURGE = TRUE pour supprimer les fichiers de données de la zone de préparation une fois les données chargées correctement, ou supprimer manuellement les fichiers de la zone de préparation de table à l’aide de REMOVE :

    COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT)
      FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable)
      FILE_FORMAT = (TYPE = 'JSON')
      PURGE = TRUE;
    
    Copy

Étape 2 : Analyse du fichier journal du connecteur Kafka

Si la vue COPY_HISTORY ne contient aucun enregistrement du chargement de données, analysez le fichier journal du connecteur Kafka. Le connecteur écrit des événements dans le fichier journal. Notez que le connecteur Snowflake Kafka partage le même fichier journal avec tous les plug-ins du connecteur Kafka. Le nom et l’emplacement de ce fichier journal doivent figurer dans votre fichier de configuration du connecteur Kafka. Pour plus d’informations, consultez la documentation fournie avec votre logiciel Apache Kafka.

Recherchez dans le fichier journal du connecteur Kafka les messages d’erreur liés à Snowflake. Beaucoup de ces messages porteront la chaîne ERROR et contiendront le nom de fichier com.snowflake.kafka.connector... pour faciliter la recherche de ces messages.

Les erreurs possibles que vous pourriez rencontrer incluent :

Erreur de configuration:

Causes possibles de l’erreur :

  • Le connecteur ne dispose pas des informations appropriées pour s’abonner au sujet.

  • Le connecteur ne dispose pas des informations appropriées pour écrire dans la table Snowflake (la paire de clés pour l’authentification peut être incorrecte, par ex.).

Notez que le connecteur Kafka valide ses paramètres. Le connecteur génère une erreur pour chaque paramètre de configuration incompatible. Le message d’erreur est écrit dans le fichier journal du cluster Kafka Connect. Si vous suspectez un problème de configuration, vérifiez les erreurs dans ce fichier journal.

Erreurs de lecture:

Le connecteur n’a peut-être pas été en mesure de lire Kafka pour les raisons suivantes :

  • Kafka ou Kafka Connect peuvent ne pas être en cours d’exécution.

  • Le message n’a peut-être pas encore été envoyé.

  • Le message a peut-être été supprimé (expiré).

Erreur d’écriture (zone de préparation):

Causes possibles de l’erreur :

  • Privilèges insuffisants sur la zone de préparation.

  • La zone de préparation est hors cadre.

  • La zone de préparation a été détruite.

  • Un autre utilisateur ou processus a écrit des fichiers inattendus sur la zone de préparation.

Erreur d’écriture (table):

Causes possibles de l’erreur :

  • Privilèges insuffisants sur la table.

Étape 3 : vérifier Kafka Connect

Si aucune erreur n’est signalée dans le fichier journal de Kafka Connect, vérifiez Kafka Connect. Pour obtenir des instructions de dépannage, consultez la documentation fournie par votre fournisseur de logiciels Apache Kafka.

Résolution de problèmes spécifiques

Lignes en double avec la même partition de rubrique et le même décalage

Lors du chargement de données à l’aide de la version 1.4 du connecteur Kafka (ou supérieur), les lignes en double dans la table cible avec la même partition de sujet et le même décalage peuvent indiquer que l’opération de chargement a dépassé le délai d’exécution par défaut de 300000 millisecondes (300 secondes). Pour vérifier la cause, consultez le fichier journal de Kafka Connect pour l’erreur suivante :

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Copy

Pour résoudre l’erreur, dans le fichier de configuration de Kafka (par exemple <kafka_dir>/config/connect-distributed.properties), modifiez l’une des propriétés suivantes :

consumer.max.poll.interval.ms

Augmentez le délai d’exécution à 900000 (900 secondes).

consumer.max.poll.records

Diminuez le nombre d’enregistrements chargés avec chaque opération à 50.

Échec de réponse de migration du décalage du canal Streaming - Code d’erreur : 5023

Lors de la mise à niveau vers la version du connecteur v2.1.0 (ou supérieure), un changement a été introduit dans le format du nom du canal Snowpipe Streaming. Par conséquent, la logique de détection d’informations sur les décalages précédemment validés ne trouvera aucune information sur ceux précédemment validés. Cela se manifestera par l’exception suivante :

com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: [SF_KAFKA_CONNECTOR] Exception: Failure in Streaming Channel Offset Migration Response Error Code: 5023

Detail: Streaming Channel Offset Migration from Source to Destination Channel has no/invalid response, please contact Snowflake Support

Message: Snowflake experienced a transient exception, please retry the migration request.
Copy

Pour résoudre cette erreur, dans le fichier de configuration Kafka (par exemple, <kafka_dir>/config/connect-distributed.properties), ajoutez la propriété de configuration suivante :

enable.streaming.channel.offset.migration

Désactivez la migration automatique des décalages en la définissant sur false.

Configuration du connecteur pour prendre en charge plusieurs rubriques

Nous avons rencontré un problème avec une seule instance de connecteur Kafka prenant en charge un grand nombre de rubriques, chacune ayant plusieurs partitions. La configuration du connecteur, même si elle semblait valide, a entraîné un cycle de rééquilibrage sans fin sans possibilité d’ingérer des données dans Snowflake. Le problème était propre au mode d’ingestion de Snowpipe Streaming (snowflake.ingestion.method=SNOWPIPE_STREAMING), mais les directives s’appliquent également au mode d’ingestion Snowpipe (snowflake.ingestion.method=SNOWPIPE). Le problème se manifeste dans le fichier journal par l’enregistrement à plusieurs reprises de ce message de journal :

[Worker-xyz] [timestamp] INFO [my-connector|task-id] [SF_INGEST] Channel is marked as closed

Cela peut généralement se produire lorsque vous configurez votre connecteur pour l’ingestion de rubriques via Regex. Nous vous recommandons d’appliquer l’ensemble d’options suivant au fichier de configuration Kafka (par exemple, <kafka_dir>/config/connect-distributed.properties) :

consumer.override.partition.assignment.strategy

Configurez la stratégie d’affectation de partition aux tâches en tant que org.apache.kafka.clients.consumer.CooperativeStickyAssignor - cela entraînera une distribution uniforme des canaux ingérés entre les tâches disponibles, réduisant ainsi le risque de rééquilibrage.

tasks.max

Le nombre de tâches instanciées par connecteur ne doit pas dépasser le nombre de CPU disponibles - le pilote sous-jacent implémente un mécanisme de limitation en fonction des CPU disponibles. L’augmentation du nombre de requêtes simultanées entraînera une pression accrue sur la mémoire de votre système, mais entraînera également des temps de traitement d’insertion plus longs, conduisant directement à l’absence de pulsations du connecteur.

Lorsqu’on parle de valeurs de délai d’expiration du connecteur, il existe un ensemble de propriétés de configuration qui les affectent directement :

consumer.override.heartbeat.interval.ms

Définit la fréquence à laquelle le thread du moniteur (un moniteur est associé à chaque tâche) enverra une pulsation à Kafka. La valeur par défaut est 3000 ms, mais en cas de charge système plus élevée, vous pouvez essayer de l’augmenter à 5000 ms.

consumer.override.session.timeout.ms

Définit la durée pendant laquelle le courtier attendra avant de supposer que le consommateur est dans un état non valide et de tenter un rééquilibrage. Ce paramètre doit être généralement 3 fois supérieur à l’intervalle de pulsation. Par conséquent, si vous avez configuré la pulsation sur 5000 ms, définissez ce paramètre sur 15000 ms.

consumer.override.max.poll.interval.ms

Définit l’intervalle maximal entre l’appel à poll() et le connecteur Kafka sous-jacent. Le temps passé entre les interrogations correspond essentiellement au lot de données traité par le connecteur (y compris le téléchargement vers Snowflake et la validation). Dans les scénarios où vous avez plusieurs tâches de traitement des données, la connexion Snowflake sous-jacente peut commencer à limiter les requêtes, ce qui entraîne des délais de traitement plus longs. Selon votre scénario, vous pouvez augmenter cette valeur jusqu’à 20 minutes (1200000 ms), en particulier lorsque vous démarrez le connecteur avec un grand nombre d’enregistrements initiaux à ingérer.

consumer.override.rebalance.timeout.ms

Lorsqu’un rééquilibrage se produit, dans un scénario avec un grand nombre de canaux par tâche, il existe de nombreuses logiques sous-jacentes par canal pour déterminer où reprendre le traitement. Ce code est exécuté de manière séquentielle. Par conséquent, plus il y a de canaux par tâche et plus la configuration initiale durera. Configurez cette propriété sur une valeur suffisamment grande pour permettre à chaque canal de réaliser son initialisation. Une valeur de 3 minutes (180000 ms) est un bon point de départ.

Il est également important de connaître la mémoire de tas disponible pour le connecteur. Ceci est particulièrement important dans les scénarios où plusieurs connecteurs s’exécutent simultanément ou lorsqu’un connecteur ingère des données provenant de plusieurs rubriques. La partition de chaque rubrique correspond à un seul canal et, par conséquent, nécessite de la mémoire.

Assurez-vous d’ajuster les paramètres de mémoire de votre processus de connexion Kafka via le paramètre Xmx. Une façon d’y parvenir est de définir la variable d’environnement KAFKA_OPTS en conséquence (c’est-à-dire, KAFKA_OPTS=-Xmx4G).

Purge inattendue des fichiers par le nettoyeur de fichiers

Lors de l’utilisation du connecteur Kafka avec SNOWPIPE, vous pourriez rencontrer un problème au moment de l’ingestion des données dans une seule table à partir de plusieurs rubriques. Si votre configuration ne dispose pas de l’entrée snowflake.topic2table.map ou s’il existe un mappage 1:1 entre la rubrique et la table, ce problème ne surviendra pas.

Le connecteur Kafka génère des fichiers avec des enregistrements à charger vers une zone de préparation. Ces fichiers sont formatés selon le modèle suivant : snowflake_kafka_connector_<connector-name>_stage_<table-name>/<connector-name>/<table-name>/<partition-id>/<low-watermark>_<high-watermark>_<timestamp>.json.gz. Le problème se situe dans le <partition-id> : si plusieurs rubriques chargent des données dans une seule table, des doublons sont susceptibles d’apparaître dans la valeur partition-id. Cela ne constitue pas un problème dans le cadre d’un fonctionnement normal du connecteur. Cependant, si le connecteur redémarre ou se rééquilibre, le processus de nettoyage peut associer de manière inexacte les fichiers chargés dans la zone de préparation (mais pas encore ingérés) à la mauvaise partition, et décider de les supprimer, ce qui peut entraîner une perte de données.

Le connecteur avec la version 2.4.x corrige ce problème en ajoutant le code de hachage de la rubrique source à l”partition-id pour garantir des noms de fichiers uniques qui correspondent exactement à la partition d’une rubrique unique. Ce correctif est activé par défaut - snowflake.snowpipe.stageFileNameExtensionEnabled - et concerne uniquement les configurations où une table cible est répertoriée plusieurs fois dans snowflake.topic2table.map.

Si votre configuration est affectée par cette fonctionnalité, vous risquez de vous retrouver avec des fichiers obsolètes chargés dans votre zone de préparation. Lorsque le connecteur démarre, il vérifie si votre zone de préparation contient de tels fichiers. Vous devez rechercher les entrées de journal commençant par NOTE: For table, suivi de la liste des fichiers détectés.

Vous pouvez également vérifier si certains fichiers sont affectés à la zone de préparation manuellement :

  1. Recherchez la zone de préparation concernée :

    show stages like 'snowflake_kafka_connector%<your table name>';
    
    Copy
  2. Répertoriez les fichiers en zone de préparation :

    list @<your stage name> pattern = '.+/<your-table-name>/[0-9]{1,4}/[0-9]+_[0-9]+_[0-9]+\.json\.gz$';
    
    Copy

La commande ci-dessus répertorie tous les fichiers correspondant à la zone de préparation de votre table et ayant des IDs de partition compris entre 0 et 9999. Ces fichiers ne seront plus ingérés, vous pouvez donc les télécharger ou les supprimer.

Signalement des problèmes

Lorsque vous contactez le support Snowflake pour obtenir de l’aide, veuillez disposer des fichiers suivants :

  • Fichier de configuration pour votre connecteur Kafka.

    Important

    Supprimez la clé privée avant de fournir le fichier à Snowflake.

  • Copiez le journal du connecteur Kafka. Assurez-vous que le fichier ne contient pas d’informations confidentielles ou sensibles.

  • Fichier journal JDBC.

    Pour générer le fichier journal, définissez la variable d’environnement JDBC_TRACE = true sur votre cluster Kafka Connect avant d’exécuter le connecteur Kafka.

    Pour plus d’informations sur le fichier journal JDBC, voir cet article dans la communauté Snowflake.

  • Connectez le fichier journal.

    Pour produire le fichier journal, éditez le fichier etc/kafka/connect-log4j.properties. Définissez la propriété log4j.appender.stdout.layout.ConversionPattern comme suit :

    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

    Les contextes des connecteurs sont disponibles dans la version 2.3 et supérieure de Kafka.

    Pour plus d’informations, voir l’information Améliorations de la journalisation sur le site Web de Confluent.