Java UDF 또는 UDTF를 사용하여 비정형 데이터 처리하기

이 항목에서는 Java UDF (사용자 정의 함수) 또는 테이블 형식 Java UDF (사용자 정의 테이블 함수)를 사용하여 스테이징된 파일의 비정형 데이터를 읽고 처리하는 예제를 제공합니다.

Java를 사용하여 UDF 처리기를 개발하는 자세한 방법은 다음을 참조하십시오.

Java UDF 처리기 코드가 포함된 파일에서 읽는 자세한 방법은 IMPORTS를 사용하여 파일 읽기 섹션을 참조하십시오.

이 항목의 내용:

Java UDF 예

이 섹션의 예에서는 파일에서 텍스트를 추출하고 반환하는 Java UDF를 사용하여 스테이징된 비정형 파일을 처리합니다.

전제 조건: 스테이지 만들기

예제에서는 (미리 컴파일된 Java UDF와는 반대로) 인라인 Java UDF 를 사용합니다. 즉, UDF에 대한 Java 코드를 컴파일하고 패키징하여 스테이지에 업로드할 필요가 없다는 뜻입니다.

하지만 이들 예제는 JAR 파일에 패키징된 별도의 라이브러리에 따라 다릅니다. 그 라이브러리의 JAR 파일을 스테이지에 업로드해야 합니다. 예제에서는 내부 스테이지를 사용하여 이 라이브러리의 JAR 파일을 저장합니다.

Java UDF가 처리하는 비정형 데이터 파일이 JAR 파일과 동일한 스테이지에 있을 수 있지만, 이들 예제에서는 데이터 파일이 별도의 내부 스테이지에 있습니다.

사용자 정의 함수에 대한 권한 부여하기 에서 설명하는 바와 같이, 최소 필수 권한이 있는 역할을 사용하여 스테이지를 만듭니다.

다음 SQL 문은 예제를 위한 JAR 파일과 데이터 파일을 따로 저장하려고 별개의 내부 스테이지를 만듭니다.

-- Create an internal stage to store the JAR files.
CREATE OR REPLACE STAGE jars_stage;

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');

PDF 파일 처리하기

이 예제에서는 Apache PDFBox 를 사용하여 지정된 PDF 파일의 내용을 추출합니다.

Java UDF를 만들고 필수 파일을 업로드하려면 다음 단계를 완료하십시오.

  1. 로컬 임시 디렉터리에서 JAR 파일을 저장하는 스테이지로 Apache PDFBox용 JAR 파일을 복사합니다.

    Linux/Mac
    PUT file:///tmp/pdfbox-app-2.0.25.jar @jars_stage AUTO_COMPRESS=FALSE;
    
    Windows
    PUT file://C:\temp\pdfbox-app-2.0.25.jar @jars_stage AUTO_COMPRESS=FALSE;
    
  2. Java UDF를 만들어 PDF 문서를 구문 분석하고 각 문서에서 콘텐츠를 검색합니다. UDF 코드에서 SnowflakeFile 클래스 또는 InputStream 클래스를 사용할 수 있습니다.

    SnowflakeFile 클래스 사용하기
    CREATE FUNCTION process_pdf(file string)
    RETURNS string
    LANGUAGE java
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.25.jar')
    HANDLER = 'PdfParser.readFile'
    as
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(String fileURL) throws IOException {
            SnowflakeFile file = SnowflakeFile.newInstance(fileURL);
            try (PDDocument document = PDDocument.load(file.getInputStream())) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
    
            return null;
        }
    }
    $$;
    
    InputStream 클래스 사용하기
    CREATE FUNCTION process_pdf(file string)
    RETURNS string
    LANGUAGE java
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.25.jar')
    HANDLER = 'PdfParser.readFile'
    as
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(InputStream stream) throws IOException {
            try (PDDocument document = PDDocument.load(stream)) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
            return null;
        }
    }
    $$;
    
  3. 로컬 임시 디렉터리에서 데이터 파일을 저장하는 스테이지로 PDF 파일을 복사합니다.

    Linux/Mac
    PUT file:///tmp/myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Windows
    PUT file://C:\temp\myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
  4. data_stage 스테이지의 디렉터리 테이블을 새로 고칩니다.

    ALTER STAGE data_stage REFRESH;
    

Java UDF를 호출하여 하나 이상의 스테이징된 PDF 파일을 읽고 콘텐츠를 추출합니다.

-- Input a stage name and file path.
SELECT process_pdf('@data_stage/myfile.pdf');

-- Input a file URL generated by the BUILD_STAGE_FILE_URL function.
SELECT process_pdf(build_stage_file_url('@data_stage', '/myfile.pdf'));

-- Input a file URL output from the BUILD_STAGE_FILE_URL function.
SELECT process_pdf('https://myaccount.snowflakecomputing.com/api/files/mydb/myschema/data_stage/myfile.pdf');

-- Input a scoped URL.
SELECT process_pdf(build_scoped_file_url('@data_stage', '/myfile.pdf'));

-- Process all of the PDF files in a directory table serially.
SELECT process_pdf(file_url)
  FROM directory(@data_stage);

-- Process all of the PDF files in a directory table in parallel.
SELECT process_pdf(file_url)
FROM (
    SELECT file_url
    FROM directory(@data_stage)
    GROUP BY file_url
);

Java UDTF 예

이 섹션의 예제에서는 Java UDTF를 사용하여 스테이징된 파일에서 데이터를 추출하고 반환합니다.

전제 조건: 데이터 스테이지 만들기

사용자 정의 함수에 대한 권한 부여하기 에서 설명하는 바와 같이, 최소 필수 권한이 있는 역할을 사용하여 데이터 파일을 저장할 스테이지를 만듭니다.

다음 SQL 문은 이 예제의 데이터 파일을 저장할 내부 스테이지를 만듭니다.

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');

CSV 파일 처리하기

이 예제에서는 지정된 CSV 파일 세트의 내용을 추출하고 테이블의 행을 반환합니다.

Java UDTF를 만들고 필수 파일을 업로드하려면 다음 단계를 완료하십시오.

  1. SnowflakeFile 클래스를 사용하는 Java UDTF를 만듭니다.

    CREATE OR REPLACE FUNCTION parse_csv(file string)
    RETURNS TABLE (col1 string, col2 string, col3 string )
    LANGUAGE JAVA
    HANDLER = 'CsvParser'
    as
    $$
    import org.xml.sax.SAXException;
    
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Stream;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    
    public class CsvParser {
        public class Record {
            public String col1;
            public String col2;
            public String col3;
    
            public Record(String col1_value, String col2_value, String col3_value)
            {
                col1 = col1_value;
                col2 = col2_value;
                col3 = col3_value;
            }
        }
    
        public static Class getOutputClass() {
            return Record.class;
        }
    
        public Stream<Record> process(String file_url) throws IOException {
            SnowflakeFile file = SnowflakeFile.newInstance(file_url);
    
            String csvRecord = null;
            List<Record> rows = new ArrayList<>();
            BufferedReader csvReader = null;
    
            try {
                csvReader = new BufferedReader(new InputStreamReader(file.getInputStream()));
                while ((csvRecord = csvReader.readLine()) != null) {
                    String[] columns = csvRecord.split(",", 3);
                    rows.add(new Record(columns[0], columns[1], columns[2]));
                }
            } catch (IOException e) {
                throw new RuntimeException("Reading CSV failed.", e);
            } finally {
                if (csvReader != null)
                    try {
                        csvReader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }
    
            return rows.stream();
        }
    }
    $$
    ;
    
  2. 로컬 임시 디렉터리에서 데이터 파일을 저장하는 스테이지로 PDF 파일을 복사합니다.

    Linux/Mac
    PUT file:///tmp/sample.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Windows
    PUT file://C:\temp\sample.pdf @data_stage AUTO_COMPRESS=FALSE;
    
  3. data_stage 스테이지의 디렉터리 테이블을 새로 고칩니다.

    ALTER STAGE data_stage REFRESH;
    

Java UDTF를 호출하여 하나 이상의 스테이징된 CSV 파일을 읽고 콘텐츠를 테이블 형식으로 추출합니다.

-- Input a file URL.
SELECT * FROM TABLE(PARSE_CSV(BUILD_STAGE_FILE_URL(@data_stage, 'sample.csv')));
맨 위로 이동