Snowflake ML Jobs

Use o Snowflake ML Jobs para executar fluxos de trabalho de aprendizado de máquina (ML) dentro dos tempos de execução do contêiner do Snowflake ML. Você pode executá-los em qualquer ambiente de desenvolvimento. Você não precisa executar o código em uma planilha ou notebook do Snowflake. Use os trabalhos para aproveitar a infraestrutura do Snowflake para executar tarefas com uso intensivo de recursos no seu fluxo de trabalho de desenvolvimento. Para obter informações sobre como configurar o Snowflake ML localmente, consulte Como usar o Snowflake ML localmente.

Importante

Os trabalhos de ML do Snowflake estão disponíveis em snowflake-ml-python versão 1.9.2 e posterior.

O Snowflake ML Jobs permite que você faça o seguinte:

  • Executar cargas de trabalho de ML no Snowflake Compute Pools, incluindo instâncias da GPU e de alta memória da CPU.

  • Usar seu ambiente de desenvolvimento preferido, como VS Code ou Jupyter notebooks.

  • Instale e use pacotes Python personalizados em seu ambiente de tempo de execução.

  • Usar APIs distribuídas do Snowflake para otimizar o carregamento de dados, o treinamento e o ajuste de hiperparâmetros.

  • Integre-se com ferramentas de orquestração, como o Apache Airflow.

  • Monitorar e gerenciar trabalhos por meio de APIs do Snowflake.

Você pode usar esses recursos para fazer o seguinte:

  • Executar treinamento com uso intensivo de recursos em grandes conjuntos de dados que exigem aceleração de GPU ou recursos de computação significativos.

  • Produzir os fluxos de trabalho de ML movendo o código de ML do desenvolvimento para a produção com execução programática por meio de pipelines.

  • Manter seu ambiente de desenvolvimento existente e usar os recursos de computação do Snowflake.

  • Suspender e mudar os fluxos de trabalho de OSS ML com alterações mínimas no código.

  • Trabalhar diretamente com grandes conjuntos de dados do Snowflake para reduzir a movimentação de dados e evitar transferências de dados de alto custo.

Pré-requisitos

Importante

Atualmente, o Snowflake ML Jobs só oferece suporte a clientes Python 3.10. Entre em contato com a equipe da sua conta Snowflake se precisar de suporte para outras versões do Python.

  1. Instale o pacote Snowflake ML Python em seu ambiente Python 3.10.

    pip install snowflake-ml-python>=1.9.2
    
    Copy
  2. O tamanho padrão do pool de computação usa a família de instâncias CPU_X64_S. O número mínimo de nós é 1 e o máximo é 25. Você pode usar o seguinte comando SQL para criar um pool de computação personalizado:

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = <MIN_NODES>
      MAX_NODES = <MAX_NODES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy
  3. O Snowflake ML Jobs requer uma sessão do Snowpark. Use o código a seguir para criá-la:

    from snowflake.snowpark import Session
    from snowflake.ml.jobs import list_jobs
    
    ls = list_jobs() # This will fail! You must create a session first.
    
    # Requires valid ~/.snowflake/config.toml file
    session = Session.builder.getOrCreate()
    
    ls = list_jobs(session=session)
    ls = list_jobs() # Infers created session from context
    
    Copy

    Para obter informações sobre como criar uma sessão, consulte Como criar uma sessão.

Executar um trabalho do Snowflake ML Job

Você pode executar um Snowflake ML Job de uma das seguintes maneiras:

  • Usando um decorador de função no seu código.

  • Enviando arquivos ou diretórios inteiros usando a Python API.

Execução de uma função Python como trabalho de ML do Snowflake

Use o Function Dispatch para executar funções Python individuais remotamente nos recursos de computação do Snowflake com o decorador @remote.

Usando o decorador @remote, você pode fazer o seguinte:

  • Serializar a função e suas dependências.

  • Carregá-lo em um estágio específico do Snowflake.

  • Executá-lo no Container Runtime para ML.

O exemplo de código Python a seguir usa o decorador @remote para enviar um Snowflake ML Job. Observe que é necessária uma Snowpark Session, consulte Pré-requisitos.

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
  # Provide your ML code here, including imports and function calls
  ...

job = train_model("my_training_data")
Copy

Invocar uma função decorada @remote retorna um objeto Snowflake MLJob que pode ser usado para gerenciar e monitorar a execução do trabalho. Para obter mais informações, consulte Gerenciamento de trabalhos de ML.

Executar um arquivo Python como um Snowflake ML Job

Execute arquivos Python ou diretórios de projeto nos recursos de computação do Snowflake. Isso é útil quando:

  • Você tem projetos de ML complexos com vários módulos e dependências.

  • Você deseja manter a separação entre o desenvolvimento local e o código de produção.

  • Você precisa executar scripts que usam argumentos de linha de comando.

  • Você está trabalhando com projetos existentes de ML que não foram projetados especificamente para execução na computação do Snowflake.

A API de trabalho do Snowflake oferece três métodos principais para enviar cargas úteis baseadas em arquivos:

  • submit_file(): para executar arquivos Python individuais

  • submit_directory(): para executar projetos Python com vários arquivos e recursos

  • submit_from_stage(): para executar projetos Python salvos em um estágio Snowflake

Ambos os métodos são compatíveis com:

  • Passagem de argumentos na linha de comando

  • Configuração de variáveis de ambiente

  • Especificação de dependências personalizadas

  • Gerenciamento de ativos de projeto por meio dos estágios do Snowflake

O File Dispatch é particularmente útil para a produção de fluxos de trabalho de ML existentes e para manter uma separação clara entre os ambientes de desenvolvimento e de execução.

O seguinte código Python envia um arquivo como trabalho de ML do Snowflake:

from snowflake.ml.jobs import submit_file

# Run a single file
job1 = submit_file(
  "train.py",
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  args=["--data-table", "my_training_data"],
  session=session,
)
Copy

O seguinte código Python envia um diretório como um trabalho de ML do Snowflake:

from snowflake.ml.jobs import submit_directory

# Run from a directory
job2 = submit_directory(
  "./ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",
  stage_name="payload_stage",
  session=session,
)
Copy

O seguinte código Python envia um diretório de um estágio Snowflake como um trabalho de ML do Snowflake:

from snowflake.ml.jobs import submit_from_stage

# Run from a directory
job3 = submit_from_stage(
  "@source_stage/ml_project/"
  "MY_COMPUTE_POOL",
  entrypoint="@source_stage/ml_project/train.py",
  stage_name="payload_stage",
  session=session,
)

# Entrypoint may also be a relative path
job4 = submit_from_stage(
  "@source_stage/ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",  # Resolves to @source_stage/ml_project/train.py
  stage_name="payload_stage",
  session=session,
)
Copy

O envio de um arquivo ou diretório retorna um objeto do Snowflake MLJob que pode ser usado para gerenciar e monitorar a execução do trabalho. Para obter mais informações, consulte Gerenciamento de trabalhos de ML.

Suporte a cargas úteis adicionais em envios

Ao enviar um arquivo, diretório ou de um estágio, cargas úteis adicionais são suportadas para uso durante a execução do trabalho. O caminho de importação pode ser especificado explicitamente; caso contrário, ele será inferido a partir do local da carga útil adicional.

Importante

Somente diretórios podem ser especificados como fontes de importação. A importação de arquivos individuais não é suportada.

# Run from a file
 job1 = submit_file(
   "train.py",
   "MY_COMPUTE_POOL",
   stage_name="payload_stage",
   session=session,
   additional_payloads=[
     ("src/utils/", "utils"), # the import path is utils
   ],
 )

 # Run from a directory
 job2 = submit_directory(
   "./ml_project/",
   "MY_COMPUTE_POOL",
   entrypoint="train.py",
   stage_name="payload_stage",
   session=session,
   additional_payloads=[
     ("src/utils/"), # the import path is utils
   ],
 )

 # Run from a stage
 job3 = submit_from_stage(
   "@source_stage/ml_project/",
   "MY_COMPUTE_POOL",
   entrypoint="@source_stage/ml_project/train.py",
   stage_name="payload_stage",
   session=session,
   additional_payloads=[
     ("@source_stage/src/utils/sub_utils/", "utils.sub_utils"),
   ],
 )
Copy

Acesso à Sessão de Snowpark em trabalhos de ML

Ao executar trabalhos de ML no Snowflake, uma Sessão de Snowpark fica automaticamente disponível no contexto de execução. Você pode acessar o objeto da sessão a partir da carga útil do seu trabalho de ML usando as seguintes abordagens:

from snowflake.ml.jobs import remote
from snowflake.snowpark import Session

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function():
  # This approach works for all payload types, including file and directory payloads
  session = Session.builder.getOrCreate()
  print(session.sql("SELECT CURRENT_VERSION()").collect())

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function_with_injected_session(session: Session):
  # This approach works only for function dispatch payloads
  # The session is injected automatically by the Snowflake ML Job API
  print(session.sql("SELECT CURRENT_VERSION()").collect())
Copy

A Sessão de Snowpark pode ser usada para acessar tabelas, estágios e outros objetos de banco de dados do Snowflake dentro do seu trabalho de ML.

Retorno de resultados de trabalhos de ML

Os trabalhos de ML do Snowflake oferecem suporte ao retorno dos resultados da execução ao ambiente do cliente. Isso permite que você recupere valores computados, modelos treinados ou quaisquer outros artefatos produzidos pelas cargas de trabalho.

Para despacho de função, basta retornar um valor de sua função decorada. O valor retornado será serializado e disponibilizado por meio do método result().

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def train_model(data_table: str):
  # Your ML code here
  model = XGBClassifier()
  model.fit(data_table)
  return model

job1 = train_model("my_training_data")
Copy

Para trabalhos baseados em arquivos, use a variável especial __return__ para especificar o valor de retorno.

# Example: /path/to/repo/my_script.py
def main():
    # Your ML code here
    model = XGBClassifier()
    model.fit(data_table)
    return model

if __name__ == "__main__":
    __return__ = main()
Copy
from snowflake.ml.jobs import submit_file

job2 = submit_file(
    "/path/to/repo/my_script.py",
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
)
Copy

Você pode recuperar o resultado da execução do trabalho usando a API MLJob.result(). A API bloqueia o thread de chamada até que o trabalho atinja um estado terminal, depois retorna o valor de retorno da carga útil ou, se a execução falhar, gera uma exceção. Se a carga útil não definir um valor de retorno, o resultado será None em caso de sucesso.

# These will block until the respective job is done and return the trained model
model1 = job1.result()
model2 = job2.result()
Copy

Gerenciamento de trabalhos de ML

Quando você envia um Snowflake ML Job, a API cria um objeto MLJob. Você pode usá-lo para fazer o seguinte:

  • Acompanhar o progresso do trabalho por meio de atualizações de status

  • Depurar problemas usando logs de execução detalhados

  • Recuperar o resultado da execução (se houver)

Você pode usar a get_job() API para recuperar um objeto MLJob pelo seu ID. O código Python a seguir mostra como recuperar um objeto MLJob:

from snowflake.ml.jobs import MLJob, get_job, list_jobs, delete_job

# Get a list of the 10 most recent jobs as a Pandas DataFrame
jobs_df = list_jobs(limit=10)
print(jobs_df)  # Display list in table format

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Retrieve status and logs for the retrieved job
print(job.status)  # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())

# Clean up the job
delete_job(job)
Copy

Gerenciamento de dependências

A Snowflake ML Job API executa cargas úteis dentro do ambiente Container Runtime para ML. O ambiente tem os pacotes Python mais comumente usados para aprendizado de máquina e ciência de dados. A maioria dos casos de uso deve funcionar instantaneamente, sem configuração adicional. Se precisar de dependências personalizadas, você pode usar pip_requirements para instalá-las.

Para instalar dependências personalizadas, você deve habilitar o acesso à rede externa usando uma Integração de acesso externo. Você pode usar o seguinte comando de exemplo SQL para fornecer acesso:

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
  ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
  ENABLED = true;
Copy

Para obter mais informações sobre integrações de acesso externo, consulte Criação e uso de uma integração de acesso externo.

Depois de fornecer acesso à rede externa, você pode usar os parâmetros pip_requirements e external_access_integrations para configurar dependências personalizadas. Você pode usar pacotes que não estão disponíveis no ambiente do Container Runtime ou se tiver versões específicas dos pacotes.

O código Python a seguir mostra como especificar dependências personalizadas para o decorador remote:

@remote(
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=["custom-package"],
  external_access_integrations=["PYPI_EAI"],
  session=session,
)
def my_function():
  # Your code here
Copy

O código Python a seguir mostra como especificar dependências personalizadas para o método submit_file():

from snowflake.ml.jobs import submit_file

# Can include version specifier to specify version(s)
job = submit_file(
  "/path/to/repo/my_script.py",
  compute_pool,
  stage_name="payload_stage",
  pip_requirements=["custom-package==1.0.*"],
  external_access_integrations=["pypi_eai"],
  session=session,
)
Copy

Feeds de pacotes privados

O Snowflake ML Jobs também oferece suporte ao carregamento de pacotes de feeds privados, como o JFrog Artifactory e o Sonatype Nexus Repository. Esses feeds são normalmente usados para distribuir pacotes internos e proprietários, manter o controle sobre as versões de dependência e garantir a segurança/conformidade.

Para instalar pacotes de um feed privado, você deve fazer o seguinte:

  1. Criar uma regra de rede para permitir o acesso ao URL do feed privado.

    1. Para fontes que usam autenticação básica, você pode simplesmente criar uma regra de rede.

      CREATE OR REPLACE NETWORK RULE private_feed_nr
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('<your-repo>.jfrog.io');
      
      Copy
    2. Para configurar o acesso a uma fonte usando conectividade privada (ou seja, Private Link), siga as etapas em Saída da rede usando conectividade privada.

  2. Criar uma integração de acesso externo usando a regra de rede. Conceder permissão para usar EAI à função que enviará os trabalhos.

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai
    ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR)
    ENABLED = true;
    
    GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
    
    Copy
  3. Especificar o URL do feed privado, a Integração de acesso externo e o(s) pacote(s) ao enviar o trabalho

    # Option 1: Specify private feed URL in pip_requirements
    job = submit_file(
      "/path/to/script.py",
      compute_pool="MY_COMPUTE_POOL",
      stage_name="payload_stage",
      pip_requirements=[
        "--index-url=https://your.private.feed.url",
        "internal-package==1.2.3"
      ],
      external_access_integrations=["PRIVATE_FEED_EAI"]
    )
    
    Copy
    # Option 2: Specify private feed URL by environment variable
    job = submit_directory(
      "/path/to/code/",
      compute_pool="MY_COMPUTE_POOL",
      entrypoint="script.py",
      stage_name="payload_stage",
      pip_requirements=["internal-package==1.2.3"],
      external_access_integrations=["PRIVATE_FEED_EAI"],
      env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'},
    )
    
    Copy

Se seu feed URL privado contém informações confidenciais como tokens de autenticação, gerencie o URL criando um segredo do Snowflake. Execute CREATE SECRET para criar um segredo. Configuração de segredos durante o envio do trabalho com o argumento spec_overrides.

# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
 TYPE = GENERIC_STRING
 SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()

# Prepare service spec override for mounting secret into job execution
spec_overrides = {
 "spec": {
  "containers": [
    {
     "name": "main",  # Primary container name is always "main"
     "secrets": [
      {
        "snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
        "envVarName": "PIP_INDEX_URL",
        "secretKeyRef": "secret_string"
      },
     ],
    }
  ]
 }
}

# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
  "/path/to/script.py",
  compute_pool="MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=[
    "internal-package==1.2.3"
  ],
  external_access_integrations=["PRIVATE_FEED_EAI"],
  spec_overrides=spec_overrides,
)
Copy

Para obter mais informações sobre o container.secrets, consulte Campo containers.secrets.

Exemplos

Consulte `Amostras de código de trabalho de ML<https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/ml_jobs>`_ para exemplos de como usar trabalhos de ML do Snowflake.

Considerações sobre custo

Os Snowflake ML Jobs são executados no Snowpark Container Services e são cobrados com base no uso. Para obter informações sobre os custos de computação, consulte Custos do Snowpark Container Services.

As cargas úteis do trabalho são carregadas no estágio especificado com o argumento stage_name. Para evitar cobranças adicionais, você deve limpá-las. Para obter mais informações, consulte Explicação do custo de armazenamento e Exploração do custo de armazenamento para saber mais sobre os custos associados ao armazenamento em estágios.