디렉터리 테이블을 사용하여 데이터 처리 파이프라인 만들기

스테이지에서 파일 수준 메타데이터를 추적하고 저장하는 디렉터리 테이블을 스트림 및 작업과 같은 다른 Snowflake 오브젝트와 결합하여 데이터 처리 파이프라인을 만들 수 있습니다.

스트림 은 디렉터리 테이블, 테이블, 외부 테이블 또는 뷰의 기본 테이블에 대한 데이터 조작 언어(DML) 변경 사항을 기록합니다. 작업 은 SQL 명령 또는 광범위한 UDF일 수 있는 단일 작업을 실행합니다. 작업을 예약하거나 요청 시 실행할 수 있습니다.

예: PDF 처리를 위한 간단한 파이프라인 만들기

이 예에서는 다음을 수행하는 간단한 데이터 처리 파이프라인을 만듭니다.

  1. 스테이지에 추가된 PDF 파일 감지.

  2. 파일에서 데이터 추출.

  3. Snowflake 테이블에 데이터 삽입.

파이프라인은 스트림을 사용하여 스테이지의 디렉터리 테이블에 대한 변경 사항을 감지하고 사용자 정의 함수(UDF)를 실행하여 작업을 사용하여 파일을 처리합니다.

다음 다이어그램은 예시 파이프라인의 작동 방식을 요약한 것입니다.

A simple data processing pipeline that uses a stream to track changes to a directory table.

1단계: 디렉터리 테이블이 활성화된 스테이지 만들기

디렉터리 테이블이 활성화된 내부 스테이지를 만듭니다. 예시 문은 ENCRYPTION 유형을 SNOWFLAKE_SSE 로 설정하여 스테이지에서 비정형 데이터 액세스를 활성화합니다.

CREATE OR REPLACE STAGE my_pdf_stage
  ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
  DIRECTORY = ( ENABLE = TRUE);
Copy

2단계: 디렉터리 테이블에 스트림 만들기

다음으로, 디렉터리 테이블이 속한 스테이지를 지정하여 디렉터리 테이블에 스트림을 만듭니다. 스트림은 디렉터리 테이블의 변경 사항을 추적합니다. 이 예시의 5 단계에서는 이 스트림을 사용하여 작업을 생성합니다.

CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Copy

3단계: PDF를 구문 분석하는 사용자 정의 함수 만들기

PDF 파일에서 데이터를 추출하는 사용자 정의 함수(UDF)를 만듭니다. 5단계에서 생성하는 작업은 이 UDF를 호출하여 스테이지에서 새로 추가된 파일을 처리합니다.

다음 예시 문은 제품 리뷰 데이터가 포함된 PDF 파일을 처리하는 PDF_PARSE 라는 UDF를 생성합니다. UDF는 PyPDF2 라이브러리를 사용하여 양식 필드 데이터를 추출합니다. UDF는 양식 이름과 값을 키-값 페어로 포함하는 사전을 반환합니다.

참고

UDF는 SnowflakeFile 클래스를 사용하여 동적으로 지정된 파일을 읽습니다. SnowflakeFile 에 대해 자세히 알아보려면 SnowflakeFile 을 사용하여 동적으로 지정된 파일 읽기 섹션을 참조하십시오.

CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'parse_pdf_fields'
  PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
  AS
  $$
  from pathlib import Path
  import PyPDF2 as pypdf
  from io import BytesIO
  from snowflake.snowpark.files import SnowflakeFile

  def parse_pdf_fields(file_path):
      with SnowflakeFile.open(file_path, 'rb') as f:
        buffer = BytesIO(f.readall())
      reader = pypdf.PdfFileReader(buffer)
      fields = reader.getFields()
      field_dict = {}
      for k, v in fields.items():
          if "/V" in v.keys():
              field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]

      return field_dict
  $$;
Copy

4단계: 파일 콘텐츠를 저장할 테이블 만들기

다음으로, 각 행이 file_namefile_data 라는 열에 스테이지의 파일에 대한 정보를 저장하는 테이블을 만듭니다. 이 예시의 5단계에서 생성하는 작업은 이 테이블에 데이터를 로드합니다.

CREATE OR REPLACE TABLE prod_reviews (
  file_name varchar,
  file_data variant
);
Copy

5단계: 작업 만들기

스테이지에서 새 파일의 스트림을 확인하고 파일 데이터를 prod_reviews 테이블에 삽입하는 예약된 작업을 만듭니다.

다음 문은 2단계에서 만든 스트림을 사용하여 예약된 작업을 만듭니다. 이 작업은 SYSTEM$STREAM_HAS_DATA 함수를 사용하여 스트림에 변경 데이터 캡처(CDC) 레코드가 포함되어 있는지 확인합니다.

CREATE OR REPLACE TASK load_new_file_data
  WAREHOUSE = 'MY_WAREHOUSE'
  SCHEDULE = '1 minute'
  COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
  WHEN
  SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
  AS
  INSERT INTO prod_reviews (
    SELECT relative_path as file_name,
    PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
    FROM my_pdf_stream
    WHERE METADATA$ACTION='INSERT'
  );
Copy

6단계: 파이프라인을 테스트하는 작업 실행

파이프라인이 작동하는지 확인하려면 스테이지에 파일을 추가하고 작업을 수동으로 실행한 다음 product_reviews 테이블을 쿼리하면 됩니다.

먼저 PDF 파일을 my_pdf_stage 스테이지에 추가한 다음 스테이지를 새로 고칩니다.

참고

이 예에서는 Snowflake 웹 인터페이스의 워크시트에서 실행할 수 없는 PUT 명령을 사용합니다. Snowsight 를 사용하여 파일을 업로드하려면 명명된 내부 스테이지에 파일 업로드하기 섹션을 참조하십시오.

PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;

ALTER STAGE my_pdf_stage REFRESH;
Copy

스트림을 쿼리하여 스테이지에 추가한 두 PDF 파일을 기록했는지 확인할 수 있습니다.

SELECT * FROM my_pdf_stream;
Copy

이제 작업을 실행하여 PDF 파일을 처리하고 product_reviews 테이블을 업데이트합니다.

EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status                                                   |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Copy

product_reviews 테이블을 쿼리하여 작업이 각 PDF 파일에 대한 행을 추가했는지 확인합니다.

select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME        | FILE_DATA                        |
|------------------+----------------------------------|
| prod_review1.pdf | {                                |
|                  |   "FirstName": "John",           |
|                  |   "LastName": "Johnson",         |
|                  |   "Middle Name": "Michael",      |
|                  |   "Product": "Tennis Shoes",     |
|                  |   "Purchase Date": "03/15/2022", |
|                  |   "Recommend": "Yes"             |
|                  | }                                |
| prod_review2.pdf | {                                |
|                  |   "FirstName": "Emily",          |
|                  |   "LastName": "Smith",           |
|                  |   "Middle Name": "Ann",          |
|                  |   "Product": "Red Skateboard",   |
|                  |   "Purchase Date": "01/10/2023", |
|                  |   "Recommend": "MayBe"           |
|                  | }                                |
+------------------+----------------------------------+
Copy

마지막으로, FILE_DATA 열의 오브젝트를 별도의 열로 구문 분석하는 뷰를 생성할 수 있습니다. 그런 다음 뷰를 쿼리하여 파일 내용을 분석하고 작업할 수 있습니다.

CREATE OR REPLACE VIEW prod_review_info_v
  AS
  WITH file_data
  AS (
      SELECT
        file_name
        , parse_json(file_data) AS file_data
      FROM prod_reviews
  )
  SELECT
      file_name
      , file_data:FirstName::varchar AS first_name
      , file_data:LastName::varchar AS last_name
      , file_data:"Middle Name"::varchar AS middle_name
      , file_data:Product::varchar AS product
      , file_data:"Purchase Date"::date AS purchase_date
      , file_data:Recommend::varchar AS recommended
      , build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
  FROM file_data;

SELECT * FROM prod_review_info_v;


| FILE_NAME        | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT        | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL                                                                                                                                                                                                                                                                                                                                                                                                              |
||
| prod_review1.pdf | John       | Johnson   | Michael     | Tennis Shoes   | 2022-03-15    | Yes         | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d              |
| prod_review2.pdf | Emily      | Smith     | Ann         | Red Skateboard | 2023-01-10    | MayBe       | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d                            |

Copy