Funktionsweise von Snowflake High Performance connector for Kafka

Unter diesem Thema werden die verschiedenen Aspekte des Konnektors beschrieben, wie er mit Tabellen und Pipes funktioniert und wie der Konnektor konfiguriert wird.

Funktionsweise des Konnektors mit Tabellen und Pipes

Der High-Performance-Snowflake-Connector für Kafka erfordert, dass Sie Zieltabellen manuell erstellen. Der Konnektor behandelt jeden Kafka-Datensatz als eine Zeile, die in eine Snowflake-Tabelle eingefügt werden soll. Wenn Sie zum Beispiel ein Kafka-Topic haben, bei dem der Inhalt der Nachricht wie folgt strukturiert ist:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}
Copy

Sie können eine Tabelle mit Spalten erstellen, die denJSON-Schlüsseln entsprechen, und sich auf eine Standard-Pipe namens {tableName}-STREAMING stützen, die automatisch die Schlüssel der obersten Ebene des Datensatzinhalts den Tabellenspalten zuordnet, die dem Namen nach übereinstimmen (Groß-/Kleinschreibung wird nicht berücksichtigt).

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  ispaid BOOLEAN
);
Copy

Wenn Sie Ihre eigene Pipe erstellen möchten, können Sie die Datentransformationslogik in der COPY INTO-Anweisung der Pipe definieren. Sie können Spalten nach Bedarf umbenennen und die Datentypen nach Bedarf umwandeln. Beispiel:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:order_id::STRING,
  $1:customer_name,
  $1:order_total::STRING,
  $1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

oder

CREATE TABLE ORDERS (
 topic VARCHAR,
 partition VARCHAR,
 order_id VARCHAR,
 customer_name VARCHAR,
 order_total VARCHAR,
 ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:RECORD_METADATA.topic::STRING AS topic,
  $1:RECORD_METADATA.partition::STRING AS partition,
  $1['order_id']::STRING AS order_id,
  $1['customer_name']::STRING as customer_name,
  CONCAT($1['order_total']::STRING, ' USD') AS order_total,
  $1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Wenn Sie Ihre eigene Pipe definieren, müssen die Spalten der Zieltabelle nicht mit den JSON-Schlüsseln übereinstimmen. Sie können die Spalten in die von Ihnen gewünschten Namen umbenennen und die Datentypen nach Bedarf umwandeln.

Topic-Namen, Tabellennamen und Pipe-Namen

Je nach Konfigurationseinstellung verwendet der Konnektor unterschiedliche Namen für die Zieltabelle. Der Name der Zieltabelle wird immer vom Namen des Topics abgeleitet.

Wie der Konnektor Topic-Namen der Zieltabelle zuordnet

Der Kafka-Konnektor bietet zwei Modi für das Zuordnen von Kafka-Topic-Namen zu Snowflake-Tabellennamen:

  • Statische Zuordnung: Der Konnektor leitet die Namen der Zieltabellen nur mit dem Namen des Kafka-Topics ab.

  • Expliziter Modus für die Zuordnung von Topics zu Tabellen: Sie geben benutzerdefinierte Zuordnungen zwischen Themen und Tabellen unter Verwendung des Konfigurationsparameters snowflake.topic2table.map an.

Dynamische Zuordnung

Wenn Sie den Parameter snowflake.topic2table.map nicht konfigurieren, leitet der Konnektor die Tabellennamen immer aus dem Themennamen ab.

Generierung von Tabellennamen:

Der Konnektor leitet den Namen der Zieltabelle aus dem Namen des Topics nach folgenden Regeln ab:

  1. Wenn der Topic-Name ein gültiger Snowflake-Bezeichner ist (beginnend mit einem Buchstaben oder Unterstrich und enthält nur Buchstaben, Ziffern, Unterstriche oder Dollarzeichen), verwendet der Konnektor den Topic-Namen als Tabellennamen (in Großbuchstaben umgewandelt).

  2. Wenn der Topic-Name ungültige Zeichen enthält, führt der Konnektor Folgendes aus:

    • Ersetzt ungültige Zeichen durch Unterstriche

    • Fügt einen Unterstrich gefolgt von einem Hash-Code hinzu, um die Eindeutigkeit zu gewährleisten

    • So wir zum Beispiel das Topics my-topic.data zu MY_TOPIC_DATA_<hash>.

Bestimmen des Pipe-Namens:

Der Konnektor bestimmt anhand der folgenden Logik, welche Pipe verwendet werden soll:

  1. Der Konnektor prüft, ob eine Pipe mit demselben Namen wie dem der Zieltabelle vorhanden ist.

  2. Wenn eine benutzerdefinierte Pipe mit diesem Namen vorhanden ist, verwendet der Konnektor diese Pipe (benutzerdefinierter Pipe-Modus).

  3. Wenn nicht, verwendet der Konnektor die Standard-Pipe mit dem Namen {tableName}-STREAMING.

Bemerkung

Snowflake empfiehlt, Topic-Namen auszuwählen, die den Regeln für Snowflake-Bezeichnernamen entsprechen, um vorhersehbare Tabellennamen zu gewährleisten.

Erläuterungen zu RECORD_METADATA

Der Konnektor füllt die RECORD_METADATA-Struktur mit Metadaten zum Kafka-Datensatz auf. Diese Metadaten werden über die Snowpipe Streaming-Datenquelle an Snowflake gesendet, wo sie mithilfe des $1:RECORD_METADATA-Accessors in Pipe-Transformationen verfügbar gemacht werden. Die RECORD_METADATA-Struktur ist sowohl in benutzerdefinierten Pipe- als auch in Standard-Pipe-Modi verfügbar. Ihr Inhalt kann in der Spalte des Typs VARIANT gespeichert werden, oder einzelne Felder können extrahiert und in separaten Spalten gespeichert werden.

Beispiel für Pipe mit Transformationen und Metadaten:

CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total,
    $1:RECORD_METADATA.topic AS source_topic,
    $1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
    $1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

In diesem Beispiel:

  • Die Pipe extrahiert bestimmte Felder aus der Kafka-Nachricht (order_id, customer_name, order_total)

  • Es werden auch Metadatenfelder erfasst (Topics, Offset und Zeitstempel der Datenaufnahme).

  • Die Werte können nach Bedarf umgewandelt und/oder transformiert werden.

Wie die Metadatenfelder ausgefüllt werden

Der Konnektor füllt die Metadatenfelder automatisch auf der Grundlage der Eigenschaften des Kafka-Datensatzes und der Konfiguration des Konnektors aus. Mit diesen Konfigurationsparametern können Sie steuern, welche Metadatenfelder enthalten sind:

  • snowflake.metadata.topic (Standard: true) – Beinhaltet den Themennamen

  • snowflake.metadata.offset.and.partition (Standard: „true“) – Beinhaltet Offset und Partition

  • snowflake.metadata.createtime (Standard: true) – Beinhaltet den Zeitstempel des Kafka-Datensatzes

  • snowflake.metadata.all (Standard: true) – Beinhaltet alle verfügbaren Metadaten

Wann snowflake.metadata.all=true (Standard) ist, werden alle Metadatenfelder ausgefüllt. Das Festlegen einzelner Metadaten-Flags auf false schließt diese spezifischen Felder aus der RECORD_METADATA-Struktur aus.

Bemerkung

Das Feld SnowflakeConnectorPushTime ist immer verfügbar und steht für den Zeitpunkt, zu dem der Konnektor den Datensatz in den Aufnahmepuffer verschoben hat. Dies ist nützlich für die Berechnung der End-to-End-Datenaufnahme-Latenz.

Die Spalte RECORD_METADATA enthält standardmäßig die folgenden Informationen:

Feld

Datentyp

Beschreibung

topic

String

Der Name des Kafka-Themas, aus dem der Datensatz stammt.

partition

String

Die Nummer der Partition innerhalb des Themas. (Beachten Sie, dass dies die Kafka-Partition ist, nicht die Snowflake-Mikropartition.)

offset

number

Der Offset in dieser Partition.

CreateTime / . LogAppendTime

number

Dies ist der Zeitstempel, der der Nachricht im Kafka-Thema zugeordnet ist. Der Wert ist Millisekunden seit Mitternacht, 1. Januar 1970, UTC. Weitere Informationen dazu finden Sie unter: https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html

SnowflakeConnectorPushTime

number

Ein Zeitstempel, wann ein Datensatz in einen Ingest-SDK-Puffer verschoben wurde. Der Wert ist die Anzahl der Millisekunden seit Mitternacht am 1. Januar 1970, UTC. Weitere Informationen finden Sie unter Abschätzen der Datenaufnahme-Latenz.

key

String

Wenn die Nachricht eine Kafka KeyedMessage ist, ist dies der Schlüssel für diese Nachricht. Damit der Konnektor den Schlüssel in RECORD_METADATA speichern kann, muss der Parameter key.converter in Kafka-Konfigurationseigenschaften auf „org.apache.kafka.connect.storage.StringConverter“ gesetzt sein. Andernfalls ignoriert der Konnektor die Schlüssel.

headers

Objekt

Ein Header ist ein benutzerdefiniertes Schlüssel-Wert-Paar, das dem Datensatz zugeordnet ist. Jeder Datensatz kann 0, 1 oder mehrere Header haben.

Die Menge der in der Spalte RECORD_METADATA aufgezeichneten Metadaten kann mithilfe der optionalen Kafka-Konfigurationseigenschaften konfiguriert werden.

Bei den Feldnamen und Werten muss die Groß- und Kleinschreibung beachtet werden.

Wie Kafka-Datensätze vor der Erfassung konvertiert werden

Bevor jede Zeile an Snowpipe Streaming übergeben wird, konvertiert der Konnektor den Wert des Kafka Connect-Datensatzes in eine Map<Zeichenfolge, Objekt>, deren Schlüssel mit den Namen der Zielspalten übereinstimmen müssen (oder in einer benutzerdefinierten Pipe umgewandelt werden können). Primitive Zeichenfolgen, Byte-Arrays oder Zahlen müssen umschlossen werden (z. B. durch Verwendung von HoistField SMT), sodass der Konnektor ein strukturiertes Objekt erhält. Der Konverter wendet die folgenden Regeln an:

  • Nullwerte werden als Tombstones behandelt. Sie werden übersprungen, wenn behavior.on.null.values=IGNORE ist, oder ansonsten als leere JSON-Objekte aufgenommen.

  • Numerische und boolesche Felder werden unverändert übergeben. Dezimalwerte, deren Genauigkeit größer als 38 ist, werden als Zeichenfolgen serialisiert, um innerhalb der NUMBER-Grenzen von Snowflake zu bleiben.

  • byte[]- und ByteBuffer-Payloads sind Base64-kodierte Zeichenfolgen; speichern Sie sie daher in den Spalten VARIANT oder VARCHAR.

  • Arrays bleiben Arrays, und verschachtelte Objekte bleiben verschachtelte Zuordnungen. Deklarieren Sie VARIANT-Spalten, wenn Sie sich auf die Standard-Pipe verlassen, um verschachtelte Daten so zu übertragen, wie sie sind.

  • Zuordnungen mit Nicht-Zeichenfolgen-Schlüsseln werden als Arrays von [key, value]-Paaren ausgegeben, da Snowflake-Spaltennamen Text sein müssen.

  • Datensatzheader und -schlüssel werden immer dann in RECORD_METADATA kopiert, wenn die entsprechenden Metadaten-Flags aktiviert sind.

Wenn Sie möchten, dass der gesamte Nachrichtentext als eine einzelne Spalte beibehalten wird, schließen Sie ihn mit SMTs in ein neues Feld der obersten Ebene ein. Informationen zum Transformationsmuster finden Sie unter Ältere RECORD_CONTENT-Spalte.

Benutzerdefinierter Pipe-Modus im Vgl. zum Standard-Pipe-Modus

Der Konnektor unterstützt zwei Modi für die Verwaltung der Datenaufnahme:

Benutzerdefinierter Pipe-Modus

In diesem Modus haben Sie die volle Kontrolle über die Datentransformation und die Spaltenzuordnung.

Wann dieser Modus verwendet werden sollte:

  • Sie benötigen benutzerdefinierte Spaltennamen, die sich von den JSON-Feldnamen unterscheiden.

  • Sie müssen Datentransformationen anwenden (Typumwandlung, Maskierung, Filterung)

  • Sie möchten die volle Kontrolle darüber haben, wie Daten den Spalten zugeordnet werden

Standard-Pipe-Modus

In diesem Modus verwendet der Konnektor eine Standard-Pipe namens {tableName}-STREAMING und ordnet Kafka-Datensatzfelder Tabellenspalten zu, die dem Namen nach übereinstimmen (Groß-/Kleinschreibung wird nicht berücksichtigt).

Wann dieser Modus verwendet werden sollte:

  • Die Namen der Kafka-Datensatzschlüssel entsprechen den gewünschten Spaltennamen

  • Sie benötigen keine benutzerdefinierten Datentransformationen

  • Sie wünschen eine einfache Konfiguration

Zuordnung von Kafka-Datensatzschlüsseln zu Tabellenspalten mit Standard-Pipe-Modus

Wenn Sie den Standard-Pipe-Modus verwenden, verwendet der Konnektor eine Standard-Pipe namens {tableName}-STREAMING und ordnet die Schlüssel der ersten Ebene des Inhalts direkt den Tabellenspalten zu, wobei die Groß-/Kleinschreibung nicht berücksichtigt wird.

Verwendung des Standard-Pipe-Modus – Beispiel

Beispiel 1:

Betrachten Sie die folgende Payload des Kafka-Datensatzinhalts:

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  "@&$#* includes special characters": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

Sie erstellen eine Tabelle mit Spalten, die den JSON-Schlüsseln entsprechen (Groß-/Kleinschreibung wird nicht berücksichtigt, einschließlich Sonderzeichen):

CREATE TABLE PERSON_DATA (
  record_metadata VARIANT,
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  "has cat" BOOLEAN,
  "!@&$#* includes special characters" BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

Übereinstimmende Verhaltensweise:

  • "city" (kafka) → city oder CITY oder City (column) – Groß-/Kleinschreibung nicht berücksichtigt

  • "has cat" (kafka) → "has cat" (column) – muss aus Platzgründen in Anführungszeichen gesetzt werden

  • "!@&$#* includes special characters" (kafka) → "!@&$#* includes special characters" (column) – Sonderzeichen werden beibehalten

  • Verschachtelte Objekte wie skills und family werden VARIANT-Spalten automatisch zugeordnet.

Verwenden des benutzerdefinierten Pipe-Modus – Beispiele

Dieses Beispiel zeigt, wie Sie benutzerdefinierte Pipes mit benutzerdefinierten Datentransformationen konfigurieren und verwenden.

Beispiel 1:

Erstellen Sie eine Tabelle mit dem gewünschten Schema:

CREATE TABLE ORDERS (
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  order_date TIMESTAMP_NTZ,
  source_topic VARCHAR
);
Copy

Erstellen Sie eine Pipe, die die eingehenden Kafka-Datensätze so umwandelt, dass sie mit Ihrem Tabellenschema übereinstimmen:

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total::NUMBER,
    $1:order_date::TIMESTAMP_NTZ,
    $1:RECORD_METADATA.topic
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Beachten Sie, dass der Pipe-Name (ORDERS) mit dem Tabellennamen (ORDERS) übereinstimmt. Die Pipedefinition extrahiert Felder aus der JSON-Payload, wobei die $1:field_name-Syntax verwendet wird, und ordnet sie den Tabellenspalten zu.

Bemerkung

Sie können auf verschachtelte JSON-Felder und Felder mit Sonderzeichen in Klammern zugreifen, z. B. $1['field name'] oder $1['has cat'].

Konfigurieren Sie die Zuordnung von Topics zu Tabellen:

snowflake.topic2table.map=kafka-orders-topic:ORDERS
Copy

Diese Konfiguration ordnet das Kafka-Topic kafka-orders-topic der bereits vorhandenen Tabelle und der Pipe mit dem Namen ORDERS zu.

Beispiel 2:

Wenn Sie im Inhalt auf Schlüssel zugreifen müssen, die keine herkömmlichen Namen haben, verwenden Sie die folgende Syntax:

  • Einfache Felder: $1:field_name

  • Felder mit Leerzeichen oder Sonderzeichen: $1['field name'] oder $1['has cat']

  • Felder mit Unicode-Zeichen: $1[' @&$#* has Łułósżź']

  • Verschachtelte Felder: $1:parent.child oder $1:parent['child field']

Betrachten Sie die JSON-Payload von Kafka:

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  " @&$#* has Łułósżź": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

Sie erstellen eine Zieltabelle mit den von Ihnen gewählten Spaltennamen:

CREATE TABLE PERSON_DATA (
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  has_cat BOOLEAN,
  weird_field_name BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

Erstellen Sie dann eine Pipe mit demselben Namen, der die Zuordnung definiert:

CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
  SELECT
    $1:city,
    $1:age,
    $1:married,
    $1['has cat'] AS has_cat,
    $1[' @&$#* has Łułósżź'] AS weird_field_name,
    $1:skills,
    $1:family
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Wichtigste Punkte:

  • Sie kontrollieren die Spaltennamen (z. B. Umbenennen von "has cat" zu has_cat).

  • Sie können Datentypen nach Bedarf umwandeln (z. B. $1:age::NUMBER).

  • Sie können Felder je nach Bedarf ein- oder ausschließen

  • Sie können Metadatenfelder hinzufügen (z. B. $1:RECORD_METADATA.topic).

  • VARIANT-Spalten verarbeiten verschachtelte JSON-Strukturen automatisch.

Beispiel 3: Mit interaktiven Tabellen

Interaktive Tabellen sind eine spezielle Art von Snowflake-Tabellen, die für Abfragen mit niedriger Latenz bei hoher Parallelität optimiert sind. Mehr über interaktive Tabellen erfahren Sie in der Dokumentation zu interaktiven Tabellen.

Bemerkung

Interaktive Tabellen sind derzeit ein Vorschau-Feature, das nur für ausgewählte Konten verfügbar ist.

  1. Interaktive Tabelle erstellen:

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    
    Copy
  2. Konfigurieren Sie die Zuordnung von Topics zu Tabellen:

    snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
    
    Copy

Wichtige Hinweise:

  • Interaktive Tabellen unterliegen bestimmten Einschränkungen und Abfragebeschränkungen. Sehen Sie sich die Dokumentation zu interaktiven Tabellen an, bevor Sie sie mit dem Konnektor verwenden.

  • Bei interaktiven Tabellen müssen alle erforderlichen Transformationen in der Tabellendefinition verarbeitet werden.

  • Interaktive Warehouses sind erforderlich, um interaktive Tabellen effizient abzufragen.

Explizite Zuordnung von Topics zu Tabellen

Wenn Sie den Parameter snowflake.topic2table.map konfigurieren, arbeitet der Konnektor im expliziten Zuordnungsmodus. Dieser Modus bietet Folgendes:

  • Zuordnen mehrerer Kafka-Topics zu einer einzigen Snowflake-Tabelle

  • Verwenden von benutzerdefinierten Tabellennamen, die sich von Topic-Namen unterscheiden

  • Anwenden von Regex-Mustern, um mehrere Topics zuzuordnen

Konfigurationsformat:

Der snowflake.topic2table.map-Parameter akzeptiert eine durch Kommas getrennte Liste von Topics-zu-Topics-Zuordnungen im folgenden Format:

topic1:table1,topic2:table2,topic3:table3
Copy

Beispielkonfigurationen:

Direkte Topic-Zuordnung

snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
Copy

Regex-Mustererkennung

snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
Copy

Dieses Konfiguration ordnet alle Topics, die mit _cat enden (wie z. B. orange_cat, calico_cat) der Tabelle CAT_TABLE zu, und alle Topics, die mit _dog enden, werden der Tabelle DOG_TABLE zugeordnet.

Viele Topics zu einer Tabelle

snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
Copy

Diese Konfiguration ordnet sowohl topic1 als auch topic2 der shared_table zu, während topic3 der other_table zugeordnet wird.

Wichtig

  • Regex-Muster in der Zuordnung können sich nicht überschneiden. Jedes Topics darf höchstens einem Muster entsprechen.

  • Tabellennamen in der Zuordnung müssen gültige Snowflake-Bezeichner mit mindestens 2 Zeichen sein, beginnend mit einem Buchstaben oder einem Unterstrich.

  • Sie können einer einzigen Tabelle mehrere Topics zuordnen.

Ältere RECORD_CONTENT-Spalte

Wenn in älteren Versionen des Konnektors die Schematisierungsfunktion deaktiviert war, erstellte der Konnektor eine Zieltabelle mit zwei Spalten: RECORD_CONTENT und RECORD_METADATA. Die RECORD_CONTENT-Spalte enthielt den gesamten Kafka-Nachrichtinhalt in einer Spalte vom Typ VARIANT. Die RECORD_METADATA-Spalte wird weiterhin unterstützt, aber die RECORD_CONTENT-Spalte wird nicht mehr vom Konnektor erstellt. Die gleiche Funktionalität kann mit SMT-Transformationen erreicht werden (siehe Beispiele weiter unten in diesem Abschnitt). Der RECORD_CONTENT-Schlüssel ist auch nicht mehr in PIPE-Transformationen verfügbar. Diese PIPE-Definition funktioniert beispielsweise nicht standardmäßig:

Bemerkung

Diese Pipe-Definition funktioniert nicht ohne zusätzliche SMT-Transformationen.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Wenn Sie den gesamten Inhalt einer Kafka-Nachricht in einer einzigen Spalte speichern möchten, oder wenn Sie ein Handle für den gesamten Inhalt der Kafka-Nachricht in einer PIPE-Transformation benötigen, können Sie die folgende SMT-Transformation verwenden, die den gesamten Inhalt der Kafka-Nachricht in das gewünschte benutzerdefinierte Feld einschließt:

transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
Copy

Durch diese Transformation wird der gesamte Inhalt der Kafka-Nachricht in ein benutzerdefiniertes Feld mit dem Namen your_top_level_field_name eingeschlossen. Sie können dann mithilfe der des $1:your_top_level_field_name-Accessors in Ihrer PIPE-Transformation auf den gesamten Inhalt der Kafka-Nachricht zugreifen.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Wenn Sie alternativ die gesamten Metadaten und den Inhalt in einer einzigen Tabelle mit der Standard-Pipe speichern möchten, erstellen Sie keine benutzerdefinierte Pipe. Erstellen Sie stattdessen eine Tabelle mit nur zwei Spalten: RECORD_CONTENT und your_top_level_field_name.

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  your_top_level_field_name VARIANT
);
Copy

Weitere Informationen zur HoistField$Value-Transformation finden Sie in der Kafka-Dokumentation.

Warnung

Das Speichern des gesamten Inhalts einer Kafka-Nachricht und der Metadaten in einer Tabelle kann sich negativ auf die Kosten für die Datenaufnahme, die Geschwindigkeit der Pipeline und die Latenz auswirken. Wenn Sie die bestmögliche Performance benötigen, sollten Sie nur die Daten speichern, die Sie benötigen, wenn diese von der obersten Ebene des Kafka-Datensatzinhalts zugänglich sind. Oder verwenden Sie SMT-Transformationen, um die Daten aus tief verschachtelten Feldern in Felder der obersten Ebene zu extrahieren.

Behandlung von Streaming-Kanal-Fehlern und Dead-Letter-Queues

In Version 4.0.0-rc4 prüft der Konnektor den Status des Snowpipe Streaming-Kanals, bevor Offsets übertragen werden. Wenn Snowflake abgelehnte Zeilen meldet (rowsErrorCount > 0), the connector now raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues cannot go unnoticed. To allow ingestion to continue while you triage bad rows, set errors.tolerance=all

errors.tolerance=all
Copy

Schemaentwicklung

Wichtig

Schemaentwicklung wird vom High-Performance-Snowflake-Connector für Kafka nicht unterstützt. Sie müssen Schemaänderungen an Ihren Zieltabellen manuell verwalten.

Der Konnektor erkennt Schema-Änderungen nicht automatisch oder entwickelt Tabellenschemas auf Grundlage eingehender Kafka-Datensätze nicht weiter. Wenn Sie Spalten hinzufügen, Datentypen ändern oder andere Schemaänderungen vornehmen müssen, müssen Sie Folgendes tun:

  1. Konnektor anhalten, um die Datenerfassung zu stoppen

  2. Tabellenschema manuell ändern mit ALTER TABLE oder Tabelle neu erstellen

  3. Die Pipe-Definition aktualisieren, wenn Sie benutzerdefinierte Pipes verwenden und die Transformationslogik geändert werden muss

  4. Den Konnektor neu starten, um die Datenaufnahme fortzusetzen

Bemerkung

Die Unterstützung der Schemaentwicklung wird in zukünftigen Versionen hinzugefügt.

Fehlertoleranz

Einschränkungen der Fehlertoleranz mit dem Konnektor

Kafka-Themen können mit einer Begrenzung des Speicherplatzes oder der Aufbewahrungsdauer konfiguriert werden.

  • Wenn das System länger als die Aufbewahrungsdauer offline ist, werden abgelaufene Datensätze nicht geladen. Auf ähnliche Weise werden einige Nachrichten nicht zugestellt, wenn das Speicherplatzlimit von Kafka überschritten wurde.

  • Wenn Nachrichten im Kafka-Topic gelöscht oder aktualisiert werden, spiegeln sich diese Änderungen ggf. nicht in der Snowflake-Tabelle wider.

Nächste Schritte

Richten Sie Aufgaben ein.