Verwenden des Python-Konnektors

Unter diesem Thema werden zahlreiche Beispiele bereitgestellt, die veranschaulichen, wie Sie den Snowflake-Konnektor verwenden können, um Snowflake-Standardoperationen wie Benutzeranmeldung, Datenbank- und Tabellenerstellung, Warehouse-Erstellung, Einfügen/Laden von Daten sowie Abfragen durchzuführen.

Im Beispielcode am Ende dieses Themas werden einzelne Beispiele zu einem einzigen, funktionierenden Python-Programm zusammengeführt.

Unter diesem Thema:

Überprüfen der Netzwerkverbindung zu Snowflake mit SnowCD

Nach der Konfiguration des Treibers können Sie die Netzwerkkonnektivität zu Snowflake mit SnowCD testen und Probleme beheben.

Sie können während der Erstkonfiguration und bei Bedarf jederzeit SnowCD verwenden, um Ihre Netzwerkverbindung zu Snowflake zu testen und Probleme zu beheben.

Verbinden mit Snowflake

Importieren Sie das snowflake.connector-Modul:

import snowflake.connector

Ermitteln Sie die Anmeldeinformationen aus Umgebungsvariablen, über die Befehlszeile, aus einer Konfigurationsdatei oder einer anderen geeigneten Quelle. Beispiel:

PASSWORD = os.getenv('SNOWSQL_PWD')
WAREHOUSE = os.getenv('WAREHOUSE')
...

Der Parameter ACCOUNT erfordert möglicherweise die Region und die Cloudplattform, in bzw. auf der sich Ihr Konto befindet, und zwar in folgender Form:

'<your_account_name>.<region_id>.<cloud>' (e.g. 'xy12345.east-us-2.azure').

Bemerkung

Beschreibungen zu den verfügbaren Konnektorparametern finden Sie unter den snowflake.connector-Methoden.

Wenn Sie Daten aus Ihrem eigenen Amazon S3-Bucket kopieren möchten, benötigen Sie die AWS_ACCESS_KEY_ID und AWS_SECRET_ACCESS_KEY.

import os

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

Bemerkung

Wenn Ihre Daten in einem Microsoft Azure-Container gespeichert sind, geben Sie die Anmeldeinformationen direkt in der COPY-Anweisung an.

Stellen Sie nach dem Lesen der Verbindungsinformationen eine Verbindung mit dem Standardauthentifikator oder der Verbundauthentifizierung her (falls aktiviert).

Einstellen der Sitzungsparameter

Es gibt mehrere Möglichkeiten, Sitzungsparameter festzulegen, z. B. QUERY_TAG, wenn Sie den Python-Konnektor verwenden.

Sie können Parameter auf Sitzungsebene festlegen, wenn Sie eine Verbindung zu Snowflake herstellen. Übergeben Sie dazu den optionalen Verbindungsparameter „session_parameters“ wie unten gezeigt:

con = snowflake.connector.connect(
    user='XXXX',
    password='XXXX',
    account='XXXX',
    session_parameters={
        'QUERY_TAG': 'EndOfMonthFinancials',
    }
)

Das an die connect()-Methode übergebene „session_parameters“-Wörterbuch kann einen oder mehrere Parameter auf Sitzungsebene enthalten.

Sie können Sitzungsparameter auch festlegen, indem Sie nach dem Herstellen der Verbindung die SQL-Anweisung ALTER SESSION SET ... ausführen:

con.cursor().execute("ALTER SESSION SET QUERY_TAG = 'EndOfMonthFinancials'")

Weitere Informationen zu Sitzungsparametern finden Sie in den Beschreibungen der einzelnen Parameter auf der allgemeinen Seite Parameter.

Verbinden mit dem Standardauthentifikator

Stellen Sie eine Verbindung zu Snowflake unter Verwendung der erforderlichen Anmeldeparameter her:

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )

Möglicherweise müssen Sie dies mit anderen Informationen erweitern.

Verwenden von Single Sign-On (SSO) zur Authentifizierung

Wenn Sie Snowflake für die Verwendung von Single Sign-On (SSO) konfiguriert haben, können Sie Ihre Clientanwendung so konfigurieren, dass SSO für die Authentifizierung verwendet wird. Weitere Informationen dazu finden Sie unter Verwenden von SSO bei Clientanwendungen, die sich mit Snowflake verbinden.

Verwenden der Schlüsselpaar-Authentifizierung

Snowflake unterstützt die Verwendung von Schlüsselpaar-Authentifizierung anstelle der typischen Authentifizierung durch Benutzernamen und Kennwort. Dieses Authentifizierungsverfahren erfordert ein 2048-Bit-RSA-Schlüsselpaar (Minimum). Generieren Sie das öffentlich-private PEM (Privacy Enhanced Mail)-Schlüsselpaar unter Verwendung von OpenSSL. Der öffentliche Schlüssel wird dem Snowflake-Benutzer zugewiesen, der den Snowflake-Client verwendet.

So konfigurieren Sie das Public/Private-Schlüsselpaar:

  1. Generieren Sie über die Befehlszeile in einem Terminalfenster einen privaten Schlüssel:

    Sie können entweder eine verschlüsselte Version des privaten Schlüssels oder eine unverschlüsselte Version des privaten Schlüssels generieren.

    Um eine unverschlüsselte Version zu generieren, verwenden Sie den folgenden Befehl:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
    

    Um eine verschlüsselte Version zu generieren, verwenden Sie den folgenden Befehl (ohne „-nocrypt“):

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
    

    Es ist normalerweise sicherer, eine verschlüsselte Version zu erstellen.

    Wenn Sie den zweiten Befehl zum Verschlüsseln des privaten Schlüssels verwenden, fordert OpenSSL Sie zur Eingabe einer Passphrase auf, die zum Verschlüsseln der Datei des privaten Schlüssels verwendet wird. Wir empfehlen, zum Schutz des privaten Schlüssels eine starke Passphrase zu verwenden. Notieren Sie sich diese Passphrase an einem sicheren Ort. Sie müssen sie beim Herstellen der Verbindung mit Snowflake eingeben. Beachten Sie, dass die Passphrase nur zum Schutz des privaten Schlüssels verwendet wird und niemals an Snowflake gesendet wird.

    Beispiel für einen privaten PEM-Schlüssel

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. Generieren Sie über die Befehlszeile den öffentlichen Schlüssel, indem Sie auf den privaten Schlüssel verweisen:

    Angenommen, der private Schlüssel befindet sich in der Datei „rsa_key.p8“, verwenden Sie den folgenden Befehl:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Beispiel für einen öffentlichen PEM-Schlüssel

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Kopieren Sie die Dateien der öffentlichen und privaten Schlüssel zur Speicherung in einem lokalen Verzeichnis. Zeichnen Sie den Pfad zu den Dateien auf. Beachten Sie, dass der private Schlüssel im Format PKCS#8 (Public Key Cryptography Standards) gespeichert und mit der im vorherigen Schritt angegebenen Passphrase verschlüsselt wird; die Datei sollte jedoch weiterhin mit dem von Ihrem Betriebssystem bereitgestellten Dateiberechtigungsmechanismus vor unbefugtem Zugriff geschützt sein. Es liegt in Ihrer Verantwortung, die Datei zu sichern, wenn sie nicht verwendet wird.

  4. Weisen Sie dem Snowflake-Benutzer den öffentlichen Schlüssel mit ALTER USER zu. Beispiel:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Bemerkung

    • Nur Sicherheitsadministratoren (d. h. Benutzer mit der Rolle SECURITYADMIN oder höher) können andere Benutzer ändern.

    • Schließen Sie die Kopf- und Fußzeile des öffentlichen Schlüssels in der SQL-Anweisung aus.

    Überprüfen Sie den Fingerabdruck des öffentlichen Schlüssels des Benutzers mithilfe von DESCRIBE USER:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Bemerkung

    Die RSA_PUBLIC_KEY_2_FP-Eigenschaft wird unter Schlüsselrotation (unter diesem Thema) beschrieben.

  5. Ändern Sie den unten stehenden Beispielcode, und führen Sie ihn aus. Der Code entschlüsselt die Datei, die den privaten Schlüssel enthält, und übergibt sie an den Snowflake-Treiber, um eine Verbindung herzustellen:

  • Aktualisieren Sie die Sicherheitsparameter:

    • Pfad: Gibt den lokalen Pfad zu der von Ihnen erstellten Datei mit dem privaten Schlüssel an.

  • Aktualisieren Sie die Verbindungsparameter:

    • user: Gibt Ihren Snowflake-Anmeldenamen an.

    • account: Gibt den vollen Namen Ihres Kontos an (bereitgestellt von Snowflake). Abhängig von der Cloudplattform (AWS oder Azure) und der Region, in der Ihr Konto gehostet wird, erfordert der vollständige Kontoname möglicherweise zusätzliche Segmente. Weitere Details dazu finden Sie unter Nutzungshinweise für die Verwendung des account-Parameters (für die connect-Methode).

    • region: Veraltet

      Gibt die ID für die Region an, in der sich Ihr Konto befindet. Beachten Sie, dass dieser Parameter veraltet ist und nur aus Gründen der Abwärtskompatibilität dokumentiert wurde.

      Der neue Code muss die Region, falls erforderlich, als Teil von account angeben.

Beispielcode

import snowflake.connector
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
with open("<path>/rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

ctx = snowflake.connector.connect(
    user='<user>',
    account='<account>',
    private_key=pkb,
    warehouse=WAREHOUSE,
    database=DATABASE,
    schema=SCHEMA
    )

cs = ctx.cursor()

Schlüsselrotation

Snowflake unterstützt mehrere aktive Schlüssel, um eine ununterbrochene Rotation zu ermöglichen. Rotieren und ersetzen Sie Ihre öffentlichen und privaten Schlüssel basierend auf dem Ablaufzeitplan, den Sie intern einhalten.

Derzeit können Sie die Parameter RSA_PUBLIC_KEY und RSA_PUBLIC_KEY_2 für ALTER USER verwenden, um einem einzelnen Benutzer bis zu zwei öffentliche Schlüssel zuzuordnen.

So rotieren Sie Ihre Schlüssel:

  1. Führen Sie die unter Verwenden der Schlüsselpaar-Authentifizierung angegebenen Schritte aus, um Folgendes zu ermöglichen:

    • Generieren eines neuen privaten und öffentlichen Schlüsselsatzes

    • Zuweisen des öffentlichen Schlüssels an einen Benutzer Setzen Sie den Wert des öffentlichen Schlüssels entweder auf RSA_PUBLIC_KEY oder RSA_PUBLIC_KEY_2 (je nachdem, welcher Schlüsselwert gerade nicht verwendet wird). Beispiel:

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Aktualisieren Sie den Code, um eine Verbindung zu Snowflake herzustellen. Geben Sie den neuen privaten Schlüssel an.

    Snowflake überprüft den korrekten aktiven öffentlichen Schlüssel für die Authentifizierung basierend auf dem privaten Schlüssel, der mit Ihren Verbindungsinformationen übermittelt wurde.

  3. Entfernen Sie den alten öffentlichen Schlüssel aus dem Benutzerprofil. Beispiel:

    alter user jsmith unset rsa_public_key;
    

Verwenden eines Proxyservers

Um einen Proxyserver zu verwenden, konfigurieren Sie die folgenden Umgebungsvariablen:

  • HTTP_PROXY

  • HTTPS_PROXY

  • NO_PROXY

Bemerkung

Die Proxyparameter (d. h. proxy_host, proxy_port, proxy_user und proxy_password) sind veraltet. Verwenden Sie stattdessen die Umgebungsvariablen.

Beispiel:

Linux oder macOS
export HTTP_PROXY='http://username:password@proxyserver.company.com:80'
export HTTPS_PROXY='http://username:password@proxyserver.company.com:80'
Windows
set HTTP_PROXY=http://username:password@proxyserver.company.com:80
set HTTPS_PROXY=http://username:password@proxyserver.company.com:80

Tipp

Das Sicherheitsmodell von Snowflake erlaubt keine Secure Sockets Layer (SSL)-Proxys (unter Verwendung eines HTTPS-Zertifikats). Ihr Proxyserver muss eine öffentlich zugängliche Zertifizierungsstelle (CA) verwenden, um potenzielle Sicherheitsrisiken wie einen MITM (Man In The Middle)-Angriff durch einen kompromittierten Proxy zu reduzieren.

Wenn Sie Ihren SSL-Proxy verwenden müssen, empfehlen wir Ihnen dringend, die Serverrichtlinie zu aktualisieren, um die Snowflake-Zertifikatsprüfung zu bestehen, sodass mitten in der Kommunikation kein Zertifikat geändert wird.

Optional kann NO_PROXY verwendet werden, um den Proxy für bestimmte Arten der Kommunikation zu umgehen. Beispielsweise kann für den Zugriff auf Amazon S3 der Proxyserver durch Angabe von NO_PROXY=".amazonaws.com" umgangen werden.

NO_PROXY unterstützt keine Platzhalter. Jeder angegebene Wert sollte einer der folgenden Werte sein:

  • Das Ende eines Hostnamens (oder eines vollständigen Hostnamens), zum Beispiel:

    • .amazonaws.com

    • xy12345.snowflakecomputing.com

  • Eine IP-Adresse, zum Beispiel:

    • 192.196.1.15

Wenn mehrere Werte angegeben werden, müssen diese durch Kommas getrennt sein. Beispiel:

localhost,.my_company.com,.snowflakecomputing.com,192.168.1.15,192.168.1.16

Verbinden mit OAuth

Um eine Verbindung mit OAuth herzustellen, muss die Verbindungszeichenfolge den Parameter authenticator mit dem Wert oauth und den Parameter token mit dem Wert oauth_access_token enthalten. Weitere Informationen dazu finden Sie unter OAuth mit Clients, Treibern und Konnektoren.

ctx = snowflake.connector.connect(
    user="<username>",
    host="<hostname>",
    account="<account_name>",
    authenticator="oauth",
    token="<oauth_access_token>",
    warehouse="test_warehouse",
    database="test_db",
    schema="test_schema"
)

OCSP

Wenn der Treiber eine Verbindung herstellt, sendet Snowflake ein Zertifikat, um zu bestätigen, dass die Verbindung zu Snowflake und nicht zu einem Host besteht, der sich als Snowflake ausgibt. Der Treiber sendet dieses Zertifikat an einen OCSP (Online Certificate Status Protocol)-Server, um zu überprüfen, ob das Zertifikat widerrufen wurde.

Wenn der Treiber den OCSP-Server nicht erreichen kann, um das Zertifikat zu überprüfen, kann beim Treiber „Fail-open“ oder „Fail-close“ auftreten.

Auswahl des Fail-open- oder Fail-close-Modus

Versionen des Snowflake-Konnektors für Python vor 1.8.0 verwenden standardmäßig den Fail-close-Modus. Bei Versionen ab 1.8.0 wird standardmäßig der Fail-open-Modus verwendet. Sie können das Standardverhalten überschreiben, indem Sie beim Aufrufen der connect() -Methode den optionalen Verbindungsparameter ocsp_fail_open einstellen. Beispiel:

con = snowflake.connector.connect(
    account=<account>,
    user=<user>,
    ...,
    ocsp_fail_open=False,
    ...);

Überprüfen der Version von OCSP-Konnektor oder -Treiber

Die Version von Treiber oder Konnektor bestimmt zusammen mit deren Konfiguration das OCSP-Verhalten. Weitere Informationen zur Treiber- oder Konnektor-Version, ihrer Konfiguration und zum OCSP-Verhalten finden Sie unter OCSP-Konfiguration.

Zwischenspeichern von OCSP-Antworten

Um die Sicherheit der gesamten Kommunikation zu gewährleisten, verwendet der Snowflake-Konnektor für Python das HTTPS-Protokoll für die Verbindung zu Snowflake und zu allen anderen Services (z. B. Amazon S3 für das Staging von Datendateien und Okta für die Verbundauthentifizierung). Zusätzlich zum regulären HTTPS-Protokoll überprüft der Konnektor bei jeder Verbindung über OCSP (Online Certificate Status Protocol) auch den Status der TLS/SSL-Zertifikatssperre und bricht die Verbindung ab, wenn er feststellt, dass das Zertifikat gesperrt wurde oder der OCSP-Status nicht zuverlässig ist.

Da jede Snowflake-Verbindung bis zu drei Roundtrips mit dem OCSP-Server auslöst, wurden mehrere Ebenen des Cache für OCSP-Antworten eingeführt, um die zusätzliche Netzwerklast der Verbindung zu reduzieren:

  • Speichercache, der während der gesamten Lebensdauer des Prozesses beibehalten wird.

  • Dateicache, der so lange beibehalten wird, bis das Cacheverzeichnis (z. B. ~/.cache/snowflake) gelöscht ist.

  • OCSP-Antwort-Servercache.

Das Caching behebt auch Verfügbarkeitsprobleme für OCSP-Server (d. h. für den Fall, dass der eigentliche OCSP-Server ausfällt). Solange der Cache gültig ist, kann der Konnektor den Status der Zertifikatssperre noch überprüfen.

Wenn keine der Cacheschichten die OCSP-Antwort enthält, versucht der Client, den Validierungsstatus direkt vom OCSP-Server der CA abzurufen.

Ändern des Speicherortes für den OCSP-Antwort-Dateicache.

Standardmäßig ist der Dateicache an den folgenden Speicherorten aktiviert, sodass keine zusätzlichen Konfigurationsaufgaben erforderlich sind:

Linux

~/.cache/snowflake/ocsp_response_cache.json

macOS

~/Library/Caches/Snowflake/ocsp_response_cache.json

Windows

%USERPROFILE%\AppData\Local\Snowflake\Caches\ocsp_response_cache.json

Wenn Sie jedoch einen anderen Speicherort und/oder Dateinamen für die OCSP-Antwort-Cachedatei angeben möchten, akzeptiert die connect-Methode den Parameter ocsp_response_cache_filename, der Pfad und Namen der OCSP-Cachedatei in Form einer URI angibt.

OCSP-Antwort-Cacheserver

Bemerkung

Der OCSP-Antwort-Cacheserver wird derzeit vom Snowflake-Konnektor für Python 1.6.0 und höher unterstützt.

Die Speicher- und Dateitypen des OCSP-Cache eignen sich gut für Anwendungen, die mit Snowflake über einen der von uns bereitgestellten Clients mit einem persistenten Host verbunden sind. Sie funktionieren jedoch nicht in dynamisch bereitgestellten Umgebungen wie AWS Lambda oder Docker.

Um dieser Situation zu begegnen, bietet Snowflake eine dritte Schicht des Caching: den OCSP-Antwort-Cacheserver. Der OCSP-Antwort-Cacheserver holt OCSP-Antworten stündlich von den OCSP-Servern des CA und speichert sie 24 Stunden lang. Clients können dann den Validierungsstatus eines bestimmten Snowflake-Zertifikats von diesem Servercache anfordern.

Wichtig

Wenn Ihre Serverrichtlinie den Zugriff auf die meisten oder alle externen IP-Adressen und Websites verweigert, müssen Sie die Adresse des Cacheservers auf die Whitelist setzen, um einen normalen Servicebetrieb zu ermöglichen. Die Cacheserver-URL ist ocsp*.snowflakecomputing.com:80.

Wenn Sie den Cacheserver aus irgendeinem Grund deaktivieren müssen, setzen Sie die Umgebungsvariable SF_OCSP_RESPONSE_CACHE_SERVER_ENABLED auf false. Beachten Sie, dass der Wert zwischen Groß- und Kleinschreibung unterscheidet und in Kleinbuchstaben angegeben werden muss.

Erstellen einer Datenbank, eines Schemas und eines Warehouse

Erstellen Sie nach der Anmeldung mit den Befehlen CREATE DATABASE, CREATE SCHEMA und CREATE WAREHOUSE eine Datenbank, ein Schema und ein Warehouse, falls diese noch nicht vorhanden sind.

Das folgende Beispiel zeigt, wie sich ein Warehouse mit dem Namen tiny_warehouse, eine Datenbank mit dem Namen testdb und ein Schema mit dem Namen testschema erstellen lassen. Beachten Sie, dass Sie beim Erstellen des Schemas entweder den Namen der Datenbank angeben müssen, in der das Schema erstellt werden soll, oder bereits mit der Datenbank verbunden sein müssen, in der das Schema erstellt werden soll. Im folgenden Beispiel wird vor dem Befehl CREATE SCHEMA ein USE DATABASE-Befehl ausgeführt, um sicherzustellen, dass das Schema in der korrekten Datenbank erstellt wird.

        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")

Verwenden von Datenbank, Schema und Warehouse

Geben Sie Datenbank und Schema an, in denen Sie Tabellen erstellen möchten. Geben Sie auch das Warehouse an, das Ressourcen für die Ausführung von DML-Anweisungen und -Abfragen bereitstellen soll.

Folgendes Beispiel zeigt die Verwendung von Datenbank testdb, Schema testschema und Warehouse tiny_warehouse (zuvor erstellt):

        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")

Erstellen von Tabellen und Einfügen von Daten

Verwenden Sie den Befehl CREATE TABLE, um Tabellen zu erstellen, und den Befehl INSERT, um die Tabellen mit Daten zu füllen.

Erstellen Sie beispielsweise eine Tabelle mit dem Namen testtable, und fügen Sie zwei Zeilen in die Tabelle ein:

    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")

Laden von Daten

Anstatt Daten mit einzelnen INSERT-Befehlen in Tabellen einzufügen, können Sie Daten aus Dateien, die entweder an einem internen oder externen Speicherort bereitgestellt werden, per Massenladen hinzufügen.

Kopieren von Daten von einem internen Speicherort

Um Daten aus Dateien auf Ihrem Hostcomputer in eine Tabelle zu laden, verwenden Sie zunächst den Befehl PUT für das Staging der Datei an einem internen Speicherort. Verwenden Sie dann den Befehl COPY INTO <Tabelle>, um die Daten aus den Dateien in die Tabelle zu kopieren.

Beispiel:

# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

Hier werden die CSV-Daten in einem lokalen Verzeichnis namens /tmp/data in einer Linux- oder macOS-Umgebung gespeichert. Das Verzeichnis enthält dann Dateien mit den Namen file0, file1, … file100.

Kopieren von Daten von einem externen Speicherort

Um Daten aus Dateien, die bereits an einem externen Speicherort (z. B. einem eigenen S3-Bucket) bereitgestellt wurden, in eine Tabelle zu laden, verwenden Sie den Befehl COPY INTO <Tabelle>.

Beispiel:

# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<your_s3_bucket>/data/
    STORAGE_INTEGRATION = myint
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

Wobei:

  • s3://<Ihr_S3-Bucket>/data/ gibt den Namen Ihres S3-Buckets an.

  • Den Dateien im Bucket wird data vorangestellt.

  • Der Zugriff auf den Bucket erfolgt über eine Speicherintegration, die mit CREATE STORAGE INTEGRATION von einem Kontoadministrator (d. h. einem Benutzer mit der Rolle ACCOUNTADMIN) oder einer Rolle mit der globalen Berechtigung CREATE INTEGRATION erstellt wurde. Bei Verwendung einer Speicherintegration benötigen Benutzer für den Zugriff auf einen privaten Speicherort keine Anmeldeinformationen mehr.

Bemerkung

In diesem Beispiel wird die Anweisung mit der Funktion format() zusammengestellt. Wenn in Ihrer Umgebung das Risiko von Angriffen durch Einschleusung von SQL-Befehlen besteht, sollten Sie möglicherweise das Binden von Werten vorziehen, anstatt format() zu verwenden.

Abfragen von Daten

Verwenden von cursor zum Abrufen von Werten

Rufen Sie Werte aus einer Tabelle mit der Cursorobjektiterator-Methode ab.

Um beispielsweise Spalten mit dem Namen „col1“ und „col2“ aus der zuvor mit testtable erstellten Tabelle (siehe Erstellen von Tabellen und Einfügen von Daten) abzurufen, verwenden Sie Code, der dem folgenden ähnelt:

    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()

Alternativ bietet der Snowflake-Konnektor für Python ein abgekürztes Verfahren:

for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"):
    print('{0}, {1}'.format(col1, col2))

Wenn Sie ein einzelnes Ergebnis (d. h. eine einzelnen Zeile) erhalten möchten, verwenden Sie die Methode fetchone:

col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))

Wenn Sie die angegebene Anzahl von Zeilen auf einmal erhalten möchten, verwenden Sie die Methode fetchmany mit der Anzahl der Zeilen:

cur = con.cursor().execute("SELECT col1, col2 FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)

Bemerkung

Verwenden Sie fetchone oder fetchmany, wenn das Resultset zu groß für den verfügbaren Speicher ist.

Wenn Sie alle Ergebnisse auf einmal erhalten müssen:

results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))

Um ein Zeitlimit (Timeout) für eine Abfrage festzulegen, führen Sie einen „begin“-Befehl aus, und fügen Sie einen Timeout-Parameter in die Abfrage ein. Wenn die Abfrage die Zeitdauer des Parameterwertes überschreitet, wird ein Fehler generiert und ein Rollback durchgeführt.

Im folgenden Code bedeutet der Fehler 604, dass die Abfrage abgebrochen wurde. Der Timeout-Parameter startet Timer() und bricht ab, wenn die Abfrage nicht innerhalb der angegebenen Zeit beendet wird.

conn.cursor().execute("create or replace table testtbl(a int, b string)")

conn.cursor().execute("begin")
try:
   conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query

except ProgrammingError as e:
   if e.errno == 604:
      print("timeout")
      conn.cursor().execute("rollback")
   else:
      raise e
else:
   conn.cursor().execute("commit")

Verwenden von DictCursor zum Abrufen von Werten nach Spaltenname

Wenn Sie einen Wert anhand des Spaltennamens abrufen möchten, erstellen Sie ein cursor-Objekt vom Typ DictCursor.

Beispiel:

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = con.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()

Abbrechen einer Abfrage nach Abfrage-ID

So brechen Sie eine Abfrage über die Abfrage-ID ab:

cur = cn.cursor()

try:
  cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
  result = cur.fetchall()
  print(len(result))
  print(result[0])
finally:
  cur.close()

Ersetzen Sie die Zeichenfolge „queryID“ durch die tatsächliche Abfrage-ID.

Verbessern der Abfrageleistung durch Umgehen der Datenkonvertierung

Verwenden Sie zur Verbesserung der Abfrageleistung die Klasse SnowflakeNoConverterToPython im Modul snowflake.connector.converter_null, um die Datenkonvertierung vom internen Snowflake-Datentyp in den systemeigenen Python-Datentyp zu umgehen, z. B.:

from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data

Im Ergebnis werden alle Daten als Zeichenfolgen dargestellt, sodass die Anwendung für die Konvertierung in die systemeigenen Python-Datentypen verantwortlich ist. Beispielsweise sind TIMESTAMP_NTZ- und TIMESTAMP_LTZ-Daten die als Zeichenfolgen dargestellte Epochenzeit, und TIMESTAMP_TZ-Daten sind die Epochenzeit, gefolgt von einem Leerzeichen, gefolgt vom Offset zu UTC in Minuten, dargestellt als Zeichenfolge.

Das Binden der Daten wird nicht beeinflusst. Python-native Daten können weiterhin für Updates gebunden werden.

Binden von Daten

Um Werte anzugeben, die in einer SQL-Anweisung verwendet werden sollen, können Sie Literale in die Anweisung einschließen oder Variablen binden. Wenn Sie Variablen binden, fügen Sie einen oder mehrere Platzhalter in den Text der SQL-Anweisung ein, und geben Sie dann für jeden Platzhalter die Variable (den zu verwendenden Wert) an.

Im folgenden Beispiel wird die Verwendung von Literalen und die Bindung gegenübergestellt:

Literale:

con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")

Bindung:

con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))

Bemerkung

Es gibt eine Obergrenze für die Datengröße, die Sie binden oder in einem Batch kombinieren können. Weitere Details dazu finden Sie unter Begrenzung der Abfragetextgröße.

Snowflake unterstützt die folgenden Bindungstypen:

  • pyformat

  • format

  • qmark

  • numeric

Jeder Typ wird unten erklärt.

pyformat- oder format-Bindung

Sowohl die pyformat-Bindung als auch die format-Bindung binden Daten auf der Clientseite und nicht auf der Serverseite.

Standardmäßig unterstützt der Snowflake-Konnektor für Python sowohl pyformat als auch format, sodass Sie %(name)s oder %s als Platzhalter verwenden können. Beispiel:

  • Verwenden von %(name)s als Platzhalter:

        conn.cursor().execute(
            "INSERT INTO test_table(col1, col2) "
            "VALUES(%(col1)s, %(col2)s)", {
                'col1': 789,
                'col2': 'test string3',
                })
    
  • Verwenden von %s als Platzhalter:

    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    

Sie können auch ein Listenobjekt verwenden, um Daten für den IN-Operator zu binden:

# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))

Das Prozentzeichen („%“) ist sowohl ein Platzhalterzeichen für SQL LIKE als auch ein Formatbindungszeichen für Python. Wenn Sie die Formatbindung verwenden und Ihr SQL-Befehl das Prozentzeichen enthält, müssen Sie möglicherweise das Prozentzeichen maskieren. Wenn Ihre SQL-Anweisung beispielsweise wie folgt lautet:

SELECT col1, col2
    FROM test_table
    WHERE col2 ILIKE '%York' LIMIT 1;  -- Find York, New York, etc.

Dann sollte Ihr Python-Code wie folgt aussehen (beachten Sie das zusätzliche Prozentzeichen, um das ursprüngliche Prozentzeichen zu umgehen):

        sql_command = "select col1, col2 from test_table "
        sql_command += " where col2 like '%%York' limit %(lim)s"
        parameter_dictionary = {'lim': 1 }
        cur.execute(sql_command, parameter_dictionary)

qmark- oder numeric-Bindung

Sowohl die qmark-Bindung als auch die numeric-Bindung binden Daten eher auf der Serverseite als auf der Clientseite.

Zur Verwendung einer qmark- oder numeric-Bindung setzen Sie den Modus auf „qmark“, indem Sie Folgendes ausführen:

snowflake.connector.paramstyle='qmark'

Wichtig

Sie müssen erst das Parameterformat festlegen, bevor Sie die connect()-Methode aufrufen.

Wenn paramstyle im Verbindungsparameter als qmark oder numeric angegeben ist, sollte für die Bindungsvariablen ? oder :N festgelegt sein, und die Bindung erfolgt auf der Serverseite.

Beispiel:

  • Verwenden von ? als Platzhalter:

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
  • Verwenden von :N als Platzhalter:

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    
  • Binden von datetime mit TIMESTAMP mithilfe der qmark-Bindung:

    Wenn Sie die qmark- oder numeric-Bindung verwenden, um Daten an den Snowflake-Datentyp TIMESTAMP zu binden, geben Sie den Snowflake-Zeitstempeldatentyp, d. h. TIMESTAMP_LTZ oder TIMESTAMP_TZ, in Form eines Tupels an.

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "CREATE OR REPLACE TABLE testtable2 ("
        "   col1 int, "
        "   col2 string, "
        "   col3 timestamp_ltz"
        ")"
    )
    
    con.cursor().execute(
        "INSERT INTO testtable2(col1,col2,col3) "
        "VALUES(?,?,?)", (
            987,
            'test string4',
            ("TIMESTAMP_LTZ", datetime.now())
        )
    )
    

    Im Gegensatz zur clientseitigen Bindung benötigt die serverseitige Bindung den Snowflake-Datentyp für die Spalte. Die meisten gängigen Python-Datentypen weisen bereits implizite Zuordnungen zu Snowflake-Datentypen auf (z. B. int ist FIXED zugeordnet). Da Python-datetime-Daten jedoch mit mehreren Snowflake-Datentypen (z. B. TIMESTAMP_NTZ, TIMESTAMP_LTZ oder TIMESTAMP_TZ) verknüpft werden können und die Standardzuordnung TIMESTAMP_NTZ lautet, muss der Benutzer zum Binden von Python-datetime-Daten einen bestimmten Typ angeben (z. B. TIMESTAMP_LTZ). Der Datentyp muss daher wie im obigen Beispiel angegeben werden.

Angriffe durch Einschleusung von SQL-Befehlen verhindern

Binden Sie Daten nicht mit der Formatierungsfunktion von Python, da Sie eine Einschleusung von SQL-Befehlen riskieren. Beispiel:

# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )

Speichern Sie stattdessen die Werte in Variablen, überprüfen Sie diese Werte (z. B. indem Sie Zeichenfolgen nach verdächtigen Semikolons durchsuchen), und binden Sie die Parameter dann mit qmark oder einem numerischen Bindungsformat ein.

Abrufen von Spaltenmetadaten

Die Spalten-Metadaten werden im Objekt Cursor im Attribut description gespeichert.

Im folgenden einfachen Beispiel wird die Liste der Spaltennamen abgerufen:

    cur = conn.cursor()
    cur.execute("SELECT * FROM test_table")
    print(','.join([col[0] for col in cur.description]))

Abrufen der Snowflake-Abfrage-IDs

Jeder von Snowflake ausgeführten Abfrage ist eine Abfrage-ID zugewiesen. Auf der Snowflake-Weboberfläche werden die Abfrage-IDs auf der Seite History History tab und bei der Überprüfung des Status einer Abfrage angezeigt.

Der Snowflake-Konnektor für Python stellt ein spezielles Attribut sfqid im Cursor-Objekt zur Verfügung, sodass Sie es mit dem Status auf der Weboberfläche verknüpfen können. Um die Snowflake-Abfrage-ID abzurufen, führen Sie zuerst die Abfrage aus, und rufen Sie diese dann über das sfqid-Attribut ab:

# Retrieving a Snowflake Query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

Fehlerbehandlung

Die Anwendung muss auf Ausnahmen, die vom Snowflake-Konnektor ausgelöst werden, korrekt reagieren und entscheiden, ob die Codeausführung fortgesetzt oder abgebrochen wird.

# Catching the syntax error
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

Verwenden von execute_stream zum Ausführen von SQL-Skripten

Mit der Funktion execute_stream können Sie ein oder mehrere SQL-Skripte in einem Stream ausführen:

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in con.execute_stream(f):
        for ret in cur:
            print(ret)

Schließen der Verbindung

Als Best Practice schließen Sie die Verbindung, indem Sie die Methode close aufrufen:

        connection.close()

Dadurch wird sichergestellt, dass die gesammelten Clientmetriken an den Server übermittelt werden und die Sitzung gelöscht wird. Außerdem helfen try-finally Blöcke sicherzustellen, dass die Verbindung geschlossen wird, auch wenn mittendrin eine Ausnahme ausgelöst wird:

# Connecting to Snowflake
con = snowflake.connector.connect(...)
try:
    # Running queries
    con.cursor().execute(...)
    ...
finally:
    # Closing the connection
    con.close()

Verwenden eines Kontextmanagers zum Verbinden und Steuern von Transaktionen

Der Snowflake-Konnektor für Python unterstützt einen Kontextmanager, der bei Bedarf Ressourcen zuweist und freigibt. Der Kontextmanager ist nützlich, um für Transaktionen auf Basis des Anweisungsstatus ein Commit oder ein Rollback auszuführen, wenn autocommit deaktiviert ist.

# Connecting to Snowflake using the context manager
with snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False,
) as con:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail

Im obigen Beispiel führt der Kontextmanager nach Fehlschlagen der dritten Anweisung ein Rollback der Änderungen in der Transaktion aus und schließt die Verbindung. Wenn alle Anweisungen erfolgreich waren, würde der Kontextmanager die Änderungen committen und die Verbindung schließen.

Der äquivalente Code mit try- und except-Blöcken ist wie folgt:

# Connecting to Snowflake using try and except blocks
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
    con.commit()
except Exception as e:
    con.rollback()
    raise e
finally:
    con.close()

Protokollieren

Der Snowflake-Konnektor für Python nutzt das logging-Standardmodul von Python, um den Status in regelmäßigen Abständen zu protokollieren, sodass die Anwendung ihre Aktivitäten im Hintergrund verfolgen kann. Der einfachste Weg, die Protokollierung zu aktivieren, ist der Aufruf von logging.basicConfig() am Anfang der Anwendung.

Im folgenden Beispiel wird der Protokolliergrad auf INFO eingestellt, und die Protokolle werden in einer Datei namens /tmp/snowflake_python_connector.log gespeichert.

        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)

Eine umfassendere Protokollierung kann durch Einstellen des Protokolliergrads auf DEBUG wie folgt aktiviert werden:

# Logging including the timestamp, thread and the source code location
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

Die optionale, aber empfohlene Formatierungsklasse SecretDetector stellt sicher, dass ein bestimmter Satz bekannter vertraulicher Informationen maskiert wird, bevor diese in die Protokolldateien des Python-Konnektors von Snowflake geschrieben werden. Verwenden Sie für SecretDetector folgenden Code:

# Logging including the timestamp, thread and the source code location
import logging
from snowflake.connector.secret_detector import SecretDetector
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

Bemerkung

botocore und boto3 sind über das AWS (Amazon Web Services) SDK für Python verfügbar.

Beispielprogramm

Der folgende Beispielcode kombiniert die meisten der in den vorangegangenen Abschnitten beschriebenen Beispiele zu einem funktionierenden Python-Programm: Dieses Beispiel enthält zwei Teile:

  • Eine übergeordnete Klasse („python_veritas_base“) enthält den Code für viele gängige Operationen, z. B. das Herstellen einer Verbindung zum Server.

  • Eine untergeordnete Klasse („python_connector_example“) repräsentiert die benutzerdefinierten Teile eines bestimmten Clients, z. B. zum Abfragen einer Tabelle.

Dieser Beispielcode wird direkt aus einem unserer Tests importiert, um sicherzustellen, dass er in einem neueren Build des Produkts ausgeführt wurde.

Da dies aus einem Test stammt, enthält es eine kleine Menge Code, um einen alternativen Port und ein alternatives Protokoll festzulegen, die in einigen Tests verwendet werden. Kunden sollten Protokoll oder Portnummer nicht festlegen. Lassen Sie diese Werte stattdessen aus, und verwenden Sie die Standardeinstellungen.

Dies enthält auch einige Abschnittsmarkierungen (manchmal als „Snippet-Tags“ bezeichnet), um Code zu identifizieren, der unabhängig in die Dokumentation importiert werden kann. Abschnittsmarkierungen sehen normalerweise ähnlich aus wie:

# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------

Diese Abschnittsmarkierungen sind im Kunden-Clientcode nicht erforderlich.

Der erste Teil des Codebeispiels enthält die allgemeinen Unterroutinen für:

  • Lesen Sie Befehlszeilenargumente (z. B. „–warehouse MyWarehouse“), die Verbindungsinformationen enthalten.

  • Stellen Sie eine Verbindung zum Server her.

  • Erstellen und verwenden Sie ein Warehouse, eine Datenbank und ein Schema.

  • Löschen Sie das Schema, die Datenbank und das Warehouse, wenn Sie damit fertig sind.


import logging
import os
import sys


# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


class python_veritas_base:

    """
    PURPOSE:
        This is the Base/Parent class for programs that use the Snowflake
        Connector for Python.
        This class is intended primarily for:
            * Sample programs, e.g. in the documentation.
            * Tests.
    """


    def __init__(self, p_log_file_name = None):

        """
        PURPOSE:
            This does any required initialization steps, which in this class is
            basically just turning on logging.
        """

        file_name = p_log_file_name
        if file_name is None:
            file_name = '/tmp/snowflake_python_connector.log'

        # -- (> ---------- SECTION=begin_logging -----------------------------
        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)
        # -- <) ---------- END_SECTION ---------------------------------------


    # -- (> ---------------------------- SECTION=main ------------------------
    def main(self, argv):

        """
        PURPOSE:
            Most tests follow the same basic pattern in this main() method:
               * Create a connection.
               * Set up, e.g. use (or create and use) the warehouse, database,
                 and schema.
               * Run the queries (or do the other tasks, e.g. load data).
               * Clean up. In this test/demo, we drop the warehouse, database,
                 and schema. In a customer scenario, you'd typically clean up
                 temporary tables, etc., but wouldn't drop your database.
               * Close the connection.
        """

        # Read the connection parameters (e.g. user ID) from the command line
        # and environment variables, then connect to Snowflake.
        connection = self.create_connection(argv)

        # Set up anything we need (e.g. a separate schema for the test/demo).
        self.set_up(connection)

        # Do the "real work", for example, create a table, insert rows, SELECT
        # from the table, etc.
        self.do_the_real_work(connection)

        # Clean up. In this case, we drop the temporary warehouse, database, and
        # schema.
        self.clean_up(connection)

        print("\nClosing connection...")
        # -- (> ------------------- SECTION=close_connection -----------------
        connection.close()
        # -- <) ---------------------------- END_SECTION ---------------------

    # -- <) ---------------------------- END_SECTION=main --------------------


    def args_to_properties(self, args):

        """
        PURPOSE:
            Read the command-line arguments and store them in a dictionary.
            Command-line arguments should come in pairs, e.g.:
                "--user MyUser"
        INPUTS:
            The command line arguments (sys.argv).
        RETURNS:
            Returns the dictionary.
        DESIRABLE ENHANCEMENTS:
            Improve error detection and handling.
        """

        connection_parameters = {}

        i = 1
        while i < len(args) - 1:
            property_name = args[i]
            # Strip off the leading "--" from the tag, e.g. from "--user".
            property_name = property_name[2:]
            property_value = args[i + 1]
            connection_parameters[property_name] = property_value
            i += 2

        return connection_parameters


    def create_connection(self, argv):

        """
        PURPOSE:
            This connects gets account and login information from the
            environment variables and command-line parameters, connects to the
            server, and returns the connection object.
        INPUTS:
            argv: This is usually sys.argv, which contains the command-line
                  parameters. It could be an equivalent substitute if you get
                  the parameter information from another source.
        RETURNS:
            A connection.
        """

        # Get account and login information from environment variables and
        # command-line parameters.
        # Note that ACCOUNT might require the region and cloud platform where
        # your account is located, in the form of
        #     '<your_account_name>.<region>.<cloud>'
        # for example
        #     'xy12345.us-east-1.azure')
        # -- (> ----------------------- SECTION=set_login_info ---------------

        # Get the password from an appropriate environment variable, if
        # available.
        PASSWORD = os.getenv('SNOWSQL_PWD')

        # Get the other login info etc. from the command line.
        if len(argv) < 11:
            msg = "ERROR: Please pass the following command-line parameters:\n"
            msg += "--warehouse <warehouse> --database <db> --schema <schema> "
            msg += "--user <user> --account <account> "
            print(msg)
            sys.exit(-1)
        else:
            connection_parameters = self.args_to_properties(argv)
            USER = connection_parameters["user"]
            ACCOUNT = connection_parameters["account"]
            WAREHOUSE = connection_parameters["warehouse"]
            DATABASE = connection_parameters["database"]
            SCHEMA = connection_parameters["schema"]
            # Optional: for internal testing only.
            try:
                PORT = connection_parameters["port"]
                PROTOCOL = connection_parameters["protocol"]
            except:
                PORT = ""
                PROTOCOL = ""

        # If the password is set by both command line and env var, the
        # command-line value takes precedence over (is written over) the
        # env var value.

        # If the password wasn't set either in the environment var or on
        # the command line...
        if PASSWORD is None or PASSWORD == '':
            print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
            sys.exit(-2)
        # -- <) ---------------------------- END_SECTION ---------------------

        # Optional diagnostic:
        #print("USER:", USER)
        #print("ACCOUNT:", ACCOUNT)
        #print("WAREHOUSE:", WAREHOUSE)
        #print("DATABASE:", DATABASE)
        #print("SCHEMA:", SCHEMA)
        #print("PASSWORD:", PASSWORD)
        #print("PROTOCOL:" "'" + PROTOCOL + "'")
        #print("PORT:" + "'" + PORT + "'")

        print("Connecting...")
        if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
            # -- (> ------------------- SECTION=connect_to_snowflake ---------
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )
            # -- <) ---------------------------- END_SECTION -----------------
        else:
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA,
                # Optional: for internal testing only.
                protocol=PROTOCOL,
                port=PORT
                )

        return conn


    def set_up(self, connection):

        """
        PURPOSE:
            Set up to run a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.create_warehouse_database_and_schema(connection)


    def do_the_real_work(self, conn):

        """
        PURPOSE:
            Your sub-class should override this to include the code required for
            your documentation sample or your test case.
            This default method does a very simple self-test that shows that the
            connection was successful.
        """

        # Create a cursor for this connection.
        cursor1 = conn.cursor()
        # This is an example of an SQL statement we might want to run.
        command = "SELECT PI()"
        # Run the statement.
        cursor1.execute(command)
        # Get the results (should be only one):
        for row in cursor1:
            print(row[0])
        # Close this cursor.
        cursor1.close()


    def clean_up(self, connection):

        """
        PURPOSE:
            Clean up after a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.drop_warehouse_database_and_schema(connection)


    def create_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Create the temporary schema, database, and warehouse that we use
            for most tests/demos.
        """

        # Create a database, schema, and warehouse if they don't already exist.
        print("\nCreating warehouse, database, schema...")
        # -- (> ------------- SECTION=create_warehouse_database_schema -------
        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------

        # -- (> --------------- SECTION=use_warehouse_database_schema --------
        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


    def drop_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Drop the temporary schema, database, and warehouse that we create
            for most tests/demos.
        """

        # -- (> ------------- SECTION=drop_warehouse_database_schema ---------
        conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
        conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
        conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


# ----------------------------------------------------------------------------

if __name__ == '__main__':
    pvb = python_veritas_base()
    pvb.main(sys.argv)


Der zweite Teil des Codebeispiels erstellt eine Tabelle, fügt Zeilen ein usw.:


import sys

# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


# Import the base class that contains methods used in many tests and code 
# examples.
from python_veritas_base import python_veritas_base


class python_connector_example (python_veritas_base):

  """
  PURPOSE:
      This is a simple example program that shows how to use the Snowflake 
      Python Connector to create and query a table.
  """

  def __init__(self):
    pass


  def do_the_real_work(self, conn):

    """
    INPUTS:
        conn is a Connection object returned from snowflake.connector.connect().
    """

    print("\nCreating table test_table...")
    # -- (> ----------------------- SECTION=create_table ---------------------
    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")
    # -- <) ---------------------------- END_SECTION -------------------------


    print("\nSelecting from test_table...")
    # -- (> ----------------------- SECTION=querying_data --------------------
    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()
    # -- <) ---------------------------- END_SECTION -------------------------




# ============================================================================

if __name__ == '__main__':

    test_case = python_connector_example()
    test_case.main(sys.argv)

Gehen Sie folgendermaßen vor, um dieses Beispiel auszuführen:

  1. Kopieren Sie den ersten Teil des Codes in eine Datei mit dem Namen „python_veritas_base.py“.

  2. Kopieren Sie den zweiten Teil des Codes in eine Datei mit dem Namen „python_connector_example.py“.

  3. Setzen Sie die Umgebungsvariable SNOWSQL_PWD auf Ihr Kennwort, zum Beispiel:

    export SNOWSQL_PWD='MyPassword'
    
  4. Führen Sie das Programm über eine Befehlszeile ähnlich der folgenden aus (ersetzen Sie die Benutzer- und Kontoinformationen natürlich durch Ihre eigenen Benutzer- und Kontoinformationen).

    Warnung

    Dies löscht das Warehouse, die Datenbank und das Schema am Ende des Programms! Verwenden Sie nicht den Namen einer vorhandenen Datenbank, da Sie diese verlieren werden!

    python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account xy98764 --user MyUserName
    

Und hier ist die Ausgabe:

Connecting...

Creating warehouse, database, schema...

Creating table test_table...

Selecting from test_table...
123, test string1
456, test string2

Closing connection...

Hier ist ein längeres Beispiel:

Bemerkung

Achten Sie darauf, dass Sie in dem Abschnitt, in dem Sie Ihre Konto- und Anmeldeinformationen festlegen, die Variablen bei Bedarf so ersetzen, dass sie Ihren Snowflake-Anmeldeinformationen (Name, Kennwort usw.) entsprechen.

In diesem Beispiel wird die Anweisung mit der Funktion format() zusammengestellt. Wenn in Ihrer Umgebung das Risiko von Angriffen durch Einschleusung von SQL-Befehlen besteht, sollten Sie möglicherweise das Binden von Werten vorziehen, anstatt format() zu verwenden.

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Set your account and login information (replace the variables with
# the necessary values). Note that ACCOUNT might also require the
# region and cloud platform where your account is located, in the form of
# '<your_account_name>.<region_id>.<cloud_platform>' (e.g. 'xy12345.east-us-2.azure')
ACCOUNT = '<your_account_name>'
USER = '<your_login_name>'
PASSWORD = '<your_password>'

import os

# Only required if you copy data from your own S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")

# Creating a table and inserting data
con.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <your_s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<your_s3_bucket>/data/
     STORAGE_INTEGRATION = myint
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = con.cursor()
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

# Closing the connection
con.close()