Tutorial 3: Crie um serviço e um trabalho usando APIs do Snowflake Python¶
Introdução¶
Em Tutorial 1 e Tutorial 2, você usa a interface SQL para criar um serviço e um trabalho do Snowpark Container Services. Neste tutorial, você usa as Snowflake Python APIs para criar o mesmo serviço e trabalho e, assim, explorar o uso do Snowflake Python APIs para gerenciar os recursos do Snowpark Container Services.
O tutorial usa um notebook Snowflake para executar o código Python, mas o código é independente do notebook e você pode executar o código em outros ambientes.
1: Configuração inicial¶
Nessa configuração inicial, você cria um notebook do Snowflake, importa as bibliotecas necessárias e define as constantes que são usadas pelas células nas etapas subsequentes.
Crie um notebook Snowflake
Crie um notebook Para obter instruções, consulte Criação de um novo notebook. Observe que o ambiente Python que você escolher no UI (execução no warehouse ou execução no contêiner) não importa.
No menu suspenso Pacotes, escolha o pacote «snowflake» e instale a versão mais recente da biblioteca Snowflake Python APIs.
(Opcional) Exclua as células fornecidas no notebook por padrão. Ao seguir as etapas deste tutorial, você adiciona células Python ao notebook.
Crie e execute a célula para importar as bibliotecas Python usadas por muitas células neste tutorial.
from snowflake.snowpark.context import get_active_session from snowflake.core import Root from snowflake.core import CreateMode
Crie e execute a célula para definir as constantes que você usará nas células subsequentes. Os valores fornecidos abaixo correspondem aos tutoriais 1 e 2. Opcionalmente, você pode alterar esses valores.
current_user = get_active_session().get_current_user() user_role_name = "test_role" compute_pool_name = "tutorial_compute_pool" warehouse_name = "tutorial_warehouse" database_name = "tutorial_db" schema_name = "data_schema" repo_name = "tutorial_repository" stage_name = "tutorial_stage" service_name = "echo_service" print("configured!")
2: Criar objetos Snowflake¶
Antes de criar um serviço, você precisa de objetos do Snowflake, como um banco de dados, um usuário, uma função, um pool de computação e um repositório de imagens. Alguns desses objetos são objetos com escopo de conta que exigem privilégios administrativos para serem criados. Os nomes dos objetos criados são definidos na etapa anterior.
2.1: Criar objetos Snowflake com escopo de conta¶
O código Python a seguir cria esses objetos:
Função (
test_role
). Você concede a essa função todos os privilégios necessários para criar e usar o serviço. No código, o usuário concede essa função ao usuário atual para permitir que ele crie e use o serviço.Banco de dados (
tutorial_db
) Na próxima etapa, você cria um esquema nesse banco de dados.Pool de computação (
tutorial_compute_pool
). Seu contêiner de serviço é executado nesse pool de computação.Warehouse (
tutorial_warehouse
). Quando o serviço se conecta ao Snowflake e executa consultas, esse warehouse é usado para executar as consultas.
Crie e execute a célula para criar esses objetos com escopo de conta usando a função ACCOUNTADMIN. Observe que o script cria recursos somente se eles não existirem. Os comentários no código mostram as declarações equivalentes em SQL.
from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse
session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)
# CREATE ROLE test_role;
root.roles.create(
Role(name=user_role_name),
mode=CreateMode.if_not_exists)
print(f"Created role:", user_role_name)
# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
securable=Securables.role(user_role_name),
grantee=Grantees.user(name=current_user),
))
# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
# MIN_NODES = 1 MAX_NODES = 1
# INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
mode=CreateMode.if_not_exists,
compute_pool=ComputePool(
name=compute_pool_name,
instance_family="CPU_X64_XS",
min_nodes=1,
max_nodes=2,
)
)
# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
securable=Securables.compute_pool(compute_pool_name),
grantee=Grantees.role(name=user_role_name)
))
print(f"Created compute pool:", compute_pool_name)
# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
Database(name=database_name),
mode=CreateMode.if_not_exists)
# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
privileges=[Privileges.all_privileges],
securable=Securables.database(database_name),
grantee=Grantees.role(name=user_role_name),
))
print("Created database:", database_name)
# CREATE OR REPLACE WAREHOUSE tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
mode=CreateMode.if_not_exists)
# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
privileges=[Privileges.usage],
grantee=Grantees.role(name=user_role_name),
securable=Securables.warehouse(warehouse_name)
))
print("Created warehouse:", warehouse_name)
# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
privileges=[Privileges.bind_service_endpoint],
securable=Securables.current_account,
grantee=Grantees.role(name=user_role_name)
))
print("Done: GRANT BIND SERVICE ENDPOINT")
À medida que você cria recursos, o código também concede os privilégios necessários à função (test_role
) para que a função possa usar esses recursos. Além disso, observe que o serviço echo que você criou neste tutorial expõe um endpoint público. Esse ponto de extremidade público permite que outros usuários da sua conta acessem o serviço a partir da Web pública (ingresso). Para criar um serviço com um ponto de extremidade público, a função (test_role
) deve ter o privilégio BIND SERVICE ENDPOINT
na conta.
2.2 Criar objetos com escopo de esquema¶
O código Python nesta seção usa a função test_role
para criar um esquema e objetos nesse esquema. Você não precisa de privilégios administrativos para criar esses recursos.
Esquema (
data_schema
). Você cria um repositório de imagens, um serviço e um trabalho nesse esquema.Repositório de imagens (
tutorial_repository
). Você armazena a imagem do aplicativo nesse repositório.Estágio (
tutorial_stage
). O estágio foi criado apenas para fins ilustrativos. Embora não tenham sido demonstrados neste tutorial, os estágios podem ser usados para passar dados ou coletar dados dos seus serviços.
Observe que o script cria recursos somente se eles não existirem.
from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable
session = get_active_session()
session.use_role(user_role_name)
root = Root(session)
# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
Schema(name=schema_name),
mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)
# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo}
repo = schema.image_repositories.create(
ImageRepository(name=repo_name),
mode=CreateMode.if_not_exists)
print("Create image repository:", repo.fully_qualified_name)
repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")
#CREATE STAGE IF NOT EXISTS tutorial_stage
# DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
Stage(
name=stage_name,
directory_table=StageDirectoryTable(enable=True)),
mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)
O código Python também imprime informações úteis sobre o repositório (o URL do repositório) que você usa ao enviar suas imagens para o repositório.
3: Criar uma imagem e carregar¶
Você faz o download do código localmente, conforme descrito em Tutorial 1, usa os comandos do Docker para criar a imagem e faz o upload dela para o repositório de imagens em sua conta.
Crie e execute a célula para obter o nome do host do registro de imagens e o URL do repositório de imagens.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url print("image registry hostname:", repo_url.split("/")[0]) print("image repository url:", repo_url + "/")
O código Python recupera o Objeto de recurso do repositório de imagens (
repo
), acessa o objeto do modelo e extrai o URL do repositório dele.Siga as etapas 1 e 2 do Tutorial 1 para fazer o download do código de serviço, criar uma imagem e carregá-la no repositório.
Crie e execute a célula para verificar se a imagem está no repositório.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] for image in repo.list_images_in_repository(): print(image.image_path)
O código enumera as imagens do recurso de repositório de imagens (
repo
) e imprime o endereçoimage_path
para cada imagem.
4: Criar um serviço¶
Crie um serviço e uma função de serviço para se comunicar com o serviço.
Verifique se o pool de computação está pronto. Depois que você cria um pool de computação, o Snowflake leva algum tempo para provisionar todos os nós. Certifique-se de que o pool de computação esteja pronto antes de criar um serviço, pois os contêineres de serviço são executados no pool de computação especificado.
Crie e execute a célula para obter o status do pool de computação:
import time session = get_active_session() session.use_role(user_role_name) root = Root(session) cp = root.compute_pools[compute_pool_name] cpm = cp.fetch() print(cpm.state, cpm.status_message) if cpm.state == 'SUSPENDED': cp.resume() while cpm.state in ['STARTING', 'SUSPENDED']: time.sleep(5) cpm = cp.fetch() print(cpm.state, cpm.status_message)
O código obtém o modelo do pool de computação (
cpm
) do recurso do pool de computação (cp
) para recuperar o estado atual do pool de computação. Se o pool de computação for suspenso, o código retomará o pool de computação. O código faz um loop, fazendo uma pausa de cinco segundos a cada vez, até que o pool de computação não esteja mais no estado STARTING ou SUSPENDED.A última linha de saída deve ser «IDLE» ou «ACTIVE», o que indica que o pool de computação está pronto para executar seu serviço. Para obter mais informações, consulte Ciclo de vida do pool de computação. Se o pool de computação não estiver pronto, seus serviços não poderão ser iniciados.
Crie e execute a célula para criar o serviço echo.
from snowflake.core.service import Service, ServiceSpec session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url specification = f""" spec: containers: - name: echo image: {repo_url}/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true """ echo_service = schema.services.create(Service( name=service_name, compute_pool=compute_pool_name, spec=ServiceSpec(specification), min_instances=1, max_instances=1), mode=CreateMode.if_not_exists) print("created service:", echo_service.name)
O código recupera o URL do repositório, conforme feito na etapa anterior. Em seguida, o código cria o
echo_service
usando uma especificação em linha e a imagem do repositório de imagens especificado.Como você pode ver no código Python, é fácil parametrizar os nomes dos recursos. A seguir, o comando SQL equivalente que cria um serviço, mas não usa parâmetros.
CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
Execute a célula para criar uma função de serviço (
my_echo_function
). Uma função de serviço é uma das maneiras de usar o serviço.from snowflake.core.function import ServiceFunction, FunctionArgument session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] # CREATE FUNCTION my_echo_udf (inputtext VARCHAR) # RETURNS VARCHAR # SERVICE=echo_service # ENDPOINT=echoendpoint # AS '/echo'; svcfn = schema.functions.create(mode=CreateMode.or_replace, function=ServiceFunction( name="my_echo_function", arguments=[FunctionArgument(name="inputtext", datatype="TEXT")], returns="TEXT", service=service_name, endpoint="echoendpoint", path="/echo")) print("created service function:", svcfn.name_with_args)
O código chama o método
create
na coleçãofunctions
doschema
para criar a função de serviço (my_echo_function
).
5: Usar o serviço¶
Nesta seção, você usa o serviço da seguinte forma:
Invocar a função de serviço.
Usar um navegador para interagir com o ponto de extremidade público do serviço.
Invocar a função de serviço.
svcfn = schema.functions["my_echo_function(TEXT)"] print(svcfn.execute(["hello"]))
Snowflake envia uma solicitação POST ao ponto de extremidade do serviço (
echoendpoint
). Ao receber a solicitação, o serviço ecoa a cadeia de caracteres de entrada na resposta.Saída:
+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
Acesse de um navegador o ponto de extremidade público que o serviço expõe.
Obtenha o URL do endereço do ponto de extremidade público.
# helper to check if service is ready and return endpoint url def get_ingress_for_endpoint(svc, endpoint): for _ in range(10): # only try 10 times # Find the target endpoint. target_endpoint = None for ep in svc.get_endpoints(): if ep.is_public and ep.name == endpoint: target_endpoint = ep break; else: print(f"Endpoint {endpoint} not found") return None # Return endpoint URL or wait for it to be provisioned. if target_endpoint.ingress_url.startswith("Endpoints provisioning "): print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.") time.sleep(10) else: return target_endpoint.ingress_url print("Timed out waiting for endpoint to become available") endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint") print(f"https://{endpoint_url}/ui")
Cole o URL impresso em uma janela do navegador. Isso faz com que o serviço execute a função
ui()
(consulteecho_service.py
).Observe que na primeira vez que você acessar o URL do ponto de extremidade, será solicitado a fazer login no Snowflake. Para este teste, use o mesmo usuário usado para criar o serviço para garantir que o usuário tenha os privilégios necessários.
Insira a cadeia de caracteres “Olá” na caixa Entrada e pressione Retornar.
6: Criar um trabalho¶
No Tutorial 2, você usa a interface SQL para criar um trabalho do Snowpark Container Services. Nesta seção, você cria o mesmo trabalho usando o Snowflake Python APIs.
Crie e execute a célula para obter o nome do host do registro de imagens e o URL do repositório de imagens.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url print("image registry hostname:", repo_url.split("/")[0]) print("image repository url:", repo_url + "/")
O código Python recupera o objeto de recurso do repositório de imagens (
repo
), acessa o objeto modelo e extrai dele o URL do repositório.Siga as etapas 2 e 2 do Tutorial 2 para fazer o download do código de serviço, criar uma imagem e carregá-la no repositório.
Crie e execute a célula para verificar se a imagem está no repositório.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] for image in repo.list_images_in_repository(): print(image.image_path)
O código enumera as imagens do recurso de repositório de imagens (
repo
) e imprime o endereçoimage_path
para cada imagem.Crie e execute a célula para criar o trabalho.
from snowflake.core.service import JobService, ServiceSpec session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url job_name = "test_job" # cleanup previous job if present. schema.services[job_name].drop()(if_exists=True) specification = f""" spec: containers: - name: main image: {repo_url}/my_job_image:latest env: SNOWFLAKE_WAREHOUSE: {warehouse_name} args: - "--query=select current_time() as time,'hello'" - "--result_table=results" """ job = schema.services.execute_job(JobService( name=job_name, compute_pool=compute_pool_name, spec=ServiceSpec(specification))) print("executed job:", job.name, "status:", job.fetch().status) print("job logs:") print(job.get_service_logs(0, "main"))
O trabalho executa a consulta fornecida e armazena os resultados em uma tabela.
Execute a célula a seguir para revisar o resultado gravado na tabela. Esse código usa o Snowpark Python para consultar essa tabela.
session = get_active_session() session.use_role(user_role_name) # show that above job wrote to results table session.sql(f"select * from {database_name}.{schema_name}.results").collect()
7: Limpeza¶
Interrompa o serviço e descarte-o. Depois de descartar o serviço, o Snowflake, por padrão, suspende automaticamente o pool de computação (supondo que não haja outros serviços e serviços de trabalho em execução). Para obter mais informações, consulte Ciclo de vida do pool de computação.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] # now let's clean up schema.functions["my_echo_function(TEXT)"].drop() schema.services[service_name].drop()
Descarte o repositório de imagens para evitar pagar pelo armazenamento. Observe que, se você tiver outras imagens armazenadas no repositório, elas serão excluídas.
schema.image_repositories[repo_name].drop()
Descartar o esquema. O descarte de um esquema também elimina todos os objetos desse esquema. Para este tutorial, isso inclui o serviço, a função, o repositório de imagens e o estágio que você criou.
root.databases[database_name].schemas[schema_name].drop()
Em vez de esperar que o Snowflake suspenda seu pool de computação, você também pode suspender explicitamente o pool de computação. Nesse caso, o Snowflake suspende todos os serviços em execução, aguarda a conclusão de todos os trabalhos em execução e, em seguida, suspende o pool de computação.
root.compute_pool[compute_pool_name].suspend()
Qual é o próximo passo?¶
Este tutorial demonstra o uso do Snowflake Python APIs para criar e gerenciar serviços e trabalhos do Snowpark Container Services. Para obter mais informações sobre o Snowflake Python APIs, consulte Snowflake Python APIs: Gerenciamento de objetos Snowflake com Python.