Processamento de dados com lógica personalizada em partições

Use a função de partição distribuída (DPF) para processar dados em paralelo em um ou mais nós em um pool de computação. Ela lida automaticamente com orquestração distribuída, erros, observabilidade e persistência de artefatos.

A DPF particiona seu Snowpark DataFrame por uma coluna especificada e executa sua função Python em cada partição em paralelo. Concentre-se em sua lógica de processamento enquanto a DPF lida com a complexidade da infraestrutura e ajusta a escala automaticamente.

Você pode usar a DPF para processar grandes conjuntos de dados de forma eficiente em diferentes segmentos de dados. Essa ferramenta é ideal para cenários como análise de dados de vendas por região, processamento de dados de clientes por segmentos geográficos ou realização de transformações de dados em que cada partição de dados requer a mesma lógica de processamento. A DPF lida com o processamento de dados distribuídos automaticamente, eliminando a complexidade do gerenciamento da infraestrutura de computação distribuída.

A DPF permite escrever código Python personalizado usando bibliotecas de código aberto em infraestrutura em contêiner com acesso à GPU.

Importante

Antes de usar a DPF, certifique-se de ter o seguinte:

  • Ambiente de tempo de execução do contêiner: a DPF requer um ambiente de tempo de execução do contêiner do Snowflake ML.

  • Permissões de acesso ao estágio: a DPF armazena automaticamente resultados e artefatos em estágios do Snowflake. Certifique-se de ter as permissões apropriadas para acessar o estágio nomeado especificado.

A seção a seguir mostra como usar a DPF em um conjunto de dados de vendas hipotético.

Processamento de dados de vendas por região

O exemplo a seguir demonstra como usar a DPF para processar dados de vendas por região em paralelo:

  1. Definir a função de processamento - crie uma função Python que especifique a lógica de transformação a ser aplicada a cada partição de dados (região).

  2. Usar a DPF para processar dados - inicialize a DPF com sua função de processamento e execute-a em todas as partições simultaneamente.

  3. Monitorar o progresso e aguardar a conclusão - rastreie o status de processamento e aguarde a conclusão da execução de todas as partições.

  4. Recuperar resultados de cada partição - colete e combine os resultados processados de todas as regiões após a conclusão bem-sucedida.

  5. Opcional: Restaurar resultados mais tarde - acesse resultados concluídos anteriormente sem executar novamente todo o processo.

Se você tiver um conjunto de dados de vendas, poderá ter dados regionais como uma coluna no dataframe. Talvez você queira aplicar lógica de processamento aos dados de cada região. Você pode usar a função de partição distribuída (DPF) para processar dados de vendas de diferentes regiões em paralelo.

Você poderia ter um dataframe denominado sales_data contendo as seguintes colunas:

  • region: ‘Norte’, ‘Sul’, ‘Leste’, ‘Oeste’

  • customer_id: identificadores exclusivos de cliente

  • amount: quantidades de transação

  • order_date: datas de transação

Definição da função de processamento

O código a seguir define uma função de processamento que processa os dados de vendas por região. Ele especifica a lógica de transformação aplicada a cada partição de dados e salva os resultados em um estágio.

from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import RunStatus

# Define function to process each region's data
Copy
def process_sales_data(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing {len(df)} records for region: {context.partition_id}")

    # Perform region-specific analytics
    summary = {
        'region': context.partition_id,
        'total_sales': df['amount'].sum(),
        'avg_order_value': df['amount'].mean(),
        'customer_count': df['customer_id'].nunique(),
        'record_count': len(df)
    }

    # Store results in stage for subsequent access
    import json
    context.upload_to_stage(summary, "sales_summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

A função process_sales_data recebe dois argumentos chave que permitem o processamento distribuído:

  • data_connector: usado para fornecer acesso ao DataFrame sales_data

  • context: usado para gravar os resultados em um estágio

Para cada região, a função cria as seguintes colunas:

  • total_sales

  • avg_order_value

  • customer_count

  • record_count

Em seguida, ela grava os resultados em um estágio como um arquivo JSON denominado sales_summary.json.

Uso da DPF para processar dados com a função

Após criar uma função de processamento, você pode usar o código a seguir para processar os dados em paralelo entre as partições com a função DPF.

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",  # Creates separate partitions for North, South, East, West
    snowpark_dataframe=sales_data,
    run_id="regional_analytics_2024"
)
Copy

Você pode usar o seguinte código para executar DPF em vários nós:

from snowflake.ml.runtime_cluster import scale_cluster

# Scale cluster before running tuner
scale_cluster(2)  # Scale to 2 nodes for parallel trials
Copy

Monitoramento do progresso e espera pela conclusão

Você pode usar o código a seguir para monitorar o progresso da execução da DPF.

final_status = run.wait()  # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Copy

A saída do código é a seguinte:

Processing partitions: 100%|██████████| 4/4 [02:15<00:00, 33.75s/partition]
Job completed with status: RunStatus.SUCCESS

Recuperação dos resultados de cada partição

Se a execução for concluída com sucesso, você poderá recuperar os resultados de cada partição. O código a seguir obtém os resultados de vendas por região.

if final_status == RunStatus.SUCCESS:
    # Get results from each region
    import json
    all_results = []
    for partition_id, details in run.partition_details.items():
        summary = details.stage_artifacts_manager.get("sales_summary.json",
            read_function=lambda path: json.load(open(path, 'r')))
        all_results.append(summary)

    # Combine results across all regions
    total_sales = sum(r['total_sales'] for r in all_results)
    total_customers = sum(r['customer_count'] for r in all_results)
else:
    # Handle failures - check logs for failed partitions
    for partition_id, details in run.partition_details.items():
        if details.status != "DONE":
            error_logs = details.logs
Copy

Opcional: Restauração dos resultados da execução concluída

Você pode restaurar a execução concluída do estágio e acessar os mesmos resultados sem executar novamente o processo. O código a seguir demonstra como fazer isso:

# Restore completed run from stage and access same results as above without re-running.
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import (
    DPFRun
)
restored_run = DPFRun.restore_from("regional_analytics_2024", "analytics_stage")
Copy

Próximos passos

  • Explorar Treinamento de modelos em partições de dados para saber mais sobre o treinamento de vários modelos de ML usando a DPF como infraestrutura subjacente

  • Para padrões de uso mais avançados, estratégias de tratamento de erros e técnicas de otimização de desempenho, consulte a documentação completa de API no pacote Python de ML do Snowflake