Création d’un pipeline de traitement de données à l’aide d’une table de répertoire¶
Vous pouvez combiner une table de répertoire, qui suit et stocke les métadonnées au niveau des fichiers sur une zone de préparation, avec d’autres objets Snowflake tels que des flux et des tâches pour créer un pipeline de traitement de données.
Un flux enregistre les modifications apportées en langage de manipulation des données (DML) à une table de répertoire, une table, une table externe ou les tables sous-jacentes d’une vue. Une tâche exécute une seule action, qui peut être une commande SQL ou une UDF étendue. Vous pouvez planifier une tâche ou l’exécuter à la demande.
Exemple : Création d’un pipeline simple pour traiter des PDFs¶
Cet exemple crée un pipeline de traitement de données simple qui effectue les opérations suivantes :
Détecte les fichiers PDF ajoutés à une zone de préparation.
Récupère des données à partir des fichiers.
Insère les données dans une table Snowflake.
Le pipeline utilise un flux pour détecter les modifications apportées à une table de répertoire sur la zone de préparation et une tâche qui exécute une fonction définie par l’utilisateur (UDF) pour traiter les fichiers.
Le diagramme suivant résume le fonctionnement de l’exemple de pipeline :
Étape 1 : Création d’une zone de préparation avec une table de répertoire activée¶
Créez une zone de préparation interne avec une table de répertoire activée. L’exemple d’instruction définit le type ENCRYPTION
sur SNOWFLAKE_SSE
pour activer l’accès aux données non structurées sur la zone de préparation.
CREATE OR REPLACE STAGE my_pdf_stage
ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
DIRECTORY = ( ENABLE = TRUE);
Étape 2 : Création d’un flux sur la table du répertoire¶
Ensuite, créez un flux sur la table de répertoire en spécifiant la zone de préparation à laquelle appartient la table de répertoire. Le flux suivra les modifications apportées à la table de répertoire. À l’étape 5 de cet exemple, nous utilisons ce flux pour construire une tâche.
CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Étape 3 : Création d’une fonction définie par l’utilisateur pour analyser des PDFs¶
Créez une fonction définie par l’utilisateur (UDF) qui extrait les données de fichiers PDF. La tâche que nous créons à l’étape 5 appellera cette UDF pour traiter les fichiers venant d’être ajoutés sur la zone de préparation.
L’exemple d’instruction suivant crée une UDF nommée PDF_PARSE
qui traite des fichiers PDF contenant des données d’avis sur les produits. L’UDF extrait les données des champs de formulaire à l’aide de la bibliothèque PyPDF2. Elle renvoie un dictionnaire contenant les noms et les valeurs du formulaire sous forme de paires clé-valeur.
Note
L’UDF lit les fichiers spécifiés dynamiquement à l’aide de la classe SnowflakeFile
. Pour en savoir plus sur SnowflakeFile
, consultez Lecture d’un fichier spécifié de façon dynamique avec SnowflakeFile.
CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'parse_pdf_fields'
PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
AS
$$
from pathlib import Path
import PyPDF2 as pypdf
from io import BytesIO
from snowflake.snowpark.files import SnowflakeFile
def parse_pdf_fields(file_path):
with SnowflakeFile.open(file_path, 'rb') as f:
buffer = BytesIO(f.readall())
reader = pypdf.PdfFileReader(buffer)
fields = reader.getFields()
field_dict = {}
for k, v in fields.items():
if "/V" in v.keys():
field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]
return field_dict
$$;
Étape 4 : Création d’une table pour stocker le contenu du fichier¶
Créez ensuite une table dans laquelle chaque ligne stocke des informations sur un fichier sur la zone de préparation dans des colonnes nommées file_name
et file_data
. La tâche que nous créons à l’étape 5 de cet exemple chargera les données dans cette table.
CREATE OR REPLACE TABLE prod_reviews (
file_name varchar,
file_data variant
);
Étape 5 : Création d’une tâche¶
Créez une tâche planifiée qui vérifie le flux pour les nouveaux fichiers sur la zone de préparation et insère les données du fichier dans la table prod_reviews
.
L’instruction suivante crée une tâche planifiée à l’aide du flux créé à l’étape 2. La tâche utilise la fonction SYSTEM$STREAM_HAS_DATA pour vérifier si le flux contient des enregistrements de capture de données modifiées (CDC).
CREATE OR REPLACE TASK load_new_file_data
WAREHOUSE = 'MY_WAREHOUSE'
SCHEDULE = '1 minute'
COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
WHEN
SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
AS
INSERT INTO prod_reviews (
SELECT relative_path as file_name,
PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
FROM my_pdf_stream
WHERE METADATA$ACTION='INSERT'
);
Étape 6 : Exécution de la tâche pour tester le pipeline¶
Pour vérifier que le pipeline fonctionne, vous pouvez ajouter des fichiers à la zone de préparation, exécuter manuellement la tâche, puis interroger la table product_reviews
.
Commencez par ajouter quelques fichiers PDF à la zone de préparation my_pdf_stage
, puis actualisez la zone de préparation.
Note
Cet exemple utilise des commandes PUT, qui ne peuvent pas être exécutées à partir d’une feuille de calcul dans l’interface Web de Snowflake. Pour charger des fichiers avec Snowsight, voir Chargement de fichiers dans une zone de préparation interne nommée.
PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
ALTER STAGE my_pdf_stage REFRESH;
Vous pouvez interroger le flux pour vérifier qu’il a enregistré les deux fichiers PDF que nous avons ajoutés à la zone de préparation.
SELECT * FROM my_pdf_stream;
Maintenant, exécutez la tâche pour traiter les fichiers PDF et mettre à jour la table product_reviews
.
EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Interrogez la table product_reviews
pour voir que la tâche a ajouté une ligne pour chaque fichier PDF.
select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME | FILE_DATA |
|------------------+----------------------------------|
| prod_review1.pdf | { |
| | "FirstName": "John", |
| | "LastName": "Johnson", |
| | "Middle Name": "Michael", |
| | "Product": "Tennis Shoes", |
| | "Purchase Date": "03/15/2022", |
| | "Recommend": "Yes" |
| | } |
| prod_review2.pdf | { |
| | "FirstName": "Emily", |
| | "LastName": "Smith", |
| | "Middle Name": "Ann", |
| | "Product": "Red Skateboard", |
| | "Purchase Date": "01/10/2023", |
| | "Recommend": "MayBe" |
| | } |
+------------------+----------------------------------+
Enfin, vous pouvez créer une vue qui analyse les objets de la colonne FILE_DATA
dans des colonnes distinctes. Vous pouvez ensuite interroger la vue pour analyser et travailler avec le contenu du fichier.
CREATE OR REPLACE VIEW prod_review_info_v
AS
WITH file_data
AS (
SELECT
file_name
, parse_json(file_data) AS file_data
FROM prod_reviews
)
SELECT
file_name
, file_data:FirstName::varchar AS first_name
, file_data:LastName::varchar AS last_name
, file_data:"Middle Name"::varchar AS middle_name
, file_data:Product::varchar AS product
, file_data:"Purchase Date"::date AS purchase_date
, file_data:Recommend::varchar AS recommended
, build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
FROM file_data;
SELECT * FROM prod_review_info_v;
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| FILE_NAME | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL |
|------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| prod_review1.pdf | John | Johnson | Michael | Tennis Shoes | 2022-03-15 | Yes | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d |
| prod_review2.pdf | Emily | Smith | Ann | Red Skateboard | 2023-01-10 | MayBe | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d |
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+