Kategorien:

DDL zum Laden/Entladen von Daten

CREATE PIPE

Erstellt eine neue Pipe im System zum Definieren der COPY INTO <Tabelle>-Anweisung, die von Snowpipe zum Laden von Daten aus einer Erfassungswarteschlange in Tabellen verwendet wird.

Siehe auch:

ALTER PIPE, DESCRIBE PIPE, DROP PIPE, SHOW PIPES

Syntax

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ AWS_SNS_TOPIC = <string> ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>

Erforderliche Parameter

Name

Bezeichner für die Pipe. Dieser muss für das Schema, in dem die Pipe erstellt wird, eindeutig sein.

Der Bezeichner muss mit einem alphabetischen Zeichen beginnen und darf keine Leer- oder Sonderzeichen enthalten, es sei denn, die gesamte Bezeichnerzeichenfolge wird in doppelte Anführungszeichen gesetzt (z. B. "My object"). Bei Bezeichnern, die in doppelte Anführungszeichen eingeschlossen sind, ist auch die Groß-/Kleinschreibung zu beachten.

Weitere Details dazu finden Sie unter Anforderungen an Bezeichner.

Copy-Anweisung

COPY INTO <Tabelle>-Anweisung, mit der Daten aus Warteschlangendateien in eine Snowflake-Tabelle geladen werden. Diese Anweisung dient als Text/Definition für die Pipe und wird in der Ausgabe SHOW PIPES angezeigt.

Optionale Parameter

AUTO_INGEST = TRUE | FALSE

Gibt an, ob Datendateien automatisch aus dem angegebenen externen Stagingbereich und optionalen Pfad geladen werden sollen, wenn Ereignisbenachrichtigungen von einem konfigurierten Nachrichtendienst empfangen werden.

  • TRUE aktiviert das automatische Laden von Daten.

    Snowpipe unterstützt das Laden von externen Stagingbereichen (Amazon S3, Google Cloud Storage oder Microsoft Azure).

  • FALSE deaktiviert das automatische Laden von Daten. Sie müssen die Snowpipe-REST-API-Endpunkte aufrufen, um Datendateien zu laden.

    Snowpipe unterstützt das Laden aus internen Stagingbereichen (d. h. aus benannten Snowflake-Stagingbereichen oder Tabellen-Stagingbereichen, aber nicht aus Benutzer-Stagingbereichen) oder aus externen Stagingbereichen (Amazon S3, Google Cloud Storage oder Microsoft Azure).

AWS_SNS_TOPIC = Zeichenfolge

Nur erforderlich, wenn AUTO_INGEST für Amazon S3-Stagingbereiche mit dem Amazon Simple Notification Service (SNS) konfiguriert wird. Gibt den Amazon-Ressourcennamen (ARN) für das SNS-Thema für Ihren S3-Bucket an. Mit der Anweisung CREATE PIPE wird die Warteschlange von Amazon Simple Queue Service (SQS) für das angegebene SNS-Thema abonniert. Die Pipe kopiert Dateien in die Erfassungswarteschlange, die durch Ereignisbenachrichtigungen über das SNS-Thema ausgelöst wurden. Weitere Informationen dazu finden Sie unter Automatisieren von Snowpipe für Amazon S3.

INTEGRATION = 'Zeichenfolge'

Nur erforderlich, wenn AUTO_INGEST für Microsoft Azure- oder Google Cloud Storage-Stagingbereiche konfiguriert wird. Gibt die vorhandene Benachrichtigungsintegration an, die für den Zugriff auf die Speicherwarteschlange verwendet wird. Weitere Informationen dazu finden Sie unter:

Der Integrationsname muss komplett in Großbuchstaben eingegeben werden.

COMMENT = 'Zeichenfolgenliteral'

Gibt einen Kommentar für die Pipe an.

Standard: Kein Wert

Nutzungshinweise

  • Alle COPY INTO <Tabelle>-Kopieroptionen werden unterstützt, mit Ausnahme der folgenden:

    • FILES = ( 'Dateiname1' [ , 'Dateiname2', ... ] )

    • ON_ERROR = ABORT_STATEMENT

    • SIZE_LIMIT = Zahl

    • PURGE = TRUE | FALSE (d. h. automatisches Bereinigen während des Ladevorgangs)

    • MATCH_BY_COLUMN_NAME = CASE_SENSITIVE | CASE_INSENSITIVE | NONE

    • FORCE = TRUE | FALSE

      Beachten Sie, dass Sie Dateien mit dem Befehl REMOVE manuell aus einem internen (d. h. Snowflake) Stagingbereich entfernen können (nachdem sie geladen wurden).

    • RETURN_FAILED_ONLY = TRUE | FALSE

    • VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS

  • Die Kopieroption PATTERN = 'Muster_für_regAusdruck' wird als Vorschau-Funktion unterstützt. Die Kopieroption filtert die zu ladenden Dateien mithilfe eines regulären Ausdrucks. Der Mustervergleich verhält sich abhängig vom Parameterwert AUTO_INGEST wie folgt:

    • AUTO_INGEST = TRUE: Der reguläre Ausdruck filtert die Liste der Dateien im Stagingbereich und dem optionalen Pfad (d. h. den Cloudspeicherort) in der Anweisung COPY INTO <Tabelle>.

    • :AUTO_INGEST = FALSE: Der reguläre Ausdruck filtert die Liste der Dateien, die bei Aufrufen des Snowpipe-REST-API-insertFiles-Endpunkts gesendet wurden.

  • Die Verwendung einer Abfrage als Quelle für die COPY-Anweisung (d. h. die Transformation von Daten während eines Ladevorgangs) wird unterstützt. Anwendungsbeispiele finden Sie unter Transformieren von Daten während eines Ladevorgangs. Beachten Sie, dass nur einfache SELECT-Anweisungen unterstützt werden. Das Filtern mit einer WHERE-Klausel wird nicht unterstützt.

  • Pipedefinitionen sind nicht dynamisch (d. h. eine Pipe wird nicht automatisch aktualisiert, wenn sich der zugrunde liegende Stagingbereich oder die Tabelle ändert, wie z. B. bei Umbenennung oder Entfernen von Stagingbereich/Tabelle). Stattdessen müssen Sie eine neue Pipe erstellen und diesen Pipenamen in zukünftigen Snowpipe-REST-API-Aufrufen angeben.

Wichtig

Wenn Sie eine Pipe neu erstellen, die das Laden von Daten mithilfe von Ereignisbenachrichtigungen automatisiert, empfehlen wir Ihnen, die folgenden Schritte auszuführen:

  1. Halten Sie die Pipe an (mit ALTER PIPE … SET PIPE_EXECUTION_PAUSED = true). Warten Sie, bis alle derzeit in der Warteschlange befindlichen Dateien in die Zieltabelle geladen wurden.

  2. Fragen Sie die Funktion SYSTEM$PIPE_STATUS ab, und vergewissern Sie sich, dass der Ausführungsstatus der Pipe PAUSED und die Anzahl der ausstehenden Dateien 0 ist.

  3. Erstellen Sie die Pipe neu (mithilfe der Syntax CREATE OR REPLACE PIPE).

  4. Halten Sie die Pipe erneut an.

  5. Überprüfen Sie die Konfigurationsschritte für Ihren Cloudmessagingdienst, um sicherzustellen, dass die Einstellungen weiterhin korrekt sind:

  6. Setzen Sie die Pipe fort (mit ALTER PIPE … SET PIPE_EXECUTION_PAUSED = false).

  7. Fragen Sie die Funktion SYSTEM$PIPE_STATUS erneut ab, und vergewissern Sie sich, dass der Ausführungsstatus der Pipe RUNNING ist.

Beispiele

Erstellen Sie eine Pipe im aktuellen Schema, die alle Daten aus Dateien lädt, die aus dem mystage-Stagingbereich in mytable bereitgestellt werden:

create pipe mypipe as copy into mytable from @mystage;

Wie im vorherigen Beispiel, jedoch mit einer Datentransformation. Laden Sie nur Daten aus der 4. und 5. Spalte in den bereitgestellten Dateien, in umgekehrter Reihenfolge:

create pipe mypipe2 as copy into mytable(C1, C2) from (select $5, $4 from @mystage);

Erstellen Sie im aktuellen Schema eine Pipe zum automatischen Laden von Daten mithilfe von Ereignisbenachrichtigungen, die von einem Nachrichtendienst empfangen wurden:

Amazon S3

create pipe mypipe_s3
  auto_ingest = true
  aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

Google Cloud Storage

create pipe mypipe_gcs
  auto_ingest = true
  integration = 'MYINT'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

Microsoft Azure

create pipe mypipe_azure
  auto_ingest = true
  integration = 'MYINT'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');