Processamento de dados com lógica personalizada em partições

Use a função de partição distribuída (Distributed Partition Function, DPF) para processar dados em paralelo de um ou mais nós em um pool de computação. A DPF processa automaticamente orquestração distribuída, erros, observabilidade e persistência de artefatos. Você pode executar uma DPF em um notebook Snowflake ou em um trabalho de ML do Snowflake.

A DPF oferece suporte aos seguintes modos de execução:

  • Modo DataFrame (run()): particionar um DataFrame Snowpark por valores de coluna e executar sua função em cada partição simultaneamente. Os dados são pré-buscados em paralelo para uma taxa de transferência ideal.

  • Modo de preparação (run_from_stage()): processar arquivos de uma área de preparação do Snowflake em que cada arquivo se torna uma partição. Ideal para processamento de arquivos em grande escala com uso de memória previsível.

Você pode usar a DPF para processar grandes conjuntos de dados de forma eficiente em diferentes segmentos de dados.

Essa ferramenta é ideal nos seguintes cenários:

  • Análise de dados de vendas por região

  • Processamento de dados de clientes por segmentos geográficos

  • Treinamento de modelos de ML em cada partição de dados

  • Execução de transformações de dados em que cada partição de dados requer a mesma lógica de processamento

A DPF opera automaticamente o processamento de dados distribuído. Você não precisa gerenciar a infraestrutura de computação distribuída.

A DPF permite escrever código Python personalizado usando bibliotecas de código aberto em infraestrutura em contêiner com acesso à GPU.

Importante

A DPF armazena automaticamente os resultados e artefatos em áreas de preparação do Snowflake. Antes de usar a DPF, certifique-se de ter permissões para a área de preparação em que a DPF armazena os resultados e artefatos.

Modo DataFrame: processar dados por partições de coluna

O modo DataFrame particiona um DataFrame Snowpark por uma coluna especificada e executa sua função Python em cada partição paralelamente. O exemplo a seguir demonstra o processamento de dados de vendas por região.

  1. Definir a função de processamento

  2. Inicializar e executar uma DPF

  3. Monitorar o progresso e aguardar a conclusão

  4. Recuperar os resultados de cada partição

  5. Tratar erros

  6. Restaurar os resultados de uma execução concluída

Definição da função de processamento

Importe as classes necessárias para executar o processamento distribuído:

from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
    RunStatus, ExecutionOptions
)
Copy

Defina uma função de processamento que use dois argumentos:

  • data_connector: um DataConnector que fornece acesso aos dados da partição. Chame data_connector.to_pandas() para carregá-lo como um DataFrame pandas ou use outros métodos, como to_torch_dataset() ou to_ray_dataset().

  • context: um objeto PartitionContext que fornece o ID da partição e os métodos para carregar e baixar os artefatos.

import json

def process_sales_data(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing {len(df)} records for region: {context.partition_id}")

    # Perform region-specific analytics
    summary = {
        'region': context.partition_id,
        'total_sales': df['amount'].sum(),
        'avg_order_value': df['amount'].mean(),
        'customer_count': df['customer_id'].nunique(),
        'record_count': len(df)
    }

    # Store results in stage for subsequent access

    context.upload_to_stage(summary, "sales_summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

Para cada região, essa função calcula as estatísticas resumidas e salva os resultados como um arquivo JSON no diretório de preparação dedicado da partição.

Inicializar e executar uma DPF

Crie uma instância de DPF com sua função de processamento e um nome de área de preparação de saída, depois chame run() para iniciar o processamento distribuído.

Importante

O DataFrame Snowpark que você fornece deve ser criado de uma tabela. Para obter informações sobre como criar um DataFrame de uma tabela, consulte Como criar um DataFrame.

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="regional_analytics_2024"
)
Copy

O método run() aceita os seguintes parâmetros:

  • partition_by (str): nome da coluna pela qual particionar o DataFrame. Cada valor exclusivo cria uma partição separada.

  • snowpark_dataframe: o DataFrame Snowpark para particionar e processar.

  • run_id (str): identificador exclusivo desta execução. Cria um diretório @{stage_name}/{run_id}/ dedicado para todos os artefatos. Use nomes descritivos como experiment_2024_01_15 ou model_v1_retrain.

  • on_existing_artifacts (cadeia de caracteres, opcional): ação quando os artefatos para o run_id já existem. "error" (padrão) gera um erro; "overwrite" substitui os artefatos existentes.

  • execution_options (ExecutionOptions, opcional): configuração para alocação de recursos e comportamento de execução.

Monitorar o progresso e aguardar a conclusão

Chame wait() para bloquear até que a execução seja concluída. Por padrão, ele exibe uma barra de progresso.

final_status = run.wait()  # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Copy

A seguir, veja um exemplo da saída:

Progress: 100%|██████████| 4/4 [02:15<00:00, 33.75s/it]
Job completed with status: RunStatus.SUCCESS

Você também pode verificar o status e o progresso a qualquer momento sem bloqueio:

# Check overall status
current_status = run.status

# Get progress grouped by partition status
progress = run.get_progress()
Copy

Recuperação dos resultados de cada partição

Após a conclusão bem-sucedida da execução, recupere os resultados de cada partição usando a propriedade partition_details. Os detalhes de cada partição incluem um stage_artifacts_manager para acessar os artefatos salvos.

if final_status == RunStatus.SUCCESS:
    import json
    all_results = []
    for partition_id, details in run.partition_details.items():
        # List available artifacts for this partition
        files = details.stage_artifacts_manager.list()
        print(f"Partition {partition_id} artifacts: {files}")

        # Load an artifact using a custom deserializer
        summary = details.stage_artifacts_manager.get("sales_summary.json",
            read_function=lambda path: json.load(open(path, 'r')))
        all_results.append(summary)

    # Combine results across all regions
    total_sales = sum(r['total_sales'] for r in all_results)
    total_customers = sum(r['customer_count'] for r in all_results)
Copy

O stage_artifacts_manager fornece três métodos:

  • list(): retorna uma lista de nomes de arquivos salvos no diretório de preparação da partição.

  • get(filename, read_function=None): baixa e desserializa um artefato. Por padrão, usa pickle, ou read_function personalizado se fornecido.

  • download(filename, local_dir): baixa um arquivo bruto para um diretório local e retorna o caminho do arquivo local.

Tratar erros

Se a execução não for bem-sucedida, inspecione os detalhes das partições individuais para diagnosticar as falhas:

if final_status != RunStatus.SUCCESS:
    for partition_id, details in run.partition_details.items():
        if details.status != PartitionStatus.DONE:
            print(f"Partition {partition_id} status: {details.status}")
            try:
                error_logs = details.logs
                print(error_logs)
            except RuntimeError:
                print("Logs not available for this partition")
Copy

O RunStatus geral reflete o resultado agregado:

  • SUCCESS: todas as partições foram concluídas com sucesso.

  • PARTIAL_FAILURE: algumas partições foram bem-sucedidas, mas pelo menos uma falhou.

  • FAILURE: nenhuma partição foi concluída com sucesso.

  • CANCELLED: a execução foi cancelada.

  • IN_PROGRESS: a execução ainda está em andamento.

Cada partição tem um PartitionStatus:

  • PENDING: aguardando o processamento.

  • RUNNING: está em processamento.

  • DONE: concluído com sucesso.

  • FAILED: a função do usuário gerou uma exceção.

  • CANCELLED: cancelado pelo usuário.

  • INTERNAL_ERROR: ocorreu um erro interno no sistema (por exemplo, falta de memória).

Para importar estes enums:

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
    RunStatus, PartitionStatus
)
Copy

Para cancelar um trabalho em execução:

run.cancel()
Copy

Nota

As partições que já foram concluídas não são afetadas pelo cancelamento. Resultados parciais, logs e artefatos de partições concluídas permanecem disponíveis.

Restaurar os resultados de uma execução concluída

Você pode restaurar uma execução concluída de seu estado persistente e acessar os mesmos resultados sem executar novamente o processo:

from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun

restored_run = DPFRun.restore_from(run_id="regional_analytics_2024", stage_name="analytics_stage")

# Access results just like the original run
for partition_id, details in restored_run.partition_details.items():
    print(f"{partition_id}: {details.status}")
Copy

Nota

As execuções restauradas são somente leitura. Você não pode chamar wait() ou cancel() em uma execução restaurada.

Modo de preparação: processar arquivos de uma área de preparação

Use o modo de preparação para processar arquivos de uma área de preparação do Snowflake em que cada arquivo se torna uma partição. Ele é ideal para processamento de arquivos em grande escala, como de uma coleção de arquivos Parquet que foram preparados.

Definir a função de processamento

A assinatura da função de processamento é a mesma do modo DataFrame. O data_connector fornece acesso aos dados do arquivo, e context.partition_id é o caminho de arquivo relativo na área de preparação.

def process_file(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing file {context.partition_id}: {len(df)} rows")

    # Process data and save results
    result = {"row_count": len(df), "columns": list(df.columns)}
    import json
    context.upload_to_stage(result, "result.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

Executar uma DPF da área de preparação

Chame run_from_stage() em vez de run(). Especifique o stage_location de entrada com os arquivos de origem e, opcionalmente, um file_pattern para filtrar quais arquivos processar.

dpf = DPF(process_file, "output_stage")
run = dpf.run_from_stage(
    stage_location="@my_db.my_schema.input_stage/data/",
    run_id="file_processing_2024",
    file_pattern="*.parquet",
)
final_status = run.wait()
Copy

O método run_from_stage() aceita os seguintes parâmetros:

  • stage_location (str): caminho da área de preparação de entrada com os arquivos de dados de origem. Cada arquivo correspondente ao file_pattern torna-se uma partição. Oferece suporte a nomes de área de preparação simples e totalmente qualificados:

    • Simples: "@my_stage/data/"

    • Totalmente qualificado: "@my_db.my_schema.my_stage/data/"

  • run_id (cadeia de caracteres): identificador exclusivo da execução.

  • file_pattern (cadeia de caracteres, opcional): padrão glob para filtrar arquivos. O padrão é "*.parquet".

  • on_existing_artifacts (cadeia de caracteres, opcional): "error" (padrão) ou "overwrite".

  • execution_options (ExecutionOptions, opcional): configuração para alocação de recursos e comportamento de execução.

Nota

O stage_location é a fonte de dados de entrada. O stage_name fornecido para a DPF() é o local de saída dos artefatos, como logs e resultados. Eles podem ser áreas de preparação diferentes.

O monitoramento, a recuperação de resultados, o tratamento de erros e a restauração da execução funcionam da mesma maneira que no modo DataFrame.

Para processamento de arquivo com vinculação de E/S, defina num_cpus_per_worker=1 em ExecutionOptions para maximizar o paralelismo (um ator por CPU). Para cargas de trabalho vinculadas a CPU, use o padrão ou aumente num_cpus_per_worker.

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions

run = dpf.run_from_stage(
    stage_location="@my_stage/data/",
    run_id="io_bound_processing",
    execution_options=ExecutionOptions(num_cpus_per_worker=1),
)
Copy

Configurar as opções de execução

Use ExecutionOptions para controlar a alocação de recursos e o comportamento de execução, como alocação de CPU/GPU por trabalhador, contagem de novas tentativas e comportamento fail-fast. Todos os campos são opcionais com padrões sensíveis.

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions

options = ExecutionOptions(
    num_cpus_per_worker=4,
    num_gpus_per_worker=1,
    fail_fast=True,
)

run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="my_run",
    execution_options=options,
)
Copy

Para conferir a lista completa de parâmetros e saber o comportamento de dimensionamento do trabalhador, consulte a Referência da API ExecutionOptions.

Trabalhar com artefatos usando PartitionContext

O objeto PartitionContext é passado como o segundo argumento para sua função de processamento. Ele fornece métodos para interagir com artefatos e sessões do Snowflake durante a execução da partição.

Carregar artefatos

Use upload_to_stage() para salvar os resultados da sua função de processamento. Por padrão, os objetos são serializados usando pickle. Forneça uma write_function para serialização personalizada.

def my_function(data_connector, context):
    df = data_connector.to_pandas()

    # Save a pickle object (default serialization)
    results = {'total': df['amount'].sum(), 'count': len(df)}
    context.upload_to_stage(results, "summary.pkl")

    # Save JSON data with a custom write function
    import json
    context.upload_to_stage(
        results, "summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w'))
    )

    # Save a CSV file
    df_processed = df.groupby('category').sum()
    context.upload_to_stage(
        df_processed, "aggregated.csv",
        write_function=lambda df, path: df.to_csv(path, index=False)
    )
Copy

Baixar artefatos

Use download_from_stage() para carregar os artefatos da sua função de processamento. É possível usar essa função para acessar os artefatos de uma execução anterior. Por exemplo, você pode usá-la para carregar um modelo para inferência.

def my_inference_function(data_connector, context):
    # Load a pickle object from the current partition's stage path
    model = context.download_from_stage("model.pkl")

    # Load from a different stage path (e.g., from a prior training run)
    model = context.download_from_stage(
        "model.pkl",
        stage_path="@db.schema.stage/training_run/partition_0"
    )

    # Load JSON with a custom deserializer
    import json
    config = context.download_from_stage(
        "config.json",
        read_function=lambda path: json.load(open(path, 'r'))
    )
Copy

Usar sessões do Snowflake

Use with_session() para executar operações que exigem uma sessão do Snowflake, como gravar resultados em uma tabela. Esse método usa um pool limitado de sessões para evitar atingir os limites de sessão do Snowflake ao executar muitas partições simultaneamente.

def my_function(data_connector, context):
    df = data_connector.to_pandas()

    # Load a model from a prior training run
    model = context.download_from_stage("model.pkl")

    predictions = model.predict(df[['X1', 'X2']])

    results = df.copy()
    results['predictions'] = predictions
    results['partition_id'] = context.partition_id

    # Write results to a Snowflake table using the bounded session pool
    context.with_session(lambda session:
        session.create_dataframe(results)
            .write.mode("append")
            .save_as_table("predictions_table")
    )
Copy

Nota

A função passada para with_session() é serializada usando cloudpickle. Evite capturar objetos grandes ou recursos não serializáveis no fechamento.

Dimensionar em vários nós

Para executar uma DPF em vários nós, dimensione seu cluster antes de iniciar a execução:

from snowflake.ml.runtime_cluster import scale_cluster

# Scale to 3 nodes for increased parallelism
scale_cluster(3)

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="multi_node_run",
    execution_options=ExecutionOptions(use_head_node=True),
)
final_status = run.wait()
Copy

Na execução em vários nós, defina use_head_node=False se você quiser que o nó principal atue somente como coordenador sem executar funções de usuário. Isso pode melhorar a confiabilidade das cargas de trabalho de longa duração porque um erro de falta de memória em um nó de trabalho não afeta o coordenador.

Limitações e restrições

  • Uma execução simultânea: apenas uma execução de DPF pode ser feita por vez. Iniciar uma nova execução enquanto outra está em andamento gera um erro. Cancele a execução anterior com run.cancel() antes de iniciar uma nova.

  • Requisitos do DataFrame: no modo DataFrame, o DataFrame Snowpark deve conter exatamente uma consulta e nenhuma pós-ação.

  • Restrição de nó único: use_head_node=False não é compatível com clusters de nó único.

  • Estrutura do caminho do artefato: os artefatos são armazenados em @{stage_name}/{run_id}/{partition_id}/. Essa estrutura de caminho é fixa e não pode ser personalizada.

  • Execuções restauradas são somente leitura: as execuções restauradas com DPFRun.restore_from() não podem chamar wait() ou cancel().

Próximos passos