Erstellen einer Datenverarbeitungs-Pipeline unter Verwendung einer Verzeichnistabelle¶
Sie können eine Verzeichnistabelle, die Metadaten in einem Stagingbereich auf Dateiebene verfolgt und speichert, mit anderen Snowflake-Objekten wie Streams und Aufgaben kombinieren, um eine Datenverarbeitungs-Pipeline aufzubauen.
Ein Stream erfasst Änderungen, die mit Data Manipulation Language (DML) an einer Verzeichnistabelle, einer Tabelle, einer externen Tabelle oder an den zugrunde liegenden Tabellen einer Ansicht vorgenommen wurden. Eine Aufgabe führt eine einzelne Aktion aus, wie einen SQL-Befehl oder eine umfangreiche UDF. Sie können eine Aufgabe planen oder bei Bedarf ausführen.
Beispiel: Erstellen einer einfachen Pipeline zur Verarbeitung von PDFs¶
In diesem Beispiel wird eine einfache Datenverarbeitungs-Pipeline erstellt, die folgende Aufgaben ausführt:
Erkennen von PDF-Dateien, die zu einem Stagingbereich hinzugefügt wurden
Abrufen der Daten aus den Dateien
Einfügen der Daten in eine Snowflake-Tabelle
Die Pipeline verwendet einen Stream, um Änderungen an einer Verzeichnistabelle im Stagingbereich zu erkennen, sowie eine Aufgabe, die eine benutzerdefinierte Funktion (UDF) zum Verarbeiten der Dateien ausführt.
In der folgende Abbildung ist zusammengefasst, wie die Beispiel-Pipeline funktioniert:
Schritt 1: Stagingbereich mit einer aktivierten Verzeichnistabelle erstellen¶
Erstellen Sie einen internen Stagingbereich, der eine aktivierte Verzeichnistabelle enthält. In der Beispielanweisung wird für ENCRYPTION
der Typ SNOWFLAKE_SSE
eingestellt, um den Zugriff auf unstrukturierten Daten im Stagingbereich zu aktivieren.
CREATE OR REPLACE STAGE my_pdf_stage
ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
DIRECTORY = ( ENABLE = TRUE);
Schritt 2: Stream auf der Verzeichnistabelle erstellen¶
Als Nächstes erstellen Sie einen Stream auf der Verzeichnistabelle, indem Sie den Stagingbereich angeben, zu dem die Verzeichnistabelle gehört. Der Stream verfolgt die Änderungen an der Verzeichnistabelle. In Schritt 5 dieses Beispiels wird dieser Stream dann verwendet, um eine Aufgabe zu konstruieren.
CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Schritt 3: Benutzerdefinierte Funktion zum Parsen der PDFs erstellen¶
Erstellen Sie eine benutzerdefinierte Funktion (UDF), die Daten aus PDF-Dateien extrahiert. Die Aufgabe, die in Schritt 5 erstellt wird, wird diese UDF aufrufen, um die neu zum Stagingbereich hinzugefügten Dateien zu verarbeiten.
Die folgende Beispielanweisung erstellt eine UDF namens PDF_PARSE
, die PDF-Dateien mit Produktbewertungsdaten verarbeitet. Die UDF extrahiert Formularfelddaten unter Verwendung der PyPDF2-Bibliothek. Sie gibt ein Dictionary zurück, das die Formularnamen und -werte als Schlüssel-Wert-Paare enthält.
Bemerkung
Die UDF liest dynamisch spezifizierte Dateien mithilfe der Klasse SnowflakeFile
aus. Weitere Informationen zu SnowflakeFile
finden Sie unter Lesen einer mit SnowflakeFile dynamisch spezifizierten Datei.
CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'parse_pdf_fields'
PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
AS
$$
from pathlib import Path
import PyPDF2 as pypdf
from io import BytesIO
from snowflake.snowpark.files import SnowflakeFile
def parse_pdf_fields(file_path):
with SnowflakeFile.open(file_path, 'rb') as f:
buffer = BytesIO(f.readall())
reader = pypdf.PdfFileReader(buffer)
fields = reader.getFields()
field_dict = {}
for k, v in fields.items():
if "/V" in v.keys():
field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]
return field_dict
$$;
Schritt 4: Tabelle zum Speichern der Dateiinhalte erstellen¶
Als Nächstes erstellen Sie eine Tabelle, in der in jeder Zeile in den Spalten file_name
und file_data
Informationen zu einer Datei des Stagingbereichs gespeichert werden. Die Aufgabe, die in Schritt 5 dieses Beispiels erstellt wird, wird die Daten in diese Tabelle laden.
CREATE OR REPLACE TABLE prod_reviews (
file_name varchar,
file_data variant
);
Schritt 5: Aufgabe erstellen¶
Erstellen Sie eine geplante Aufgabe, die den Stream auf neue Dateien im Stagingbereich überprüft und die Daten in diesen Dateien in die Tabelle prod_reviews
einfügt.
Mit der folgenden Anweisung wird unter Verwendung des in Schritt 2 erstellten Streams eine geplante Aufgabe erstellt. Die Aufgabe verwendet die Funktion SYSTEM$STREAM_HAS_DATA, um zu prüfen, ob der Stream CDC-Datensätze (Change Data Capture, Datenänderungserfassung) enthält.
CREATE OR REPLACE TASK load_new_file_data
WAREHOUSE = 'MY_WAREHOUSE'
SCHEDULE = '1 minute'
COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
WHEN
SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
AS
INSERT INTO prod_reviews (
SELECT relative_path as file_name,
PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
FROM my_pdf_stream
WHERE METADATA$ACTION='INSERT'
);
Schritt 6: Aufgabe zum Testen der Pipeline ausführen¶
Um zu prüfen, ob die Pipeline funktioniert, können Sie dem Stagingbereich Dateien hinzufügen, die Aufgabe manuell ausführen und dann die Tabelle product_reviews
abfragen.
Fügen Sie zunächst einige PDF-Dateien zum Stagingbereich my_pdf_stage
hinzu, und aktualisieren Sie dann den Stagingbereich.
Bemerkung
In diesem Beispiel werden PUT-Befehle verwendet, die nicht über ein Arbeitsblatt der Snowflake-Weboberfläche ausgeführt werden können. Weitere Informationen zum Hochladen von Dateien mit Snowsight finden Sie unter Dateien in einen benannten internen Stagingbereich hochladen.
PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
ALTER STAGE my_pdf_stage REFRESH;
Sie können den Stream abfragen, um zu überprüfen, ob dieser die beiden PDF-Dateien erfasst hat, die wir dem Stagingbereich hinzugefügt hatten.
SELECT * FROM my_pdf_stream;
Führen Sie nun die Aufgabe aus, um die PDF-Dateien zu verarbeiten und die Tabelle product_reviews
zu aktualisieren.
EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Fragen Sie die Tabelle product_reviews
ab, um zu prüfen, ob die Aufgabe für jede PDF-Datei eine Zeile hinzugefügt hat.
select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME | FILE_DATA |
|------------------+----------------------------------|
| prod_review1.pdf | { |
| | "FirstName": "John", |
| | "LastName": "Johnson", |
| | "Middle Name": "Michael", |
| | "Product": "Tennis Shoes", |
| | "Purchase Date": "03/15/2022", |
| | "Recommend": "Yes" |
| | } |
| prod_review2.pdf | { |
| | "FirstName": "Emily", |
| | "LastName": "Smith", |
| | "Middle Name": "Ann", |
| | "Product": "Red Skateboard", |
| | "Purchase Date": "01/10/2023", |
| | "Recommend": "MayBe" |
| | } |
+------------------+----------------------------------+
Schließlich können Sie eine Ansicht erstellen, die die Objekte in der Spalte FILE_DATA
in einzelne Spalten aufteilt. Sie können dann die Ansicht abfragen, um die Dateiinhalte zu analysieren und weiterzuverwenden.
CREATE OR REPLACE VIEW prod_review_info_v
AS
WITH file_data
AS (
SELECT
file_name
, parse_json(file_data) AS file_data
FROM prod_reviews
)
SELECT
file_name
, file_data:FirstName::varchar AS first_name
, file_data:LastName::varchar AS last_name
, file_data:"Middle Name"::varchar AS middle_name
, file_data:Product::varchar AS product
, file_data:"Purchase Date"::date AS purchase_date
, file_data:Recommend::varchar AS recommended
, build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
FROM file_data;
SELECT * FROM prod_review_info_v;

| FILE_NAME | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL |
||
| prod_review1.pdf | John | Johnson | Michael | Tennis Shoes | 2022-03-15 | Yes | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d |
| prod_review2.pdf | Emily | Smith | Ann | Red Skateboard | 2023-01-10 | MayBe | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d |
