UDF およびプロシージャハンドラーを使用した非構造化データの処理¶
このトピックでは、以下のために記述されたハンドラーコードを使用して、ステージングされたファイルの非構造化データを読み取り、処理する例を紹介します。
また、他の言語で記述されたハンドラーでファイルを読み取ることもできます。
- Python:
- Scala:
注釈
ファイルインジェクション攻撃に対するコードの回復性を高めるには、ファイルの場所を UDF に渡す場合、特に関数の呼び出し元がその所有者でもない場合に、スコープ URL を常に使用します。組み込み関数 BUILD_SCOPED_FILE_URL を使用して、スコープ URL を SQL に作成できます。BUILD_SCOPED_FILE_URL の機能の詳細については、 非構造化データの概要 をご参照ください。
このトピックの内容:
UDF およびプロシージャを使用して PDF を処理する¶
このセクションの例では、Javaハンドラーコードを使用して、最初は UDF、次にプロシージャによりステージングされた非構造化ファイルを処理します。どちらのハンドラーも、 Apacheの PDFBox ライブラリ を使用して、指定された PDF ファイルのコンテンツを抽出します。
ハンドラーコードは、 UDF とプロシージャの間で非常によく似ています。異なるのは、受信する PDF ファイルの読み取り方法です。
UDF では、ハンドラーはJava
InputStream
を使用してファイルを読み取ります。プロシージャでは、ハンドラーはSnowflake
SnowflakeFile
を使用してファイルを読み取ります。
例では、インラインの(ステージングされた JAR でコンパイルされたのとは対照的に)ハンドラーコードを使用しています。つまり、ステージのためにハンドラーコードをコンパイル、パッケージ化、およびアップロードする必要はありません。インラインハンドラーとステージングされたハンドラーの相違に関する詳細については、 ハンドラーコードのインラインまたはステージ上での保持 をご参照ください。
PDFBox ライブラリをダウンロードする¶
UDF の記述を開始する前に、 PDFBox ライブラリ JAR ファイルがまだない場合は、このファイルをダウンロードします。ファイルはハンドラーコードと依存関係になります。後でライブラリ JAR ファイルをステージにアップロードします。
Apache PDFBox ライブラリのダウンロードページ からライブラリの最新リリース版をダウンロードします。
ステージを作成する¶
ハンドラーコードの依存関係ライブラリとハンドラーコードが読み取るデータファイルを保持するステージを作成します。
以下のコードを使用して、保持するための内部ステージを個別に作成します。
ハンドラーの依存関係のライブラリ JAR ファイル。UDF から、ステージと JAR ファイルを参照します。
ハンドラーコードが読み取るデータファイル。
次の例のコードは、 CREATE STAGE コマンドを使用して、必要なステージを作成します。
-- Create an internal stage to store the JAR files.
CREATE OR REPLACE STAGE jars_stage;
-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');
読み取りに必要なライブラリおよび PDF ファイルをアップロードする¶
以下のステップを完了して、依存関係の JAR ファイル(PDF を処理するライブラリコードがある)とデータファイル(ハンドラーコードが処理する PDF ファイル)をアップロードします。
この例では、お好みの PDF ファイルを使用することができます。
Apache PDFBox の JAR ファイルをローカル仮ディレクトリから JAR ファイルを格納するステージにコピーします。
- Linux/Mac:
PUT file:///tmp/pdfbox-app-2.0.27.jar @jars_stage AUTO_COMPRESS=FALSE;
- Windows:
PUT file://C:\temp\pdfbox-app-2.0.27.jar @jars_stage AUTO_COMPRESS=FALSE;
PDF ファイルをローカル仮ディレクトリからデータファイルを格納するステージにコピーします。
- Linux/Mac:
PUT file:///tmp/myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
- Windows:
PUT file://C:\temp\myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
UDF を作成して呼び出す¶
以下のステップを完了して、 PDF のファイルを読み取って処理する UDF を作成します。
次のコードを貼り付けて実行し、 UDF を作成します。
この UDF のハンドラーは、 PDF ドキュメントを解析し、そのコンテンツを取得します。ハンドラーは、
InputStream
クラスを使用してファイルを読み取ります。InputStream
を使用したファイルの読み取りについては、 InputStream を使用した動的に指定されたファイルの読み取り をご参照ください。CREATE FUNCTION process_pdf_func(file STRING) RETURNS STRING LANGUAGE JAVA RUNTIME_VERSION = 11 IMPORTS = ('@jars_stage/pdfbox-app-2.0.27.jar') HANDLER = 'PdfParser.readFile' AS $$ import org.apache.pdfbox.pdmodel.PDDocument; import org.apache.pdfbox.text.PDFTextStripper; import org.apache.pdfbox.text.PDFTextStripperByArea; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; public class PdfParser { public static String readFile(InputStream stream) throws IOException { try (PDDocument document = PDDocument.load(stream)) { document.getClass(); if (!document.isEncrypted()) { PDFTextStripperByArea stripper = new PDFTextStripperByArea(); stripper.setSortByPosition(true); PDFTextStripper tStripper = new PDFTextStripper(); String pdfFileInText = tStripper.getText(document); return pdfFileInText; } } return null; } } $$;
ALTER STAGE コマンドを使用して
data_stage
ステージのディレクトリテーブルを更新します。ALTER STAGE data_stage REFRESH;
UDF を呼び出して、ステージングされた PDF ファイルを読み取り、コンテンツを抽出します。
次の例のコードは UDF を呼び出し、スコープ URL を渡して、ファイルインジェクション攻撃に対するコードの回復力を高めます。関数の呼び出し元がその所有者ではない場合は、常にスコープ付き URL を使用します。UDF の呼び出し元がその所有者でもある場合は、 URL 引数をスコープ URL または別の形式として渡すことができます。
SELECT process_pdf_func(BUILD_SCOPED_FILE_URL('@data_stage', '/myfile.pdf'));
プロシージャを作成して呼び出す¶
以下のステップを完了して、 PDF のファイルを読み取って処理するプロシージャを作成します。
次のコードを貼り付けて実行し、プロシージャを作成します。
このプロシージャのハンドラーは、 PDF ドキュメントを解析し、そのコンテンツを取得します。ハンドラーは、
SnowflakeFile
クラスを使用してファイルを読み取ります。SnowflakeFile
を使用したファイルの読み取りについては、 SnowflakeFile を使用した動的に指定されたファイルの読み取り をご参照ください。CREATE PROCEDURE process_pdf_proc(file STRING) RETURNS STRING LANGUAGE JAVA RUNTIME_VERSION = 11 IMPORTS = ('@jars_stage/pdfbox-app-2.0.28.jar') HANDLER = 'PdfParser.readFile' PACKAGES = ('com.snowflake:snowpark:latest') AS $$ import org.apache.pdfbox.pdmodel.PDDocument; import org.apache.pdfbox.text.PDFTextStripper; import org.apache.pdfbox.text.PDFTextStripperByArea; import com.snowflake.snowpark_java.types.SnowflakeFile; import com.snowflake.snowpark_java.Session; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; public class PdfParser { public static String readFile(Session session, String fileURL) throws IOException { SnowflakeFile file = SnowflakeFile.newInstance(fileURL); try (PDDocument document = PDDocument.load(file.getInputStream())) { document.getClass(); if (!document.isEncrypted()) { PDFTextStripperByArea stripper = new PDFTextStripperByArea(); stripper.setSortByPosition(true); PDFTextStripper tStripper = new PDFTextStripper(); String pdfFileInText = tStripper.getText(document); return pdfFileInText; } } return null; } } $$;
ALTER STAGE コマンドを使用して
data_stage
ステージのディレクトリテーブルを更新します。ALTER STAGE data_stage REFRESH;
プロシージャを呼び出して、ステージングされた PDF ファイルを読み取り、コンテンツを抽出します。
次の例のコードは、作成したステージ上の PDF ファイルをポイントするスコープ付き URL を渡します。
CALL process_pdf_proc(BUILD_SCOPED_FILE_URL('@data_stage', '/UsingThird-PartyPackages.pdf'));
UDTF を使用して CSV を処理する¶
このセクションの例では、Java UDTFs を使用し、てステージングされたファイルからデータを抽出して返します。
データステージを作成する¶
CREATE STAGE コマンドを使用してステージを作成します。
次の SQL ステートメントは、例のためのデータファイルを格納する、内部ステージを作成します。
-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');
読み取る CSV ファイルをアップロードする¶
CSV ファイルをローカル仮ディレクトリからデータファイルを格納するステージにコピーします。
- Linux/Mac:
PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;
- Windows:
PUT file://C:\temp\sample.csv @data_stage AUTO_COMPRESS=FALSE;
UDTF を作成して呼び出す¶
この例では、指定された CSV ファイルのセットのコンテンツを抽出し、テーブルの行を返します。ソースから読み込まれたファイルデータを処理することで、ファイルが非常に大きい場合に発生する可能性のあるメモリ不足エラーを回避できます。
次の UDTF ハンドラー例のコードは、 SnowflakeFile
を使用してファイル URL から InputStream
を生成し、 CSV ファイルを読み込みます。(Java UDTF ハンドラーでは、Snowflakeが実装した process
メソッドを呼び出した時点で行の処理が開始されます)。このコードは、ハンドラー自身で定義された CsvStreamingReader
クラスのインスタンスを構築するときにストリームを使用します。
CsvStreamingReader
クラスは、受信した CSV ファイルストリームの内容を行ごとに読み込み、他のコードが各行をカンマで区切られた記録として取り出す方法を提供します。 process
メソッドは、ストリームから読み込まれた各記録を返します。
Javaハンドラーを使用した表形式のユーザー定義関数(UDTFs)の書き方については、 表形式のJava UDFs (UDTFs) をご参照ください。
次のステップを実行してJava UDTF を作成し、必須なファイルをアップロードします。
SnowflakeFile
クラスを使用するJava UDTF を作成します。CREATE OR REPLACE FUNCTION parse_csv(file STRING) RETURNS TABLE (col1 STRING, col2 STRING, col3 STRING ) LANGUAGE JAVA HANDLER = 'CsvParser' AS $$ import org.xml.sax.SAXException; import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.stream.Stream; import com.snowflake.snowpark_java.types.SnowflakeFile; public class CsvParser { static class Record { public String col1; public String col2; public String col3; public Record(String col1_value, String col2_value, String col3_value) { col1 = col1_value; col2 = col2_value; col3 = col3_value; } } public static Class getOutputClass() { return Record.class; } static class CsvStreamingReader { private final BufferedReader csvReader; public CsvStreamingReader(InputStream is) { this.csvReader = new BufferedReader(new InputStreamReader(is)); } public void close() { try { this.csvReader.close(); } catch (IOException e) { e.printStackTrace(); } } Record getNextRecord() { String csvRecord; try { if ((csvRecord = csvReader.readLine()) != null) { String[] columns = csvRecord.split(",", 3); return new Record(columns[0], columns[1], columns[2]); } } catch (IOException e) { throw new RuntimeException("Reading CSV failed.", e); } finally { // No more records, we can close the reader. close(); } // Return null to indicate the end of the stream. return null; } } public Stream<Record> process(String file_url) throws IOException { SnowflakeFile file = SnowflakeFile.newInstance(file_url); CsvStreamingReader csvReader = new CsvStreamingReader(file.getInputStream()); return Stream.generate(csvReader::getNextRecord); } } $$ ;
data_stage
ステージのディレクトリテーブルを更新します。ALTER STAGE data_stage REFRESH;
Java UDTF を呼び出して、1つ以上のステージングされた CSV ファイルを読み取り、コンテンツをテーブル形式で抽出します。
次の例のコードは、 UDF を呼び出し、スコープ URL を渡して、ファイルインジェクション攻撃のリスクを軽減します。関数の呼び出し元がその所有者でもない場合、常にスコープ URL を使用しました。UDF の呼び出し元がその所有者でもある場合は、 URL 引数をスコープ URL または別のサポートされている形式として渡すことができます。
-- Input a file URL. SELECT * FROM TABLE(PARSE_CSV(BUILD_SCOPED_FILE_URL(@data_stage, 'sample.csv')));