Traitement des données non structurées avec des gestionnaires d’UDF et de procédures

Cette rubrique fournit des exemples de lecture et de traitement de données non structurées dans des fichiers en zone de préparation avec un code de gestionnaire écrit pour les éléments suivants :

Vous pouvez également lire un fichier avec des gestionnaires écrits dans d’autres langages :

Python
Scala

Note

Pour rendre votre code résistant aux attaques par injection de fichiers, utilisez toujours une URL scopée lorsque vous transmettez l’emplacement d’un fichier à une UDF, en particulier lorsque l’appelant de la fonction n’est pas également son propriétaire. Vous pouvez créer une URL scopée en SQL à l’aide de la fonction intégrée BUILD_SCOPED_FILE_URL. Pour plus d’informations sur le rôle de BUILD_SCOPED_FILE_URL, voir Introduction au chargement de données non structurées.

Dans ce chapitre :

Traiter un PDF avec une UDF et une procédure

Les exemples de cette section traitent des fichiers en zone de préparation non structurés à l’aide d’un code de gestionnaire Java – d’abord avec une UDF, puis avec une procédure. Les deux gestionnaires extraient le contenu d’un fichier PDF spécifié à l’aide de la bibliothèque Apache PDFBox.

Le code du gestionnaire est très similaire entre l’UDF et la procédure. Ils diffèrent dans la manière dont elles lisent le fichier PDF entrant.

  • Dans l’UDF, le gestionnaire lit le fichier à l’aide d’un InputStream Java.

  • Dans la procédure, le gestionnaire lit le fichier à l’aide d’un SnowflakeFile Snowflake.

Les exemples utilisent du code de gestionnaire en ligne (par opposition à un code compilé dans un JAR en zone de préparation), ce qui signifie que vous ne devez pas compiler, empaqueter, ni charger le code de gestionnaire vers une zone de préparation. Pour plus d’informations sur la différence entre les gestionnaires en ligne et les gestionnaires en zone de préparation, voir Conserver le code du gestionnaire en ligne ou dans une zone de préparation.

Télécharger la bibliothèque PDFBox

Avant de commencer à écrire l’UDF, chargez le fichier JAR de la bibliothèque PDFBox si vous ne l’avez pas déjà. Il s’agit d’une dépendance pour votre code de gestionnaire. Vous chargerez ultérieurement le fichier JAR de la bibliothèque vers une zone de préparation.

Téléchargez la dernière version de la bibliothèque à partir de la page de téléchargement de la bibliothèque PDFBox Apache.

Créer des zones de préparation

Créez des zones de préparation dans lesquelles vous conserverez les bibliothèques de dépendances de votre code de gestionnaire et le fichier de données que le code de gestionnaire lira.

En utilisant le code ci-dessous, vous créerez des zones de préparation internes distinctes pour contenir :

  • Un fichier JAR de la bibliothèque qui est une dépendance de votre gestionnaire. Vous ferez référence à la zone de préparation et au fichier JAR en zone de préparation à partir de l’UDF.

  • Un fichier de données que votre code de gestionnaire lira.

Le code de l’exemple suivant utilise la commande CREATE STAGE pour créer les zones de préparation dont vous aurez besoin.

-- Create an internal stage to store the JAR files.
CREATE OR REPLACE STAGE jars_stage;

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');
Copy

Charger la bibliothèque requise et le fichier PDF à lire

Effectuez les étapes suivantes pour charger le fichier JAR de dépendance (avec le code de la bibliothèque qui traite le PDF) et le fichier de données (le fichier PDF que le code du gestionnaire traitera).

Vous pouvez utiliser le fichier PDF de votre choix dans cet exemple.

  1. Copiez le fichier JAR pour Apache PDFBox du répertoire temporaire local vers la zone de préparation qui stocke les fichiers JAR :

    Linux/Mac
    PUT file:///tmp/pdfbox-app-2.0.27.jar @jars_stage AUTO_COMPRESS=FALSE;
    
    Copy
    Windows
    PUT file://C:\temp\pdfbox-app-2.0.27.jar @jars_stage AUTO_COMPRESS=FALSE;
    
    Copy
  2. Copiez le fichier PDF du répertoire temporaire local vers la zone de préparation qui stocke les fichiers de données :

    Linux/Mac
    PUT file:///tmp/myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Copy
    Windows
    PUT file://C:\temp\myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Copy

Créer et appeler l’UDF

Suivez les étapes suivantes pour créer une UDF qui lit et traite les fichiers PDF.

  1. Collez et exécutez le code suivant pour créer une UDF.

    Le gestionnaire d’UDF analyse les documents PDF et en extrait le contenu. Le gestionnaire utilise la classe InputStream pour lire le fichier. Pour en savoir plus sur la lecture de fichiers avec InputStream, reportez-vous à Lecture d’un fichier spécifié de façon dynamique avec InputStream.

    CREATE FUNCTION process_pdf_func(file STRING)
    RETURNS STRING
    LANGUAGE JAVA
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.27.jar')
    HANDLER = 'PdfParser.readFile'
    AS
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(InputStream stream) throws IOException {
            try (PDDocument document = PDDocument.load(stream)) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
            return null;
        }
    }
    $$;
    
    Copy
  2. Actualisez la table du répertoire pour la zone de préparation data_stage avec la commande ALTER STAGE :

    ALTER STAGE data_stage REFRESH;
    
    Copy
  3. Appelez l’UDF pour lire le fichier PDF en zone de préparation et en extraire le contenu.

    Le code de l’exemple suivant appelle l’UDF, en lui passant une URL scopée pour rendre le code résistant aux attaques par injection de fichier. Vous devez toujours utiliser une URL scopée lorsque l’appelant de la fonction n’est pas également son propriétaire. Vous pouvez passer l’argument URL sous la forme d’une URL scopée ou sous une autre forme lorsque l’appelant de l’UDF est également son propriétaire.

    SELECT process_pdf_func(BUILD_SCOPED_FILE_URL('@data_stage', '/myfile.pdf'));
    
    Copy

Créer et appeler la procédure

Effectuez les étapes suivantes pour créer une procédure qui lit et traite les fichiers PDF.

  1. Collez et exécutez le code suivant pour créer une procédure.

    Ce gestionnaire de procédure analyse les documents PDF et récupère leur contenu. Le gestionnaire utilise la classe SnowflakeFile pour lire le fichier. Pour en savoir plus sur la lecture de fichiers avec SnowflakeFile, reportez-vous à Lecture d’un fichier spécifié de façon dynamique avec SnowflakeFile.

    CREATE PROCEDURE process_pdf_proc(file STRING)
    RETURNS STRING
    LANGUAGE JAVA
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.28.jar')
    HANDLER = 'PdfParser.readFile'
    PACKAGES = ('com.snowflake:snowpark:latest')
    AS
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    import com.snowflake.snowpark_java.Session;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(Session session, String fileURL) throws IOException {
            SnowflakeFile file = SnowflakeFile.newInstance(fileURL);
            try (PDDocument document = PDDocument.load(file.getInputStream())) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
    
            return null;
        }
    }
    $$;
    
    Copy
  2. Actualisez la table du répertoire pour la zone de préparation data_stage avec la commande ALTER STAGE :

    ALTER STAGE data_stage REFRESH;
    
    Copy
  3. Appelez la procédure pour lire le fichier PDF en zone de préparation et en extraire le contenu.

    Le code de l’exemple suivant transmet une URL scopée pointant vers le fichier PDF sur la zone de préparation que vous avez créée.

    CALL process_pdf_proc(BUILD_SCOPED_FILE_URL('@data_stage', '/UsingThird-PartyPackages.pdf'));
    
    Copy

Traiter un fichier CSV avec une UDTF

L’exemple de cette section extrait et renvoie des données à partir de fichiers en zone de préparation en utilisant des UDTFs Java.

Créer une zone de préparation de données

Créez une zone de préparation à l’aide de la commande CREATE STAGE :

L’instruction SQL suivante crée une zone de préparation interne pour stocker les fichiers de données de l’exemple :

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');
Copy

Charger le fichier CSV à lire

Copiez le fichier CSV du répertoire temporaire local vers la zone de préparation qui stocke les fichiers de données :

Linux/Mac
PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;
Copy
Windows
PUT file://C:\temp\sample.csv @data_stage AUTO_COMPRESS=FALSE;
Copy

Créer et appeler l’UDTF

Cet exemple extrait le contenu d’un ensemble spécifié de fichiers CSV et renvoie les lignes dans une table. En traitant les données du fichier au fur et à mesure qu’elles sont lues à partir de la source, vous pouvez éviter les erreurs potentielles de mémoire qui peuvent survenir lorsque le fichier est très volumineux.

Le code de l’exemple de gestionnaire UDTF suivant utilise SnowflakeFile pour générer un InputStream à partir d’une URL de fichier pour lire un fichier CSV. (Dans un gestionnaire UDTF Java, le traitement des lignes commence lorsque Snowflake appelle la méthode process que vous implémentez). Le code utilise le flux lors de la construction d’une instance d’une classe CsvStreamingReader définie dans le gestionnaire lui-même.

La classe CsvStreamingReader lit le contenu du flux de fichiers CSV reçu ligne par ligne, ce qui permet à d’autres codes d’extraire chaque ligne sous la forme d’un enregistrement dont les colonnes sont délimitées par des virgules. La méthode process renvoie chaque enregistrement au fur et à mesure qu’il est lu dans le flux.

Pour en savoir plus sur l’écriture de fonctions tabulaires définies par l’utilisateur (UDTFs) avec un gestionnaire Java, consultez UDFs Java tabulaires (UDTFs).

Effectuez les étapes suivantes pour créer l’UDTF Java et charger les fichiers requis :

  1. Créez une UDTF Java qui utilise la classe SnowflakeFile :

    CREATE OR REPLACE FUNCTION parse_csv(file STRING)
    RETURNS TABLE (col1 STRING, col2 STRING, col3 STRING )
    LANGUAGE JAVA
    HANDLER = 'CsvParser'
    AS
    $$
    import org.xml.sax.SAXException;
    
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Stream;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    
    public class CsvParser {
    
      static class Record {
        public String col1;
        public String col2;
        public String col3;
    
        public Record(String col1_value, String col2_value, String col3_value)
        {
          col1 = col1_value;
          col2 = col2_value;
          col3 = col3_value;
        }
      }
    
      public static Class getOutputClass() {
        return Record.class;
      }
    
      static class CsvStreamingReader {
        private final BufferedReader csvReader;
    
        public CsvStreamingReader(InputStream is) {
          this.csvReader = new BufferedReader(new InputStreamReader(is));
        }
    
        public void close() {
          try {
            this.csvReader.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
    
        Record getNextRecord() {
          String csvRecord;
    
          try {
            if ((csvRecord = csvReader.readLine()) != null) {
              String[] columns = csvRecord.split(",", 3);
              return new Record(columns[0], columns[1], columns[2]);
            }
          } catch (IOException e) {
            throw new RuntimeException("Reading CSV failed.", e);
          } finally {
            // No more records, we can close the reader.
            close();
          }
    
          // Return null to indicate the end of the stream.
          return null;
        }
      }
    
      public Stream<Record> process(String file_url) throws IOException {
        SnowflakeFile file = SnowflakeFile.newInstance(file_url);
    
        CsvStreamingReader csvReader = new CsvStreamingReader(file.getInputStream());
        return Stream.generate(csvReader::getNextRecord);
      }
    }
    $$
    ;
    
    Copy
  2. Actualisez la table du répertoire pour la zone de préparation data_stage :

    ALTER STAGE data_stage REFRESH;
    
    Copy
  3. Appelez l’UDTF Java pour lire un ou plusieurs fichiers CSV en zone de préparation et extraire le contenu sous forme de table :

    Le code de l’exemple suivant appelle l’UDF, en transmettant une URL scopée pour réduire le risque d’attaques par injection de fichiers. Vous devez toujours utiliser une URL scopée lorsque l’appelant de la fonction n’est pas également son propriétaire. Vous pouvez passer l’argument URL sous la forme d’une URL scopée ou sous une autre forme prise en charge lorsque l’appelant de l’UDF est également son propriétaire.

    -- Input a file URL.
    SELECT * FROM TABLE(PARSE_CSV(BUILD_SCOPED_FILE_URL(@data_stage, 'sample.csv')));
    
    Copy