clean roomをSnowparkで使用する

概要

Snowflake Data Clean RoomsでSnowparkを使用すると、大規模データをクエリまたは処理する必要がある場合に、clean roomのコンピューティングパワーを強化することができます。

Clean roomは以下の2つの方法でSnowparkを利用できます:

  • Snowpark UDFs: clean roomのコードでSnowpark API を使用し、Snowparkのスケーリングと処理能力を利用できるSnowpark UDFs を作成します。

  • Snowparkコンテナサービス: Snowpark環境をより細かく制御したい場合や、Snowpark API にないライブラリを使用したい場合に、clean room内でコンテナーを構成およびホストできます。これにより、コンピューティングやストレージについて、それぞれの要件に合わせて環境を構成したり、それぞれの環境に用意されたライブラリをカスタマイズしたりできます。

メモリに収まりきらないほど大きなデータを読み込む必要がある場合は、 to_pandas_batches() を使用し、それを繰り返し処理します。例:

df_iter = session.table(intermediary_table_name).to_pandas_batches()
for df in df_iter:
  ...
Copy

複雑な使用フローの全体的なデザイン

データの生成と表示を、1つのテンプレートを呼び出して実行することもできますが、多くの場合、データを生成するステップを、結果を表示するステップと切り離した方が効率が高くなります。これにより、コンシューマーは結果を表示するたびに再計算をトリガーする必要がなくなり、また、プロセスのさまざまな時点のデータを表示することも可能になります。1つのフローを、ユーザーからアクセス可能な複数のステージに分割するには、データの生成や処理をトリガーするテンプレートと、保存した結果を表示するテンプレートを別々に作成します。 複雑な使用フローのデザインについては、こちらをご覧ください。

clean roomでSnowpark UDFs を使用する

Snowpark API を、アップロード済みのPythonコードで使用すると、大容量データの読み込みを高速化できます。Clean roomは Snowpark Python API のみをサポートしています。プロバイダーとコンシューマーの双方から、アップロード済みのPythonコードでSnowpark Python API を使用できます。

前提条件

clean roomでSnowpark API を使用する

clean roomのPythonコードでSnowpark API を使用ことも、他のPython UDF をアップロードして実行することも同じですが、 snowflake-snowpark-python ライブラリをリンクする必要がある相違点があります。

Snowparkデコレータを使用するのではなく、 cleanroom スキーマで session.sql を使用して SQL を実行することで、 UDFs、 UDTFs、 プロシージャを作成します。例:

session.sql("CREATE OR REPLACE FUNCTION cleanroom.udf(...")
Copy

基本的な手順

以下は、clean roomで UDF または UDTF を使用してSnowpark API を使用する基本的な手順です:

プロバイダー

  1. clean roomを作成し、デフォルトのリリースディレクティブをセットし、標準的な方法でデータをリンクします。

  2. コードはおそらく目的に応じて大きく異なるため、結合ポリシーや列ポリシーをclean roomに追加することはできても、その必要はおそらくないでしょう。

  3. provider.load_python_into_cleanroom を呼び出して、カスタムSnowparkハンドラーコードをclean roomに読み込みます。このコードでは、少なくとも snowflake-snowpark-python パッケージだけは、その他必要なパッケージと一緒に読み込む必要があります。UDFs は行ごとにデータを処理して返しますが、Snowparkのユースケースでは通常、別の結果テンプレートを呼び出して読み込む出力テーブルを生成します。

  4. デフォルトのリリースディレクティブを更新します (コードを追加すると新しいパッチバージョンが生成されるため)。

  5. カスタムテンプレート を作成し、アップロードして、Snowparkコードを実行します。UDF を実行する唯一の方法は、 UDF を呼び出すテンプレートからトリガーすることです。UDFを呼び出すテンプレートについての詳細:

    • provider.load_python_into_cleanroom で指定したエイリアスとパラメーターを使用して関数を呼び出します。テンプレートは、 cleanroom 名前空間を使用しなければ、関数のエイリアスを呼び出させません。

    • UDF がclean room内のテーブルに結果を書き込み、テーブルの名前が実行ごとに異なる場合、結果生成テンプレートは結果テーブルの名前を返し、結果テンプレートはテーブル名をユーザーからの引数として受け取ります。

  6. カスタム SQL テンプレートをアップロードし、Snowpark UDF が生成した結果テーブルにアクセスします(中間結果テーブルを生成した場合)。ハードコーディングされた結果テーブル名を使うか、コードで生成し結果生成テンプレートによって返されたテーブル名を、ユーザーが渡します。

  7. コラボレーターを追加し、標準的な方法でclean roomを公開します。

コンシューマー

コンシューマーはclean roomをインストールし、標準的な方法で分析を実行します。データの生成と結果の読み取りが別々のテンプレートに分かれている場合、コンシューマーはそれぞれのテンプレートを順番に呼び出す必要があります。

コード例

次のコード例は、「インプレッション数に対するリーチ」の線形回帰をアップロードして実行し、傾きを推定する方法を示しています。

  1. コンシューマーがまず、プロバイダー UDF を実行する prod_calculate_regression テンプレートを実行し、結果を生成します。プロバイダー UDF が以下のアクションを実行します:

    1. インプレッションデータの前処理。 プロバイダーのインプレッションデータとコンシューマーのデータを結合し、日付ごとのインプレッション数とリーチ回数を計算して、clean room内の中間テーブルに結果を格納するダイナミック SQL を作成します。コンシューマーがテーブルを指定しない場合、コードはプロバイダーのインプレッションテーブル全体に対して実行されます。

    2. 中間テーブルの読み込み。 中間テーブルはpandas DataFrame としてSnowparkプロシージャに読み込まれます。

    3. 回帰の実行。 回帰は statsmodels ライブラリを使用して計算され、結果をpandas DataFrame として返します。

    4. clean room内部テーブルへの結果の書き込み。 結果はclean room内部の結果テーブルに書き込まれ、テーブル名の接尾辞 ID がコンシューマーに返されます。Snowparkのプロシージャはclean room内で実行されるため、コンシューマーのアカウントにデータをアクティベートする機能には制限があります。結果のセキュリティを強化するため、結果はclean room内のテーブルに書き込まれ、コンシューマーが別のテンプレートを実行して結果データを読み取ります。

    5. 中間テーブルの破棄。 中間テーブルは計算中にclean room内で作成され、不要になると、Snowparkプロシージャが終了する前に破棄されます。

    6. 結果テーブルの名前を返す。 過去のすべての実行結果が保持されるため、コンシューマーに返される名前をテンプレート実行中に指定して、結果を取得する必要があります。

  2. 次に、コンシューマーが get_results テンプレートを実行し、最初のテンプレートが返した結果テーブルの接尾辞を渡して結果を確認します。

以下の例を実行するには、同じウェブホスティングリージョンに、プロバイダー用とコンシューマー用の2つのアカウントが必要です( クロスクラウド自動フルフィルメント を実装している場合を除く)。

サンプルコードは、Snowparkの設定を変更しなくてもSnowflakeのワークシートで実行されます。別の環境で実行する場合は、Snowpark Python API をインストールして構成する必要があるかもしれません。

詳細情報

clean roomでのSnowparkコンテナーサービスの利用

Pythonコードを実行する環境をより細かく制御したい場合は、clean room内でSnowparkコンテナーサービスを実行します。これにより、コードの実行環境を細かく制御できます。また、特殊なコンピューティングリソースや、ストレージ、その他のリソースを使用してパフォーマンスを最大化し、コストを最小化する必要がある場合や、カスタムパッケージやその他の環境機能を導入する場合に最適です。

コンテナーサービスをclean roomでホスティングする場合、そのサービスで公開される関数を、テンプレートとカスタムPythonコードから呼び出すことができます。Snowparkコンテナーサービスの使用方法は、Snowparkで UDFs を使用する場合と似ていますが、 UDFs が HTTP エンドポイントとして公開され、テンプレートから呼び出される点が異なります。サービスとエンドポイントを定義し、clean roomにアップロードします。

内部でホスティングされるエンドポイントには、clean room内のテンプレートからのみアクセス可能で、clean roomのコラボレーターから直接呼び出すことはできません。

前提条件

clean roomでSnowparkコンテナーサービスを使用するために、以下のトピックを理解してください:

基本的な手順

プロバイダー

  1. リクエストの処理を実行するサービス仕様、コード、エンドポイントを作成します。

  2. イメージリポジトリを作成し、 SAMOOHA_APP_ROLE のアクセスをそのリポジトリに付与します。

  3. 次のステップのために、リポジトリの URL を取得します。

  4. イメージをビルドしてリポジトリ URL にアップロードします。

  5. 標準的な方法でclean roomを作成し、データをリンクし、結合ポリシーを追加して、コンシューマーを追加します。

  6. サービスポイントを呼び出すテンプレート を定義し、clean roomにアップロードします。サービス関数は、名前空間 service_functions で作成され、呼び出されます(名前空間 cleanroom で作成および呼び出される UDFsとは異なります)。

    -- Template to call an SPCS function named train.
    SELECT service_functions.train(
          {{ source_table[0] }},
          {{ provider_join_col }},
          {{ my_table[0] }},
          {{ consumer_join_col }},
          {{ dimensions | sqlsafe }},
    
    Copy
  7. provider.load_service_into_cleanroom を呼び出して、サービスの詳細をclean roomにアップロードします。これにより、は画像 URL、エンドポイントを定義します。

    ) AS train_result;

  8. provider.load_service_into_cleanroom を呼び出して、サービスの詳細をclean roomにアップロードします。これにより、イメージ URL、エンドポイント、その他のサービスオプションが定義されます。ここで定義されるエンドポイント名(サービス仕様と一致)が、テンプレートから関数を呼び出すために使用する名前になります。

    CALL samooha_by_snowflake_local_db.provider.load_service_into_cleanroom(
    $cleanroom_name,
    $$
    spec:
      containers:
      - name: lal
        image: /dcr_spcs/repos/lal_example/lal_service_image:latest
        env:
          SERVER_PORT: 8000
      endpoints:
      - name: lalendpoint
        port: 8000
        public: false
    $$,
    $$
    functions:
      - name: train
        args: PROVIDER_TABLE VARCHAR, PROVIDER_JOIN_COL VARCHAR, CONSUMER_TABLE VARCHAR, CONSUMER_JOIN_COL VARCHAR, DIMENSIONS ARRAY, FILTER VARCHAR
        returns: VARCHAR
        endpoint: lalendpoint
        path: /train
    $$);
    
    Copy
  9. clean roomのデフォルトリリースディレクティブをセットします。サービスをアップロードまたは変更するたびに、新しいパッチバージョンが作成されます。

  10. clean roomを公開します。

  11. イメージ、関数、コードを変更する場合は、自分とコンシューマーが インスタンスを更新する 必要があります。

コンシューマー

  1. clean roomをインストールし、必要なデータを標準的な方法でリンクします。

  2. コンピューティングプールを作成し、clean roomへのアクセス許可を付与します。

  3. クエリを実行する場合(ほぼ間違いなく実行します)、使用するウェアハウスのclean roomにも USAGE 権限を付与する必要があります。

  4. samooha_by_snowflake_local_db.consumer.start_or_update_service を呼び出してサービスを開始し、clean room名、コンピューティングプール名、ウェアハウス名(ウェアハウスを使用する場合)を渡します。

  5. SHOW ENDPOINTS IN SERVICE SAMOOHA_CLEANROOM_APP_clean_room_name.services.service; を実行し、サービスに使用できるエンドポイントを調べます。

  6. サービスが実行中になったら、標準的な方法で consumer.run_analysis を呼び出すことで、サービスエンドポイントにアクセスするclean roomテンプレートの実行を開始できます。

コンピューティングプールの作成

誰がプールを所有および構成しているかによって、プロバイダーがclean room内にコンピューティングプールを作成するのか、またはコンシューマーがclean roomの外にコンピューティングプールを作成するのかが決まります。

コンピューティングプールがclean roomの外に作成されている場合には、clean roomに適切な権限を与えて、プールにアクセスし、以下の方法でサービスを作成する必要があります:

-- Grant access to a warehouse to run queries. Needed only if the service queries Snowflake accounts.
USE ROLE ACCOUNTADMIN;
GRANT USAGE ON WAREHOUSE APP_WH TO APPLICATION SAMOOHA_CLEANROOM_APP_<CLEANROOM_NAME>;

-- Grant SAMOOHA_APP_ROLE privileges to create compute pools and create services
GRANT CREATE COMPUTE POOL ON ACCOUNT TO ROLE SAMOOHA_APP_ROLE WITH GRANT OPTION;
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE SAMOOHA_APP_ROLE WITH GRANT OPTION;


USE ROLE SAMOOHA_APP_ROLE;
-- Create the compute pool
CREATE COMPUTE POOL DCR_LAL_POOL
  FOR APPLICATION SAMOOHA_CLEANROOM_APP_<CLEANROOM_NAME>
  min_nodes = 1 max_nodes = 1
  instance_family = highmem_x64_l
  auto_resume = true;

-- Grant the clean room the privileges to access a pool running outside the clean room.
-- Grant the clean room access to the compute pool
GRANT USAGE ON COMPUTE POOL DCR_LAL_POOL TO APPLICATION SAMOOHA_CLEANROOM_<CLEANROOM_NAME>;

-- Allow the clean room to create the service
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO APPLICATION SAMOOHA_CLEANROOM_APP_<CLEANROOM_NAME>;
Copy

サービスコードまたは構成の更新

プロバイダーがイメージ、サービス仕様、エンドポイントの名前やソースコードを更新した場合は、プロバイダーとコンシューマーの双方が以下の手順を踏む必要があります。

1.プロバイダー:

  1. イメージまたはソースコードを更新します。

  2. provider.load_service_into_cleanroom を呼び出し、新しいパッチ番号を返します。

  3. 新しいパッチ番号を指定して provider.set_default_release_directive を呼び出します。

2.コンシューマー:

  • consumer.start_or_update_service を呼び出します。

サービスの監視

デフォルトで、コンシューマーはサービスを監視できます。この動作は、 provider.load_service_into_cleanroomservice_config 引数で、 allow_monitoring の値を使って変更できます。

コンシューマーの監視が有効な場合、以下にに示すように、コンシューマーはclean roomサービス (SAMOOHA_CLEANROOM_APP_SPCS_cleanroom_name.services.service の形式)、サービス ID 、コンテナーを指定して、監視ログにアクセスできます:

SELECT VALUE AS log_line
  FROM TABLE(
    SPLIT_TO_TABLE(SYSTEM$GET_SERVICE_LOGS(
        'SAMOOHA_CLEANROOM_APP_SPCS_Lookalike_Demo.services.service', 0, 'lal'), '\n')
  );
Copy

コンシューマーはまた、以下のように DESCRIBE SERVICE コマンドを使用して、サービスの状態を確認できます:

-- See the state of the service.
DESCRIBE SERVICE SAMOOHA_CLEANROOM_APP_SPCS_Lookalike_Demo.services.service;
Copy

SHOW ENDPOINTS IN SERVICE SAMOOHA_CLEANROOM_APP_clean_room_name.services.service; を実行すると、サービスエンドポイントを確認できます。例:

SHOW ENDPOINTS IN SERVICE SAMOOHA_CLEANROOM_APP_SPCS_Lookalike_Demo.services.service;
Copy

コード例

以下のノートブックとZIPファイルは、Snowflake Container Servicesをclean roomで使用する方法を示しています。clean roomをインストールした2つのアカウントが必要です:1つはプロバイダー用、もう1つはコンシューマー用です。その2つを、同じクラウドホスティングリージョンに入れてください。zip圧縮された構成ファイルを使用し、サービスを定義します。