Tutorial 2: Criar um serviço de trabalho do Snowpark Container Services

Introdução

Depois de concluir o Tutorial de configuração comum, você estará pronto para criar um serviço de trabalho. Neste tutorial, você cria um serviço de trabalho simples que se conecta ao Snowflake, executa uma consulta SQL SELECT e salva o resultado em uma tabela.

Existem duas partes neste tutorial:

Parte 1: Criar e testar um serviço de trabalho. Faça download do código fornecido para este tutorial e siga as instruções passo a passo:

  1. Baixe o código de serviço de trabalho deste tutorial.

  2. Crie uma imagem Docker para Snowpark Container Services e carregue a imagem em um repositório em sua conta.

  3. Prepare o arquivo de especificação de serviço, que fornece ao Snowflake as informações de configuração do contêiner. Além do nome da imagem a ser usada para iniciar um contêiner, o arquivo de especificação especifica três argumentos: uma consulta SELECT, um warehouse virtual para executar a consulta e o nome da tabela para salvar o resultado.

  4. Execute o serviço de trabalho. Usando o comando EXECUTE JOB SERVICE, você pode executar o serviço de trabalho fornecendo o arquivo de especificação e o pool de computação onde o Snowflake pode executar o contêiner. E, por fim, verifique os resultados do serviço.

Parte 2: Entender o código do serviço do trabalho. Esta seção fornece uma visão geral do código do serviço de trabalho e destaca como os diferentes componentes colaboram.

1: Baixe o código do serviço de trabalho

O código (um aplicativo Python) é fornecido para implementar um serviço de trabalho.

  1. Baixe SnowparkContainerServices-Tutorials.zip.

  2. Descompacte o conteúdo, que inclui um diretório para cada tutorial. O diretório Tutorial-2 possui os seguintes arquivos:

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2: Crie e carregue uma imagem

Crie uma imagem para a plataforma Linux/AMD64 compatível com o Snowpark Container Services e, em seguida, faça upload da imagem para o repositório de imagens da sua conta (consulte Configuração comum).

Você precisará de informações sobre o repositório (o URL do repositório e o nome do host do registro) antes de poder construir e fazer upload da imagem. Para obter mais informações, consulte Registro e repositórios.

Obter informações sobre o repositório

  1. Para obter o URL do repositório, execute o comando SHOW IMAGE REPOSITORIES SQL.

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • A coluna repository_url na saída fornece o URL. Abaixo um exemplo:

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • O nome do host no URL do repositório é o nome do host do registro. Abaixo um exemplo:

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

Criar a imagem e carregá-la no repositório

  1. Abra uma janela de terminal e mude para o diretório que contém os arquivos que você descompactou.

  2. Para criar uma imagem do Docker, execute o seguinte comando docker build usando o Docker CLI. Observe que o comando especifica o diretório de trabalho atual (.) como PATH para arquivos a serem usados na construção da imagem.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • Para image_name, use my_job_image:latest:

    Exemplo

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
    
    Copy
  3. Faça upload da imagem para o repositório em sua conta Snowflake. Para que o Docker carregue uma imagem em seu nome para seu repositório, é necessário primeiro autenticar o Docker com o registro.

    1. Para autenticar o Docker com o registro Snowflake, execute o seguinte comando.

      docker login <registry_hostname> -u <username>
      
      Copy
      • Para username, especifique seu nome de usuário do Snowflake. O Docker solicitará sua senha.

    2. Para fazer upload da imagem, execute o seguinte comando:

      docker push <repository_url>/<image_name>
      
      Copy

      Exemplo

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
      
      Copy

3: Prepare o arquivo de especificação

  • Para fazer upload do arquivo de especificação do serviço (my_job_spec.yaml) para o estágio, use uma das seguintes opções:

    • A interface da Web do Snowsight: para obter instruções, consulte Escolha de um estágio interno para os arquivos locais.

    • O SnowSQL CLI: execute o seguinte comando PUT:

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      Por exemplo:

      • Linux ou macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy

      Você também pode especificar um caminho relativo.

      PUT file://./my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      O comando define OVERWRITE=TRUE para que você possa fazer upload do arquivo novamente, se necessário (por exemplo, se você corrigiu um erro em seu arquivo de especificação). Se o comando PUT for executado com sucesso, as informações sobre o arquivo carregado serão impressas.

4: Execute o serviço do trabalho

Agora você está pronto para criar um trabalho.

  1. Para iniciar um serviço de trabalho, execute o comando EXECUTE JOB SERVICE:

    EXECUTE JOB SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      NAME=tutorial_2_job_service
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';
    
    Copy

    Observe o seguinte:

    • FROM e SPEC fornecem o nome do estágio e o nome do arquivo de especificação do serviço de trabalho. Quando o serviço de trabalho é executado, ele executa a instrução SQL e salva o resultado em uma tabela conforme especificado em my_job_spec.yaml.

      A instrução SQL não é executada no contêiner do Docker. Em vez disso, o contêiner em execução se conecta ao Snowflake e executa a instrução SQL em um warehouse do Snowflake.

    • COMPUTE_POOL fornece os recursos de computação onde o Snowflake executa o serviço de trabalho.

    • EXECUTE JOB SERVICE retorna a saída que inclui o nome do trabalho, conforme mostrado no exemplo de saída a seguir:

      +------------------------------------------------------------------------------------+
      |                      status                                                        |
      -------------------------------------------------------------------------------------+
      | Job TUTORIAL_2_JOB_SERVICE completed successfully with status: DONE.               |
      +------------------------------------------------------------------------------------+
      
  2. O serviço de trabalho executa uma consulta simples e salva o resultado na tabela de resultados. Você pode verificar se o serviço de trabalho foi concluído com êxito consultando a tabela de resultados:

    SELECT * FROM results;
    
    Copy

    Exemplo de saída:

    +----------+-----------+
    | TIME     | TEXT      |
    |----------+-----------|
    | 10:56:52 | hello     |
    +----------+-----------+
    
  3. Se você deseja depurar a execução do seu serviço de trabalho, use as funções do sistema. Por exemplo, use SYSTEM$GET_SERVICE_STATUS para determinar se o serviço de trabalho ainda está em execução, falhou ao iniciar ou por que falhou. Além disso, supondo que seu código gere logs úteis para saída padrão ou erro padrão, você pode acessar os logs usando SYSTEM$GET_SERVICE_LOGS.

    1. Para obter o status do serviço de trabalho, chame a função do sistema SYSTEM$GET_SERVICE_STATUS — Obsoleto:

      SELECT SYSTEM$GET_SERVICE_STATUS('tutorial_2_job_service');
      
      Copy

      Exemplo de saída:

      [
        {
            "status":"DONE",
            "message":"Completed successfully",
            "containerName":"main",
            "instanceId":"0",
            "serviceName":"TUTORIAL_2_JOB_SERVICE",
            "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
            "restartCount":0,
            "startTime":""
        }
      ]
      
      Copy
    2. Para obter as informações do log do serviço de trabalho, use a função do sistema SYSTEM$GET_SERVICE_LOGS:

      SELECT SYSTEM$GET_SERVICE_LOGS('tutorial_2_job_service', 0, 'main')
      
      Copy
      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
      

5: Limpeza

Se você não planeja continuar com o Tutorial 3, remova os recursos faturáveis que criou. Para obter mais informações, consulte a Etapa 5 no Tutorial 3.

6: Revisando o código do serviço de trabalho

Esta seção cobre os seguintes tópicos:

Análise dos arquivos fornecidos

O arquivo zip que você baixou no início do tutorial inclui os seguintes arquivos:

  • main.py

  • Dockerfile

  • my_job_spec.yaml

Esta seção fornece uma visão geral do código.

arquivo main.py

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table scheme.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()
Copy

No código:

  • O código Python é executado em main, que então executa a função run_job():

    if __name__ == "__main__":
      run_job()
    
    Copy
  • A função run_job() lê as variáveis de ambiente e as utiliza para definir valores padrão para vários parâmetros. O contêiner usa esses parâmetros para se conectar ao Snowflake. Note que:

    • Você pode substituir os valores dos parâmetros usados no serviço usando os campos containers.env e containers.args na especificação do serviço. Para obter mais informações, consulte Referência de especificação de serviço.

    • Quando a imagem é executada no Snowflake, o Snowflake preenche alguns desses parâmetros (consulte o código-fonte) automaticamente. No entanto, ao testar a imagem localmente, você precisa fornecer esses parâmetros explicitamente (conforme mostrado na próxima seção, Criação e teste de uma imagem localmente).

Arquivo Docker

Este arquivo contém todos os comandos para criar uma imagem usando Docker.

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
  conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Copy

Arquivo my_job_spec.yaml (Especificação de serviço)

Snowflake usa as informações fornecidas nesta especificação para configurar e executar seu serviço de trabalho.

spec:
  containers:
  - name: main
    image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
    env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
    args:
    - "--query=select current_time() as time,'hello'"
    - "--result_table=results"
Copy

Além dos campos obrigatórios container.name e container.image (consulte Referência de especificação de serviço), a especificação inclui o campo opcional container.args para listar os argumentos:

  • --query fornece a consulta a ser executada quando o serviço é executado.

  • --result_table identifica a tabela para salvar os resultados da consulta.

Criação e teste de uma imagem localmente

Você pode testar a imagem do Docker localmente antes de carregá-la em um repositório em sua conta Snowflake. Nos testes locais, seu contêiner é executado de forma independente (não é um serviço de trabalho executado pelo Snowflake).

Use as etapas a seguir para testar a imagem do Docker do Tutorial 2:

  1. Para criar uma imagem do Docker, no Docker CLI, execute o comando docker build:

    docker build --rm -t my_service:local .
    
    Copy
  2. Para iniciar seu código, execute o comando docker run, fornecendo <nome da organização>-<nome da conta>, <nome de usuário> e <senha>:

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
    
    Copy

    Ao testar a imagem localmente, observe que, além dos três argumentos (uma consulta, o warehouse para executar a consulta e uma tabela para salvar o resultado), você também fornece os parâmetros de conexão para o contêiner em execução localmente para se conectar ao Snowflake.

    Quando você executa o contêiner como um serviço, o Snowflake fornece esses parâmetros ao contêiner como variáveis de ambiente. Para obter mais informações, consulte Configuração de clientes Snowflake.

    O serviço de trabalho executa a consulta (select current_time() as time,'hello') e grava o resultado na tabela (tutorial_db.data_schema.results). Se a tabela não existir, ela será criada. Se a tabela existir, o serviço de trabalho adicionará uma linha.

    Exemplo de resultado da consulta à tabela de resultados:

    +----------+----------+
    | TIME     | TEXT     |
    |----------+----------|
    | 10:56:52 | hello    |
    +----------+----------+
    

Qual é o próximo passo?

Agora você pode testar o Tutorial 3, que mostra como funciona a comunicação entre serviços.