Tutorial 3: Crie um serviço e um trabalho usando APIs do Snowflake Python

Introdução

Em Tutorial 1 e Tutorial 2, você usa a interface SQL para criar um serviço e um trabalho do Snowpark Container Services. Neste tutorial, você usa as Snowflake Python APIs para criar o mesmo serviço e trabalho e, assim, explorar o uso do Snowflake Python APIs para gerenciar os recursos do Snowpark Container Services.

O tutorial usa um notebook Snowflake para executar o código Python, mas o código é independente do notebook e você pode executar o código em outros ambientes.

1: Configuração inicial

Nessa configuração inicial, você cria um notebook do Snowflake, importa as bibliotecas necessárias e define as constantes que são usadas pelas células nas etapas subsequentes.

  1. Crie um notebook Snowflake

    1. Crie um notebook Para obter instruções, consulte Criação de um novo notebook. Observe que o ambiente Python que você escolher no UI (execução no warehouse ou execução no contêiner) não importa.

    2. No menu suspenso Pacotes, escolha o pacote «snowflake» e instale a versão mais recente da biblioteca Snowflake Python APIs.

    3. (Opcional) Exclua as células fornecidas no notebook por padrão. Ao seguir as etapas deste tutorial, você adiciona células Python ao notebook.

  2. Crie e execute a célula para importar as bibliotecas Python usadas por muitas células neste tutorial.

    from snowflake.snowpark.context import get_active_session
    from snowflake.core import Root
    from snowflake.core import CreateMode
    
    Copy
  3. Crie e execute a célula para definir as constantes que você usará nas células subsequentes. Os valores fornecidos abaixo correspondem aos tutoriais 1 e 2. Opcionalmente, você pode alterar esses valores.

    current_user = get_active_session().get_current_user()
    user_role_name = "test_role"
    compute_pool_name = "tutorial_compute_pool"
    warehouse_name = "tutorial_warehouse"
    database_name = "tutorial_db"
    schema_name = "data_schema"
    repo_name = "tutorial_repository"
    stage_name = "tutorial_stage"
    service_name = "echo_service"
    print("configured!")
    
    Copy

2: Criar objetos Snowflake

Antes de criar um serviço, você precisa de objetos do Snowflake, como um banco de dados, um usuário, uma função, um pool de computação e um repositório de imagens. Alguns desses objetos são objetos com escopo de conta que exigem privilégios administrativos para serem criados. Os nomes dos objetos criados são definidos na etapa anterior.

2.1: Criar objetos Snowflake com escopo de conta

O código Python a seguir cria esses objetos:

  • Função (test_role). Você concede a essa função todos os privilégios necessários para criar e usar o serviço. No código, o usuário concede essa função ao usuário atual para permitir que ele crie e use o serviço.

  • Banco de dados (tutorial_db) Na próxima etapa, você cria um esquema nesse banco de dados.

  • Pool de computação (tutorial_compute_pool). Seu contêiner de serviço é executado nesse pool de computação.

  • Warehouse (tutorial_warehouse). Quando o serviço se conecta ao Snowflake e executa consultas, esse warehouse é usado para executar as consultas.

Crie e execute a célula para criar esses objetos com escopo de conta usando a função ACCOUNTADMIN. Observe que o script cria recursos somente se eles não existirem. Os comentários no código mostram as declarações equivalentes em SQL.

from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse

session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)

# CREATE ROLE test_role;
root.roles.create(
    Role(name=user_role_name),
    mode=CreateMode.if_not_exists)
print(f"Created role:", user_role_name)

# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
    securable=Securables.role(user_role_name),
    grantee=Grantees.user(name=current_user),
    ))

# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
#   MIN_NODES = 1 MAX_NODES = 1
#   INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
    mode=CreateMode.if_not_exists,
    compute_pool=ComputePool(
        name=compute_pool_name,
        instance_family="CPU_X64_XS",
        min_nodes=1,
        max_nodes=2,
    )
)

# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
    securable=Securables.compute_pool(compute_pool_name),
    grantee=Grantees.role(name=user_role_name)
    ))

print(f"Created compute pool:", compute_pool_name)

# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
    Database(name=database_name),
    mode=CreateMode.if_not_exists)

# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.all_privileges],
    securable=Securables.database(database_name),
    grantee=Grantees.role(name=user_role_name),
    ))

print("Created database:", database_name)

# CREATE OR REPLACE WAREHOUSE tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
    Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
    mode=CreateMode.if_not_exists)

# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.usage],
    grantee=Grantees.role(name=user_role_name),
    securable=Securables.warehouse(warehouse_name)
    ))

print("Created warehouse:", warehouse_name)

# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.bind_service_endpoint],
    securable=Securables.current_account,
    grantee=Grantees.role(name=user_role_name)
    ))

print("Done: GRANT BIND SERVICE ENDPOINT")
Copy

À medida que você cria recursos, o código também concede os privilégios necessários à função (test_role) para que a função possa usar esses recursos. Além disso, observe que o serviço echo que você criou neste tutorial expõe um endpoint público. Esse ponto de extremidade público permite que outros usuários da sua conta acessem o serviço a partir da Web pública (ingresso). Para criar um serviço com um ponto de extremidade público, a função (test_role) deve ter o privilégio BIND SERVICE ENDPOINT na conta.

2.2 Criar objetos com escopo de esquema

O código Python nesta seção usa a função test_role para criar um esquema e objetos nesse esquema. Você não precisa de privilégios administrativos para criar esses recursos.

  • Esquema (data_schema). Você cria um repositório de imagens, um serviço e um trabalho nesse esquema.

  • Repositório de imagens (tutorial_repository). Você armazena a imagem do aplicativo nesse repositório.

  • Estágio (tutorial_stage). O estágio foi criado apenas para fins ilustrativos. Embora não tenham sido demonstrados neste tutorial, os estágios podem ser usados para passar dados ou coletar dados dos seus serviços.

Observe que o script cria recursos somente se eles não existirem.

from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable

session = get_active_session()
session.use_role(user_role_name)
root = Root(session)

# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
    Schema(name=schema_name),
    mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)

# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo}
repo = schema.image_repositories.create(
    ImageRepository(name=repo_name),
    mode=CreateMode.if_not_exists)
print("Create image repository:", repo.fully_qualified_name)

repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")


#CREATE STAGE IF NOT EXISTS tutorial_stage
#  DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
    Stage(
        name=stage_name,
        directory_table=StageDirectoryTable(enable=True)),
    mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)
Copy

O código Python também imprime informações úteis sobre o repositório (o URL do repositório) que você usa ao enviar suas imagens para o repositório.

3: Criar uma imagem e carregar

Você faz o download do código localmente, conforme descrito em Tutorial 1, usa os comandos do Docker para criar a imagem e faz o upload dela para o repositório de imagens em sua conta.

  1. Crie e execute a célula para obter o nome do host do registro de imagens e o URL do repositório de imagens.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo =  schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")
    
    Copy

    O código Python recupera o Objeto de recurso do repositório de imagens (repo), acessa o objeto do modelo e extrai o URL do repositório dele.

  2. Siga as etapas 1 e 2 do Tutorial 1 para fazer o download do código de serviço, criar uma imagem e carregá-la no repositório.

  3. Crie e execute a célula para verificar se a imagem está no repositório.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)
    
    Copy

    O código enumera as imagens do recurso de repositório de imagens (repo) e imprime o endereço image_path para cada imagem.

4: Criar um serviço

Crie um serviço e uma função de serviço para se comunicar com o serviço.

  1. Verifique se o pool de computação está pronto. Depois que você cria um pool de computação, o Snowflake leva algum tempo para provisionar todos os nós. Certifique-se de que o pool de computação esteja pronto antes de criar um serviço, pois os contêineres de serviço são executados no pool de computação especificado.

    Crie e execute a célula para obter o status do pool de computação:

    import time
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    cp = root.compute_pools[compute_pool_name]
    
    cpm = cp.fetch()
    print(cpm.state, cpm.status_message)
    if cpm.state == 'SUSPENDED':
        cp.resume()
    while cpm.state in ['STARTING', 'SUSPENDED']:
        time.sleep(5)
        cpm = cp.fetch()
        print(cpm.state, cpm.status_message)
    
    Copy

    O código obtém o modelo do pool de computação (cpm) do recurso do pool de computação (cp) para recuperar o estado atual do pool de computação. Se o pool de computação for suspenso, o código retomará o pool de computação. O código faz um loop, fazendo uma pausa de cinco segundos a cada vez, até que o pool de computação não esteja mais no estado STARTING ou SUSPENDED.

    A última linha de saída deve ser «IDLE» ou «ACTIVE», o que indica que o pool de computação está pronto para executar seu serviço. Para obter mais informações, consulte Ciclo de vida do pool de computação. Se o pool de computação não estiver pronto, seus serviços não poderão ser iniciados.

  2. Crie e execute a célula para criar o serviço echo.

    from snowflake.core.service import Service, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    specification = f"""
        spec:
          containers:
          - name: echo
            image: {repo_url}/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
    
        """
    echo_service = schema.services.create(Service(
        name=service_name,
        compute_pool=compute_pool_name,
        spec=ServiceSpec(specification),
        min_instances=1,
        max_instances=1),
        mode=CreateMode.if_not_exists)
    print("created service:", echo_service.name)
    
    Copy

    O código recupera o URL do repositório, conforme feito na etapa anterior. Em seguida, o código cria o echo_service usando uma especificação em linha e a imagem do repositório de imagens especificado.

    Como você pode ver no código Python, é fácil parametrizar os nomes dos recursos. A seguir, o comando SQL equivalente que cria um serviço, mas não usa parâmetros.

    CREATE SERVICE echo_service
      IN COMPUTE POOL tutorial_compute_pool
      FROM SPECIFICATION $$
        spec:
          containers:
          - name: echo
            image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
      $$
      MIN_INSTANCES=1
      MAX_INSTANCES=1;
    
    Copy
  3. Execute a célula para criar uma função de serviço (my_echo_function). Uma função de serviço é uma das maneiras de usar o serviço.

    from snowflake.core.function import ServiceFunction, FunctionArgument
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # CREATE FUNCTION my_echo_udf (inputtext VARCHAR)
    #  RETURNS VARCHAR
    #  SERVICE=echo_service
    #  ENDPOINT=echoendpoint
    #  AS '/echo';
    svcfn = schema.functions.create(mode=CreateMode.or_replace,
        function=ServiceFunction(
            name="my_echo_function",
            arguments=[FunctionArgument(name="inputtext", datatype="TEXT")],
            returns="TEXT",
            service=service_name,
            endpoint="echoendpoint",
            path="/echo"))
    print("created service function:", svcfn.name_with_args)
    
    Copy

    O código chama o método create na coleção functions do schema para criar a função de serviço (my_echo_function).

5: Usar o serviço

Nesta seção, você usa o serviço da seguinte forma:

  • Invocar a função de serviço.

  • Usar um navegador para interagir com o ponto de extremidade público do serviço.

  1. Invocar a função de serviço.

    svcfn = schema.functions["my_echo_function(TEXT)"]
    print(svcfn.execute(["hello"]))
    
    Copy

    Snowflake envia uma solicitação POST ao ponto de extremidade do serviço (echoendpoint). Ao receber a solicitação, o serviço ecoa a cadeia de caracteres de entrada na resposta.

    Saída:

    +--------------------------+
    | **MY_ECHO_UDF('HELLO!')**|
    |------------------------- |
    | Bob said hello!          |
    +--------------------------+
    
  2. Acesse de um navegador o ponto de extremidade público que o serviço expõe.

    1. Obtenha o URL do endereço do ponto de extremidade público.

      # helper to check if service is ready and return endpoint url
      def get_ingress_for_endpoint(svc, endpoint):
          for _ in range(10): # only try 10 times
              # Find the target endpoint.
              target_endpoint = None
              for ep in svc.get_endpoints():
                  if ep.is_public and ep.name == endpoint:
                      target_endpoint = ep
                      break;
              else:
                  print(f"Endpoint {endpoint} not found")
                  return None
      
              # Return endpoint URL or wait for it to be provisioned.
              if target_endpoint.ingress_url.startswith("Endpoints provisioning "):
                  print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.")
                  time.sleep(10)
              else:
                  return target_endpoint.ingress_url
          print("Timed out waiting for endpoint to become available")
      
      endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint")
      print(f"https://{endpoint_url}/ui")
      
      Copy
    2. Cole o URL impresso em uma janela do navegador. Isso faz com que o serviço execute a função ui() (consulte echo_service.py).

      Observe que na primeira vez que você acessar o URL do ponto de extremidade, será solicitado a fazer login no Snowflake. Para este teste, use o mesmo usuário usado para criar o serviço para garantir que o usuário tenha os privilégios necessários.

      Formulário da web para comunicação com o serviço Echo.
    3. Insira a cadeia de caracteres “Olá” na caixa Entrada e pressione Retornar.

      Formulário da Web mostrando a resposta do serviço Echo.

6: Criar um trabalho

No Tutorial 2, você usa a interface SQL para criar um trabalho do Snowpark Container Services. Nesta seção, você cria o mesmo trabalho usando o Snowflake Python APIs.

  1. Crie e execute a célula para obter o nome do host do registro de imagens e o URL do repositório de imagens.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo =  schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")
    
    Copy

    O código Python recupera o objeto de recurso do repositório de imagens (repo), acessa o objeto modelo e extrai dele o URL do repositório.

  2. Siga as etapas 2 e 2 do Tutorial 2 para fazer o download do código de serviço, criar uma imagem e carregá-la no repositório.

  3. Crie e execute a célula para verificar se a imagem está no repositório.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)
    
    Copy

    O código enumera as imagens do recurso de repositório de imagens (repo) e imprime o endereço image_path para cada imagem.

  4. Crie e execute a célula para criar o trabalho.

    from snowflake.core.service import JobService, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    job_name = "test_job"
    
    # cleanup previous job if present.
    schema.services[job_name].drop()(if_exists=True)
    
    specification = f"""
        spec:
          containers:
          - name: main
            image: {repo_url}/my_job_image:latest
            env:
              SNOWFLAKE_WAREHOUSE: {warehouse_name}
            args:
            - "--query=select current_time() as time,'hello'"
            - "--result_table=results"
        """
    job = schema.services.execute_job(JobService(
        name=job_name,
        compute_pool=compute_pool_name,
        spec=ServiceSpec(specification)))
    print("executed job:", job.name, "status:", job.fetch().status)
    
    print("job logs:")
    print(job.get_service_logs(0, "main"))
    
    Copy

    O trabalho executa a consulta fornecida e armazena os resultados em uma tabela.

  5. Execute a célula a seguir para revisar o resultado gravado na tabela. Esse código usa o Snowpark Python para consultar essa tabela.

    session = get_active_session()
    session.use_role(user_role_name)
    # show that above job wrote to results table
    session.sql(f"select * from {database_name}.{schema_name}.results").collect()
    
    Copy

7: Limpeza

  1. Interrompa o serviço e descarte-o. Depois de descartar o serviço, o Snowflake, por padrão, suspende automaticamente o pool de computação (supondo que não haja outros serviços e serviços de trabalho em execução). Para obter mais informações, consulte Ciclo de vida do pool de computação.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # now let's clean up
    
    schema.functions["my_echo_function(TEXT)"].drop()
    schema.services[service_name].drop()
    
    Copy
  2. Descarte o repositório de imagens para evitar pagar pelo armazenamento. Observe que, se você tiver outras imagens armazenadas no repositório, elas serão excluídas.

    schema.image_repositories[repo_name].drop()
    
    Copy
  3. Descartar o esquema. O descarte de um esquema também elimina todos os objetos desse esquema. Para este tutorial, isso inclui o serviço, a função, o repositório de imagens e o estágio que você criou.

    root.databases[database_name].schemas[schema_name].drop()
    
    Copy
  4. Em vez de esperar que o Snowflake suspenda seu pool de computação, você também pode suspender explicitamente o pool de computação. Nesse caso, o Snowflake suspende todos os serviços em execução, aguarda a conclusão de todos os trabalhos em execução e, em seguida, suspende o pool de computação.

    root.compute_pool[compute_pool_name].suspend()
    
    Copy

Qual é o próximo passo?

Este tutorial demonstra o uso do Snowflake Python APIs para criar e gerenciar serviços e trabalhos do Snowpark Container Services. Para obter mais informações sobre o Snowflake Python APIs, consulte Snowflake Python APIs: Gerenciamento de objetos Snowflake com Python.