Tutorial 1: Snowpark Container Services-Dienst erstellen¶
Einführung¶
Nachdem Sie die grundlegende Einrichtung abgeschlossen haben, können Sie nun einen Dienst erstellen. In diesem Tutorial erstellen Sie einen Dienst (mit dem Namen echo_service
), der einfach den von Ihnen eingegebenen Text als Echo wieder zurückgibt. Wenn die Eingabezeichenfolge beispielsweise „Hello World“ ist, gibt der Dienst „I said, Hello World“ zurück.
Dieses Tutorial besteht aus zwei Teilen:
Teil 1: Dienst erstellen und testen. Sie laden den für dieses Tutorial bereitgestellten Code herunter und befolgen die schrittweise Anleitung:
Dienstcode für dieses Tutorial herunterladen
Docker-Image für Snowpark Container Services erstellen und Image in Repository im eigenen Konto hochladen
Dienst erstellen, indem die Dienstspezifikationsdatei und der Computepool für die Ausführung des Dienstes bereitgestellt werden
Dienstfunktion für die Kommunikation mit dem Dienst bereitstellen
Dienst einsetzen. Dazu wird eine Echo-Anforderung an den Dienst gesendet und die Antwort überprüft.
Teil 2: Erläuterungen zum Dienst. Dieser Abschnitt bietet eine Übersicht zum Dienstcode und zeigt auf, wie die verschiedenen Komponenten zusammenarbeiten.
1: Dienstcode herunterladen¶
Zum Erstellen des Echo-Dienstes wird ein Code (eine Python-Anwendung) bereitgestellt.
Laden Sie
SnowparkContainerServices-Tutorials.zip
herunter.Entpacken Sie den Inhalt, der ein Verzeichnis für jedes Tutorial enthält. Das Verzeichnis
Tutorial-1
enthält die folgenden Dateien:Dockerfile
echo_service.py
templates/basic_ui.html
2: Image erstellen und hochladen¶
Erstellen Sie ein Image für die linux/amd64-Plattform, die von Snowpark Container Services unterstützt wird, und laden Sie das Image dann in das Image-Repository in Ihrem Konto hoch (siehe Grundlegende Einrichtung).
Sie benötigen Informationen zum Repository (die Repository-URL und den Hostnamen der Registry), bevor Sie das Image erstellen und hochladen können. Weitere Informationen dazu finden Sie unter Registry und Repositorys.
Informationen zum Repository abrufen
Um die Repository-URL zu erhalten, führen Sie den SQL-Befehl SHOW IMAGE REPOSITORIES aus.
SHOW IMAGE REPOSITORIES;
Die URL ist in der Spalte
repository_url
der Ausgabe enthalten. Siehe folgendes Beispiel:<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
Der Hostname in der Repository-URL ist der Name des Registry-Hosts. Siehe folgendes Beispiel:
<orgname>-<acctname>.registry.snowflakecomputing.com
Image erstellen und in das Repository hochladen
Öffnen Sie ein Terminalfenster, und wechseln Sie in das Verzeichnis, das die entpackten Dateien enthält.
Um ein Docker-Image zu erstellen, führen Sie den folgenden Befehl
docker build
mithilfe der Docker-CLI aus. Beachten Sie, dass der Befehl das aktuelle Arbeitsverzeichnis (.
) alsPATH
für die Dateien angibt, die für das Erstellen des Images verwendet werden sollen.docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
Für
image_name
verwenden Siemy_echo_service_image:latest
:
Beispiel
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest .
Laden Sie das Image in das Repository in Ihrem Snowflake-Konto hoch. Damit Docker ein Image in Ihrem Namen in Ihr Repository hochladen kann, müssen Sie zunächst Docker mit der Registry authentifizieren.
Um Docker mit der Image-Registry zu authentifizieren, führen Sie den folgenden Befehl aus.
docker login <registry_hostname> -u <username>
Geben Sie dabei für
username
Ihren Snowflake-Benutzernamen an. Docker fordert Sie zur Eingabe Ihres Kennworts auf.
Führen Sie den folgenden Befehl aus, um das Image hochzuladen:
docker push <repository_url>/<image_name>
Beispiel
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
3: Dienst erstellen¶
In diesem Abschnitt erstellen Sie einen Dienst und dann eine Dienstfunktion, die mit dem Dienst kommuniziert.
Um einen Dienst zu erstellen, benötigen Sie Folgendes:
Einen Computepool. Snowflake führt Ihren Dienst in dem angegebenen Computepool aus. Sie haben einen Computepool als Teil der grundlegenden Einrichtung erstellt.
Eine Dienstspezifikation. Diese Spezifikation liefert Snowflake die Informationen, die zur Konfiguration und Ausführung Ihres Dienstes erforderlich sind. Weitere Informationen dazu finden Sie unter Snowpark Container Services: Verwenden von Diensten. In diesem Tutorial geben Sie die Spezifikation inline im Befehl CREATE SERVICE an. Sie können die Spezifikation auch in einer Datei in Ihrem Snowflake-Stagingbereich speichern und die Dateiinformationen im Befehl CREATE SERVICE bereitstellen, wie in Tutorial 2 gezeigt.
Eine Dienstfunktion ist eine der verfügbaren Methoden zur Kommunikation mit Ihrem Dienst. Eine Dienstfunktion ist eine benutzerdefinierte Funktion (UDF), die Sie mit dem Dienstendpunkt verknüpfen. Wenn die Dienstfunktion ausgeführt wird, sendet sie eine Anforderung an den Dienstendpunkt und erhält eine Antwort.
Überprüfen Sie, ob der Computepool bereit ist und ob Sie sich im korrekten Kontext befinden, um den Dienst zu erstellen.
Zuvor haben Sie den Kontext im Schritt Grundlegende Einrichtung festgelegt. Um sicherzustellen, dass Sie sich in diesem Schritt im korrekten Kontext für die SQL-Anweisungen befinden, führen Sie Folgendes aus:
USE ROLE test_role; USE DATABASE tutorial_db; USE SCHEMA data_schema; USE WAREHOUSE tutorial_warehouse;
Um sicherzustellen, dass der unter Grundlegende Einrichtung erstellte Computepool bereit ist, führen Sie
DESCRIBE COMPUTE POOL
aus, und überprüfen Sie dann, obstate
den WertACTIVE
oderIDLE
hat. Wenn diestate
den WertSTARTING
hat, müssen Sie warten, bisstate
entwederACTIVE
oderIDLE
ist.
DESCRIBE COMPUTE POOL tutorial_compute_pool;
Um den Dienst zu erstellen, führen Sie mit der Rolle
test_role
den folgenden Befehl aus:CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
Bemerkung
Wenn bereits ein Dienst mit diesem Namen existiert, verwenden Sie den Befehl DROP SERVICE, um den zuvor erstellten Dienst zu löschen, und erstellen Sie dann diesen Dienst.
Führen Sie die folgenden SQL-Befehle aus, um detaillierte Informationen zu dem gerade erstellten Dienst zu erhalten. Weitere Informationen dazu finden Sie unter Snowpark Container Services: Verwenden von Diensten.
Um die Dienste in Ihrem Konto aufzulisten, führen Sie den Befehl SHOW SERVICES aus:
SHOW SERVICES;
Um den Status Ihres Dienstes zu erfahren, rufen Sie die Systemfunktion SYSTEM$GET_SERVICE_STATUS auf:
SELECT SYSTEM$GET_SERVICE_STATUS('echo_service');
Um Informationen zu Ihrem Dienst zu erhalten, führen Sie den Befehl DESCRIBE SERVICE aus:
DESCRIBE SERVICE echo_service;
Um eine Dienstfunktion zu erstellen, führen Sie den folgenden Befehl aus:
CREATE FUNCTION my_echo_udf (InputText varchar) RETURNS varchar SERVICE=echo_service ENDPOINT=echoendpoint AS '/echo';
Beachten Sie Folgendes:
Die Eigenschaft SERVICE verknüpft die UDF mit dem Dienst
echo_service
.Die Eigenschaft ENDPOINT verknüpft die UDF mit dem Endpunkt
echoendpoint
innerhalb des Dienstes.„AS ‚/echo‘“ gibt den HTTP-Pfad zum Echo-Server an. Sie finden diesen Pfad im Dienstcode (
echo_service.py
).
4: Dienst einsetzen¶
Richten Sie zunächst den Kontext für die SQL-Anweisungen in diesem Abschnitt ein, indem Sie Folgendes ausführen:
USE ROLE test_role;
USE DATABASE tutorial_db;
USE SCHEMA data_schema;
USE WAREHOUSE tutorial_warehouse;
Jetzt können Sie mit dem Echo-Dienst kommunizieren.
Verwenden einer Dienstfunktion: Sie können die Dienstfunktion in einer Abfrage aufrufen. Die Beispiel-Dienstfunktion (
my_echo_udf
) kann als Eingabe entweder eine einzelne Zeichenfolge oder eine Liste von Zeichenfolgen als Eingabe annehmen.Beispiel 1.1: Einzelne Zeichenfolge übergeben
Um die Dienstfunktion
my_echo_udf
aufzurufen, führen Sie die folgende SELECT-Anweisung aus und übergeben genau eine Eingabezeichenfolge ('hello'
):SELECT my_echo_udf('hello!');
Snowflake sendet eine POST-Anforderung an den Dienstendpunkt (
echoendpoint
). Nach Erhalt der Anforderung gibt der Dienst die Eingabezeichenfolge als Echo in der Antwort zurück.+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
Beispiel 1.2: Liste von Zeichenfolge übergeben
Wenn Sie eine Liste von Zeichenfolgen an die Dienstfunktion übergeben, fasst Snowflake diese Eingabezeichenfolgen in einem Batch zusammen und sendet eine Serie von POST-Anforderungen an den Dienst. Nachdem der Dienst alle Zeichenfolgen verarbeitet hat, kombiniert Snowflake die Ergebnisse und gibt sie zurück.
Im folgenden Beispiel wird eine Tabellenspalte als Eingabe an die Dienstfunktion übergeben.
Erstellen Sie eine Tabelle mit mehreren Zeichenfolgen:
CREATE TABLE messages (message_text VARCHAR) AS (SELECT * FROM (VALUES ('Thank you'), ('Hello'), ('Hello World')));
Überprüfen Sie, ob die Tabelle erstellt wurde, indem Sie folgende Anweisung ausführen:
SELECT * FROM messages;
Um die Dienstfunktion aufzurufen, führen Sie die folgende SELECT-Anweisung aus und übergeben dabei Tabellenzeilen als Eingabe:
SELECT my_echo_udf(message_text) FROM messages;
Ausgabe:
+---------------------------+ | MY_ECHO_UDF(MESSAGE_TEXT) | |---------------------------| | Bob said Thank you | | Bob said Hello | | Bob said Hello World | +---------------------------+
Webbrowser verwenden: Der Dienst stellt den Endpunkt öffentlich zur Verfügung (siehe Inline-Spezifikation im Befehl CREATE SERVICE). Daher können Sie sich bei einer Web-UI anmelden, die der Dienst im Internet bereitstellt, und dann von einem Webbrowser aus Anforderungen an den Dienst senden.
Ermitteln Sie die URL des öffentlichen Endpunkts, den der Dienst bereitstellt:
SHOW ENDPOINTS IN SERVICE echo_service;
Die Spalte
ingress_url
in der Antwort enthält die URL.Beispiel
p6bye-myorg-myacct.snowflakecomputing.app
Hängen Sie
/ui
an die Endpunkt-URL an, und geben Sie diese Zeichenfolge in den Webbrowser ein. Dies veranlasst den Dienst, die Funktionui()
auszuführen (sieheecho_service.py
).Beachten Sie, dass Sie beim ersten Zugriff auf den Endpunkt-URL aufgefordert werden, sich bei Snowflake anzumelden. Verwenden Sie für diesen Test denselben Benutzer, mit dem Sie den Dienst erstellt haben, um sicherzustellen, dass der Benutzer über die erforderlichen Berechtigungen verfügt.
Geben Sie in das Feld Input die Zeichenfolge „Hello“ ein, und drücken Sie die Eingabetaste.
Bemerkung
Sie können auf den öffentlichen Endpunkt programmgesteuert zugreifen. Ein Beispiel für Code finden Sie unter Zugriff auf öffentliche Endpunkte von außerhalb von Snowflake und Authentifizierung. Beachten Sie, dass Sie im Code an die Endpunkt-
/ui
die Pfadangabe URL anhängen müssen,damit Snowflake die Anfrage an dieui()
-Funktion im Dienstcode weiterleiten kann.
5: (Optional) Greifen Sie auf den öffentlichen Endpunkt programmgesteuert zu¶
Im vorangegangenen Abschnitt haben Sie den Echo-Dienst mit Hilfe eines Browsers getestet. Im Browser haben Sie auf den öffentlichen Endpunkt (Ingress-Endpunkt) zugegriffen und Anfragen über die Weboberfläche UI gesendet, das der Dienste bereitstellt. In diesem Abschnitt testen Sie denselben öffentlichen Endpunkt programmgesteuert.
Das Beispiel verwendet Schlüsselpaar-Authentifizierung. Mit dem von Ihnen bereitgestellten Schlüsselpaar erzeugt der Beispielcode zunächst ein JSON-Web Token (JWT) und tauscht dann das Token mit Snowflake gegen ein OAuth-Token aus. Der Code verwendet dann das OAuth-Token zur Authentifizierung bei der Kommunikation mit dem öffentlichen Endpunkt des Echo-Dienstes.
Voraussetzungen¶
Stellen Sie sicher, dass Sie die folgenden Informationen haben:
Ingress URL des öffentlichen Endpunktes. Führen Sie den Befehl SHOW ENDPOINTS IN SERVICE aus, um den URL zu erhalten:
SHOW ENDPOINTS IN SERVICE echo_service;
Der Name Ihres Snowflake-Kontos. Weitere Informationen finden Sie unter Gemeinsame Einrichtung: Überprüfen Sie, ob Sie bereit sind, mit fortzufahren.
Ihr Snowflake-Konto URL: Es lautet
<acctname>.snowflakecomputing.com
.Benutzername im Snowflake Konto. Dies ist der Benutzer, den Sie unter Gemeinsame Einrichtung: Snowflake-Objekte erstellen gewählt haben. Sie melden sich bei Snowflake als dieser Benutzer an und testen den programmgesteuerten Zugriff.
Rollenname: Sie haben eine Rolle (
test_role
) als Teil der gemeinsamen Einrichtung erstellt. Der Benutzer nimmt diese Rolle an, um Aktionen durchzuführen.
Setup¶
Folgen Sie den Schritten, um programmgesteuert mit dem Echo-Dienst zu kommunizieren. Mit dem mitgelieferten Python-Code senden Sie Anfragen an den öffentlichen Endpunkt, den der Echo-Dienst zur Verfügung stellt.
Erstellen Sie an einer Eingabeaufforderung ein Verzeichnis und navigieren Sie dorthin.
Konfigurieren Sie die Authentifizierung des Schlüsselpaares für den Benutzer.
Erzeugen Sie ein Schlüsselpaar <label-configuring_key_pair_authentication>:
Erzeugen Sie einen privaten Schlüssel. Um die Übungsschritte zu vereinfachen, erzeugen Sie einen unverschlüsselten privaten Schlüssel. Sie können auch einen verschlüsselten privaten Schlüssel verwenden, aber dann müssen Sie das Kennwort eingeben.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
Erzeugen Sie einen öffentlichen Schlüssel (
rsa_key.pub
), indem Sie auf den von Ihnen erstellten privaten Schlüssel verweisen.openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Vergewissern Sie sich, dass Sie den privaten Schlüssel und den öffentlichen Schlüssel in dem Verzeichnis generiert haben.
Weisen Sie den öffentlichen Schlüssel dem Benutzer zu, den Sie zum Testen des programmatischen Zugriffs verwenden. Damit kann der Benutzer den Schlüssel für die Authentifizierung angeben.
ALTER USER <user-name> SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
Speichern Sie den mitgelieferten Beispielcode in Python-Dateien.
Speichern Sie den folgenden Code in
generateJWT.py
.# To run this on the command line, enter: # python3 generateJWT.py --account=<account_identifier> --user=<username> --private_key_file_path=<path_to_private_key_file> from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import PublicFormat from cryptography.hazmat.backends import default_backend from datetime import timedelta, timezone, datetime import argparse import base64 from getpass import getpass import hashlib import logging import sys # This class relies on the PyJWT module (https://pypi.org/project/PyJWT/). import jwt logger = logging.getLogger(__name__) try: from typing import Text except ImportError: logger.debug('# Python 3.5.0 and 3.5.1 have incompatible typing modules.', exc_info=True) from typing_extensions import Text ISSUER = "iss" EXPIRE_TIME = "exp" ISSUE_TIME = "iat" SUBJECT = "sub" # If you generated an encrypted private key, implement this method to return # the passphrase for decrypting your private key. As an example, this function # prompts the user for the passphrase. def get_private_key_passphrase(): return getpass('Passphrase for private key: ') class JWTGenerator(object): """ Creates and signs a JWT with the specified private key file, username, and account identifier. The JWTGenerator keeps the generated token and only regenerates the token if a specified period of time has passed. """ LIFETIME = timedelta(minutes=59) # The tokens will have a 59-minute lifetime RENEWAL_DELTA = timedelta(minutes=54) # Tokens will be renewed after 54 minutes ALGORITHM = "RS256" # Tokens will be generated using RSA with SHA256 def __init__(self, account: Text, user: Text, private_key_file_path: Text, lifetime: timedelta = LIFETIME, renewal_delay: timedelta = RENEWAL_DELTA): """ __init__ creates an object that generates JWTs for the specified user, account identifier, and private key. :param account: Your Snowflake account identifier. See https://docs.snowflake.com/en/user-guide/admin-account-identifier.html. Note that if you are using the account locator, exclude any region information from the account locator. :param user: The Snowflake username. :param private_key_file_path: Path to the private key file used for signing the JWTs. :param lifetime: The number of minutes (as a timedelta) during which the key will be valid. :param renewal_delay: The number of minutes (as a timedelta) from now after which the JWT generator should renew the JWT. """ logger.info( """Creating JWTGenerator with arguments account : %s, user : %s, lifetime : %s, renewal_delay : %s""", account, user, lifetime, renewal_delay) # Construct the fully qualified name of the user in uppercase. self.account = self.prepare_account_name_for_jwt(account) self.user = user.upper() self.qualified_username = self.account + "." + self.user self.lifetime = lifetime self.renewal_delay = renewal_delay self.private_key_file_path = private_key_file_path self.renew_time = datetime.now(timezone.utc) self.token = None # Load the private key from the specified file. with open(self.private_key_file_path, 'rb') as pem_in: pemlines = pem_in.read() try: # Try to access the private key without a passphrase. self.private_key = load_pem_private_key(pemlines, None, default_backend()) except TypeError: # If that fails, provide the passphrase returned from get_private_key_passphrase(). self.private_key = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend()) def prepare_account_name_for_jwt(self, raw_account: Text) -> Text: """ Prepare the account identifier for use in the JWT. For the JWT, the account identifier must not include the subdomain or any region or cloud provider information. :param raw_account: The specified account identifier. :return: The account identifier in a form that can be used to generate the JWT. """ account = raw_account if not '.global' in account: # Handle the general case. idx = account.find('.') if idx > 0: account = account[0:idx] else: # Handle the replication case. idx = account.find('-') if idx > 0: account = account[0:idx] # Use uppercase for the account identifier. return account.upper() def get_token(self) -> Text: """ Generates a new JWT. If a JWT has already been generated earlier, return the previously generated token unless the specified renewal time has passed. :return: the new token """ now = datetime.now(timezone.utc) # Fetch the current time # If the token has expired or doesn't exist, regenerate the token. if self.token is None or self.renew_time <= now: logger.info("Generating a new token because the present time (%s) is later than the renewal time (%s)", now, self.renew_time) # Calculate the next time we need to renew the token. self.renew_time = now + self.renewal_delay # Prepare the fields for the payload. # Generate the public key fingerprint for the issuer in the payload. public_key_fp = self.calculate_public_key_fingerprint(self.private_key) # Create our payload payload = { # Set the issuer to the fully qualified username concatenated with the public key fingerprint. ISSUER: self.qualified_username + '.' + public_key_fp, # Set the subject to the fully qualified username. SUBJECT: self.qualified_username, # Set the issue time to now. ISSUE_TIME: now, # Set the expiration time, based on the lifetime specified for this object. EXPIRE_TIME: now + self.lifetime } # Regenerate the actual token token = jwt.encode(payload, key=self.private_key, algorithm=JWTGenerator.ALGORITHM) # If you are using a version of PyJWT prior to 2.0, jwt.encode returns a byte string instead of a string. # If the token is a byte string, convert it to a string. if isinstance(token, bytes): token = token.decode('utf-8') self.token = token logger.info("Generated a JWT with the following payload: %s", jwt.decode(self.token, key=self.private_key.public_key(), algorithms=[JWTGenerator.ALGORITHM])) return self.token def calculate_public_key_fingerprint(self, private_key: Text) -> Text: """ Given a private key in PEM format, return the public key fingerprint. :param private_key: private key string :return: public key fingerprint """ # Get the raw bytes of public key. public_key_raw = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) # Get the sha256 hash of the raw bytes. sha256hash = hashlib.sha256() sha256hash.update(public_key_raw) # Base64-encode the value and prepend the prefix 'SHA256:'. public_key_fp = 'SHA256:' + base64.b64encode(sha256hash.digest()).decode('utf-8') logger.info("Public key fingerprint is %s", public_key_fp) return public_key_fp def main(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (e.g. "myorganization-myaccount" for "myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') args = cli_parser.parse_args() token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() print('JWT:') print(token) if __name__ == "__main__": main()
Speichern Sie den folgenden Code in
access-via-keypair.py
.from generateJWT import JWTGenerator from datetime import timedelta import argparse import logging import sys import requests logger = logging.getLogger(__name__) def main(): args = _parse_args() token = _get_token(args) snowflake_jwt = token_exchange(token,endpoint=args.endpoint, role=args.role, snowflake_account_url=args.snowflake_account_url, snowflake_account=args.account) spcs_url=f'https://{args.endpoint}{args.endpoint_path}' connect_to_spcs(snowflake_jwt, spcs_url) def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account): scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, } logger.info(data) url = f'https://{snowflake_account}.snowflakecomputing.com/oauth/token' if snowflake_account_url: url = f'{snowflake_account_url}/oauth/token' logger.info("oauth url: %s" %url) response = requests.post(url, data=data) logger.info("snowflake jwt : %s" % response.text) assert 200 == response.status_code, "unable to get snowflake token" return response.text def connect_to_spcs(token, url): # Create a request to the ingress endpoint with authz. headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers) logger.info("return code %s" % response.status_code) logger.info(response.text) def _parse_args(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (for example, "myorganization-myaccount" for ' '"myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') cli_parser.add_argument('--role', help='The role we want to use to create and maintain a session for. If a role is not provided, ' 'use the default role.') cli_parser.add_argument('--endpoint', required=True, help='The ingress endpoint of the service') cli_parser.add_argument('--endpoint-path', default='/', help='The url path for the ingress endpoint of the service') cli_parser.add_argument('--snowflake_account_url', default=None, help='The account url of the account for which we want to log in. Type of ' 'https://myorganization-myaccount.snowflakecomputing.com') args = cli_parser.parse_args() return args if __name__ == "__main__": main()
Anfragen programmgesteuert an den Dienstendpunkt senden¶
Führen Sie den Python-Code access-via-keypair.py
aus, um den öffentlichen Endpunkt des Echo-Dienst aufzurufen.
python3 access-via-keypair.py \ --account <account-identifier> \ --user <user-name> \ --role TEST_ROLE \ --private_key_file_path rsa_key.p8 \ --endpoint <ingress-hostname> \ --endpoint-path /ui
Weitere Informationen zu account-identifier
finden Sie unter Kontobezeichner.
So funktioniert die Authentifizierung¶
Der Code wandelt zunächst das bereitgestellte Schlüsselpaar in ein JWT-Token um. Es sendet dann das JWT-Token an Snowflake, um ein OAuth-Token zu erhalten. Schließlich verwendet der Code das OAuth-Token, um eine Verbindung zu Snowflake herzustellen und auf den öffentlichen Endpunkt zuzugreifen. Genauer gesagt, bewirkt der Code Folgendes:
Ruft die Funktion
_get_token(args)
auf, um ein JWT-Token aus dem von Ihnen angegebenen Schlüsselpaar zu erzeugen. Die Funktionsimplementierung wird angezeigt:def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token
JWTGenerator
ist eine Hilfsklasse, die Ihnen zur Verfügung gestellt wird. Beachten Sie die folgenden Parameter, die Sie bei der Erstellung dieses Objekts angeben:args.account
und dieargs.user
Parameter: Ein JWT-Token hat mehrere Felder (siehe Token-Format),iss
ist eines der Felder. Der Feldwert enthält den Namen des Snowflake-Kontos und einen Benutzernamen. Daher geben Sie diese Werte als Parameter an.Die beiden Parameter von
timedelta
liefern die folgenden Informationen:lifetime
gibt die Anzahl der Minuten an, für die der Schlüssel gültig ist (60 Minuten).renewal_delay
gibt die Anzahl der Minuten ab jetzt an, nach denen der JWT-Generator die JWT erneuern soll.
Ruft die Funktion
token_exchange()
auf, um eine Verbindung zu Snowflake herzustellen und das JWT-Token gegen ein OAuth-Token auszutauschen.scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, }
Der vorangehende Code konstruiert eine JSON, die den Bereich für das OAuth-Token einstellt, den öffentlichen Endpunkt, auf den mit der angegebenen Rolle zugegriffen werden kann. Dieser Code stellt dann eine POST-Anfrage an Snowflake und übergibt das JSON, um das JWT-Token gegen ein OAuth-Token (siehe Token-Austausch) auszutauschen, wie gezeigt:
url = f'{snowflake_account_url}/oauth/token' response = requests.post(url, data=data) assert 200 == response.status_code, "unable to get Snowflake token" return response.text
Der Code ruft dann die
connect_to_spcs()
-Funktion auf, um eine Verbindung mit dem öffentlichen Endpunkt des Echo-Dienstes herzustellen. Er stellt die URL (https://<ingress-URL>/ui
) des Endpunkts und das OAuth-Token zur Authentifizierung bereit.headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers)
url
ist dasspcs_url
, das Sie dem Programm zur Verfügung gestellt haben, undtoken
ist das OAuth-Token.Der Echo-Dienst in diesem Beispiel bedient eine HTML-Website (wie im vorangegangenen Abschnitt erläutert). Dieses Beispiel druckt einfach die HTML in der Antwort aus.
6: Bereinigen¶
Wenn Sie nicht vorhaben, mit Tutorial 2 oder Tutorial 3 fortzufahren, sollten Sie die von Ihnen erstellten abrechenbaren Ressourcen wieder entfernen. Weitere Informationen dazu finden Sie in Schritt 5 von Tutorial 3.
7: Erläuterungen zum Dienstcode¶
In diesem Abschnitt werden die folgenden Themen behandelt:
Code von Tutorial 1 überprüfen: Überprüfen Sie die Codedateien, die den Echo-Dienst implementieren.
Erläuterungen zur Dienstfunktion: In diesem Abschnitt wird erklärt, wie in diesem Tutorial die Dienstfunktion mit dem Dienst verknüpft ist.
Lokales Erstellen und Testen eines Images. In diesem Abschnitt wird erläutert, wie Sie das Docker-Image lokal testen können, bevor Sie es in ein Repository in Ihrem Snowflake-Konto hochladen.
Code von Tutorial 1 überprüfen¶
Die ZIP-Datei, die Sie in Schritt 1 heruntergeladen haben, enthält die folgenden Dateien:
Dockerfile
echo_service.py
templates/basic_ui.html
Beim Erstellen des Dienstes verwenden Sie außerdem Dienstspezifikationen. Im folgenden Abschnitt wird erläutert, wie diese Codekomponenten beim Erstellen des Dienstes zusammenwirken.
Datei „echo_service.py“¶
Diese Python-Datei enthält den Code, der einen minimalen HTTP-Server implementiert, der eingegebenen Text (als Echo) wieder zurückgibt. Der Code erfüllt in erster Linie zwei Aufgaben: die Bearbeitung von Echo-Anforderungen von Snowflake-Dienstfunktionen und die Bereitstellung einer Web-Benutzeroberfläche (UI) für die Übermittlung von Echo-Anforderungen.
from flask import Flask
from flask import request
from flask import make_response
from flask import render_template
import logging
import os
import sys
SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0')
SERVER_PORT = os.getenv('SERVER_PORT', 8080)
CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')
def get_logger(logger_name):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
handler.setFormatter(
logging.Formatter(
'%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
logger.addHandler(handler)
return logger
logger = get_logger('echo-service')
app = Flask(__name__)
@app.get("/healthcheck")
def readiness_probe():
return "I'm ready!"
@app.post("/echo")
def echo():
'''
Main handler for input data sent by Snowflake.
'''
message = request.json
logger.debug(f'Received request: {message}')
if message is None or not message['data']:
logger.info('Received empty message')
return {}
# input format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...],
# ...
# ]}
input_rows = message['data']
logger.info(f'Received {len(input_rows)} rows')
# output format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...}],
# ...
# ]}
output_rows = [[row[0], get_echo_response(row[1])] for row in input_rows]
logger.info(f'Produced {len(output_rows)} rows')
response = make_response({"data": output_rows})
response.headers['Content-type'] = 'application/json'
logger.debug(f'Sending response: {response.json}')
return response
@app.route("/ui", methods=["GET", "POST"])
def ui():
'''
Main handler for providing a web UI.
'''
if request.method == "POST":
# getting input in HTML form
input_text = request.form.get("input")
# display input and output
return render_template("basic_ui.html",
echo_input=input_text,
echo_reponse=get_echo_response(input_text))
return render_template("basic_ui.html")
def get_echo_response(input):
return f'{CHARACTER_NAME} said {input}'
if __name__ == '__main__':
app.run(host=SERVICE_HOST, port=SERVER_PORT)
Erläuterungen zum Code:
Die Funktion
echo
ermöglicht es einer Snowflake-Dienstfunktion, mit dem Dienst zu kommunizieren. Diese Funktion spezifiziert den Decorator@app.post()
wie gezeigt:@app.post("/echo") def echo():
Wenn der Echo-Server Ihre HTTP-POST-Anforderung mit dem
/echo
-Pfad erhält, leitet er die Anforderung an diese Funktion weiter. Die Funktion wird ausgeführt und gibt die Zeichenfolge aus dem Anforderungstext als Echo in der Antwort wieder zurück.Um Kommunikation einer Snowflake-Dienstfunktion zu unterstützen, implementiert dieser Server die externen Funktionen. Das heißt, die Serverimplementierung folgt einem bestimmten Eingabe-/Ausgabedatenformat, um als SQL-Funktion zu dienen, und dies ist das gleiche Eingabe-/Ausgabedatenformat, das von externen Funktionen verwendet wird.
Der Funktionsabschnitt
ui
des Codes zeigt ein Webformular an und verarbeitet die vom Webformular übermittelten Echo-Anforderungen. Diese Funktion verwendet den Decorator@app.route()
, um anzugeben, dass Anforderungen für/ui
von dieser Funktion bearbeitet werden:@app.route("/ui", methods=["GET", "POST"]) def ui():
Der Echo-Dienst stellt den Endpunkt
echoendpoint
öffentlich zur Verfügung (siehe Dienstspezifikation) und ermöglicht so die Kommunikation mit dem Dienst über das Web. Wenn Sie die URL des öffentlichen Endpunkts mit angehängtem „/ui“ in Ihrem Browser laden, sendet der Browser eine HTTP-GET-Anforderung für diesen Pfad, und der Server leitet die Anforderung an diese Funktion weiter. Die Funktion wird ausgeführt und gibt ein einfaches HTML-Formular zurück, in das der Benutzer eine Zeichenfolge eingeben kann.Nachdem der Benutzer eine Zeichenfolge eingegeben und das Formular übermittelt hat, sendet der Browser eine HTTP-Post-Anforderung für diesen Pfad, und der Server leitet die Anforderung an dieselbe Funktion weiter. Die Funktion wird ausgeführt und gibt eine HTTP-Antwort zurück, die die ursprüngliche Zeichenfolge enthält.
Die Funktion
readiness_probe
verwendet den Decorator@app.get()
, um zu spezifizieren, dass Anforderungen für/healthcheck
von dieser Funktion verarbeitet werden:@app.get("/healthcheck") def readiness_probe():
Mit dieser Funktion kann Snowflake die Bereitschaft des Dienstes überprüfen. Wenn der Container startet, möchte Snowflake bestätigen, dass die Anwendung funktioniert und der Dienst bereit ist, Anforderungen zu bedienen. Snowflake sendet eine HTTP-GET-Anforderung mit diesem Pfad, um auf Funktionsfähigkeit (Health Probe) und Bereitschaft (Readiness Probe) zu testen und somit sicherzustellen, dass nur funktionsfähige Container den Datenverkehr bedienen. Die Funktion kann alles tun, was Sie möchten.
Die Funktion
get_logger
hilft beim Einrichten der Protokollierung.
Datei „Dockerfile“¶
Diese Datei enthält alle Befehle, um ein Image mit Docker zu erstellen.
ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY echo_service.py ./
COPY templates/ ./templates/
RUN pip install --upgrade pip && \\
pip install flask
CMD ["python", "echo_service.py"]
„Dockerfile“ enthält Anweisungen zur Installation der Flask-Bibliothek im Docker-Container. Der Code in echo_service.py
stützt sich auf die Flask-Bibliothek, um HTTP-Anforderungen zu verarbeiten.
Datei „/template/basic_ui.html“¶
Der Echo-Dienst stellt den Endpunkt echoendpoint
öffentlich zur Verfügung (siehe Dienstspezifikation) und ermöglicht so die Kommunikation mit dem Dienst über das Web. Wenn Sie die URL des öffentlichen Endpunkts mit angehängtem /ui
in Ihrem Browser laden, zeigt der Echo-Dienst dieses Formular an. Sie können eine Zeichenfolge in das Formular eingeben und das Formular absenden, und der Dienst gibt die Zeichenfolge in einer HTTP-Antwort zurück.
<!DOCTYPE html>
<html lang="en">
<head>
<title>Welcome to echo service!</title>
</head>
<body>
<h1>Welcome to echo service!</h1>
<form action="{{ url_for("ui") }}" method="post">
<label for="input">Input:<label><br>
<input type="text" id="input" name="input"><br>
</form>
<h2>Input:</h2>
{{ echo_input }}
<h2>Output:</h2>
{{ echo_reponse }}
</body>
</html>
Dienstspezifikation¶
Snowflake verwendet die von Ihnen in dieser Spezifikation angegebenen Informationen zur Konfiguration und Ausführung Ihres Dienstes.
spec:
containers:
- name: echo
image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
env:
SERVER_PORT: 8000
CHARACTER_NAME: Bob
readinessProbe:
port: 8000
path: /healthcheck
endpoints:
- name: echoendpoint
port: 8000
public: true
Erläuterungen zur Dienstspezifikation:
containers.image
gibt das Image an, mit dem Snowflake einen Container startet.Das optionale Feld
endpoints
gibt den Endpunkt an, den der Dienst öffentlich bereitstellt.name
gibt einen benutzerfreundlichen Namen für den TCP-Netzwerkport an, den der Container auf Anforderungen überwacht. Sie verwenden diesen benutzerfreundlichen Endpunktnamen, um Anforderung an den entsprechenden Port zu senden. Beachten Sie, dass diese Portnummer überenv.SERVER_PORT
gesteuert wird.Der Endpunkt ist ebenfalls als
public
konfiguriert. Dies ermöglicht Datenverkehr zu diesem Endpunkt aus dem öffentlichen Web.
Das optionale Feld
containers.env
wurde hinzugefügt, um zu veranschaulichen, wie Sie Umgebungsvariablen überschreiben können, die Snowflake an alle Prozesse in Ihrem Container übergibt. Der Dienstcode inecho_service.py
liest beispielsweise die Umgebungsvariablen mit Standardwerten wie gezeigt:CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I') SERVER_PORT = os.getenv('SERVER_PORT', 8080)
Dies funktioniert wie folgt:
Wenn der Echo-Dienst eine HTTP-POST-Anforderung mit einer Zeichenfolge (z. B. „Hallo“) im Anforderungstext erhält, gibt der Dienst standardmäßig „I said Hello“ (Ich sagte Hallo) zurück. Der Code verwendet die Umgebungsvariable
CHARACTER_NAME
, um das Wort vor „said“ (sagte) zu bestimmen. Standardmäßig istCHARACTER_NAME
auf „I“ (Ich) eingestellt.Sie können den Standardwert von CHARACTER_NAME in der Dienstspezifikation überschreiben. Wenn Sie den Wert beispielsweise auf „Bob“ setzen, gibt der Echo-Dienst die Antwort „Bob said Hello“ (Bob sagte Hallo) zurück.
In ähnlicher Weise überschreibt die Dienstspezifikation den Port (SERVER_PORT), den der Dienst überwacht, mit 8000, wodurch der Standardport 8080 außer Kraft gesetzt wird.
Das Feld
readinessProbe
identifiziert die Werte vonport
undpath
, mit denen Snowflake eine HTTP-GET-Anforderung an den Bereitschaftstest (Readiness Probe) senden kann, um zu überprüfen, ob der Dienst bereit ist, Datenverkehr zu verarbeiten.Der Dienstcode (
echo_python.py
) implementiert die Bereitschaftsprüfung wie folgt:@app.get("/healthcheck") def readiness_probe():
Daher enthält die Spezifikationsdatei das entsprechende Feld
container.readinessProbe
.
Weitere Informationen zu Dienstspezifikationen finden Sie unter Referenz der Dienstspezifikation.
Erläuterungen zur Dienstfunktion¶
Eine Dienstfunktion ist eine der Methoden, um mit Ihrem Dienst zu kommunizieren (siehe Verwenden eines Dienstes). Eine Dienstfunktion ist eine benutzerdefinierte Funktion (UDF), die Sie mit einem Dienstendpunkt verknüpfen. Wenn die Dienstfunktion ausgeführt wird, sendet sie eine Anforderung an den zugehörigen Dienstendpunkt und empfängt eine Antwort.
Sie erstellen die folgende Dienstfunktion, indem Sie den Befehl CREATE FUNCTION mit den folgenden Parametern ausführen:
CREATE FUNCTION my_echo_udf (InputText VARCHAR)
RETURNS VARCHAR
SERVICE=echo_service
ENDPOINT=echoendpoint
AS '/echo';
Beachten Sie Folgendes:
Die Funktion
my_echo_udf
nimmt eine Zeichenfolge als Eingabe entgegen und gibt eine Zeichenfolge zurück.Die Eigenschaft SERVICE bezeichnet den Dienst (
echo_service
), und die Eigenschaft ENDPOINT bezeichnet den benutzerfreundlichen Namen des Endpunkts (echoendpoint
).„AS ‚/echo‘“ gibt den Pfad für den Dienst an. In
echo_service.py
verknüpft der Decorator@app.post
diesen Pfad mit derecho
-Funktion.
Diese Funktion stellt eine Verbindung mit dem spezifischen Endpunkt in ENDPOINT des in SERVICE angegebenen Dienstes her. Wenn Sie diese Funktion aufrufen, sendet Snowflake eine Anforderung an den Pfad /echo
innerhalb des Dienstcontainers.
Lokales Erstellen und Testen eines Images¶
Sie können das Docker-Image lokal testen, bevor Sie es in ein Repository Ihres Snowflake-Kontos hochladen. Bei lokalen Tests wird Ihr Container eigenständig ausgeführt (er ist kein Dienst, der von Snowflake ausgeführt wird).
So testen Sie das Docker-Image von Tutorial 1:
Um ein Docker-Image zu erstellen, führen Sie über die Docker-CLI den folgenden Befehl aus:
docker build --rm -t my_service:local .
Um Ihren Code zu starten, führen Sie den folgenden Befehl aus:
docker run --rm -p 8080:8080 my_service:local
Senden Sie eine Echo-Anforderung an den Dienst mit einer der folgenden Methoden:
Verwenden des cURL-Befehls:
Senden Sie in einem anderen Terminalfenster mit cURL die folgende POST-Anforderung an Port 8080:
curl -X POST http://localhost:8080/echo \ -H "Content-Type: application/json" \ -d '{"data":[[0, "Hello friend"], [1, "Hello World"]]}'
Beachten Sie, dass der Anforderungstext zwei Zeichenfolgen enthält. Dieser cURL-Befehl sendet eine POST-Anforderung an Port 8080, der vom Dienst auf Anforderungen überwacht wird. Die 0 in den Daten ist der Index der Eingabezeichenfolge in der Liste. Der Echo-Dienst gibt die Eingabezeichenfolgen als Echo wie folgt zurück:
{"data":[[0,"I said Hello Friend"],[1,"I said Hello World"]]}
Verwenden eines Webbrowsers:
Öffnen Sie in Ihrem Browser auf demselben Computer
http://localhost:8080/ui
.Dadurch wird eine GET-Anforderung an Port 8080 gesendet, der vom Dienst auf Anforderungen überwacht wird. Der Dienst führt die Funktion
ui()
aus, die wie gezeigt ein HTML-Formular ausgibt:Geben Sie in das Feld Input die Zeichenfolge „Hello“ ein, und drücken Sie die Eingabetaste.
Nächste Schritte¶
Sie können nun mit Tutorial 2 fortfahren, in dem ein Job ausgeführt wird.