Working with the Snowflake High Performance connector for Kafka

This topic describes how the connector works with tables and pipes, and how to configure the connector with these elements.

Funktionsweise des Konnektors mit Tabellen und Pipes

The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, if you have a Kafka topic with the content of the message structured like the following JSON:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}
Copy

Standardmäßig müssen Sie vor Beginn der Datenaufnahme keine Tabelle oder Pipe erstellen. Der Konnektor kann eine Tabelle mit Spalten erstellen, die denJSON-Schlüsseln entsprechen, und sich auf eine Standard-Pipe namens {tableName}-STREAMING stützen, die automatisch die Schlüssel der obersten Ebene des Datensatzinhalts den Tabellenspalten zuordnet, die dem Namen nach übereinstimmen (Groß-/Kleinschreibung wird nicht berücksichtigt). Sie können auch Ihre eigene Tabelle mit Spalten erstellen, die den JSON-Schlüsseln entsprechen. Der Konnektor versucht, die Schlüssel der ersten Ebene des Datensatzinhalts namentlich mit den Tabellenspalten abzugleichen. Wenn Schlüssel aus dem JSON nicht mit den Tabellenspalten übereinstimmen, ignoriert der Konnektor die Schlüssel.

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  ispaid BOOLEAN
);
Copy

Wenn Sie Ihre eigene Pipe erstellen möchten, können Sie die Datentransformationslogik in der COPY INTO-Anweisung der Pipe definieren. Sie können Spalten nach Bedarf umbenennen und die Datentypen nach Bedarf umwandeln. Beispiel:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:order_id::STRING,
  $1:customer_name,
  $1:order_total::STRING,
  $1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

oder

CREATE TABLE ORDERS (
 topic VARCHAR,
 partition VARCHAR,
 order_id VARCHAR,
 customer_name VARCHAR,
 order_total VARCHAR,
 ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:RECORD_METADATA.topic::STRING AS topic,
  $1:RECORD_METADATA.partition::STRING AS partition,
  $1['order_id']::STRING AS order_id,
  $1['customer_name']::STRING as customer_name,
  CONCAT($1['order_total']::STRING, ' USD') AS order_total,
  $1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

When you define your own pipe your destination table columns need not need match the JSON keys. You can rename the columns to your desired names and cast the data types if required.

Topic-Namen, Tabellennamen und Pipe-Namen

Je nach Konfigurationseinstellung verwendet der Konnektor unterschiedliche Namen für die Zieltabelle. Der Name der Zieltabelle wird immer vom Namen des Topics abgeleitet.

Wie der Konnektor Topic-Namen der Zieltabelle zuordnet

Der Kafka-Konnektor bietet zwei Modi für das Zuordnen von Kafka-Topic-Namen zu Snowflake-Tabellennamen:

  • Statische Zuordnung: Der Konnektor leitet die Namen der Zieltabellen nur mit dem Namen des Kafka-Topics ab.

  • Expliziter Modus für die Zuordnung von Topics zu Tabellen: Sie geben benutzerdefinierte Zuordnungen zwischen Themen und Tabellen unter Verwendung des Konfigurationsparameters snowflake.topic2table.map an.

Dynamische Zuordnung

Wenn Sie den Parameter snowflake.topic2table.map nicht konfigurieren, leitet der Konnektor die Tabellennamen immer aus dem Themennamen ab.

Generierung von Tabellennamen:

Der Konnektor leitet den Namen der Zieltabelle aus dem Namen des Topics nach folgenden Regeln ab:

  1. If the topic name is a valid Snowflake identifier the connector uses the topic name as the destination table name, converted to uppercase).

  2. Wenn der Topic-Name ungültige Zeichen enthält, führt der Konnektor Folgendes aus:

    • Ersetzt ungültige Zeichen durch Unterstriche

    • Fügt einen Unterstrich gefolgt von einem Hash-Code hinzu, um die Eindeutigkeit zu gewährleisten

    • So wir zum Beispiel das Topics my-topic.data zu MY_TOPIC_DATA_<hash>.

Bestimmen des Pipe-Namens:

Der Konnektor bestimmt anhand der folgenden Logik, welche Pipe verwendet werden soll:

  1. Der Konnektor prüft, ob eine Pipe mit demselben Namen wie dem der Zieltabelle vorhanden ist.

  2. Wenn eine benutzerdefinierte Pipe mit diesem Namen vorhanden ist, verwendet der Konnektor diese Pipe (benutzerdefinierter Pipe-Modus).

  3. Wenn nicht, verwendet der Konnektor die Standard-Pipe mit dem Namen {tableName}-STREAMING.

Bemerkung

Snowflake empfiehlt, Topic-Namen auszuwählen, die den Regeln für Snowflake-Bezeichnernamen entsprechen, um vorhersehbare Tabellennamen zu gewährleisten.

Erläuterungen zu RECORD_METADATA

Der Konnektor füllt die RECORD_METADATA-Struktur mit Metadaten zum Kafka-Datensatz auf. Diese Metadaten werden über die Snowpipe Streaming-Datenquelle an Snowflake gesendet, wo sie mithilfe des $1:RECORD_METADATA-Accessors in Pipe-Transformationen verfügbar gemacht werden. Die RECORD_METADATA-Struktur ist sowohl in benutzerdefinierten Pipe- als auch in Standard-Pipe-Modi verfügbar. Ihr Inhalt kann in der Spalte des Typs VARIANT gespeichert werden, oder einzelne Felder können extrahiert und in separaten Spalten gespeichert werden.

Beispiel für Pipe mit Transformationen und Metadaten:

CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total,
    $1:RECORD_METADATA.topic AS source_topic,
    $1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
    $1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

In diesem Beispiel:

  • Die Pipe extrahiert bestimmte Felder aus der Kafka-Nachricht (order_id, customer_name, order_total)

  • Es werden auch Metadatenfelder erfasst (Topics, Offset und Zeitstempel der Datenaufnahme).

  • Die Werte können nach Bedarf umgewandelt und/oder transformiert werden.

Wie die Metadatenfelder ausgefüllt werden

Der Konnektor füllt die Metadatenfelder automatisch auf der Grundlage der Eigenschaften des Kafka-Datensatzes und der Konfiguration des Konnektors aus. Mit diesen Konfigurationsparametern können Sie steuern, welche Metadatenfelder enthalten sind:

  • snowflake.metadata.topic (Standard: true) – Beinhaltet den Themennamen

  • snowflake.metadata.offset.and.partition (Standard: „true“) – Beinhaltet Offset und Partition

  • snowflake.metadata.createtime (Standard: true) – Beinhaltet den Zeitstempel des Kafka-Datensatzes

  • snowflake.metadata.all (Standard: true) – Beinhaltet alle verfügbaren Metadaten

Wann snowflake.metadata.all=true (Standard) ist, werden alle Metadatenfelder ausgefüllt. Das Festlegen einzelner Metadaten-Flags auf false schließt diese spezifischen Felder aus der RECORD_METADATA-Struktur aus.

Bemerkung

Das Feld SnowflakeConnectorPushTime ist immer verfügbar und steht für den Zeitpunkt, zu dem der Konnektor den Datensatz in den Aufnahmepuffer verschoben hat. Dies ist nützlich für die Berechnung der End-to-End-Datenaufnahme-Latenz.

Die Spalte RECORD_METADATA enthält standardmäßig die folgenden Informationen:

Feld

Datentyp

Beschreibung

topic

String

Der Name des Kafka-Themas, aus dem der Datensatz stammt.

partition

String

Die Nummer der Partition innerhalb des Themas. (Beachten Sie, dass dies die Kafka-Partition ist, nicht die Snowflake-Mikropartition.)

offset

number

Der Offset in dieser Partition.

CreateTime / . LogAppendTime

number

This is the timestamp associated with the message in the Kafka topic. The value is milliseconds since midnight January 1, 1970, UTC. For more information, see: Kafka ProducerRecord documentation.

SnowflakeConnectorPushTime

number

Ein Zeitstempel, wann ein Datensatz in einen Ingest-SDK-Puffer verschoben wurde. Der Wert ist die Anzahl der Millisekunden seit Mitternacht am 1. Januar 1970, UTC. Weitere Informationen finden Sie unter Abschätzen der Datenaufnahme-Latenz.

key

String

If the message is a Kafka KeyedMessage, this is the key for that message. In order for the connector to store the key in the RECORD_METADATA, the key.converter parameter in the Kafka-Konfigurationseigenschaften must be set to org.apache.kafka.connect.storage.StringConverter; otherwise, the connector ignores keys.

headers

Objekt

A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers.

Die Menge der in der Spalte RECORD_METADATA aufgezeichneten Metadaten kann mithilfe der optionalen Kafka-Konfigurationseigenschaften konfiguriert werden.

Bei den Feldnamen und Werten muss die Groß- und Kleinschreibung beachtet werden.

Wie Kafka-Datensätze vor der Erfassung konvertiert werden

Bevor jede Zeile an Snowpipe Streaming übergeben wird, konvertiert der Konnektor den Wert des Kafka Connect-Datensatzes in eine Map<Zeichenfolge, Objekt>, deren Schlüssel mit den Namen der Zielspalten übereinstimmen müssen (oder in einer benutzerdefinierten Pipe umgewandelt werden können). Primitive Zeichenfolgen, Byte-Arrays oder Zahlen müssen umschlossen werden (z. B. durch Verwendung von HoistField SMT), sodass der Konnektor ein strukturiertes Objekt erhält. Der Konverter wendet die folgenden Regeln an:

  • Nullwerte werden als Tombstones behandelt. Sie werden übersprungen, wenn behavior.on.null.values=IGNORE ist, oder ansonsten als leere JSON-Objekte aufgenommen.

  • Numerische und boolesche Felder werden unverändert übergeben. Dezimalwerte, deren Genauigkeit größer als 38 ist, werden als Zeichenfolgen serialisiert, um innerhalb der NUMBER-Grenzen von Snowflake zu bleiben.

  • byte[]- und ByteBuffer-Payloads sind Base64-kodierte Zeichenfolgen; speichern Sie sie daher in den Spalten VARIANT oder VARCHAR.

  • Arrays bleiben Arrays, und verschachtelte Objekte bleiben verschachtelte Zuordnungen. Deklarieren Sie VARIANT-Spalten, wenn Sie sich auf die Standard-Pipe verlassen, um verschachtelte Daten so zu übertragen, wie sie sind.

  • Zuordnungen mit Nicht-Zeichenfolgen-Schlüsseln werden als Arrays von [key, value]-Paaren ausgegeben, da Snowflake-Spaltennamen Text sein müssen.

  • Datensatzheader und -schlüssel werden immer dann in RECORD_METADATA kopiert, wenn die entsprechenden Metadaten-Flags aktiviert sind.

Wenn Sie möchten, dass der gesamte Nachrichtentext als eine einzelne Spalte beibehalten wird, schließen Sie ihn mit SMTs in ein neues Feld der obersten Ebene ein. Informationen zum Transformationsmuster finden Sie unter Ältere RECORD_CONTENT-Spalte.

Benutzerdefinierter Pipe-Modus im Vgl. zum Standard-Pipe-Modus

Der Konnektor unterstützt zwei Modi für die Verwaltung der Datenaufnahme:

Benutzerdefinierter Pipe-Modus

In diesem Modus haben Sie die volle Kontrolle über die Datentransformation und die Spaltenzuordnung.

Wann dieser Modus verwendet werden sollte:

  • Sie benötigen benutzerdefinierte Spaltennamen, die sich von den JSON-Feldnamen unterscheiden.

  • Sie müssen Datentransformationen anwenden (Typumwandlung, Maskierung, Filterung)

  • Sie möchten die volle Kontrolle darüber haben, wie Daten den Spalten zugeordnet werden

Standard-Pipe-Modus

In diesem Modus verwendet der Konnektor eine Standard-Pipe namens {tableName}-STREAMING und ordnet Kafka-Datensatzfelder Tabellenspalten zu, die dem Namen nach übereinstimmen (Groß-/Kleinschreibung wird nicht berücksichtigt).

Wann dieser Modus verwendet werden sollte:

  • Die Namen der Kafka-Datensatzschlüssel entsprechen den gewünschten Spaltennamen

  • Sie benötigen keine benutzerdefinierten Datentransformationen

  • You want a simple configuration

Zuordnung von Kafka-Datensatzschlüsseln zu Tabellenspalten mit Standard-Pipe-Modus

Wenn Sie den Standard-Pipe-Modus verwenden, verwendet der Konnektor eine Standard-Pipe namens {tableName}-STREAMING und ordnet die Schlüssel der ersten Ebene des Inhalts direkt den Tabellenspalten zu, wobei die Groß-/Kleinschreibung nicht berücksichtigt wird.

Verwendung des Standard-Pipe-Modus – Beispiel

Beispiel 1:

Betrachten Sie die folgende Payload des Kafka-Datensatzinhalts:

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  "@&$#* includes special characters": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

Sie erstellen eine Tabelle mit Spalten, die den JSON-Schlüsseln entsprechen (Groß-/Kleinschreibung wird nicht berücksichtigt, einschließlich Sonderzeichen):

CREATE TABLE PERSON_DATA (
  record_metadata VARIANT,
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  "has cat" BOOLEAN,
  "!@&$#* includes special characters" BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

Übereinstimmende Verhaltensweise:

  • "city" (kafka) → city oder CITY oder City (column) – Groß-/Kleinschreibung nicht berücksichtigt

  • "has cat" (kafka) → "has cat" (column) – muss aus Platzgründen in Anführungszeichen gesetzt werden

  • "!@&$#* includes special characters" (kafka) → "!@&$#* includes special characters" (column) – Sonderzeichen werden beibehalten

  • Verschachtelte Objekte wie skills und family werden VARIANT-Spalten automatisch zugeordnet.

Verwenden des benutzerdefinierten Pipe-Modus – Beispiele

Dieses Beispiel zeigt, wie Sie benutzerdefinierte Pipes mit benutzerdefinierten Datentransformationen konfigurieren und verwenden.

Beispiel 1:

Erstellen Sie eine Tabelle mit dem gewünschten Schema:

CREATE TABLE ORDERS (
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  order_date TIMESTAMP_NTZ,
  source_topic VARCHAR
);
Copy

Erstellen Sie eine Pipe, die die eingehenden Kafka-Datensätze so umwandelt, dass sie mit Ihrem Tabellenschema übereinstimmen:

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total::NUMBER,
    $1:order_date::TIMESTAMP_NTZ,
    $1:RECORD_METADATA.topic
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Beachten Sie, dass der Pipe-Name (ORDERS) mit dem Tabellennamen (ORDERS) übereinstimmt. Die Pipedefinition extrahiert Felder aus der JSON-Payload, wobei die $1:field_name-Syntax verwendet wird, und ordnet sie den Tabellenspalten zu.

Bemerkung

Sie können auf verschachtelte JSON-Felder und Felder mit Sonderzeichen in Klammern zugreifen, z. B. $1['field name'] oder $1['has cat'].

Konfigurieren Sie die Zuordnung von Topics zu Tabellen:

snowflake.topic2table.map=kafka-orders-topic:ORDERS
Copy

Diese Konfiguration ordnet das Kafka-Topic kafka-orders-topic der bereits vorhandenen Tabelle und der Pipe mit dem Namen ORDERS zu.

Beispiel 2:

Wenn Sie im Inhalt auf Schlüssel zugreifen müssen, die keine herkömmlichen Namen haben, verwenden Sie die folgende Syntax:

  • Einfache Felder: $1:field_name

  • Felder mit Leerzeichen oder Sonderzeichen: $1['field name'] oder $1['has cat']

  • Felder mit Unicode-Zeichen: $1[' @&$#* has Łułósżź']

  • Verschachtelte Felder: $1:parent.child oder $1:parent['child field']

Betrachten Sie die JSON-Payload von Kafka:

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  " @&$#* has Łułósżź": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

Sie erstellen eine Zieltabelle mit den von Ihnen gewählten Spaltennamen:

CREATE TABLE PERSON_DATA (
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  has_cat BOOLEAN,
  weird_field_name BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

Erstellen Sie dann eine Pipe mit demselben Namen, der die Zuordnung definiert:

CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
  SELECT
    $1:city,
    $1:age,
    $1:married,
    $1['has cat'] AS has_cat,
    $1[' @&$#* has Łułósżź'] AS weird_field_name,
    $1:skills,
    $1:family
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Wichtigste Punkte:

  • Sie kontrollieren die Spaltennamen (z. B. Umbenennen von "has cat" zu has_cat).

  • Sie können Datentypen nach Bedarf umwandeln (z. B. $1:age::NUMBER).

  • Sie können Felder je nach Bedarf ein- oder ausschließen

  • Sie können Metadatenfelder hinzufügen (z. B. $1:RECORD_METADATA.topic).

  • VARIANT-Spalten verarbeiten verschachtelte JSON-Strukturen automatisch.

Beispiel 3: Mit interaktiven Tabellen

Interaktive Tabellen sind eine spezielle Art von Snowflake-Tabellen, die für Abfragen mit niedriger Latenz bei hoher Parallelität optimiert sind. Mehr über interaktive Tabellen erfahren Sie in der Dokumentation zu interaktiven Tabellen.

  1. Interaktive Tabelle erstellen:

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    
    Copy
  2. Konfigurieren Sie die Zuordnung von Topics zu Tabellen:

    snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
    
    Copy

Wichtige Hinweise:

  • Interaktive Tabellen unterliegen bestimmten Einschränkungen und Abfragebeschränkungen. Sehen Sie sich die Dokumentation zu interaktiven Tabellen an, bevor Sie sie mit dem Konnektor verwenden.

  • Bei interaktiven Tabellen müssen alle erforderlichen Transformationen in der Tabellendefinition verarbeitet werden.

  • Interaktive Warehouses sind erforderlich, um interaktive Tabellen effizient abzufragen.

Explizite Zuordnung von Topics zu Tabellen

Wenn Sie den Parameter snowflake.topic2table.map konfigurieren, arbeitet der Konnektor im expliziten Zuordnungsmodus. Dieser Modus bietet Folgendes:

  • Zuordnen mehrerer Kafka-Topics zu einer einzigen Snowflake-Tabelle

  • Verwenden von benutzerdefinierten Tabellennamen, die sich von Topic-Namen unterscheiden

  • Anwenden von Regex-Mustern, um mehrere Topics zuzuordnen

Konfigurationsformat:

Der snowflake.topic2table.map-Parameter akzeptiert eine durch Kommas getrennte Liste von Topics-zu-Topics-Zuordnungen im folgenden Format:

topic1:table1,topic2:table2,topic3:table3
Copy

Beispielkonfigurationen:

Direkte Topic-Zuordnung

snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
Copy

Regex-Mustererkennung

snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
Copy

Dieses Konfiguration ordnet alle Topics, die mit _cat enden (wie z. B. orange_cat, calico_cat) der Tabelle CAT_TABLE zu, und alle Topics, die mit _dog enden, werden der Tabelle DOG_TABLE zugeordnet.

Viele Topics zu einer Tabelle

snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
Copy

Diese Konfiguration ordnet sowohl topic1 als auch topic2 der shared_table zu, während topic3 der other_table zugeordnet wird.

Wichtig

  • Regex-Muster in der Zuordnung können sich nicht überschneiden. Jedes Topics darf höchstens einem Muster entsprechen.

  • Tabellennamen in der Zuordnung müssen gültige Snowflake-Bezeichner mit mindestens 2 Zeichen sein, beginnend mit einem Buchstaben oder einem Unterstrich.

  • Sie können einer einzigen Tabelle mehrere Topics zuordnen.

Ältere RECORD_CONTENT-Spalte

In prior versions of the connector (3.x and earlier), when the schematization feature was disabled, the connector created a destination table with two columns: RECORD_CONTENT and RECORD_METADATA. The RECORD_CONTENT column contained the entire Kafka message content in a column of type VARIANT. The RECORD_METADATA column continues to be supported but the RECORD_CONTENT column is no longer created by the connector. The same functionality can be achieved using SMT transformations (see examples later in this section). The RECORD_CONTENT key is also no longer available in PIPE transformations. For example, this PIPE definition will not work by default:

Bemerkung

Diese Pipe-Definition funktioniert nicht ohne zusätzliche SMT-Transformationen.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Wenn Sie den gesamten Inhalt einer Kafka-Nachricht in einer einzigen Spalte speichern möchten, oder wenn Sie ein Handle für den gesamten Inhalt der Kafka-Nachricht in einer PIPE-Transformation benötigen, können Sie die folgende SMT-Transformation verwenden, die den gesamten Inhalt der Kafka-Nachricht in das gewünschte benutzerdefinierte Feld einschließt:

transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
Copy

Durch diese Transformation wird der gesamte Inhalt der Kafka-Nachricht in ein benutzerdefiniertes Feld mit dem Namen your_top_level_field_name eingeschlossen. Sie können dann mithilfe der des $1:your_top_level_field_name-Accessors in Ihrer PIPE-Transformation auf den gesamten Inhalt der Kafka-Nachricht zugreifen.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Wenn Sie alternativ die gesamten Metadaten und den Inhalt in einer einzigen Tabelle mit der Standard-Pipe speichern möchten, erstellen Sie keine benutzerdefinierte Pipe. Erstellen Sie stattdessen eine Tabelle mit nur zwei Spalten: RECORD_CONTENT und your_top_level_field_name.

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  your_top_level_field_name VARIANT
);
Copy

Weitere Informationen zur HoistField$Value-Transformation finden Sie in der Kafka-Dokumentation.

Warnung

Das Speichern des gesamten Inhalts einer Kafka-Nachricht und der Metadaten in einer Tabelle kann sich negativ auf die Kosten für die Datenaufnahme, die Geschwindigkeit der Pipeline und die Latenz auswirken. Wenn Sie die bestmögliche Performance benötigen, sollten Sie nur die Daten speichern, die Sie benötigen, wenn diese von der obersten Ebene des Kafka-Datensatzinhalts zugänglich sind. Oder verwenden Sie SMT-Transformationen, um die Daten aus tief verschachtelten Feldern in Felder der obersten Ebene zu extrahieren.

Behandlung von Streaming-Kanal-Fehlern und Dead-Letter-Queues

The connector inspects the Snowpipe Streaming channel status before committing offsets in Kafka. If the connector detects that the rowsErrorCount property on channel has increased since the connector was started, it raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues don’t go unnoticed. To allow ingestion to continue while triaging bad rows, set errors.tolerance=all

errors.tolerance=all
Copy

Schemaentwicklung

Bei Tabellen mit ENABLE_SCHEMA_EVOLUTION=TRUE entwickelt der Konnektor automatisch sein Schema auf der Grundlage der eingehenden Kafka-Datensätze weiter. Alle vom Konnektor erstellten Tabellen sind standardmäßig auf ENABLE_SCHEMA_EVOLUTION=TRUE eingestellt.

Die Schemaentwicklung ist auf die folgenden Operationen beschränkt:

  • Hinzufügen neuer Spalten. Der Konnektor fügt der Tabelle neue Spalten hinzu, wenn die eingehenden Kafka-Datensätze neue Felder enthalten, die in der Tabelle nicht vorhanden sind.

  • Löschen der NOT NULL-Einschränkung von Spalten, denen Daten in den eingefügten Datensätzen fehlen

Verwenden des Konnektors mit Apache Iceberg™-Tabellen

Der Konnektor kann Daten in von Snowflake verwaltete aufnehmen Apache Iceberg™-Tabellen, unter folgenden Voraussetzungen:

  • Sie müssen die USAGE-Berechtigung für das externe Volume haben, das mit Ihrer Apache Iceberg™-Tabelle verknüpft ist.

  • Sie müssen eine Apache Iceberg™-Tabelle erstellen, bevor Sie den Konnektor ausführen.

Verwendung auf einem externen Volume gewähren

Um die USAGE-Berechtigung für das externe Volume, das mit Ihrer Apache Iceberg™-Tabelle verknüpft ist, an Ihre Rolle für den Kafka-Konnektor zu erteilen, führen Sie die folgende Anweisung aus:

Wenn Ihre Iceberg-Tabelle beispielsweise das externe Volume kafka_external_volume verwendet und der Konnektor die Rolle kafka_connector_role benutzt, führen Sie die folgende Anweisung aus:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Copy

Erstellen einer Apache Iceberg™-Tabelle für die Datenaufnahme

Der Konnektor erstellt Iceberg-Tabellen nicht automatisch und unterstützt keine Schemaentwicklung. Bevor Sie den Konnektor ausführen, müssen Sie manuell eine Iceberg-Tabelle erstellen.

Wenn Sie eine Iceberg-Tabelle erstellen, können Sie Iceberg-Datentypen (wie VARIANT) oder kompatible Snowflake-Typen verwenden.

Betrachten Sie zum Beispiel die folgende Meldung:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "options":
    {
        "can_walk": true,
        "can_talk": false
    },
    "date_added": "2024-10-15"
}
Copy

Um eine Iceberg-Tabelle für die Beispielmeldung zu erstellen, verwenden Sie eine der folgenden Anweisungen:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id number(38,0),
    name varchar,
    body_temperature number(4,2),
    approved_coffee_types array(varchar),
    animals_possessed variant,
    options object(can_walk boolean, can_talk boolean),
    date_added date
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ICEBERG_VERSION = 3;
Copy
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id INT,
    name string,
    body_temperature float,
    approved_coffee_types array(string),
    animals_possessed variant,
    date_added date,
    options object(can_walk boolean, can_talk boolean),
    )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ICEBERG_VERSION = 3;
Copy

Bemerkung

Bei Feldnamen innerhalb verschachtelter Strukturen wie dogs oder cats wird zwischen Groß- und Kleinschreibung unterschieden.

Nächste Schritte

Richten Sie Aufgaben ein.