マルチステップフローのデザイン¶
概要¶
ほとんどのclean roomは、clean room内の1つまたは複数のテーブルに対して単一の SQL クエリを実行し、その結果をレスポンスに表示するために使用されます。しかし、フローをいくつかのステップに分割して順次実行したり、任意の順序で実行したり、Pythonコードを呼び出してデータを処理(または前処理)したりするケースも数多くあります。その例として、データをデータセットに対して一度トレーニングした後、それを入力データを変えながら単独またはバッチで複数回実行する機械学習フローが挙げられます。
Clean roomには、このような高度なシナリオを可能にするためのメカニズムがいくつかあります:
テンプレートチェーン: テンプレートチェーン は各テンプレートの出力を次のテンプレートの入力として使用しながら、セットになったテンプレートを特定の順序で実行します。チェーンの最初のテンプレートにはユーザーが入力し、チェーンの最後のテンプレートからの出力をユーザーに返します。
内部テーブル: テンプレートまたはカスタムな内部関数により、clean room内にテーブルを作成できます。これらのテーブルはリンクされたテーブルのように動作し、テンプレートやアップロードされたカスタムコードからアクセスできます。内部テーブルは状態やデータを管理するために使用されます。機械学習の例では、トレーニングデータが内部テーブルに保存され、内部関数で使用されます。これらのテーブルには、clean room内のテンプレートまたはアップロードされたコードによってのみアクセスできます。内部テーブルに中間データを格納すると、テンプレートを使ってclean roomに大容量の情報を出し入れするよりも、はるかに効率が高くなりますす。
**カスタム内部関数:**クリーンルーム内でカスタム関数を定義し、そのクリーンルームのテンプレートから呼び出すことができます。クリーンルームで関数を定義するには、Python UDFs またはUDTFs をクリーンルームにアップロードするか、関数を実装するエンドポイントを公開する:ref:
コンテナサービスをクリーンルームに作成 <label-dcr_snowpark_spcs>します。これらの関数はクリーンルーム内のテンプレートからのみ呼び出すことができます。
注釈
すべてのメカニズムに、テーブルや関数はテンプレートを使用してアクセスまたは実行されるという共通の原則があります。clean roomの内部テーブルへのアクセス、clean roomカスタム関数の実行、clean room内部のエンドポイントへの直接アクセスはできません。
clean roomの内部テーブル¶
You can create tables inside a clean room using SQL or Python to store intermediary results, or for persistent storage for the user or your internal functions (for example, to save training data that is used for multiple runs). These tables behave the same as linked tables, with the following notes:
Internal tables are created using a clean room template or a UDF/UDTF, and have no linkage to outside tables.
Internal tables must be created in the
cleanroomnamespace.手動で作成した内部テーブルには、行や列のポリシーを後からセットできます。
If the table name is dynamic, and the table is accessed by other templates or code, you can return the name of the table to the user, so the user can pass in the dynamic table name to any other templates that need to access that table.
以下は内部テーブルの作成例です:
JinjaSQL テンプレートは内部テーブルを作成できます。これは一部のタイプの アクティベーション で行われます。
This example returns the table name so that it can be passed in as a parameter to other templates.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
$template_name,
$$
BEGIN
CREATE OR REPLACE TABLE cleanroom.data_analysis_results AS
SELECT count(*) AS ITEM_COUNT, c.status, c.age_band
FROM IDENTIFIER({{ my_table[0] }}) AS c
JOIN IDENTIFIER({{ source_table[0] }}) AS p
ON {{ c_join_col | sqlsafe | activation_policy }} = {{ p_join_col | sqlsafe | activation_policy }}
GROUP BY c.status, c.age_band
ORDER BY c.age_band;
RETURN 'analysis_results';
END;
$$);
UDF は内部テーブルを作成できます。これには通常、Pythonで SQL が実行されます。
# Snippet of Python UDF to save results to an internal table.
table_name = f'cleanroom.results'
session.sql(f"""
CREATE OR REPLACE TABLE {table_name} AS (
WITH joint_data AS (
SELECT
date,
p.hashed_email AS hem,
impression_id
FROM {source_table} p
)
SELECT
date,
COUNT(DISTINCT hem) AS reach,
COUNT(DISTINCT impression_id) AS num_impressions
FROM joint_data
GROUP BY date
ORDER BY date
);
""").collect()
# Snippet of container services Python code to create an internal results table.
# 'cleanroom' table name prefix is added using the schema parameter when the table is created.
@app.post("/score")
def score():
... omitted content ...
df = pd.DataFrame({
"ID": ids,
"SCORE": scores
})
table = "LOOKALIKE_RESULTS"
session.write_pandas(df, table, schema="CLEANROOM", auto_create_table=True, overwrite=True)
end_time = time.perf_counter()
execution_time = end_time - start_time
response = make_json_response([[0, {"results_table": table, "size": len(ids), "execution_time": round(execution_time, 2)}]])
return response
テンプレートやコードからアクセスする必要のある内部テーブルを生成する場合は、定数のテーブル名を使用するか、またはテーブル名を動的に作成してユーザーに返し、ユーザーがそれを結果の関数に渡します。
ここでは、名前が動的に作成されるテーブルを使用して結果を格納する例を示します。ユーザーは呼び出しを2回行います。1つはデータを生成してテーブル名を取得するため、もう1つは結果を確認するためです。
プロバイダーテンプレートは、
reach_impression_regressionUDF を呼び出してデータを処理します (cleanroomプレフィックスにより、 UDFであることがわかります)。UDF は内部テーブルのプレフィックス名をテンプレートに返し、テンプレートはそれを呼び出し元に返します。CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template( $cleanroom_name, 'prod_calculate_regression', $$ CALL cleanroom.reach_impression_regression({{ source_table[0] }}, {{ my_table[0] | default('NONE') }}); $$ );
Python UDF はテーブル名の接尾辞名をテンプレートの呼び出し元に返します。
def main(session, source_table, my_table): ... table = f'results_{suffix}'.upper() retval_df = session.write_pandas(regression_output, table, schema = 'CLEANROOM', auto_create_table = True) return f'Done, results have been written to the following suffix: {suffix}'
プロバイダーテンプレートは渡されたテーブル名の接尾辞を受け取り、そのテーブルの内容を表示します。
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template( $cleanroom_name, 'prod_get_results', $$ SELECT * FROM cleanroom.results_{{ results_suffix | sqlsafe }}; $$ );
コンシューマーは、テーブル名の接尾辞を渡してテンプレートを呼び出します。
CALL samooha_by_snowflake_local_db.consumer.run_analysis( $cleanroom_name, 'prod_get_results', [], [], object_construct( 'results_suffix', $result_suffix -- Table name suffix to identify the results table. ) );
カスタム関数のトリガー¶
カスタム関数は、clean room内のテンプレートまたはコード(UDFs、 UDTFs、 またはコンテナーサービスエンドポイント)から呼び出すことができます。任意のコラボレーターによってアップロードされた関数は、他の任意のコラボレーターのテンプレートやコードからアクセスできます。
Clean room関数は、常に以下の名前空間で適切にスコープし、呼び出す必要があります:
cleanroom.function_nameカスタムな UDF/UDTF 関数を呼び出す場合service_functions.function_nameSnowparkコンテナーサービスの組み込み関数として公開されている関数を呼び出す場合
以下の例では、カスタム UDF とカスタムコンテナーのサービスエンドポイントをテンプレートから呼び出します:
テンプレートは cleanroom スコープを使用して UDF または UDTFs にアクセスします。
-- Template to generate results. Calls the UDF 'my_function', which
-- generates a results table inside the clean room called 'results'.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
'generate_results_template',
$$
CALL cleanroom.my_function({{ source_table[0] }}, {{ my_table[0] | default('NONE') }});
$$
);
テンプレートは service_functions スコープを使用してコンテナーサービス関数にアクセスします。
-- Template to trigger training data generation.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
'lal_train',
$$
SELECT service_functions.my_func(
{{ source_table[0] }},
{{ provider_join_col }},
{{ my_table[0] }},
{{ consumer_join_col }},
{{ dimensions | sqlsafe }},
{{ filter_clause }}
) AS train_result;
$$
一般的なマルチステップフローのパターン¶
このSnowpark API の例 では、データを処理し、中間テーブルを生成し、1つ目のテンプレートを呼び出して結果テーブルを生成してから、2つ目のテンプレート呼び出しで結果を直接公開します。
このSnowparkコンテナーサービスの例 では、1つ目のテンプレート呼び出しでトレーニングデータを作成し、内部テーブルにトレーニングデータを保存します。2つ目のテンプレートで、保存されているトレーニングデータに対してユーザー入力の分析を行います。