Funktionsweise von Snowflake High Performance connector for Kafka¶
Unter diesem Thema werden die verschiedenen Aspekte des Konnektors beschrieben, wie er mit Tabellen und Pipes funktioniert und wie der Konnektor konfiguriert wird.
Funktionsweise des Konnektors mit Tabellen und Pipes¶
Der High-Performance-Snowflake-Connector für Kafka erfordert, dass Sie Zieltabellen manuell erstellen. Der Konnektor behandelt jeden Kafka-Datensatz als eine Zeile, die in eine Snowflake-Tabelle eingefügt werden soll. Wenn Sie zum Beispiel ein Kafka-Topic haben, bei dem der Inhalt der Nachricht wie folgt strukturiert ist:
{
"order_id": 12345,
"customer_name": "John",
"order_total": 100.00,
"isPaid": true
}
Sie können eine Tabelle mit Spalten erstellen, die denJSON-Schlüsseln entsprechen, und sich auf eine Standard-Pipe namens {tableName}-STREAMING stützen, die automatisch die Schlüssel der obersten Ebene des Datensatzinhalts den Tabellenspalten zuordnet, die dem Namen nach übereinstimmen (Groß-/Kleinschreibung wird nicht berücksichtigt).
CREATE TABLE ORDERS (
record_metadata VARIANT,
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
ispaid BOOLEAN
);
Wenn Sie Ihre eigene Pipe erstellen möchten, können Sie die Datentransformationslogik in der COPY INTO-Anweisung der Pipe definieren. Sie können Spalten nach Bedarf umbenennen und die Datentypen nach Bedarf umwandeln. Beispiel:
CREATE TABLE ORDERS (
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::STRING,
$1:customer_name,
$1:order_total::STRING,
$1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
oder
CREATE TABLE ORDERS (
topic VARCHAR,
partition VARCHAR,
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_METADATA.topic::STRING AS topic,
$1:RECORD_METADATA.partition::STRING AS partition,
$1['order_id']::STRING AS order_id,
$1['customer_name']::STRING as customer_name,
CONCAT($1['order_total']::STRING, ' USD') AS order_total,
$1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Wenn Sie Ihre eigene Pipe definieren, müssen die Spalten der Zieltabelle nicht mit den JSON-Schlüsseln übereinstimmen. Sie können die Spalten in die von Ihnen gewünschten Namen umbenennen und die Datentypen nach Bedarf umwandeln.
Topic-Namen, Tabellennamen und Pipe-Namen¶
Je nach Konfigurationseinstellung verwendet der Konnektor unterschiedliche Namen für die Zieltabelle. Der Name der Zieltabelle wird immer vom Namen des Topics abgeleitet.
Wie der Konnektor Topic-Namen der Zieltabelle zuordnet¶
Der Kafka-Konnektor bietet zwei Modi für das Zuordnen von Kafka-Topic-Namen zu Snowflake-Tabellennamen:
Statische Zuordnung: Der Konnektor leitet die Namen der Zieltabellen nur mit dem Namen des Kafka-Topics ab.
Expliziter Modus für die Zuordnung von Topics zu Tabellen: Sie geben benutzerdefinierte Zuordnungen zwischen Themen und Tabellen unter Verwendung des Konfigurationsparameters
snowflake.topic2table.mapan.
Dynamische Zuordnung¶
Wenn Sie den Parameter snowflake.topic2table.map nicht konfigurieren, leitet der Konnektor die Tabellennamen immer aus dem Themennamen ab.
Generierung von Tabellennamen:
Der Konnektor leitet den Namen der Zieltabelle aus dem Namen des Topics nach folgenden Regeln ab:
Wenn der Topic-Name ein gültiger Snowflake-Bezeichner ist (beginnend mit einem Buchstaben oder Unterstrich und enthält nur Buchstaben, Ziffern, Unterstriche oder Dollarzeichen), verwendet der Konnektor den Topic-Namen als Tabellennamen (in Großbuchstaben umgewandelt).
Wenn der Topic-Name ungültige Zeichen enthält, führt der Konnektor Folgendes aus:
Ersetzt ungültige Zeichen durch Unterstriche
Fügt einen Unterstrich gefolgt von einem Hash-Code hinzu, um die Eindeutigkeit zu gewährleisten
So wir zum Beispiel das Topics
my-topic.datazuMY_TOPIC_DATA_<hash>.
Bestimmen des Pipe-Namens:
Der Konnektor bestimmt anhand der folgenden Logik, welche Pipe verwendet werden soll:
Der Konnektor prüft, ob eine Pipe mit demselben Namen wie dem der Zieltabelle vorhanden ist.
Wenn eine benutzerdefinierte Pipe mit diesem Namen vorhanden ist, verwendet der Konnektor diese Pipe (benutzerdefinierter Pipe-Modus).
Wenn nicht, verwendet der Konnektor die Standard-Pipe mit dem Namen
{tableName}-STREAMING.
Bemerkung
Snowflake empfiehlt, Topic-Namen auszuwählen, die den Regeln für Snowflake-Bezeichnernamen entsprechen, um vorhersehbare Tabellennamen zu gewährleisten.
Erläuterungen zu RECORD_METADATA¶
Der Konnektor füllt die RECORD_METADATA-Struktur mit Metadaten zum Kafka-Datensatz auf. Diese Metadaten werden über die Snowpipe Streaming-Datenquelle an Snowflake gesendet, wo sie mithilfe des $1:RECORD_METADATA-Accessors in Pipe-Transformationen verfügbar gemacht werden. Die RECORD_METADATA-Struktur ist sowohl in benutzerdefinierten Pipe- als auch in Standard-Pipe-Modi verfügbar. Ihr Inhalt kann in der Spalte des Typs VARIANT gespeichert werden, oder einzelne Felder können extrahiert und in separaten Spalten gespeichert werden.
Beispiel für Pipe mit Transformationen und Metadaten:
CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total,
$1:RECORD_METADATA.topic AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
$1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
In diesem Beispiel:
Die Pipe extrahiert bestimmte Felder aus der Kafka-Nachricht (order_id, customer_name, order_total)
Es werden auch Metadatenfelder erfasst (Topics, Offset und Zeitstempel der Datenaufnahme).
Die Werte können nach Bedarf umgewandelt und/oder transformiert werden.
Wie die Metadatenfelder ausgefüllt werden¶
Der Konnektor füllt die Metadatenfelder automatisch auf der Grundlage der Eigenschaften des Kafka-Datensatzes und der Konfiguration des Konnektors aus. Mit diesen Konfigurationsparametern können Sie steuern, welche Metadatenfelder enthalten sind:
snowflake.metadata.topic(Standard: true) – Beinhaltet den Themennamensnowflake.metadata.offset.and.partition(Standard: „true“) – Beinhaltet Offset und Partitionsnowflake.metadata.createtime(Standard: true) – Beinhaltet den Zeitstempel des Kafka-Datensatzessnowflake.metadata.all(Standard: true) – Beinhaltet alle verfügbaren Metadaten
Wann snowflake.metadata.all=true (Standard) ist, werden alle Metadatenfelder ausgefüllt. Das Festlegen einzelner Metadaten-Flags auf false schließt diese spezifischen Felder aus der RECORD_METADATA-Struktur aus.
Bemerkung
Das Feld SnowflakeConnectorPushTime ist immer verfügbar und steht für den Zeitpunkt, zu dem der Konnektor den Datensatz in den Aufnahmepuffer verschoben hat. Dies ist nützlich für die Berechnung der End-to-End-Datenaufnahme-Latenz.
Die Spalte RECORD_METADATA enthält standardmäßig die folgenden Informationen:
Feld |
Datentyp |
Beschreibung |
|---|---|---|
topic |
String |
Der Name des Kafka-Themas, aus dem der Datensatz stammt. |
partition |
String |
Die Nummer der Partition innerhalb des Themas. (Beachten Sie, dass dies die Kafka-Partition ist, nicht die Snowflake-Mikropartition.) |
offset |
number |
Der Offset in dieser Partition. |
CreateTime / . LogAppendTime |
number |
Dies ist der Zeitstempel, der der Nachricht im Kafka-Thema zugeordnet ist. Der Wert ist Millisekunden seit Mitternacht, 1. Januar 1970, UTC. Weitere Informationen dazu finden Sie unter: https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html |
SnowflakeConnectorPushTime |
number |
Ein Zeitstempel, wann ein Datensatz in einen Ingest-SDK-Puffer verschoben wurde. Der Wert ist die Anzahl der Millisekunden seit Mitternacht am 1. Januar 1970, UTC. Weitere Informationen finden Sie unter Abschätzen der Datenaufnahme-Latenz. |
key |
String |
Wenn die Nachricht eine Kafka KeyedMessage ist, ist dies der Schlüssel für diese Nachricht. Damit der Konnektor den Schlüssel in RECORD_METADATA speichern kann, muss der Parameter key.converter in Kafka-Konfigurationseigenschaften auf „org.apache.kafka.connect.storage.StringConverter“ gesetzt sein. Andernfalls ignoriert der Konnektor die Schlüssel. |
headers |
Objekt |
Ein Header ist ein benutzerdefiniertes Schlüssel-Wert-Paar, das dem Datensatz zugeordnet ist. Jeder Datensatz kann 0, 1 oder mehrere Header haben. |
Die Menge der in der Spalte RECORD_METADATA aufgezeichneten Metadaten kann mithilfe der optionalen Kafka-Konfigurationseigenschaften konfiguriert werden.
Bei den Feldnamen und Werten muss die Groß- und Kleinschreibung beachtet werden.
Wie Kafka-Datensätze vor der Erfassung konvertiert werden¶
Bevor jede Zeile an Snowpipe Streaming übergeben wird, konvertiert der Konnektor den Wert des Kafka Connect-Datensatzes in eine Map<Zeichenfolge, Objekt>, deren Schlüssel mit den Namen der Zielspalten übereinstimmen müssen (oder in einer benutzerdefinierten Pipe umgewandelt werden können). Primitive Zeichenfolgen, Byte-Arrays oder Zahlen müssen umschlossen werden (z. B. durch Verwendung von HoistField SMT), sodass der Konnektor ein strukturiertes Objekt erhält. Der Konverter wendet die folgenden Regeln an:
Nullwerte werden als Tombstones behandelt. Sie werden übersprungen, wenn
behavior.on.null.values=IGNOREist, oder ansonsten als leere JSON-Objekte aufgenommen.Numerische und boolesche Felder werden unverändert übergeben. Dezimalwerte, deren Genauigkeit größer als 38 ist, werden als Zeichenfolgen serialisiert, um innerhalb der
NUMBER-Grenzen von Snowflake zu bleiben.byte[]- undByteBuffer-Payloads sind Base64-kodierte Zeichenfolgen; speichern Sie sie daher in den SpaltenVARIANToderVARCHAR.Arrays bleiben Arrays, und verschachtelte Objekte bleiben verschachtelte Zuordnungen. Deklarieren Sie
VARIANT-Spalten, wenn Sie sich auf die Standard-Pipe verlassen, um verschachtelte Daten so zu übertragen, wie sie sind.Zuordnungen mit Nicht-Zeichenfolgen-Schlüsseln werden als Arrays von
[key, value]-Paaren ausgegeben, da Snowflake-Spaltennamen Text sein müssen.Datensatzheader und -schlüssel werden immer dann in
RECORD_METADATAkopiert, wenn die entsprechenden Metadaten-Flags aktiviert sind.
Wenn Sie möchten, dass der gesamte Nachrichtentext als eine einzelne Spalte beibehalten wird, schließen Sie ihn mit SMTs in ein neues Feld der obersten Ebene ein. Informationen zum Transformationsmuster finden Sie unter Ältere RECORD_CONTENT-Spalte.
Benutzerdefinierter Pipe-Modus im Vgl. zum Standard-Pipe-Modus¶
Der Konnektor unterstützt zwei Modi für die Verwaltung der Datenaufnahme:
Benutzerdefinierter Pipe-Modus¶
In diesem Modus haben Sie die volle Kontrolle über die Datentransformation und die Spaltenzuordnung.
Wann dieser Modus verwendet werden sollte:
Sie benötigen benutzerdefinierte Spaltennamen, die sich von den JSON-Feldnamen unterscheiden.
Sie müssen Datentransformationen anwenden (Typumwandlung, Maskierung, Filterung)
Sie möchten die volle Kontrolle darüber haben, wie Daten den Spalten zugeordnet werden
Standard-Pipe-Modus¶
In diesem Modus verwendet der Konnektor eine Standard-Pipe namens {tableName}-STREAMING und ordnet Kafka-Datensatzfelder Tabellenspalten zu, die dem Namen nach übereinstimmen (Groß-/Kleinschreibung wird nicht berücksichtigt).
Wann dieser Modus verwendet werden sollte:
Die Namen der Kafka-Datensatzschlüssel entsprechen den gewünschten Spaltennamen
Sie benötigen keine benutzerdefinierten Datentransformationen
Sie wünschen eine einfache Konfiguration
Zuordnung von Kafka-Datensatzschlüsseln zu Tabellenspalten mit Standard-Pipe-Modus
Wenn Sie den Standard-Pipe-Modus verwenden, verwendet der Konnektor eine Standard-Pipe namens {tableName}-STREAMING und ordnet die Schlüssel der ersten Ebene des Inhalts direkt den Tabellenspalten zu, wobei die Groß-/Kleinschreibung nicht berücksichtigt wird.
Verwendung des Standard-Pipe-Modus – Beispiel¶
Beispiel 1:¶
Betrachten Sie die folgende Payload des Kafka-Datensatzinhalts:
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
"@&$#* includes special characters": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
Sie erstellen eine Tabelle mit Spalten, die den JSON-Schlüsseln entsprechen (Groß-/Kleinschreibung wird nicht berücksichtigt, einschließlich Sonderzeichen):
CREATE TABLE PERSON_DATA (
record_metadata VARIANT,
city VARCHAR,
age NUMBER,
married BOOLEAN,
"has cat" BOOLEAN,
"!@&$#* includes special characters" BOOLEAN,
skills VARIANT,
family VARIANT
);
Übereinstimmende Verhaltensweise:
"city"(kafka) →cityoderCITYoderCity(column) – Groß-/Kleinschreibung nicht berücksichtigt"has cat"(kafka) →"has cat"(column) – muss aus Platzgründen in Anführungszeichen gesetzt werden"!@&$#* includes special characters"(kafka) →"!@&$#* includes special characters"(column) – Sonderzeichen werden beibehaltenVerschachtelte Objekte wie
skillsundfamilywerden VARIANT-Spalten automatisch zugeordnet.
Verwenden des benutzerdefinierten Pipe-Modus – Beispiele¶
Dieses Beispiel zeigt, wie Sie benutzerdefinierte Pipes mit benutzerdefinierten Datentransformationen konfigurieren und verwenden.
Beispiel 1:¶
Erstellen Sie eine Tabelle mit dem gewünschten Schema:
CREATE TABLE ORDERS (
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
order_date TIMESTAMP_NTZ,
source_topic VARCHAR
);
Erstellen Sie eine Pipe, die die eingehenden Kafka-Datensätze so umwandelt, dass sie mit Ihrem Tabellenschema übereinstimmen:
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total::NUMBER,
$1:order_date::TIMESTAMP_NTZ,
$1:RECORD_METADATA.topic
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Beachten Sie, dass der Pipe-Name (ORDERS) mit dem Tabellennamen (ORDERS) übereinstimmt. Die Pipedefinition extrahiert Felder aus der JSON-Payload, wobei die $1:field_name-Syntax verwendet wird, und ordnet sie den Tabellenspalten zu.
Bemerkung
Sie können auf verschachtelte JSON-Felder und Felder mit Sonderzeichen in Klammern zugreifen, z. B. $1['field name'] oder $1['has cat'].
Konfigurieren Sie die Zuordnung von Topics zu Tabellen:
snowflake.topic2table.map=kafka-orders-topic:ORDERS
Diese Konfiguration ordnet das Kafka-Topic kafka-orders-topic der bereits vorhandenen Tabelle und der Pipe mit dem Namen ORDERS zu.
Beispiel 2:¶
Wenn Sie im Inhalt auf Schlüssel zugreifen müssen, die keine herkömmlichen Namen haben, verwenden Sie die folgende Syntax:
Einfache Felder:
$1:field_nameFelder mit Leerzeichen oder Sonderzeichen:
$1['field name']oder$1['has cat']Felder mit Unicode-Zeichen:
$1[' @&$#* has Łułósżź']Verschachtelte Felder:
$1:parent.childoder$1:parent['child field']
Betrachten Sie die JSON-Payload von Kafka:
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
" @&$#* has Łułósżź": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
Sie erstellen eine Zieltabelle mit den von Ihnen gewählten Spaltennamen:
CREATE TABLE PERSON_DATA (
city VARCHAR,
age NUMBER,
married BOOLEAN,
has_cat BOOLEAN,
weird_field_name BOOLEAN,
skills VARIANT,
family VARIANT
);
Erstellen Sie dann eine Pipe mit demselben Namen, der die Zuordnung definiert:
CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
SELECT
$1:city,
$1:age,
$1:married,
$1['has cat'] AS has_cat,
$1[' @&$#* has Łułósżź'] AS weird_field_name,
$1:skills,
$1:family
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Wichtigste Punkte:
Sie kontrollieren die Spaltennamen (z. B. Umbenennen von
"has cat"zuhas_cat).Sie können Datentypen nach Bedarf umwandeln (z. B.
$1:age::NUMBER).Sie können Felder je nach Bedarf ein- oder ausschließen
Sie können Metadatenfelder hinzufügen (z. B.
$1:RECORD_METADATA.topic).VARIANT-Spalten verarbeiten verschachtelte JSON-Strukturen automatisch.
Beispiel 3: Mit interaktiven Tabellen¶
Interaktive Tabellen sind eine spezielle Art von Snowflake-Tabellen, die für Abfragen mit niedriger Latenz bei hoher Parallelität optimiert sind. Mehr über interaktive Tabellen erfahren Sie in der Dokumentation zu interaktiven Tabellen.
Bemerkung
Interaktive Tabellen sind derzeit ein Vorschau-Feature, das nur für ausgewählte Konten verfügbar ist.
Interaktive Tabelle erstellen:
CREATE INTERACTIVE TABLE REALTIME_METRICS ( metric_name VARCHAR, metric_value NUMBER, source_topic VARCHAR, timestamp TIMESTAMP_NTZ ) AS (SELECT $1:M_NAME::VARCHAR, $1:M_VALUE::NUMBER, $1:RECORD_METADATA.topic::VARCHAR, $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Konfigurieren Sie die Zuordnung von Topics zu Tabellen:
snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
Wichtige Hinweise:
Interaktive Tabellen unterliegen bestimmten Einschränkungen und Abfragebeschränkungen. Sehen Sie sich die Dokumentation zu interaktiven Tabellen an, bevor Sie sie mit dem Konnektor verwenden.
Bei interaktiven Tabellen müssen alle erforderlichen Transformationen in der Tabellendefinition verarbeitet werden.
Interaktive Warehouses sind erforderlich, um interaktive Tabellen effizient abzufragen.
Explizite Zuordnung von Topics zu Tabellen¶
Wenn Sie den Parameter snowflake.topic2table.map konfigurieren, arbeitet der Konnektor im expliziten Zuordnungsmodus. Dieser Modus bietet Folgendes:
Zuordnen mehrerer Kafka-Topics zu einer einzigen Snowflake-Tabelle
Verwenden von benutzerdefinierten Tabellennamen, die sich von Topic-Namen unterscheiden
Anwenden von Regex-Mustern, um mehrere Topics zuzuordnen
Konfigurationsformat:
Der snowflake.topic2table.map-Parameter akzeptiert eine durch Kommas getrennte Liste von Topics-zu-Topics-Zuordnungen im folgenden Format:
topic1:table1,topic2:table2,topic3:table3
Beispielkonfigurationen:
Direkte Topic-Zuordnung
snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
Regex-Mustererkennung
snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
Dieses Konfiguration ordnet alle Topics, die mit _cat enden (wie z. B. orange_cat, calico_cat) der Tabelle CAT_TABLE zu, und alle Topics, die mit _dog enden, werden der Tabelle DOG_TABLE zugeordnet.
Viele Topics zu einer Tabelle
snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
Diese Konfiguration ordnet sowohl topic1 als auch topic2 der shared_table zu, während topic3 der other_table zugeordnet wird.
Wichtig
Regex-Muster in der Zuordnung können sich nicht überschneiden. Jedes Topics darf höchstens einem Muster entsprechen.
Tabellennamen in der Zuordnung müssen gültige Snowflake-Bezeichner mit mindestens 2 Zeichen sein, beginnend mit einem Buchstaben oder einem Unterstrich.
Sie können einer einzigen Tabelle mehrere Topics zuordnen.
Ältere RECORD_CONTENT-Spalte¶
Wenn in älteren Versionen des Konnektors die Schematisierungsfunktion deaktiviert war, erstellte der Konnektor eine Zieltabelle mit zwei Spalten: RECORD_CONTENT und RECORD_METADATA. Die RECORD_CONTENT-Spalte enthielt den gesamten Kafka-Nachrichtinhalt in einer Spalte vom Typ VARIANT. Die RECORD_METADATA-Spalte wird weiterhin unterstützt, aber die RECORD_CONTENT-Spalte wird nicht mehr vom Konnektor erstellt. Die gleiche Funktionalität kann mit SMT-Transformationen erreicht werden (siehe Beispiele weiter unten in diesem Abschnitt). Der RECORD_CONTENT-Schlüssel ist auch nicht mehr in PIPE-Transformationen verfügbar. Diese PIPE-Definition funktioniert beispielsweise nicht standardmäßig:
Bemerkung
Diese Pipe-Definition funktioniert nicht ohne zusätzliche SMT-Transformationen.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Wenn Sie den gesamten Inhalt einer Kafka-Nachricht in einer einzigen Spalte speichern möchten, oder wenn Sie ein Handle für den gesamten Inhalt der Kafka-Nachricht in einer PIPE-Transformation benötigen, können Sie die folgende SMT-Transformation verwenden, die den gesamten Inhalt der Kafka-Nachricht in das gewünschte benutzerdefinierte Feld einschließt:
transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
Durch diese Transformation wird der gesamte Inhalt der Kafka-Nachricht in ein benutzerdefiniertes Feld mit dem Namen your_top_level_field_name eingeschlossen. Sie können dann mithilfe der des $1:your_top_level_field_name-Accessors in Ihrer PIPE-Transformation auf den gesamten Inhalt der Kafka-Nachricht zugreifen.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Wenn Sie alternativ die gesamten Metadaten und den Inhalt in einer einzigen Tabelle mit der Standard-Pipe speichern möchten, erstellen Sie keine benutzerdefinierte Pipe. Erstellen Sie stattdessen eine Tabelle mit nur zwei Spalten: RECORD_CONTENT und your_top_level_field_name.
CREATE TABLE ORDERS (
record_metadata VARIANT,
your_top_level_field_name VARIANT
);
Weitere Informationen zur HoistField$Value-Transformation finden Sie in der Kafka-Dokumentation.
Warnung
Das Speichern des gesamten Inhalts einer Kafka-Nachricht und der Metadaten in einer Tabelle kann sich negativ auf die Kosten für die Datenaufnahme, die Geschwindigkeit der Pipeline und die Latenz auswirken. Wenn Sie die bestmögliche Performance benötigen, sollten Sie nur die Daten speichern, die Sie benötigen, wenn diese von der obersten Ebene des Kafka-Datensatzinhalts zugänglich sind. Oder verwenden Sie SMT-Transformationen, um die Daten aus tief verschachtelten Feldern in Felder der obersten Ebene zu extrahieren.
Behandlung von Streaming-Kanal-Fehlern und Dead-Letter-Queues¶
In Version 4.0.0-rc4 prüft der Konnektor den Status des Snowpipe Streaming-Kanals, bevor Offsets übertragen werden. Wenn Snowflake abgelehnte Zeilen meldet (rowsErrorCount > 0), the connector now raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues cannot go unnoticed. To allow ingestion to continue while you triage bad rows, set errors.tolerance=all
errors.tolerance=all
Schemaentwicklung¶
Wichtig
Schemaentwicklung wird vom High-Performance-Snowflake-Connector für Kafka nicht unterstützt. Sie müssen Schemaänderungen an Ihren Zieltabellen manuell verwalten.
Der Konnektor erkennt Schema-Änderungen nicht automatisch oder entwickelt Tabellenschemas auf Grundlage eingehender Kafka-Datensätze nicht weiter. Wenn Sie Spalten hinzufügen, Datentypen ändern oder andere Schemaänderungen vornehmen müssen, müssen Sie Folgendes tun:
Konnektor anhalten, um die Datenerfassung zu stoppen
Tabellenschema manuell ändern mit ALTER TABLE oder Tabelle neu erstellen
Die Pipe-Definition aktualisieren, wenn Sie benutzerdefinierte Pipes verwenden und die Transformationslogik geändert werden muss
Den Konnektor neu starten, um die Datenaufnahme fortzusetzen
Bemerkung
Die Unterstützung der Schemaentwicklung wird in zukünftigen Versionen hinzugefügt.
Fehlertoleranz¶
Einschränkungen der Fehlertoleranz mit dem Konnektor¶
Kafka-Themen können mit einer Begrenzung des Speicherplatzes oder der Aufbewahrungsdauer konfiguriert werden.
Wenn das System länger als die Aufbewahrungsdauer offline ist, werden abgelaufene Datensätze nicht geladen. Auf ähnliche Weise werden einige Nachrichten nicht zugestellt, wenn das Speicherplatzlimit von Kafka überschritten wurde.
Wenn Nachrichten im Kafka-Topic gelöscht oder aktualisiert werden, spiegeln sich diese Änderungen ggf. nicht in der Snowflake-Tabelle wider.