Problembehandlung des Kafka-Konnektors¶
In diesem Abschnitt wird beschrieben, wie Probleme behoben werden, die beim Erfassen von Daten mithilfe des Kafka-Konnektors auftreten.
Unter diesem Thema:
Fehlerbenachrichtigungen¶
Konfigurieren Sie Fehlerbenachrichtigungen für Snowpipe. Wenn Snowpipe während eines Ladevorgangs auf Dateifehler stößt, sendet das Feature eine Benachrichtigung an einen konfigurierten Cloudmessagingdienst und ermöglicht so die Analyse Ihrer Datendateien. Weitere Informationen dazu finden Sie unter Snowpipe-Fehlerbenachrichtigungen.
Allgemeine Problembehandlung¶
Führen Sie die folgenden Schritte aus, um Probleme mit dem Laden von Daten mithilfe des Kafka-Konnektors zu beheben.
Schritt 1: COPY-Verlauf für die Tabelle anzeigen¶
Fragen Sie den Ladeaktivitätsverlauf für die Zieltabelle ab. Weitere Informationen dazu finden Sie unter Ansicht COPY_HISTORY. Wenn die COPY_HISTORY-Ausgabe keine erwarteten Dateien enthält, fragen Sie einen früheren Zeitraum ab. Wenn die Dateien Duplikate früherer Dateien waren, wurde die Aktivität möglicherweise im Ladeverlauf aufgezeichnet, als versucht wurde, die ursprünglichen Dateien zu laden. In der Spalte STATUS
wird gezeigt, ob ein bestimmter Satz von Dateien geladen, teilweise geladen oder nicht geladen wurde. Die Spalte FIRST_ERROR_MESSAGE
bietet einen Hinweis auf die Ursache, warum ein Ladeversuch nur teilweise erfolgreich war oder fehlgeschlagen ist.
Der Kafka-Konnektor verschiebt Dateien in den der Zieltabelle zugeordneten Stagingbereich. Die Syntax zum Verweisen auf einen Tabellen-Stagingbereich lautet @[namespace.]%table_name
.
Listen Sie mit LIST alle Dateien auf, die sich im Tabellen-Stagingbereich befinden.
Beispiel:
LIST @mydb.public.%mytable;
Die Dateinamen haben eines der folgenden Formate. Die Bedingungen, die zu jedem Format führen, sind in der Tabelle beschrieben:
Dateityp |
Beschreibung |
---|---|
Raw-Bytes |
Diese Dateien stimmen mit dem folgenden Muster überein:
Für diese Dateien konnten Kafka-Datensätze nicht von Raw-Bytes in das Quelldateiformat (Avro, JSON oder Protobuf) konvertiert werden. Eine häufige Ursache für dieses Problem ist ein Netzwerkfehler, der dazu geführt hat, dass ein Zeichen aus dem Datensatz entfernt wurde. Der Kafka-Konnektor konnte die Raw-Bytes nicht mehr parsen, was zu einer Beschädigung des Datensatzes führte. |
Quelldateiformat (Avro, JSON oder Protobuf) |
Diese Dateien stimmen mit dem folgenden Muster überein:
Wenn der Kafka-Konnektor für diese Dateien die Raw-Bytes zurück in das Quelldateiformat konvertiert hatte, stieß Snowpipe auf einen Fehler und konnte die Datei nicht laden. |
Die folgenden Abschnitte enthalten Anweisungen zur Lösung von Problemen mit dem jeweiligen Dateityp:
Raw-Bytes¶
Der Dateiname <Konnektorname>/<Tabellenname>/<Partition>/offset_(<Schlüssel>/<Wert>_)<Zeitstempel>.gz
enthält den genauen Offset des Datensatzes, der nicht von Raw-Bytes in das Quelldateiformat konvertiert wurde. Um Probleme zu lösen, senden Sie den Datensatz als neuen Datensatz erneut an den Kafka-Konnektor.
Quelldateiformat (Avro, JSON oder Protobuf)¶
Wenn Snowpipe keine Daten aus Dateien in den internen Stagingbereich laden konnte, der für das Kafka-Thema erstellt wurde, verschiebt der Kafka-Konnektor die Dateien im Quelldateiformat in den Stagingbereich, der der Zieltabelle zugeordnet ist.
Wenn ein Satz von Dateien mehrere Probleme aufweist, wird in der Spalte FIRST_ERROR_MESSAGE
der COPY_HISTORY-Ausgabe nur der erste aufgetretene Fehler angezeigt. Um alle Fehler in den Dateien anzuzeigen, ist es erforderlich, die Dateien aus dem Tabellen-Stagingbereich abzurufen, sie in einen benannten Stagingbereich hochzuladen und dann eine COPY INTO <Tabelle>-Anweisung auszuführen, wobei die Kopieroption VALIDATION_MODE auf RETURN_ALL_ERRORS
gesetzt sein muss. Mit der Kopieroption VALIDATION_MODE wird die COPY-Anweisung angewiesen, die zu ladenden Daten zu validieren und Ergebnisse auf Basis der angegebenen Validierungsoption zurückzugeben. Bei Angabe dieser Kopieroption werden keine Daten geladen. Verweisen Sie in der Anweisung auf die Dateien, die Sie mit dem Kafka-Konnektor zu laden versucht haben.
Wenn Probleme mit den Datendateien behoben sind, können Sie die Daten manuell mit einer oder mehreren COPY-Anweisungen laden.
Das folgende Beispiel verweist auf Datendateien im Tabellen-Stagingbereich für die Tabelle mytable
in der Datenbank mydb.public
und im Schema.
So validieren Sie Datendateien im Tabellen-Stagingbereich und beheben Fehler:
Listen Sie mit LIST alle Dateien auf, die sich im Tabellen-Stagingbereich befinden.
Beispiel:
LIST @mydb.public.%mytable;
Die Beispiele in diesem Abschnitt gehen davon aus, dass JSON das Quellformat für die Datendateien ist.
Laden Sie die vom Kafka-Konnektor erstellten Dateien mit GET auf Ihren lokalen Computer herunter.
Laden Sie die Dateien beispielsweise in ein Verzeichnis namens
data
auf Ihrem lokalen Computer herunter:- Linux oder macOS:
GET @mydb.public.%mytable file:///data/;
- Microsoft Windows:
GET @mydb.public.%mytable file://C:\data\;
Erstellen Sie mit CREATE STAGE einen benannten internen Stagingbereich, der die Datendateien im gleichen Format speichert wie die Kafka-Quelldateien.
Erstellen Sie beispielsweise einen internen Stagingbereich mit dem Namen
kafka_json
, der JSON-Dateien speichert:CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
Laden Sie die Dateien, die Sie aus dem Tabellen-Stagingbereich heruntergeladen haben, mit PUT hoch.
Laden Sie die heruntergeladenen Dateien zum Beispiel in das Verzeichnis
data
auf Ihrem lokalen Computer hoch:- Linux oder macOS:
PUT file:///data/ @mydb.public.kafka_json;
- Microsoft Windows:
PUT file://C:\data\ @mydb.public.kafka_json;
Erstellen Sie zu Testzwecken eine temporäre Tabelle mit zwei Variant-Spalten. Die Tabelle wird nur zur Validierung der Staging-Datendatei verwendet. Es werden keine Daten in die Tabelle geladen. Die Tabelle wird automatisch gelöscht, wenn die aktuelle Benutzersitzung endet:
CREATE TEMPORARY TABLE t1 (col1 variant);
Rufen Sie alle in der Datendatei aufgetretenen Fehler durch Ausführen einer COPY INTO *Tabelle* … VALIDATION_MODE = ‚RETURN_ALL_ERRORS‘-Anweisung ab. Die Anweisung validiert die Datei in dem angegebenen Stagingbereich. Es werden keine Daten in die Tabelle geladen:
COPY INTO mydb.public.t1 FROM @mydb.public.kafka_json FILE_FORMAT = (TYPE = JSON) VALIDATION_MODE = 'RETURN_ALL_ERRORS';
Beheben Sie alle gemeldeten Fehler in den Datendateien auf Ihrem lokalen Computer.
Laden Sie die korrigierten Dateien mit PUT entweder in den Tabellen-Stagingbereich oder in den benannten internen Stagingbereich hoch.
Im folgenden Beispiel werden die Dateien in den Tabellen-Stagingbereich hochgeladen, wobei die vorhandenen Dateien überschrieben werden:
- Linux oder macOS:
PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
- Windows:
PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
Laden Sie die Daten mit COPY INTO Tabelle ohne die Option VALIDATION_MODE in die Zieltabelle.
Sie können optional die Kopieroption PURGE = TRUE verwenden, um die Datendateien aus dem Stagingbereich zu löschen, sobald die Daten erfolgreich geladen wurden, oder Sie löschen die Dateien manuell mithilfe von REMOVE aus dem Tabellen-Stagingbereich:
COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT) FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable) FILE_FORMAT = (TYPE = 'JSON') PURGE = TRUE;
Schritt 2: Protokolldatei des Kafka-Konnektors analysieren¶
Wenn die COPY_HISTORY-Ansicht keinen Datensatz zum Datenladevorgang enthält, analysieren Sie die Protokolldatei für den Kafka-Konnektor. Der Konnektor schreibt Ereignisse in die Protokolldatei. Beachten Sie, dass der Snowflake-Kafka-Konnektor dieselbe Protokolldatei mit allen Kafka-Konnektor-Plugins teilt. Name und Speicherort dieser Protokolldatei sollten sich in Ihrer Kafka Connect-Konfigurationsdatei befinden. Weitere Informationen dazu finden Sie in der Dokumentation zu Ihrer Apache Kafka-Software.
Durchsuchen Sie die Kafka-Konnektor-Protokolldatei nach Snowflake-spezifischen Fehlermeldungen. Die meisten Nachrichten weisen die Zeichenfolge ERROR
auf und enthalten den Dateinamen com.snowflake.kafka.connector...
, was das Auffinden der Nachrichten erleichtert.
Folgende Fehler können möglicherweise auftreten:
- Konfigurationsfehler:
Mögliche Fehlerursachen:
Der Konnektor verfügt nicht über die richtigen Informationen, um das Thema zu abonnieren.
Der Konnektor verfügt nicht über die richtigen Informationen zum Schreiben in die Snowflake-Tabelle (z. B. ist das Schlüsselpaar für die Authentifizierung möglicherweise falsch).
Beachten Sie, dass der Kafka-Konnektor seine Parameter überprüft. Der Konnektor gibt für jeden inkompatiblen Konfigurationsparameter einen Fehler aus. Die Fehlermeldung wird in die Protokolldatei des Kafka Connect-Clusters geschrieben. Wenn Sie von einem Konfigurationsproblem ausgehen, überprüfen Sie die Fehler in dieser Protokolldatei.
- Lesefehler:
Der Konnektor konnte aus folgenden Gründen möglicherweise nicht von Kafka lesen:
Kafka oder Kafka Connect werden möglicherweise nicht ausgeführt.
Die Nachricht wurde möglicherweise noch nicht gesendet.
Die Nachricht wurde möglicherweise gelöscht (abgelaufen).
- Schreibfehler (Stagingbereich):
Mögliche Fehlerursachen:
Unzureichende Berechtigungen auf der Stagingbereich.
Der Stagingbereich hat nicht genügend Platz.
Der Stagingbereich wurde gelöscht.
Ein anderer Benutzer oder Prozess hat unerwartete Dateien in den Stagingbereich geschrieben.
- Schreibfehler (Tabelle):
Mögliche Fehlerursachen:
Unzureichende Berechtigungen für die Tabelle.
Schritt 3: Kafka Connect überprüfen¶
Wenn in der Kafka-Verbindungsprotokolldatei kein Fehler gemeldet wird, überprüfen Sie Kafka Connect. Anweisungen zur Problembehandlung finden Sie in der Dokumentation Ihres Apache Kafka-Softwareanbieters.
Beheben spezifischer Probleme¶
Doppelte Zeilen mit derselben Themenpartition und demselben Offset¶
Beim Laden von Daten mit Version 1.4 des Kafka-Konnektors (oder höher) können doppelte Zeilen in der Zieltabelle mit derselben Themenpartition und demselben Offset darauf hinweisen, dass der Ladevorgang das Standardausführungstimeout von 300.000 Millisekunden (300 Sekunden) überschritten hat. Überprüfen Sie die Kafka Connect-Protokolldatei auf folgenden Fehler, um die Ursache zu überprüfen:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Um den Fehler zu beheben, ändern Sie in der Kafka-Konfigurationsdatei (z. B. <Kafka-Verzeichnis>/config/connect-distributed.properties
) eine der folgenden Eigenschaften:
consumer.max.poll.interval.ms
Erhöhen Sie das Ausführungstimeout auf
900000
(900 Sekunden).consumer.max.poll.records
Verringern Sie die Anzahl der mit jeder Operation geladenen Datensätze auf
50
.
Failure in Streaming Channel Offset Migration Response Error Code: 5023¶
Beim Upgrade auf die Konnektor-Version v2.1.0 (oder höher) wurde eine Änderung am Namensformat des Snowpipe Streaming-Kanals vorgenommen. Folglich findet die Logik, die Informationen über zuvor mit Commit bestätigte Offsets ermittelt, keine Informationen über zuvor mit Commit bestätigte Offsets. Dies äußert sich in der folgenden Ausnahme:
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: [SF_KAFKA_CONNECTOR] Exception: Failure in Streaming Channel Offset Migration Response Error Code: 5023
Detail: Streaming Channel Offset Migration from Source to Destination Channel has no/invalid response, please contact Snowflake Support
Message: Snowflake experienced a transient exception, please retry the migration request.
Um diesen Fehler zu beheben, fügen Sie in der Datei der Kafka-Konfiguration (z. B. <kafka_dir>/config/connect-distributed.properties
) die folgende Eigenschaft der Konfiguration hinzu:
enable.streaming.channel.offset.migration
Deaktivieren Sie die automatische Offset-Migration, indem Sie sie auf
false
festlegen.
Konfigurieren des Konnektors zur Unterstützung verschiedener Themen¶
Wir haben ein Problem mit einer einzelnen Kafka-Konnektor-Instanz festgestellt, die eine große Anzahl von Themen unterstützt, von denen jedes mehrere Partitionen hat. Die Konfiguration des Konnektors, obwohl sie gültig zu sein schien, führte zu einem endlosen Zyklus der erneuten Abgleichung, ohne die Möglichkeit, Daten in Snowflake zu importieren. Das Problem betraf den Snowpipe Streaming-Modus (snowflake.ingestion.method=SNOWPIPE_STREAMING
), aber die Richtlinien gelten auch für den Snowpipe Modus (snowflake.ingestion.method=SNOWPIPE
). Das Problem manifestiert sich in der Datei dadurch, dass diese Protokollmeldung wiederholt aufgezeichnet wird:
[Worker-xyz] [timestamp] INFO [my-connector|task-id] [SF_INGEST] Channel is marked as closed
Dies kann typischerweise passieren, wenn Sie Ihren Konnektor so konfigurieren, dass er Themen über Regex einliest. Wir empfehlen die Anwendung der folgenden Optionen auf die Kafka-Konfigurationsdatei (z. B. <kafka_dir>/config/connect-distributed.properties
):
consumer.override.partition.assignment.strategy
Konfigurieren Sie die Strategie für die Zuweisung von Partitionen an Tasks als
org.apache.kafka.clients.consumer.CooperativeStickyAssignor
- dies führt zu einer gleichmäßigen Verteilung der aufgenommenen Kanäle auf die verfügbaren Aufgaben und verringert das Risiko einer Neuverteilung.tasks.max
Die Anzahl der instanziierten Aufgaben pro Konnektor sollte die Anzahl der verfügbaren CPUs nicht überschreiten - der zugrunde liegende Treiber implementiert einen Drosselungsmechanismus auf der Basis der verfügbaren CPUs. Eine steigende Anzahl paralleler Anfragen erhöht nicht nur den Druck auf den Speicher Ihres Systems, sondern führt auch zu längeren Verarbeitungszeiten für Einsätze, was direkt zu fehlenden Heartbeats des Konnektors führt.
Timeout Wenn es um die Timeout-Werte des Konnektors geht, gibt es eine Reihe von Eigenschaftseigenschaften, die diese direkt beeinflussen:
consumer.override.heartbeat.interval.ms
Defines how often the monitor thread (there is one associated with each task) will send heartbeat to Kafka. Die Voreinstellung ist
3000
ms, aber bei höherer Systembelastung können Sie mit einer Erhöhung auf5000
ms experimentieren.consumer.override.session.timeout.ms
Legt fest, wie lange der Broker wartet, bevor er davon annimmt, dass sich der Verbraucher in einem ungültigen Zustand befindet und versucht, den Abgleich erneut durchzuführen. Diese Einstellung sollte in der Regel 3-mal höher sein als das Heartbeat-Intervall. Wenn Sie also das Heartbeat-Intervall auf
5000
ms festgelegt haben, legen Sie diese Einstellung auf15000
ms fest.consumer.override.max.poll.interval.ms
Definiert das maximale Intervall zwischen Aufrufen von
poll()
aus zugrunde liegendem Kafka. Die Zeit zwischen den Abfragen wird im Wesentlichen dem Konnektor zugeordnet, der einen Batch von Daten verarbeitet (einschließlich Hochladen in Snowflake und Bestätigen durch Commit). In Szenarios, in denen mehrere Aufgaben Daten verarbeiten, kann die zugrunde liegende Snowflake-Verbindung beginnen, Anfragen zu drosseln, was zu längeren Verarbeitungszeiten führt. Abhängig von Ihrem Szenario können Sie diesen Wert sogar auf 20 Minuten (1200000
ms) erhöhen - vor allem, wenn Sie den Konnektor mit einer großen Anzahl von Datensätzen starten, die aufgenommen werden sollen.consumer.override.rebalance.timeout.ms
Wenn der Abgleich in einem Szenario mit einer großen Anzahl von Kanälen pro Aufgabe erneut durchgeführt wird, gibt es eine Menge zugrunde liegender Logik pro Kanal, um herauszufinden, wo die Verarbeitung fortgesetzt werden soll. Dieser Code wird sequenziell ausgeführt. Je mehr Kanäle pro Aufgabe, desto länger dauert das anfängliche Setup. Konfigurieren Sie diese Eigenschaft so, dass der Wert groß genug ist, damit jeder Kanal seine Initialisierung abschließen kann. Ein Wert von 3 Minuten (
180000
ms) ist ein guter Ausgangspunkt.
Es ist auch wichtig, auf den verfügbaren Heap-Speicher für den Konnektor zu achten. Dies ist besonders wichtig in Szenarios, in denen mehrere Konnektoren gleichzeitig laufen oder ein Konnektor Daten aus mehreren Themen aufnimmt. Jede Partition eines Themas ist einem einzelnen Kanal zugeordnet und benötigt daher Speicher.
Stellen Sie sicher, dass Sie die Einstellungen für den Arbeitsspeicher Ihres Kafka-Verbindungsprozesses über die Xmx-Einstellung anpassen. Eine Möglichkeit, dies zu tun, besteht darin, die Umgebungsvariable KAFKA_OPTS
zu definieren und entsprechend zu konfigurieren (KAFKA_OPTS=-Xmx4G
).
Datei-Cleaner löscht Dateien unerwartet¶
Wenn Sie den Kafka-Konnektor mit SNOWPIPE verwenden, kann es zu einem Problem kommen, wenn Sie Daten aus mehreren Themen in eine einzige Tabelle einlesen. Wenn Ihre Konfiguration den Eintrag snowflake.topic2table.map
nicht enthält oder es eine 1:1-Zuordnung zwischen dem Thema und der Tabelle gibt, ist dieses Problem nicht relevant.
Der Kafka-Konnektor erzeugt Dateien mit Datensätzen, die in einen Stagingbereich hochgeladen werden sollen. Diese Dateien sind nach dem folgenden Muster formatiert: snowflake_kafka_connector_<connector-name>_stage_<table-name>/<connector-name>/<table-name>/<partition-id>/<low-watermark>_<high-watermark>_<timestamp>.json.gz
. Das Problem liegt im Speicherort <partition-id>
: Wenn mehrere Themen Daten in eine einzige Tabelle geladen werden, sind Duplikate im Wert partition-id
wahrscheinlich. Dies ist bei einer normalen Konnektoroperation kein Problem. Wenn der Konnektor jedoch neu gestartet oder neu abgeglichen wird, könnte der Cleaner-Prozess die in den Stagingbereich geladenen (aber noch nicht eingelesenen) Dateien fälschlicherweise der falschen Partition zuordnen und beschließen, sie zu löschen, was zu einem Datenverlust führen könnte.
Der Konnektor mit der Version 2.4.x korrigiert dieses Problem, indem er den Hashcode des Ausgangsthemas in die partition-id
einfügt, um eindeutige Dateinamen zu gewährleisten, die genau mit der Partition eines einzelnen Themas übereinstimmen. Diese Korrektur ist standardmäßig aktiviert – snowflake.snowpipe.stageFileNameExtensionEnabled
– und betrifft nur Konfigurationen, bei denen eine Zieltabelle mehr als einmal unter snowflake.topic2table.map
aufgeführt ist.
Wenn Ihre Konfiguration von dieser Funktionalität betroffen ist, kann es vorkommen, dass veraltete Dateien in Ihren Stagingbereich hochgeladen werden. Wenn der Konnektor startet, prüft er, ob Ihr Stagingbereich solche Dateien enthält. Sie müssen nach den Protokolleinträgen suchen, die mit NOTE: For table
beginnen, gefolgt von der Liste der erkannten Dateien.
Sie können auch manuell prüfen, ob einige Dateien im Stagingbereich betroffen sind:
Suchen Sie den betroffenen Stagingbereich:
show stages like 'snowflake_kafka_connector%<your table name>';
Liste der Stagingbereich-Dateien:
list @<your stage name> pattern = '.+/<your-table-name>/[0-9]{1,4}/[0-9]+_[0-9]+_[0-9]+\.json\.gz$';
Der obige Befehl listet alle Dateien auf, die dem Stagingbereich Ihrer Tabelle entsprechen und deren Partition-IDs im Bereich 0–9999 liegen. Diese Dateien werden nicht mehr importiert, so dass Sie sie herunterladen oder löschen können.
Melden von Problemen¶
Wenn Sie sich an den Snowflake-Support wenden, halten Sie die folgenden Dateien bereit:
Konfigurationsdatei für Ihren Kafka-Konnektor.
Wichtig
Entfernen Sie den privaten Schlüssel, bevor Sie die Datei Snowflake bereitstellen.
Kopie des Kafka Konnektor-Protokolls. Stellen Sie sicher, dass die Datei keine vertraulichen oder sensiblen Informationen enthält.
JDBC-Protokolldatei.
Um die Protokolldatei zu generieren, setzen Sie die Umgebungsvariable
JDBC_TRACE = true
auf Ihrem Kafka Connect-Cluster, bevor Sie den Kafka-Konnektor ausführen.Weitere Informationen zur JDBC-Protokolldatei finden Sie in diesem Artikel in der Snowflake-Community.
Protokolldatei verbinden.
Um die Protokolldatei zu erzeugen, bearbeiten Sie die Datei
etc/kafka/connect-log4j.properties
. Stellen Sie die Eigenschaftlog4j.appender.stdout.layout.ConversionPattern
wie folgt ein:log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
Konnektorkontexte sind in Kafka Version 2.3 und höher verfügbar.
Weitere Informationen dazu finden Sie in den Informationen zu Protokollierungsverbesserungen auf der Confluent-Website.