Snowflake ML Jobs

Use o Snowflake ML Jobs para executar fluxos de trabalho de aprendizado de máquina (ML) dentro dos tempos de execução do contêiner do Snowflake ML. Você pode executá-los em qualquer ambiente de desenvolvimento. Você não precisa executar o código em uma planilha ou notebook do Snowflake. Use os trabalhos para aproveitar a infraestrutura do Snowflake para executar tarefas com uso intensivo de recursos no seu fluxo de trabalho de desenvolvimento. Para obter informações sobre como configurar o Snowflake ML localmente, consulte Como usar o Snowflake ML localmente.

Importante

O Snowflake ML Jobs está disponível como visualização pública no pacote Snowpark ML Python (snowflake-ml-python) versão 1.8.2 e posterior.

O Snowflake ML Jobs permite que você faça o seguinte:

  • Executar cargas de trabalho de ML no Snowflake Compute Pools, incluindo instâncias da GPU e de alta memória da CPU.

  • Usar seu ambiente de desenvolvimento preferido, como VS Code ou Jupyter notebooks.

  • Instale e use pacotes Python personalizados em seu ambiente de tempo de execução.

  • Usar APIs distribuídas do Snowflake para otimizar o carregamento de dados, o treinamento e o ajuste de hiperparâmetros.

  • Integre-se com ferramentas de orquestração, como o Apache Airflow.

  • Monitorar e gerenciar trabalhos por meio de APIs do Snowflake.

Você pode usar esses recursos para fazer o seguinte:

  • Executar treinamento com uso intensivo de recursos em grandes conjuntos de dados que exigem aceleração de GPU ou recursos de computação significativos.

  • Produzir os fluxos de trabalho de ML movendo o código de ML do desenvolvimento para a produção com execução programática por meio de pipelines.

  • Manter seu ambiente de desenvolvimento existente e usar os recursos de computação do Snowflake.

  • Suspender e mudar os fluxos de trabalho de OSS ML com alterações mínimas no código.

  • Trabalhar diretamente com grandes conjuntos de dados do Snowflake para reduzir a movimentação de dados e evitar transferências de dados de alto custo.

Pré-requisitos

Importante

Atualmente, o Snowflake ML Jobs só oferece suporte a clientes Python 3.10. Entre em contato com a equipe da sua conta Snowflake se precisar de suporte para outras versões do Python.

  1. Instale o pacote Snowflake ML Python em seu ambiente Python 3.10.

    pip install snowflake-ml-python>=1.8.2
    
    Copy
  2. O tamanho padrão do pool de computação usa a família de instâncias CPU_X64_S. O número mínimo de nós é 1 e o máximo é 25. Você pode usar o seguinte comando SQL para criar um pool de computação personalizado:

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = <MIN_NODES>
      MAX_NODES = <MAX_NODES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy
  3. O Snowflake ML Jobs requer uma sessão do Snowpark. Use o código a seguir para criá-la:

    from snowflake.snowpark import Session
    from snowflake.ml.jobs import list_jobs
    
    ls = list_jobs() # This will fail! You must create a session first.
    
    # Requires valid ~/.snowflake/config.toml file
    session = Session.builder.getOrCreate()
    
    ls = list_jobs(session=session)
    ls = list_jobs() # Infers created session from context
    
    Copy

    Para obter informações sobre como criar uma sessão, consulte Como criar uma sessão.

Executar um trabalho do Snowflake ML Job

Você pode executar um Snowflake ML Job de uma das seguintes maneiras:

  • Usando um decorador remoto em seu código.

  • Enviando arquivos ou diretórios inteiros usando a Python API.

Executar uma função Python como um Snowflake ML Job

Use o Function Dispatch para executar funções Python individuais remotamente nos recursos de computação do Snowflake com o decorador @remote.

Usando o decorador @remote, você pode fazer o seguinte:

  • Serializar a função e suas dependências.

  • Carregá-lo em um estágio específico do Snowflake.

  • Executá-lo no Container Runtime para ML.

O exemplo de código Python a seguir usa o decorador @remote para enviar um Snowflake ML Job. Observe que é necessária uma Snowpark Session, consulte Pré-requisitos.

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
  # Provide your ML code here, including imports and function calls
  ...

job = train_model("my_training_data")
Copy

Invocar uma função decorada @remote retorna um objeto Snowflake MLJob que pode ser usado para gerenciar e monitorar a execução do trabalho. Para obter mais informações, consulte Gerenciamento do Snowflake ML Job.

Executar um arquivo Python como um Snowflake ML Job

Execute arquivos Python ou diretórios de projeto nos recursos de computação do Snowflake. Isso é útil quando:

  • Você tem projetos de ML complexos com vários módulos e dependências.

  • Você deseja manter a separação entre o desenvolvimento local e o código de produção.

  • Você precisa executar scripts que usam argumentos de linha de comando.

  • Você está trabalhando com projetos existentes de ML que não foram projetados especificamente para execução na computação do Snowflake.

A Snowflake Job API oferece dois métodos principais:

  • submit_file(): para executar arquivos Python individuais

  • submit_directory(): para executar diretórios de projetos inteiros com vários arquivos e recursos

Ambos os métodos são compatíveis com:

  • Passagem de argumentos na linha de comando

  • Configuração de variáveis de ambiente

  • Especificação de dependências personalizadas

  • Gerenciamento de ativos de projeto por meio dos estágios do Snowflake

O File Dispatch é particularmente útil para a produção de fluxos de trabalho de ML existentes e para manter uma separação clara entre os ambientes de desenvolvimento e de execução.

O código Python a seguir envia um arquivo para um Snowflake ML Job:

from snowflake.ml.jobs import submit_file

# Run a single file
job1 = submit_file(
  "train.py",
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  args=["--data-table", "my_training_data"],
  session=session,
)
Copy

O código Python a seguir envia um diretório para o Snowflake ML Job:

from snowflake.ml.jobs import submit_directory

# Run from a directory
job2 = submit_directory(
  "./ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",
  stage_name="payload_stage",
  session=session,
)
Copy

O envio de um arquivo ou diretório retorna um objeto do Snowflake MLJob que pode ser usado para gerenciar e monitorar a execução do trabalho. Para obter mais informações, consulte Gerenciamento do Snowflake ML Job.

Gerenciamento do Snowflake ML Job

Quando você envia um Snowflake ML Job, a API cria um objeto MLJob. Você pode usá-lo para fazer o seguinte:

  • Acompanhar o progresso do trabalho por meio de atualizações de status

  • Depurar problemas usando logs de execução detalhados

  • Recuperar o resultado da execução (se houver)

Você pode usar a get_job() API para recuperar um objeto MLJob pelo seu ID. O código Python a seguir mostra como recuperar um objeto MLJob:

from snowflake.ml.jobs import MLJob, get_job, list_jobs

# List all jobs
jobs = list_jobs()

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Retrieve status and logs for the retrieved job
print(job.status)  # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())
Copy

Chame o método result do trabalho para aguardar a conclusão da execução e recuperar o resultado da execução. Se a execução falhar, result gera uma exceção.

result = job.result()
Copy

Gerenciamento de dependências

A Snowflake ML Job API executa cargas úteis dentro do ambiente Container Runtime para ML. O ambiente tem os pacotes Python mais comumente usados para aprendizado de máquina e ciência de dados. A maioria dos casos de uso deve funcionar instantaneamente, sem configuração adicional. Se precisar de dependências personalizadas, você pode usar pip_requirements para instalá-las.

Para instalar dependências personalizadas, você deve habilitar o acesso à rede externa usando uma Integração de acesso externo. Você pode usar o seguinte comando de exemplo SQL para fornecer acesso:

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
  ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
  ENABLED = true;
Copy

Para obter mais informações sobre integrações de acesso externo, consulte Criação e uso de uma integração de acesso externo.

Depois de fornecer acesso à rede externa, você pode usar os parâmetros pip_requirements e external_access_integrations para configurar dependências personalizadas. Você pode usar pacotes que não estão disponíveis no ambiente do Container Runtime ou se tiver versões específicas dos pacotes.

O código Python a seguir mostra como especificar dependências personalizadas para o decorador remote:

@remote(
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=["custom-package"],
  external_access_integrations=["PYPI_EAI"],
  session=session,
)
def my_function():
  # Your code here
Copy

O código Python a seguir mostra como especificar dependências personalizadas para o método submit_file():

from snowflake.ml.jobs import submit_file

# Can include version specifier to specify version(s)
job = submit_file(
  "/path/to/repo/my_script.py",
  compute_pool,
  stage_name="payload_stage",
  pip_requirements=["custom-package==1.0.*"],
  external_access_integrations=["pypi_eai"],
  session=session,
)
Copy

Feeds de pacotes privados

O Snowflake ML Jobs também oferece suporte ao carregamento de pacotes de feeds privados, como o JFrog Artifactory e o Sonatype Nexus Repository. Esses feeds são normalmente usados para distribuir pacotes internos e proprietários, manter o controle sobre as versões de dependência e garantir a segurança/conformidade.

Para instalar pacotes de um feed privado, você deve fazer o seguinte:

  1. Criar uma regra de rede para permitir o acesso ao URL do feed privado.

    1. Para fontes que usam autenticação básica, você pode simplesmente criar uma regra de rede.

      CREATE OR REPLACE NETWORK RULE private_feed_nr
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('<your-repo>.jfrog.io');
      
      Copy
    2. Para configurar o acesso a uma fonte usando conectividade privada (ou seja, Private Link), siga as etapas em Saída da rede usando conectividade privada.

  2. Criar uma integração de acesso externo usando a regra de rede. Conceder permissão para usar EAI à função que enviará os trabalhos.

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai
    ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR)
    ENABLED = true;
    
    GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
    
    Copy
  3. Especificar o URL do feed privado, a Integração de acesso externo e o(s) pacote(s) ao enviar o trabalho

    # Option 1: Specify private feed URL in pip_requirements
    job = submit_file(
      "/path/to/script.py",
      compute_pool="MY_COMPUTE_POOL",
      stage_name="payload_stage",
      pip_requirements=[
        "--index-url=https://your.private.feed.url",
        "internal-package==1.2.3"
      ],
      external_access_integrations=["PRIVATE_FEED_EAI"]
    )
    
    Copy
    # Option 2: Specify private feed URL by environment variable
    job = submit_directory(
      "/path/to/code/",
      compute_pool="MY_COMPUTE_POOL",
      entrypoint="script.py",
      stage_name="payload_stage",
      pip_requirements=["internal-package==1.2.3"],
      external_access_integrations=["PRIVATE_FEED_EAI"],
      env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'},
    )
    
    Copy

Se o seu URL de feed privado contiver informações confidenciais, como tokens de autenticação, gerencie o URL criando um segredo Snowflake. Use CREATE SECRET para criar um segredo. Configure os segredos durante o envio do trabalho com o argumento spec_overrides.

# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
 TYPE = GENERIC_STRING
 SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()

# Prepare service spec override for mounting secret into job execution
spec_overrides = {
 "spec": {
  "containers": [
    {
     "name": "main",  # Primary container name is always "main"
     "secrets": [
      {
        "snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
        "envVarName": "PIP_INDEX_URL",
        "secretKeyRef": "secret_string"
      },
     ],
    }
  ]
 }
}

# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
  "/path/to/script.py",
  compute_pool="MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=[
    "internal-package==1.2.3"
  ],
  external_access_integrations=["PRIVATE_FEED_EAI"],
  spec_overrides=spec_overrides,
)
Copy

Para obter mais informações sobre o container.secrets, consulte Campo containers.secrets.

Considerações sobre custo

Os Snowflake ML Jobs são executados no Snowpark Container Services e são cobrados com base no uso. Para obter informações sobre os custos de computação, consulte Custos do Snowpark Container Services.

As cargas úteis do trabalho são carregadas no estágio especificado com o argumento stage_name. Para evitar cobranças adicionais, você deve limpá-las. Para obter mais informações, consulte Explicação do custo de armazenamento e Exploração do custo de armazenamento para saber mais sobre os custos associados ao armazenamento em estágios.