자습서 3: Snowflake Python API를 사용하여 서비스와 작업 만들기

소개

자습서 1자습서 2 에서는 SQL 인터페이스를 사용하여 Snowpark Container Services 및 작업을 생성합니다. 이 자습서에서는 Snowflake Python API 를 사용하여 동일한 서비스와 작업을 생성하고, Snowflake Python APIs 를 사용하여 Snowpark Container Services 리소스를 관리하는 방법을 살펴봅니다.

자습서에서는 Snowflake Notebook 을 사용하여 Python 코드를 실행하지만, 코드는 노트북과 독립적이며 다른 환경에서도 코드를 실행할 수 있습니다.

1: 초기 구성

이 초기 구성에서는 Snowflake Notebook을 만들고, 필요한 라이브러리를 가져오고, 이후 단계의 셀에서 사용할 상수를 정의합니다.

  1. Snowflake Notebook 만들기

    1. 노트북을 만듭니다. 자세한 지침은 새로운 노트북 만들기 섹션을 참조하십시오. UI (웨어하우스에서 실행 또는 컨테이너에서 실행)에서 선택한 Python 환경 은 중요하지 않습니다.

    2. 패키지 드롭다운 메뉴에서 “Snowflake” 패키지를 선택하고 최신 버전의 Snowflake Python APIs 라이브러리를 설치합니다.

    3. (선택 사항) 기본적으로 노트북에 제공된 셀을 삭제합니다. 이 자습서의 단계를 따라 노트북에 Python 셀을 추가합니다.

  2. 이 자습서에서는 셀을 만들고 실행하여 여러 셀에서 사용하는 Python 라이브러리를 가져옵니다.

    from snowflake.snowpark.context import get_active_session
    from snowflake.core import Root
    from snowflake.core import CreateMode
    
    Copy
  3. 셀을 생성하고 실행하여 후속 셀에서 사용할 상수를 정의합니다. 아래 제공된 값은 자습서 1, 2와 일치합니다. 이러한 값은 선택적으로 변경할 수 있습니다.

    current_user = get_active_session().get_current_user()
    user_role_name = "test_role"
    compute_pool_name = "tutorial_compute_pool"
    warehouse_name = "tutorial_warehouse"
    database_name = "tutorial_db"
    schema_name = "data_schema"
    repo_name = "tutorial_repository"
    stage_name = "tutorial_stage"
    service_name = "echo_service"
    print("configured!")
    
    Copy

2: Snowflake 오브젝트 만들기

서비스를 만들려면 먼저 데이터베이스, 사용자, 역할, 컴퓨팅 풀, 이미지 리포지토리와 같은 Snowflake 오브젝트가 필요합니다. 이러한 오브젝트 중 일부는 계정 관리자 권한이 있어야 만들 수 있는 계정 범위 오브젝트입니다. 생성된 오브젝트의 이름은 이전 단계에서 정의됩니다.

2.1: 계정 범위의 Snowflake 오브젝트 만들기

다음 Python 코드는 이러한 오브젝트를 생성합니다.

  • 역할(test_role). 이 역할에 서비스를 만들고 사용하는 데 필요한 모든 권한을 부여합니다. 코드에서 현재 사용자에게 이 역할을 부여하여 사용자가 서비스를 생성하고 사용할 수 있도록 합니다.

  • 데이터베이스(tutorial_db). 다음 단계에서는 이 데이터베이스에 스키마를 만듭니다.

  • 컴퓨팅 풀(tutorial_compute_pool). 서비스 컨테이너는 이 컴퓨팅 풀에서 실행됩니다.

  • 웨어하우스(tutorial_warehouse). 서비스가 Snowflake에 연결하여 쿼리를 실행할 때 이 웨어하우스가 쿼리를 실행하는 데 사용됩니다.

셀을 만들고 실행하여 ACCOUNTADMIN 역할을 사용하여 이러한 계정 범위의 오브젝트를 만듭니다. 스크립트는 리소스가 존재하지 않는 경우에만 리소스를 생성한다는 점에 유의하십시오. 코드의 설명에 해당하는 SQL 문이 표시됩니다.

from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse

session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)

# CREATE ROLE test_role;
root.roles.create(
    Role(name=user_role_name),
    mode=CreateMode.if_not_exists)
print(f"Created role:", user_role_name)

# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
    securable=Securables.role(user_role_name),
    grantee=Grantees.user(name=current_user),
    ))

# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
#   MIN_NODES = 1 MAX_NODES = 1
#   INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
    mode=CreateMode.if_not_exists,
    compute_pool=ComputePool(
        name=compute_pool_name,
        instance_family="CPU_X64_XS",
        min_nodes=1,
        max_nodes=2,
    )
)

# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
    securable=Securables.compute_pool(compute_pool_name),
    grantee=Grantees.role(name=user_role_name)
    ))

print(f"Created compute pool:", compute_pool_name)

# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
    Database(name=database_name),
    mode=CreateMode.if_not_exists)

# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.all_privileges],
    securable=Securables.database(database_name),
    grantee=Grantees.role(name=user_role_name),
    ))

print("Created database:", database_name)

# CREATE OR REPLACE WAREHOUSE tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
    Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
    mode=CreateMode.if_not_exists)

# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.usage],
    grantee=Grantees.role(name=user_role_name),
    securable=Securables.warehouse(warehouse_name)
    ))

print("Created warehouse:", warehouse_name)

# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.bind_service_endpoint],
    securable=Securables.current_account,
    grantee=Grantees.role(name=user_role_name)
    ))

print("Done: GRANT BIND SERVICE ENDPOINT")
Copy

리소스를 만들 때 코드에서 역할에 요구 사항 권한을 부여(test_role)하여 역할이 이러한 리소스를 사용할 수 있도록 합니다. 또한 이 자습서에서 만드는 에코 서비스는 하나의 공용 엔드포인트를 노출한다는 점에 유의하십시오. 이 공개 서비스 엔드포인트를 사용하면 계정의 다른 사용자가 공개 웹에서 서비스에 액세스할 수 있습니다(수신). 공용 엔드포인트가 있는 서비스를 만들려면 역할(test_role)에 계정에 대한 BIND SERVICE ENDPOINT 권한이 있어야 합니다.

2.2 스키마 범위 오브젝트 만들기

이 섹션의 Python 코드는 test_role 역할을 사용하여 스키마와 해당 스키마에 오브젝트를 생성합니다. 이러한 리소스를 만드는 데 관리자 권한이 필요하지 않습니다.

  • 스키마(data_schema). 이 스키마에서 이미지 리포지토리, 서비스 및 작업을 만듭니다.

  • 이미지 리포지토리(tutorial_repository). 이 리포지토리에 애플리케이션 이미지를 저장합니다.

  • 스테이지(tutorial_stage). 이 스테이지는 일러스트레이션 전용으로 제작되었습니다. 이 자습서에서는 설명하지 않지만 스테이지를 사용하여 서비스로 데이터를 전달하거나 서비스에서 데이터를 수집할 수 있습니다.

스크립트는 리소스가 존재하지 않는 경우에만 리소스를 생성한다는 점에 유의하십시오.

from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable

session = get_active_session()
session.use_role(user_role_name)
root = Root(session)

# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
    Schema(name=schema_name),
    mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)

# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo}
repo = schema.image_repositories.create(
    ImageRepository(name=repo_name),
    mode=CreateMode.if_not_exists)
print("Create image repository:", repo.fully_qualified_name)

repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")


#CREATE STAGE IF NOT EXISTS tutorial_stage
#  DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
    Stage(
        name=stage_name,
        directory_table=StageDirectoryTable(enable=True)),
    mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)
Copy

Python 코드는 이미지를 리포지토리에 푸시할 때 사용하는 리포지토리(URL)에 대한 유용한 정보도 출력합니다.

3: 이미지 만들기 및 업로드

자습서 1 에 설명된 대로 로컬로 코드를 다운로드하고 Docker 명령을 사용하여 이미지를 빌드한 다음 계정의 이미지 리포지토리에 업로드합니다.

  1. 셀을 생성하고 실행하여 이미지 레지스트리의 호스트 이름과 이미지 저장소에 대한 URL 주소를 얻습니다.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo =  schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")
    
    Copy

    Python 코드는 이미지 리포지토리 리소스 오브젝트(repo)를 검색하고 모델 오브젝트에 액세스한 다음 URL 리포지토리를 추출합니다.

  2. 자습서 1 1단계와 2단계를 따라 서비스 코드를 다운로드하고 이미지를 빌드한 후 리포지토리에 업로드하십시오.

  3. 셀을 생성하고 실행하여 이미지가 리포지토리에 있는지 확인합니다.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)
    
    Copy

    이 코드는 이미지 저장소 리소스(repo)에서 이미지를 열거하고 각 이미지에 대해 image_path 를 출력합니다.

4: 서비스 만들기

서비스와 통신할 서비스와 서비스 함수를 만듭니다.

  1. 컴퓨팅 풀이 준비되었는지 확인합니다. 컴퓨팅 풀을 생성한 후 Snowflake가 모든 노드를 프로비저닝하는 데 시간이 다소 걸립니다. 서비스 컨테이너는 지정된 컴퓨팅 풀 내에서 실행되므로 서비스를 만들기 전에 컴퓨팅 풀이 준비되었는지 확인하십시오.

    셀을 생성하고 실행하여 컴퓨팅 풀 상태를 확인합니다.

    import time
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    cp = root.compute_pools[compute_pool_name]
    
    cpm = cp.fetch()
    print(cpm.state, cpm.status_message)
    if cpm.state == 'SUSPENDED':
        cp.resume()
    while cpm.state in ['STARTING', 'SUSPENDED']:
        time.sleep(5)
        cpm = cp.fetch()
        print(cpm.state, cpm.status_message)
    
    Copy

    이 코드는 컴퓨팅 풀 리소스(cp)에서 컴퓨팅 풀 모델(cpm)을 가져와 현재 컴퓨팅 풀 상태를 검색합니다. 컴퓨팅 풀이 일시 중단된 경우 코드가 컴퓨팅 풀을 재개합니다. 컴퓨팅 풀이 더 이상 STARTING 또는 SUSPENDED 상태가 아닐 때까지 코드가 매번 5초 동안 일시 중지하면서 반복됩니다.

    출력의 마지막 줄은 컴퓨팅 풀이 서비스를 실행할 준비가 되었음을 나타내는 “IDLE” 또는 “ACTIVE”여야 합니다. 자세한 내용은 컴퓨팅 풀 수명 주기 섹션을 참조하십시오. 컴퓨팅 풀이 준비되지 않으면 서비스를 시작할 수 없습니다.

  2. 셀을 생성하고 실행하여 에코 서비스를 생성합니다.

    from snowflake.core.service import Service, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    specification = f"""
        spec:
          containers:
          - name: echo
            image: {repo_url}/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
    
        """
    echo_service = schema.services.create(Service(
        name=service_name,
        compute_pool=compute_pool_name,
        spec=ServiceSpec(specification),
        min_instances=1,
        max_instances=1),
        mode=CreateMode.if_not_exists)
    print("created service:", echo_service.name)
    
    Copy

    이 코드는 이전 단계에서 수행한 것처럼 리포지토리 URL 을 검색합니다. 그런 다음 이 코드는 인라인 사양과 지정된 이미지 저장소의 이미지를 사용하여 echo_service 를 생성합니다.

    Python 코드에서 볼 수 있듯이 리소스 이름을 매개 변수로 설정하는 것은 쉽습니다. 다음은 서비스를 생성하지만 매개 변수를 사용하지 않는 SQL 명령과 동등한 명령입니다.

    CREATE SERVICE echo_service
      IN COMPUTE POOL tutorial_compute_pool
      FROM SPECIFICATION $$
        spec:
          containers:
          - name: echo
            image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
      $$
      MIN_INSTANCES=1
      MAX_INSTANCES=1;
    
    Copy
  3. 셀을 실행하여 서비스 함수(my_echo_function)를 만듭니다. 서비스 함수는 서비스를 사용하는 방법 중 하나입니다.

    from snowflake.core.function import ServiceFunction, FunctionArgument
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # CREATE FUNCTION my_echo_udf (inputtext VARCHAR)
    #  RETURNS VARCHAR
    #  SERVICE=echo_service
    #  ENDPOINT=echoendpoint
    #  AS '/echo';
    svcfn = schema.functions.create(mode=CreateMode.or_replace,
        function=ServiceFunction(
            name="my_echo_function",
            arguments=[FunctionArgument(name="inputtext", datatype="TEXT")],
            returns="TEXT",
            service=service_name,
            endpoint="echoendpoint",
            path="/echo"))
    print("created service function:", svcfn.name_with_args)
    
    Copy

    이 코드는 schemafunctions 컬렉션에서 create 메서드를 호출하여 서비스 함수(my_echo_function)를 생성합니다.

5: 서비스 사용

이 섹션에서는 다음과 같이 서비스를 사용합니다.

  • 서비스 함수를 호출합니다.

  • 브라우저를 사용하여 서비스의 공용 엔드포인트와 상호 작용합니다.

  1. 서비스 함수를 호출합니다.

    svcfn = schema.functions["my_echo_function(TEXT)"]
    print(svcfn.execute(["hello"]))
    
    Copy

    Snowflake는 서비스 엔드포인트(echoendpoint)에 POST 요청을 보냅니다. 요청을 받으면 서비스는 응답에 입력 문자열을 에코합니다.

    출력:

    +--------------------------+
    | **MY_ECHO_UDF('HELLO!')**|
    |------------------------- |
    | Bob said hello!          |
    +--------------------------+
    
  2. 브라우저에서 서비스가 노출하는 공개 엔드포인트에 액세스합니다.

    1. 공용 엔드포인트의 URL 을 가져옵니다.

      # helper to check if service is ready and return endpoint url
      def get_ingress_for_endpoint(svc, endpoint):
          for _ in range(10): # only try 10 times
              # Find the target endpoint.
              target_endpoint = None
              for ep in svc.get_endpoints():
                  if ep.is_public and ep.name == endpoint:
                      target_endpoint = ep
                      break;
              else:
                  print(f"Endpoint {endpoint} not found")
                  return None
      
              # Return endpoint URL or wait for it to be provisioned.
              if target_endpoint.ingress_url.startswith("Endpoints provisioning "):
                  print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.")
                  time.sleep(10)
              else:
                  return target_endpoint.ingress_url
          print("Timed out waiting for endpoint to become available")
      
      endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint")
      print(f"https://{endpoint_url}/ui")
      
      Copy
    2. 인쇄된 URL 을 브라우저 창에 붙여넣습니다. 그러면 서비스가 ui() 함수를 실행하게 됩니다(echo_service.py 참조).

      엔드포인트 URL에 처음 액세스하면 Snowflake에 로그인하라는 메시지가 표시됩니다. 이 테스트에서는 서비스를 만드는 데 사용한 것과 동일한 사용자를 사용하여 사용자에게 필요한 권한이 있는지 확인하십시오.

      Echo 서비스와 통신하기 위한 웹 양식.
    3. Input 상자에 문자열 “Hello”를 입력하고 Return 을 누릅니다.

      Echo 서비스의 응답을 보여주는 웹 양식.

6: 작업 만들기

자습서 2에서는 SQL 인터페이스를 사용하여 Snowpark Container Services 작업을 생성합니다. 이 섹션에서는 Snowflake Python APIs 를 사용하여 동일한 작업을 생성합니다.

  1. 셀을 생성하고 실행하여 이미지 레지스트리의 호스트 이름과 이미지 저장소에 대한 URL 주소를 얻습니다.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo =  schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")
    
    Copy

    Python 코드는 이미지 리포지토리 리소스 오브젝트(repo)를 검색하고 모델 오브젝트에 액세스한 다음 URL 리포지토리를 추출합니다.

  2. 자습서 2 1단계와 2단계를 따라 서비스 코드를 다운로드하고 이미지를 빌드한 후 리포지토리에 업로드하십시오.

  3. 셀을 생성하고 실행하여 이미지가 리포지토리에 있는지 확인합니다.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)
    
    Copy

    이 코드는 이미지 저장소 리소스(repo)에서 이미지를 열거하고 각 이미지에 대해 image_path 를 출력합니다.

  4. 셀을 만들고 실행하여 작업을 생성합니다.

    from snowflake.core.service import JobService, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    job_name = "test_job"
    
    # cleanup previous job if present.
    schema.services[job_name].drop()(if_exists=True)
    
    specification = f"""
        spec:
          containers:
          - name: main
            image: {repo_url}/my_job_image:latest
            env:
              SNOWFLAKE_WAREHOUSE: {warehouse_name}
            args:
            - "--query=select current_time() as time,'hello'"
            - "--result_table=results"
        """
    job = schema.services.execute_job(JobService(
        name=job_name,
        compute_pool=compute_pool_name,
        spec=ServiceSpec(specification)))
    print("executed job:", job.name, "status:", job.fetch().status)
    
    print("job logs:")
    print(job.get_service_logs(0, "main"))
    
    Copy

    이 작업은 주어진 쿼리를 실행하고 그 결과를 테이블에 저장합니다.

  5. 다음 셀을 실행하여 테이블에 기록된 결과를 검토합니다. 이 코드는 Snowpark Python을 사용하여 해당 테이블을 쿼리합니다.

    session = get_active_session()
    session.use_role(user_role_name)
    # show that above job wrote to results table
    session.sql(f"select * from {database_name}.{schema_name}.results").collect()
    
    Copy

7: 정리

  1. 서비스를 중지하고 삭제합니다. 서비스를 중단하면 기본적으로 Snowflake는 컴퓨팅 풀을 자동으로 일시 중단합니다(실행 중인 다른 서비스 및 작업 서비스가 없다는 가정 하에). 자세한 내용은 컴퓨팅 풀 수명 주기 섹션을 참조하십시오.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # now let's clean up
    
    schema.functions["my_echo_function(TEXT)"].drop()
    schema.services[service_name].drop()
    
    Copy
  2. 저장소 비용을 지불하지 않으려면 이미지 저장소를 삭제하십시오. 저장소에 다른 이미지가 저장되어 있는 경우 해당 이미지는 삭제된다는 점에 유의하십시오.

    schema.image_repositories[repo_name].drop()
    
    Copy
  3. 스키마를 삭제합니다. 스키마를 삭제하면 해당 스키마에 있는 모든 오브젝트도 삭제됩니다. 이 자습서에는 서비스, 함수, 이미지 저장소 및 생성한 스테이지가 포함되어 있습니다.

    root.databases[database_name].schemas[schema_name].drop()
    
    Copy
  4. Snowflake가 컴퓨팅 풀을 일시 중단할 때까지 기다리는 대신 명시적으로 컴퓨팅 풀을 일시 중단할 수도 있습니다. 이 경우 Snowflake는 실행 중인 모든 서비스를 일시 중단하고 실행 중인 모든 작업이 완료될 때까지 기다린 다음 컴퓨팅 풀을 일시 중단합니다.

    root.compute_pool[compute_pool_name].suspend()
    
    Copy

다음에는 무엇을 해야 합니까?

이 자습서에서는 Snowflake Python APIs 를 사용하여 Snowpark Container Services 및 작업을 만들고 관리하는 방법을 설명합니다. Snowflake Python APIs 에 대한 자세한 내용은 Snowflake Python APIs: Python으로 Snowflake 오브젝트 관리하기 섹션을 참조하십시오.