디렉터리 테이블을 사용하여 데이터 처리 파이프라인 만들기¶
스테이지에서 파일 수준의 메타데이터를 추적하고 저장하는 디렉터리 테이블을 스트림 및 작업과 같은 다른 Snowflake 오브젝트와 결합하여 데이터 처리 파이프라인을 구축하십시오.
스트림 은 디렉터리 테이블, 테이블, 외부 테이블 또는 뷰의 기본 테이블에 대한 데이터 조작 언어(DML) 변경 사항을 기록합니다. 작업 은 SQL 명령 또는 광범위한 사용자 정의 함수(UDF) 일 수 있는 단일 작업을 실행합니다. 주기적으로 실행되도록 작업을 예약하거나 온디맨드 방식으로 작업을 실행할 수 있습니다.
예: PDF 처리를 위한 간단한 파이프라인 만들기¶
이 예에서는 다음을 수행하는 간단한 데이터 처리 파이프라인을 만듭니다.
- 스테이지에 추가된 PDF 파일 감지. 
- 파일에서 데이터 추출. 
- Snowflake 테이블에 데이터 삽입. 
데이터 파이프라인은 스트림을 사용하여 스테이지의 디렉터리 테이블에 대한 변경 사항을 감지하고 UDF 를 실행하여 파일에서 데이터를 추출하는 작업을 수행합니다.
다음 다이어그램은 예시 파이프라인의 작동 방식을 요약한 것입니다.
 
1단계: 디렉터리 테이블이 활성화된 스테이지 만들기¶
디렉터리 테이블이 활성화된 내부 스테이지를 만듭니다. 예시 문은 ENCRYPTION 유형을 SNOWFLAKE_SSE 로 설정하여 스테이지에서 비정형 데이터 액세스를 활성화합니다.
CREATE OR REPLACE STAGE my_pdf_stage
  ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
  DIRECTORY = ( ENABLE = TRUE);
2단계: 디렉터리 테이블에 스트림 만들기¶
디렉터리 테이블이 속한 스테이지를 지정하여 디렉터리 테이블에 스트림을 만듭니다. 스트림은 디렉터리 테이블의 변경 사항을 추적합니다. 이 예시의 5 단계에서는 이 스트림을 사용하여 작업을 생성합니다.
CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
3단계: PDF를 구문 분석하는 사용자 정의 함수 만들기¶
PDF 파일에서 데이터를 추출하는 UDF 를 만듭니다. 이후 단계에서 생성하는 작업은 UDF 를 호출하여 스테이지에 새로 추가된 파일을 처리합니다.
다음 예제 문은 제품 리뷰 데이터가 포함된 PDF 파일을 처리하는 PDF_PARSE 라는 이름의 Python UDF 를 생성합니다. UDF 는 PyPDF2 라이브러리를 사용하여 양식 필드 데이터를 추출합니다. UDF는 양식 이름과 값을 키-값 페어로 포함하는 사전을 반환합니다.
참고
UDF는 SnowflakeFile 클래스를 사용하여 동적으로 지정된 파일을 읽습니다. SnowflakeFile 에 대해 자세히 알아보려면 SnowflakeFile 을 사용하여 동적으로 지정된 파일 읽기 섹션을 참조하십시오.
CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'parse_pdf_fields'
  PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
AS
$$
from pathlib import Path
import PyPDF2 as pypdf
from io import BytesIO
from snowflake.snowpark.files import SnowflakeFile
def parse_pdf_fields(file_path):
    with SnowflakeFile.open(file_path, 'rb') as f:
        buffer = BytesIO(f.readall())
    reader = pypdf.PdfFileReader(buffer)
    fields = reader.getFields()
    field_dict = {}
    for k, v in fields.items():
        if "/V" in v.keys():
            field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]
    return field_dict
$$;
4단계: 파일 콘텐츠를 저장할 테이블 만들기¶
다음으로, 각 행이 file_name 및 file_data 라는 열에 스테이지의 파일에 대한 정보를 저장하는 테이블을 만듭니다. 이후 단계에서 생성하는 작업은 이 테이블에 데이터를 로드합니다.
CREATE OR REPLACE TABLE prod_reviews (
  file_name varchar,
  file_data variant
);
5단계: 작업 만들기¶
스테이지에서 새 파일의 스트림을 확인하고 파일 데이터를 prod_reviews 테이블에 삽입하는 예약된 작업을 만듭니다.
다음 문은 이전에 생성한 스트림을 사용하여 예약된 작업을 만듭니다. 이 작업은 SYSTEM$STREAM_HAS_DATA 함수를 사용하여 스트림에 변경 데이터 캡처(CDC) 레코드가 포함되어 있는지 확인합니다.
CREATE OR REPLACE TASK load_new_file_data
  WAREHOUSE = 'MY_WAREHOUSE'
  SCHEDULE = '1 minute'
  COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
  WHEN
  SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
  AS
  INSERT INTO prod_reviews (
    SELECT relative_path as file_name,
    PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
    FROM my_pdf_stream
    WHERE METADATA$ACTION='INSERT'
  );
6단계: 파이프라인을 테스트하는 작업 실행¶
파이프라인이 작동하는지 확인하려면 스테이지에 파일을 추가하고 작업을 수동으로 실행한 다음 product_reviews 테이블을 쿼리하면 됩니다.
먼저 PDF 파일을 my_pdf_stage 스테이지에 추가한 다음 스테이지를 새로 고칩니다.
참고
이 예제에서는 PUT 명령을 사용하는데, 이 명령은 Snowflake 웹 인터페이스의 워크시트에서 실행할 수 없습니다. Snowsight 를 사용하여 파일을 업로드하려면 명명된 내부 스테이지에 파일 업로드하기 섹션을 참조하십시오.
PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
ALTER STAGE my_pdf_stage REFRESH;
스트림을 쿼리하여 스테이지에 추가한 두 PDF 파일을 기록했는지 확인할 수 있습니다.
SELECT * FROM my_pdf_stream;
이제 작업을 실행하여 PDF 파일을 처리하고 product_reviews 테이블을 업데이트합니다.
EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status                                                   |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
product_reviews 테이블을 쿼리하여 작업이 각 PDF 파일에 대한 행을 추가했는지 확인합니다.
select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME        | FILE_DATA                        |
|------------------+----------------------------------|
| prod_review1.pdf | {                                |
|                  |   "FirstName": "John",           |
|                  |   "LastName": "Johnson",         |
|                  |   "Middle Name": "Michael",      |
|                  |   "Product": "Tennis Shoes",     |
|                  |   "Purchase Date": "03/15/2022", |
|                  |   "Recommend": "Yes"             |
|                  | }                                |
| prod_review2.pdf | {                                |
|                  |   "FirstName": "Emily",          |
|                  |   "LastName": "Smith",           |
|                  |   "Middle Name": "Ann",          |
|                  |   "Product": "Red Skateboard",   |
|                  |   "Purchase Date": "01/10/2023", |
|                  |   "Recommend": "MayBe"           |
|                  | }                                |
+------------------+----------------------------------+
마지막으로, FILE_DATA 열의 오브젝트를 별도의 열로 구문 분석하는 뷰를 생성할 수 있습니다. 그런 다음 뷰를 쿼리하여 파일 내용을 분석하고 작업할 수 있습니다.
CREATE OR REPLACE VIEW prod_review_info_v
  AS
  WITH file_data
  AS (
      SELECT
        file_name
        , parse_json(file_data) AS file_data
      FROM prod_reviews
  )
  SELECT
      file_name
      , file_data:FirstName::varchar AS first_name
      , file_data:LastName::varchar AS last_name
      , file_data:"Middle Name"::varchar AS middle_name
      , file_data:Product::varchar AS product
      , file_data:"Purchase Date"::date AS purchase_date
      , file_data:Recommend::varchar AS recommended
      , build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
  FROM file_data;
SELECT * FROM prod_review_info_v;
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| FILE_NAME        | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT        | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL                                                                                                                                                                                                                                                                                                                                                                                                              |
|------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| prod_review1.pdf | John       | Johnson   | Michael     | Tennis Shoes   | 2022-03-15    | Yes         | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d              |
| prod_review2.pdf | Emily      | Smith     | Ann         | Red Skateboard | 2023-01-10    | MayBe       | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d                            |
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+