Erstellen einer Datenverarbeitungs-Pipeline unter Verwendung einer Verzeichnistabelle

Sie können eine Verzeichnistabelle, die Metadaten in einem Stagingbereich auf Dateiebene verfolgt und speichert, mit anderen Snowflake-Objekten wie Streams und Aufgaben kombinieren, um eine Datenverarbeitungs-Pipeline aufzubauen.

Ein Stream erfasst Änderungen, die mit Data Manipulation Language (DML) an einer Verzeichnistabelle, einer Tabelle, einer externen Tabelle oder an den zugrunde liegenden Tabellen einer Ansicht vorgenommen wurden. Eine Aufgabe führt eine einzelne Aktion aus, wie einen SQL-Befehl oder eine umfangreiche UDF. Sie können eine Aufgabe planen oder bei Bedarf ausführen.

Beispiel: Erstellen einer einfachen Pipeline zur Verarbeitung von PDFs

In diesem Beispiel wird eine einfache Datenverarbeitungs-Pipeline erstellt, die folgende Aufgaben ausführt:

  1. Erkennen von PDF-Dateien, die zu einem Stagingbereich hinzugefügt wurden

  2. Abrufen der Daten aus den Dateien

  3. Einfügen der Daten in eine Snowflake-Tabelle

Die Pipeline verwendet einen Stream, um Änderungen an einer Verzeichnistabelle im Stagingbereich zu erkennen, sowie eine Aufgabe, die eine benutzerdefinierte Funktion (UDF) zum Verarbeiten der Dateien ausführt.

In der folgende Abbildung ist zusammengefasst, wie die Beispiel-Pipeline funktioniert:

A simple data processing pipeline that uses a stream to track changes to a directory table.

Schritt 1: Stagingbereich mit einer aktivierten Verzeichnistabelle erstellen

Erstellen Sie einen internen Stagingbereich, der eine aktivierte Verzeichnistabelle enthält. In der Beispielanweisung wird für ENCRYPTION der Typ SNOWFLAKE_SSE eingestellt, um den Zugriff auf unstrukturierten Daten im Stagingbereich zu aktivieren.

CREATE OR REPLACE STAGE my_pdf_stage
  ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
  DIRECTORY = ( ENABLE = TRUE);
Copy

Schritt 2: Stream auf der Verzeichnistabelle erstellen

Als Nächstes erstellen Sie einen Stream auf der Verzeichnistabelle, indem Sie den Stagingbereich angeben, zu dem die Verzeichnistabelle gehört. Der Stream verfolgt die Änderungen an der Verzeichnistabelle. In Schritt 5 dieses Beispiels wird dieser Stream dann verwendet, um eine Aufgabe zu konstruieren.

CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Copy

Schritt 3: Benutzerdefinierte Funktion zum Parsen der PDFs erstellen

Erstellen Sie eine benutzerdefinierte Funktion (UDF), die Daten aus PDF-Dateien extrahiert. Die Aufgabe, die in Schritt 5 erstellt wird, wird diese UDF aufrufen, um die neu zum Stagingbereich hinzugefügten Dateien zu verarbeiten.

Die folgende Beispielanweisung erstellt eine UDF mit dem Namen PDF_PARSE, die PDF-Dateien mit Produktbewertungsdaten verarbeitet. Die UDF extrahiert Formularfelddaten unter Verwendung der PyPDF2-Bibliothek. Sie gibt ein Wörterbuch zurück, das die Formularnamen und -werte als Schlüssel-Wert-Paare enthält.

Bemerkung

Die UDF liest dynamisch spezifizierte Dateien mithilfe der Klasse SnowflakeFile aus. Weitere Informationen zu SnowflakeFile finden Sie unter Lesen einer mit SnowflakeFile dynamisch spezifizierten Datei.

CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'parse_pdf_fields'
  PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
  AS
  $$
  from pathlib import Path
  import PyPDF2 as pypdf
  from io import BytesIO
  from snowflake.snowpark.files import SnowflakeFile

  def parse_pdf_fields(file_path):
      with SnowflakeFile.open(file_path, 'rb') as f:
        buffer = BytesIO(f.readall())
      reader = pypdf.PdfFileReader(buffer)
      fields = reader.getFields()
      field_dict = {}
      for k, v in fields.items():
          if "/V" in v.keys():
              field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]

      return field_dict
  $$;
Copy

Schritt 4: Tabelle zum Speichern der Dateiinhalte erstellen

Als Nächstes erstellen Sie eine Tabelle, in der in jeder Zeile in den Spalten file_name und file_data Informationen zu einer Datei des Stagingbereichs gespeichert werden. Die Aufgabe, die in Schritt 5 dieses Beispiels erstellt wird, wird die Daten in diese Tabelle laden.

CREATE OR REPLACE TABLE prod_reviews (
  file_name varchar,
  file_data variant
);
Copy

Schritt 5: Aufgabe erstellen

Erstellen Sie eine geplante Aufgabe, die den Stream auf neue Dateien im Stagingbereich überprüft und die Daten in diesen Dateien in die Tabelle prod_reviews einfügt.

Mit der folgenden Anweisung wird unter Verwendung des in Schritt 2 erstellten Streams eine geplante Aufgabe erstellt. Die Aufgabe verwendet die Funktion SYSTEM$STREAM_HAS_DATA, um zu prüfen, ob der Stream CDC-Datensätze (Change Data Capture, Datenänderungserfassung) enthält.

CREATE OR REPLACE TASK load_new_file_data
  WAREHOUSE = 'MY_WAREHOUSE'
  SCHEDULE = '1 minute'
  COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
  WHEN
  SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
  AS
  INSERT INTO prod_reviews (
    SELECT relative_path as file_name,
    PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
    FROM my_pdf_stream
    WHERE METADATA$ACTION='INSERT'
  );
Copy

Schritt 6: Aufgabe zum Testen der Pipeline ausführen

Um zu prüfen, ob die Pipeline funktioniert, können Sie dem Stagingbereich Dateien hinzufügen, die Aufgabe manuell ausführen und dann die Tabelle product_reviews abfragen.

Fügen Sie zunächst einige PDF-Dateien zum Stagingbereich my_pdf_stage hinzu, und aktualisieren Sie dann den Stagingbereich.

Bemerkung

In diesem Beispiel werden PUT-Befehle verwendet, die nicht über ein Arbeitsblatt der Snowflake-Weboberfläche ausgeführt werden können. Weitere Informationen zum Hochladen von Dateien mit Snowsight finden Sie unter Dateien in einen benannten internen Stagingbereich hochladen.

PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;

ALTER STAGE my_pdf_stage REFRESH;
Copy

Sie können den Stream abfragen, um zu überprüfen, ob dieser die beiden PDF-Dateien erfasst hat, die wir dem Stagingbereich hinzugefügt hatten.

SELECT * FROM my_pdf_stream;
Copy

Führen Sie nun die Aufgabe aus, um die PDF-Dateien zu verarbeiten und die Tabelle product_reviews zu aktualisieren.

EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status                                                   |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Copy

Fragen Sie die Tabelle product_reviews ab, um zu prüfen, ob die Aufgabe für jede PDF-Datei eine Zeile hinzugefügt hat.

select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME        | FILE_DATA                        |
|------------------+----------------------------------|
| prod_review1.pdf | {                                |
|                  |   "FirstName": "John",           |
|                  |   "LastName": "Johnson",         |
|                  |   "Middle Name": "Michael",      |
|                  |   "Product": "Tennis Shoes",     |
|                  |   "Purchase Date": "03/15/2022", |
|                  |   "Recommend": "Yes"             |
|                  | }                                |
| prod_review2.pdf | {                                |
|                  |   "FirstName": "Emily",          |
|                  |   "LastName": "Smith",           |
|                  |   "Middle Name": "Ann",          |
|                  |   "Product": "Red Skateboard",   |
|                  |   "Purchase Date": "01/10/2023", |
|                  |   "Recommend": "MayBe"           |
|                  | }                                |
+------------------+----------------------------------+
Copy

Schließlich können Sie eine Ansicht erstellen, die die Objekte in der Spalte FILE_DATA in einzelne Spalten aufteilt. Sie können dann die Ansicht abfragen, um die Dateiinhalte zu analysieren und weiterzuverwenden.

CREATE OR REPLACE VIEW prod_review_info_v
  AS
  WITH file_data
  AS (
      SELECT
        file_name
        , parse_json(file_data) AS file_data
      FROM prod_reviews
  )
  SELECT
      file_name
      , file_data:FirstName::varchar AS first_name
      , file_data:LastName::varchar AS last_name
      , file_data:"Middle Name"::varchar AS middle_name
      , file_data:Product::varchar AS product
      , file_data:"Purchase Date"::date AS purchase_date
      , file_data:Recommend::varchar AS recommended
      , build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
  FROM file_data;

SELECT * FROM prod_review_info_v;

+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| FILE_NAME        | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT        | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL                                                                                                                                                                                                                                                                                                                                                                                                              |
|------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| prod_review1.pdf | John       | Johnson   | Michael     | Tennis Shoes   | 2022-03-15    | Yes         | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d              |
| prod_review2.pdf | Emily      | Smith     | Ann         | Red Skateboard | 2023-01-10    | MayBe       | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d                            |
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Copy