자습서 2: Snowpark Container Services 작업 만들기

중요

Snowpark Container Services 작업 기능은 현재 비공개 미리 보기로 제공되며 https://snowflake.com/legal 의 미리 보기 조건이 적용됩니다. 자세한 내용은 Snowflake 담당자에게 문의하십시오.

소개

자습서 공통 설정 을 완료하면 작업을 생성할 준비가 됩니다. 이 자습서에서는 Snowflake에 연결하고 SQL SELECT 쿼리를 실행하고 결과를 테이블에 저장하는 간단한 작업을 생성합니다.

이 자습서는 다음 두 부분으로 구성됩니다.

파트 1: 작업을 만들고 테스트합니다. 이 자습서에 제공된 코드를 다운로드하고 단계별 지침을 따릅니다.

  1. 이 자습서를 위한 작업 코드를 다운로드합니다.

  2. Snowpark Container Services용 Docker 이미지를 만들어 자기 계정의 리포지토리에 업로드합니다.

  3. Snowflake에 컨테이너 구성 정보를 제공하는 작업 사양 파일을 스테이징합니다. 컨테이너를 시작하는 데 사용할 이미지 이름 외에도 사양 파일은 다음을 제공합니다.

    • 세 가지 인자: SELECT 쿼리, 쿼리를 실행할 가상 웨어하우스, 결과를 저장할 테이블 이름.

    • SELECT 문을 실행할 웨어하우스.

  4. 작업을 실행합니다. EXECUTE SERVICE 명령을 사용하면 Snowflake가 컨테이너를 실행할 수 있는 사양 파일과 컴퓨팅 풀을 제공하여 작업을 실행할 수 있습니다. 마지막으로 작업 결과를 확인합니다.

파트 2: 작업 코드를 이해합니다. 이 섹션에서는 작업 코드의 개요를 제공하고 다양한 구성 요소가 어떻게 협력하는지 강조합니다.

1: 서비스 코드 다운로드

작업을 생성하기 위한 코드(Python 애플리케이션)가 제공됩니다.

  1. zip 파일 을 디렉토리에 다운로드합니다.

  2. 이 파일의 압축을 풀면 각 자습서마다 하나의 디렉터리가 포함된 것을 알 수 있습니다. Tutorial-2 디렉터리에는 다음 파일이 있습니다.

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2: 이미지 만들기 및 업로드

Snowpark Container Services가 지원하는 linux/amd64 플랫폼의 이미지를 만든 다음, 해당 이미지를 계정의 이미지 리포지토리에 업로드합니다(공통 설정 참조).

리포지토리(리포지토리 URL 및 레지스트리 호스트 이름)에 대한 정보가 있어야 이미지를 만들어 업로드할 수 있습니다. 자세한 내용은 레지스트리 및 리포지토리 를 참조하십시오.

리포지토리에 대한 정보 얻기

  1. 리포지토리 URL을 가져오려면 SHOW IMAGE REPOSITORIES SQL 명령을 실행하십시오.

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • 출력의 repository_url 열은 URL을 제공합니다. 아래에 예가 나와 있습니다.

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • 리포지토리 URL의 호스트 이름은 레지스트리 호스트 이름입니다. 아래에 예가 나와 있습니다.

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

이미지를 만들어 리포지토리에 업로드하기

  1. 터미널 창을 열고 압축을 푼 파일이 포함된 디렉터리로 변경합니다.

  2. Docker 이미지를 만들려면 Docker CLI를 사용하여 다음 docker build 명령을 실행하십시오. 이 명령은 현재 작업 디렉터리(.)를 이미지 만들기에 사용할 파일의 PATH 로 지정합니다.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • image_name 의 경우 my_job_image:latest 를 사용합니다.

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
    
    Copy
  3. Snowflake 계정의 리포지토리에 이미지를 업로드합니다. Docker가 사용자를 대신하여 리포지토리에 이미지를 업로드하려면 먼저 Snowflake로 Docker를 인증해야 합니다.

    1. Snowflake 레지스트리로 Docker를 인증하려면 다음 명령을 실행하십시오.

      docker login <registry_hostname> -u <username>
      
      Copy
      • username 의 경우 Snowflake 사용자 이름을 지정합니다. Docker가 비밀번호를 묻는 메시지를 표시합니다.

    2. 이미지를 업로드하려면 다음 명령을 실행하십시오.

      docker push <repository_url>/<image_name>
      
      Copy

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
      
      Copy

3: 사양 파일 스테이징

  • 작업 사양 파일(my_job_spec.yaml)을 스테이지에 업로드하려면 다음 옵션 중 하나를 사용하십시오.

    • Snowsight 웹 인터페이스: 지침은 로컬 파일을 위한 내부 스테이지 선택하기 섹션을 참조하십시오.

    • SnowSQL CLI: 다음 PUT 명령을 실행합니다.

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      예:

      • Linux 또는 macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy

      상대 경로를 지정할 수도 있습니다.

      PUT file://./my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      이 명령은 필요한 경우(예: 사양 파일에서 오류를 수정한 경우) 파일을 다시 업로드할 수 있도록 OVERWRITE=TRUE를 설정합니다. PUT 명령이 성공적으로 실행되면 업로드된 파일에 대한 정보가 인쇄됩니다.

4: 작업 실행

이제 작업을 생성할 준비가 되었습니다.

  1. 작업을 시작하려면 EXECUTE SERVICE 명령을 실행하십시오.

    EXECUTE SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';
    
    Copy

    다음 사항을 참고하십시오.

    • FROM 및 SPEC은 스테이지 이름과 작업 사양 파일의 이름을 제공합니다. 작업이 실행되면 SQL 문을 실행하고 결과를 my_job_spec.yaml 에 지정된 테이블에 저장합니다.

      작업의 SQL 문은 Docker 컨테이너 내에서 실행되지 않습니다. 대신 실행 중인 컨테이너가 Snowflake에 연결되어 Snowflake 웨어하우스에서 SQL 문을 실행합니다.

    • COMPUTE_POOL은 Snowflake가 작업을 실행하는 컴퓨팅 리소스를 제공합니다.

    • EXECUTE SERVICE는 다음 샘플 출력에 표시된 대로 Snowflake에서 할당한 작업 UUID를 포함하는 출력을 반환합니다.

      +------------------------------------------------------------------------------------+
      |                      status                                                        |
      -------------------------------------------------------------------------------------+
      | Job 01af7ee6-0001-cb52-0020-c5870077223a completed successfully with status: DONE. |
      +------------------------------------------------------------------------------------+
      
  2. 실행한 쿼리의 ID를 가져옵니다(EXECUTE SERVICE는 쿼리임).

    SET jobid = LAST_QUERY_ID();
    
    Copy

    다음 단계에서 ID를 사용하여 작업 상태 및 작업 로그 정보를 검색합니다.

    참고

    이 명령으로 반환되는 작업 ID가 EXECUTE SERVICE 명령에 대한 것인지 확인하기 위해 EXECUTE SERVICE를 호출한 직후에 LAST_QUERY_ID를 호출하는 것이 중요합니다.

    LAST_QUERY_ID는 작업이 완료된 후에만 작업의 쿼리 ID를 반환합니다. 진행 중인 장기 실행 작업의 경우 실시간 상태 정보를 얻는 데 적합하지 않으며 대신 QUERY HISTORY 테이블 함수 패밀리를 사용하여 작업의 쿼리 ID를 가져와야 합니다. 자세한 내용은 작업 사용하기 를 참조하십시오.

  3. 작업은 간단한 쿼리를 실행하고 결과를 결과 테이블에 저장합니다. 결과 테이블을 쿼리하여 작업이 성공적으로 완료되었는지 확인할 수 있습니다.

    SELECT * FROM results;
    
    Copy

    샘플 출력:

    +----------+-----------+
    | TIME     | TEXT      |
    |----------+-----------|
    | 10:56:52 | hello     |
    +----------+-----------+
    
  4. 작업 실행을 디버그하려면 시스템 함수를 사용하십시오. 예를 들어, SYSTEM$GET_JOB_STATUS를 사용하여 작업이 아직 실행 중인지, 시작하지 못했는지, 또는 시작했다면 실패한 이유가 무엇인지 확인합니다. 또한 코드가 표준 출력 또는 표준 오류로 유용한 로그를 출력한다고 가정하면 SYSTEM$GET_JOB_LOGS를 사용하여 로그에 액세스할 수 있습니다.

    1. 작업 상태를 확인하려면 시스템 함수 SYSTEM$GET_JOB_STATUS를 호출하십시오.

      SELECT SYSTEM$GET_JOB_STATUS($jobid);
      
      Copy

      샘플 출력:

      [
        {
            "status":"DONE",
            "message":"Completed successfully",
            "containerName":"main",
            "instanceId":"0",
            "serviceName":"01af7ee6-0001-cb52-0020-c5870077223a",
            "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
            "restartCount":0,
            "startTime":""
        }
      ]
      
      Copy

      출력에서 작업에 이름이 없으므로 serviceName 은 Snowflake에서 할당한 작업 UUID(쿼리 ID)입니다.

    2. 작업 로그 정보를 얻으려면 시스템 함수 SYSTEM$GET_JOB_LOGS를 사용하십시오.

      SELECT SYSTEM$GET_JOB_LOGS($jobid, 'main')
      
      Copy
      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
      

5: 정리

자습서 3 을 계속 진행할 계획이 없다면 생성한 청구 가능한 리소스를 제거해야 합니다. 자세한 내용은 자습서 3 의 5단계를 참조하십시오.

6: 작업 코드 검토

이 섹션에서는 다음 주제를 다룹니다.

제공된 파일 검사하기

자습서 시작 부분에서 다운로드한 zip 파일에는 다음 파일이 포함되어 있습니다.

  • main.py

  • Dockerfile

  • my_job_spec.yaml

이 섹션에서는 코드가 작업을 구현하는 방법에 대한 개요를 제공합니다.

main.py 파일

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table scheme.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()
Copy

코드에서는 다음 사항이 적용됩니다.

  • Python 코드는 main 에서 실행된 후 run_job() 함수를 실행합니다.

    if __name__ == "__main__":
      run_job()
    
    Copy
  • run_job() 함수는 환경 변수를 읽고 이를 사용하여 다양한 매개 변수의 기본값을 설정합니다. 컨테이너는 이러한 매개 변수를 사용하여 Snowflake에 연결합니다. 참고:

    • 이러한 기본 매개 변수 값을 재정의할 수 있습니다. 자세한 내용은 서비스 사양 참조 섹션을 참조하십시오.

    • 이미지가 Snowflake에서 실행되면 Snowflake는 이러한 매개 변수 중 일부(소스 코드 참조)를 자동으로 채웁니다. 그러나 이미지를 로컬에서 테스트할 때는 이러한 매개 변수를 명시적으로 제공해야 합니다(다음 섹션의 로컬에서 이미지 만들기 및 테스트하기 참조).

Dockerfile

이 파일에는 Docker를 사용하여 이미지를 만드는 모든 명령이 포함되어 있습니다.

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
  conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Copy

my_job_spec.yaml 파일(작업 사양)

Snowflake는 사용자가 이 사양에 제공하는 정보를 사용하여 작업을 구성하고 실행합니다.

spec:
container:
- name: main
   image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
   env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
   args:
   - "--query=select current_time() as time,'hello'"
   - "--result_table=results"
Copy

container.namecontainer.image 필수 필드(서비스 사양 참조 참조) 외에도, 사양에는 인자를 나열하기 위한 선택적 container.args 필드가 포함됩니다.

  • --query 는 작업이 실행될 때 실행할 쿼리를 제공합니다.

  • --result_table 은 쿼리 결과를 저장할 테이블을 식별합니다.

로컬에서 이미지 만들기 및 테스트하기

Docker 이미지를 Snowflake 계정의 리포지토리에 업로드하기 전에 로컬에서 테스트할 수 있습니다. 로컬 테스트에서는 컨테이너가 독립형으로 실행됩니다(Snowflake가 실행하는 작업이 아님).

자습서 2 Docker 이미지를 테스트하려면 다음 단계를 따르십시오.

  1. Docker 이미지를 생성하려면 Docker CLI에서 docker build 명령을 실행하십시오.

    docker build --rm -t my_service:local .
    
    Copy
  2. 코드를 실행하려면 <orgname>-<acctname>, <username>, <password> 를 제공하여 docker run 명령을 실행하십시오.

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
    
    Copy

    이미지를 로컬에서 테스트할 때 세 가지 인자(쿼리, 쿼리를 실행할 웨어하우스, 결과를 저장할 테이블) 외에도 로컬에서 실행 중인 컨테이너가 Snowflake에 연결할 연결 매개 변수도 제공합니다.

    컨테이너를 작업으로 실행하면 Snowflake는 이러한 매개 변수를 컨테이너에 환경 변수로 제공합니다. 자세한 내용은 Snowflake 클라이언트 구성하기 를 참조하십시오.

    이 작업은 쿼리를 실행하고(select current_time() as time,'hello') 결과를 테이블에 씁니다(tutorial_db.data_schema.results). 테이블이 없을 경우에는 생성됩니다. 테이블이 있을 경우에는 작업이 행을 추가합니다.

    결과 테이블 쿼리의 샘플 결과:

    +----------+----------+
    | TIME     | TEXT     |
    |----------+----------|
    | 10:56:52 | hello    |
    +----------+----------+
    

다음에는 무엇을 해야 합니까?

이제 서비스 간 통신이 어떻게 작동하는지 보여주는 자습서 3 을 테스트할 수 있습니다.