UDF 및 프로시저 처리기로 비정형 데이터 처리하기

이 항목에서는 다음에 대해 작성된 처리기 코드를 사용하여 스테이징된 파일에서 비정형 데이터를 읽고 처리하는 예제를 제공합니다.

다른 언어로 작성된 처리기로 파일을 읽을 수도 있습니다.

Python
Scala

참고

파일 삽입 공격에 대한 코드의 탄력성을 높이려면 파일의 위치를 UDF에 전달할 때, 특히 함수 호출자가 소유자이기도 하지 않을 때는 항상 범위가 지정된 URL을 사용하십시오. 기본 제공 함수 BUILD_SCOPED_FILE_URL 을 사용하여 SQL에서 범위가 지정된 URL을 생성할 수 있습니다. BUILD_SCOPED_FILE_URL 의 기능에 대한 자세한 내용은 반정형 데이터 로딩 소개 섹션을 참조하십시오.

이 항목의 내용:

UDF 및 프로시저로 PDF 처리하기

이 섹션의 예제에서는 Java 처리기 코드를 사용하여 스테이징된 비정형 파일을 처리하는 방법을 보여주는데, 처음에는 UDF, 다음으로는 프로시저를 사용합니다. 두 처리기 모두 Apache PDFBox 라이브러리 를 사용하여 지정된 PDF 파일의 내용을 추출합니다.

이 처리기 코드는 UDF와 프로시저 간에 매우 유사합니다. 수신 PDF 파일을 읽는 방식이 서로 다릅니다.

  • UDF에서는 처리기가 Java InputStream 을 사용하여 파일을 읽습니다.

  • 프로시저에서는 처리기가 Snowflake SnowflakeFile 을 사용하여 파일을 읽습니다.

예제에서는 (스테이징된 JAR로 컴파일하는 것과는 반대로) 인라인 처리기 코드를 사용하는데, 이는 처리기 코드를 컴파일하고 패키징하여 스테이지에 업로드할 필요가 없다는 뜻입니다. 인라인 처리기와 스테이징된 처리기의 차이점에 대한 자세한 내용은 처리기 코드를 인라인 또는 스테이지에 유지하기 섹션을 참조하십시오.

PDFBox 라이브러리 다운로드하기

UDF 작성을 시작하기 전에 아직 가지고 있지 않은 경우 PDFBox 라이브러리 JAR 파일을 다운로드하십시오. 그 파일이 처리기 코드의 종속 항목이 될 것입니다. 나중에 라이브러리 JAR 파일을 스테이지에 업로드하게 됩니다.

Apache PDFBox 라이브러리 다운로드 페이지 에서 최신 릴리스 버전의 라이브러리를 다운로드합니다.

스테이지 만들기

처리기 코드의 종속성 라이브러리와 처리기 코드가 읽을 데이터 파일을 보관할 스테이지를 만듭니다.

아래 코드를 사용하여 다음 파일을 보관할 별도의 내부 스테이지를 만듭니다.

  • 처리기에 대한 종속 항목인 라이브러리 JAR 파일. UDF에서 스테이지 및 JAR 파일을 참조합니다.

  • 처리기 코드가 읽을 데이터 파일.

다음 예제의 코드에서는 CREATE STAGE 명령을 사용하여 필요한 스테이지를 생성합니다.

-- Create an internal stage to store the JAR files.
CREATE OR REPLACE STAGE jars_stage;

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');
Copy

필수 라이브러리와 읽을 PDF 파일 업로드하기

종속성 JAR 파일(PDF를 처리하는 라이브러리 코드 포함)과 데이터 파일(처리기 코드가 처리할 PDF 파일)을 업로드하려면 다음 단계를 완료하십시오.

이 예에서 선택한 PDF 파일을 사용할 수 있습니다.

  1. 로컬 임시 디렉터리에서 JAR 파일을 저장하는 스테이지로 Apache PDFBox용 JAR 파일을 복사합니다.

    Linux/Mac
    PUT file:///tmp/pdfbox-app-2.0.27.jar @jars_stage AUTO_COMPRESS=FALSE;
    
    Copy
    Windows
    PUT file://C:\temp\pdfbox-app-2.0.27.jar @jars_stage AUTO_COMPRESS=FALSE;
    
    Copy
  2. 로컬 임시 디렉터리에서 데이터 파일을 저장하는 스테이지로 PDF 파일을 복사합니다.

    Linux/Mac
    PUT file:///tmp/myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Copy
    Windows
    PUT file://C:\temp\myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
    
    Copy

UDF 생성 및 호출하기

PDF 파일을 읽고 처리하는 UDF를 생성하려면 다음 단계를 완료하십시오.

  1. 다음 코드를 붙여넣고 실행하여 UDF를 만듭니다.

    이 UDF의 처리기는 PDF 문서를 구문 분석하고 그 내용을 검색합니다. 처리기는 InputStream 클래스를 사용하여 파일을 읽습니다. InputStream 으로 파일 읽기에 대한 자세한 내용은 InputStream 을 사용하여 동적으로 지정된 파일 읽기 섹션을 참조하십시오.

    CREATE FUNCTION process_pdf_func(file STRING)
    RETURNS STRING
    LANGUAGE JAVA
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.27.jar')
    HANDLER = 'PdfParser.readFile'
    AS
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(InputStream stream) throws IOException {
            try (PDDocument document = PDDocument.load(stream)) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
            return null;
        }
    }
    $$;
    
    Copy
  2. ALTER STAGE 명령으로 data_stage 스테이지의 디렉터리 테이블을 새로 고칩니다.

    ALTER STAGE data_stage REFRESH;
    
    Copy
  3. UDF를 호출하여 스테이징된 PDF 파일을 읽고 그 내용을 추출합니다.

    다음 예제의 코드는 UDF를 호출하고 범위가 지정된 URL을 전달하여 코드가 파일 삽입 공격에 복원력을 갖도록 만듭니다. 함수의 호출자가 소유자는 아닌 경우 항상 범위가 지정된 URL을 사용합니다. UDF의 호출자가 소유자이기도 한 경우에는 범위가 지정된 URL 또는 다른 형식으로 URL 인자를 전달할 수 있습니다.

    SELECT process_pdf_func(BUILD_SCOPED_FILE_URL('@data_stage', '/myfile.pdf'));
    
    Copy

프로시저 생성 및 호출하기

PDF 파일을 읽고 처리하는 프로시저를 생성하려면 다음 단계를 완료하십시오.

  1. 다음 코드를 붙여넣고 실행하여 프로시저를 만듭니다.

    이 프로시저의 처리기는 PDF 문서를 구문 분석하고 그 내용을 검색합니다. 처리기는 SnowflakeFile 클래스를 사용하여 파일을 읽습니다. SnowflakeFile 로 파일 읽기에 대한 자세한 내용은 SnowflakeFile 을 사용하여 동적으로 지정된 파일 읽기 섹션을 참조하십시오.

    CREATE PROCEDURE process_pdf_proc(file STRING)
    RETURNS STRING
    LANGUAGE JAVA
    RUNTIME_VERSION = 11
    IMPORTS = ('@jars_stage/pdfbox-app-2.0.28.jar')
    HANDLER = 'PdfParser.readFile'
    PACKAGES = ('com.snowflake:snowpark:latest')
    AS
    $$
    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.text.PDFTextStripper;
    import org.apache.pdfbox.text.PDFTextStripperByArea;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    import com.snowflake.snowpark_java.Session;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    public class PdfParser {
    
        public static String readFile(Session session, String fileURL) throws IOException {
            SnowflakeFile file = SnowflakeFile.newInstance(fileURL);
            try (PDDocument document = PDDocument.load(file.getInputStream())) {
    
                document.getClass();
    
                if (!document.isEncrypted()) {
    
                    PDFTextStripperByArea stripper = new PDFTextStripperByArea();
                    stripper.setSortByPosition(true);
    
                    PDFTextStripper tStripper = new PDFTextStripper();
    
                    String pdfFileInText = tStripper.getText(document);
                    return pdfFileInText;
                }
            }
    
            return null;
        }
    }
    $$;
    
    Copy
  2. ALTER STAGE 명령으로 data_stage 스테이지의 디렉터리 테이블을 새로 고칩니다.

    ALTER STAGE data_stage REFRESH;
    
    Copy
  3. 프로시저를 호출하여 스테이징된 PDF 파일을 읽고 그 내용을 추출합니다.

    다음 예제의 코드는 생성한 스테이지의 PDF 파일을 가리키는 범위가 지정된 URL을 전달합니다.

    CALL process_pdf_proc(BUILD_SCOPED_FILE_URL('@data_stage', '/UsingThird-PartyPackages.pdf'));
    
    Copy

UDTF로 CSV 처리하기

이 섹션의 예제에서는 Java UDTF를 사용하여 스테이징된 파일에서 데이터를 추출하고 반환합니다.

데이터 스테이지 만들기

CREATE STAGE 명령을 사용하여 스테이지를 만듭니다.

다음 SQL 문은 이 예제의 데이터 파일을 저장할 내부 스테이지를 만듭니다.

-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');
Copy

읽을 CSV 파일 업로드하기

로컬 임시 디렉터리에서 데이터 파일을 저장하는 스테이지로 CSV 파일을 복사합니다.

Linux/Mac
PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;
Copy
Windows
PUT file://C:\temp\sample.csv @data_stage AUTO_COMPRESS=FALSE;
Copy

UDTF 생성 및 호출하기

이 예제에서는 지정된 CSV 파일 세트의 내용을 추출하고 테이블의 행을 반환합니다. 파일 데이터를 원본에서 읽는 대로 처리하면 파일이 매우 클 때 발생할 수 있는 잠재적 메모리 부족 오류를 방지할 수 있습니다.

다음 UDTF 처리기 예제의 코드는 SnowflakeFile 을 사용하여 파일 URL에서 InputStream 을 생성하여 CSV 파일을 읽습니다. (Java UDTF 처리기에서는 사용자가 구현하는 process 메서드를 Snowflake가 호출할 때 행 처리가 시작됩니다.) 이 코드는 처리기 자체에 정의된 CsvStreamingReader 클래스의 인스턴스를 생성할 때 스트림을 사용합니다.

CsvStreamingReader 클래스는 수신된 CSV 파일 스트림의 내용을 행 단위로 읽으며, 다른 코드가 각 행을 쉼표로 열을 구분하는 레코드로 검색할 수 있는 방법을 제공합니다. process 메서드는 스트림에서 각 레코드를 읽는 대로 반환합니다.

Java 처리기를 사용하여 테이블 형식의 UDTF(사용자 정의 함수)를 작성하는 방법에 대한 자세한 내용은 테이블 형식 Java UDF(UDTF) 섹션을 참조하십시오.

Java UDTF를 만들고 필요한 파일을 업로드하려면 다음 단계를 완료하십시오.

  1. SnowflakeFile 클래스를 사용하는 Java UDTF를 만듭니다.

    CREATE OR REPLACE FUNCTION parse_csv(file STRING)
    RETURNS TABLE (col1 STRING, col2 STRING, col3 STRING )
    LANGUAGE JAVA
    HANDLER = 'CsvParser'
    AS
    $$
    import org.xml.sax.SAXException;
    
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Stream;
    import com.snowflake.snowpark_java.types.SnowflakeFile;
    
    public class CsvParser {
    
      static class Record {
        public String col1;
        public String col2;
        public String col3;
    
        public Record(String col1_value, String col2_value, String col3_value)
        {
          col1 = col1_value;
          col2 = col2_value;
          col3 = col3_value;
        }
      }
    
      public static Class getOutputClass() {
        return Record.class;
      }
    
      static class CsvStreamingReader {
        private final BufferedReader csvReader;
    
        public CsvStreamingReader(InputStream is) {
          this.csvReader = new BufferedReader(new InputStreamReader(is));
        }
    
        public void close() {
          try {
            this.csvReader.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
    
        Record getNextRecord() {
          String csvRecord;
    
          try {
            if ((csvRecord = csvReader.readLine()) != null) {
              String[] columns = csvRecord.split(",", 3);
              return new Record(columns[0], columns[1], columns[2]);
            }
          } catch (IOException e) {
            throw new RuntimeException("Reading CSV failed.", e);
          } finally {
            // No more records, we can close the reader.
            close();
          }
    
          // Return null to indicate the end of the stream.
          return null;
        }
      }
    
      public Stream<Record> process(String file_url) throws IOException {
        SnowflakeFile file = SnowflakeFile.newInstance(file_url);
    
        CsvStreamingReader csvReader = new CsvStreamingReader(file.getInputStream());
        return Stream.generate(csvReader::getNextRecord);
      }
    }
    $$
    ;
    
    Copy
  2. data_stage 스테이지의 디렉터리 테이블을 새로 고칩니다.

    ALTER STAGE data_stage REFRESH;
    
    Copy
  3. Java UDTF를 호출하여 하나 이상의 스테이징된 CSV 파일을 읽고 콘텐츠를 테이블 형식으로 추출합니다.

    다음 예제의 코드는 UDF를 호출하고 범위가 지정된 URL을 전달하여 파일 삽입 공격의 위험을 줄여줍니다. 함수의 호출자가 소유자는 아닌 경우 항상 범위가 지정된 URL을 사용했습니다. UDF의 호출자가 소유자이기도 한 경우에는 범위가 지정된 URL 또는 지원되는 다른 형식으로 URL 인자를 전달할 수 있습니다.

    -- Input a file URL.
    SELECT * FROM TABLE(PARSE_CSV(BUILD_SCOPED_FILE_URL(@data_stage, 'sample.csv')));
    
    Copy