Tutorial 2: Criar um trabalho do Snowpark Container Services

Importante

O recurso de trabalho do Snowpark Container Services está atualmente em versão preliminar privada e sujeito aos Termos de versão preliminar em https://snowflake.com/legal. Entre em contato com seu representante Snowflake para obter mais informações.

Introdução

Depois de concluir o Tutorial de configuração comum, você estará pronto para criar um trabalho. Neste tutorial, você cria um trabalho simples que se conecta ao Snowflake, executa uma consulta SQL SELECT e salva o resultado em uma tabela.

Existem duas partes neste tutorial:

Parte 1: Criar e testar um trabalho. Faça download do código fornecido para este tutorial e siga as instruções passo a passo:

  1. Baixe o código do trabalho para este tutorial.

  2. Crie uma imagem Docker para Snowpark Container Services e carregue a imagem em um repositório em sua conta.

  3. Prepare o arquivo de especificação do trabalho, que fornece ao Snowflake as informações de configuração do contêiner. Além do nome da imagem a ser usada para iniciar um contêiner, o arquivo de especificação fornece:

    • Três argumentos: uma consulta SELECT, um warehouse virtual para executar a consulta e o nome da tabela na qual salvar o resultado.

    • O warehouse no qual executar a instrução SELECT.

  4. Execute o trabalho. Usando o comando EXECUTE SERVICE, você pode executar o trabalho fornecendo o arquivo de especificação e o pool de computação onde o Snowflake pode executar o contêiner. E, finalmente, verifique os resultados do trabalho.

Parte 2: Entender o código do trabalho. Esta seção fornece uma visão geral do código do trabalho e destaca como os diferentes componentes colaboram.

1: Baixe o código de serviço

O código (um aplicativo Python) é fornecido para criar um trabalho.

  1. Faça download do arquivo zip em um diretório.

  2. Descompacte o conteúdo, que inclui um diretório para cada tutorial. O diretório Tutorial-2 possui os seguintes arquivos:

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2: Crie e carregue uma imagem

Crie uma imagem para a plataforma Linux/AMD64 compatível com o Snowpark Container Services e, em seguida, faça upload da imagem para o repositório de imagens da sua conta (consulte Configuração comum).

Você precisará de informações sobre o repositório (o URL do repositório e o nome do host do registro) antes de poder construir e fazer upload da imagem. Para obter mais informações, consulte Registro e repositórios.

Obter informações sobre o repositório

  1. Para obter o URL do repositório, execute o comando SHOW IMAGE REPOSITORIES SQL.

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • A coluna repository_url na saída fornece o URL. Abaixo um exemplo:

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • O nome do host no URL do repositório é o nome do host do registro. Abaixo um exemplo:

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

Criar a imagem e carregá-la no repositório

  1. Abra uma janela de terminal e mude para o diretório que contém os arquivos que você descompactou.

  2. Para criar uma imagem do Docker, execute o seguinte comando docker build usando o Docker CLI. Observe que o comando especifica o diretório de trabalho atual (.) como PATH para arquivos a serem usados na construção da imagem.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • Para image_name, use my_job_image:latest:

    Exemplo

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
    
    Copy
  3. Faça upload da imagem para o repositório em sua conta Snowflake. Para que o Docker carregue uma imagem em seu nome para o seu repositório, você deve primeiro autenticar o Docker com Snowflake.

    1. Para autenticar o Docker com o registro Snowflake, execute o seguinte comando.

      docker login <registry_hostname> -u <username>
      
      Copy
      • Para username, especifique seu nome de usuário do Snowflake. O Docker solicitará sua senha.

    2. Para fazer upload da imagem, execute o seguinte comando:

      docker push <repository_url>/<image_name>
      
      Copy

      Exemplo

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
      
      Copy

3: Prepare o arquivo de especificação

  • Para fazer upload do arquivo de especificação do trabalho (my_job_spec.yaml) para o estágio, use uma das seguintes opções:

    • A interface da Web do Snowsight: para obter instruções, consulte Escolha de um estágio interno para os arquivos locais.

    • O SnowSQL CLI: execute o seguinte comando PUT:

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      Por exemplo:

      • Linux ou macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy

      Você também pode especificar um caminho relativo.

      PUT file://./my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      O comando define OVERWRITE=TRUE para que você possa fazer upload do arquivo novamente, se necessário (por exemplo, se você corrigiu um erro em seu arquivo de especificação). Se o comando PUT for executado com sucesso, as informações sobre o arquivo carregado serão impressas.

4: Execute o trabalho

Agora você está pronto para criar um trabalho.

  1. Para iniciar um trabalho, execute o comando EXECUTE SERVICE:

    EXECUTE SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';
    
    Copy

    Observe o seguinte:

    • FROM e SPEC fornecem o nome do estágio e o nome do arquivo de especificação do trabalho. Quando o trabalho é executado, ele executa a instrução SQL e salva o resultado em uma tabela conforme especificado em my_job_spec.yaml.

      A instrução SQL no seu trabalho não é executada no contêiner do Docker. Em vez disso, o contêiner em execução se conecta ao Snowflake e executa a instrução SQL em um warehouse do Snowflake.

    • COMPUTE_POOL fornece os recursos de computação onde o Snowflake executa o trabalho.

    • EXECUTE SERVICE retorna a saída que inclui o UUID atribuído pelo Snowflake do trabalho, conforme mostrado no exemplo de saída a seguir:

      +------------------------------------------------------------------------------------+
      |                      status                                                        |
      -------------------------------------------------------------------------------------+
      | Job 01af7ee6-0001-cb52-0020-c5870077223a completed successfully with status: DONE. |
      +------------------------------------------------------------------------------------+
      
  2. Obtenha o ID da consulta que você executou (EXECUTE SERVICE é uma consulta).

    SET jobid = LAST_QUERY_ID();
    
    Copy

    Você usa o ID nas etapas a seguir para recuperar o status do trabalho e as informações do log do trabalho.

    Nota

    É importante chamar LAST_QUERY_ID imediatamente após chamar EXECUTE SERVICE para garantir que o ID do trabalho retornado pelo comando seja para o comando EXECUTE SERVICE.

    LAST_QUERY_ID retorna o ID da consulta de um trabalho somente após a conclusão do trabalho; para trabalhos contínuos de longa duração, não é adequado para obter informações de status em tempo real e, em vez disso, você deve usar a família QUERY HISTORY de funções de tabela para obter o ID de consulta do trabalho. Para obter mais informações, consulte Como trabalhar com trabalhos.

  3. O trabalho executa uma consulta simples e salva o resultado na tabela de resultados. Você pode verificar se o trabalho foi concluído com êxito consultando a tabela de resultados:

    SELECT * FROM results;
    
    Copy

    Exemplo de saída:

    +----------+-----------+
    | TIME     | TEXT      |
    |----------+-----------|
    | 10:56:52 | hello     |
    +----------+-----------+
    
  4. Se você deseja depurar a execução do seu trabalho, use as funções do sistema. Por exemplo, use SYSTEM$GET_JOB_STATUS para determinar se o trabalho ainda está em execução, falhou ao iniciar ou por que falhou. Além disso, supondo que seu código gere logs úteis para saída padrão ou erro padrão, você pode acessar os logs usando SYSTEM$GET_JOB_LOGS.

    1. Para obter o status do trabalho, chame a função do sistema SYSTEM$GET_JOB_STATUS:

      SELECT SYSTEM$GET_JOB_STATUS($jobid);
      
      Copy

      Exemplo de saída:

      [
        {
            "status":"DONE",
            "message":"Completed successfully",
            "containerName":"main",
            "instanceId":"0",
            "serviceName":"01af7ee6-0001-cb52-0020-c5870077223a",
            "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
            "restartCount":0,
            "startTime":""
        }
      ]
      
      Copy

      Na saída, como o trabalho não tem um nome, serviceName é o UUID atribuído pelo Snowflake (ID de consulta) do trabalho.

    2. Para obter as informações do log do trabalho, use a função do sistema SYSTEM$GET_JOB_LOGS:

      SELECT SYSTEM$GET_JOB_LOGS($jobid, 'main')
      
      Copy
      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
      

5: Limpeza

Se você não planeja continuar com o Tutorial 3, remova os recursos faturáveis que criou. Para obter mais informações, consulte a Etapa 5 no Tutorial 3.

6: Análise do código do trabalho

Esta seção cobre os seguintes tópicos:

Análise dos arquivos fornecidos

O arquivo zip que você baixou no início do tutorial inclui os seguintes arquivos:

  • main.py

  • Dockerfile

  • my_job_spec.yaml

Esta seção fornece uma visão geral de como o código implementa o trabalho.

arquivo main.py

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table scheme.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()
Copy

No código:

  • O código Python é executado em main, que então executa a função run_job():

    if __name__ == "__main__":
      run_job()
    
    Copy
  • A função run_job() lê as variáveis de ambiente e as utiliza para definir valores padrão para vários parâmetros. O contêiner usa esses parâmetros para se conectar ao Snowflake. Note que:

    • Você pode substituir esses valores de parâmetro padrão. Para obter mais informações, consulte Referência de especificação de serviço.

    • Quando a imagem é executada no Snowflake, o Snowflake preenche alguns desses parâmetros (consulte o código-fonte) automaticamente. No entanto, ao testar a imagem localmente, você precisa fornecer esses parâmetros explicitamente (conforme mostrado na próxima seção, Criação e teste de uma imagem localmente).

Arquivo Docker

Este arquivo contém todos os comandos para criar uma imagem usando Docker.

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
  conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Copy

Arquivo my_job_spec.yaml (especificação do trabalho)

Snowflake usa as informações fornecidas nesta especificação para configurar e executar seu trabalho.

spec:
container:
- name: main
   image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
   env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
   args:
   - "--query=select current_time() as time,'hello'"
   - "--result_table=results"
Copy

Além dos campos obrigatórios container.name e container.image (consulte Referência de especificação de serviço), a especificação inclui o campo opcional container.args para listar os argumentos:

  • --query fornece a consulta a ser executada quando o trabalho é executado.

  • --result_table identifica a tabela para salvar os resultados da consulta.

Criação e teste de uma imagem localmente

Você pode testar a imagem do Docker localmente antes de carregá-la em um repositório em sua conta Snowflake. Nos testes locais, seu contêiner é executado de forma independente (não é um trabalho executado pelo Snowflake).

Use as etapas a seguir para testar a imagem do Docker do Tutorial 2:

  1. Para criar uma imagem do Docker, no Docker CLI, execute o comando docker build:

    docker build --rm -t my_service:local .
    
    Copy
  2. Para iniciar seu código, execute o comando docker run, fornecendo <nome da organização>-<nome da conta>, <nome de usuário> e <senha>:

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
    
    Copy

    Ao testar a imagem localmente, observe que, além dos três argumentos (uma consulta, o warehouse para executar a consulta e uma tabela para salvar o resultado), você também fornece os parâmetros de conexão para o contêiner em execução localmente para se conectar ao Snowflake.

    Quando você executa o contêiner como um trabalho, o Snowflake fornece esses parâmetros ao contêiner como variáveis de ambiente. Para obter mais informações, consulte Configuração de clientes Snowflake.

    O trabalho executa a consulta (select current_time() as time,'hello') e grava o resultado na tabela (tutorial_db.data_schema.results). Se a tabela não existir, ela será criada. Se a tabela existir, o trabalho adiciona uma linha.

    Exemplo de resultado da consulta à tabela de resultados:

    +----------+----------+
    | TIME     | TEXT     |
    |----------+----------|
    | 10:56:52 | hello    |
    +----------+----------+
    

Qual é o próximo passo?

Agora você pode testar o Tutorial 3, que mostra como funciona a comunicação entre serviços.