Tutorial 1: Snowpark Container Services-Dienst erstellen

Einführung

Nachdem Sie die grundlegende Einrichtung abgeschlossen haben, können Sie nun einen Dienst erstellen. In diesem Tutorial erstellen Sie einen Dienst (mit dem Namen echo_service), der einfach den von Ihnen eingegebenen Text als Echo wieder zurückgibt. Wenn die Eingabezeichenfolge beispielsweise „Hello World“ ist, gibt der Dienst „I said, Hello World“ zurück.

Dieses Tutorial besteht aus zwei Teilen:

Teil 1: Dienst erstellen und testen. Sie laden den für dieses Tutorial bereitgestellten Code herunter und befolgen die schrittweise Anleitung:

  1. Dienstcode für dieses Tutorial herunterladen

  2. Docker-Image für Snowpark Container Services erstellen und Image in Repository im eigenen Konto hochladen

  3. Dienst erstellen, indem die Dienstspezifikationsdatei und der Computepool für die Ausführung des Dienstes bereitgestellt werden

  4. Dienstfunktion für die Kommunikation mit dem Dienst bereitstellen

  5. Dienst einsetzen. Dazu wird eine Echo-Anforderung an den Dienst gesendet und die Antwort überprüft.

Teil 2: Erläuterungen zum Dienst. Dieser Abschnitt bietet eine Übersicht zum Dienstcode und zeigt auf, wie die verschiedenen Komponenten zusammenarbeiten.

1: Dienstcode herunterladen

Zum Erstellen des Echo-Dienstes wird ein Code (eine Python-Anwendung) bereitgestellt.

  1. Laden Sie SnowparkContainerServices-Tutorials.zip herunter.

  2. Entpacken Sie den Inhalt, der ein Verzeichnis für jedes Tutorial enthält. Das Verzeichnis Tutorial-1 enthält die folgenden Dateien:

    • Dockerfile

    • echo_service.py

    • templates/basic_ui.html

2: Image erstellen und hochladen

Erstellen Sie ein Image für die linux/amd64-Plattform, die von Snowpark Container Services unterstützt wird, und laden Sie das Image dann in das Image-Repository in Ihrem Konto hoch (siehe Grundlegende Einrichtung).

Sie benötigen Informationen zum Repository (die Repository-URL und den Hostnamen der Registry), bevor Sie das Image erstellen und hochladen können. Weitere Informationen dazu finden Sie unter Registry und Repositorys.

Informationen zum Repository abrufen

  1. Um die Repository-URL zu erhalten, führen Sie den SQL-Befehl SHOW IMAGE REPOSITORIES aus.

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • Die URL ist in der Spalte repository_url der Ausgabe enthalten. Siehe folgendes Beispiel:

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • Der Hostname in der Repository-URL ist der Name des Registry-Hosts. Siehe folgendes Beispiel:

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

Image erstellen und in das Repository hochladen

  1. Öffnen Sie ein Terminalfenster, und wechseln Sie in das Verzeichnis, das die entpackten Dateien enthält.

  2. Um ein Docker-Image zu erstellen, führen Sie den folgenden Befehl docker build mithilfe der Docker-CLI aus. Beachten Sie, dass der Befehl das aktuelle Arbeitsverzeichnis (.) als PATH für die Dateien angibt, die für das Erstellen des Images verwendet werden sollen.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • Für image_name verwenden Sie my_echo_service_image:latest:

    Beispiel

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest .
    
    Copy
  3. Laden Sie das Image in das Repository in Ihrem Snowflake-Konto hoch. Damit Docker ein Image in Ihrem Namen in Ihr Repository hochladen kann, müssen Sie zunächst Docker mit der Registry authentifizieren.

    1. Um Docker mit der Image-Registry zu authentifizieren, führen Sie den folgenden Befehl aus.

      docker login <registry_hostname> -u <username>
      
      Copy
      • Geben Sie dabei für username Ihren Snowflake-Benutzernamen an. Docker fordert Sie zur Eingabe Ihres Kennworts auf.

    2. Führen Sie den folgenden Befehl aus, um das Image hochzuladen:

      docker push <repository_url>/<image_name>
      
      Copy

      Beispiel

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
      
      Copy

3: Dienst erstellen

In diesem Abschnitt erstellen Sie einen Dienst und dann eine Dienstfunktion, die mit dem Dienst kommuniziert.

Um einen Dienst zu erstellen, benötigen Sie Folgendes:

  • Einen Computepool. Snowflake führt Ihren Dienst in dem angegebenen Computepool aus. Sie haben einen Computepool als Teil der grundlegenden Einrichtung erstellt.

  • Eine Dienstspezifikation. Diese Spezifikation liefert Snowflake die Informationen, die zur Konfiguration und Ausführung Ihres Dienstes erforderlich sind. Weitere Informationen dazu finden Sie unter Snowpark Container Services: Verwenden von Diensten. In diesem Tutorial geben Sie die Spezifikation inline im Befehl CREATE SERVICE an. Sie können die Spezifikation auch in einer Datei in Ihrem Snowflake-Stagingbereich speichern und die Dateiinformationen im Befehl CREATE SERVICE bereitstellen, wie in Tutorial 2 gezeigt.

Eine Dienstfunktion ist eine der verfügbaren Methoden zur Kommunikation mit Ihrem Dienst. Eine Dienstfunktion ist eine benutzerdefinierte Funktion (UDF), die Sie mit dem Dienstendpunkt verknüpfen. Wenn die Dienstfunktion ausgeführt wird, sendet sie eine Anforderung an den Dienstendpunkt und erhält eine Antwort.

  1. Überprüfen Sie, ob der Computepool bereit ist und ob Sie sich im korrekten Kontext befinden, um den Dienst zu erstellen.

    1. Zuvor haben Sie den Kontext im Schritt Grundlegende Einrichtung festgelegt. Um sicherzustellen, dass Sie sich in diesem Schritt im korrekten Kontext für die SQL-Anweisungen befinden, führen Sie Folgendes aus:

    USE ROLE test_role;
    USE DATABASE tutorial_db;
    USE SCHEMA data_schema;
    USE WAREHOUSE tutorial_warehouse;
    
    Copy
    1. Um sicherzustellen, dass der unter Grundlegende Einrichtung erstellte Computepool bereit ist, führen Sie DESCRIBE COMPUTE POOL aus, und überprüfen Sie dann, ob state den Wert ACTIVE oder IDLE hat. Wenn die state den Wert STARTING hat, müssen Sie warten, bis state entweder ACTIVE oder IDLE ist.

    DESCRIBE COMPUTE POOL tutorial_compute_pool;
    
    Copy
  2. Um den Dienst zu erstellen, führen Sie mit der Rolle test_role den folgenden Befehl aus:

    CREATE SERVICE echo_service
      IN COMPUTE POOL tutorial_compute_pool
      FROM SPECIFICATION $$
        spec:
          containers:
          - name: echo
            image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
          $$
       MIN_INSTANCES=1
       MAX_INSTANCES=1;
    
    Copy

    Bemerkung

    Wenn bereits ein Dienst mit diesem Namen existiert, verwenden Sie den Befehl DROP SERVICE, um den zuvor erstellten Dienst zu löschen, und erstellen Sie dann diesen Dienst.

  3. Führen Sie die folgenden SQL-Befehle aus, um detaillierte Informationen zu dem gerade erstellten Dienst zu erhalten. Weitere Informationen dazu finden Sie unter Snowpark Container Services: Verwenden von Diensten.

    • Um die Dienste in Ihrem Konto aufzulisten, führen Sie den Befehl SHOW SERVICES aus:

      SHOW SERVICES;
      
      Copy
    • Um den Status Ihres Dienstes zu erfahren, rufen Sie die Systemfunktion SYSTEM$GET_SERVICE_STATUS auf:

      SELECT SYSTEM$GET_SERVICE_STATUS('echo_service');
      
      Copy
    • Um Informationen zu Ihrem Dienst zu erhalten, führen Sie den Befehl DESCRIBE SERVICE aus:

      DESCRIBE SERVICE echo_service;
      
      Copy
  4. Um eine Dienstfunktion zu erstellen, führen Sie den folgenden Befehl aus:

    CREATE FUNCTION my_echo_udf (InputText varchar)
      RETURNS varchar
      SERVICE=echo_service
      ENDPOINT=echoendpoint
      AS '/echo';
    
    Copy

    Beachten Sie Folgendes:

    • Die Eigenschaft SERVICE verknüpft die UDF mit dem Dienst echo_service.

    • Die Eigenschaft ENDPOINT verknüpft die UDF mit dem Endpunkt echoendpoint innerhalb des Dienstes.

    • „AS ‚/echo‘“ gibt den HTTP-Pfad zum Echo-Server an. Sie finden diesen Pfad im Dienstcode (echo_service.py).

4: Dienst einsetzen

Richten Sie zunächst den Kontext für die SQL-Anweisungen in diesem Abschnitt ein, indem Sie Folgendes ausführen:

USE ROLE test_role;
USE DATABASE tutorial_db;
USE SCHEMA data_schema;
USE WAREHOUSE tutorial_warehouse;
Copy

Jetzt können Sie mit dem Echo-Dienst kommunizieren.

  1. Verwenden einer Dienstfunktion: Sie können die Dienstfunktion in einer Abfrage aufrufen. Die Beispiel-Dienstfunktion (my_echo_udf) kann als Eingabe entweder eine einzelne Zeichenfolge oder eine Liste von Zeichenfolgen als Eingabe annehmen.

    Beispiel 1.1: Einzelne Zeichenfolge übergeben

    • Um die Dienstfunktion my_echo_udf aufzurufen, führen Sie die folgende SELECT-Anweisung aus und übergeben genau eine Eingabezeichenfolge ('hello'):

      SELECT my_echo_udf('hello!');
      
      Copy

      Snowflake sendet eine POST-Anforderung an den Dienstendpunkt (echoendpoint). Nach Erhalt der Anforderung gibt der Dienst die Eingabezeichenfolge als Echo in der Antwort zurück.

      +--------------------------+
      | **MY_ECHO_UDF('HELLO!')**|
      |------------------------- |
      | Bob said hello!          |
      +--------------------------+
      

    Beispiel 1.2: Liste von Zeichenfolge übergeben

    Wenn Sie eine Liste von Zeichenfolgen an die Dienstfunktion übergeben, fasst Snowflake diese Eingabezeichenfolgen in einem Batch zusammen und sendet eine Serie von POST-Anforderungen an den Dienst. Nachdem der Dienst alle Zeichenfolgen verarbeitet hat, kombiniert Snowflake die Ergebnisse und gibt sie zurück.

    Im folgenden Beispiel wird eine Tabellenspalte als Eingabe an die Dienstfunktion übergeben.

    1. Erstellen Sie eine Tabelle mit mehreren Zeichenfolgen:

      CREATE TABLE messages (message_text VARCHAR)
        AS (SELECT * FROM (VALUES ('Thank you'), ('Hello'), ('Hello World')));
      
      Copy
    2. Überprüfen Sie, ob die Tabelle erstellt wurde, indem Sie folgende Anweisung ausführen:

      SELECT * FROM messages;
      
      Copy
    3. Um die Dienstfunktion aufzurufen, führen Sie die folgende SELECT-Anweisung aus und übergeben dabei Tabellenzeilen als Eingabe:

      SELECT my_echo_udf(message_text) FROM messages;
      
      Copy

      Ausgabe:

      +---------------------------+
      | MY_ECHO_UDF(MESSAGE_TEXT) |
      |---------------------------|
      | Bob said Thank you        |
      | Bob said Hello            |
      | Bob said Hello World      |
      +---------------------------+
      
  2. Webbrowser verwenden: Der Dienst stellt den Endpunkt öffentlich zur Verfügung (siehe Inline-Spezifikation im Befehl CREATE SERVICE). Daher können Sie sich bei einer Web-UI anmelden, die der Dienst im Internet bereitstellt, und dann von einem Webbrowser aus Anforderungen an den Dienst senden.

    1. Ermitteln Sie die URL des öffentlichen Endpunkts, den der Dienst bereitstellt:

      SHOW ENDPOINTS IN SERVICE echo_service;
      
      Copy

      Die Spalte ingress_url in der Antwort enthält die URL.

      Beispiel

      p6bye-myorg-myacct.snowflakecomputing.app
      
    2. Hängen Sie /ui an die Endpunkt-URL an, und geben Sie diese Zeichenfolge in den Webbrowser ein. Dies veranlasst den Dienst, die Funktion ui() auszuführen (siehe echo_service.py).

      Beachten Sie, dass Sie beim ersten Zugriff auf den Endpunkt-URL aufgefordert werden, sich bei Snowflake anzumelden. Verwenden Sie für diesen Test denselben Benutzer, mit dem Sie den Dienst erstellt haben, um sicherzustellen, dass der Benutzer über die erforderlichen Berechtigungen verfügt.

      Webformular für die Kommunikation mit dem Echo-Dienst.
    3. Geben Sie in das Feld Input die Zeichenfolge „Hello“ ein, und drücken Sie die Eingabetaste.

      Webformular, das die Antwort des Echo-Dienstes anzeigt.

    Bemerkung

    Sie können auf den öffentlichen Endpunkt programmgesteuert zugreifen. Ein Beispiel für Code finden Sie unter Zugriff auf öffentliche Endpunkte von außerhalb von Snowflake und Authentifizierung. Beachten Sie, dass Sie im Code an die Endpunkt-/ui die Pfadangabe URL anhängen müssen,damit Snowflake die Anfrage an die ui()-Funktion im Dienstcode weiterleiten kann.

5: (Optional) Greifen Sie auf den öffentlichen Endpunkt programmgesteuert zu

Im vorangegangenen Abschnitt haben Sie den Echo-Dienst mit Hilfe eines Browsers getestet. Im Browser haben Sie auf den öffentlichen Endpunkt (Ingress-Endpunkt) zugegriffen und Anfragen über die Weboberfläche UI gesendet, das der Dienste bereitstellt. In diesem Abschnitt testen Sie denselben öffentlichen Endpunkt programmgesteuert.

Das Beispiel verwendet Schlüsselpaar-Authentifizierung. Mit dem von Ihnen bereitgestellten Schlüsselpaar erzeugt der Beispielcode zunächst ein JSON-Web Token (JWT) und tauscht dann das Token mit Snowflake gegen ein OAuth-Token aus. Der Code verwendet dann das OAuth-Token zur Authentifizierung bei der Kommunikation mit dem öffentlichen Endpunkt des Echo-Dienstes.

Voraussetzungen

Stellen Sie sicher, dass Sie die folgenden Informationen haben:

  • Ingress URL des öffentlichen Endpunktes. Führen Sie den Befehl SHOW ENDPOINTS IN SERVICE aus, um den URL zu erhalten:

    SHOW ENDPOINTS IN SERVICE echo_service;
    
    Copy
  • Der Name Ihres Snowflake-Kontos. Weitere Informationen finden Sie unter Gemeinsame Einrichtung: Überprüfen Sie, ob Sie bereit sind, mit fortzufahren.

  • Ihr Snowflake-Konto URL: Es lautet <acctname>.snowflakecomputing.com.

  • Benutzername im Snowflake Konto. Dies ist der Benutzer, den Sie unter Gemeinsame Einrichtung: Snowflake-Objekte erstellen gewählt haben. Sie melden sich bei Snowflake als dieser Benutzer an und testen den programmgesteuerten Zugriff.

  • Rollenname: Sie haben eine Rolle (test_role) als Teil der gemeinsamen Einrichtung erstellt. Der Benutzer nimmt diese Rolle an, um Aktionen durchzuführen.

Setup

Folgen Sie den Schritten, um programmgesteuert mit dem Echo-Dienst zu kommunizieren. Mit dem mitgelieferten Python-Code senden Sie Anfragen an den öffentlichen Endpunkt, den der Echo-Dienst zur Verfügung stellt.

  1. Erstellen Sie an einer Eingabeaufforderung ein Verzeichnis und navigieren Sie dorthin.

  2. Konfigurieren Sie die Authentifizierung des Schlüsselpaares für den Benutzer.

    1. Erzeugen Sie ein Schlüsselpaar <label-configuring_key_pair_authentication>:

      1. Erzeugen Sie einen privaten Schlüssel. Um die Übungsschritte zu vereinfachen, erzeugen Sie einen unverschlüsselten privaten Schlüssel. Sie können auch einen verschlüsselten privaten Schlüssel verwenden, aber dann müssen Sie das Kennwort eingeben.

        openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
        
        Copy
      2. Erzeugen Sie einen öffentlichen Schlüssel (rsa_key.pub), indem Sie auf den von Ihnen erstellten privaten Schlüssel verweisen.

        openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
        
        Copy
    2. Vergewissern Sie sich, dass Sie den privaten Schlüssel und den öffentlichen Schlüssel in dem Verzeichnis generiert haben.

    3. Weisen Sie den öffentlichen Schlüssel dem Benutzer zu, den Sie zum Testen des programmatischen Zugriffs verwenden. Damit kann der Benutzer den Schlüssel für die Authentifizierung angeben.

      ALTER USER <user-name> SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
      
      Copy
  3. Speichern Sie den mitgelieferten Beispielcode in Python-Dateien.

    1. Speichern Sie den folgenden Code in generateJWT.py.

      # To run this on the command line, enter:
      #   python3 generateJWT.py --account=<account_identifier> --user=<username> --private_key_file_path=<path_to_private_key_file>
      
      from cryptography.hazmat.primitives.serialization import load_pem_private_key
      from cryptography.hazmat.primitives.serialization import Encoding
      from cryptography.hazmat.primitives.serialization import PublicFormat
      from cryptography.hazmat.backends import default_backend
      from datetime import timedelta, timezone, datetime
      import argparse
      import base64
      from getpass import getpass
      import hashlib
      import logging
      import sys
      
      # This class relies on the PyJWT module (https://pypi.org/project/PyJWT/).
      import jwt
      
      logger = logging.getLogger(__name__)
      
      try:
          from typing import Text
      except ImportError:
          logger.debug('# Python 3.5.0 and 3.5.1 have incompatible typing modules.', exc_info=True)
          from typing_extensions import Text
      
      ISSUER = "iss"
      EXPIRE_TIME = "exp"
      ISSUE_TIME = "iat"
      SUBJECT = "sub"
      
      # If you generated an encrypted private key, implement this method to return
      # the passphrase for decrypting your private key. As an example, this function
      # prompts the user for the passphrase.
      def get_private_key_passphrase():
          return getpass('Passphrase for private key: ')
      
      class JWTGenerator(object):
          """
          Creates and signs a JWT with the specified private key file, username, and account identifier. The JWTGenerator keeps the
          generated token and only regenerates the token if a specified period of time has passed.
          """
          LIFETIME = timedelta(minutes=59)  # The tokens will have a 59-minute lifetime
          RENEWAL_DELTA = timedelta(minutes=54)  # Tokens will be renewed after 54 minutes
          ALGORITHM = "RS256"  # Tokens will be generated using RSA with SHA256
      
          def __init__(self, account: Text, user: Text, private_key_file_path: Text,
                      lifetime: timedelta = LIFETIME, renewal_delay: timedelta = RENEWAL_DELTA):
              """
              __init__ creates an object that generates JWTs for the specified user, account identifier, and private key.
              :param account: Your Snowflake account identifier. See https://docs.snowflake.com/en/user-guide/admin-account-identifier.html. Note that if you are using the account locator, exclude any region information from the account locator.
              :param user: The Snowflake username.
              :param private_key_file_path: Path to the private key file used for signing the JWTs.
              :param lifetime: The number of minutes (as a timedelta) during which the key will be valid.
              :param renewal_delay: The number of minutes (as a timedelta) from now after which the JWT generator should renew the JWT.
              """
      
              logger.info(
                  """Creating JWTGenerator with arguments
                  account : %s, user : %s, lifetime : %s, renewal_delay : %s""",
                  account, user, lifetime, renewal_delay)
      
              # Construct the fully qualified name of the user in uppercase.
              self.account = self.prepare_account_name_for_jwt(account)
              self.user = user.upper()
              self.qualified_username = self.account + "." + self.user
      
              self.lifetime = lifetime
              self.renewal_delay = renewal_delay
              self.private_key_file_path = private_key_file_path
              self.renew_time = datetime.now(timezone.utc)
              self.token = None
      
              # Load the private key from the specified file.
              with open(self.private_key_file_path, 'rb') as pem_in:
                  pemlines = pem_in.read()
                  try:
                      # Try to access the private key without a passphrase.
                      self.private_key = load_pem_private_key(pemlines, None, default_backend())
                  except TypeError:
                      # If that fails, provide the passphrase returned from get_private_key_passphrase().
                      self.private_key = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend())
      
          def prepare_account_name_for_jwt(self, raw_account: Text) -> Text:
              """
              Prepare the account identifier for use in the JWT.
              For the JWT, the account identifier must not include the subdomain or any region or cloud provider information.
              :param raw_account: The specified account identifier.
              :return: The account identifier in a form that can be used to generate the JWT.
              """
              account = raw_account
              if not '.global' in account:
                  # Handle the general case.
                  idx = account.find('.')
                  if idx > 0:
                      account = account[0:idx]
              else:
                  # Handle the replication case.
                  idx = account.find('-')
                  if idx > 0:
                      account = account[0:idx]
              # Use uppercase for the account identifier.
              return account.upper()
      
          def get_token(self) -> Text:
              """
              Generates a new JWT. If a JWT has already been generated earlier, return the previously generated token unless the
              specified renewal time has passed.
              :return: the new token
              """
              now = datetime.now(timezone.utc)  # Fetch the current time
      
              # If the token has expired or doesn't exist, regenerate the token.
              if self.token is None or self.renew_time <= now:
                  logger.info("Generating a new token because the present time (%s) is later than the renewal time (%s)",
                              now, self.renew_time)
                  # Calculate the next time we need to renew the token.
                  self.renew_time = now + self.renewal_delay
      
                  # Prepare the fields for the payload.
                  # Generate the public key fingerprint for the issuer in the payload.
                  public_key_fp = self.calculate_public_key_fingerprint(self.private_key)
      
                  # Create our payload
                  payload = {
                      # Set the issuer to the fully qualified username concatenated with the public key fingerprint.
                      ISSUER: self.qualified_username + '.' + public_key_fp,
      
                      # Set the subject to the fully qualified username.
                      SUBJECT: self.qualified_username,
      
                      # Set the issue time to now.
                      ISSUE_TIME: now,
      
                      # Set the expiration time, based on the lifetime specified for this object.
                      EXPIRE_TIME: now + self.lifetime
                  }
      
                  # Regenerate the actual token
                  token = jwt.encode(payload, key=self.private_key, algorithm=JWTGenerator.ALGORITHM)
                  # If you are using a version of PyJWT prior to 2.0, jwt.encode returns a byte string instead of a string.
                  # If the token is a byte string, convert it to a string.
                  if isinstance(token, bytes):
                    token = token.decode('utf-8')
                  self.token = token
                  logger.info("Generated a JWT with the following payload: %s", jwt.decode(self.token, key=self.private_key.public_key(), algorithms=[JWTGenerator.ALGORITHM]))
      
              return self.token
      
          def calculate_public_key_fingerprint(self, private_key: Text) -> Text:
              """
              Given a private key in PEM format, return the public key fingerprint.
              :param private_key: private key string
              :return: public key fingerprint
              """
              # Get the raw bytes of public key.
              public_key_raw = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
      
              # Get the sha256 hash of the raw bytes.
              sha256hash = hashlib.sha256()
              sha256hash.update(public_key_raw)
      
              # Base64-encode the value and prepend the prefix 'SHA256:'.
              public_key_fp = 'SHA256:' + base64.b64encode(sha256hash.digest()).decode('utf-8')
              logger.info("Public key fingerprint is %s", public_key_fp)
      
              return public_key_fp
      
      def main():
          logging.basicConfig(stream=sys.stdout, level=logging.INFO)
          cli_parser = argparse.ArgumentParser()
          cli_parser.add_argument('--account', required=True, help='The account identifier (e.g. "myorganization-myaccount" for "myorganization-myaccount.snowflakecomputing.com").')
          cli_parser.add_argument('--user', required=True, help='The user name.')
          cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.')
          cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.')
          cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.')
          args = cli_parser.parse_args()
      
          token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token()
          print('JWT:')
          print(token)
      
      if __name__ == "__main__":
          main()
      
      Copy
    2. Speichern Sie den folgenden Code in access-via-keypair.py.

      from generateJWT import JWTGenerator
      from datetime import timedelta
      import argparse
      import logging
      import sys
      import requests
      logger = logging.getLogger(__name__)
      
      def main():
        args = _parse_args()
        token = _get_token(args)
        snowflake_jwt = token_exchange(token,endpoint=args.endpoint, role=args.role,
                        snowflake_account_url=args.snowflake_account_url,
                        snowflake_account=args.account)
        spcs_url=f'https://{args.endpoint}{args.endpoint_path}'
        connect_to_spcs(snowflake_jwt, spcs_url)
      
      def _get_token(args):
        token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime),
                  timedelta(minutes=args.renewal_delay)).get_token()
        logger.info("Key Pair JWT: %s" % token)
        return token
      
      def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account):
        scope_role = f'session:role:{role}' if role is not None else None
        scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint
        data = {
          'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
          'scope': scope,
          'assertion': token,
        }
        logger.info(data)
        url = f'https://{snowflake_account}.snowflakecomputing.com/oauth/token'
        if snowflake_account_url:
          url =       f'{snowflake_account_url}/oauth/token'
        logger.info("oauth url: %s" %url)
        response = requests.post(url, data=data)
        logger.info("snowflake jwt : %s" % response.text)
        assert 200 == response.status_code, "unable to get snowflake token"
        return response.text
      
      def connect_to_spcs(token, url):
        # Create a request to the ingress endpoint with authz.
        headers = {'Authorization': f'Snowflake Token="{token}"'}
        response = requests.post(f'{url}', headers=headers)
        logger.info("return code %s" % response.status_code)
        logger.info(response.text)
      
      def _parse_args():
        logging.basicConfig(stream=sys.stdout, level=logging.INFO)
        cli_parser = argparse.ArgumentParser()
        cli_parser.add_argument('--account', required=True,
                    help='The account identifier (for example, "myorganization-myaccount" for '
                      '"myorganization-myaccount.snowflakecomputing.com").')
        cli_parser.add_argument('--user', required=True, help='The user name.')
        cli_parser.add_argument('--private_key_file_path', required=True,
                    help='Path to the private key file used for signing the JWT.')
        cli_parser.add_argument('--lifetime', type=int, default=59,
                    help='The number of minutes that the JWT should be valid for.')
        cli_parser.add_argument('--renewal_delay', type=int, default=54,
                    help='The number of minutes before the JWT generator should produce a new JWT.')
        cli_parser.add_argument('--role',
                    help='The role we want to use to create and maintain a session for. If a role is not provided, '
                      'use the default role.')
        cli_parser.add_argument('--endpoint', required=True,
                    help='The ingress endpoint of the service')
        cli_parser.add_argument('--endpoint-path', default='/',
                    help='The url path for the ingress endpoint of the service')
        cli_parser.add_argument('--snowflake_account_url', default=None,
                    help='The account url of the account for which we want to log in. Type of '
                      'https://myorganization-myaccount.snowflakecomputing.com')
        args = cli_parser.parse_args()
        return args
      
      if __name__ == "__main__":
        main()
      
      Copy

Anfragen programmgesteuert an den Dienstendpunkt senden

Führen Sie den Python-Code access-via-keypair.py aus, um den öffentlichen Endpunkt des Echo-Dienst aufzurufen.

python3 access-via-keypair.py \
  --account <account-identifier> \
  --user <user-name> \
  --role TEST_ROLE \
  --private_key_file_path rsa_key.p8 \
  --endpoint <ingress-hostname> \
  --endpoint-path /ui
Copy

Weitere Informationen zu account-identifier finden Sie unter Kontobezeichner.

So funktioniert die Authentifizierung

Der Code wandelt zunächst das bereitgestellte Schlüsselpaar in ein JWT-Token um. Es sendet dann das JWT-Token an Snowflake, um ein OAuth-Token zu erhalten. Schließlich verwendet der Code das OAuth-Token, um eine Verbindung zu Snowflake herzustellen und auf den öffentlichen Endpunkt zuzugreifen. Genauer gesagt, bewirkt der Code Folgendes:

  1. Ruft die Funktion _get_token(args) auf, um ein JWT-Token aus dem von Ihnen angegebenen Schlüsselpaar zu erzeugen. Die Funktionsimplementierung wird angezeigt:

    def _get_token(args):
        token = JWTGenerator(args.account,
                            args.user,
                            args.private_key_file_path,
                            timedelta(minutes=args.lifetime),
                            timedelta(minutes=args.renewal_delay)).get_token()
        logger.info("Key Pair JWT: %s" % token)
        return token
    
    Copy

    JWTGenerator ist eine Hilfsklasse, die Ihnen zur Verfügung gestellt wird. Beachten Sie die folgenden Parameter, die Sie bei der Erstellung dieses Objekts angeben:

    • args.account und die args.user Parameter: Ein JWT-Token hat mehrere Felder (siehe Token-Format), iss ist eines der Felder. Der Feldwert enthält den Namen des Snowflake-Kontos und einen Benutzernamen. Daher geben Sie diese Werte als Parameter an.

    • Die beiden Parameter von timedelta liefern die folgenden Informationen:

      • lifetime gibt die Anzahl der Minuten an, für die der Schlüssel gültig ist (60 Minuten).

      • renewal_delay gibt die Anzahl der Minuten ab jetzt an, nach denen der JWT-Generator die JWT erneuern soll.

  2. Ruft die Funktion token_exchange() auf, um eine Verbindung zu Snowflake herzustellen und das JWT-Token gegen ein OAuth-Token auszutauschen.

    scope_role = f'session:role:{role}' if role is not None else None
    scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint
    
    data = {
        'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        'scope': scope,
        'assertion': token,
    }
    
    Copy

    Der vorangehende Code konstruiert eine JSON, die den Bereich für das OAuth-Token einstellt, den öffentlichen Endpunkt, auf den mit der angegebenen Rolle zugegriffen werden kann. Dieser Code stellt dann eine POST-Anfrage an Snowflake und übergibt das JSON, um das JWT-Token gegen ein OAuth-Token (siehe Token-Austausch) auszutauschen, wie gezeigt:

    url = f'{snowflake_account_url}/oauth/token'
    response = requests.post(url, data=data)
    assert 200 == response.status_code, "unable to get Snowflake token"
    return response.text
    
    Copy
  3. Der Code ruft dann die connect_to_spcs()-Funktion auf, um eine Verbindung mit dem öffentlichen Endpunkt des Echo-Dienstes herzustellen. Er stellt die URL (https://<ingress-URL>/ui) des Endpunkts und das OAuth-Token zur Authentifizierung bereit.

    headers = {'Authorization': f'Snowflake Token="{token}"'}
    response = requests.post(f'{url}', headers=headers)
    
    Copy

    url ist das spcs_url, das Sie dem Programm zur Verfügung gestellt haben, und token ist das OAuth-Token.

    Der Echo-Dienst in diesem Beispiel bedient eine HTML-Website (wie im vorangegangenen Abschnitt erläutert). Dieses Beispiel druckt einfach die HTML in der Antwort aus.

6: Bereinigen

Wenn Sie nicht vorhaben, mit Tutorial 2 oder Tutorial 3 fortzufahren, sollten Sie die von Ihnen erstellten abrechenbaren Ressourcen wieder entfernen. Weitere Informationen dazu finden Sie in Schritt 5 von Tutorial 3.

7: Erläuterungen zum Dienstcode

In diesem Abschnitt werden die folgenden Themen behandelt:

Code von Tutorial 1 überprüfen

Die ZIP-Datei, die Sie in Schritt 1 heruntergeladen haben, enthält die folgenden Dateien:

  • Dockerfile

  • echo_service.py

  • templates/basic_ui.html

Beim Erstellen des Dienstes verwenden Sie außerdem Dienstspezifikationen. Im folgenden Abschnitt wird erläutert, wie diese Codekomponenten beim Erstellen des Dienstes zusammenwirken.

Datei „echo_service.py“

Diese Python-Datei enthält den Code, der einen minimalen HTTP-Server implementiert, der eingegebenen Text (als Echo) wieder zurückgibt. Der Code erfüllt in erster Linie zwei Aufgaben: die Bearbeitung von Echo-Anforderungen von Snowflake-Dienstfunktionen und die Bereitstellung einer Web-Benutzeroberfläche (UI) für die Übermittlung von Echo-Anforderungen.

from flask import Flask
from flask import request
from flask import make_response
from flask import render_template
import logging
import os
import sys

SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0')
SERVER_PORT = os.getenv('SERVER_PORT', 8080)
CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')


def get_logger(logger_name):
  logger = logging.getLogger(logger_name)
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  handler.setFormatter(
    logging.Formatter(
      '%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
  logger.addHandler(handler)
  return logger


logger = get_logger('echo-service')

app = Flask(__name__)


@app.get("/healthcheck")
def readiness_probe():
  return "I'm ready!"


@app.post("/echo")
def echo():
  '''
  Main handler for input data sent by Snowflake.
  '''
  message = request.json
  logger.debug(f'Received request: {message}')

  if message is None or not message['data']:
    logger.info('Received empty message')
    return {}

  # input format:
  #   {"data": [
  #     [row_index, column_1_value, column_2_value, ...],
  #     ...
  #   ]}
  input_rows = message['data']
  logger.info(f'Received {len(input_rows)} rows')

  # output format:
  #   {"data": [
  #     [row_index, column_1_value, column_2_value, ...}],
  #     ...
  #   ]}
  output_rows = [[row[0], get_echo_response(row[1])] for row in input_rows]
  logger.info(f'Produced {len(output_rows)} rows')

  response = make_response({"data": output_rows})
  response.headers['Content-type'] = 'application/json'
  logger.debug(f'Sending response: {response.json}')
  return response


@app.route("/ui", methods=["GET", "POST"])
def ui():
  '''
  Main handler for providing a web UI.
  '''
  if request.method == "POST":
    # getting input in HTML form
    input_text = request.form.get("input")
    # display input and output
    return render_template("basic_ui.html",
      echo_input=input_text,
      echo_reponse=get_echo_response(input_text))
  return render_template("basic_ui.html")


def get_echo_response(input):
  return f'{CHARACTER_NAME} said {input}'

if __name__ == '__main__':
  app.run(host=SERVICE_HOST, port=SERVER_PORT)
Copy

Erläuterungen zum Code:

  • Die Funktion echo ermöglicht es einer Snowflake-Dienstfunktion, mit dem Dienst zu kommunizieren. Diese Funktion spezifiziert den Decorator @app.post() wie gezeigt:

    @app.post("/echo")
    def echo():
    
    Copy

    Wenn der Echo-Server Ihre HTTP-POST-Anforderung mit dem /echo-Pfad erhält, leitet er die Anforderung an diese Funktion weiter. Die Funktion wird ausgeführt und gibt die Zeichenfolge aus dem Anforderungstext als Echo in der Antwort wieder zurück.

    Um Kommunikation einer Snowflake-Dienstfunktion zu unterstützen, implementiert dieser Server die externen Funktionen. Das heißt, die Serverimplementierung folgt einem bestimmten Eingabe-/Ausgabedatenformat, um als SQL-Funktion zu dienen, und dies ist das gleiche Eingabe-/Ausgabedatenformat, das von externen Funktionen verwendet wird.

  • Der Funktionsabschnitt ui des Codes zeigt ein Webformular an und verarbeitet die vom Webformular übermittelten Echo-Anforderungen. Diese Funktion verwendet den Decorator @app.route(), um anzugeben, dass Anforderungen für /ui von dieser Funktion bearbeitet werden:

    @app.route("/ui", methods=["GET", "POST"])
    def ui():
    
    Copy

    Der Echo-Dienst stellt den Endpunkt echoendpoint öffentlich zur Verfügung (siehe Dienstspezifikation) und ermöglicht so die Kommunikation mit dem Dienst über das Web. Wenn Sie die URL des öffentlichen Endpunkts mit angehängtem „/ui“ in Ihrem Browser laden, sendet der Browser eine HTTP-GET-Anforderung für diesen Pfad, und der Server leitet die Anforderung an diese Funktion weiter. Die Funktion wird ausgeführt und gibt ein einfaches HTML-Formular zurück, in das der Benutzer eine Zeichenfolge eingeben kann.

    Nachdem der Benutzer eine Zeichenfolge eingegeben und das Formular übermittelt hat, sendet der Browser eine HTTP-Post-Anforderung für diesen Pfad, und der Server leitet die Anforderung an dieselbe Funktion weiter. Die Funktion wird ausgeführt und gibt eine HTTP-Antwort zurück, die die ursprüngliche Zeichenfolge enthält.

  • Die Funktion readiness_probe verwendet den Decorator @app.get(), um zu spezifizieren, dass Anforderungen für /healthcheck von dieser Funktion verarbeitet werden:

    @app.get("/healthcheck")
    def readiness_probe():
    
    Copy

    Mit dieser Funktion kann Snowflake die Bereitschaft des Dienstes überprüfen. Wenn der Container startet, möchte Snowflake bestätigen, dass die Anwendung funktioniert und der Dienst bereit ist, Anforderungen zu bedienen. Snowflake sendet eine HTTP-GET-Anforderung mit diesem Pfad, um auf Funktionsfähigkeit (Health Probe) und Bereitschaft (Readiness Probe) zu testen und somit sicherzustellen, dass nur funktionsfähige Container den Datenverkehr bedienen. Die Funktion kann alles tun, was Sie möchten.

  • Die Funktion get_logger hilft beim Einrichten der Protokollierung.

Datei „Dockerfile“

Diese Datei enthält alle Befehle, um ein Image mit Docker zu erstellen.

ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY echo_service.py ./
COPY templates/ ./templates/
RUN pip install --upgrade pip && \\
pip install flask
CMD ["python", "echo_service.py"]
Copy

„Dockerfile“ enthält Anweisungen zur Installation der Flask-Bibliothek im Docker-Container. Der Code in echo_service.py stützt sich auf die Flask-Bibliothek, um HTTP-Anforderungen zu verarbeiten.

Datei „/template/basic_ui.html“

Der Echo-Dienst stellt den Endpunkt echoendpoint öffentlich zur Verfügung (siehe Dienstspezifikation) und ermöglicht so die Kommunikation mit dem Dienst über das Web. Wenn Sie die URL des öffentlichen Endpunkts mit angehängtem /ui in Ihrem Browser laden, zeigt der Echo-Dienst dieses Formular an. Sie können eine Zeichenfolge in das Formular eingeben und das Formular absenden, und der Dienst gibt die Zeichenfolge in einer HTTP-Antwort zurück.

<!DOCTYPE html>
<html lang="en">
  <head>
    <title>Welcome to echo service!</title>
  </head>
  <body>
    <h1>Welcome to echo service!</h1>
    <form action="{{ url_for("ui") }}" method="post">
      <label for="input">Input:<label><br>
      <input type="text" id="input" name="input"><br>
    </form>
    <h2>Input:</h2>
    {{ echo_input }}
    <h2>Output:</h2>
    {{ echo_reponse }}
  </body>
</html>
Copy

Dienstspezifikation

Snowflake verwendet die von Ihnen in dieser Spezifikation angegebenen Informationen zur Konfiguration und Ausführung Ihres Dienstes.

spec:
  containers:
  - name: echo
    image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
    env:
      SERVER_PORT: 8000
      CHARACTER_NAME: Bob
    readinessProbe:
      port: 8000
      path: /healthcheck
  endpoints:
  - name: echoendpoint
    port: 8000
    public: true
Copy

Erläuterungen zur Dienstspezifikation:

  • containers.image gibt das Image an, mit dem Snowflake einen Container startet.

  • Das optionale Feld endpoints gibt den Endpunkt an, den der Dienst öffentlich bereitstellt.

    • name gibt einen benutzerfreundlichen Namen für den TCP-Netzwerkport an, den der Container auf Anforderungen überwacht. Sie verwenden diesen benutzerfreundlichen Endpunktnamen, um Anforderung an den entsprechenden Port zu senden. Beachten Sie, dass diese Portnummer über env.SERVER_PORT gesteuert wird.

    • Der Endpunkt ist ebenfalls als public konfiguriert. Dies ermöglicht Datenverkehr zu diesem Endpunkt aus dem öffentlichen Web.

  • Das optionale Feld containers.env wurde hinzugefügt, um zu veranschaulichen, wie Sie Umgebungsvariablen überschreiben können, die Snowflake an alle Prozesse in Ihrem Container übergibt. Der Dienstcode in echo_service.py liest beispielsweise die Umgebungsvariablen mit Standardwerten wie gezeigt:

    CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')
    SERVER_PORT = os.getenv('SERVER_PORT', 8080)
    
    Copy

    Dies funktioniert wie folgt:

    • Wenn der Echo-Dienst eine HTTP-POST-Anforderung mit einer Zeichenfolge (z. B. „Hallo“) im Anforderungstext erhält, gibt der Dienst standardmäßig „I said Hello“ (Ich sagte Hallo) zurück. Der Code verwendet die Umgebungsvariable CHARACTER_NAME, um das Wort vor „said“ (sagte) zu bestimmen. Standardmäßig ist CHARACTER_NAME auf „I“ (Ich) eingestellt.

      Sie können den Standardwert von CHARACTER_NAME in der Dienstspezifikation überschreiben. Wenn Sie den Wert beispielsweise auf „Bob“ setzen, gibt der Echo-Dienst die Antwort „Bob said Hello“ (Bob sagte Hallo) zurück.

    • In ähnlicher Weise überschreibt die Dienstspezifikation den Port (SERVER_PORT), den der Dienst überwacht, mit 8000, wodurch der Standardport 8080 außer Kraft gesetzt wird.

  • Das Feld readinessProbe identifiziert die Werte von port und path, mit denen Snowflake eine HTTP-GET-Anforderung an den Bereitschaftstest (Readiness Probe) senden kann, um zu überprüfen, ob der Dienst bereit ist, Datenverkehr zu verarbeiten.

    Der Dienstcode (echo_python.py) implementiert die Bereitschaftsprüfung wie folgt:

    @app.get("/healthcheck")
    def readiness_probe():
    
    Copy

    Daher enthält die Spezifikationsdatei das entsprechende Feld container.readinessProbe.

Weitere Informationen zu Dienstspezifikationen finden Sie unter Referenz der Dienstspezifikation.

Erläuterungen zur Dienstfunktion

Eine Dienstfunktion ist eine der Methoden, um mit Ihrem Dienst zu kommunizieren (siehe Verwenden eines Dienstes). Eine Dienstfunktion ist eine benutzerdefinierte Funktion (UDF), die Sie mit einem Dienstendpunkt verknüpfen. Wenn die Dienstfunktion ausgeführt wird, sendet sie eine Anforderung an den zugehörigen Dienstendpunkt und empfängt eine Antwort.

Sie erstellen die folgende Dienstfunktion, indem Sie den Befehl CREATE FUNCTION mit den folgenden Parametern ausführen:

CREATE FUNCTION my_echo_udf (InputText VARCHAR)
  RETURNS VARCHAR
  SERVICE=echo_service
  ENDPOINT=echoendpoint
  AS '/echo';
Copy

Beachten Sie Folgendes:

  • Die Funktion my_echo_udf nimmt eine Zeichenfolge als Eingabe entgegen und gibt eine Zeichenfolge zurück.

  • Die Eigenschaft SERVICE bezeichnet den Dienst (echo_service), und die Eigenschaft ENDPOINT bezeichnet den benutzerfreundlichen Namen des Endpunkts (echoendpoint).

  • „AS ‚/echo‘“ gibt den Pfad für den Dienst an. In echo_service.py verknüpft der Decorator @app.post diesen Pfad mit der echo-Funktion.

Diese Funktion stellt eine Verbindung mit dem spezifischen Endpunkt in ENDPOINT des in SERVICE angegebenen Dienstes her. Wenn Sie diese Funktion aufrufen, sendet Snowflake eine Anforderung an den Pfad /echo innerhalb des Dienstcontainers.

Lokales Erstellen und Testen eines Images

Sie können das Docker-Image lokal testen, bevor Sie es in ein Repository Ihres Snowflake-Kontos hochladen. Bei lokalen Tests wird Ihr Container eigenständig ausgeführt (er ist kein Dienst, der von Snowflake ausgeführt wird).

So testen Sie das Docker-Image von Tutorial 1:

  1. Um ein Docker-Image zu erstellen, führen Sie über die Docker-CLI den folgenden Befehl aus:

    docker build --rm -t my_service:local .
    
    Copy
  2. Um Ihren Code zu starten, führen Sie den folgenden Befehl aus:

    docker run --rm -p 8080:8080 my_service:local
    
    Copy
  3. Senden Sie eine Echo-Anforderung an den Dienst mit einer der folgenden Methoden:

    • Verwenden des cURL-Befehls:

      Senden Sie in einem anderen Terminalfenster mit cURL die folgende POST-Anforderung an Port 8080:

      curl -X POST http://localhost:8080/echo \
        -H "Content-Type: application/json" \
        -d '{"data":[[0, "Hello friend"], [1, "Hello World"]]}'
      
      Copy

      Beachten Sie, dass der Anforderungstext zwei Zeichenfolgen enthält. Dieser cURL-Befehl sendet eine POST-Anforderung an Port 8080, der vom Dienst auf Anforderungen überwacht wird. Die 0 in den Daten ist der Index der Eingabezeichenfolge in der Liste. Der Echo-Dienst gibt die Eingabezeichenfolgen als Echo wie folgt zurück:

      {"data":[[0,"I said Hello Friend"],[1,"I said Hello World"]]}
      
    • Verwenden eines Webbrowsers:

      1. Öffnen Sie in Ihrem Browser auf demselben Computer http://localhost:8080/ui.

        Dadurch wird eine GET-Anforderung an Port 8080 gesendet, der vom Dienst auf Anforderungen überwacht wird. Der Dienst führt die Funktion ui() aus, die wie gezeigt ein HTML-Formular ausgibt:

        Webformular für die Kommunikation mit dem Echo-Dienst.
      2. Geben Sie in das Feld Input die Zeichenfolge „Hello“ ein, und drücken Sie die Eingabetaste.

        Webformular, das die Antwort des Echo-Dienstes anzeigt.

Nächste Schritte

Sie können nun mit Tutorial 2 fortfahren, in dem ein Job ausgeführt wird.