Erstellen einer Datenverarbeitungs-Pipeline unter Verwendung einer Verzeichnistabelle

Bauen Sie eine Datenverarbeitungspipeline auf, indem Sie eine Verzeichnistabelle, die Metadaten auf Dateiebene in einem Stagingbereich verfolgt und speichert, mit anderen Snowflake-Objekten wie Streams und Aufgaben kombinieren.

Ein Stream erfasst Änderungen, die mit Data Manipulation Language (DML) an einer Verzeichnistabelle, einer Tabelle, einer externen Tabelle oder an den zugrunde liegenden Tabellen einer Ansicht vorgenommen wurden. Eine Aufgabe führt eine einzelne Aktion aus, die ein SQL-Befehl oder eine umfangreiche benutzerdefinierte Funktion (UDF) sein kann. Sie können eine Aufgabe so planen, dass sie in regelmäßigen Abständen ausgeführt wird, oder eine Aufgabe bei Bedarf ausführen.

Beispiel: Erstellen einer einfachen Pipeline zur Verarbeitung von PDFs

In diesem Beispiel wird eine einfache Datenverarbeitungs-Pipeline erstellt, die folgende Aufgaben ausführt:

  1. Erkennen von PDF-Dateien, die zu einem Stagingbereich hinzugefügt wurden

  2. Abrufen der Daten aus den Dateien

  3. Einfügen der Daten in eine Snowflake-Tabelle

Die Pipeline verwendet einen Stream, um Änderungen an einer Verzeichnistabelle im Stagingbereich zu erkennen, und eine Aufgabe, die eine UDF ausführt, um Daten aus den Dateien zu extrahieren.

In der folgende Abbildung ist zusammengefasst, wie die Beispiel-Pipeline funktioniert:

Eine einfache Datenverarbeitungs-Pipeline, die einen Stream verwendet, um Änderungen an einer Verzeichnistabelle zu verfolgen.

Schritt 1: Stagingbereich mit einer aktivierten Verzeichnistabelle erstellen

Erstellen Sie einen internen Stagingbereich, der eine aktivierte Verzeichnistabelle enthält. In der Beispielanweisung wird für ENCRYPTION der Typ SNOWFLAKE_SSE eingestellt, um den Zugriff auf unstrukturierten Daten im Stagingbereich zu aktivieren.

CREATE OR REPLACE STAGE my_pdf_stage
  ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
  DIRECTORY = ( ENABLE = TRUE);
Copy

Schritt 2: Stream auf der Verzeichnistabelle erstellen

Erstellen Sie einen Stream auf der Verzeichnistabelle, indem Sie den Stagingbereich angeben, zu dem die Verzeichnistabelle gehört. Der Stream verfolgt die Änderungen an der Verzeichnistabelle. In Schritt 5 dieses Beispiels wird dieser Stream dann verwendet, um eine Aufgabe zu konstruieren.

CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Copy

Schritt 3: Benutzerdefinierte Funktion zum Parsen der PDFs erstellen

Erstellen Sie eine UDF, die Daten aus PDF-Dateien extrahiert. Die Aufgabe, die Sie in einem späteren Schritt erstellen, wird diese UDF aufrufen, um neu hinzugefügte Dateien im Stagingbereich zu verarbeiten.

Die folgende Beispielanweisung erstellt eine Python UDF mit dem Namen PDF_PARSE, die PDF-Dateien mit Produktprüfungsdaten verarbeitet. Die UDF extrahiert Formularfelddaten unter Verwendung der PyPDF2-Bibliothek. Sie gibt ein Wörterbuch zurück, das die Formularnamen und -werte als Schlüssel-Wert-Paare enthält.

Bemerkung

Die UDF liest dynamisch spezifizierte Dateien mithilfe der Klasse SnowflakeFile aus. Weitere Informationen zu SnowflakeFile finden Sie unter Lesen einer mit SnowflakeFile dynamisch spezifizierten Datei.

CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'parse_pdf_fields'
  PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
AS
$$
from pathlib import Path
import PyPDF2 as pypdf
from io import BytesIO
from snowflake.snowpark.files import SnowflakeFile

def parse_pdf_fields(file_path):
    with SnowflakeFile.open(file_path, 'rb') as f:
        buffer = BytesIO(f.readall())
    reader = pypdf.PdfFileReader(buffer)
    fields = reader.getFields()
    field_dict = {}
    for k, v in fields.items():
        if "/V" in v.keys():
            field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]

    return field_dict
$$;
Copy

Schritt 4: Tabelle zum Speichern der Dateiinhalte erstellen

Als Nächstes erstellen Sie eine Tabelle, in der in jeder Zeile in den Spalten file_name und file_data Informationen zu einer Datei des Stagingbereichs gespeichert werden. Die Aufgabe, die Sie in einem späteren Schritt erstellen, lädt die Daten in diese Tabelle.

CREATE OR REPLACE TABLE prod_reviews (
  file_name varchar,
  file_data variant
);
Copy

Schritt 5: Aufgabe erstellen

Erstellen Sie eine geplante Aufgabe, die den Stream auf neue Dateien im Stagingbereich überprüft und die Daten in diesen Dateien in die Tabelle prod_reviews einfügt.

Die folgende Anweisung erstellt eine geplante Aufgabe unter Verwendung des zuvor erstellten Streams. Die Aufgabe verwendet die Funktion SYSTEM$STREAM_HAS_DATA, um zu prüfen, ob der Stream CDC-Datensätze (Change Data Capture, Datenänderungserfassung) enthält.

CREATE OR REPLACE TASK load_new_file_data
  WAREHOUSE = 'MY_WAREHOUSE'
  SCHEDULE = '1 minute'
  COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
  WHEN
  SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
  AS
  INSERT INTO prod_reviews (
    SELECT relative_path as file_name,
    PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
    FROM my_pdf_stream
    WHERE METADATA$ACTION='INSERT'
  );
Copy

Schritt 6: Aufgabe zum Testen der Pipeline ausführen

Um zu prüfen, ob die Pipeline funktioniert, können Sie dem Stagingbereich Dateien hinzufügen, die Aufgabe manuell ausführen und dann die Tabelle product_reviews abfragen.

Fügen Sie zunächst einige PDF-Dateien zum Stagingbereich my_pdf_stage hinzu, und aktualisieren Sie dann den Stagingbereich.

Bemerkung

Dieses Beispiel verwendet PUT-Befehle, die Sie nicht von einem Arbeitsblatt in der Snowflake-Weboberfläche aus ausführen können. Weitere Informationen zum Hochladen von Dateien mit Snowsight finden Sie unter Dateien in einen benannten internen Stagingbereich hochladen.

PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;

ALTER STAGE my_pdf_stage REFRESH;
Copy

Sie können den Stream abfragen, um zu überprüfen, ob dieser die beiden PDF-Dateien erfasst hat, die wir dem Stagingbereich hinzugefügt hatten.

SELECT * FROM my_pdf_stream;
Copy

Führen Sie nun die Aufgabe aus, um die PDF-Dateien zu verarbeiten und die Tabelle product_reviews zu aktualisieren.

EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status                                                   |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Copy

Fragen Sie die Tabelle product_reviews ab, um zu prüfen, ob die Aufgabe für jede PDF-Datei eine Zeile hinzugefügt hat.

select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME        | FILE_DATA                        |
|------------------+----------------------------------|
| prod_review1.pdf | {                                |
|                  |   "FirstName": "John",           |
|                  |   "LastName": "Johnson",         |
|                  |   "Middle Name": "Michael",      |
|                  |   "Product": "Tennis Shoes",     |
|                  |   "Purchase Date": "03/15/2022", |
|                  |   "Recommend": "Yes"             |
|                  | }                                |
| prod_review2.pdf | {                                |
|                  |   "FirstName": "Emily",          |
|                  |   "LastName": "Smith",           |
|                  |   "Middle Name": "Ann",          |
|                  |   "Product": "Red Skateboard",   |
|                  |   "Purchase Date": "01/10/2023", |
|                  |   "Recommend": "MayBe"           |
|                  | }                                |
+------------------+----------------------------------+
Copy

Schließlich können Sie eine Ansicht erstellen, die die Objekte in der Spalte FILE_DATA in einzelne Spalten aufteilt. Sie können dann die Ansicht abfragen, um die Dateiinhalte zu analysieren und weiterzuverwenden.

CREATE OR REPLACE VIEW prod_review_info_v
  AS
  WITH file_data
  AS (
      SELECT
        file_name
        , parse_json(file_data) AS file_data
      FROM prod_reviews
  )
  SELECT
      file_name
      , file_data:FirstName::varchar AS first_name
      , file_data:LastName::varchar AS last_name
      , file_data:"Middle Name"::varchar AS middle_name
      , file_data:Product::varchar AS product
      , file_data:"Purchase Date"::date AS purchase_date
      , file_data:Recommend::varchar AS recommended
      , build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
  FROM file_data;

SELECT * FROM prod_review_info_v;

+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| FILE_NAME        | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT        | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL                                                                                                                                                                                                                                                                                                                                                                                                              |
|------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| prod_review1.pdf | John       | Johnson   | Michael     | Tennis Shoes   | 2022-03-15    | Yes         | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d              |
| prod_review2.pdf | Emily      | Smith     | Ann         | Red Skateboard | 2023-01-10    | MayBe       | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d                            |
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Copy