Snowflake ML Jobs¶
Use o Snowflake ML Jobs para executar fluxos de trabalho de aprendizado de máquina (ML) dentro dos tempos de execução do contêiner do Snowflake ML. Você pode executá-los em qualquer ambiente de desenvolvimento. Você não precisa executar o código em uma planilha ou notebook do Snowflake. Use os trabalhos para aproveitar a infraestrutura do Snowflake para executar tarefas com uso intensivo de recursos no seu fluxo de trabalho de desenvolvimento. Para obter informações sobre como configurar o Snowflake ML localmente, consulte Como usar o Snowflake ML localmente.
Importante
Os trabalhos de ML do Snowflake estão disponíveis em snowflake-ml-python
versão 1.9.2 e posterior.
O Snowflake ML Jobs permite que você faça o seguinte:
Executar cargas de trabalho de ML no Snowflake Compute Pools, incluindo instâncias da GPU e de alta memória da CPU.
Usar seu ambiente de desenvolvimento preferido, como VS Code ou Jupyter notebooks.
Instale e use pacotes Python personalizados em seu ambiente de tempo de execução.
Usar APIs distribuídas do Snowflake para otimizar o carregamento de dados, o treinamento e o ajuste de hiperparâmetros.
Integre-se com ferramentas de orquestração, como o Apache Airflow.
Monitorar e gerenciar trabalhos por meio de APIs do Snowflake.
Você pode usar esses recursos para fazer o seguinte:
Executar treinamento com uso intensivo de recursos em grandes conjuntos de dados que exigem aceleração de GPU ou recursos de computação significativos.
Produzir os fluxos de trabalho de ML movendo o código de ML do desenvolvimento para a produção com execução programática por meio de pipelines.
Manter seu ambiente de desenvolvimento existente e usar os recursos de computação do Snowflake.
Suspender e mudar os fluxos de trabalho de OSS ML com alterações mínimas no código.
Trabalhar diretamente com grandes conjuntos de dados do Snowflake para reduzir a movimentação de dados e evitar transferências de dados de alto custo.
Pré-requisitos¶
Importante
Atualmente, o Snowflake ML Jobs só oferece suporte a clientes Python 3.10. Entre em contato com a equipe da sua conta Snowflake se precisar de suporte para outras versões do Python.
Instale o pacote Snowflake ML Python em seu ambiente Python 3.10.
pip install snowflake-ml-python>=1.9.2
O tamanho padrão do pool de computação usa a família de instâncias CPU_X64_S. O número mínimo de nós é 1 e o máximo é 25. Você pode usar o seguinte comando SQL para criar um pool de computação personalizado:
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL MIN_NODES = <MIN_NODES> MAX_NODES = <MAX_NODES> INSTANCE_FAMILY = <INSTANCE_FAMILY>;
O Snowflake ML Jobs requer uma sessão do Snowpark. Use o código a seguir para criá-la:
from snowflake.snowpark import Session from snowflake.ml.jobs import list_jobs ls = list_jobs() # This will fail! You must create a session first. # Requires valid ~/.snowflake/config.toml file session = Session.builder.getOrCreate() ls = list_jobs(session=session) ls = list_jobs() # Infers created session from context
Para obter informações sobre como criar uma sessão, consulte Como criar uma sessão.
Executar um trabalho do Snowflake ML Job¶
Você pode executar um Snowflake ML Job de uma das seguintes maneiras:
Usando um decorador de função no seu código.
Enviando arquivos ou diretórios inteiros usando a Python API.
Execução de uma função Python como trabalho de ML do Snowflake¶
Use o Function Dispatch para executar funções Python individuais remotamente nos recursos de computação do Snowflake com o decorador @remote
.
Usando o decorador @remote
, você pode fazer o seguinte:
Serializar a função e suas dependências.
Carregá-lo em um estágio específico do Snowflake.
Executá-lo no Container Runtime para ML.
O exemplo de código Python a seguir usa o decorador @remote
para enviar um Snowflake ML Job. Observe que é necessária uma Snowpark Session
, consulte Pré-requisitos.
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
# Provide your ML code here, including imports and function calls
...
job = train_model("my_training_data")
Invocar uma função decorada @remote
retorna um objeto Snowflake MLJob
que pode ser usado para gerenciar e monitorar a execução do trabalho. Para obter mais informações, consulte Gerenciamento de trabalhos de ML.
Executar um arquivo Python como um Snowflake ML Job¶
Execute arquivos Python ou diretórios de projeto nos recursos de computação do Snowflake. Isso é útil quando:
Você tem projetos de ML complexos com vários módulos e dependências.
Você deseja manter a separação entre o desenvolvimento local e o código de produção.
Você precisa executar scripts que usam argumentos de linha de comando.
Você está trabalhando com projetos existentes de ML que não foram projetados especificamente para execução na computação do Snowflake.
A API de trabalho do Snowflake oferece três métodos principais para enviar cargas úteis baseadas em arquivos:
submit_file()
: para executar arquivos Python individuaissubmit_directory()
: para executar projetos Python com vários arquivos e recursossubmit_from_stage()
: para executar projetos Python salvos em um estágio Snowflake
Ambos os métodos são compatíveis com:
Passagem de argumentos na linha de comando
Configuração de variáveis de ambiente
Especificação de dependências personalizadas
Gerenciamento de ativos de projeto por meio dos estágios do Snowflake
O File Dispatch é particularmente útil para a produção de fluxos de trabalho de ML existentes e para manter uma separação clara entre os ambientes de desenvolvimento e de execução.
O seguinte código Python envia um arquivo como trabalho de ML do Snowflake:
from snowflake.ml.jobs import submit_file
# Run a single file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
args=["--data-table", "my_training_data"],
session=session,
)
O seguinte código Python envia um diretório como um trabalho de ML do Snowflake:
from snowflake.ml.jobs import submit_directory
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
)
O seguinte código Python envia um diretório de um estágio Snowflake como um trabalho de ML do Snowflake:
from snowflake.ml.jobs import submit_from_stage
# Run from a directory
job3 = submit_from_stage(
"@source_stage/ml_project/"
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
)
# Entrypoint may also be a relative path
job4 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py", # Resolves to @source_stage/ml_project/train.py
stage_name="payload_stage",
session=session,
)
O envio de um arquivo ou diretório retorna um objeto do Snowflake MLJob
que pode ser usado para gerenciar e monitorar a execução do trabalho. Para obter mais informações, consulte Gerenciamento de trabalhos de ML.
Suporte a cargas úteis adicionais em envios¶
Ao enviar um arquivo, diretório ou de um estágio, cargas úteis adicionais são suportadas para uso durante a execução do trabalho. O caminho de importação pode ser especificado explicitamente; caso contrário, ele será inferido a partir do local da carga útil adicional.
Importante
Somente diretórios podem ser especificados como fontes de importação. A importação de arquivos individuais não é suportada.
# Run from a file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
additional_payloads=[
("src/utils/", "utils"), # the import path is utils
],
)
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
additional_payloads=[
("src/utils/"), # the import path is utils
],
)
# Run from a stage
job3 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
additional_payloads=[
("@source_stage/src/utils/sub_utils/", "utils.sub_utils"),
],
)
Acesso à Sessão de Snowpark em trabalhos de ML¶
Ao executar trabalhos de ML no Snowflake, uma Sessão de Snowpark fica automaticamente disponível no contexto de execução. Você pode acessar o objeto da sessão a partir da carga útil do seu trabalho de ML usando as seguintes abordagens:
from snowflake.ml.jobs import remote
from snowflake.snowpark import Session
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function():
# This approach works for all payload types, including file and directory payloads
session = Session.builder.getOrCreate()
print(session.sql("SELECT CURRENT_VERSION()").collect())
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function_with_injected_session(session: Session):
# This approach works only for function dispatch payloads
# The session is injected automatically by the Snowflake ML Job API
print(session.sql("SELECT CURRENT_VERSION()").collect())
A Sessão de Snowpark pode ser usada para acessar tabelas, estágios e outros objetos de banco de dados do Snowflake dentro do seu trabalho de ML.
Retorno de resultados de trabalhos de ML¶
Os trabalhos de ML do Snowflake oferecem suporte ao retorno dos resultados da execução ao ambiente do cliente. Isso permite que você recupere valores computados, modelos treinados ou quaisquer outros artefatos produzidos pelas cargas de trabalho.
Para despacho de função, basta retornar um valor de sua função decorada. O valor retornado será serializado e disponibilizado por meio do método result()
.
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def train_model(data_table: str):
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
job1 = train_model("my_training_data")
Para trabalhos baseados em arquivos, use a variável especial __return__
para especificar o valor de retorno.
# Example: /path/to/repo/my_script.py
def main():
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
if __name__ == "__main__":
__return__ = main()
from snowflake.ml.jobs import submit_file
job2 = submit_file(
"/path/to/repo/my_script.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
)
Você pode recuperar o resultado da execução do trabalho usando a API MLJob.result()
. A API bloqueia o thread de chamada até que o trabalho atinja um estado terminal, depois retorna o valor de retorno da carga útil ou, se a execução falhar, gera uma exceção. Se a carga útil não definir um valor de retorno, o resultado será None
em caso de sucesso.
# These will block until the respective job is done and return the trained model
model1 = job1.result()
model2 = job2.result()
Gerenciamento de trabalhos de ML¶
Quando você envia um Snowflake ML Job, a API cria um objeto MLJob
. Você pode usá-lo para fazer o seguinte:
Acompanhar o progresso do trabalho por meio de atualizações de status
Depurar problemas usando logs de execução detalhados
Recuperar o resultado da execução (se houver)
Você pode usar a get_job()
API para recuperar um objeto MLJob
pelo seu ID. O código Python a seguir mostra como recuperar um objeto MLJob
:
from snowflake.ml.jobs import MLJob, get_job, list_jobs, delete_job
# Get a list of the 10 most recent jobs as a Pandas DataFrame
jobs_df = list_jobs(limit=10)
print(jobs_df) # Display list in table format
# Retrieve an existing job based on ID
job = get_job("<job_id>") # job is an MLJob instance
# Retrieve status and logs for the retrieved job
print(job.status) # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())
# Clean up the job
delete_job(job)
Gerenciamento de dependências¶
A Snowflake ML Job API executa cargas úteis dentro do ambiente Container Runtime para ML. O ambiente tem os pacotes Python mais comumente usados para aprendizado de máquina e ciência de dados. A maioria dos casos de uso deve funcionar instantaneamente, sem configuração adicional. Se precisar de dependências personalizadas, você pode usar pip_requirements
para instalá-las.
Para instalar dependências personalizadas, você deve habilitar o acesso à rede externa usando uma Integração de acesso externo. Você pode usar o seguinte comando de exemplo SQL para fornecer acesso:
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
ENABLED = true;
Para obter mais informações sobre integrações de acesso externo, consulte Criação e uso de uma integração de acesso externo.
Depois de fornecer acesso à rede externa, você pode usar os parâmetros pip_requirements
e external_access_integrations
para configurar dependências personalizadas. Você pode usar pacotes que não estão disponíveis no ambiente do Container Runtime ou se tiver versões específicas dos pacotes.
O código Python a seguir mostra como especificar dependências personalizadas para o decorador remote
:
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=["custom-package"],
external_access_integrations=["PYPI_EAI"],
session=session,
)
def my_function():
# Your code here
O código Python a seguir mostra como especificar dependências personalizadas para o método submit_file()
:
from snowflake.ml.jobs import submit_file
# Can include version specifier to specify version(s)
job = submit_file(
"/path/to/repo/my_script.py",
compute_pool,
stage_name="payload_stage",
pip_requirements=["custom-package==1.0.*"],
external_access_integrations=["pypi_eai"],
session=session,
)
Feeds de pacotes privados¶
O Snowflake ML Jobs também oferece suporte ao carregamento de pacotes de feeds privados, como o JFrog Artifactory e o Sonatype Nexus Repository. Esses feeds são normalmente usados para distribuir pacotes internos e proprietários, manter o controle sobre as versões de dependência e garantir a segurança/conformidade.
Para instalar pacotes de um feed privado, você deve fazer o seguinte:
Criar uma regra de rede para permitir o acesso ao URL do feed privado.
Para fontes que usam autenticação básica, você pode simplesmente criar uma regra de rede.
CREATE OR REPLACE NETWORK RULE private_feed_nr MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('<your-repo>.jfrog.io');
Para configurar o acesso a uma fonte usando conectividade privada (ou seja, Private Link), siga as etapas em Saída da rede usando conectividade privada.
Criar uma integração de acesso externo usando a regra de rede. Conceder permissão para usar EAI à função que enviará os trabalhos.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR) ENABLED = true; GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
Especificar o URL do feed privado, a Integração de acesso externo e o(s) pacote(s) ao enviar o trabalho
# Option 1: Specify private feed URL in pip_requirements job = submit_file( "/path/to/script.py", compute_pool="MY_COMPUTE_POOL", stage_name="payload_stage", pip_requirements=[ "--index-url=https://your.private.feed.url", "internal-package==1.2.3" ], external_access_integrations=["PRIVATE_FEED_EAI"] )
# Option 2: Specify private feed URL by environment variable job = submit_directory( "/path/to/code/", compute_pool="MY_COMPUTE_POOL", entrypoint="script.py", stage_name="payload_stage", pip_requirements=["internal-package==1.2.3"], external_access_integrations=["PRIVATE_FEED_EAI"], env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'}, )
Se seu feed URL privado contém informações confidenciais como tokens de autenticação, gerencie o URL criando um segredo do Snowflake. Execute CREATE SECRET para criar um segredo. Configuração de segredos durante o envio do trabalho com o argumento spec_overrides
.
# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
TYPE = GENERIC_STRING
SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()
# Prepare service spec override for mounting secret into job execution
spec_overrides = {
"spec": {
"containers": [
{
"name": "main", # Primary container name is always "main"
"secrets": [
{
"snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
"envVarName": "PIP_INDEX_URL",
"secretKeyRef": "secret_string"
},
],
}
]
}
}
# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
"/path/to/script.py",
compute_pool="MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=[
"internal-package==1.2.3"
],
external_access_integrations=["PRIVATE_FEED_EAI"],
spec_overrides=spec_overrides,
)
Para obter mais informações sobre o container.secrets
, consulte Campo containers.secrets.
Exemplos¶
Consulte `Amostras de código de trabalho de ML<https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/ml_jobs>`_ para exemplos de como usar trabalhos de ML do Snowflake.
Considerações sobre custo¶
Os Snowflake ML Jobs são executados no Snowpark Container Services e são cobrados com base no uso. Para obter informações sobre os custos de computação, consulte Custos do Snowpark Container Services.
As cargas úteis do trabalho são carregadas no estágio especificado com o argumento stage_name
. Para evitar cobranças adicionais, você deve limpá-las. Para obter mais informações, consulte Explicação do custo de armazenamento e Exploração do custo de armazenamento para saber mais sobre os custos associados ao armazenamento em estágios.