Processamento de dados com lógica personalizada em partições¶
Use a função de partição distribuída (Distributed Partition Function, DPF) para processar dados em paralelo de um ou mais nós em um pool de computação. A DPF processa automaticamente orquestração distribuída, erros, observabilidade e persistência de artefatos. Você pode executar uma DPF em um notebook Snowflake ou em um trabalho de ML do Snowflake.
A DPF oferece suporte aos seguintes modos de execução:
Modo DataFrame (
run()): particionar um DataFrame Snowpark por valores de coluna e executar sua função em cada partição simultaneamente. Os dados são pré-buscados em paralelo para uma taxa de transferência ideal.Modo de preparação (
run_from_stage()): processar arquivos de uma área de preparação do Snowflake em que cada arquivo se torna uma partição. Ideal para processamento de arquivos em grande escala com uso de memória previsível.
Você pode usar a DPF para processar grandes conjuntos de dados de forma eficiente em diferentes segmentos de dados.
Essa ferramenta é ideal nos seguintes cenários:
Análise de dados de vendas por região
Processamento de dados de clientes por segmentos geográficos
Treinamento de modelos de ML em cada partição de dados
Execução de transformações de dados em que cada partição de dados requer a mesma lógica de processamento
A DPF opera automaticamente o processamento de dados distribuído. Você não precisa gerenciar a infraestrutura de computação distribuída.
A DPF permite escrever código Python personalizado usando bibliotecas de código aberto em infraestrutura em contêiner com acesso à GPU.
Importante
A DPF armazena automaticamente os resultados e artefatos em áreas de preparação do Snowflake. Antes de usar a DPF, certifique-se de ter permissões para a área de preparação em que a DPF armazena os resultados e artefatos.
Modo DataFrame: processar dados por partições de coluna¶
O modo DataFrame particiona um DataFrame Snowpark por uma coluna especificada e executa sua função Python em cada partição paralelamente. O exemplo a seguir demonstra o processamento de dados de vendas por região.
Definição da função de processamento¶
Importe as classes necessárias para executar o processamento distribuído:
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
RunStatus, ExecutionOptions
)
Defina uma função de processamento que use dois argumentos:
data_connector: um DataConnector que fornece acesso aos dados da partição. Chamedata_connector.to_pandas()para carregá-lo como um DataFrame pandas ou use outros métodos, comoto_torch_dataset()outo_ray_dataset().context: um objeto PartitionContext que fornece o ID da partição e os métodos para carregar e baixar os artefatos.
import json
def process_sales_data(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing {len(df)} records for region: {context.partition_id}")
# Perform region-specific analytics
summary = {
'region': context.partition_id,
'total_sales': df['amount'].sum(),
'avg_order_value': df['amount'].mean(),
'customer_count': df['customer_id'].nunique(),
'record_count': len(df)
}
# Store results in stage for subsequent access
context.upload_to_stage(summary, "sales_summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Para cada região, essa função calcula as estatísticas resumidas e salva os resultados como um arquivo JSON no diretório de preparação dedicado da partição.
Inicializar e executar uma DPF¶
Crie uma instância de DPF com sua função de processamento e um nome de área de preparação de saída, depois chame run() para iniciar o processamento distribuído.
Importante
O DataFrame Snowpark que você fornece deve ser criado de uma tabela. Para obter informações sobre como criar um DataFrame de uma tabela, consulte Como criar um DataFrame.
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="regional_analytics_2024"
)
O método run() aceita os seguintes parâmetros:
partition_by(str): nome da coluna pela qual particionar o DataFrame. Cada valor exclusivo cria uma partição separada.snowpark_dataframe: o DataFrame Snowpark para particionar e processar.run_id(str): identificador exclusivo desta execução. Cria um diretório@{stage_name}/{run_id}/dedicado para todos os artefatos. Use nomes descritivos comoexperiment_2024_01_15oumodel_v1_retrain.on_existing_artifacts(cadeia de caracteres, opcional): ação quando os artefatos para orun_idjá existem."error"(padrão) gera um erro;"overwrite"substitui os artefatos existentes.execution_options(ExecutionOptions, opcional): configuração para alocação de recursos e comportamento de execução.
Monitorar o progresso e aguardar a conclusão¶
Chame wait() para bloquear até que a execução seja concluída. Por padrão, ele exibe uma barra de progresso.
final_status = run.wait() # Shows progress bar by default
print(f"Job completed with status: {final_status}")
A seguir, veja um exemplo da saída:
Progress: 100%|██████████| 4/4 [02:15<00:00, 33.75s/it]
Job completed with status: RunStatus.SUCCESS
Você também pode verificar o status e o progresso a qualquer momento sem bloqueio:
# Check overall status
current_status = run.status
# Get progress grouped by partition status
progress = run.get_progress()
Recuperação dos resultados de cada partição¶
Após a conclusão bem-sucedida da execução, recupere os resultados de cada partição usando a propriedade partition_details. Os detalhes de cada partição incluem um stage_artifacts_manager para acessar os artefatos salvos.
if final_status == RunStatus.SUCCESS:
import json
all_results = []
for partition_id, details in run.partition_details.items():
# List available artifacts for this partition
files = details.stage_artifacts_manager.list()
print(f"Partition {partition_id} artifacts: {files}")
# Load an artifact using a custom deserializer
summary = details.stage_artifacts_manager.get("sales_summary.json",
read_function=lambda path: json.load(open(path, 'r')))
all_results.append(summary)
# Combine results across all regions
total_sales = sum(r['total_sales'] for r in all_results)
total_customers = sum(r['customer_count'] for r in all_results)
O stage_artifacts_manager fornece três métodos:
list(): retorna uma lista de nomes de arquivos salvos no diretório de preparação da partição.get(filename, read_function=None): baixa e desserializa um artefato. Por padrão, usapickle, ouread_functionpersonalizado se fornecido.download(filename, local_dir): baixa um arquivo bruto para um diretório local e retorna o caminho do arquivo local.
Tratar erros¶
Se a execução não for bem-sucedida, inspecione os detalhes das partições individuais para diagnosticar as falhas:
if final_status != RunStatus.SUCCESS:
for partition_id, details in run.partition_details.items():
if details.status != PartitionStatus.DONE:
print(f"Partition {partition_id} status: {details.status}")
try:
error_logs = details.logs
print(error_logs)
except RuntimeError:
print("Logs not available for this partition")
O RunStatus geral reflete o resultado agregado:
SUCCESS: todas as partições foram concluídas com sucesso.PARTIAL_FAILURE: algumas partições foram bem-sucedidas, mas pelo menos uma falhou.FAILURE: nenhuma partição foi concluída com sucesso.CANCELLED: a execução foi cancelada.IN_PROGRESS: a execução ainda está em andamento.
Cada partição tem um PartitionStatus:
PENDING: aguardando o processamento.RUNNING: está em processamento.DONE: concluído com sucesso.FAILED: a função do usuário gerou uma exceção.CANCELLED: cancelado pelo usuário.INTERNAL_ERROR: ocorreu um erro interno no sistema (por exemplo, falta de memória).
Para importar estes enums:
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
RunStatus, PartitionStatus
)
Para cancelar um trabalho em execução:
run.cancel()
Nota
As partições que já foram concluídas não são afetadas pelo cancelamento. Resultados parciais, logs e artefatos de partições concluídas permanecem disponíveis.
Restaurar os resultados de uma execução concluída¶
Você pode restaurar uma execução concluída de seu estado persistente e acessar os mesmos resultados sem executar novamente o processo:
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
restored_run = DPFRun.restore_from(run_id="regional_analytics_2024", stage_name="analytics_stage")
# Access results just like the original run
for partition_id, details in restored_run.partition_details.items():
print(f"{partition_id}: {details.status}")
Nota
As execuções restauradas são somente leitura. Você não pode chamar wait() ou cancel() em uma execução restaurada.
Modo de preparação: processar arquivos de uma área de preparação¶
Use o modo de preparação para processar arquivos de uma área de preparação do Snowflake em que cada arquivo se torna uma partição. Ele é ideal para processamento de arquivos em grande escala, como de uma coleção de arquivos Parquet que foram preparados.
Definir a função de processamento¶
A assinatura da função de processamento é a mesma do modo DataFrame. O data_connector fornece acesso aos dados do arquivo, e context.partition_id é o caminho de arquivo relativo na área de preparação.
def process_file(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing file {context.partition_id}: {len(df)} rows")
# Process data and save results
result = {"row_count": len(df), "columns": list(df.columns)}
import json
context.upload_to_stage(result, "result.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Executar uma DPF da área de preparação¶
Chame run_from_stage() em vez de run(). Especifique o stage_location de entrada com os arquivos de origem e, opcionalmente, um file_pattern para filtrar quais arquivos processar.
dpf = DPF(process_file, "output_stage")
run = dpf.run_from_stage(
stage_location="@my_db.my_schema.input_stage/data/",
run_id="file_processing_2024",
file_pattern="*.parquet",
)
final_status = run.wait()
O método run_from_stage() aceita os seguintes parâmetros:
stage_location(str): caminho da área de preparação de entrada com os arquivos de dados de origem. Cada arquivo correspondente aofile_patterntorna-se uma partição. Oferece suporte a nomes de área de preparação simples e totalmente qualificados:Simples:
"@my_stage/data/"Totalmente qualificado:
"@my_db.my_schema.my_stage/data/"
run_id(cadeia de caracteres): identificador exclusivo da execução.file_pattern(cadeia de caracteres, opcional): padrão glob para filtrar arquivos. O padrão é"*.parquet".on_existing_artifacts(cadeia de caracteres, opcional):"error"(padrão) ou"overwrite".execution_options(ExecutionOptions, opcional): configuração para alocação de recursos e comportamento de execução.
Nota
O stage_location é a fonte de dados de entrada. O stage_name fornecido para a DPF() é o local de saída dos artefatos, como logs e resultados. Eles podem ser áreas de preparação diferentes.
O monitoramento, a recuperação de resultados, o tratamento de erros e a restauração da execução funcionam da mesma maneira que no modo DataFrame.
Para processamento de arquivo com vinculação de E/S, defina num_cpus_per_worker=1 em ExecutionOptions para maximizar o paralelismo (um ator por CPU). Para cargas de trabalho vinculadas a CPU, use o padrão ou aumente num_cpus_per_worker.
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions
run = dpf.run_from_stage(
stage_location="@my_stage/data/",
run_id="io_bound_processing",
execution_options=ExecutionOptions(num_cpus_per_worker=1),
)
Configurar as opções de execução¶
Use ExecutionOptions para controlar a alocação de recursos e o comportamento de execução, como alocação de CPU/GPU por trabalhador, contagem de novas tentativas e comportamento fail-fast. Todos os campos são opcionais com padrões sensíveis.
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions
options = ExecutionOptions(
num_cpus_per_worker=4,
num_gpus_per_worker=1,
fail_fast=True,
)
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="my_run",
execution_options=options,
)
Para conferir a lista completa de parâmetros e saber o comportamento de dimensionamento do trabalhador, consulte a Referência da API ExecutionOptions.
Trabalhar com artefatos usando PartitionContext¶
O objeto PartitionContext é passado como o segundo argumento para sua função de processamento. Ele fornece métodos para interagir com artefatos e sessões do Snowflake durante a execução da partição.
Carregar artefatos¶
Use upload_to_stage() para salvar os resultados da sua função de processamento. Por padrão, os objetos são serializados usando pickle. Forneça uma write_function para serialização personalizada.
def my_function(data_connector, context):
df = data_connector.to_pandas()
# Save a pickle object (default serialization)
results = {'total': df['amount'].sum(), 'count': len(df)}
context.upload_to_stage(results, "summary.pkl")
# Save JSON data with a custom write function
import json
context.upload_to_stage(
results, "summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w'))
)
# Save a CSV file
df_processed = df.groupby('category').sum()
context.upload_to_stage(
df_processed, "aggregated.csv",
write_function=lambda df, path: df.to_csv(path, index=False)
)
Baixar artefatos¶
Use download_from_stage() para carregar os artefatos da sua função de processamento. É possível usar essa função para acessar os artefatos de uma execução anterior. Por exemplo, você pode usá-la para carregar um modelo para inferência.
def my_inference_function(data_connector, context):
# Load a pickle object from the current partition's stage path
model = context.download_from_stage("model.pkl")
# Load from a different stage path (e.g., from a prior training run)
model = context.download_from_stage(
"model.pkl",
stage_path="@db.schema.stage/training_run/partition_0"
)
# Load JSON with a custom deserializer
import json
config = context.download_from_stage(
"config.json",
read_function=lambda path: json.load(open(path, 'r'))
)
Usar sessões do Snowflake¶
Use with_session() para executar operações que exigem uma sessão do Snowflake, como gravar resultados em uma tabela. Esse método usa um pool limitado de sessões para evitar atingir os limites de sessão do Snowflake ao executar muitas partições simultaneamente.
def my_function(data_connector, context):
df = data_connector.to_pandas()
# Load a model from a prior training run
model = context.download_from_stage("model.pkl")
predictions = model.predict(df[['X1', 'X2']])
results = df.copy()
results['predictions'] = predictions
results['partition_id'] = context.partition_id
# Write results to a Snowflake table using the bounded session pool
context.with_session(lambda session:
session.create_dataframe(results)
.write.mode("append")
.save_as_table("predictions_table")
)
Nota
A função passada para with_session() é serializada usando cloudpickle. Evite capturar objetos grandes ou recursos não serializáveis no fechamento.
Dimensionar em vários nós¶
Para executar uma DPF em vários nós, dimensione seu cluster antes de iniciar a execução:
from snowflake.ml.runtime_cluster import scale_cluster
# Scale to 3 nodes for increased parallelism
scale_cluster(3)
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="multi_node_run",
execution_options=ExecutionOptions(use_head_node=True),
)
final_status = run.wait()
Na execução em vários nós, defina use_head_node=False se você quiser que o nó principal atue somente como coordenador sem executar funções de usuário. Isso pode melhorar a confiabilidade das cargas de trabalho de longa duração porque um erro de falta de memória em um nó de trabalho não afeta o coordenador.
Limitações e restrições¶
Uma execução simultânea: apenas uma execução de DPF pode ser feita por vez. Iniciar uma nova execução enquanto outra está em andamento gera um erro. Cancele a execução anterior com
run.cancel()antes de iniciar uma nova.Requisitos do DataFrame: no modo DataFrame, o DataFrame Snowpark deve conter exatamente uma consulta e nenhuma pós-ação.
Restrição de nó único:
use_head_node=Falsenão é compatível com clusters de nó único.Estrutura do caminho do artefato: os artefatos são armazenados em
@{stage_name}/{run_id}/{partition_id}/. Essa estrutura de caminho é fixa e não pode ser personalizada.Execuções restauradas são somente leitura: as execuções restauradas com
DPFRun.restore_from()não podem chamarwait()oucancel().
Próximos passos¶
Explore Treinamento de modelos em partições de dados para saber mais sobre o treinamento de vários modelos de ML usando a DPF como infraestrutura subjacente.
Para acessar a documentação completa da API, consulte a Referência da API Distributed Partition Function (DPF).
Para ver exemplos completos, consulte os notebooks de amostra de ML do Snowflake.