Snowflake Data Clean Rooms:セキュアSnowparkプロシージャ

このトピックでは、クリーンルームをプログラムで設定し、コンシューマーと共有し、プロバイダーのアカウントからクリーンルームにロードされた安全なSnowparkプロシージャを使用する方法で分析を実行するために必要なプロバイダーとコンシューマーのフローについて説明します。このフローでは、プロバイダーは、APIを使用して、セキュアな Snowparkプロシージャをクリーンルームにロードします。

このフローにおけるSnowparkプロシージャは、リーチをインプレッション数に線形回帰し、傾きを推定します。これは、プロバイダーのアカウントのインプレッションIDs、ユーザーIDs、タイムスタンプを含むテーブルを入力とし、オプションでコンシューマーのユーザーのテーブルを入力とします。Snowparkプロシージャは、インプレッションデータが提供された場合、コンシューマーのユーザーデータに結合するために、SQLを動的に作成し、日ごとのインプレッション数とリーチ数を含む中間テーブルをクリーンルームに作成します。

次に、中間テーブルからのこのデータがSnowparkプロシージャ内で処理され、切片、傾き、および他の多くのパラメータを推定するために回帰が実行されます。このデータはクリーンルーム内の結果テーブルに書き込まれ、このテーブルのIDが出力としてコンシューマーに提供されます。最後に、コンシューマーはこのIDをget_resultsテンプレートと使用して、クリーンルームからデータを取り戻すことができます。Snowparkプロシージャが終了する前に、クリーンルームで作成されたすべての中間テーブルをクリーンアップします。

注:すべての中間テーブルはクリーンルーム内で作成されるため、Snowparkプロシージャ自体以外にはアクセスできません。

上記以外のこのフローの重要な点は以下の通りです。

  1. プロバイダー:

    a.Snowparkプロシージャをクリーンルームに確実に追加します。

    b.Snowparkプロシージャを実行するカスタムテンプレートと、結果を取得するカスタムテンプレートを追加します。

    c.クリーンルームをコンシューマーと共有します。

  2. コンシューマー:

    a.回帰を実行するテンプレートを実行します。

    b.分析結果の取得

前提条件

このフローを完了するには、2つの別々のSnowflakeアカウントが必要です。プロバイダーのコマンドを実行するために最初のアカウントを使用し、コンシューマーのコマンドを実行するために2番目のアカウントに切り替えます。

プロバイダー

注釈

以下のコマンドは、プロバイダーアカウントのSnowflakeワークシートで実行する必要があります。

環境の設定

開発者APIsを使用してSnowflake Data Clean Roomで作業する前に、以下のコマンドを実行してSnowflake環境をセットアップします。SAMOOHA_APP_ROLEロールを持ってない場合は、アカウント管理者にお問い合わせください。

use role samooha_app_role;
use warehouse app_wh;
Copy

クリーンルームの作成

クリーンルームの名称を作成します。既存のクリーンルーム名との衝突を避けるため、新しいクリーンルーム名を入力してください。クリーンルーム名には 英数字 しか使用できません。クリーンルーム名にはスペースとアンダースコア以外の特殊文字は使用できません。

set cleanroom_name = 'Snowpark Demo clean room';
Copy

上記で設定したクリーンルーム名で新しいクリーンルームを作成できます。上記で設定したクリーンルーム名が既にデフォルトのクリーンルームとして存在する場合、この処理は失敗します。

この手順の実行には約45秒かかります。

provider.cleanroom_init の2番目の引数は、クリーンルームのディストリビューションです。これはINTERNALまたはEXTERNALのいずれかです。テスト目的で、同じ組織内のアカウントにクリーンルームを共有する場合、INTERNALを使用して、アプリケーションパッケージがコラボレーターにリリースされる前に実施されなければならない自動セキュリティスキャンをバイパスすることができます。ただし、このクリーンルームを別の組織のアカウントと共有する場合は、EXTERNALクリーンルーム配布を使用する必要があります。

call samooha_by_snowflake_local_db.provider.cleanroom_init($cleanroom_name, 'INTERNAL');
Copy

セキュリティスキャンのステータスを表示するには、以下を使用します。

call samooha_by_snowflake_local_db.provider.view_cleanroom_scan_status($cleanroom_name);
Copy

クリーンルームを作成したら、コラボレーターと共有する前にリリースディレクティブを設定する必要があります。しかし、ディストリビューションがEXTERNALに設定されている場合は、リリースディレクティブを設定する前に、まずセキュリティスキャンが完了するのを待つ必要があります。残りのステップの実行を続け、スキャンが実行されている間、 provider.create_cleanroom_listing ステップの前にここに戻ることができます。

リリースディレクティブを設定するには、次のようにします。

call samooha_by_snowflake_local_db.provider.set_default_release_directive($cleanroom_name, 'V1_0', '0');
Copy

クロスリージョンの共有

顧客のアカウントと異なる地域にアカウントを持つSnowflakeの顧客とクリーンルームを共有するには、Cross-Cloud Auto-Fulfillmentを有効にする必要があります。他地域のコンシューマーとの協業に関連する追加コストについては、Cross-Cloud Auto-Fulfillment costsをご参照ください。

開発者APIsを使用する場合、クロスリージョン共有を有効にするには2つのステップがあります。

  1. ACCOUNTADMINロールを持つSnowflake管理者は、Snowflake アカウントのクロスクラウド自動複製を有効にします。手順については、異なるリージョンのアカウントと連携するをご参照ください。

  2. プロバイダーは、 provider.enable_laf_for_cleanroom コマンドを実行して、クリーンルームのクロスクラウド自動複製を有効にします。例:

    call samooha_by_snowflake_local_db.provider.enable_laf_for_cleanroom($cleanroom_name);
    
    Copy

クリーンルームのクロスクラウド自動複製を有効にした後、 provider.create_cleanroom_listing コマンドを使用して、通常通りリストにコンシューマーを追加できます。リストは、必要に応じて自動的にリモートクラウドやリージョンに複製されます。

Snowparkプロシージャをクリーンルームに秘密裏にロードします。

このセクションでは、Snowparkプロシージャをクリーンルームにロードする方法を説明します。プロシージャは次のステップを実行します。

  1. インプレッションデータの前処理: コンシューマーのテーブルが提供された場合、データプロバイダーのインプレッションデータをコンシューマーのユーザーデータに結合し、日付ごとのインプレッションとリーチの明確なカウントを計算し、クリーンルーム内の中間テーブルに格納する動的な SQL が作成されます。コンシューマーテーブルが提供されない場合、プロバイダーのインプレッションテーブルのすべてを使用します。

  2. 中間テーブルのロード: 中間テーブルはpandasデータフレームとしてSnowparkプロシージャにロードされます。

  3. 回帰の実行: 回帰は statsmodels ライブラリを使用して計算され、結果はpandas dataframeとして返されます。

  4. Snowflakeテーブルへの結果の書き込み:クリーンルーム内の結果テーブルに結果が書き込まれ、テーブルのIDサフィックスがコンシューマーに返されます。

    a.Snowparkプロシージャはクリーンルーム内で実行されるため、コンシューマーテナントに直接書き込む機能は限られています。その代わりに、結果をより安全に保つために、クリーンルーム内のテーブルに書き込まれ、コンシューマーはテーブルから読み取ることができます。

  5. 中間テーブルの削除:クリーンルーム内で計算中に作成された、不要になった中間テーブルは、Snowparkプロシージャが終了する前に削除されます。

次のAPIを使うことにより、Python関数をインライン関数としてクリーンルームに直接定義することができます。あるいは、クリーンルームステージにアップロードしたステージングされたファイルからPythonをロードすることもできます。例については[API リファレンスガイド](../provider)をご参照ください。

call samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
    $cleanroom_name,
    'reach_impression_regression',
    ['source_table string', 'my_table string'],
    ['snowflake-snowpark-python', 'pandas', 'statsmodels', 'numpy'],
    'string',
    'main',
    $$
import traceback
import pandas as pd
import numpy as np

import statsmodels.formula.api as sm


def drop_tables(session, table_names):
    """
    Drop the tables passed in
    """
    for tbl in table_names:
        session.sql(f'drop table {tbl}').collect()

def preprocess_regression_data(session, source_table, my_table, suffix):
    """
    Preprocess the impressions and customer data into an intermediary table for regression
    """
    table_name = f'cleanroom.intermediary_{suffix}'

    my_table_statement = f'inner join {my_table} c on p.hem = c.hem' if my_table != 'NONE' else ''
    session.sql(f"""
    create or replace table {table_name} as (
        with joint_data as (
            select
                date,
                p.hem as hem,
                impression_id
            from {source_table} p
            {my_table_statement}
        )
        select
            date,
            count(distinct hem) as reach,
            count(distinct impression_id) as num_impressions
        from joint_data
        group by date
        order by date
    );
    """).collect()

    return table_name

def calculate_regression(df):
    """
    Calculate the regression data from the dataframe we put together
    """
    result = sm.ols('REACH ~ 1 + NUM_IMPRESSIONS', data=df).fit()
    retval = pd.concat([
        result.params,
        result.tvalues,
        result.pvalues
    ], keys=['params', 't-stat', 'p-value'], names=['STATISTIC', 'PARAMETER']).rename('VALUE').reset_index()
    return retval

def main(session, source_table, my_table):
    """
    First compute the regression data from an overlap between customer and provider data, and counting
    the number of impressions and reach per day. Next regress these locally and compute the regression
    statistics. Finally write it to a results table which can be queried to get the output.
    """
    suffix = f'regression_results_{abs(hash((source_table, my_table))) % 10000}'

    try:
        # Preprocess impressions and customer data into an intermediary form to use for regression
        intermediary_table_name = preprocess_regression_data(session, source_table, my_table, suffix)

        # Load the data into Python locally
        df = session.table(intermediary_table_name).to_pandas()

        # Carry out the regression and get statistics as an output
        regression_output = calculate_regression(df)

        # Write the statistics to an output table
        # The table and the schema names should be in upper case to quoted identifier related issues.
        table = f'results_{suffix}'.upper()
        retval_df = session.write_pandas(regression_output, table,  schema = 'CLEANROOM', auto_create_table = True)

        # Drop any intermediary tables
        drop_tables(session, [intermediary_table_name])

        # Tell the user the name of the table the results have been written to
        return f'Done, results have been written to the following suffix: {suffix}'
    except:
        return traceback.format_exc()
$$
);
Copy

注釈

Pythonをクリーンルームにロードすると、クリーンルーム用の新しいパッチが作成されます。クリーンルームのディストリビューションがEXTERNALに設定されている場合は、セキュリ ティスキャンが完了するのを待ってから、デフォルトのリリースディレクティブを更新する必要があります。

-- See the versions available inside the clean room
show versions in application package samooha_cleanroom_Snowpark_Demo_clean_room;

-- Once the security scan is approved, update the release directive to the latest version
call samooha_by_snowflake_local_db.provider.set_default_release_directive($cleanroom_name, 'V1_0', '1');
Copy

UDFsを使用してカスタムテンプレートを追加する

クリーンルームにカスタム分析テンプレートを追加するには、プロバイダー側とコンシューマー側の両方でテーブル名のプレースホルダーとプロバイダー側の結合列が必要です。SQL Jinjaテンプレートでは、これらのプレースホルダーは次のものである必要があります。

  • source_table:プロバイダーのテーブル名の 配列

  • my_table:コンシューマーのテーブル名の 配列

テーブル名は、これらの変数を使用して動的にすることができますが、クリーンルームにリンクされた表示名を使用して、必要であればテンプレートにハードコード化することもできます。列名はテンプレートにハードコード化することも、パラメーターで動的に設定することもできます。パラメーターを使用して設定する場合は、列ポリシーと照合するために、パラメーターの dimensions または measure_column を配列として呼び出す必要があることにご留意ください。これらを SQL Jinjaパラメーターとしてテンプレートに追加します。これらは後でクエリ時にコンシューマーから渡されます。結合ポリシーは、許可された列以外の列でコンシューマーにより結合できないようにします。

また、SQL Jinjaテンプレートのカスタム引数は、以下のフィルターを使用して、結合および列のポリシーに準拠しているかどうかを確認することができます。

  • join_policy: 文字列値またはフィルター句が結合ポリシーに準拠しているかどうかを確認します。

  • column_policy: 文字列値またはフィルター句が列ポリシーに準拠しているかどうかを確認します。

  • join_and_column_policy: フィルター句で結合に使用される列が結合ポリシーに準拠しているか、また、フィルターとして使用される列が列ポリシーに準拠しているかを確認します。

たとえば、 {{provider_id | sqlsafe | join_policy }} 句では、 p.HEM の入力が解析され、 p.HEM が結合ポリシーにあるかどうかが確認されます。注意: sqlsafe フィルターでは、コラボレーターが純粋な SQL をテンプレートに入れることができるため、使用には注意してください。

注釈

クリーンルームに実際にリンクされているセキュアビューの名前はテーブル名と異なるため、すべてのプロバイダー/コンシューマーテーブルは、これらの引数を使用して参照する必要があります。重要なのは、プロバイダーテーブルのエイリアスはp(またはp1)、p2、p3、p4などにする 必要 があり、コンシューマーテーブルのエイリアスはc(またはc1)、c2、c3などにする 必要 があるということです。これは、クリーンルームでセキュリティポリシーを実施するために必要です。

この関数は、同じ名前の既存のテンプレートを上書きすることに注意してください。既存のテンプレートを更新したい場合は、更新されたテンプレートでこの関数を再度呼び出します。

call samooha_by_snowflake_local_db.provider.add_custom_sql_template(
        $cleanroom_name,
        'prod_calculate_regression',
        $$
call cleanroom.reach_impression_regression({{ source_table[0] }}, {{ my_table[0] | default('NONE') }});
$$
);
Copy

最後に、calculate_regressionテンプレートから返されるresults suffix IDを使って、コンシューマーが分析結果を取得できるカスタムテンプレートが追加されます。

call samooha_by_snowflake_local_db.provider.add_custom_sql_template(
        $cleanroom_name,
        'prod_get_results',
        $$
select * from cleanroom.results_{{ results_suffix | sqlsafe }};
$$
);
Copy

クリーンルームで現在有効なテンプレートを表示したい場合は、以下の手順を呼び出します。

call samooha_by_snowflake_local_db.provider.view_added_templates($cleanroom_name);
Copy

コンシューマーと共有する

最後に、以下のようにSnowflakeアカウントロケータとアカウント名を追加して、データコンシューマーをクリーンルームに追加します。Snowflakeアカウント名は<ORGANIZATION>.<ACCOUNT_NAME>の形式である必要があります。

注釈

以下のプロシージャを呼び出すには、最初に provider.set_default_release_directive を使用してリリースディレクティブを設定していることを確認してください。次を使用した最新のバージョンとパッチが表示されます。

show versions in application package samooha_cleanroom_Snowpark_Demo_clean_room;
Copy

注釈

この呼び出しが完了するまでに約60秒かかることに注意してください。これは、コンシューマーからのリクエストをリッスンし、ログに記録するための多くのタスクを設定するためです。

call samooha_by_snowflake_local_db.provider.add_consumers($cleanroom_name, '<CONSUMER_ACCOUNT_LOCATOR>', '<CONSUMER_ACCOUNT_NAME>');
call samooha_By_snowflake_local_db.provider.create_cleanroom_listing($cleanroom_name, '<CONSUMER_ACCOUNT_NAME>');
Copy

複数のコンシューマーアカウントロケーターを、カンマ区切りの文字列として provider.add_consumers 関数に渡すことも、 provider.add_consumers を個別に呼び出すこともできます。

このクリーンルームに追加されたコンシューマーを表示したい場合は、次の手順を呼び出します。

call samooha_by_snowflake_local_db.provider.view_consumers($cleanroom_name);
Copy

以下の手順で最近作成されたクリーンルームを表示します。

call samooha_by_snowflake_local_db.provider.view_cleanrooms();
Copy

最近作られたクリーンルームの詳細を、以下の手順で表示します。

call samooha_by_snowflake_local_db.provider.describe_cleanroom($cleanroom_name);
Copy

作成されたクリーンルームは削除することもできます。次のコマンドはクリーンルームを完全に削除するので、以前クリーンルームにアクセスしていたコンシューマーはもうクリーンルームを使用することができません。将来、同じ名前のクリーンルームが必要な場合は、上記のフローを使用して再初期化する必要があります。

call samooha_by_snowflake_local_db.provider.drop_cleanroom($cleanroom_name);
Copy

注釈

これでプロバイダーフローは終了です。コンシューマーアカウントに切り替えて、コンシューマーフローを継続してください。

コンシューマー

注釈

以下のコマンドは、コンシューマーアカウントのSnowflakeワークシートで実行する必要があります。

環境の設定

開発者APIsを使用してSnowflake Data Clean Roomで作業する前に、以下のコマンドを実行してSnowflake環境をセットアップします。SAMOOHA_APP_ROLEロールを持ってない場合は、アカウント管理者にお問い合わせください。

use role samooha_app_role;
use warehouse app_wh;
Copy

クリーンルームをインストールする

クリーンルーム共有がインストールされると、利用可能なクリーンルームのリストは、以下のコマンドを使用して表示することができます。

call samooha_by_snowflake_local_db.consumer.view_cleanrooms();
Copy

プロバイダーが共有したクリーンルームの名前を割り当てます。

set cleanroom_name = 'Snowpark Demo clean room';
Copy

以下のコマンドは、関連するプロバイダーと選択されたクリーンルームを持つコンシューマーアカウントにクリーンルームをインストールします。

この手順の実行には約45秒かかります。

call samooha_by_snowflake_local_db.consumer.install_cleanroom($cleanroom_name, '<PROVIDER_ACCOUNT_LOCATOR>');
Copy

クリーンルームが設置されると、プロバイダーはクリーンルームの使用を可能にする前に、プロバイダー側でクリーンルームの設定を完了しなければなりません。以下の関数でクリーンルームのステータスを確認できます。有効化されると、以下のRun Analysisコマンドを実行できるようになります。クリーンルームが有効になるまで、通常約1分かかります。

この関数を実行する前に、 install_cleanroom 関数が終了していることを確認してください。

call samooha_by_snowflake_local_db.consumer.is_enabled($cleanroom_name);
Copy

分析を実行する

クリーンルームがインストールされたので、プロバイダーからクリーンルームに与えられた分析テンプレートを"run_analysis"コマンドを使って実行することができます。各フィールドがどのように決定されるかについては、以下のセクションをご参照ください。

通過可能なデータセットの数は、プロバイダーが実装したテンプレートによって制限されます。テンプレートによっては、特定のテーブル数を必要とする場合があります。テンプレート作成者は、サポートしたい要件を実装することができます。

注釈

分析を実行する前に、ウェアハウスサイズを変更したり、テーブルが大きい場合は新しい大きなウェアハウスサイズを使用することができます。

call samooha_by_snowflake_local_db.consumer.run_analysis(
  $cleanroom_name,               -- cleanroom
  'prod_calculate_regression',    -- template name

  ['<USERS_TABLE>'],    -- consumer tables

  ['<IMPRESSSIONS_TABLE>'],     -- provider tables

  object_construct()     -- Rest of the custom arguments needed for the template
);
Copy

この分析の出力は、以下のテンプレートを使って回帰の結果を取り出すために使用できるIDになります。

set result_suffix = 'regression_results_<ID>';

call samooha_by_snowflake_local_db.consumer.run_analysis(
    $cleanroom_name,        -- cleanroom
    'prod_get_results',     -- template name
    [],                     -- consumer tables
    [],                     -- provider tables
    object_construct(
        'results_suffix', $result_suffix  -- The suffix with the results
    )
);
Copy

run_analysisの入力を決定する方法

分析を実行するには、run_analysis関数にいくつかのパラメータを渡す必要があります。このセクションでは、どのようなパラメータを渡すかを決定する方法を説明します。

テンプレート名

まず、以下のプロシージャを呼び出すことで、サポートされている分析テンプレートを確認できます。

call samooha_by_snowflake_local_db.consumer.view_added_templates($cleanroom_name);
Copy

分析を実行するには、run_analysis関数にいくつかのパラメータを渡す必要があります。このセクションでは、どのようなパラメータを渡すかを決定する方法を説明します。

call samooha_by_snowflake_local_db.consumer.view_template_definition($cleanroom_name, 'prod_calculate_regression');
Copy

これには、異なるSQL Jinjaのパラメータが大量に含まれることもあります。以下の機能は、SQL Jinjaテンプレートを解析し、run_analysisで指定する必要がある引数をリストに抽出します。

call samooha_by_snowflake_local_db.consumer.get_arguments_from_template($cleanroom_name, 'prod_calculate_regression');
Copy

データセット名

プロバイダーによってクリーンルームに追加されたデータセット名を表示したい場合は、次の手順を呼び出します。クリーンルームのセキュリティ特性上、プロバイダーがクリーンルームに追加したデータセットは表示できません。

call samooha_by_snowflake_local_db.consumer.view_provider_datasets($cleanroom_name);
Copy

また、クリーンルームにリンクしたテーブルは、以下の呼び出しで確認することができます。

call samooha_by_snowflake_local_db.consumer.view_consumer_datasets($cleanroom_name);
Copy

推奨事項

  • 重要なデータの前処理はすべて、可能な限り動的SQL、 cleanroom スキーマを使用して中間テーブルにデータを格納します。より速く、より効率的です。例:

    session.sql("create or replace table cleanroom.intermediary as ...")
    
    Copy
  • Snowparkデコレータを使用するのではなく、 cleanroom スキーマでsession.sqlを介してSQL、UDFs、UDTFsおよびプロシージャを作成します。例:

    session.sql("create or replace function cleanroom.udf(...")
    
    Copy
  • メモリに収まりきらないほど大きなデータを読み込み中は、.to_pandas_batches()を使って繰り返し処理します。例:

    df_iter = session.table(intermediary_table_name).to_pandas_batches()
    for df in df_iter:
        ...
    
    Copy