Automatisieren von Snowpipe für Microsoft Azure Blob-Speicher

Unter diesem Thema finden Sie Anweisungen zum automatischen Auslösen des Ladens von Snowpipe-Daten mithilfe von Microsoft Azure Event Grid-Meldungen für Blob-Speicherereignisse. In den Anweisungen wird erläutert, wie Sie eine Ereignismeldung für den Zielpfad im Blob-Speicher erstellen, in dem Ihre Datendateien gespeichert sind.

Bemerkung

Diese Funktion ist auf Snowflake-Konten beschränkt, die auf Amazon Web Services oder Microsoft Azure gehostet werden. Die Anweisungen zum Automatisieren des Ladens von Snowpipe-Daten mithilfe von Event Grid-Meldungen sind für Konten auf beiden Cloudhosting-Plattformen identisch.

Die Unterstützung für die Automatisierung des Ladens von Snowpipe-Daten aus Azure in Snowflake-Konten, die auf AWS gehostet werden, wird als Vorschau-Funktion bereitgestellt.

Snowflake unterstützt derzeit nur Blob-Speicher. Snowflake unterstützt die folgenden Typen von Speicherkonten:

  • Blob-Speicher

  • Data Lake Storage Gen2 – Unterstützt als Vorschaufunktion.

  • General Purpose v2

Beachten Sie, dass nur Microsoft.Storage.BlobCreated-Ereignisse zum Laden von Dateien mit Snowpipe auslösen. Nur das Hinzufügen neuer Objekte zum Blob-Speicher löst diese Ereignisse aus. Das Umbenennen eines Verzeichnisses oder Objekts löst diese Ereignisse nicht aus.

Unter diesem Thema:

Prozessablauf

Microsoft Azure Event Grid-Benachrichtigungen für einen Azure-Container lösen Snowpipe-Datenladevorgänge automatisch aus.

Die folgende Abbildung veranschaulicht den Prozessablauf der automatischen Erfassung von Snowpipe:

Snowpipe Auto-ingest Process Flow
  1. Datendateien werden in einen Stagingbereich geladen.

  2. Eine Blob-Speicherereignismeldung informiert Snowpipe über Event Grid, dass Dateien ladebereit sind. Snowpipe kopiert die Dateien in eine Warteschlange.

  3. Ein von Snowflake bereitgestelltes virtuelles Warehouse lädt Daten aus den Warteschlangendateien in die Zieltabelle, basierend auf den in der angegebenen Pipe definierten Parametern.

Anweisungen dazu finden Sie unter Automatisieren von Snowpipe für Microsoft Azure Blob-Speicher.

Konfigurieren des sicheren Zugriffs auf Cloudspeicher

Bemerkung

Wenn Sie bereits den sicheren Zugriff auf den Azure Blob-Speichercontainer konfiguriert haben, in dem Ihre Datendateien gespeichert sind, können Sie diesen Abschnitt überspringen.

In diesem Abschnitt wird beschrieben, wie Sie ein Speicherintegrationsobjekt konfigurieren, um die Authentifizierungsverantwortung für den Cloudspeicher an eine Snowflake-Entität für die Identitäts- und Zugriffsverwaltung (IAM) zu delegieren.

Bemerkung

Diese Option wird dringend empfohlen, um beim Zugriff auf Cloudspeicher keine IAM-Anmeldeinformationen angeben zu müssen. Weitere Speicherzugriffsoptionen finden Sie unter Konfigurieren eines Azure-Containers zum Laden von Daten.

In diesem Abschnitt wird beschrieben, wie Sie mit Speicherintegrationen dafür sorgen können, dass Snowflake Daten in einem Azure-Container lesen und in einen Azure-Container schreiben kann, auf den in einem externen (Azure) Stagingbereich verwiesen wird. Integrationen sind benannte First-Class-Snowflake-Objekte, bei denen keine expliziten Cloudanbieter-Anmeldeinformationen wie geheime Schlüssel oder Zugriffstoken übergeben werden müssen. Integrationsobjekte speichern die Benutzer-ID aus der Identitäts- und Zugriffsverwaltung (IAM) von Azure. Diese wird als App-Registrierung bezeichnet. Ein Administrator in Ihrer Organisation erteilt dieser App die erforderlichen Berechtigungen für das Azure-Konto.

Eine Integration muss auch Container (und optionale Pfade) angeben, um die Speicherorte einzuschränken, die Benutzer beim Erstellen externer Stagingbereiche zur Verwendung mit der Integration angeben können.

Bemerkung

Zum Ausführen der Anweisungen in diesem Abschnitt sind Berechtigungen zum Verwalten von Speicherkonten in Azure erforderlich. Wenn Sie kein Azure-Administrator sind, bitten Sie Ihren Azure-Administrator, diese Aufgaben auszuführen.

Unter diesem Thema:

Schritt 1: Cloud Storage-Integration in Snowflake erstellen

Erstellen Sie mit dem Befehl CREATE STORAGE INTEGRATION eine Speicherintegration. Eine Speicherintegration ist ein Snowflake-Objekt, in dem ein für Ihren Azure-Cloudspeicher generierter Dienstprinzipal zusammen mit einem optionalen Satz zulässiger oder blockierter Speicherorte (d. h. Container) gespeichert wird. Cloudanbieter-Administratoren Ihrer Organisation erteilen dem generierten Dienstprinzipal Berechtigungen für die Speicherorte. Dank dieser Option müssen Benutzer beim Erstellen von Stagingbereichen oder beim Laden von Daten keine Anmeldeinformationen eingeben.

Eine einzelne Speicherintegration kann mehrere externe (d. h. Azure) Stagingbereiche unterstützen. Die URL in der Stagingbereichsdefinition muss mit den für den Parameter STORAGE_ALLOWED_LOCATIONS angegebenen Azure-Containern (und optionalen Pfaden) übereinstimmen.

Bemerkung

Dieser SQL-Befehl kann nur von Kontoadministratoren (Benutzer mit der Rolle ACCOUNTADMIN) oder von Rollen mit der globalen Berechtigung CREATE INTEGRATION ausgeführt werden.

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = AZURE
  ENABLED = TRUE
  AZURE_TENANT_ID = '<tenant_id>'
  STORAGE_ALLOWED_LOCATIONS = ('azure://<account>.blob.core.windows.net/<container>/<path>/', 'azure://<account>.blob.core.windows.net/<container>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('azure://<account>.blob.core.windows.net/<container>/<path>/', 'azure://<account>.blob.core.windows.net/<container>/<path>/') ]

Wobei:

  • Integrationsname ist der Name der neuen Integration.

  • Mandanten-ID ist die ID Ihres Office 365-Mandanten, zu dem die zulässigen und gesperrten Speicherkonten gehören. Die Authentifizierung einer Speicherintegration kann nur für einen einzigen Mandanten erfolgen. Daher müssen sich die zulässigen und blockierten Speicherorte auf Speicherkonten beziehen, die alle diesem einen Mandanten gehören.

    Melden Sie sich beim Azure-Portal an, und klicken Sie auf Azure Active Directory » Properties, um Ihre Mandanten-ID zu ermitteln. Die Mandanten-ID wird im Feld Directory ID angezeigt.

  • Container ist der Name des Azure-Containers, in dem Ihre Datendateien gespeichert sind (z. B. mycontainer). Die Parameter STORAGE_ALLOWED_LOCATIONS und STORAGE_BLOCKED_LOCATIONS beschränken bzw. blockieren den Zugriff auf diese Container, wenn Stagingbereiche, die auf diese Integration verweisen, erstellt oder geändert werden.

  • Pfad ist ein optionaler Pfad, mit dem Sie logische Verzeichnisse im Container genauer steuern können.

Im folgenden Beispiel wird eine Integration erstellt, die die externen Stagingbereiche, die die Integration verwenden, explizit darauf beschränkt, auf zwei Container oder Pfade zu verweisen: In einem späteren Schritt werden wir einen externen Stagingbereich erstellen, der auf einen dieser Container und Pfade verweist. Es können auch mehrere externe Stagingbereiche, die diese Integration verwenden, auf die zulässigen Container und Pfade verweisen:

CREATE STORAGE INTEGRATION azure_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = AZURE
  ENABLED = TRUE
  AZURE_TENANT_ID = 'a123b4c5-1234-123a-a12b-1a23b45678c9'
  STORAGE_ALLOWED_LOCATIONS = ('azure://myaccount.blob.core.windows.net/mycontainer1/mypath1/', 'azure://myaccount.blob.core.windows.net/mycontainer2/mypath2/')
  STORAGE_BLOCKED_LOCATIONS = ('azure://myaccount.blob.core.windows.net/mycontainer1/mypath1/sensitivedata/', 'azure://myaccount.blob.core.windows.net/mycontainer2/mypath2/sensitivedata/');

Schritt 2: Snowflake Zugriff auf die Speicherorte gewähren

  1. Führen Sie den Befehl DESCRIBE INTEGRATION aus, um die Zustimmungs-URL abzurufen:

    DESC STORAGE INTEGRATION <integration_name>;
    

    Wobei:

Beachten Sie die URL in der Spalte AZURE_CONSENT_URL, die das folgende Format aufweist:

https://login.microsoftonline.com/<tenant_id>/oauth2/authorize?client_id=<snowflake_application_id>

Notieren Sie sich auch den Wert in der Spalte AZURE_MULTI_TENANT_APP_NAME. Dies ist der Name der Snowflake-Clientanwendung, die für Ihr Konto erstellt wurde. Später in dieser Anleitung müssen Sie dieser Anwendung die erforderlichen Berechtigungen erteilen, um ein Zugriffstoken für die zulässigen Speicherorte zu erhalten.

  1. Navigieren Sie in einem Webbrowser in der Spalte AZURE_CONSENT_URL URL zu der URL. Es wird eine Microsoft-Berechtigungsanforderungsseite angezeigt.

  2. Klicken Sie auf die Schaltfläche Accept. Auf diese Weise kann der Azure-Dienstprinzipal, der für Ihr Snowflake-Konto erstellt wurde, ein Zugriffstoken für eine beliebige Ressource in Ihrem Mandantenbereich erhalten. Das Abrufen eines Zugriffstokens ist nur erfolgreich, wenn Sie dem Dienstprinzipal die entsprechenden Berechtigungen für den Container erteilen (siehe nächster Schritt).

  3. Melden Sie sich beim Microsoft Azure-Portal an.

  4. Navigieren Sie zu Azure Services » Storage Accounts. Klicken Sie auf den Namen des Speicherkontos, auf das Sie dem Snowflake-Dienstprinzipal Zugriff gewähren.

  5. Klicken Sie auf Access Control (IAM) » Add role assignment.

  6. Wählen Sie die gewünschte Rolle aus, die dem Snowflake-Dienstprinzipal gewährt werden soll:

    • Storage Blob Data Reader gewährt nur Lesezugriff. Dies ermöglicht das Laden von Daten aus Stagingdateien, die im Speicherkonto bereitgestellt wurden.

    • Storage Blob Data Contributor gewährt Lese- und Schreibzugriff. Dies ermöglicht das Laden oder Entladen von Daten in/aus Dateien, die im Speicherkonto bereitgestellt sind.

  7. Search for the Snowflake service principal. This is the identity in the AZURE_MULTI_TENANT_APP_NAME property in the DESC STORAGE INTEGRATION output (in Step 1). Search for the string before the underscore in the AZURE_MULTI_TENANT_APP_NAME property.

    Wichtig

    • Es kann eine Stunde oder länger dauern, bis Azure den über die oben genannte Microsoft-Anforderungsseite angeforderten Snowflake-Dienstprinzipal erstellt hat. Wenn der Dienstprinzipal nicht sofort verfügbar ist, empfehlen wir, ein bis zwei Stunden zu warten und ihn dann erneut zu suchen.

    • Wenn Sie den Dienstprinzipal löschen, funktioniert die Speicherintegration nicht mehr.

  8. Klicken Sie auf die Schaltfläche Save.

    Bemerkung

    Gemäß der Microsoft Azure-Dokumentation kann die Weitergabe von Rollenzuweisungen bis zu fünf Minuten dauern.

Konfigurieren von automatisiertem Snowpipe unter Verwendung von Azure Event Grid

Bemerkung

Die Anweisungen unter diesem Thema setzen voraus, dass in der Snowflake-Datenbank, in die Ihre Daten geladen werden, bereits eine Zieltabelle vorhanden ist.

Schritt 1: Event Grid-Abonnement konfigurieren

In diesem Abschnitt wird beschrieben, wie Sie unter Verwendung der Azure-CLI ein Event Grid-Abonnement für Azure Storage-Ereignisse einrichten. Weitere Informationen zu den in diesem Abschnitt beschriebenen Schritten finden Sie in den folgenden Artikeln der Azure-Dokumentation:

Erstellen einer Ressourcengruppe

Ein Event Grid-Thema liefert einen Endpunkt, an den die Quelle (d. h. Azure-Speicher) Ereignisse sendet. Ein Thema wird für eine Sammlung verwandter Ereignisse verwendet. Event Grid-Themen sind Azure-Ressourcen und müssen in einer Azure-Ressourcengruppe platziert werden.

Führen Sie den folgenden Befehl aus, um eine Ressourcengruppe zu erstellen:

az group create --name <resource_group_name> --location <location>

Wobei:

  • Ressourcengruppenname ist der Name der neuen Ressourcengruppe.

  • Speicherort ist in Snowflake-Terminologie der Ort oder die Region Ihres Azure-Speicherkontos.

Aktivieren des Event Grid-Ressourcenanbieters

Führen Sie den folgenden Befehl aus, um den Event Grid-Ressourcenanbieter zu registrieren. Beachten Sie, dass dieser Schritt nur erforderlich ist, wenn Sie Event Grid mit Ihrem Azure-Konto zuvor noch nicht verwendet haben:

az provider register --namespace Microsoft.EventGrid
az provider show --namespace Microsoft.EventGrid --query "registrationState"

Erstellen eines Speicherkontos für Datendateien

Führen Sie den folgenden Befehl aus, um ein Speicherkonto zum Speichern Ihrer Datendateien zu erstellen. Dieses Konto muss eines der folgenden sein:

  • Blob-Speicher

  • Data Lake Storage Gen2 – Unterstützt als Vorschaufunktion.

  • General Purpose v2

Nur diese Kontotypen unterstützen Ereignismeldungen.

Bemerkung

Wenn Sie bereits über einen dieser Kontotypen verfügen, können Sie eines dieser Konten verwenden.

Erstellen Sie beispielsweise ein Blob-Speicherkonto:

az storage account create --resource-group <resource_group_name> --name <storage_account_name> --sku Standard_LRS --location <location> --access-tier Hot

Wobei:

  • Ressourcengruppenname ist der Name der Ressourcengruppe, die Sie unter Erstellen einer Ressourcengruppe erstellt haben.

  • Speicherkontoname ist der Name des neuen Speicherkontos.

  • Speicherort ist der Speicherort Ihres Azure-Speicherkontos.

Erstellen eines Speicherkontos für die Speicherwarteschlange

Führen Sie den folgenden Befehl aus, um ein Speicherkonto zum Hosten Ihrer Speicherwarteschlange zu erstellen. Dieses Konto muss ein GPv2-Konto sein, da nur diese Art von Konto Ereignismeldungen an eine Speicherwarteschlange unterstützt.

Bemerkung

Wenn Sie bereits über ein GPv2-Konto verfügen, können Sie dieses Konto zum Hosten Ihrer Datendateien und Ihrer Speicherwarteschlange verwenden.

Erstellen Sie zum Beispiel ein GPv2-Konto:

az storage account create --resource-group <resource_group_name> --name <storage_account_name> --sku Standard_LRS --location <location>

Wobei:

  • Ressourcengruppenname ist der Name der Ressourcengruppe, die Sie unter Erstellen einer Ressourcengruppe erstellt haben.

  • Speicherkontoname ist der Name des neuen Speicherkontos.

  • Speicherort ist der Speicherort Ihres Azure-Speicherkontos.

Erstellen einer Speicherwarteschlange

Eine einzelne Speicherwarteschlange kann die Ereignismeldungen für viele Event Grid-Abonnements erfassen. Eine einzelne Speicherwarteschlange kann von mehreren Speicherkonten gemeinsam genutzt werden.

Für optimale Leistung empfiehlt Snowflake, eine einzelne Speicherwarteschlange zu erstellen, in der alle Ihre Abonnements für Snowflake gespeichert werden. Snowpipe leitet Nachrichten an die korrekte Pipe weiter, auch wenn die Nachrichten über dieselbe Speicherwarteschlange weitergeleitet werden.

Bemerkung

Snowflake begrenzt die Anzahl der Speicherwarteschlangen, die zum Sammeln von Snowpipe-Ereignismeldungen verwendet werden können, auf 10.

Führen Sie den folgenden Befehl aus, um eine Speicherwarteschlange zu erstellen. In einer Speicherwarteschlange werden Gruppen von Nachrichten gespeichert, in diesem Fall Ereignismeldungen aus Event Grid:

az storage queue create --name <storage_queue_name> --account-name <storage_account_name>

Wobei:

Exportieren der Speicherkonto- und Warteschlangen-IDs zum Verweisen

Führen Sie die folgenden Befehle aus, um Umgebungsvariablen für die Speicherkonto- und Warteschlangen-IDs festzulegen, die später in diesen Anweisungen angefordert werden:

  • Linux oder macOS:

    export storageid=$(az storage account show --name <data_storage_account_name> --resource-group <resource_group_name> --query id --output tsv)
    export queuestorageid=$(az storage account show --name <queue_storage_account_name> --resource-group <resource_group_name> --query id --output tsv)
    export queueid="$queuestorageid/queueservices/default/queues/<storage_queue_name>"
    
  • Windows:

    set storageid=$(az storage account show --name <data_storage_account_name> --resource-group <resource_group_name> --query id --output tsv)
    set queuestorageid=$(az storage account show --name <queue_storage_account_name> --resource-group <resource_group_name> --query id --output tsv)
    set queueid="%queuestorageid%/queueservices/default/queues/<storage_queue_name>"
    

Wobei:

Installieren der Event Grid-Erweiterung

Führen Sie den folgenden Befehl aus, um die Event Grid-Erweiterung für die Azure-CLI zu installieren:

az extension add --name eventgrid

Erstellen des Event Grid-Abonnements

Führen Sie den folgenden Befehl aus, um das Event Grid-Abonnement zu erstellen. Durch das Abonnieren eines Themas wird Event Grid darüber informiert, welche Ereignisse verfolgt werden sollen:

  • Linux oder macOS:

    az eventgrid event-subscription create \
    --source-resource-id $storageid \
    --name <subscription_name> --endpoint-type storagequeue \
    --endpoint $queueid
    
  • Windows:

    az eventgrid event-subscription create \
    --source-resource-id %storageid% \
    --name <subscription_name> --endpoint-type storagequeue \
    --endpoint %queueid%
    

Wobei:

Schritt 2: Erstellen einer Benachrichtigungsintegration in Snowflake

Eine Benachrichtigungsintegration ist ein Snowflake-Objekt, das eine Schnittstelle zwischen Snowflake und dem Cloud-Nachrichtenwarteschlangendienst eines Drittanbieters wie Azure Event Grid bereitstellt.

Bemerkung

Eine Azure Queue Storage-Warteschlange unterstützt eine einzelne Benachrichtigungsintegration. Das Verweisen auf eine einzelne Speicherwarteschlange in mehreren Benachrichtigungsintegrationen kann dazu führen, dass Daten in Zieltabellen fehlen, da Ereignisbenachrichtigungen zwischen Benachrichtigungsintegrationen aufgeteilt werden.

Abrufen von Speicherwarteschlangen-URL und Mandanten-ID

  1. Melden Sie sich beim Microsoft Azure-Portal an.

  2. Navigieren Sie zu Storage account » Queue service » Queues. Notieren Sie die URL für die Warteschlange, die Sie in Erstellen einer Speicherwarteschlange erstellt haben, um später darauf zu verweisen. Die URL hat das folgende Format:

    https://<storage_account_name>.queue.core.windows.net/<storage_queue_name>
    
  3. Navigieren Sie zu Azure Active Directory » Properties. Notieren Sie den Wert Directory ID zum späteren Verweisen. Die Verzeichnis-ID oder Mandanten-ID wird benötigt, um die Zustimmungs-URL zu generieren, die Snowflake Zugriff auf das Event Grid-Abonnement gewährt.

Erstellen der Benachrichtigungsintegration

Erstellen Sie eine Benachrichtigungsintegration mit dem Befehl CREATE NOTIFICATION INTEGRATION.

Bemerkung

Dieser SQL-Befehl kann nur von Kontoadministratoren (Benutzer mit der Rolle ACCOUNTADMIN) oder von Rollen mit der globalen Berechtigung CREATE INTEGRATION ausgeführt werden.

CREATE NOTIFICATION INTEGRATION <integration_name>
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
  AZURE_STORAGE_QUEUE_PRIMARY_URI = '<queue_URL>'
  AZURE_TENANT_ID = '<directory_ID>';

Wobei:

Gewähren von Snowflake-Zugriff auf die Speicherwarteschlange

  1. Führen Sie den Befehl DESCRIBE INTEGRATION aus, um die Zustimmungs-URL abzurufen:

    DESC NOTIFICATION INTEGRATION <integration_name>;
    

    Wobei:

  2. Beachten Sie die URL in der Spalte AZURE_CONSENT_URL, die das folgende Format aufweist:

    https://login.microsoftonline.com/<tenant_id>/oauth2/authorize?client_id=<snowflake_application_id>
    
  3. Navigieren Sie in einem Webbrowser zur URL. Es wird eine Microsoft-Berechtigungsanfrageseite angezeigt.

  4. Klicken Sie auf die Schaltfläche Accept, um Snowflake in Active Directory zu registrieren.

  5. Melden Sie sich beim Microsoft Azure-Portal an.

  6. Navigieren Sie zu Azure Active Directory » Enterprise applications. Stellen Sie sicher, dass die Snowflake-Anwendung aufgelistet ist.

    Wichtig

    Wenn Sie die Snowflake-Anwendung löschen, funktioniert die Benachrichtigungsintegration nicht mehr.

  7. Navigieren Sie zu Queues » Name_der_Speicherwarteschlange, wobei Name_der_Speicherwarteschlange der Name der Speicherwarteschlange ist, die Sie unter Erstellen einer Speicherwarteschlange erstellt haben.

  8. Klicken Sie auf Access Control (IAM) » Add role assignment.

  9. Suchen Sie nach der Snowflake-Anwendung.

  10. Erteilen Sie der Snowflake-App die folgenden Berechtigungen:

    • Role: Storage Queue Data Contributor (Vorschau)

    • Assign access to: Azure-AD-Benutzer, -Gruppe oder -Dienstprinzipal

    • Select: Snowflake-Anwendungs-ID

    Die Snowflake-Anwendung sollte jetzt unter Storage Queue Data Contributor (im selben Dialogfeld) aufgeführt werden.

Schritt 3: Stagingbereich erstellen (falls erforderlich)

Erstellen Sie mit dem Befehl CREATE STAGE einen externen Stagingbereich, der auf Ihren Azure-Container verweist. Snowpipe ruft Ihre Datendateien aus dem Stagingbereich ab und stellt sie vorübergehend in die Warteschlange, bevor sie in die Zieltabelle geladen werden.

Alternativ können Sie einen vorhandenen externen Stagingbereich verwenden.

Bemerkung

Informationen zum Konfigurieren des sicheren Zugriffs auf den Speicherort in der Cloud finden Sie unter Konfigurieren des sicheren Zugriffs auf Cloudspeicher (unter diesem Thema).

Im folgenden Beispiel wird im aktiven Schema der Benutzersitzung ein Stagingbereich mit dem Namen mystage erstellt. Die Cloudspeicher-URL enthält den Pfad load/files. Der Stagingbereich verweist auf eine Speicherintegration mit dem Namen myint.

USE SCHEMA snowpipe_db.public;

CREATE STAGE mystage
  URL = 'azure://myaccount.blob.core.windows.net/mycontainer/load/files/'
  STORAGE_INTEGRATION = myint;

Bemerkung

Verwenden Sie den Endpunkt blob.core.windows.net für alle unterstützten Typen von Azure-Blob-Speicherkonten, einschließlich Data Lake Storage Gen2.

Schritt 4: Pipe mit aktivierter automatischer Erfassung erstellen

Erstellen Sie mit dem Befehl CREATE PIPE eine Pipe. Die Pipe definiert die Anweisung COPY INTO <Tabelle>, mit der Snowpipe Daten aus der Erfassungswarteschlange in die Zieltabelle lädt.

Erstellen Sie beispielsweise im snowpipe_db.public-Schema eine Pipe, die die Daten aus den im mystage-Stagingbereich bereitgestellten Dateien in die Tabelle mytable lädt:

create pipe snowpipe_db.public.mypipe
  auto_ingest = true
  integration = '<integration_name>'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

Wobei:

Wichtig

  • Der Integrationsname muss komplett in Großbuchstaben eingegeben werden.

  • Vergleichen Sie die Stagingbereichsreferenz in der Pipe-Definition mit vorhandenen Pipes. Stellen Sie sicher, dass sich die Verzeichnispfade für denselben Azure-Container nicht überschneiden. Andernfalls könnten verschiedene Pipes denselben Satz von Datendateien mehrmals in eine oder mehrere Zieltabellen laden. Dies kann beispielsweise der Fall sein, wenn mehrere Stagingbereiche auf denselben Azure-Container verweisen, aber mit unterschiedlicher Granularität, z. B. azure://myaccount.blob.core.windows.net/mycontainer/path1 und azure://myaccount.blob.core.windows.net/mycontainer/path1/path2. In diesem Anwendungsfall würden beim Staging von Dateien in azure://myaccount.blob.core.windows.net/mycontainer/path1/path2 die Pipes für beide Stagingbereiche eine Kopie der Dateien laden.

    Dies unterscheidet sich von der manuellen Snowpipe-Einrichtung (mit deaktivierter automatischer Erfassung), bei der Benutzer einen benannten Satz von Dateien an eine REST-API senden müssen, um die Dateien zum Laden in die Warteschlange zu stellen. Bei aktivierter automatischer Erfassung erhält jede Pipe eine generierte Dateiliste von den Event Grid-Meldungen. Zusätzliche Sorgfalt ist erforderlich, um Datenduplikate zu vermeiden.

Bemerkung

Beim Klonen einer Datenbank oder eines Schemas werden alle Objekte, einschließlich Pipes, die auf externe Stagingbereiche verweisen, in der Quelldatenbank oder im Quellschema geklont. Wenn eine Datendatei in einem Stagingbereich (z. B. Blob-Speichercontainer) erstellt wird, wird eine Kopie der Benachrichtigung an jede Pipe gesendet, die mit dem Speicherort des Stagingbereichs übereinstimmt. Dies führt zu folgendem Verhalten:

  • Wenn eine Tabelle in der COPY-Anweisung der Pipedefinition vollständig qualifiziert ist (in Form von Datenbankname.Schemaname.Tabellenname oder Schemaname.Tabellenname), dann lädt Snowpipe Duplikatdaten in die Quelltabelle (d. h. in Datenbank.Schema.Tabelle der COPY-Anweisung) jeder Pipe.

  • Wenn eine Tabelle in der Pipedefinition nicht vollständig qualifiziert ist, lädt Snowpipe die Daten in dieselbe Tabelle (z. B. mytable) der Quell- und geklonten Datenbanken/Schemas.

Snowpipe mit automatischer Erfassung ist nun konfiguriert!

Wenn dem Azure-Container neue Datendateien hinzugefügt werden, weist die Ereignisbenachrichtigung Snowpipe an, diese in die in der Pipe definierte Zieltabelle zu laden.

Schritt 5: Historische Dateien laden

Um ein Backlog mit Datendateien zu laden, die vor der Konfiguration von Event Grid-Benachrichtigungen im externen Stagingbereich vorhanden waren, führen Sie eine ALTER PIPE … REFRESH-Anweisung aus.

SYSTEM$PIPE_STATUS-Ausgabe

Die Funktion SYSTEM$PIPE_STATUS ruft eine JSON-Darstellung des aktuellen Status einer Pipe ab.

Bei Pipes, bei denen AUTO_INGEST auf TRUE gesetzt ist, gibt die Funktion ein JSON-Objekt zurück, das die folgenden Name/Wert-Paare enthält (falls auf den aktuellen Pipe-Status zutreffend):

{„executionState“:“<Wert>“,“oldestFileTimestamp“:<Wert>,“pendingFileCount“:<Wert>,“notificationChannelName“:“<Wert>“,“numOutstandingMessagesOnChannel“:<Wert>,“lastReceivedMessageTimestamp“:“<Wert>“,“lastForwardedMessageTimestamp“:“<Wert>“,“error“:<Wert>,“fault“:<Wert>}

Wobei:

executionState

Aktueller Ausführungsstatus der Pipe; kann einer der folgenden sein:

  • RUNNING (d. h. alles ist normal; Snowflake verarbeitet gerade aktiv Dateien für diese Pipe oder auch nicht).

  • STOPPED_FEATURE_DISABLED

  • STOPPED_STAGE_DROPPED

  • STOPPED_FILE_FORMAT_DROPPED

  • STOPPED_MISSING_PIPE

  • STOPPED_MISSING_TABLE

  • STALLED_COMPILATION_ERROR

  • STALLED_INITIALIZATION_ERROR

  • STALLED_EXECUTION_ERROR

  • STALLED_INTERNAL_ERROR

  • PAUSED

  • PAUSED_BY_SNOWFLAKE_ADMIN

  • PAUSED_BY_ACCOUNT_ADMIN

oldestFileTimestamp

Frühester Zeitstempel unter den derzeit in der Warteschlange befindlichen Datendateien (so zutreffend), wobei Zeitstempel beim Hinzufügen einer Datei zur Warteschlange festgelegt werden.

pendingFileCount

Anzahl der Dateien, die von der Pipe derzeit verarbeitet werden. Falls die Pipe angehalten wurde, verringert sich dieser Wert, sobald Dateien verarbeitet werden, die in die Warteschlange gestellt wurden, bevor die Pipe angehalten wurde. Wenn dieser Wert 0 lautet, befinden sich entweder keine Dateien in der Warteschlange für diese Pipe oder wurde die Pipe angehalten.

notificationChannelName

Azure-Speicherwarteschlange, die der Pipe zugeordnet ist.

numOutstandingMessagesOnChannel

Anzahl der Nachrichten in der Speicherwarteschlange, die in die Warteschlange gestellt, aber noch nicht empfangen wurden.

lastReceivedMessageTimestamp

Zeitstempel der letzten Nachricht, die aus der Speicherwarteschlange empfangen wurde. Beachten Sie, dass diese Nachricht möglicherweise nicht für die bestimmte Pipe gilt, z. B. wenn der mit der Nachricht verknüpfte Pfad bzw. das verknüpfte Präfix nicht mit dem Pfad/Präfix in der Pipedefinition übereinstimmen. Darüber hinaus werden von Pipes zur automatischen Erfassung nur solche Nachrichten verarbeitet, die von erstellten Datenobjekten ausgelöste wurden.

lastForwardedMessageTimestamp

Zeitstempel der letzten Ereignismeldung vom Typ „Objekt erstellen“ mit einem übereinstimmenden Pfad/Präfix, der bzw. das an die Pipe weitergeleitet wurde.

error

Fehlermeldung, die erzeugt wurde, als die Pipe für die Ausführung zuletzt kompiliert wurde (falls zutreffend). Ursache sind oft Schwierigkeiten beim Zugriff auf die erforderlichen Objekte (z. B. Tabelle, Stagingbereich, Dateiformat), ausgelöst durch Berechtigungsprobleme oder gelöschte Objekte.

fault

Letzter interner Snowflake-Verarbeitungsfehler (falls zutreffend). Wird von Snowflake hauptsächlich zum Debuggen verwendet.

DDL der Benachrichtigungsintegration

Snowflake unterstützt das Erstellen und Verwalten von Integrationen mit den folgenden speziellen DDL-Befehlen: