Snowflake Data Clean Rooms: procedimentos Snowpark seguros¶
Este tópico descreve os fluxos de provedores e consumidores necessários para configurar programaticamente uma sala limpa, compartilhá-la com um consumidor e executar análises nela de uma forma que use procedimentos Snowpark seguros carregados na sala limpa a partir da conta do provedor. Neste fluxo, um provedor carrega um procedimento Snowpark seguro na sala limpa usando uma API que mantém o código Python subjacente completamente confidencial do consumidor.
O procedimento Snowpark neste fluxo está realizando uma regressão linear do alcance na contagem de impressões para estimar a inclinação. Ele toma como entrada uma tabela com IDs de impressão, IDs de usuário e carimbos de data/hora na conta do provedor e, opcionalmente, uma tabela de usuários de um consumidor. O procedimento Snowpark cria dinamicamente SQL para unir os dados de impressões aos dados dos usuários do consumidor, se fornecidos, e criar uma tabela intermediária na sala limpa que contenha a contagem de impressões e alcance por dia.
Em seguida, esses dados da tabela intermediária são processados dentro do procedimento Snowpark, e uma regressão é realizada para estimar a interceptação, a inclinação e uma série de outros parâmetros. Esses dados são então gravados em uma tabela de resultados dentro da sala limpa e o ID desta tabela é fornecida ao consumidor como uma saída. Por fim, o consumidor pode usar um modelo get_results com este ID para obter os dados de volta da sala limpa. Antes que o procedimento Snowpark seja concluído, ele limpa todas as tabelas intermediárias criadas na sala limpa.
Observação: todas as tabelas intermediárias são criadas dentro da sala limpa e, portanto, não são acessíveis a ninguém, exceto ao próprio procedimento Snowpark.
Os principais aspectos desse fluxo, além dos mencionados acima, são:
Provedor:
a. Adicione com segurança um procedimento Snowpark na sala limpa.
b. Adicione um modelo personalizado que execute o procedimento Snowpark e outro que recupere os resultados.
c. Compartilhe a sala limpa com um consumidor.
Consumidor:
a. Execute o modelo que realiza a regressão.
b. Recupere os resultados da análise.
Pré-requisitos¶
Você precisa de duas contas Snowflake separadas para concluir este fluxo. Use a primeira conta para executar os comandos do provedor e alterne para a segunda conta para executar os comandos do consumidor.
Provedor¶
Nota
Os comandos a seguir devem ser executados em uma planilha Snowflake na conta do provedor.
Configuração do ambiente¶
Execute os seguintes comandos para configurar o ambiente Snowflake antes de usar as APIs de desenvolvedor para trabalhar com uma Snowflake Data Clean Room. Se você não tem a função SAMOOHA_APP_ROLE, entre em contato com o administrador da sua conta.
use role samooha_app_role;
use warehouse app_wh;
Crie a sala limpa¶
Crie um nome para a sala limpa. Insira um novo nome de salas limpas para evitar colisões com nomes de salas limpas. Observe que os nomes das salas limpas só podem conter caracteres alfanuméricos. Os nomes das salas limpas não podem conter caracteres especiais além de espaços e sublinhados.
set cleanroom_name = 'Snowpark Demo clean room';
Você pode criar uma nova sala limpa com o nome de sala limpa definido acima. Se o nome da sala limpa definido acima já existir como uma sala limpa, esse processo falhará.
Este procedimento leva aproximadamente 45 segundos para ser executado.
O segundo argumento para provider.cleanroom_init é a distribuição da sala limpa. Ele pode ser INTERNAL ou EXTERNAL. Para fins de teste, se você estiver compartilhando a sala limpa com uma conta na mesma organização, você pode usar INTERNAL para ignorar a verificação de segurança automatizada que deve ocorrer antes que um pacote de aplicativo seja liberado aos colaboradores. No entanto, se você estiver compartilhando esta sala limpa com uma conta em uma organização diferente, você deve usar uma distribuição de sala limpa EXTERNAL.
call samooha_by_snowflake_local_db.provider.cleanroom_init($cleanroom_name, 'INTERNAL');
Para visualizar o status da verificação de segurança, use:
call samooha_by_snowflake_local_db.provider.view_cleanroom_scan_status($cleanroom_name);
Após criar sua sala limpa, você deve definir sua diretiva de lançamento para que ela possa ser compartilhada com qualquer colaborador. No entanto, se sua distribuição tiver sido definida como EXTERNAL, você deverá primeiro aguardar a conclusão da verificação de segurança antes de definir a diretiva de lançamento. Você pode continuar executando o restante das etapas e retornar aqui antes da etapa provider.create_cleanroom_listing enquanto a verificação é executada.
Para definir a diretiva de lançamento, chame:
call samooha_by_snowflake_local_db.provider.set_default_release_directive($cleanroom_name, 'V1_0', '0');
Compartilhamento entre regiões¶
Para compartilhar uma sala limpa com um cliente Snowflake cuja conta está em uma região diferente da sua conta, você deve habilitar o Preenchimento automático entre nuvens. Para obter mais informações sobre os custos adicionais associados à colaboração com consumidores em outras regiões, consulte Custos de preenchimento automático entre nuvens.
Ao usar as APIs de desenvolvedor, o processo para habilitar o compartilhamento entre regiões ocorre em duas etapas.
Um administrador Snowflake com a função ACCOUNTADMIN habilita o preenchimento automático entre nuvens para sua conta Snowflake. Para obter instruções, consulte Colaboração com contas em diferentes regiões.
Execute o comando provider.enable_laf_for_cleanroom para habilitar o preenchimento automático entre nuvens para a sala limpa. Por exemplo:
call samooha_by_snowflake_local_db.provider.enable_laf_for_cleanroom($cleanroom_name);
Após habilitar o preenchimento automático entre nuvens para a sala limpa, você pode adicionar consumidores à sua listagem normalmente usando o comando provider.create_cleanroom_listing. A listagem é replicada automaticamente para nuvens e regiões remotas, conforme necessário.
Vincule o conjunto de dados e defina a política de junção para o conjunto de dados¶
Conecte as tabelas Snowflake à sala limpa. Esses conjuntos de dados serão disponibilizados para você em sua conta por meio do patch mais recente. Navegue pela lista de tabelas em sua conta Snowflake e insira os nomes de tabela totalmente qualificados (Database.Schema.Table) como uma matriz. O procedimento torna a tabela automaticamente acessível à sala limpa, criando uma exibição segura da tabela de dentro da sala limpa, evitando assim a necessidade de fazer uma cópia da tabela.
call samooha_by_snowflake_local_db.provider.link_datasets($cleanroom_name, ['<IMPRESSIONS_TABLE>']);
Se vincular uma visualização à sala limpa que tenha dependências downstream, use provider.link_datasets_advanced:
call samooha_by_snowflake_local_db.provider.link_datasets_advanced($cleanroom_name, ['<VIEW_NAME>'], ['<SOURCE_DB_NAMES>']);
Nota
Se esta etapa não funcionar mesmo que sua tabela exista, é provável que a função SAMOOHA_APP_ROLE ainda não tenha recebido acesso a ela. Se esse for o caso, mude para a função ACCOUNTADMIN, chame o procedimento abaixo no banco de dados e reverta para o restante do fluxo:
use role accountadmin;
call samooha_by_snowflake_local_db.provider.register_db('<DATABASE_NAME>');
use role samooha_app_role;
Você pode ver os conjuntos de dados vinculados à sala limpa usando o seguinte procedimento:
call samooha_by_snowflake_local_db.provider.view_provider_datasets($cleanroom_name);
Para descobrir quais colunas usar como política de junção, você pode dar uma olhada em seu conjunto de dados para determinar as colunas PII. Para ver as 10 principais linhas, use esta consulta:
select * from <IMPRESSIONS_TABLE> limit 10;
Carregue confidencialmente o procedimento Snowpark na sala limpa¶
Esta seção mostra como carregar o procedimento Snowpark na sala limpa. O procedimento realiza as seguintes etapas:
Pré-processa dados de impressões: o SQL dinâmico é criado para unir os dados de impressões do provedor aos dados dos usuários do consumidor, se a tabela do consumidor tiver sido fornecida, e calcula a contagem distinta de impressões e alcance por data, e os armazena em uma tabela intermediária na sala limpa. Se a tabela do consumidor não for fornecida, ela usará toda a tabela de impressões do provedor.
Carrega a tabela intermediária: a tabela intermediária é carregada no procedimento Snowpark como um pandas DataFrame.
Executa regressão: a regressão é calculada usando a biblioteca statsmodels e os resultados são retornados como um pandas DataFrame.
Grava os resultados na tabela Snowflake: os resultados são gravados em uma tabela de resultados na sala limpa e o sufixo de ID da tabela é retornado ao consumidor.
a. Como o procedimento Snowpark é executado na sala limpa, ele tem uma capacidade limitada de gravar diretamente no locatário consumidor. Em vez disso, para manter os resultados mais seguros, eles são gravados em uma tabela dentro da sala limpa e permitem que os consumidores leiam a tabela.
Remove as tabelas intermediárias: tabelas intermediárias criadas durante o cálculo na sala limpa que não são mais necessárias são descartadas antes que o procedimento Snowpark seja concluído.
A API a seguir permite definir suas funções Python diretamente como funções em linha na sala limpa. Como alternativa, você pode carregar o código Python a partir de arquivos preparados que você enviou para o estágio da sala limpa. Consulte o Guia de referência da API para um exemplo.
call samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
$cleanroom_name,
'reach_impression_regression',
['source_table string', 'my_table string'],
['snowflake-snowpark-python', 'pandas', 'statsmodels', 'numpy'],
'string',
'main',
$$
import traceback
import pandas as pd
import numpy as np
import statsmodels.formula.api as sm
def drop_tables(session, table_names):
"""
Drop the tables passed in
"""
for tbl in table_names:
session.sql(f'drop table {tbl}').collect()
def preprocess_regression_data(session, source_table, my_table, suffix):
"""
Preprocess the impressions and customer data into an intermediary table for regression
"""
table_name = f'cleanroom.intermediary_{suffix}'
my_table_statement = f'inner join {my_table} c on p.hem = c.hem' if my_table != 'NONE' else ''
session.sql(f"""
create or replace table {table_name} as (
with joint_data as (
select
date,
p.hem as hem,
impression_id
from {source_table} p
{my_table_statement}
)
select
date,
count(distinct hem) as reach,
count(distinct impression_id) as num_impressions
from joint_data
group by date
order by date
);
""").collect()
return table_name
def calculate_regression(df):
"""
Calculate the regression data from the dataframe we put together
"""
result = sm.ols('REACH ~ 1 + NUM_IMPRESSIONS', data=df).fit()
retval = pd.concat([
result.params,
result.tvalues,
result.pvalues
], keys=['params', 't-stat', 'p-value'], names=['STATISTIC', 'PARAMETER']).rename('VALUE').reset_index()
return retval
def main(session, source_table, my_table):
"""
First compute the regression data from an overlap between customer and provider data, and counting
the number of impressions and reach per day. Next regress these locally and compute the regression
statistics. Finally write it to a results table which can be queried to get the output.
"""
suffix = f'regression_results_{abs(hash((source_table, my_table))) % 10000}'
try:
# Preprocess impressions and customer data into an intermediary form to use for regression
intermediary_table_name = preprocess_regression_data(session, source_table, my_table, suffix)
# Load the data into Python locally
df = session.table(intermediary_table_name).to_pandas()
# Carry out the regression and get statistics as an output
regression_output = calculate_regression(df)
# Write the statistics to an output table
# The table and the schema names should be in upper case to quoted identifier related issues.
table = f'results_{suffix}'.upper()
retval_df = session.write_pandas(regression_output, table, schema = 'CLEANROOM', auto_create_table = True)
# Drop any intermediary tables
drop_tables(session, [intermediary_table_name])
# Tell the user the name of the table the results have been written to
return f'Done, results have been written to the following suffix: {suffix}'
except:
return traceback.format_exc()
$$
);
Nota
Carregar o código Python na sala limpa cria um novo patch para a sala limpa. Se a distribuição de sua sala limpa estiver definida como EXTERNAL, você precisa aguardar a conclusão da verificação de segurança e, em seguida, atualizar a diretiva de liberação padrão usando:
-- See the versions available inside the clean room
show versions in application package samooha_cleanroom_Snowpark_Demo_clean_room;
-- Once the security scan is approved, update the release directive to the latest version
call samooha_by_snowflake_local_db.provider.set_default_release_directive($cleanroom_name, 'V1_0', '1');
Como adicionar um modelo personalizado usando UDFs¶
Para adicionar um modelo de análise personalizado à sala limpa, você precisa de um espaço reservado para nomes de tabelas no lado do provedor e do consumidor, juntamente com colunas de junção do lado do provedor. Nos modelos SQL Jinja, esses espaços reservados devem ser sempre:
source_table: uma matriz de nomes de tabela do provedor
my_table: uma matriz de nomes de tabela do consumidor
Os nomes das tabelas podem ser tornados dinâmicos por meio do uso dessas variáveis, mas também podem ser embutidos em código no modelo, se desejado, usando o nome da visualização vinculada à sala limpa. Os nomes das colunas podem ser embutidos em código no modelo, se desejado, ou definidos dinamicamente por meio de parâmetros. Se eles forem definidos por meio de parâmetros, lembre-se de que você precisa chamar os parâmetros dimensions ou measure_column, que precisam ser matrizes para que sejam verificados em relação à política de coluna. Você adiciona estes como parâmetros SQL Jinja no modelo que serão passados posteriormente pelo consumidor durante a consulta. As políticas de junção garantem que o consumidor não possa ingressar em colunas não autorizadas.
Alternativamente, qualquer argumento em um modelo SQL Jinja personalizado pode ser verificado quanto à conformidade com as políticas de junção e coluna usando os seguintes filtros:
join_policy: verifica se um valor de cadeia de caracteres ou cláusula de filtro está em conformidade com a política de junção
column_policy: verifica se um valor de cadeia de caracteres ou cláusula de filtro está em conformidade com a política de coluna
join_and_column_policy: verifica se as colunas usadas para uma junção em uma cláusula de filtro estão em conformidade com a política de junção e se as colunas usadas como um filtro estão em conformidade com a política de coluna
Por exemplo, na cláusula {{ provider_id | sqlsafe | join_policy }}, uma entrada de p.HEM será analisada para verificar se p.HEM está na política de junção. Nota: Use o filtro sqlsafe com cautela, pois ele permite que os colaboradores coloquem SQL puro no modelo.
Nota
Todas as tabelas de provedores/consumidores devem ser referenciadas usando esses argumentos, pois o nome da exibição segura realmente vinculada à sala limpa será diferente do nome da tabela. Os aliases de tabela de provedores DEVEM ser obrigatoriamente p (ou p1), p2, p3, p4 etc., e os aliases da tabela do consumidor devem ser c (ou c1), c2, c3 etc. Isso é necessário para aplicar políticas de segurança na sala limpa.
Observe que esta função substitui qualquer modelo existente com o mesmo nome. Se quiser atualizar qualquer modelo existente, basta chamar esta função novamente com o modelo atualizado.
call samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
'prod_calculate_regression',
$$
call cleanroom.reach_impression_regression({{ source_table[0] }}, {{ my_table[0] | default('NONE') }});
$$
);
Por fim, um modelo personalizado é adicionado para que o consumidor possa recuperar os resultados de sua análise, utilizando o ID de sufixo dos resultados retornado pelo modelo calculate_regression.
call samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
'prod_get_results',
$$
select * from cleanroom.results_{{ results_suffix | sqlsafe }};
$$
);
Se quiser visualizar os modelos atualmente ativos na sala limpa, chame o procedimento a seguir.
call samooha_by_snowflake_local_db.provider.view_added_templates($cleanroom_name);
Consumidor¶
Nota
Os seguintes comandos devem ser executados em uma planilha Snowflake na conta do consumidor
Configuração do ambiente¶
Execute os seguintes comandos para configurar o ambiente Snowflake antes de usar as APIs de desenvolvedor para trabalhar com uma Snowflake Data Clean Room. Se você não tem a função SAMOOHA_APP_ROLE, entre em contato com o administrador da sua conta.
use role samooha_app_role;
use warehouse app_wh;
Instalação da sala limpa¶
Depois que um compartilhamento de sala limpa for instalado, a lista de salas limpas disponíveis poderá ser visualizada usando o comando abaixo.
call samooha_by_snowflake_local_db.consumer.view_cleanrooms();
Atribua um nome para a sala limpa que o provedor compartilhou com você.
set cleanroom_name = 'Snowpark Demo clean room';
O comando a seguir instala a sala limpa na conta do consumidor com o provedor associado e a sala limpa selecionada.
Este procedimento leva aproximadamente 45 segundos para ser executado.
call samooha_by_snowflake_local_db.consumer.install_cleanroom($cleanroom_name, '<PROVIDER_ACCOUNT_LOCATOR>');
Após a instalação da sala limpa, o provedor precisa terminar de configurá-la do seu lado antes que ela seja habilitada para uso. A função abaixo permite que você verifique o status da sala limpa. Depois que ela for habilitada, você poderá executar o comando Run Analysis abaixo. Normalmente, leva cerca de 1 minuto para que a sala limpa seja habilitada.
Certifique-se de que a função install_cleanroom foi concluída antes de executar esta função.
call samooha_by_snowflake_local_db.consumer.is_enabled($cleanroom_name);
Vincule o conjunto de dados¶
Vincule conjuntos de dados à sala limpa para realizar cálculos seguros com os dados do provedor. Esses conjuntos de dados serão disponibilizados para você em sua conta por meio do patch mais recente.
call samooha_by_snowflake_local_db.consumer.link_datasets($cleanroom_name, ['<USERS_TABLE>']);
Nota
Se esta etapa não funcionar mesmo que sua tabela exista, é provável que a função SAMOOHA_APP_ROLE ainda não tenha recebido acesso a ela. Se esse for o caso, mude para a função ACCOUNTADMIN, chame o procedimento abaixo no banco de dados e reverta para o restante do fluxo:
use role accountadmin;
call samooha_by_snowflake_local_db.consumer.register_db('<DATABASE_NAME>');
use role samooha_app_role;
Para executar a análise, você precisará passar a tabela do consumidor. Se quiser visualizar os conjuntos de dados que adicionou à sala limpa, chame o procedimento a seguir.
call samooha_by_snowflake_local_db.consumer.view_consumer_datasets($cleanroom_name);
Execute a análise¶
Agora que a sala limpa está instalada, você pode executar o modelo de análise fornecido à sala limpa pelo provedor usando o comando “run_analysis”. Você pode ver como cada campo é determinado nas seções abaixo.
O número de conjuntos de dados que podem ser transmitidos é limitado pelo modelo implementado pelo provedor. Alguns modelos exigem um número específico de tabelas. O criador do modelo pode implementar os requisitos que deseja atender.
Nota
Antes de executar a análise, você pode alterar o tamanho do warehouse ou usar um novo warehouse de tamanho maior se suas tabelas forem grandes.
call samooha_by_snowflake_local_db.consumer.run_analysis(
$cleanroom_name, -- cleanroom
'prod_calculate_regression', -- template name
['<USERS_TABLE>'], -- consumer tables
['<IMPRESSSIONS_TABLE>'], -- provider tables
object_construct() -- Rest of the custom arguments needed for the template
);
A saída desta análise será um ID que pode ser usado para recuperar os resultados da regressão usando o seguinte modelo:
set result_suffix = 'regression_results_<ID>';
call samooha_by_snowflake_local_db.consumer.run_analysis(
$cleanroom_name, -- cleanroom
'prod_get_results', -- template name
[], -- consumer tables
[], -- provider tables
object_construct(
'results_suffix', $result_suffix -- The suffix with the results
)
);
Como determinar as entradas para run_analysis¶
Para executar a análise, você precisa passar alguns parâmetros à função run_analysis. Esta seção mostrará como determinar quais parâmetros devem ser passados.
Nomes dos modelos
Primeiro, você pode ver os modelos de análise compatíveis chamando o procedimento a seguir.
call samooha_by_snowflake_local_db.consumer.view_added_templates($cleanroom_name);
Para executar a análise, você precisa passar alguns parâmetros à função run_analysis. Esta seção mostra como determinar quais parâmetros passar.
call samooha_by_snowflake_local_db.consumer.view_template_definition($cleanroom_name, 'prod_calculate_regression');
Isso geralmente também pode conter um grande número de diferentes parâmetros SQL Jinja. A funcionalidade a seguir analisa o modelo SQL Jinja e extrai os argumentos que precisam ser especificados em run_analysis, organizando-os em uma lista.
call samooha_by_snowflake_local_db.consumer.get_arguments_from_template($cleanroom_name, 'prod_calculate_regression');
Nomes dos conjuntos de dados
Se você quiser visualizar os nomes dos conjuntos de dados adicionados à sala limpa pelo provedor, chame o procedimento a seguir. Observe que você não pode visualizar os dados presentes nos conjuntos de dados adicionados à sala limpa pelo provedor devido às propriedades de segurança da sala limpa.
call samooha_by_snowflake_local_db.consumer.view_provider_datasets($cleanroom_name);
Você também pode ver as tabelas vinculadas à sala limpa usando a seguinte chamada:
call samooha_by_snowflake_local_db.consumer.view_consumer_datasets($cleanroom_name);
Recomendações¶
Mantenha todo o pré-processamento de dados significativos através de SQL dinâmico sempre que possível, armazenando os dados em tabelas intermediárias e usando o esquema cleanroom. É muito mais rápido e eficiente. Por exemplo:
session.sql("create or replace table cleanroom.intermediary as ...")
Criar UDFs, UDTFs e procedimentos executando SQL via session.sql no esquema cleanroom em vez de usar os decoradores Snowpark. Por exemplo:
session.sql("create or replace function cleanroom.udf(...")
Quando precisar carregar dados muito grandes para caber na memória, use .to_pandas_batches() e itere sobre eles. Por exemplo:
df_iter = session.table(intermediary_table_name).to_pandas_batches() for df in df_iter: ...