Tutorial 3: Einen Dienst und einen Auftrag mithilfe der Snowflake Python-APIs erstellen¶
Einführung¶
In Tutorial 1 und Tutorial 2 verwenden Sie die SQL-Schnittstelle, um einen Snowpark Container Services-Dienst und -Job zu erstellen. In diesem Tutorial verwenden Sie die Snowflake Python APIs, um denselben Dienst und Job zu erstellen und so die Verwendung von Snowflake Python APIs zur Verwaltung von Snowpark Container Services-Ressourcen zu erkunden.
Das Tutorial verwendet ein Snowflake-Notebook zur Ausführung des Python-Codes. Der Code ist jedoch unabhängig vom Notebook und kann auch in anderen Umgebungen ausgeführt werden.
1: Erstkonfiguration¶
In dieser Erstkonfiguration erstellen Sie ein Snowflake-Notebook, importieren die benötigten Bibliotheken und definieren Konstanten, die von den Zellen in den nachfolgenden Schritten verwendet werden.
Snowflake Notebook erstellen
Notebook erstellen Eine Anweisung dazu finden Sie unter Neues Notebook erstellen. Beachten Sie, dass die Python-Umgebung, die Sie im UI wählen (Im Warehouse ausführen oder im Container ausführen), keine Rolle spielt.
Wählen Sie aus dem Dropdown-Menü Pakete das Paket „Snowflake“ und installieren Sie die neueste Version der Snowflake Python APIs-Bibliothek.
(Optional) Löschen Sie die Zellen, die standardmäßig im Notebook enthalten sind. Während Sie die Schritte in diesem Tutorial ausführen, fügen Sie Python-Zellen zum Notebook hinzu.
Erstellen und starten Sie die Zelle zum Importieren von Python-Bibliotheken, die von vielen Zellen in diesem Tutorial verwendet werden.
from snowflake.snowpark.context import get_active_session from snowflake.core import Root from snowflake.core import CreateMode
Erstellen Sie die Zelle, um Konstanten zu definieren, die Sie in nachfolgenden Zellen verwenden, und führen Sie sie aus Die unten angegebenen Werte entsprechen den Tutorials 1 und 2. Sie können diese Werte optional ändern.
current_user = get_active_session().get_current_user() user_role_name = "test_role" compute_pool_name = "tutorial_compute_pool" warehouse_name = "tutorial_warehouse" database_name = "tutorial_db" schema_name = "data_schema" repo_name = "tutorial_repository" stage_name = "tutorial_stage" service_name = "echo_service" print("configured!")
2: Snowflake-Objekte erstellen¶
Bevor Sie einen Dienst erstellen können, benötigen Sie Snowflake-Objekte, wie z. B. eine Datenbank, einen Benutzer, eine Rolle, einen Computepool und ein Image-Repository. Bei einigen dieser Objekte handelt es sich um kontobezogene Objekte, für deren Erstellung Administratorberechtigungen erforderlich sind. Die Namen der erstellten Objekte werden im vorangegangenen Schritt definiert.
2.1: Snowflake-Objekte für Konten erstellen¶
Der folgende Python-Code erstellt diese Objekte:
Rolle (
test_role
). Sie erteilen dieser Rolle alle Berechtigungen, die zur Erstellung und Nutzung des Dienstes erforderlich sind. Im Code erteilen Sie diese Rolle dem aktuellen Benutzer, damit dieser den Dienst erstellen und nutzen kann.Datenbank (
tutorial_db
). Im nächsten Schritt erstellen Sie ein Schema in dieser Datenbank.Computepool (
tutorial_compute_pool
). Ihr Dienstcontainer wird in diesem Computepool ausgeführt.Warehouse (
tutorial_warehouse
). Wenn der Dienst eine Verbindung zu Snowflake herstellt und Abfragen ausführt, wird dieses Warehouse zur Ausführung der Abfragen verwendet.
Erstellen Sie die Zelle und führen Sie sie aus, um diese Objekte für Konten mit der Rolle ACCOUNTADMIN zu erstellen. Beachten Sie, dass das Skript Ressourcen nur dann erstellt, wenn sie nicht vorhanden sind. Die Kommentare im Code zeigen die entsprechenden SQL-Anweisungen.
from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse
session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)
# CREATE ROLE test_role;
root.roles.create(
Role(name=user_role_name),
mode=CreateMode.if_not_exists)
print(f"Created role:", user_role_name)
# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
securable=Securables.role(user_role_name),
grantee=Grantees.user(name=current_user),
))
# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
# MIN_NODES = 1 MAX_NODES = 1
# INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
mode=CreateMode.if_not_exists,
compute_pool=ComputePool(
name=compute_pool_name,
instance_family="CPU_X64_XS",
min_nodes=1,
max_nodes=2,
)
)
# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
securable=Securables.compute_pool(compute_pool_name),
grantee=Grantees.role(name=user_role_name)
))
print(f"Created compute pool:", compute_pool_name)
# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
Database(name=database_name),
mode=CreateMode.if_not_exists)
# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
privileges=[Privileges.all_privileges],
securable=Securables.database(database_name),
grantee=Grantees.role(name=user_role_name),
))
print("Created database:", database_name)
# CREATE OR REPLACE WAREHOUSE tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
mode=CreateMode.if_not_exists)
# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
privileges=[Privileges.usage],
grantee=Grantees.role(name=user_role_name),
securable=Securables.warehouse(warehouse_name)
))
print("Created warehouse:", warehouse_name)
# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
privileges=[Privileges.bind_service_endpoint],
securable=Securables.current_account,
grantee=Grantees.role(name=user_role_name)
))
print("Done: GRANT BIND SERVICE ENDPOINT")
Wenn Sie Ressourcen erstellen, erteilt der Code der Rolle auch die erforderlichen Berechtigungen (test_role
), damit die Rolle diese Ressourcen nutzen kann. Beachten Sie außerdem, dass der Echodienst, den Sie in diesem Tutorial erstellen, einen öffentlichen Endpunkt bereitstellt. Dieser öffentliche Endpunkt ermöglicht anderen Benutzern Ihres Kontos den Zugriff auf den Dienst über das öffentliche Web (Eingang). Um einen Dienst mit einem öffentlichen Endpunkt zu erstellen, muss die Rolle (test_role
) über die Berechtigung BIND SERVICE ENDPOINT
für das Konto verfügen.
2.2 Objekte im Bereich des Schemas erstellen¶
Der Python-Code in diesem Abschnitt verwendet die Rolle test_role
, um ein Schema und Objekte in diesem Schema zu erstellen. Sie benötigen keine Administratorberechtigungen, um diese Ressourcen zu erstellen.
Schema: (
data_schema
). In diesem Schema erstellen Sie ein Image-Repository, einen Dienst und einen Job.Image-Repository (
tutorial_repository
). Sie speichern Ihr Anwendungsimage in diesem Repository.Stagingbereich (
tutorial_stage
). Der Stagingbereich wird nur zur Veranschaulichung erstellt. Auch wenn dies in diesem Tutorial nicht gezeigt wird, können Stagingbereiche verwendet werden, um Daten an Ihre Dienste weiterzugeben oder Daten von ihnen zu sammeln.
Beachten Sie, dass das Skript Ressourcen nur dann erstellt, wenn sie nicht vorhanden sind.
from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable
session = get_active_session()
session.use_role(user_role_name)
root = Root(session)
# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
Schema(name=schema_name),
mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)
# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo}
repo = schema.image_repositories.create(
ImageRepository(name=repo_name),
mode=CreateMode.if_not_exists)
print("Create image repository:", repo.fully_qualified_name)
repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")
#CREATE STAGE IF NOT EXISTS tutorial_stage
# DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
Stage(
name=stage_name,
directory_table=StageDirectoryTable(enable=True)),
mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)
Der Python-Code gibt auch nützliche Informationen über das Repository aus (das Repository-URL), die Sie beim Pushen Ihrer Images in das Repository verwenden.
3: Image erstellen und hochladen¶
Sie laden den Code lokal herunter, wie in Tutorial 1 beschrieben, verwenden Docker-Befehle, um das Image zu erstellen, und laden es in das Image-Repository in Ihrem Konto hoch.
Erstellen Sie die Zelle und führen Sie sie aus, um den Hostnamen Ihrer Image-Registry und die URL zu Ihrem Image-Repository zu erhalten.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url print("image registry hostname:", repo_url.split("/")[0]) print("image repository url:", repo_url + "/")
Der Python-Code ruft die Image-Repository-Ressourcenobjekt ab (
repo
), greift auf das Modellobjekt zu und extrahiert daraus das Repository URL.Führen Sie in Tutorial 1 die Schritte 1 und 2, um den Dienstcode herunterzuladen, ein Image zu erstellen und es in das Repository hochzuladen.
Erstellen Sie die Zelle und führen Sie sie aus, um zu überprüfen, ob sich das Image im Repository befindet.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] for image in repo.list_images_in_repository(): print(image.image_path)
Der Code listet die Images aus der Image-Repository-Ressource (
repo
) und gibt denimage_path
für jedes Image aus.
4: Dienst erstellen¶
Erstellen Sie einen Dienst und eine Dienstfunktion zur Kommunikation mit dem Dienst.
Überprüfen Sie, ob der Computepool bereit ist. Nachdem Sie einen Computepool erstellt haben, dauert es einige Zeit, bis Snowflake alle Knoten bereitgestellt hat. Stellen Sie sicher, dass der Computepool bereit ist, bevor Sie einen Dienst erstellen, da Dienstcontainer innerhalb des angegebenen Computepools ausgeführt werden.
Erstellen Sie die Zelle und führen Sie sie aus, um den Status des Computepools abzurufen:
import time session = get_active_session() session.use_role(user_role_name) root = Root(session) cp = root.compute_pools[compute_pool_name] cpm = cp.fetch() print(cpm.state, cpm.status_message) if cpm.state == 'SUSPENDED': cp.resume() while cpm.state in ['STARTING', 'SUSPENDED']: time.sleep(5) cpm = cp.fetch() print(cpm.state, cpm.status_message)
Der Code holt das Computepool-Modell (
cpm
) aus der Computepool-Ressource (cp
), um den aktuellen Computepool-Status abzurufen. Wenn der Computepool angehalten wurde, setzt der Code den Computepool fort. Der Code durchläuft eine Schleife und hält jedes Mal fünf Sekunden lang an, bis sich der Computepool nicht mehr im Zustand STARTING oder SUSPENDED befindet.Die letzte Zeile der Ausgabe sollte „IDLE“ oder „ACTIVE“ lauten, was bedeutet, dass der Computepool bereit ist, Ihren Dienst auszuführen. Weitere Informationen finden Sie unter Lebenszyklus von Computepools. Wenn der Computepool nicht bereit ist, können Ihre Dienste nicht gestartet werden.
Erstellen Sie die Zelle und führen Sie sie aus, um den Echodienst zu erstellen.
from snowflake.core.service import Service, ServiceSpec session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url specification = f""" spec: containers: - name: echo image: {repo_url}/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true """ echo_service = schema.services.create(Service( name=service_name, compute_pool=compute_pool_name, spec=ServiceSpec(specification), min_instances=1, max_instances=1), mode=CreateMode.if_not_exists) print("created service:", echo_service.name)
Der Code ruft die Repository-URL ab, wie im vorangegangenen Schritt geschehen. Der Code erstellt dann den
echo_service
unter Verwendung einer Inline-Spezifikation und des Images aus dem angegebenen Image-Repository.Wie Sie aus dem Python-Code ersehen können, ist es einfach, die Namen von Ressourcen zu parametrisieren. Im Folgenden finden Sie den entsprechenden SQL-Befehl, der einen Dienst erstellt, aber keine Parameter verwendet.
CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
Führen Sie die Zelle aus, um eine Dienstfunktion zu erstellen (
my_echo_function
). Eine Dienstfunktion ist eine der Möglichkeiten, den Dienst zu nutzen.from snowflake.core.function import ServiceFunction, FunctionArgument session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] # CREATE FUNCTION my_echo_udf (inputtext VARCHAR) # RETURNS VARCHAR # SERVICE=echo_service # ENDPOINT=echoendpoint # AS '/echo'; svcfn = schema.functions.create(mode=CreateMode.or_replace, function=ServiceFunction( name="my_echo_function", arguments=[FunctionArgument(name="inputtext", datatype="TEXT")], returns="TEXT", service=service_name, endpoint="echoendpoint", path="/echo")) print("created service function:", svcfn.name_with_args)
Der Code ruft die Methode
create
in der Sammlungfunctions
vonschema
auf, um die Dienstfunktion (my_echo_function
) zu erstellen.
5: Dienst verwenden¶
In diesem Abschnitt verwenden Sie den Dienst wie folgt:
Rufen Sie die Dienstfunktion auf.
Verwenden Sie einen Browser, um mit dem öffentlichen Endpunkt des Dienstes zu interagieren.
Rufen Sie die Dienstfunktion auf.
svcfn = schema.functions["my_echo_function(TEXT)"] print(svcfn.execute(["hello"]))
Snowflake sendet eine POST-Anforderung an den Dienstendpunkt (
echoendpoint
). Nach Erhalt der Anfrage gibt der Dienst die Eingabezeichenfolge in der Antwort wieder.Ausgabe:
+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
Greifen Sie von einem Browser aus auf den öffentlichen Endpunkt zu, den der Dienst zur Verfügung stellt.
Rufen Sie die URL des öffentlichen Endpunkts ab.
# helper to check if service is ready and return endpoint url def get_ingress_for_endpoint(svc, endpoint): for _ in range(10): # only try 10 times # Find the target endpoint. target_endpoint = None for ep in svc.get_endpoints(): if ep.is_public and ep.name == endpoint: target_endpoint = ep break; else: print(f"Endpoint {endpoint} not found") return None # Return endpoint URL or wait for it to be provisioned. if target_endpoint.ingress_url.startswith("Endpoints provisioning "): print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.") time.sleep(10) else: return target_endpoint.ingress_url print("Timed out waiting for endpoint to become available") endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint") print(f"https://{endpoint_url}/ui")
Fügen Sie die ausgegebene URL in ein Browserfenster ein. Dies veranlasst den Dienst, die Funktion
ui()
auszuführen (sieheecho_service.py
).Beachten Sie, dass Sie beim ersten Zugriff auf den Endpunkt-URL aufgefordert werden, sich bei Snowflake anzumelden. Verwenden Sie für diesen Test denselben Benutzer, mit dem Sie den Dienst erstellt haben, um sicherzustellen, dass der Benutzer über die erforderlichen Berechtigungen verfügt.
Geben Sie in das Feld Input die Zeichenfolge „Hello“ ein, und drücken Sie die Eingabetaste.
6: Einen Job erstellen¶
In Tutorial 2 verwenden Sie die SQL-Schnittstelle, um einen Snowpark Container Services-Job zu erstellen. In diesem Abschnitt erstellen Sie denselben Job unter Verwendung von Snowflake Python APIs.
Erstellen Sie die Zelle und führen Sie sie aus, um den Hostnamen Ihrer Image-Registry und die URL zu Ihrem Image-Repository zu erhalten.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url print("image registry hostname:", repo_url.split("/")[0]) print("image repository url:", repo_url + "/")
Der Python-Code ruft das Image-Repository Ressourcenobjekt (
repo
) ab, greift auf das Modellobjekt zu und extrahiert daraus die Repository-URL.Führen Sie in Tutorial 2 die Schritten 1 und 2, um den Dienstcode herunterzuladen, ein Image zu erstellen und es in das Repository hochzuladen.
Erstellen Sie die Zelle und führen Sie sie aus, um zu überprüfen, ob sich das Image im Repository befindet.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] for image in repo.list_images_in_repository(): print(image.image_path)
Der Code listet die Images aus der Image-Repository-Ressource (
repo
) und gibt denimage_path
für jedes Image aus.Erstellen Sie die Zelle und führen Sie sie aus, um den Job zu erstellen.
from snowflake.core.service import JobService, ServiceSpec session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url job_name = "test_job" # cleanup previous job if present. schema.services[job_name].drop()(if_exists=True) specification = f""" spec: containers: - name: main image: {repo_url}/my_job_image:latest env: SNOWFLAKE_WAREHOUSE: {warehouse_name} args: - "--query=select current_time() as time,'hello'" - "--result_table=results" """ job = schema.services.execute_job(JobService( name=job_name, compute_pool=compute_pool_name, spec=ServiceSpec(specification))) print("executed job:", job.name, "status:", job.fetch().status) print("job logs:") print(job.get_service_logs(0, "main"))
Der Job führt die angegebene Abfrage aus und speichert die Ergebnisse in einer Tabelle.
Führen Sie die folgende Zelle aus, um das in die Tabelle geschriebene Ergebnis zu überprüfen. Dieser Code verwendet Snowpark Python, um diese Tabelle abzufragen.
session = get_active_session() session.use_role(user_role_name) # show that above job wrote to results table session.sql(f"select * from {database_name}.{schema_name}.results").collect()
7: Bereinigen¶
Stoppen Sie den Dienst und beenden Sie ihn. Nach dem Beenden des Dienstes hält Snowflake standardmäßig den Computepool automatisch an (vorausgesetzt, es laufen keine anderen Dienste und Jobdienste). Weitere Informationen finden Sie unter Lebenszyklus von Computepools.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] # now let's clean up schema.functions["my_echo_function(TEXT)"].drop() schema.services[service_name].drop()
Löschen Sie das Image-Repository, um nicht für die Speicherung zu bezahlen. Beachten Sie, dass alle anderen Bilder, die Sie im Repository gespeichert haben, gelöscht werden.
schema.image_repositories[repo_name].drop()
Schema löschen Wenn Sie ein Schema löschen, werden auch alle Objekte in diesem Schema gelöscht. Für dieses Tutorial gehören dazu der Dienst, die Funktion, das Image Repository und der von Ihnen erstellte Stagingbereich.
root.databases[database_name].schemas[schema_name].drop()
Anstatt darauf zu warten, dass Snowflake Ihren Computepool anhält, können Sie den Computepool auch explizit anhalten. In diesem Fall hält Snowflake alle laufenden Dienste an und wartet, bis alle laufenden Jobs beendet sind, und hält dann den Computepool an.
root.compute_pool[compute_pool_name].suspend()
Nächste Schritte¶
Dieses Tutorial zeigt die Verwendung von Snowflake Python APIs zur Erstellung und Verwaltung von Snowpark Container Services-Diensten und -Jobs. Weitere Informationen über Snowflake Python APIs finden Sie unter Snowflake Python APIs: Verwalten von Snowflake-Objekten mit Python.