ディレクトリテーブルを使用したデータ処理パイプラインの構築

ステージ上のファイルレベルのメタデータを追跡して格納するディレクトリテーブルをストリームやタスクなどの他のSnowflakeオブジェクトと組み合わせて、データ処理パイプラインを構築することができます。

ストリーム は、テーブル、ディレクトリテーブル、外部テーブル、またはビュー内の基になるテーブルに対するデータ操作言語(DML)の変更を記録します。 タスク は、 SQL コマンドや広範な UDF などの単一アクションを実行します。タスクをスケジュールすることも、オンデマンドで実行することもできます。

例: シンプルなパイプラインを作成して PDFs を処理する

この例では、以下のような単純なデータ処理パイプラインを構築します。

  1. ステージに追加された PDF ファイルを検出します。

  2. ファイルからデータを抽出します。

  3. データをSnowflakeテーブルに挿入します。

パイプラインは、ステージ上のディレクトリテーブルの変更を検出するストリームと、ファイルを処理するためにユーザー定義関数(UDF)を実行するタスクを使用します。

次の図は、例のパイプラインがどのように機能するかをまとめたものです。

ストリームを使用してディレクトリテーブルの変更を追跡するシンプルなデータ処理パイプライン。

ステップ1: ディレクトリテーブルを有効化してステージを作成する

ディレクトリテーブルを有効化して内部ステージを作成します。このステートメント例では、 ENCRYPTION 型を SNOWFLAKE_SSE に設定し、 ステージで非構造化データにアクセスできるように しています。

CREATE OR REPLACE STAGE my_pdf_stage
  ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
  DIRECTORY = ( ENABLE = TRUE);
Copy

ステップ2: ディレクトリテーブルにストリームを作成する

次に、ディレクトリテーブルが属するステージを指定して、ディレクトリテーブル上にストリームを作成します。ストリームはディレクトリテーブルの変更を追跡します。この例のステップ5では、このストリームを使用してタスクを構築します。

CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Copy

ステップ3: ユーザー定義関数を作成して PDFs を解析する

ファイル PDF からデータを抽出するユーザー定義関数(UDF)を作成します。ステップ5で作成したタスクは、この UDF を呼び出して、ステージ上に新しく追加されたファイルを処理します。

次のステートメント例では、製品レビューデータを含んだ PDF ファイルを処理する PDF_PARSE という名前の UDF を作成します。UDF は、 PyPDF2 ライブラリを使用してフォームフィールドデータを抽出します。これは、フォーム名と値をキーと値のペアとして含むディクショナリを返します。

注釈

UDF は、 SnowflakeFile クラスを使用して動的に指定されたファイルを読み取ります。 SnowflakeFile の詳細については、 SnowflakeFile を使用した動的に指定されたファイルの読み取り をご参照ください。

CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'parse_pdf_fields'
  PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
  AS
  $$
  from pathlib import Path
  import PyPDF2 as pypdf
  from io import BytesIO
  from snowflake.snowpark.files import SnowflakeFile

  def parse_pdf_fields(file_path):
      with SnowflakeFile.open(file_path, 'rb') as f:
        buffer = BytesIO(f.readall())
      reader = pypdf.PdfFileReader(buffer)
      fields = reader.getFields()
      field_dict = {}
      for k, v in fields.items():
          if "/V" in v.keys():
              field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]

      return field_dict
  $$;
Copy

ステップ4: ファイルのコンテンツを格納するテーブルを作成する

次に、各行がステージ上のファイルに関する情報を file_namefile_data という名前の列に格納するテーブルを作成します。この例のステップ5で作成したタスクは、このテーブルにデータをロードします。

CREATE OR REPLACE TABLE prod_reviews (
  file_name varchar,
  file_data variant
);
Copy

ステップ5: タスクを作成する

ステージ上に新しいファイルがないかストリームをチェックし、ファイルデータを prod_reviews テーブルに挿入するスケジュールタスクを作成します。

次のステートメントは、ステップ2で作成したストリームを使用して、スケジュールされたタスクを作成します。タスクは SYSTEM$STREAM_HAS_DATA 関数を使用して、ストリームに変更データキャプチャ(CDC)記録が含まれているかどうかをチェックします。

CREATE OR REPLACE TASK load_new_file_data
  WAREHOUSE = 'MY_WAREHOUSE'
  SCHEDULE = '1 minute'
  COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
  WHEN
  SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
  AS
  INSERT INTO prod_reviews (
    SELECT relative_path as file_name,
    PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
    FROM my_pdf_stream
    WHERE METADATA$ACTION='INSERT'
  );
Copy

ステップ6: タスクを実行してパイプラインをテストする

パイプラインが機能することを確認するには、ステージにファイルを追加し、手動でタスクを実行し、 product_reviews テーブルをクエリします。

まず PDF ファイルを my_pdf_stage ステージに追加し、ステージをリフレッシュします。

注釈

この例では PUT コマンドを使用していますが、これはSnowflakeウェブインターフェイスのワークシートからは実行できません。 Snowsight でファイルをアップロードするには、 名前付き内部ステージにファイルをアップロードする をご参照ください。

PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;

ALTER STAGE my_pdf_stage REFRESH;
Copy

ストリームをクエリして、ステージに追加した2つのファイル PDF が記録されていることを確認できます。

SELECT * FROM my_pdf_stream;
Copy

ここで、タスクを実行して PDF ファイルを処理し、 product_reviews テーブルを更新します。

EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status                                                   |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Copy

product_reviews テーブルをクエリして、タスクが各 PDF ファイルの行を追加したことを確認します。

select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME        | FILE_DATA                        |
|------------------+----------------------------------|
| prod_review1.pdf | {                                |
|                  |   "FirstName": "John",           |
|                  |   "LastName": "Johnson",         |
|                  |   "Middle Name": "Michael",      |
|                  |   "Product": "Tennis Shoes",     |
|                  |   "Purchase Date": "03/15/2022", |
|                  |   "Recommend": "Yes"             |
|                  | }                                |
| prod_review2.pdf | {                                |
|                  |   "FirstName": "Emily",          |
|                  |   "LastName": "Smith",           |
|                  |   "Middle Name": "Ann",          |
|                  |   "Product": "Red Skateboard",   |
|                  |   "Purchase Date": "01/10/2023", |
|                  |   "Recommend": "MayBe"           |
|                  | }                                |
+------------------+----------------------------------+
Copy

最後に、 FILE_DATA 列のオブジェクトを別々の列に解析するビューを作成します。その後、ファイルのコンテンツを分析して作業するために、ビューをクエリできます。

CREATE OR REPLACE VIEW prod_review_info_v
  AS
  WITH file_data
  AS (
      SELECT
        file_name
        , parse_json(file_data) AS file_data
      FROM prod_reviews
  )
  SELECT
      file_name
      , file_data:FirstName::varchar AS first_name
      , file_data:LastName::varchar AS last_name
      , file_data:"Middle Name"::varchar AS middle_name
      , file_data:Product::varchar AS product
      , file_data:"Purchase Date"::date AS purchase_date
      , file_data:Recommend::varchar AS recommended
      , build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
  FROM file_data;

SELECT * FROM prod_review_info_v;

+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| FILE_NAME        | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT        | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL                                                                                                                                                                                                                                                                                                                                                                                                              |
|------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| prod_review1.pdf | John       | Johnson   | Michael     | Tennis Shoes   | 2022-03-15    | Yes         | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d              |
| prod_review2.pdf | Emily      | Smith     | Ann         | Red Skateboard | 2023-01-10    | MayBe       | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.com/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d                            |
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Copy