Erstellen einer Datenverarbeitungs-Pipeline unter Verwendung einer Verzeichnistabelle¶
Bauen Sie eine Datenverarbeitungspipeline auf, indem Sie eine Verzeichnistabelle, die Metadaten auf Dateiebene in einem Stagingbereich verfolgt und speichert, mit anderen Snowflake-Objekten wie Streams und Aufgaben kombinieren.
Ein Stream erfasst Änderungen, die mit Data Manipulation Language (DML) an einer Verzeichnistabelle, einer Tabelle, einer externen Tabelle oder an den zugrunde liegenden Tabellen einer Ansicht vorgenommen wurden. Eine Aufgabe führt eine einzelne Aktion aus, die ein SQL-Befehl oder eine umfangreiche benutzerdefinierte Funktion (UDF) sein kann. Sie können eine Aufgabe so planen, dass sie in regelmäßigen Abständen ausgeführt wird, oder eine Aufgabe bei Bedarf ausführen.
Beispiel: Erstellen einer einfachen Pipeline zur Verarbeitung von PDFs¶
In diesem Beispiel wird eine einfache Datenverarbeitungs-Pipeline erstellt, die folgende Aufgaben ausführt:
Erkennen von PDF-Dateien, die zu einem Stagingbereich hinzugefügt wurden
Abrufen der Daten aus den Dateien
Einfügen der Daten in eine Snowflake-Tabelle
Die Pipeline verwendet einen Stream, um Änderungen an einer Verzeichnistabelle im Stagingbereich zu erkennen, und eine Aufgabe, die eine UDF ausführt, um Daten aus den Dateien zu extrahieren.
In der folgende Abbildung ist zusammengefasst, wie die Beispiel-Pipeline funktioniert:

Schritt 1: Stagingbereich mit einer aktivierten Verzeichnistabelle erstellen¶
Erstellen Sie einen internen Stagingbereich, der eine aktivierte Verzeichnistabelle enthält. In der Beispielanweisung wird für ENCRYPTION
der Typ SNOWFLAKE_SSE
eingestellt, um den Zugriff auf unstrukturierten Daten im Stagingbereich zu aktivieren.
CREATE OR REPLACE STAGE my_pdf_stage
ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
DIRECTORY = ( ENABLE = TRUE);
Schritt 2: Stream auf der Verzeichnistabelle erstellen¶
Erstellen Sie einen Stream auf der Verzeichnistabelle, indem Sie den Stagingbereich angeben, zu dem die Verzeichnistabelle gehört. Der Stream verfolgt die Änderungen an der Verzeichnistabelle. In Schritt 5 dieses Beispiels wird dieser Stream dann verwendet, um eine Aufgabe zu konstruieren.
CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Schritt 3: Benutzerdefinierte Funktion zum Parsen der PDFs erstellen¶
Erstellen Sie eine UDF, die Daten aus PDF-Dateien extrahiert. Die Aufgabe, die Sie in einem späteren Schritt erstellen, wird diese UDF aufrufen, um neu hinzugefügte Dateien im Stagingbereich zu verarbeiten.
Die folgende Beispielanweisung erstellt eine Python UDF mit dem Namen PDF_PARSE
, die PDF-Dateien mit Produktprüfungsdaten verarbeitet. Die UDF extrahiert Formularfelddaten unter Verwendung der PyPDF2-Bibliothek. Sie gibt ein Wörterbuch zurück, das die Formularnamen und -werte als Schlüssel-Wert-Paare enthält.
Bemerkung
Die UDF liest dynamisch spezifizierte Dateien mithilfe der Klasse SnowflakeFile
aus. Weitere Informationen zu SnowflakeFile
finden Sie unter Lesen einer mit SnowflakeFile dynamisch spezifizierten Datei.
CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'parse_pdf_fields'
PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
AS
$$
from pathlib import Path
import PyPDF2 as pypdf
from io import BytesIO
from snowflake.snowpark.files import SnowflakeFile
def parse_pdf_fields(file_path):
with SnowflakeFile.open(file_path, 'rb') as f:
buffer = BytesIO(f.readall())
reader = pypdf.PdfFileReader(buffer)
fields = reader.getFields()
field_dict = {}
for k, v in fields.items():
if "/V" in v.keys():
field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]
return field_dict
$$;
Schritt 4: Tabelle zum Speichern der Dateiinhalte erstellen¶
Als Nächstes erstellen Sie eine Tabelle, in der in jeder Zeile in den Spalten file_name
und file_data
Informationen zu einer Datei des Stagingbereichs gespeichert werden. Die Aufgabe, die Sie in einem späteren Schritt erstellen, lädt die Daten in diese Tabelle.
CREATE OR REPLACE TABLE prod_reviews (
file_name varchar,
file_data variant
);
Schritt 5: Aufgabe erstellen¶
Erstellen Sie eine geplante Aufgabe, die den Stream auf neue Dateien im Stagingbereich überprüft und die Daten in diesen Dateien in die Tabelle prod_reviews
einfügt.
Die folgende Anweisung erstellt eine geplante Aufgabe unter Verwendung des zuvor erstellten Streams. Die Aufgabe verwendet die Funktion SYSTEM$STREAM_HAS_DATA, um zu prüfen, ob der Stream CDC-Datensätze (Change Data Capture, Datenänderungserfassung) enthält.
CREATE OR REPLACE TASK load_new_file_data
WAREHOUSE = 'MY_WAREHOUSE'
SCHEDULE = '1 minute'
COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
WHEN
SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
AS
INSERT INTO prod_reviews (
SELECT relative_path as file_name,
PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
FROM my_pdf_stream
WHERE METADATA$ACTION='INSERT'
);
Schritt 6: Aufgabe zum Testen der Pipeline ausführen¶
Um zu prüfen, ob die Pipeline funktioniert, können Sie dem Stagingbereich Dateien hinzufügen, die Aufgabe manuell ausführen und dann die Tabelle product_reviews
abfragen.
Fügen Sie zunächst einige PDF-Dateien zum Stagingbereich my_pdf_stage
hinzu, und aktualisieren Sie dann den Stagingbereich.
Bemerkung
Dieses Beispiel verwendet PUT-Befehle, die Sie nicht von einem Arbeitsblatt in der Snowflake-Weboberfläche aus ausführen können. Weitere Informationen zum Hochladen von Dateien mit Snowsight finden Sie unter Dateien in einen benannten internen Stagingbereich hochladen.
PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
ALTER STAGE my_pdf_stage REFRESH;
Sie können den Stream abfragen, um zu überprüfen, ob dieser die beiden PDF-Dateien erfasst hat, die wir dem Stagingbereich hinzugefügt hatten.
SELECT * FROM my_pdf_stream;
Führen Sie nun die Aufgabe aus, um die PDF-Dateien zu verarbeiten und die Tabelle product_reviews
zu aktualisieren.
EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Fragen Sie die Tabelle product_reviews
ab, um zu prüfen, ob die Aufgabe für jede PDF-Datei eine Zeile hinzugefügt hat.
select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME | FILE_DATA |
|------------------+----------------------------------|
| prod_review1.pdf | { |
| | "FirstName": "John", |
| | "LastName": "Johnson", |
| | "Middle Name": "Michael", |
| | "Product": "Tennis Shoes", |
| | "Purchase Date": "03/15/2022", |
| | "Recommend": "Yes" |
| | } |
| prod_review2.pdf | { |
| | "FirstName": "Emily", |
| | "LastName": "Smith", |
| | "Middle Name": "Ann", |
| | "Product": "Red Skateboard", |
| | "Purchase Date": "01/10/2023", |
| | "Recommend": "MayBe" |
| | } |
+------------------+----------------------------------+
Schließlich können Sie eine Ansicht erstellen, die die Objekte in der Spalte FILE_DATA
in einzelne Spalten aufteilt. Sie können dann die Ansicht abfragen, um die Dateiinhalte zu analysieren und weiterzuverwenden.
CREATE OR REPLACE VIEW prod_review_info_v
AS
WITH file_data
AS (
SELECT
file_name
, parse_json(file_data) AS file_data
FROM prod_reviews
)
SELECT
file_name
, file_data:FirstName::varchar AS first_name
, file_data:LastName::varchar AS last_name
, file_data:"Middle Name"::varchar AS middle_name
, file_data:Product::varchar AS product
, file_data:"Purchase Date"::date AS purchase_date
, file_data:Recommend::varchar AS recommended
, build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
FROM file_data;
SELECT * FROM prod_review_info_v;
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| FILE_NAME | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL |
|------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| prod_review1.pdf | John | Johnson | Michael | Tennis Shoes | 2022-03-15 | Yes | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d |
| prod_review2.pdf | Emily | Smith | Ann | Red Skateboard | 2023-01-10 | MayBe | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d |
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+