マルチステップフローのデザイン¶
概要¶
ほとんどのclean roomは、clean room内の1つまたは複数のテーブルに対して単一の SQL クエリを実行し、その結果をレスポンスに表示するために使用されます。しかし、フローをいくつかのステップに分割して順次実行したり、任意の順序で実行したり、Pythonコードを呼び出してデータを処理(または前処理)したりするケースも数多くあります。その例として、データをデータセットに対して一度トレーニングした後、それを入力データを変えながら単独またはバッチで複数回実行する機械学習フローが挙げられます。
Clean roomには、このような高度なシナリオを可能にするためのメカニズムがいくつかあります:
テンプレートチェーン: テンプレートチェーン は各テンプレートの出力を次のテンプレートの入力として使用しながら、セットになったテンプレートを特定の順序で実行します。チェーンの最初のテンプレートにはユーザーが入力し、チェーンの最後のテンプレートからの出力をユーザーに返します。
内部テーブル: テンプレートまたはカスタムな内部関数により、clean room内にテーブルを作成できます。これらのテーブルはリンクされたテーブルのように動作し、テンプレートやアップロードされたカスタムコードからアクセスできます。内部テーブルは状態やデータを管理するために使用されます。機械学習の例では、トレーニングデータが内部テーブルに保存され、内部関数で使用されます。これらのテーブルには、clean room内のテンプレートまたはアップロードされたコードによってのみアクセスできます。内部テーブルに中間データを格納すると、テンプレートを使ってclean roomに大容量の情報を出し入れするよりも、はるかに効率が高くなりますす。
カスタム内部関数: clean room内にカスタム関数を定義し、そのclean roomでテンプレートから呼び出すことができます。関数は、 Python UDFs または UDTFs からclean roomにアップロードするか、 clean roomにコンテナーサービスを作成し、 そこから関数を実装するエンドポイントを公開することで定義します。これらの関数は、clean room内のテンプレートからのみ呼び出すことができます。
注釈
すべてのメカニズムに、テーブルや関数はテンプレートを使用してアクセスまたは実行されるという共通の原則があります。clean roomの内部テーブルへのアクセス、clean roomカスタム関数の実行、clean room内部のエンドポイントへの直接アクセスはできません。
clean roomの内部テーブル¶
SQL やPythonを使用してclean room内にテーブルを作成し、中間的な結果を保存したり、ユーザーや内部関数(トレーニングデータなど)のための永続的なストレージとして使用したりできます。これらのテーブルの動作は、リンクテーブルと同じですが、以下の注意点があります:
これらのテーブルは、clean roomのテンプレートまたは UDF/UDTF を使用して作成され、外部のテーブルにはリンクされません。
これらのテーブルは、
cleanroom
名前空間に作成する必要があります。手動で作成した内部テーブルには、行や列のポリシーを後からセットできます。
テーブル名が静的ではなく、テーブルに他のテンプレートやコードからアクセスする必要がある場合、テーブル名をユーザーに返すことで、ユーザーが動的テーブル名を使用し、そのテーブルへのアクセスが必要な他のテンプレートに渡せるようにする必要があります。
以下は内部テーブルの作成例です:
JinjaSQL テンプレートは内部テーブルを作成できます。これは一部のタイプの アクティベーション で行われます。
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
$template_name,
$$
BEGIN
CREATE OR REPLACE TABLE cleanroom.activation_data_analysis_results AS
SELECT count(*) AS ITEM_COUNT, c.status, c.age_band
FROM IDENTIFIER({{ my_table[0] }}) AS c
JOIN IDENTIFIER({{ source_table[0] }}) AS p
ON {{ c_join_col | sqlsafe | activation_policy }} = {{ p_join_col | sqlsafe | activation_policy }}
GROUP BY c.status, c.age_band
ORDER BY c.age_band;
RETURN 'analysis_results';
END;
$$);
UDF は内部テーブルを作成できます。これには通常、Pythonで SQL が実行されます。
# Snippet of Python UDF to save results to an internal table.
table_name = f'cleanroom.results'
session.sql(f"""
CREATE OR REPLACE TABLE {table_name} AS (
WITH joint_data AS (
SELECT
date,
p.hashed_email AS hem,
impression_id
FROM {source_table} p
)
SELECT
date,
COUNT(DISTINCT hem) AS reach,
COUNT(DISTINCT impression_id) AS num_impressions
FROM joint_data
GROUP BY date
ORDER BY date
);
""").collect()
# Snippet of container services Python code to create an internal results table.
# 'cleanroom' table name prefix is added using the schema parameter when the table is created.
@app.post("/score")
def score():
... omitted content ...
df = pd.DataFrame({
"ID": ids,
"SCORE": scores
})
table = "LOOKALIKE_RESULTS"
session.write_pandas(df, table, schema="CLEANROOM", auto_create_table=True, overwrite=True)
end_time = time.perf_counter()
execution_time = end_time - start_time
response = make_json_response([[0, {"results_table": table, "size": len(ids), "execution_time": round(execution_time, 2)}]])
return response
テンプレートやコードからアクセスする必要のある内部テーブルを生成する場合は、定数のテーブル名を使用するか、またはテーブル名を動的に作成してユーザーに返し、ユーザーがそれを結果の関数に渡します。
ここでは、名前が動的に作成されるテーブルを使用して結果を格納する例を示します。ユーザーは呼び出しを2回行います。1つはデータを生成してテーブル名を取得するため、もう1つは結果を確認するためです。
プロバイダーテンプレートは、
reach_impression_regression
UDF を呼び出してデータを処理します (cleanroom
プレフィックスにより、 UDFであることがわかります)。UDF は内部テーブルのプレフィックス名をテンプレートに返し、テンプレートはそれを呼び出し元に返します。CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template( $cleanroom_name, 'prod_calculate_regression', $$ CALL cleanroom.reach_impression_regression({{ source_table[0] }}, {{ my_table[0] | default('NONE') }}); $$ );
Python UDF はテーブル名の接尾辞名をテンプレートの呼び出し元に返します。
def main(session, source_table, my_table): ... table = f'results_{suffix}'.upper() retval_df = session.write_pandas(regression_output, table, schema = 'CLEANROOM', auto_create_table = True) return f'Done, results have been written to the following suffix: {suffix}'
プロバイダーテンプレートは渡されたテーブル名の接尾辞を受け取り、そのテーブルの内容を表示します。
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template( $cleanroom_name, 'prod_get_results', $$ SELECT * FROM cleanroom.results_{{ results_suffix | sqlsafe }}; $$ );
コンシューマーは、テーブル名の接尾辞を渡してテンプレートを呼び出します。
CALL samooha_by_snowflake_local_db.consumer.run_analysis( $cleanroom_name, 'prod_get_results', [], [], object_construct( 'results_suffix', $result_suffix -- Table name suffix to identify the results table. ) );
カスタム関数のトリガー¶
カスタム関数は、clean room内のテンプレートまたはコード(UDFs、 UDTFs、 またはコンテナーサービスエンドポイント)から呼び出すことができます。任意のコラボレーターによってアップロードされた関数は、他の任意のコラボレーターのテンプレートやコードからアクセスできます。
Clean room関数は、常に以下の名前空間で適切にスコープし、呼び出す必要があります:
cleanroom.function_name
カスタムな UDF/UDTF 関数を呼び出す場合service_functions.function_name
Snowparkコンテナーサービスの組み込み関数として公開されている関数を呼び出す場合
以下の例では、カスタム UDF とカスタムコンテナーのサービスエンドポイントをテンプレートから呼び出します:
テンプレートは cleanroom
スコープを使用して UDF または UDTFs にアクセスします。
-- Template to generate results. Calls the UDF 'my_function', which
-- generates a results table inside the clean room called 'results'.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
'generate_results_template',
$$
CALL cleanroom.my_function({{ source_table[0] }}, {{ my_table[0] | default('NONE') }});
$$
);
テンプレートは service_functions
スコープを使用してコンテナーサービス関数にアクセスします。
-- Template to trigger training data generation.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
'lal_train',
$$
SELECT service_functions.my_func(
{{ source_table[0] }},
{{ provider_join_col }},
{{ my_table[0] }},
{{ consumer_join_col }},
{{ dimensions | sqlsafe }},
{{ filter_clause }}
) AS train_result;
$$
一般的なマルチステップフローのパターン¶
このSnowpark API の例 では、データを処理し、中間テーブルを生成し、1つ目のテンプレートを呼び出して結果テーブルを生成してから、2つ目のテンプレート呼び出しで結果を直接公開します。
このSnowparkコンテナーサービスの例 では、1つ目のテンプレート呼び出しでトレーニングデータを作成し、内部テーブルにトレーニングデータを保存します。2つ目のテンプレートで、保存されているトレーニングデータに対してユーザー入力の分析を行います。