Création d’un pipeline de traitement de données à l’aide d’une table de répertoire

Vous pouvez combiner une table de répertoire, qui suit et stocke les métadonnées au niveau des fichiers sur une zone de préparation, avec d’autres objets Snowflake tels que des flux et des tâches pour créer un pipeline de traitement de données.

Un flux enregistre les modifications apportées en langage de manipulation des données (DML) à une table de répertoire, une table, une table externe ou les tables sous-jacentes d’une vue. Une tâche exécute une seule action, qui peut être une commande SQL ou une UDF étendue. Vous pouvez planifier une tâche ou l’exécuter à la demande.

Exemple : Création d’un pipeline simple pour traiter des PDFs

Cet exemple crée un pipeline de traitement de données simple qui effectue les opérations suivantes :

  1. Détecte les fichiers PDF ajoutés à une zone de préparation.

  2. Récupère des données à partir des fichiers.

  3. Insère les données dans une table Snowflake.

Le pipeline utilise un flux pour détecter les modifications apportées à une table de répertoire sur la zone de préparation et une tâche qui exécute une fonction définie par l’utilisateur (UDF) pour traiter les fichiers.

Le diagramme suivant résume le fonctionnement de l’exemple de pipeline :

Pipeline de traitement de données simple qui utilise un flux pour suivre les modifications apportées à une table de répertoire.

Étape 1 : Création d’une zone de préparation avec une table de répertoire activée

Créez une zone de préparation interne avec une table de répertoire activée. L’exemple d’instruction définit le type ENCRYPTION sur SNOWFLAKE_SSE pour activer l’accès aux données non structurées sur la zone de préparation.

CREATE OR REPLACE STAGE my_pdf_stage
  ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
  DIRECTORY = ( ENABLE = TRUE);
Copy

Étape 2 : Création d’un flux sur la table du répertoire

Ensuite, créez un flux sur la table de répertoire en spécifiant la zone de préparation à laquelle appartient la table de répertoire. Le flux suivra les modifications apportées à la table de répertoire. À l’étape 5 de cet exemple, nous utilisons ce flux pour construire une tâche.

CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Copy

Étape 3 : Création d’une fonction définie par l’utilisateur pour analyser des PDFs

Créez une fonction définie par l’utilisateur (UDF) qui extrait les données de fichiers PDF. La tâche que nous créons à l’étape 5 appellera cette UDF pour traiter les fichiers venant d’être ajoutés sur la zone de préparation.

L’exemple d’instruction suivant crée une UDF nommée PDF_PARSE qui traite des fichiers PDF contenant des données d’avis sur les produits. L’UDF extrait les données des champs de formulaire à l’aide de la bibliothèque PyPDF2. Elle renvoie un dictionnaire contenant les noms et les valeurs du formulaire sous forme de paires clé-valeur.

Note

L’UDF lit les fichiers spécifiés dynamiquement à l’aide de la classe SnowflakeFile. Pour en savoir plus sur SnowflakeFile, consultez Lecture d’un fichier spécifié de façon dynamique avec SnowflakeFile.

CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'parse_pdf_fields'
  PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
  AS
  $$
  from pathlib import Path
  import PyPDF2 as pypdf
  from io import BytesIO
  from snowflake.snowpark.files import SnowflakeFile

  def parse_pdf_fields(file_path):
      with SnowflakeFile.open(file_path, 'rb') as f:
        buffer = BytesIO(f.readall())
      reader = pypdf.PdfFileReader(buffer)
      fields = reader.getFields()
      field_dict = {}
      for k, v in fields.items():
          if "/V" in v.keys():
              field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]

      return field_dict
  $$;
Copy

Étape 4 : Création d’une table pour stocker le contenu du fichier

Créez ensuite une table dans laquelle chaque ligne stocke des informations sur un fichier sur la zone de préparation dans des colonnes nommées file_name et file_data. La tâche que nous créons à l’étape 5 de cet exemple chargera les données dans cette table.

CREATE OR REPLACE TABLE prod_reviews (
  file_name varchar,
  file_data variant
);
Copy

Étape 5 : Création d’une tâche

Créez une tâche planifiée qui vérifie le flux pour les nouveaux fichiers sur la zone de préparation et insère les données du fichier dans la table prod_reviews.

L’instruction suivante crée une tâche planifiée à l’aide du flux créé à l’étape 2. La tâche utilise la fonction SYSTEM$STREAM_HAS_DATA pour vérifier si le flux contient des enregistrements de capture de données modifiées (CDC).

CREATE OR REPLACE TASK load_new_file_data
  WAREHOUSE = 'MY_WAREHOUSE'
  SCHEDULE = '1 minute'
  COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
  WHEN
  SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
  AS
  INSERT INTO prod_reviews (
    SELECT relative_path as file_name,
    PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
    FROM my_pdf_stream
    WHERE METADATA$ACTION='INSERT'
  );
Copy

Étape 6 : Exécution de la tâche pour tester le pipeline

Pour vérifier que le pipeline fonctionne, vous pouvez ajouter des fichiers à la zone de préparation, exécuter manuellement la tâche, puis interroger la table product_reviews.

Commencez par ajouter quelques fichiers PDF à la zone de préparation my_pdf_stage, puis actualisez la zone de préparation.

Note

Cet exemple utilise des commandes PUT, qui ne peuvent pas être exécutées à partir d’une feuille de calcul dans l’interface Web de Snowflake. Pour charger des fichiers avec Snowsight, voir Chargement de fichiers dans une zone de préparation interne nommée.

PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;

ALTER STAGE my_pdf_stage REFRESH;
Copy

Vous pouvez interroger le flux pour vérifier qu’il a enregistré les deux fichiers PDF que nous avons ajoutés à la zone de préparation.

SELECT * FROM my_pdf_stream;
Copy

Maintenant, exécutez la tâche pour traiter les fichiers PDF et mettre à jour la table product_reviews.

EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status                                                   |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Copy

Interrogez la table product_reviews pour voir que la tâche a ajouté une ligne pour chaque fichier PDF.

select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME        | FILE_DATA                        |
|------------------+----------------------------------|
| prod_review1.pdf | {                                |
|                  |   "FirstName": "John",           |
|                  |   "LastName": "Johnson",         |
|                  |   "Middle Name": "Michael",      |
|                  |   "Product": "Tennis Shoes",     |
|                  |   "Purchase Date": "03/15/2022", |
|                  |   "Recommend": "Yes"             |
|                  | }                                |
| prod_review2.pdf | {                                |
|                  |   "FirstName": "Emily",          |
|                  |   "LastName": "Smith",           |
|                  |   "Middle Name": "Ann",          |
|                  |   "Product": "Red Skateboard",   |
|                  |   "Purchase Date": "01/10/2023", |
|                  |   "Recommend": "MayBe"           |
|                  | }                                |
+------------------+----------------------------------+
Copy

Enfin, vous pouvez créer une vue qui analyse les objets de la colonne FILE_DATA dans des colonnes distinctes. Vous pouvez ensuite interroger la vue pour analyser et travailler avec le contenu du fichier.

CREATE OR REPLACE VIEW prod_review_info_v
  AS
  WITH file_data
  AS (
      SELECT
        file_name
        , parse_json(file_data) AS file_data
      FROM prod_reviews
  )
  SELECT
      file_name
      , file_data:FirstName::varchar AS first_name
      , file_data:LastName::varchar AS last_name
      , file_data:"Middle Name"::varchar AS middle_name
      , file_data:Product::varchar AS product
      , file_data:"Purchase Date"::date AS purchase_date
      , file_data:Recommend::varchar AS recommended
      , build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
  FROM file_data;

SELECT * FROM prod_review_info_v;

+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| FILE_NAME        | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT        | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL                                                                                                                                                                                                                                                                                                                                                                                                              |
|------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| prod_review1.pdf | John       | Johnson   | Michael     | Tennis Shoes   | 2022-03-15    | Yes         | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d              |
| prod_review2.pdf | Emily      | Smith     | Ann         | Red Skateboard | 2023-01-10    | MayBe       | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d                            |
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Copy