Problembehandlung des Kafka-Konnektors¶
In diesem Abschnitt wird beschrieben, wie Probleme behoben werden, die beim Erfassen von Daten mithilfe des Kafka-Konnektors auftreten.
Unter diesem Thema:
Fehlerbenachrichtigungen¶
Konfigurieren Sie Fehlerbenachrichtigungen für Snowpipe. Wenn Snowpipe während eines Ladevorgangs auf Dateifehler stößt, sendet das Feature eine Benachrichtigung an einen konfigurierten Cloudmessagingdienst und ermöglicht so die Analyse Ihrer Datendateien. Weitere Informationen dazu finden Sie unter Snowpipe-Fehlerbenachrichtigungen.
Allgemeine Problembehandlung¶
Führen Sie die folgenden Schritte aus, um Probleme mit dem Laden von Daten mithilfe des Kafka-Konnektors zu beheben.
Schritt 1: COPY-Verlauf für die Tabelle anzeigen¶
Fragen Sie den Ladeaktivitätsverlauf für die Zieltabelle ab. Weitere Informationen dazu finden Sie unter COPY_HISTORY-Ansicht. Wenn die COPY_HISTORY-Ausgabe keine erwarteten Dateien enthält, fragen Sie einen früheren Zeitraum ab. Wenn die Dateien Duplikate früherer Dateien waren, wurde die Aktivität möglicherweise im Ladeverlauf aufgezeichnet, als versucht wurde, die ursprünglichen Dateien zu laden. In der Spalte STATUS
wird gezeigt, ob ein bestimmter Satz von Dateien geladen, teilweise geladen oder nicht geladen wurde. Die Spalte FIRST_ERROR_MESSAGE
bietet einen Hinweis auf die Ursache, warum ein Ladeversuch nur teilweise erfolgreich war oder fehlgeschlagen ist.
Der Kafka-Konnektor verschiebt Dateien in den der Zieltabelle zugeordneten Stagingbereich. Die Syntax zum Verweisen auf einen Tabellen-Stagingbereich lautet @[namespace.]%table_name
.
Listen Sie mit LIST alle Dateien auf, die sich im Tabellen-Stagingbereich befinden.
Beispiel:
LIST @mydb.public.%mytable;
Die Dateinamen haben eines der folgenden Formate. Die Bedingungen, die zu jedem Format führen, sind in der Tabelle beschrieben:
Dateityp |
Beschreibung |
---|---|
Raw-Bytes |
Diese Dateien stimmen mit dem folgenden Muster überein:
Für diese Dateien konnten Kafka-Datensätze nicht von Raw-Bytes in das Quelldateiformat (Avro, JSON oder Protobuf) konvertiert werden. Eine häufige Ursache für dieses Problem ist ein Netzwerkfehler, der dazu geführt hat, dass ein Zeichen aus dem Datensatz entfernt wurde. Der Kafka-Konnektor konnte die Raw-Bytes nicht mehr parsen, was zu einer Beschädigung des Datensatzes führte. |
Quelldateiformat (Avro, JSON oder Protobuf) |
Diese Dateien stimmen mit dem folgenden Muster überein:
Wenn der Kafka-Konnektor für diese Dateien die Raw-Bytes zurück in das Quelldateiformat konvertiert hatte, stieß Snowpipe auf einen Fehler und konnte die Datei nicht laden. |
Die folgenden Abschnitte enthalten Anweisungen zur Lösung von Problemen mit dem jeweiligen Dateityp:
Raw-Bytes¶
Der Dateiname <Konnektorname>/<Tabellenname>/<Partition>/offset_(<Schlüssel>/<Wert>_)<Zeitstempel>.gz
enthält den genauen Offset des Datensatzes, der nicht von Raw-Bytes in das Quelldateiformat konvertiert wurde. Um Probleme zu lösen, senden Sie den Datensatz als neuen Datensatz erneut an den Kafka-Konnektor.
Quelldateiformat (Avro, JSON oder Protobuf)¶
Wenn Snowpipe keine Daten aus Dateien in den internen Stagingbereich laden konnte, der für das Kafka-Thema erstellt wurde, verschiebt der Kafka-Konnektor die Dateien im Quelldateiformat in den Stagingbereich, der der Zieltabelle zugeordnet ist.
Wenn ein Satz von Dateien mehrere Probleme aufweist, wird in der Spalte FIRST_ERROR_MESSAGE
der COPY_HISTORY-Ausgabe nur der erste aufgetretene Fehler angezeigt. Um alle Fehler in den Dateien anzuzeigen, ist es erforderlich, die Dateien aus dem Tabellen-Stagingbereich abzurufen, sie in einen benannten Stagingbereich hochzuladen und dann eine COPY INTO <Tabelle>-Anweisung auszuführen, wobei die Kopieroption VALIDATION_MODE auf RETURN_ALL_ERRORS
gesetzt sein muss. Mit der Kopieroption VALIDATION_MODE wird die COPY-Anweisung angewiesen, die zu ladenden Daten zu validieren und Ergebnisse auf Basis der angegebenen Validierungsoption zurückzugeben. Bei Angabe dieser Kopieroption werden keine Daten geladen. Verweisen Sie in der Anweisung auf die Dateien, die Sie mit dem Kafka-Konnektor zu laden versucht haben.
Wenn Probleme mit den Datendateien behoben sind, können Sie die Daten manuell mit einer oder mehreren COPY-Anweisungen laden.
Das folgende Beispiel verweist auf Datendateien im Tabellen-Stagingbereich für die Tabelle mytable
in der Datenbank mydb.public
und im Schema.
So validieren Sie Datendateien im Tabellen-Stagingbereich und beheben Fehler:
Listen Sie mit LIST alle Dateien auf, die sich im Tabellen-Stagingbereich befinden.
Beispiel:
LIST @mydb.public.%mytable;
Die Beispiele in diesem Abschnitt gehen davon aus, dass JSON das Quellformat für die Datendateien ist.
Laden Sie die vom Kafka-Konnektor erstellten Dateien mit GET auf Ihren lokalen Computer herunter.
Laden Sie die Dateien beispielsweise in ein Verzeichnis namens
data
auf Ihrem lokalen Computer herunter:- Linux oder macOS:
GET @mydb.public.%mytable file:///data/;
- Microsoft Windows:
GET @mydb.public.%mytable file://C:\data\;
Erstellen Sie mit CREATE STAGE einen benannten internen Stagingbereich, der die Datendateien im gleichen Format speichert wie die Kafka-Quelldateien.
Erstellen Sie beispielsweise einen internen Stagingbereich mit dem Namen
kafka_json
, der JSON-Dateien speichert:CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
Laden Sie die Dateien, die Sie aus dem Tabellen-Stagingbereich heruntergeladen haben, mit PUT hoch.
Laden Sie die heruntergeladenen Dateien zum Beispiel in das Verzeichnis
data
auf Ihrem lokalen Computer hoch:- Linux oder macOS:
PUT file:///data/ @mydb.public.kafka_json;
- Microsoft Windows:
PUT file://C:\data\ @mydb.public.kafka_json;
Erstellen Sie zu Testzwecken eine temporäre Tabelle mit zwei Variant-Spalten. Die Tabelle wird nur zur Validierung der Staging-Datendatei verwendet. Es werden keine Daten in die Tabelle geladen. Die Tabelle wird automatisch gelöscht, wenn die aktuelle Benutzersitzung endet:
CREATE TEMPORARY TABLE t1 (col1 variant);
Rufen Sie alle in der Datendatei aufgetretenen Fehler durch Ausführen einer COPY INTO *Tabelle* … VALIDATION_MODE = ‚RETURN_ALL_ERRORS‘-Anweisung ab. Die Anweisung validiert die Datei in dem angegebenen Stagingbereich. Es werden keine Daten in die Tabelle geladen:
COPY INTO mydb.public.t1 FROM @mydb.public.kafka_json FILE_FORMAT = (TYPE = JSON) VALIDATION_MODE = 'RETURN_ALL_ERRORS';
Beheben Sie alle gemeldeten Fehler in den Datendateien auf Ihrem lokalen Computer.
Laden Sie die korrigierten Dateien mit PUT entweder in den Tabellen-Stagingbereich oder in den benannten internen Stagingbereich hoch.
Im folgenden Beispiel werden die Dateien in den Tabellen-Stagingbereich hochgeladen, wobei die vorhandenen Dateien überschrieben werden:
- Linux oder macOS:
PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
- Windows:
PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
Laden Sie die Daten mit COPY INTO Tabelle ohne die Option VALIDATION_MODE in die Zieltabelle.
Sie können optional die Kopieroption PURGE = TRUE verwenden, um die Datendateien aus dem Stagingbereich zu löschen, sobald die Daten erfolgreich geladen wurden, oder Sie löschen die Dateien manuell mithilfe von REMOVE aus dem Tabellen-Stagingbereich:
COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT) FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable) FILE_FORMAT = (TYPE = 'JSON') PURGE = TRUE;
Schritt 2: Protokolldatei des Kafka-Konnektors analysieren¶
Wenn die COPY_HISTORY-Ansicht keinen Datensatz zum Datenladevorgang enthält, analysieren Sie die Protokolldatei für den Kafka-Konnektor. Der Konnektor schreibt Ereignisse in die Protokolldatei. Beachten Sie, dass der Snowflake-Kafka-Konnektor dieselbe Protokolldatei mit allen Kafka-Konnektor-Plugins teilt. Name und Speicherort dieser Protokolldatei sollten sich in Ihrer Kafka Connect-Konfigurationsdatei befinden. Weitere Informationen dazu finden Sie in der Dokumentation zu Ihrer Apache Kafka-Software.
Durchsuchen Sie die Kafka-Konnektor-Protokolldatei nach Snowflake-spezifischen Fehlermeldungen. Die meisten Nachrichten weisen die Zeichenfolge ERROR
auf und enthalten den Dateinamen com.snowflake.kafka.connector...
, was das Auffinden der Nachrichten erleichtert.
Folgende Fehler können möglicherweise auftreten:
- Konfigurationsfehler:
Mögliche Fehlerursachen:
Der Konnektor verfügt nicht über die richtigen Informationen, um das Thema zu abonnieren.
Der Konnektor verfügt nicht über die richtigen Informationen zum Schreiben in die Snowflake-Tabelle (z. B. ist das Schlüsselpaar für die Authentifizierung möglicherweise falsch).
Beachten Sie, dass der Kafka-Konnektor seine Parameter überprüft. Der Konnektor gibt für jeden inkompatiblen Konfigurationsparameter einen Fehler aus. Die Fehlermeldung wird in die Protokolldatei des Kafka Connect-Clusters geschrieben. Wenn Sie von einem Konfigurationsproblem ausgehen, überprüfen Sie die Fehler in dieser Protokolldatei.
- Lesefehler:
Der Konnektor konnte aus folgenden Gründen möglicherweise nicht von Kafka lesen:
Kafka oder Kafka Connect werden möglicherweise nicht ausgeführt.
Die Nachricht wurde möglicherweise noch nicht gesendet.
Die Nachricht wurde möglicherweise gelöscht (abgelaufen).
- Schreibfehler (Stagingbereich):
Mögliche Fehlerursachen:
Unzureichende Berechtigungen auf der Stagingbereich.
Der Stagingbereich hat nicht genügend Platz.
Der Stagingbereich wurde gelöscht.
Ein anderer Benutzer oder Prozess hat unerwartete Dateien in den Stagingbereich geschrieben.
- Schreibfehler (Tabelle):
Mögliche Fehlerursachen:
Unzureichende Berechtigungen für die Tabelle.
Schritt 3: Kafka Connect überprüfen¶
Wenn in der Kafka-Verbindungsprotokolldatei kein Fehler gemeldet wird, überprüfen Sie Kafka Connect. Anweisungen zur Problembehandlung finden Sie in der Dokumentation Ihres Apache Kafka-Softwareanbieters.
Beheben spezifischer Probleme¶
Doppelte Zeilen mit derselben Themenpartition und demselben Offset¶
Beim Laden von Daten mit Version 1.4 des Kafka-Konnektors (oder höher) können doppelte Zeilen in der Zieltabelle mit derselben Themenpartition und demselben Offset darauf hinweisen, dass der Ladevorgang das Standardausführungstimeout von 300.000 Millisekunden (300 Sekunden) überschritten hat. Überprüfen Sie die Kafka Connect-Protokolldatei auf folgenden Fehler, um die Ursache zu überprüfen:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Um den Fehler zu beheben, ändern Sie in der Kafka-Konfigurationsdatei (z. B. <Kafka-Verzeichnis>/config/connect-distributed.properties
) eine der folgenden Eigenschaften:
consumer.max.poll.interval.ms
Erhöhen Sie das Ausführungstimeout auf
900000
(900 Sekunden).consumer.max.poll.records
Verringern Sie die Anzahl der mit jeder Operation geladenen Datensätze auf
50
.
Melden von Problemen¶
Wenn Sie sich an den Snowflake-Support wenden, halten Sie die folgenden Dateien bereit:
Konfigurationsdatei für Ihren Kafka-Konnektor.
Wichtig
Entfernen Sie den privaten Schlüssel, bevor Sie die Datei Snowflake bereitstellen.
Kopie des Kafka Konnektor-Protokolls. Stellen Sie sicher, dass die Datei keine vertraulichen oder sensiblen Informationen enthält.
JDBC-Protokolldatei.
Um die Protokolldatei zu generieren, setzen Sie die Umgebungsvariable
JDBC_TRACE = true
auf Ihrem Kafka Connect-Cluster, bevor Sie den Kafka-Konnektor ausführen.Weitere Informationen zur JDBC-Protokolldatei finden Sie in diesem Artikel in der Snowflake-Community.
Protokolldatei verbinden.
Um die Protokolldatei zu erzeugen, bearbeiten Sie die Datei
etc/kafka/connect-log4j.properties
. Stellen Sie die Eigenschaftlog4j.appender.stdout.layout.ConversionPattern
wie folgt ein:log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
Konnektorkontexte sind in Kafka Version 2.3 und höher verfügbar.
Weitere Informationen dazu finden Sie in den Informationen zu Protokollierungsverbesserungen auf der Confluent-Website.