CREATE PIPE

Erstellt eine neue Pipe im System zum Definieren der COPY INTO <Tabelle>-Anweisung, die von Snowpipe zum Laden von Daten aus einer Erfassungswarteschlange in Tabellen verwendet wird.

Siehe auch:

ALTER PIPE, DROP PIPE, SHOW PIPES, DESCRIBE PIPE

Syntax

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ ERROR_INTEGRATION = <integration_name> ]
  [ AWS_SNS_TOPIC = '<string>' ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>
Copy

Erforderliche Parameter

name

Bezeichner für die Pipe. Dieser muss für das Schema, in dem die Pipe erstellt wird, eindeutig sein.

Der Bezeichner muss mit einem alphabetischen Zeichen beginnen und darf keine Leer- oder Sonderzeichen enthalten, es sei denn, die gesamte Bezeichnerzeichenfolge wird in doppelte Anführungszeichen gesetzt (z. B. "My object"). Bei Bezeichnern, die in doppelte Anführungszeichen eingeschlossen sind, ist auch die Groß-/Kleinschreibung zu beachten.

Weitere Details dazu finden Sie unter Anforderungen an Bezeichner.

copy_statement

COPY INTO <Tabelle>-Anweisung, mit der Daten aus Warteschlangendateien in eine Snowflake-Tabelle geladen werden. Diese Anweisung dient als Text/Definition für die Pipe und wird in der Ausgabe von SHOW PIPES angezeigt.

Bemerkung

Wir raten derzeit davon ab, die folgenden Funktionen in copy_statement für Snowpipe zu verwenden:

  • CURRENT_DATE

  • CURRENT_TIME

  • CURRENT_TIMESTAMP

  • GETDATE

  • LOCALTIME

  • LOCALTIMESTAMP

  • SYSDATE

  • SYSTIMESTAMP

Es ist ein bekanntes Problem, dass die mit diesen Funktionen eingefügten Zeitwerte einige Stunden vor den LOAD_TIME-Werten liegen können, die von der COPY_HISTORY-Funktion oder der COPY_HISTORY-Ansicht zurückgegeben werden.

Es wird empfohlen, stattdessen METADATA$START_SCAN_TIME abzufragen, was eine genauere Darstellung der zu ladenden Menge an Datensätzen bietet.

Optionale Parameter

AUTO_INGEST = TRUE | FALSE

Gibt an, ob Datendateien automatisch aus dem angegebenen externen Stagingbereich und optionalen Pfad geladen werden sollen, wenn Ereignisbenachrichtigungen von einem konfigurierten Nachrichtendienst empfangen werden:

  • TRUE aktiviert das automatische Laden von Daten.

    Snowpipe unterstützt das Laden von externen Stagingbereichen (Amazon S3, Google Cloud Storage oder Microsoft Azure).

  • FALSE deaktiviert das automatische Laden von Daten. Sie müssen die Snowpipe-REST-API-Endpunkte aufrufen, um Datendateien zu laden.

    Snowpipe unterstützt das Laden aus internen Stagingbereichen (d. h. aus benannten Snowflake-Stagingbereichen oder Tabellen-Stagingbereichen, aber nicht aus Benutzer-Stagingbereichen) oder aus externen Stagingbereichen (Amazon S3, Google Cloud Storage oder Microsoft Azure).

ERROR_INTEGRATION = 'integration_name'

Nur erforderlich, wenn Snowpipe so konfiguriert ist, dass es Fehlerbenachrichtigungen an einen Cloudmessagingdienst sendet.

Gibt den Namen der Benachrichtigungsintegration an, die für die Kommunikation mit dem Messagingdienst verwendet wird. Weitere Informationen dazu finden Sie unter Snowpipe-Fehlerbenachrichtigungen.

AWS_SNS_TOPIC = 'string'

Nur erforderlich, wenn AUTO_INGEST für Amazon S3-Stagingbereiche mit dem SNS konfiguriert wird.

Gibt den Amazon Resource Name (ARN) für das SNS-Thema Ihres S3-Buckets an. Mit der Anweisung CREATE PIPE wird die Warteschlange von Amazon Simple Queue Service (SQS) für das angegebene SNS-Thema abonniert. Die Pipe kopiert Dateien in die Erfassungswarteschlange, die durch Ereignisbenachrichtigungen über das SNS-Thema ausgelöst wurden. Weitere Informationen dazu finden Sie unter Automatisieren von Snowpipe für Amazon S3.

INTEGRATION = 'string'

Nur erforderlich, wenn AUTO_INGEST für Microsoft Azure- oder Google Cloud Storage-Stagingbereiche konfiguriert wird.

Gibt die vorhandene Benachrichtigungsintegration an, die für den Zugriff auf die Speicherwarteschlange verwendet wird. Weitere Informationen dazu finden Sie unter:

Der Integrationsname muss komplett in Großbuchstaben eingegeben werden.

COMMENT = 'string_literal'

Gibt einen Kommentar für die Pipe an.

Standard: Kein Wert

Nutzungshinweise

  • Dieser SQL-Befehl erfordert die folgenden Mindestberechtigungen:

    Berechtigung

    Objekt

    Anmerkungen

    CREATE PIPE

    Schema

    USAGE

    Stagingbereich in der Pipe-Definition

    Nur für externe Stagingbereiche

    READ

    Stagingbereich in der Pipe-Definition

    Nur für interne Stagingbereiche

    SELECT, INSERT

    Tabelle in der Pipe-Definition

    SQL-Operationen auf Schemaobjekten erfordern ebenfalls die USAGE-Berechtigung für die Datenbank und das Schema, die das Objekt enthalten.

  • Alle COPY INTO <Tabelle>-Kopieroptionen werden unterstützt, mit Ausnahme der folgenden:

    • FILES = ( 'file_name1' [ , 'file_name2', ... ] )

    • ON_ERROR = ABORT_STATEMENT

    • SIZE_LIMIT = num

    • PURGE = TRUE | FALSE (d. h. automatisches Bereinigen während des Ladens)

    • FORCE = TRUE | FALSE

      Beachten Sie, dass Sie Dateien mit dem Befehl REMOVE manuell aus einem internen (d. h. Snowflake) Stagingbereich entfernen können (nachdem sie geladen wurden).

    • RETURN_FAILED_ONLY = TRUE | FALSE

    • VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS

  • Die Kopieroption PATTERN = 'regex_pattern' filtert die zu ladenden Dateien mithilfe eines regulären Ausdrucks. Der Mustervergleich verhält sich abhängig vom Parameterwert AUTO_INGEST wie folgt:

    • AUTO_INGEST = TRUE: Der reguläre Ausdruck filtert die Liste der Dateien im Stagingbereich und dem optionalen Pfad (d. h. den Cloudspeicherort) in der Anweisung COPY INTO <Tabelle>.

    • :AUTO_INGEST = FALSE: Der reguläre Ausdruck filtert die Liste der Dateien, die bei Aufrufen des Snowpipe-REST-API-insertFiles-Endpunkts gesendet wurden.

    Beachten Sie, dass Snowpipe alle Pfadsegmente in der Stagingbereichsdefinition vom Speicherort abschneidet und den regulären Ausdruck auf alle verbleibenden Pfadsegmente und Dateinamen anwendet. Um die Stagingbereichsdefinition anzuzeigen, führen Sie den Befehl DESCRIBE STAGE für den Stagingbereich aus. Die URL-Eigenschaft besteht aus dem Namen des Buckets oder Containers und keinem, einem oder mehreren Pfadsegmenten. Wenn zum Beispiel der FROM-Speicherplatz in einer COPY INTO <Tabelle>-Anweisung @s/path1/path2/ ist und der URL-Wert für Stagingbereich @s s3://mybucket/path1/ ist, dann schneidet Snowpipe /path1/ aus dem Speicherort in der FROM-Klausel ab und wendet den regulären Ausdruck auf path2/ plus die Dateinamen im Pfad an.

    Wichtig

    Snowflake empfiehlt die Aktivierung der Cloudereignisfilterung für Snowpipe, um Kosten, Ereignisrauschen und Latenz zu reduzieren. Verwenden Sie die Option PATTERN nur dann, wenn das Ereignisfilter-Feature Ihres Cloudanbieters unzureichend ist. Weitere Informationen zum Konfigurieren der Ereignisfilterung für jeden Cloudanbieter finden Sie auf den folgenden Seiten:

  • Die Verwendung einer Abfrage als Quelle für die COPY-Anweisung zum Neuordnen, Entfernen und Umwandeln (d. h. Transformieren von Daten während eines Ladevorgangs) von Spalten wird unterstützt. Anwendungsbeispiele finden Sie unter Transformieren von Daten während eines Ladevorgangs. Beachten Sie, dass nur einfache SELECT-Anweisungen unterstützt werden. Das Filtern mit einer WHERE-Klausel wird nicht unterstützt.

  • Pipedefinitionen sind nicht dynamisch (d. h. eine Pipe wird nicht automatisch aktualisiert, wenn sich der zugrunde liegende Stagingbereich oder die Tabelle ändert, wie z. B. bei Umbenennung oder Entfernen von Stagingbereich/Tabelle). Stattdessen müssen Sie eine neue Pipe erstellen und diesen Pipenamen in zukünftigen Snowpipe-REST-API-Aufrufen angeben.

  • Metadaten:

    Achtung

    Kunden müssen sicherstellen, dass bei der Nutzung des Snowflake-Dienstes keine personenbezogenen Daten (außer für ein Objekt „Benutzer“), sensible Daten, exportkontrollierte Daten oder andere regulierte Daten als Metadaten eingegeben werden. Weitere Informationen dazu finden Sie unter Metadatenfelder in Snowflake.

  • CREATE OR REPLACE <Objekt>-Anweisungen sind atomar. Das heißt, wenn ein Objekt ersetzt wird, erfolgt das Löschen des alten Objekts und das Erstellen des neuen Objekts in einer einzigen Transaktion.

Wichtig

Wenn Sie eine Pipe neu erstellen (unter Verwendung der CREATE OR REPLACE PIPE-Syntax), finden Sie unter Neuerstellen von Pipes entsprechende Hinweise und bewährte Methoden.

Beispiele

Erstellen Sie eine Pipe im aktuellen Schema, die alle Daten aus Dateien lädt, die aus dem mystage-Stagingbereich in mytable bereitgestellt werden:

create pipe mypipe as copy into mytable from @mystage;
Copy

Wie im vorherigen Beispiel, jedoch mit einer Datentransformation. Laden Sie nur Daten aus der 4. und 5. Spalte in den bereitgestellten Dateien, in umgekehrter Reihenfolge:

create pipe mypipe2 as copy into mytable(C1, C2) from (select $5, $4 from @mystage);
Copy

Erstellen Sie im aktuellen Schema eine Pipe zum automatischen Laden von Daten mithilfe von Ereignisbenachrichtigungen, die von einem Nachrichtendienst empfangen wurden:

Amazon S3

create pipe mypipe_s3
  auto_ingest = true
  aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy

Google Cloud Storage

create pipe mypipe_gcs
  auto_ingest = true
  integration = 'MYINT'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy

Microsoft Azure

create pipe mypipe_azure
  auto_ingest = true
  integration = 'MYINT'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy