Verarbeiten von unstrukturierten Daten mit Java-UDFs oder Java-UDTFs

Unter diesem Thema werden Beispiele für das Lesen und Verarbeiten unstrukturierter Daten in Stagingdateien mit Java-UDFs (benutzerdefinierte Funktionen) oder tabellarischen Java-UDFs (benutzerdefinierte Tabellenfunktionen) bereitgestellt.

Weitere Informationen zur Verwendung von Java für die Entwicklung von UDF-Handlern finden Sie unter:

Weitere Informationen zum Lesen aus einer Datei mit Java-UDF-Handlercode finden Sie unter Lesen einer Datei mit IMPORTS.

Unter diesem Thema:

Java-UDF-Beispiele

Das Beispiel in diesem Abschnitt verarbeitet unstrukturierte Stagingdateien mit Java UDFs, die Text aus den Dateien extrahieren und zurückgeben.

Voraussetzungen: Stagingbereiche erstellen

Die Beispiele verwenden Inline-Java-UDFs (im Gegensatz zu vorkompiliertem Java-UDFs), was bedeutet, dass Sie den Java-Code für Ihre UDF nicht kompilieren, verpacken und in einen Stagingbereich hochladen müssen.

Die Beispiele hängen jedoch von einer separaten Bibliothek ab, die in einer JAR-Datei gepackt ist. Sie müssen die JAR-Datei für diese Bibliothek in einen Stagingbereich hochladen. In den Beispielen wird zum Speichern der JAR-Datei dieser Bibliothek ein interner Stagingbereich verwendet.

Obwohl die unstrukturierten Datendateien, die von einer Java-UDF verarbeitet werden, in demselben Stagingbereich wie die JAR-Dateien gespeichert werden können, befinden sich die Datendateien in diesen Beispielen in einem separaten internen Stagingbereich.

Erstellen Sie den Stagingbereich mit einer Rolle, die über die erforderlichen Mindestberechtigungen verfügt, wie unter Erteilen von Berechtigungen für benutzerdefinierte Funktionen beschrieben.

Mit den folgenden SQL-Anweisungen können separate interne Stagingbereiche erstellt werden, um die JAR-Dateien und die Datendateien für die Beispiele getrennt zu speichern:

-- Create an internal stage to store the JAR files.
CREATE OR REPLACE STAGE jars_stage;

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');

PDF-Dateien verarbeiten

Dieses Beispiel extrahiert den Inhalt einer angegebenen PDF-Datei mit Apache PDFBox.

Führen Sie die folgenden Schritte aus, um die Java-UDF zu erstellen und die erforderlichen Dateien hochzuladen:

  1. Kopieren Sie die JAR-Datei für Apache PDFBox aus dem lokalen temporären Verzeichnis in den Stagingbereich, in dem JAR-Dateien gespeichert werden:

    Linux/Mac
    PUT file:///tmp/pdfbox-app-2.0.25.jar @jars_stage AUTO_COMPRESS=FALSE;
    
    Windows
    PUT file://C:\temp\pdfbox-app-2.0.25.jar @jars_stage AUTO_COMPRESS=FALSE;
    
  2. Erstellen Sie eine Java-UDF, um PDF-Dokumente zu parsen und den Inhalt aus jedem Dokument abzurufen. Sie können entweder die Klasse SnowflakeFile oder die Klasse InputStream in Ihrem UDF-Code verwenden:

    Verwenden der Klasse SnowflakeFile
    CREATE FUNCTION process_pdf(file string)
    RETURNS string
    LANGUAGE java
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.25.jar')
    HANDLER = 'PdfParser.readFile'
    as
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(String fileURL) throws IOException {
            SnowflakeFile file = SnowflakeFile.newInstance(fileURL);
            try (PDDocument document = PDDocument.load(file.getInputStream())) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
    
            return null;
        }
    }
    $$;
    
    Verwenden der Klasse InputStream
    CREATE FUNCTION process_pdf(file string)
    RETURNS string
    LANGUAGE java
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.25.jar')
    HANDLER = 'PdfParser.readFile'
    as
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(InputStream stream) throws IOException {
            try (PDDocument document = PDDocument.load(stream)) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
            return null;
        }
    }
    $$;
    
  3. Kopieren Sie die PDF-Datei aus dem lokalen temporären Verzeichnis in den Stagingbereich, in dem Datendateien gespeichert werden:

    Linux/Mac
    PUT file:///tmp/myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Windows
    PUT file://C:\temp\myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
  4. Aktualisieren Sie die Verzeichnistabelle für den Stagingbereich data_stage:

    ALTER STAGE data_stage REFRESH;
    

Rufen Sie die Java-UDF auf, um eine oder mehrere PDF-Stagingdateien zu lesen und deren Inhalt zu extrahieren:

-- Input a stage name and file path.
SELECT process_pdf('@data_stage/myfile.pdf');

-- Input a file URL generated by the BUILD_STAGE_FILE_URL function.
SELECT process_pdf(build_stage_file_url('@data_stage', '/myfile.pdf'));

-- Input a file URL output from the BUILD_STAGE_FILE_URL function.
SELECT process_pdf('https://myaccount.snowflakecomputing.com/api/files/mydb/myschema/data_stage/myfile.pdf');

-- Input a scoped URL.
SELECT process_pdf(build_scoped_file_url('@data_stage', '/myfile.pdf'));

-- Process all of the PDF files in a directory table serially.
SELECT process_pdf(file_url)
  FROM directory(@data_stage);

-- Process all of the PDF files in a directory table in parallel.
SELECT process_pdf(file_url)
FROM (
    SELECT file_url
    FROM directory(@data_stage)
    GROUP BY file_url
);

Java-UDTF-Beispiele

Das Beispiel in diesem Abschnitt verarbeitet Stagingdateien mit Java UDTFs, die Text aus den Dateien extrahieren und zurückgeben.

Voraussetzungen: Stagingbereich erstellen

Erstellen Sie einen Stagingbereich zum Speichern Ihrer Datendateien mit einer Rolle, die über die erforderlichen Mindestberechtigungen verfügt, wie unter Erteilen von Berechtigungen für benutzerdefinierte Funktionen beschrieben.

Mit der folgenden SQL-Anweisung wird ein interner Stagingbereich zur Speicherung der Datendateien für das Beispiel erstellt:

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');

CSV-Dateien verarbeiten

Dieses Beispiel extrahiert den Inhalt einer angegebenen Menge von CSV-Dateien und gibt die Zeilen in einer Tabelle zurück.

Führen Sie die folgenden Schritte aus, um die Java-UDTF zu erstellen und die erforderlichen Dateien hochzuladen:

  1. Erstellen Sie eine Java-UDTF, die die Klasse SnowflakeFile verwendet:

    CREATE OR REPLACE FUNCTION parse_csv(file string)
    RETURNS TABLE (col1 string, col2 string, col3 string )
    LANGUAGE JAVA
    HANDLER = 'CsvParser'
    as
    $$
    import org.xml.sax.SAXException;
    
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Stream;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    
    public class CsvParser {
        public class Record {
            public String col1;
            public String col2;
            public String col3;
    
            public Record(String col1_value, String col2_value, String col3_value)
            {
                col1 = col1_value;
                col2 = col2_value;
                col3 = col3_value;
            }
        }
    
        public static Class getOutputClass() {
            return Record.class;
        }
    
        public Stream<Record> process(String file_url) throws IOException {
            SnowflakeFile file = SnowflakeFile.newInstance(file_url);
    
            String csvRecord = null;
            List<Record> rows = new ArrayList<>();
            BufferedReader csvReader = null;
    
            try {
                csvReader = new BufferedReader(new InputStreamReader(file.getInputStream()));
                while ((csvRecord = csvReader.readLine()) != null) {
                    String[] columns = csvRecord.split(",", 3);
                    rows.add(new Record(columns[0], columns[1], columns[2]));
                }
            } catch (IOException e) {
                throw new RuntimeException("Reading CSV failed.", e);
            } finally {
                if (csvReader != null)
                    try {
                        csvReader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }
    
            return rows.stream();
        }
    }
    $$
    ;
    
  2. Kopieren Sie die PDF-Datei aus dem lokalen temporären Verzeichnis in den Stagingbereich, in dem Datendateien gespeichert werden:

    Linux/Mac
    PUT file:///tmp/sample.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Windows
    PUT file://C:\temp\sample.pdf @data_stage AUTO_COMPRESS=FALSE;
    
  3. Aktualisieren Sie die Verzeichnistabelle für den Stagingbereich data_stage:

    ALTER STAGE data_stage REFRESH;
    

Rufen Sie die Java-UDTF auf, um eine oder mehrere CSV-Stagingdateien zu lesen und deren Inhalt in ein Tabellenformat zu extrahieren:

-- Input a file URL.
SELECT * FROM TABLE(PARSE_CSV(BUILD_STAGE_FILE_URL(@data_stage, 'sample.csv')));
Zurück zum Anfang