CREATE PIPE¶
Erstellt eine neue Pipe im System zum Definieren der COPY INTO <Tabelle>-Anweisung, die von Snowpipe zum Laden von Daten aus einer Erfassungswarteschlange in Tabellen verwendet wird.
- Siehe auch:
Syntax¶
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
[ AUTO_INGEST = [ TRUE | FALSE ] ]
[ ERROR_INTEGRATION = <integration_name> ]
[ AWS_SNS_TOPIC = '<string>' ]
[ INTEGRATION = '<string>' ]
[ COMMENT = '<string_literal>' ]
AS <copy_statement>
Erforderliche Parameter¶
name
Bezeichner für die Pipe. Dieser muss für das Schema, in dem die Pipe erstellt wird, eindeutig sein.
Der Bezeichner muss mit einem alphabetischen Zeichen beginnen und darf keine Leer- oder Sonderzeichen enthalten, es sei denn, die gesamte Bezeichnerzeichenfolge wird in doppelte Anführungszeichen gesetzt (z. B.
"My object"
). Bei Bezeichnern, die in doppelte Anführungszeichen eingeschlossen sind, ist auch die Groß-/Kleinschreibung zu beachten.Weitere Details dazu finden Sie unter Anforderungen an Bezeichner.
copy_statement
COPY INTO <Tabelle>-Anweisung, mit der Daten aus Warteschlangendateien in eine Snowflake-Tabelle geladen werden. Diese Anweisung dient als Text/Definition für die Pipe und wird in der Ausgabe von SHOW PIPES angezeigt.
Bemerkung
Wir raten derzeit davon ab, die folgenden Funktionen in copy_statement
für Snowpipe zu verwenden:
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
GETDATE
LOCALTIME
LOCALTIMESTAMP
SYSDATE
SYSTIMESTAMP
Es ist ein bekanntes Problem, dass die mit diesen Funktionen eingefügten Zeitwerte einige Stunden vor den LOAD_TIME-Werten liegen können, die von der COPY_HISTORY-Funktion oder der COPY_HISTORY-Ansicht zurückgegeben werden.
Es wird empfohlen, stattdessen METADATA$START_SCAN_TIME abzufragen, was eine genauere Darstellung der zu ladenden Menge an Datensätzen bietet.
Optionale Parameter¶
AUTO_INGEST = TRUE | FALSE
Gibt an, ob Datendateien automatisch aus dem angegebenen externen Stagingbereich und optionalen Pfad geladen werden sollen, wenn Ereignisbenachrichtigungen von einem konfigurierten Nachrichtendienst empfangen werden:
TRUE
aktiviert das automatische Laden von Daten.Snowpipe unterstützt das Laden von externen Stagingbereichen (Amazon S3, Google Cloud Storage oder Microsoft Azure).
FALSE
deaktiviert das automatische Laden von Daten. Sie müssen die Snowpipe-REST-API-Endpunkte aufrufen, um Datendateien zu laden.Snowpipe unterstützt das Laden aus internen Stagingbereichen (d. h. aus benannten Snowflake-Stagingbereichen oder Tabellen-Stagingbereichen, aber nicht aus Benutzer-Stagingbereichen) oder aus externen Stagingbereichen (Amazon S3, Google Cloud Storage oder Microsoft Azure).
ERROR_INTEGRATION = 'integration_name'
Nur erforderlich, wenn Snowpipe so konfiguriert ist, dass es Fehlerbenachrichtigungen an einen Cloudmessagingdienst sendet.
Gibt den Namen der Benachrichtigungsintegration an, die für die Kommunikation mit dem Messagingdienst verwendet wird. Weitere Informationen dazu finden Sie unter Snowpipe-Fehlerbenachrichtigungen.
AWS_SNS_TOPIC = 'string'
Nur erforderlich, wenn AUTO_INGEST für Amazon S3-Stagingbereiche mit dem SNS konfiguriert wird.
Gibt den Amazon Resource Name (ARN) für das SNS-Thema Ihres S3-Buckets an. Mit der Anweisung CREATE PIPE wird die Warteschlange von Amazon Simple Queue Service (SQS) für das angegebene SNS-Thema abonniert. Die Pipe kopiert Dateien in die Erfassungswarteschlange, die durch Ereignisbenachrichtigungen über das SNS-Thema ausgelöst wurden. Weitere Informationen dazu finden Sie unter Automatisieren von Snowpipe für Amazon S3.
INTEGRATION = 'string'
Nur erforderlich, wenn AUTO_INGEST für Microsoft Azure- oder Google Cloud Storage-Stagingbereiche konfiguriert wird.
Gibt die vorhandene Benachrichtigungsintegration an, die für den Zugriff auf die Speicherwarteschlange verwendet wird. Weitere Informationen dazu finden Sie unter:
Der Integrationsname muss komplett in Großbuchstaben eingegeben werden.
COMMENT = 'string_literal'
Gibt einen Kommentar für die Pipe an.
Standard: Kein Wert
Nutzungshinweise¶
Dieser SQL-Befehl erfordert die folgenden Mindestberechtigungen:
Berechtigung
Objekt
Anmerkungen
CREATE PIPE
Schema
USAGE
Stagingbereich in der Pipe-Definition
Nur für externe Stagingbereiche
READ
Stagingbereich in der Pipe-Definition
Nur für interne Stagingbereiche
SELECT, INSERT
Tabelle in der Pipe-Definition
SQL-Operationen auf Schemaobjekten erfordern ebenfalls die USAGE-Berechtigung für die Datenbank und das Schema, die das Objekt enthalten.
Alle COPY INTO <Tabelle>-Kopieroptionen werden unterstützt, mit Ausnahme der folgenden:
FILES = ( 'file_name1' [ , 'file_name2', ... ] )
ON_ERROR = ABORT_STATEMENT
SIZE_LIMIT = num
PURGE = TRUE | FALSE
(d. h. automatisches Bereinigen während des Ladens)FORCE = TRUE | FALSE
Beachten Sie, dass Sie Dateien mit dem Befehl REMOVE manuell aus einem internen (d. h. Snowflake) Stagingbereich entfernen können (nachdem sie geladen wurden).
RETURN_FAILED_ONLY = TRUE | FALSE
VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
Die Kopieroption
PATTERN = 'regex_pattern'
filtert die zu ladenden Dateien mithilfe eines regulären Ausdrucks. Der Mustervergleich verhält sich abhängig vom Parameterwert AUTO_INGEST wie folgt:AUTO_INGEST = TRUE
: Der reguläre Ausdruck filtert die Liste der Dateien im Stagingbereich und dem optionalen Pfad (d. h. den Cloudspeicherort) in der Anweisung COPY INTO <Tabelle>.:AUTO_INGEST = FALSE
: Der reguläre Ausdruck filtert die Liste der Dateien, die bei Aufrufen des Snowpipe-REST-API-insertFiles
-Endpunkts gesendet wurden.
Beachten Sie, dass Snowpipe alle Pfadsegmente in der Stagingbereichsdefinition vom Speicherort abschneidet und den regulären Ausdruck auf alle verbleibenden Pfadsegmente und Dateinamen anwendet. Um die Stagingbereichsdefinition anzuzeigen, führen Sie den Befehl DESCRIBE STAGE für den Stagingbereich aus. Die URL-Eigenschaft besteht aus dem Namen des Buckets oder Containers und keinem, einem oder mehreren Pfadsegmenten. Wenn zum Beispiel der FROM-Speicherplatz in einer COPY INTO <Tabelle>-Anweisung
@s/path1/path2/
ist und der URL-Wert für Stagingbereich@s
s3://mybucket/path1/
ist, dann schneidet Snowpipe/path1/
aus dem Speicherort in der FROM-Klausel ab und wendet den regulären Ausdruck aufpath2/
plus die Dateinamen im Pfad an.Wichtig
Snowflake empfiehlt die Aktivierung der Cloudereignisfilterung für Snowpipe, um Kosten, Ereignisrauschen und Latenz zu reduzieren. Verwenden Sie die Option PATTERN nur dann, wenn das Ereignisfilter-Feature Ihres Cloudanbieters unzureichend ist. Weitere Informationen zum Konfigurieren der Ereignisfilterung für jeden Cloudanbieter finden Sie auf den folgenden Seiten:
Amazon S3: Konfigurieren von Ereignisbenachrichtigungen mithilfe der Filterung von Objektschlüsselnamen
Microsoft Azure Event Grid: Erläuterungen zur Ereignisfilterung für Event Grid-Abonnements
Google Cloud Pub/Sub: Filtern von Meldungen
Die Verwendung einer Abfrage als Quelle für die COPY-Anweisung zum Neuordnen, Entfernen und Umwandeln (d. h. Transformieren von Daten während eines Ladevorgangs) von Spalten wird unterstützt. Anwendungsbeispiele finden Sie unter Transformieren von Daten während eines Ladevorgangs. Beachten Sie, dass nur einfache SELECT-Anweisungen unterstützt werden. Das Filtern mit einer WHERE-Klausel wird nicht unterstützt.
Pipedefinitionen sind nicht dynamisch (d. h. eine Pipe wird nicht automatisch aktualisiert, wenn sich der zugrunde liegende Stagingbereich oder die Tabelle ändert, wie z. B. bei Umbenennung oder Entfernen von Stagingbereich/Tabelle). Stattdessen müssen Sie eine neue Pipe erstellen und diesen Pipenamen in zukünftigen Snowpipe-REST-API-Aufrufen angeben.
Metadaten:
Achtung
Kunden müssen sicherstellen, dass bei der Nutzung des Snowflake-Dienstes keine personenbezogenen Daten (außer für ein Objekt „Benutzer“), sensible Daten, exportkontrollierte Daten oder andere regulierte Daten als Metadaten eingegeben werden. Weitere Informationen dazu finden Sie unter Metadatenfelder in Snowflake.
CREATE OR REPLACE <Objekt>-Anweisungen sind atomar. Das heißt, wenn ein Objekt ersetzt wird, erfolgt das Löschen des alten Objekts und das Erstellen des neuen Objekts in einer einzigen Transaktion.
Wichtig
Wenn Sie eine Pipe neu erstellen (unter Verwendung der CREATE OR REPLACE PIPE-Syntax), finden Sie unter Neuerstellen von Pipes entsprechende Hinweise und bewährte Methoden.
Beispiele¶
Erstellen Sie eine Pipe im aktuellen Schema, die alle Daten aus Dateien lädt, die aus dem mystage
-Stagingbereich in mytable
bereitgestellt werden:
create pipe mypipe as copy into mytable from @mystage;
Wie im vorherigen Beispiel, jedoch mit einer Datentransformation. Laden Sie nur Daten aus der 4. und 5. Spalte in den bereitgestellten Dateien, in umgekehrter Reihenfolge:
create pipe mypipe2 as copy into mytable(C1, C2) from (select $5, $4 from @mystage);
Erstellen Sie im aktuellen Schema eine Pipe zum automatischen Laden von Daten mithilfe von Ereignisbenachrichtigungen, die von einem Nachrichtendienst empfangen wurden:
Amazon S3
create pipe mypipe_s3 auto_ingest = true aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');Google Cloud Storage
create pipe mypipe_gcs auto_ingest = true integration = 'MYINT' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');Microsoft Azure
create pipe mypipe_azure auto_ingest = true integration = 'MYINT' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');