Tutorial 2: Criar um trabalho do Snowpark Container Services¶
Importante
O recurso de trabalho do Snowpark Container Services está atualmente em versão preliminar privada e sujeito aos Termos de versão preliminar em https://snowflake.com/legal. Entre em contato com seu representante Snowflake para obter mais informações.
Introdução¶
Depois de concluir o Tutorial de configuração comum, você estará pronto para criar um trabalho. Neste tutorial, você cria um trabalho simples que se conecta ao Snowflake, executa uma consulta SQL SELECT e salva o resultado em uma tabela.
Existem duas partes neste tutorial:
Parte 1: Criar e testar um trabalho. Faça download do código fornecido para este tutorial e siga as instruções passo a passo:
Baixe o código do trabalho para este tutorial.
Crie uma imagem Docker para Snowpark Container Services e carregue a imagem em um repositório em sua conta.
Prepare o arquivo de especificação do trabalho, que fornece ao Snowflake as informações de configuração do contêiner. Além do nome da imagem a ser usada para iniciar um contêiner, o arquivo de especificação fornece:
Três argumentos: uma consulta SELECT, um warehouse virtual para executar a consulta e o nome da tabela na qual salvar o resultado.
O warehouse no qual executar a instrução SELECT.
Execute o trabalho. Usando o comando EXECUTE SERVICE, você pode executar o trabalho fornecendo o arquivo de especificação e o pool de computação onde o Snowflake pode executar o contêiner. E, finalmente, verifique os resultados do trabalho.
Parte 2: Entender o código do trabalho. Esta seção fornece uma visão geral do código do trabalho e destaca como os diferentes componentes colaboram.
1: Baixe o código de serviço¶
O código (um aplicativo Python) é fornecido para criar um trabalho.
Faça download do arquivo zip em um diretório.
Descompacte o conteúdo, que inclui um diretório para cada tutorial. O diretório
Tutorial-2
possui os seguintes arquivos:main.py
Dockerfile
my_job_spec.yaml
2: Crie e carregue uma imagem¶
Crie uma imagem para a plataforma Linux/AMD64 compatível com o Snowpark Container Services e, em seguida, faça upload da imagem para o repositório de imagens da sua conta (consulte Configuração comum).
Você precisará de informações sobre o repositório (o URL do repositório e o nome do host do registro) antes de poder construir e fazer upload da imagem. Para obter mais informações, consulte Registro e repositórios.
Obter informações sobre o repositório
Para obter o URL do repositório, execute o comando SHOW IMAGE REPOSITORIES SQL.
SHOW IMAGE REPOSITORIES;
A coluna
repository_url
na saída fornece o URL. Abaixo um exemplo:<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
O nome do host no URL do repositório é o nome do host do registro. Abaixo um exemplo:
<orgname>-<acctname>.registry.snowflakecomputing.com
Criar a imagem e carregá-la no repositório
Abra uma janela de terminal e mude para o diretório que contém os arquivos que você descompactou.
Para criar uma imagem do Docker, execute o seguinte comando
docker build
usando o Docker CLI. Observe que o comando especifica o diretório de trabalho atual (.) comoPATH
para arquivos a serem usados na construção da imagem.docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
Para
image_name
, usemy_job_image:latest
:
Exemplo
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
Faça upload da imagem para o repositório em sua conta Snowflake. Para que o Docker carregue uma imagem em seu nome para o seu repositório, você deve primeiro autenticar o Docker com Snowflake.
Para autenticar o Docker com o registro Snowflake, execute o seguinte comando.
docker login <registry_hostname> -u <username>
Para
username
, especifique seu nome de usuário do Snowflake. O Docker solicitará sua senha.
Para fazer upload da imagem, execute o seguinte comando:
docker push <repository_url>/<image_name>
Exemplo
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
3: Prepare o arquivo de especificação¶
Para fazer upload do arquivo de especificação do trabalho (my_job_spec.yaml) para o estágio, use uma das seguintes opções:
A interface da Web do Snowsight: para obter instruções, consulte Escolha de um estágio interno para os arquivos locais.
O SnowSQL CLI: execute o seguinte comando PUT:
PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Por exemplo:
Linux ou macOS
PUT file:///tmp/my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Windows
PUT file://C:\temp\my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Você também pode especificar um caminho relativo.
PUT file://./my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
O comando define OVERWRITE=TRUE para que você possa fazer upload do arquivo novamente, se necessário (por exemplo, se você corrigiu um erro em seu arquivo de especificação). Se o comando PUT for executado com sucesso, as informações sobre o arquivo carregado serão impressas.
4: Execute o trabalho¶
Agora você está pronto para criar um trabalho.
Para iniciar um trabalho, execute o comando EXECUTE SERVICE:
EXECUTE SERVICE IN COMPUTE POOL tutorial_compute_pool FROM @tutorial_stage SPEC='my_job_spec.yaml';
Observe o seguinte:
FROM e SPEC fornecem o nome do estágio e o nome do arquivo de especificação do trabalho. Quando o trabalho é executado, ele executa a instrução SQL e salva o resultado em uma tabela conforme especificado em
my_job_spec.yaml
.A instrução SQL no seu trabalho não é executada no contêiner do Docker. Em vez disso, o contêiner em execução se conecta ao Snowflake e executa a instrução SQL em um warehouse do Snowflake.
COMPUTE_POOL fornece os recursos de computação onde o Snowflake executa o trabalho.
EXECUTE SERVICE retorna a saída que inclui o UUID atribuído pelo Snowflake do trabalho, conforme mostrado no exemplo de saída a seguir:
+------------------------------------------------------------------------------------+ | status | -------------------------------------------------------------------------------------+ | Job 01af7ee6-0001-cb52-0020-c5870077223a completed successfully with status: DONE. | +------------------------------------------------------------------------------------+
Obtenha o ID da consulta que você executou (EXECUTE SERVICE é uma consulta).
SET jobid = LAST_QUERY_ID();
Você usa o ID nas etapas a seguir para recuperar o status do trabalho e as informações do log do trabalho.
Nota
É importante chamar LAST_QUERY_ID imediatamente após chamar EXECUTE SERVICE para garantir que o ID do trabalho retornado pelo comando seja para o comando EXECUTE SERVICE.
LAST_QUERY_ID retorna o ID da consulta de um trabalho somente após a conclusão do trabalho; para trabalhos contínuos de longa duração, não é adequado para obter informações de status em tempo real e, em vez disso, você deve usar a família QUERY HISTORY de funções de tabela para obter o ID de consulta do trabalho. Para obter mais informações, consulte Como trabalhar com trabalhos.
O trabalho executa uma consulta simples e salva o resultado na tabela de resultados. Você pode verificar se o trabalho foi concluído com êxito consultando a tabela de resultados:
SELECT * FROM results;
Exemplo de saída:
+----------+-----------+ | TIME | TEXT | |----------+-----------| | 10:56:52 | hello | +----------+-----------+
Se você deseja depurar a execução do seu trabalho, use as funções do sistema. Por exemplo, use SYSTEM$GET_JOB_STATUS para determinar se o trabalho ainda está em execução, falhou ao iniciar ou por que falhou. Além disso, supondo que seu código gere logs úteis para saída padrão ou erro padrão, você pode acessar os logs usando SYSTEM$GET_JOB_LOGS.
Para obter o status do trabalho, chame a função do sistema SYSTEM$GET_JOB_STATUS:
SELECT SYSTEM$GET_JOB_STATUS($jobid);
Exemplo de saída:
[ { "status":"DONE", "message":"Completed successfully", "containerName":"main", "instanceId":"0", "serviceName":"01af7ee6-0001-cb52-0020-c5870077223a", "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest", "restartCount":0, "startTime":"" } ]
Na saída, como o trabalho não tem um nome,
serviceName
é o UUID atribuído pelo Snowflake (ID de consulta) do trabalho.Para obter as informações do log do trabalho, use a função do sistema SYSTEM$GET_JOB_LOGS:
SELECT SYSTEM$GET_JOB_LOGS($jobid, 'main')
job-tutorial - INFO - Job started job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE" job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results] job-tutorial - INFO - Job finished
5: Limpeza¶
Se você não planeja continuar com o Tutorial 3, remova os recursos faturáveis que criou. Para obter mais informações, consulte a Etapa 5 no Tutorial 3.
6: Análise do código do trabalho¶
Esta seção cobre os seguintes tópicos:
Análise dos arquivos fornecidos: revise vários arquivos de código que implementam a tarefa.
Criação e teste de uma imagem localmente. A seção fornece uma explicação de como você pode testar localmente a imagem do Docker antes de carregá-la em um repositório em sua conta Snowflake.
Análise dos arquivos fornecidos¶
O arquivo zip que você baixou no início do tutorial inclui os seguintes arquivos:
main.py
Dockerfile
my_job_spec.yaml
Esta seção fornece uma visão geral de como o código implementa o trabalho.
arquivo main.py¶
#!/opt/conda/bin/python3
import argparse
import logging
import os
import sys
from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *
# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")
# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")
def get_arg_parser():
"""
Input argument list.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--query", required=True, help="query text to execute")
parser.add_argument(
"--result_table",
required=True,
help="name of the table to store result of query specified by flag --query")
return parser
def get_logger():
"""
Get a logger for local logging.
"""
logger = logging.getLogger("job-tutorial")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_login_token():
"""
Read the login token supplied automatically by Snowflake. These tokens
are short lived and should always be read right before creating any new connection.
"""
with open("/snowflake/session/token", "r") as f:
return f.read()
def get_connection_params():
"""
Construct Snowflake connection params from environment variables.
"""
if os.path.exists("/snowflake/session/token"):
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"authenticator": "oauth",
"token": get_login_token(),
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
else:
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"user": SNOWFLAKE_USER,
"password": SNOWFLAKE_PASSWORD,
"role": SNOWFLAKE_ROLE,
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
def run_job():
"""
Main body of this job.
"""
logger = get_logger()
logger.info("Job started")
# Parse input arguments
args = get_arg_parser().parse_args()
query = args.query
result_table = args.result_table
# Start a Snowflake session, run the query and write results to specified table
with Session.builder.configs(get_connection_params()).create() as session:
# Print out current session context information.
database = session.get_current_database()
schema = session.get_current_schema()
warehouse = session.get_current_warehouse()
role = session.get_current_role()
logger.info(
f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
)
# Execute query and persist results in a table.
logger.info(
f"Executing query [{query}] and writing result to table [{result_table}]"
)
res = session.sql(query)
# If the table already exists, the query result must match the table scheme.
# If the table does not exist, this will create a new table.
res.write.mode("append").save_as_table(result_table)
logger.info("Job finished")
if __name__ == "__main__":
run_job()
No código:
O código Python é executado em
main
, que então executa a funçãorun_job()
:if __name__ == "__main__": run_job()
A função
run_job()
lê as variáveis de ambiente e as utiliza para definir valores padrão para vários parâmetros. O contêiner usa esses parâmetros para se conectar ao Snowflake. Note que:Você pode substituir esses valores de parâmetro padrão. Para obter mais informações, consulte Referência de especificação de serviço.
Quando a imagem é executada no Snowflake, o Snowflake preenche alguns desses parâmetros (consulte o código-fonte) automaticamente. No entanto, ao testar a imagem localmente, você precisa fornecer esses parâmetros explicitamente (conforme mostrado na próxima seção, Criação e teste de uma imagem localmente).
Arquivo Docker¶
Este arquivo contém todos os comandos para criar uma imagem usando Docker.
ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Arquivo my_job_spec.yaml (especificação do trabalho)¶
Snowflake usa as informações fornecidas nesta especificação para configurar e executar seu trabalho.
spec:
container:
- name: main
image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
env:
SNOWFLAKE_WAREHOUSE: tutorial_warehouse
args:
- "--query=select current_time() as time,'hello'"
- "--result_table=results"
Além dos campos obrigatórios container.name
e container.image
(consulte Referência de especificação de serviço), a especificação inclui o campo opcional container.args
para listar os argumentos:
--query
fornece a consulta a ser executada quando o trabalho é executado.--result_table
identifica a tabela para salvar os resultados da consulta.
Criação e teste de uma imagem localmente¶
Você pode testar a imagem do Docker localmente antes de carregá-la em um repositório em sua conta Snowflake. Nos testes locais, seu contêiner é executado de forma independente (não é um trabalho executado pelo Snowflake).
Use as etapas a seguir para testar a imagem do Docker do Tutorial 2:
Para criar uma imagem do Docker, no Docker CLI, execute o comando
docker build
:docker build --rm -t my_service:local .
Para iniciar seu código, execute o comando
docker run
, fornecendo<nome da organização>-<nome da conta>
,<nome de usuário>
e<senha>
:docker run --rm \ -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \ -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \ -e SNOWFLAKE_DATABASE=tutorial_db \ -e SNOWFLAKE_SCHEMA=data_schema \ -e SNOWFLAKE_ROLE=test_role \ -e SNOWFLAKE_USER=<username> \ -e SNOWFLAKE_PASSWORD=<password> \ -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \ my_job:local \ --query="select current_time() as time,'hello'" \ --result_table=tutorial_db.data_schema.results
Ao testar a imagem localmente, observe que, além dos três argumentos (uma consulta, o warehouse para executar a consulta e uma tabela para salvar o resultado), você também fornece os parâmetros de conexão para o contêiner em execução localmente para se conectar ao Snowflake.
Quando você executa o contêiner como um trabalho, o Snowflake fornece esses parâmetros ao contêiner como variáveis de ambiente. Para obter mais informações, consulte Configuração de clientes Snowflake.
O trabalho executa a consulta (
select current_time() as time,'hello'
) e grava o resultado na tabela (tutorial_db.data_schema.results
). Se a tabela não existir, ela será criada. Se a tabela existir, o trabalho adiciona uma linha.Exemplo de resultado da consulta à tabela de resultados:
+----------+----------+ | TIME | TEXT | |----------+----------| | 10:56:52 | hello | +----------+----------+
Qual é o próximo passo?¶
Agora você pode testar o Tutorial 3, que mostra como funciona a comunicação entre serviços.