Java UDFs または UDTFs を使用した非構造化データの処理

このトピックでは、 Java UDFs (ユーザー定義関数)または 表形式のJava UDFs (ユーザー定義テーブル関数)を使用して、ステージングされたファイル内の非構造化データを読み取って処理する例を提供します。

Javaを使用して UDF ハンドラーを開発する方法の詳細については、以下をご参照ください。

Java UDF ハンドラーコードを使用してファイルから読み取る方法の詳細については、 IMPORTS を使用したファイルの読み取り をご参照ください。

このトピックの内容:

Java UDF の例

このセクションの例では、Java UDFs を使用してステージングされた非構造化ファイルを処理します。これにより、ファイルからテキストが抽出されて返されます。

前提条件: ステージを作成する

例では、(事前にコンパイルされたJava UDFs ではなく) インラインJava UDFs を使用しています。つまり、ステージのために UDF のJavaコードをコンパイル、パッケージ化、およびアップロードする必要はありません。

ただし、これらの例は、 JAR ファイルにパッケージ化されている別のライブラリに依存しています。そのライブラリの JAR ファイルをステージにアップロードする必要があります。例では、内部ステージを使用して、このライブラリの JAR ファイルを格納しています。

Java UDF によって処理される非構造化データファイルは、 JAR ファイルと同じステージに配置できますが、これらの例では、データファイルは別の内部ステージに配置されています。

ユーザー定義関数の権限の付与 で説明されているように、最低限必要な権限を持つロールを使用してステージを作成します。

次の SQL ステートメントは、例のための JAR ファイルとデータファイルを個別に格納する、個別の内部ステージを作成します。

-- Create an internal stage to store the JAR files.
CREATE OR REPLACE STAGE jars_stage;

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');

PDF ファイルを処理する

この例では、 Apache PDFBox を使用して、指定された PDF ファイルのコンテンツを抽出します。

次のステップを実行してJava UDF を作成し、必要なファイルをアップロードします。

  1. Apache PDFBox の JAR ファイルをローカル仮ディレクトリから JAR ファイルを格納するステージにコピーします。

    Linux/Mac
    PUT file:///tmp/pdfbox-app-2.0.25.jar @jars_stage AUTO_COMPRESS=FALSE;
    
    Windows
    PUT file://C:\temp\pdfbox-app-2.0.25.jar @jars_stage AUTO_COMPRESS=FALSE;
    
  2. Java UDF を作成して PDF ドキュメントを解析し、各ドキュメントからコンテンツを取得します。UDF コードでは、 SnowflakeFile クラスまたは InputStream クラスのいずれかを使用できます。

    SnowflakeFile クラスの使用
    CREATE FUNCTION process_pdf(file string)
    RETURNS string
    LANGUAGE java
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.25.jar')
    HANDLER = 'PdfParser.readFile'
    as
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(String fileURL) throws IOException {
            SnowflakeFile file = SnowflakeFile.newInstance(fileURL);
            try (PDDocument document = PDDocument.load(file.getInputStream())) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
    
            return null;
        }
    }
    $$;
    
    InputStream クラスの使用
    CREATE FUNCTION process_pdf(file string)
    RETURNS string
    LANGUAGE java
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.25.jar')
    HANDLER = 'PdfParser.readFile'
    as
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(InputStream stream) throws IOException {
            try (PDDocument document = PDDocument.load(stream)) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
            return null;
        }
    }
    $$;
    
  3. PDF ファイルをローカル仮ディレクトリからデータファイルを格納するステージにコピーします。

    Linux/Mac
    PUT file:///tmp/myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Windows
    PUT file://C:\temp\myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
  4. data_stage ステージのディレクトリテーブルを更新します。

    ALTER STAGE data_stage REFRESH;
    

Java UDF を呼び出して、1つ以上のステージングされた PDF ファイルを読み取り、コンテンツを抽出します。

-- Input a stage name and file path.
SELECT process_pdf('@data_stage/myfile.pdf');

-- Input a file URL generated by the BUILD_STAGE_FILE_URL function.
SELECT process_pdf(build_stage_file_url('@data_stage', '/myfile.pdf'));

-- Input a file URL output from the BUILD_STAGE_FILE_URL function.
SELECT process_pdf('https://myaccount.snowflakecomputing.com/api/files/mydb/myschema/data_stage/myfile.pdf');

-- Input a scoped URL.
SELECT process_pdf(build_scoped_file_url('@data_stage', '/myfile.pdf'));

-- Process all of the PDF files in a directory table serially.
SELECT process_pdf(file_url)
  FROM directory(@data_stage);

-- Process all of the PDF files in a directory table in parallel.
SELECT process_pdf(file_url)
FROM (
    SELECT file_url
    FROM directory(@data_stage)
    GROUP BY file_url
);

Java UDTF の例

このセクションの例では、Java UDTFs を使用し、てステージングされたファイルからデータを抽出して返します。

前提条件: データステージを作成する

ユーザー定義関数の権限の付与 で説明されているように、最低限必要な権限を持つロールを使用してデータファイルを格納するステージを作成します。

次の SQL ステートメントは、例のためのデータファイルを格納する、内部ステージを作成します。

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');

CSV ファイルを処理する

この例では、指定された CSV ファイルのセットのコンテンツを抽出し、テーブルの行を返します。

次のステップを実行してJava UDTF を作成し、必要なファイルをアップロードします。

  1. SnowflakeFile クラスを使用するJava UDTF を作成します。

    CREATE OR REPLACE FUNCTION parse_csv(file string)
    RETURNS TABLE (col1 string, col2 string, col3 string )
    LANGUAGE JAVA
    HANDLER = 'CsvParser'
    as
    $$
    import org.xml.sax.SAXException;
    
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Stream;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    
    public class CsvParser {
        public class Record {
            public String col1;
            public String col2;
            public String col3;
    
            public Record(String col1_value, String col2_value, String col3_value)
            {
                col1 = col1_value;
                col2 = col2_value;
                col3 = col3_value;
            }
        }
    
        public static Class getOutputClass() {
            return Record.class;
        }
    
        public Stream<Record> process(String file_url) throws IOException {
            SnowflakeFile file = SnowflakeFile.newInstance(file_url);
    
            String csvRecord = null;
            List<Record> rows = new ArrayList<>();
            BufferedReader csvReader = null;
    
            try {
                csvReader = new BufferedReader(new InputStreamReader(file.getInputStream()));
                while ((csvRecord = csvReader.readLine()) != null) {
                    String[] columns = csvRecord.split(",", 3);
                    rows.add(new Record(columns[0], columns[1], columns[2]));
                }
            } catch (IOException e) {
                throw new RuntimeException("Reading CSV failed.", e);
            } finally {
                if (csvReader != null)
                    try {
                        csvReader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }
    
            return rows.stream();
        }
    }
    $$
    ;
    
  2. PDF ファイルをローカル仮ディレクトリからデータファイルを格納するステージにコピーします。

    Linux/Mac
    PUT file:///tmp/sample.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Windows
    PUT file://C:\temp\sample.pdf @data_stage AUTO_COMPRESS=FALSE;
    
  3. data_stage ステージのディレクトリテーブルを更新します。

    ALTER STAGE data_stage REFRESH;
    

Java UDTF を呼び出して、1つ以上のステージングされた CSV ファイルを読み取り、コンテンツをテーブル形式で抽出します。

-- Input a file URL.
SELECT * FROM TABLE(PARSE_CSV(BUILD_STAGE_FILE_URL(@data_stage, 'sample.csv')));
最上部に戻る