Apache Iceberg™ テーブルへのデータのロード

Snowflakeは、Snowflake管理Icebergテーブルにデータをロードする以下のオプションをサポートしています。

ファイル形式

標準のSnowflakeテーブルへのロードでサポートされている形式のファイルから、Icebergテーブルにデータをロードできます。

CSV、 JSON、Avro、 ORC の場合、SnowflakeはParquet以外のファイル形式のデータをIceberg Parquetファイルに変換し、Icebergテーブルのベーステーブルに格納します。タイプ変換を必要とするこれらのファイル形式でのロードシナリオでは、デフォルトの LOAD_MODE = FULL_INGEST オプションのみがサポートされています。

Apache Parquetファイルの場合、Snowflakeはデータをテーブル列に直接ロードして、以下の LOAD_MODE オプションから選択できるようになっています。

  • FULL_INGEST: ファイルをスキャンし、IcebergテーブルのベーステーブルにあるParquetデータを書き換えます。

  • ADD_FILES_COPY: バイナリは、Icebergカタログに登録されていないIceberg互換のApache ParquetファイルをIcebergテーブルのベーステーブルにコピーし、Icebergテーブルにファイルを登録します。

詳細については、 COPY INTO <テーブル> をご参照ください。

例: Iceberg互換Parquetファイルのロード

この例では、Icebergテーブルを作成し、外部ステージにあるIceberg互換のParquetデータファイルからデータをロードします。

デモのため、この例では以下のリソースを使用します。

  • iceberg_ingest_vol という名前の外部ボリューム。外部ボリュームを作成するには、 外部ボリュームの構成 をご参照ください。

  • Iceberg互換のParquetファイルが置かれた my_parquet_stage という外部ステージ。外部ステージを作成するには、 CREATE STAGE をご参照ください。

  1. Iceberg互換のParquetデータ(TYPE = PARQUET USE_VECTORIZED_SCANNER = TRUE)をコピーするために必要な構成を使用して、ステージングされたParquetファイルを説明するファイル形式オブジェクトを作成します。

    CREATE OR REPLACE FILE FORMAT my_parquet_format
      TYPE = PARQUET
      USE_VECTORIZED_SCANNER = TRUE;
    
    Copy
  2. ソースParquetファイルのデータ型と互換性のあるデータ型の列を定義して、Snowflake管理のIcebergテーブルを作成します。

    CREATE OR REPLACE ICEBERG TABLE customer_iceberg_ingest (
      c_custkey INTEGER,
      c_name STRING,
      c_address STRING,
      c_nationkey INTEGER,
      c_phone STRING,
      c_acctbal INTEGER,
      c_mktsegment STRING,
      c_comment STRING
    )
      CATALOG = 'SNOWFLAKE'
      EXTERNAL_VOLUME = 'iceberg_ingest_vol'
      BASE_LOCATION = 'customer_iceberg_ingest/';
    
    Copy

    注釈

    このステートメント例では、Snowflakeデータ型にマップするIcebergデータ型を指定します。詳細については、 Apache Iceberg™ テーブルのデータ型 をご参照ください。

  3. COPY INTO ステートメントを使用して、ステージングされたParquetファイル(ステージ URL パスの直下にある)のデータをIcebergテーブルにロードします。

    COPY INTO customer_iceberg_ingest
      FROM @my_parquet_stage
      FILE_FORMAT = 'my_parquet_format'
      LOAD_MODE = ADD_FILES_COPY
      PURGE = TRUE
      MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;
    
    Copy

    注釈

    この例では、 LOAD_MODE = ADD_FILES_COPY を指定しています。これは、Snowflakeにファイルを外部ボリュームの場所にコピーし、そのファイルをテーブルに登録するように指示します。

    SnowflakeはソースParquetファイルをスキャンしてデータを新しいParquetファイルに書き換えないため、このオプションを使用するとファイル課金が発生しません。

    出力:

    +---------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
    | file                                                          | status | rows_parsed | rows_loaded | error_limit | errors_seen | first_error | first_error_line | first_error_character | first_error_column_name |
    |---------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------|
    | my_parquet_stage/snow_af9mR2HShTY_AABspxOVwhc_0_1_008.parquet | LOADED |       15000 |       15000 |           0 |           0 | NULL        |             NULL |                  NULL | NULL                    |
    | my_parquet_stage/snow_af9mR2HShTY_AABspxOVwhc_0_1_006.parquet | LOADED |       15000 |       15000 |           0 |           0 | NULL        |             NULL |                  NULL | NULL                    |
    | my_parquet_stage/snow_af9mR2HShTY_AABspxOVwhc_0_1_005.parquet | LOADED |       15000 |       15000 |           0 |           0 | NULL        |             NULL |                  NULL | NULL                    |
    | my_parquet_stage/snow_af9mR2HShTY_AABspxOVwhc_0_1_002.parquet | LOADED |           5 |           5 |           0 |           0 | NULL        |             NULL |                  NULL | NULL                    |
    | my_parquet_stage/snow_af9mR2HShTY_AABspxOVwhc_0_1_010.parquet | LOADED |       15000 |       15000 |           0 |           0 | NULL        |             NULL |                  NULL | NULL                    |
    +---------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
    
  4. テーブルをクエリします。

    SELECT
        c_custkey,
        c_name,
        c_mktsegment
      FROM customer_iceberg_ingest
      LIMIT 10;
    
    Copy

    出力:

    +-----------+--------------------+--------------+
    | C_CUSTKEY | C_NAME             | C_MKTSEGMENT |
    |-----------+--------------------+--------------|
    |     75001 | Customer#000075001 | FURNITURE    |
    |     75002 | Customer#000075002 | FURNITURE    |
    |     75003 | Customer#000075003 | MACHINERY    |
    |     75004 | Customer#000075004 | AUTOMOBILE   |
    |     75005 | Customer#000075005 | FURNITURE    |
    |         1 | Customer#000000001 | BUILDING     |
    |         2 | Customer#000000002 | AUTOMOBILE   |
    |         3 | Customer#000000003 | AUTOMOBILE   |
    |         4 | Customer#000000004 | MACHINERY    |
    |         5 | Customer#000000005 | HOUSEHOLD    |
    +-----------+--------------------+--------------+
    

例: INFER_SCHEMA 関数で作成したテーブルに Iceberg 互換の Parquet ファイルをロードします。

この例では、以下の方法を説明します。

  1. INFER_SCHEMA 関数を使用して Apache Iceberg™ テーブルを作成します。

  2. 外部ステージにあるIceberg互換のParquetデータファイルからデータを読み込みます。

デモのため、この例では以下のリソースを使用します。

  • iceberg_ingest_vol という名前の外部ボリューム。外部ボリュームを作成するには、 外部ボリュームの構成 をご参照ください。

  • Iceberg互換のParquetファイルが置かれた my_parquet_stage という外部ステージ。外部ステージを作成するには、 CREATE STAGE をご参照ください。

  1. Iceberg互換のParquetデータ(TYPE = PARQUET USE_VECTORIZED_SCANNER = TRUE)をコピーするために必要な構成を使用して、ステージングされたParquetファイルを説明するファイル形式オブジェクトを作成します。

    CREATE OR REPLACE FILE FORMAT my_parquet_format
      TYPE = PARQUET
      USE_VECTORIZED_SCANNER = TRUE;
    
    Copy
  2. my_parquet_stage ステージでParquetファイルの列定義を取得します。

    SELECT *
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@my_parquet_stage/customer_iceberg/files-to-ingest/'
          , FILE_FORMAT=>'my_parquet_format'
          , KIND => 'ICEBERG'
          )
        );
    
    Copy

    出力:

    +-------------+---------+----------+---------------------+------------------------------------------------------+----------+
    | COLUMN_NAME | TYPE    | NULLABLE | EXPRESSION          | FILENAMES                                            | ORDER_ID |
    |-------------+---------+----------+---------------------+------------------------------------------------------|----------+
    | id          | INT     | False    | $1:id::INT          | customer_iceberg/files-to-ingest/customers.parquet   | 0        |
    | custnum     | INT     | False    | $1:custnum::INT     | customer_iceberg/files-to-ingest/customers.parquet   | 1        |
    +-------------+---------+----------+---------------------+------------------------------------------------------+----------+
    
  3. 検出されたスキーマを使用してIcebergテーブルを作成します。

    CREATE ICEBERG TABLE myicebergtable
      USING TEMPLATE (
        SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
        WITHIN GROUP (ORDER BY order_id)
          FROM TABLE(
            INFER_SCHEMA(
              LOCATION=>'@my_parquet_stage/customer_iceberg/files-to-ingest/',
              FILE_FORMAT=>'my_parquet_format',
              KIND => 'ICEBERG'
            )
          ))
     ... {rest of the ICEBERG options}
     ;
    
    Copy

    注釈

    ARRAY_AGG(OBJECT_CONSTRUCT()) に対して * を使用すると、返される結果が 16MB より大きいとエラーになる場合があります。大きなクエリ結果セットでは * の使用を避け、必要な列、 COLUMN NAMETYPENULLABLE のみをクエリに使用することをお勧めします。 WITHIN GROUP (ORDER BY order_id) を使用する場合は、オプションの列 ORDER_ID を含めることができます。

  4. COPY INTO ステートメントを使用して、ステージングされた Parquet ファイルから Iceberg テーブルにデータを読み込みます。

    COPY INTO myicebergtable
      FROM @my_parquet_stage/customer_iceberg/files-to-ingest/
      FILE_FORMAT = 'my_parquet_format'
      LOAD_MODE = ADD_FILES_COPY
      MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;
    
    Copy

    注釈

    この例では、 LOAD_MODE = ADD_FILES_COPY を指定しています。これは、Snowflakeにファイルを外部ボリュームの場所にコピーし、そのファイルをテーブルに登録するように指示します。

    SnowflakeはソースParquetファイルをスキャンしてデータを新しいParquetファイルに書き換えないため、このオプションを使用するとファイル課金が発生しません。

    出力:

    +---------------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
    | file                                                                | status | rows_parsed | rows_loaded | error_limit | errors_seen | first_error | first_error_line | first_error_character | first_error_column_name |
    |---------------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------|
    | my_parquet_stage/customer_iceberg/files-to-ingest/customers.parquet | LOADED |       15000 |       15000 |           0 |           0 | NULL        |             NULL |                  NULL | NULL                    |
    +---------------------------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
    
  5. データの読み込み中、テーブルにクエリを実行します。

    SELECT
        id,
        custnum
      FROM myicebergtable
      LIMIT 10;
    
    Copy

    出力:

    +-----------+---------+
    | id        | custnum |
    |-----------+---------+
    |         1 |   75001 |
    |         2 |   75002 |
    |         3 |   75003 |
    |         4 |   75004 |
    |         5 |   75005 |
    |         6 |   75006 |
    |         7 |   75007 |
    |         8 |   75008 |
    |         9 |   75009 |
    |        10 |   75010 |
    +-----------+---------+