Verarbeiten von unstrukturierten Daten mit UDF- und Prozedur-Handlern

Unter diesem Thema werden Beispiele für das Lesen und Verarbeiten unstrukturierter Daten in Stagingdateien mit Handler-Code bereitgestellt, der für Folgendes geschrieben wurde:

Sie können auch eine Datei mit Handlern lesen, die in anderen Sprachen geschrieben sind:

Python:
Scala:

Bemerkung

Um Ihren Code widerstandsfähig gegen Angriffe per Dateieinschleusung zu machen, verwenden Sie immer eine Bereichs-URL, wenn Sie den Speicherort einer Datei an eine UDF übergeben, insbesondere wenn der Aufrufer der Funktion nicht auch ihr Eigentümer ist. Mit der integrierten Funktion BUILD_SCOPED_FILE_URL können Sie eine Bereichs-URL in SQL erstellen. Weitere Informationen zur Funktion BUILD_SCOPED_FILE_URL finden Sie unter Einführung in unstrukturierte Daten.

Unter diesem Thema:

PDF mit UDF und Prozedur verarbeiten

Die Beispiele in diesem Abschnitt verarbeiten Stagingdateien mit unstrukturierten Daten mithilfe von Java-Handler-Code – zunächst mit einer UDF, dann mit einer Prozedur. Beide Handler extrahieren den Inhalt einer angegebenen PDF-Datei unter Verwendung der Apache PDFBox-Bibliothek.

Der Handler-Code von UDF und Prozedur ist sehr ähnlich. Er unterscheidet sich darin, wie die übergebene PDF-Datei gelesen wird.

  • Bei der UDF liest der Handler die Datei mithilfe eines Java-InputStream.

  • Bei der Prozedur liest der Handler die Datei mithilfe eines Snowflake-SnowflakeFile.

Die Beispiele verwenden den Handler-Code inline (im Gegensatz zu kompiliertem Code in einer JAR-Datei in einem Stagingbereich), was bedeutet, dass Sie den Handler-Code nicht kompilieren, packen und in einen Stagingbereich hochladen müssen. Weitere Informationen zum Unterschied zwischen Inline-Handler und Staging-Handler finden Sie unter Speichern von Handler-Code inline oder in einem Stagingbereich.

PDFBox-Bibliothek herunterladen

Bevor Sie mit dem Schreiben der UDF beginnen, laden Sie die JAR-Datei mit der PDFBox-Bibliothek herunter, falls Sie sie noch nicht haben. Diese wird eine Abhängigkeit für Ihren Handler-Code sein. Sie werden die JAR-Bibliotheksdatei später in einen Stagingbereich hochladen.

Laden Sie die neueste Version der Bibliothek von der Download-Seite für die Apache-PDFBox-Bibliothek herunter.

Stagingbereiche erstellen

Erstellen Sie Stagingbereiche, in denen Sie die Bibliotheken für die Abhängigkeiten Ihres Handler-Codes und die Datendatei, die der Handler-Code lesen wird, aufbewahren.

Mit dem nachstehenden Code erstellen Sie separate interne Stagingbereiche, um Folgendes aufzubewahren:

  • Eine JAR-Bibliotheksdatei, die eine Abhängigkeit für Ihren Handler darstellt. Sie referenzieren den Stagingbereich und die JAR-Datei über die UDF.

  • Eine Datendatei, die von Ihrem Handler-Code gelesen wird.

Der Code im folgenden Beispiel verwendet den Befehl CREATE STAGE, um die Stagingbereiche zu erstellen, die Sie benötigen.

-- Create an internal stage to store the JAR files.
CREATE OR REPLACE STAGE jars_stage;

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');
Copy

Benötigte Bibliothek und zu lesende PDF-Datei hochladen

Führen Sie die folgenden Schritte aus, um die JAR-Abhängigkeitsdatei (mit dem Bibliothekscode, der die PDF verarbeitet) und die Datendatei (die PDF-Datei, die der Handler-Code verarbeiten wird) hochzuladen.

Sie können in diesem Beispiel eine PDF-Datei Ihrer Wahl verwenden.

  1. Kopieren Sie die JAR-Datei für Apache PDFBox aus dem lokalen temporären Verzeichnis in den Stagingbereich, in dem JAR-Dateien gespeichert werden:

    Linux/Mac:
    PUT file:///tmp/pdfbox-app-2.0.27.jar @jars_stage AUTO_COMPRESS=FALSE;
    
    Copy
    Windows:
    PUT file://C:\temp\pdfbox-app-2.0.27.jar @jars_stage AUTO_COMPRESS=FALSE;
    
    Copy
  2. Kopieren Sie die PDF-Datei aus dem lokalen temporären Verzeichnis in den Stagingbereich, in dem Datendateien gespeichert werden:

    Linux/Mac:
    PUT file:///tmp/myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Copy
    Windows:
    PUT file://C:\temp\myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Copy

UDF erstellen und aufrufen

Führen Sie die folgenden Schritte aus, um eine UDF zu erstellen, die PDF-Dateien liest und verarbeitet.

  1. Fügen Sie den folgenden Code zum Erstellen einer UDF ein, und führen Sie ihn aus.

    Der Handler dieser UDF parst PDF-Dokumente und ruft deren Inhalt ab. Der Handler verwendet die Klasse InputStream, um die Datei zu lesen. Weitere Informationen zum Lesen von Dateien mit InputStream finden Sie unter Lesen einer mit InputStream dynamisch spezifizierten Datei.

    CREATE FUNCTION process_pdf_func(file STRING)
    RETURNS STRING
    LANGUAGE JAVA
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.27.jar')
    HANDLER = 'PdfParser.readFile'
    AS
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(InputStream stream) throws IOException {
            try (PDDocument document = PDDocument.load(stream)) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
            return null;
        }
    }
    $$;
    
    Copy
  2. Aktualisieren Sie die Verzeichnistabelle des Stagingbereichs data_stage mit dem Befehl ALTER STAGE:

    ALTER STAGE data_stage REFRESH;
    
    Copy
  3. Rufen Sie die UDF auf, um die PDF-Datei im Stagingbereich zu lesen und den Inhalt zu extrahieren.

    Der Code im folgenden Beispiel ruft die UDF auf und übergibt eine Bereichs-URL, um den Code widerstandsfähig gegen Angriffe per Dateieinschleusung zu machen. Verwenden Sie immer eine Bereichs-URL, wenn der Aufrufer der Funktion nicht auch dessen Eigentümer ist. Sie können das URL-Argument als Bereichs-URL oder in einem anderen Format übergeben, wenn der Aufrufer der UDF auch dessen Eigentümer ist.

    SELECT process_pdf_func(BUILD_SCOPED_FILE_URL('@data_stage', '/myfile.pdf'));
    
    Copy

Prozedur erstellen und aufrufen

Führen Sie die folgenden Schritte aus, um eine Prozedur zu erstellen, die PDF-Dateien liest und verarbeitet.

  1. Fügen Sie den folgenden Code zum Erstellen einer Prozedur ein, und führen Sie ihn aus.

    Der Handler dieser Prozedur parst PDF-Dokumente und ruft deren Inhalt ab. Der Handler verwendet die Klasse SnowflakeFile, um die Datei zu lesen. Weitere Informationen zum Lesen von Dateien mit SnowflakeFile finden Sie unter Lesen einer mit SnowflakeFile dynamisch spezifizierten Datei.

    CREATE PROCEDURE process_pdf_proc(file STRING)
    RETURNS STRING
    LANGUAGE JAVA
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.28.jar')
    HANDLER = 'PdfParser.readFile'
    PACKAGES = ('com.snowflake:snowpark:latest')
    AS
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    import com.snowflake.snowpark_java.Session;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(Session session, String fileURL) throws IOException {
            SnowflakeFile file = SnowflakeFile.newInstance(fileURL);
            try (PDDocument document = PDDocument.load(file.getInputStream())) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
    
            return null;
        }
    }
    $$;
    
    Copy
  2. Aktualisieren Sie die Verzeichnistabelle des Stagingbereichs data_stage mit dem Befehl ALTER STAGE:

    ALTER STAGE data_stage REFRESH;
    
    Copy
  3. Rufen Sie die Prozedur auf, um die PDF-Datei im Stagingbereich zu lesen und den Inhalt zu extrahieren.

    Der Code im folgenden Beispiel übergibt eine Bereichs-URL, die auf die PDF-Datei in dem Stagingbereich verweist, den Sie erstellt haben.

    CALL process_pdf_proc(BUILD_SCOPED_FILE_URL('@data_stage', '/UsingThird-PartyPackages.pdf'));
    
    Copy

CSV-Datei mit einer UDTF verarbeiten

Das Beispiel in diesem Abschnitt verarbeitet Stagingdateien mit Java UDTFs, die Text aus den Dateien extrahieren und zurückgeben.

Daten-Stagingbereich erstellen

Erstellen Sie einen Stagingbereich mit dem Befehl CREATE STAGE:

Mit der folgenden SQL-Anweisung wird ein interner Stagingbereich zur Speicherung der Datendateien für das Beispiel erstellt:

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');
Copy

Zu lesende CSV-Datei hochladen

Kopieren Sie die CSV-Datei aus dem lokalen temporären Verzeichnis in den Stagingbereich, in dem Datendateien gespeichert werden:

Linux/Mac:
PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;
Copy
Windows:
PUT file://C:\temp\sample.csv @data_stage AUTO_COMPRESS=FALSE;
Copy

UDTF erstellen und aufrufen

Dieses Beispiel extrahiert den Inhalt einer angegebenen Menge von CSV-Dateien und gibt die Zeilen in einer Tabelle zurück. Durch die Verarbeitung von Dateidaten beim Lesen aus der Quelle können Sie potenzielle Speicherplatzprobleme vermeiden, die bei sehr großen Dateien auftreten können.

Der Code im folgenden UDTF-Handler-Beispiel verwendet SnowflakeFile, um einen InputStream aus einer Datei-URL zu generieren und eine CSV-Datei zu lesen. (In einem Java-UDTF-Handler beginnt die Zeilenverarbeitung, wenn Snowflake die von Ihnen implementierte process-Methode aufruft). Der Code verwendet den Stream, wenn er eine Instanz einer CsvStreamingReader-Klasse konstruiert, die im Handler selbst definiert ist.

Die Klasse CsvStreamingReader liest den Inhalt des empfangenen CSV-Dateistreams zeilenweise und bietet eine Möglichkeit für andere Codes, jede Zeile als Datensatz abzurufen, wobei die Spalten durch Kommas getrennt werden. Mit der process-Methode wird jeder Datensatz zurückgegeben, wenn er aus dem Stream gelesen wird.

Weitere Informationen zum Schreiben von benutzerdefinierten Tabellenfunktionen (UDTFs) mit einem Java-Handler finden Sie unter Tabellarische Java-UDFs (UDTFs).

Führen Sie die folgenden Schritte aus, um die Java-UDTF zu erstellen und die erforderlichen Dateien hochzuladen:

  1. Erstellen Sie eine Java-UDTF, die die Klasse SnowflakeFile verwendet:

    CREATE OR REPLACE FUNCTION parse_csv(file STRING)
    RETURNS TABLE (col1 STRING, col2 STRING, col3 STRING )
    LANGUAGE JAVA
    HANDLER = 'CsvParser'
    AS
    $$
    import org.xml.sax.SAXException;
    
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Stream;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    
    public class CsvParser {
    
      static class Record {
        public String col1;
        public String col2;
        public String col3;
    
        public Record(String col1_value, String col2_value, String col3_value)
        {
          col1 = col1_value;
          col2 = col2_value;
          col3 = col3_value;
        }
      }
    
      public static Class getOutputClass() {
        return Record.class;
      }
    
      static class CsvStreamingReader {
        private final BufferedReader csvReader;
    
        public CsvStreamingReader(InputStream is) {
          this.csvReader = new BufferedReader(new InputStreamReader(is));
        }
    
        public void close() {
          try {
            this.csvReader.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
    
        Record getNextRecord() {
          String csvRecord;
    
          try {
            if ((csvRecord = csvReader.readLine()) != null) {
              String[] columns = csvRecord.split(",", 3);
              return new Record(columns[0], columns[1], columns[2]);
            }
          } catch (IOException e) {
            throw new RuntimeException("Reading CSV failed.", e);
          } finally {
            // No more records, we can close the reader.
            close();
          }
    
          // Return null to indicate the end of the stream.
          return null;
        }
      }
    
      public Stream<Record> process(String file_url) throws IOException {
        SnowflakeFile file = SnowflakeFile.newInstance(file_url);
    
        CsvStreamingReader csvReader = new CsvStreamingReader(file.getInputStream());
        return Stream.generate(csvReader::getNextRecord);
      }
    }
    $$
    ;
    
    Copy
  2. Aktualisieren Sie die Verzeichnistabelle für den Stagingbereich data_stage:

    ALTER STAGE data_stage REFRESH;
    
    Copy
  3. Rufen Sie die Java-UDTF auf, um eine oder mehrere CSV-Stagingdateien zu lesen und deren Inhalt in ein Tabellenformat zu extrahieren:

    Der Code im folgenden Beispiel ruft die UDF auf und übergibt eine Bereichs-URL, um das Risiko von Angriffen per Dateieinschleusung zu reduzieren. Es wird immer eine Bereichs-URL verwendet, wenn der Aufrufer der Funktion nicht auch dessen Eigentümer ist. Das URL-Argument kann als Bereichs-URL oder in einem anderen unterstützten Format übergeben werden, wenn der Aufrufer der UDF auch dessen Eigentümer ist.

    -- Input a file URL.
    SELECT * FROM TABLE(PARSE_CSV(BUILD_SCOPED_FILE_URL(@data_stage, 'sample.csv')));
    
    Copy